http://git-wip-us.apache.org/repos/asf/hbase/blob/b14e2ab1/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java new file mode 100644 index 0000000..d10713d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java @@ -0,0 +1,791 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.backup.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupInfo; +import org.apache.hadoop.hbase.backup.BackupType; +import org.apache.hadoop.hbase.backup.util.BackupClientUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.BackupProtos; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; + +import com.google.protobuf.InvalidProtocolBufferException; + + +/** + * Backup manifest Contains all the meta data of a backup image. The manifest info will be bundled + * as manifest file together with data. So that each backup image will contain all the info needed + * for restore. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class BackupManifest { + + private static final Log LOG = LogFactory.getLog(BackupManifest.class); + + // manifest file name + public static final String MANIFEST_FILE_NAME = ".backup.manifest"; + + // manifest file version, current is 1.0 + public static final String MANIFEST_VERSION = "1.0"; + + // backup image, the dependency graph is made up by series of backup images + + public static class BackupImage implements Comparable<BackupImage> { + + private String backupId; + private BackupType type; + private String rootDir; + private List<TableName> tableList; + private long startTs; + private long completeTs; + private ArrayList<BackupImage> ancestors; + + public BackupImage() { + super(); + } + + public BackupImage(String backupId, BackupType type, String rootDir, + List<TableName> tableList, long startTs, long completeTs) { + this.backupId = backupId; + this.type = type; + this.rootDir = rootDir; + this.tableList = tableList; + this.startTs = startTs; + this.completeTs = completeTs; + } + + static BackupImage fromProto(BackupProtos.BackupImage im) { + String backupId = im.getBackupId(); + String rootDir = im.getRootDir(); + long startTs = im.getStartTs(); + long completeTs = im.getCompleteTs(); + List<HBaseProtos.TableName> tableListList = im.getTableListList(); + List<TableName> tableList = new ArrayList<TableName>(); + for(HBaseProtos.TableName tn : tableListList) { + tableList.add(ProtobufUtil.toTableName(tn)); + } + + List<BackupProtos.BackupImage> ancestorList = im.getAncestorsList(); + + BackupType type = + im.getBackupType() == BackupProtos.BackupType.FULL ? 
BackupType.FULL: + BackupType.INCREMENTAL; + + BackupImage image = new BackupImage(backupId, type, rootDir, tableList, startTs, completeTs); + for(BackupProtos.BackupImage img: ancestorList) { + image.addAncestor(fromProto(img)); + } + return image; + } + + BackupProtos.BackupImage toProto() { + BackupProtos.BackupImage.Builder builder = BackupProtos.BackupImage.newBuilder(); + builder.setBackupId(backupId); + builder.setCompleteTs(completeTs); + builder.setStartTs(startTs); + builder.setRootDir(rootDir); + if (type == BackupType.FULL) { + builder.setBackupType(BackupProtos.BackupType.FULL); + } else{ + builder.setBackupType(BackupProtos.BackupType.INCREMENTAL); + } + + for (TableName name: tableList) { + builder.addTableList(ProtobufUtil.toProtoTableName(name)); + } + + if (ancestors != null){ + for (BackupImage im: ancestors){ + builder.addAncestors(im.toProto()); + } + } + + return builder.build(); + } + + public String getBackupId() { + return backupId; + } + + public void setBackupId(String backupId) { + this.backupId = backupId; + } + + public BackupType getType() { + return type; + } + + public void setType(BackupType type) { + this.type = type; + } + + public String getRootDir() { + return rootDir; + } + + public void setRootDir(String rootDir) { + this.rootDir = rootDir; + } + + public List<TableName> getTableNames() { + return tableList; + } + + public void setTableList(List<TableName> tableList) { + this.tableList = tableList; + } + + public long getStartTs() { + return startTs; + } + + public void setStartTs(long startTs) { + this.startTs = startTs; + } + + public long getCompleteTs() { + return completeTs; + } + + public void setCompleteTs(long completeTs) { + this.completeTs = completeTs; + } + + public ArrayList<BackupImage> getAncestors() { + if (this.ancestors == null) { + this.ancestors = new ArrayList<BackupImage>(); + } + return this.ancestors; + } + + public void addAncestor(BackupImage backupImage) { + this.getAncestors().add(backupImage); + } 
+ + public boolean hasAncestor(String token) { + for (BackupImage image : this.getAncestors()) { + if (image.getBackupId().equals(token)) { + return true; + } + } + return false; + } + + public boolean hasTable(TableName table) { + for (TableName t : tableList) { + if (t.equals(table)) { + return true; + } + } + return false; + } + + @Override + public int compareTo(BackupImage other) { + String thisBackupId = this.getBackupId(); + String otherBackupId = other.getBackupId(); + int index1 = thisBackupId.lastIndexOf("_"); + int index2 = otherBackupId.lastIndexOf("_"); + String name1 = thisBackupId.substring(0, index1); + String name2 = otherBackupId.substring(0, index2); + if(name1.equals(name2)) { + Long thisTS = new Long(thisBackupId.substring(index1 + 1)); + Long otherTS = new Long(otherBackupId.substring(index2 + 1)); + return thisTS.compareTo(otherTS); + } else { + return name1.compareTo(name2); + } + } + } + + // manifest version + private String version = MANIFEST_VERSION; + + // hadoop hbase configuration + protected Configuration config = null; + + // backup root directory + private String rootDir = null; + + // backup image directory + private String tableBackupDir = null; + + // backup log directory if this is an incremental backup + private String logBackupDir = null; + + // backup token + private String backupId; + + // backup type, full or incremental + private BackupType type; + + // the table list for the backup + private ArrayList<TableName> tableList; + + // actual start timestamp of the backup process + private long startTs; + + // actual complete timestamp of the backup process + private long completeTs; + + // the region server timestamp for tables: + // <table, <rs, timestamp>> + private Map<TableName, HashMap<String, Long>> incrTimeRanges; + + // dependency of this backup, including all the dependent images to do PIT recovery + private Map<String, BackupImage> dependency; + + /** + * Construct manifest for a ongoing backup. 
+ * @param backupCtx The ongoing backup context + */ + public BackupManifest(BackupInfo backupCtx) { + this.backupId = backupCtx.getBackupId(); + this.type = backupCtx.getType(); + this.rootDir = backupCtx.getTargetRootDir(); + if (this.type == BackupType.INCREMENTAL) { + this.logBackupDir = backupCtx.getHLogTargetDir(); + } + this.startTs = backupCtx.getStartTs(); + this.completeTs = backupCtx.getEndTs(); + this.loadTableList(backupCtx.getTableNames()); + } + + + /** + * Construct a table level manifest for a backup of the named table. + * @param backupCtx The ongoing backup context + */ + public BackupManifest(BackupInfo backupCtx, TableName table) { + this.backupId = backupCtx.getBackupId(); + this.type = backupCtx.getType(); + this.rootDir = backupCtx.getTargetRootDir(); + this.tableBackupDir = backupCtx.getBackupStatus(table).getTargetDir(); + if (this.type == BackupType.INCREMENTAL) { + this.logBackupDir = backupCtx.getHLogTargetDir(); + } + this.startTs = backupCtx.getStartTs(); + this.completeTs = backupCtx.getEndTs(); + List<TableName> tables = new ArrayList<TableName>(); + tables.add(table); + this.loadTableList(tables); + } + + /** + * Construct manifest from a backup directory. + * @param conf configuration + * @param backupPath backup path + * @throws IOException + */ + + public BackupManifest(Configuration conf, Path backupPath) throws IOException { + this(backupPath.getFileSystem(conf), backupPath); + } + + /** + * Construct manifest from a backup directory. + * @param conf configuration + * @param backupPath backup path + * @throws BackupException exception + */ + + public BackupManifest(FileSystem fs, Path backupPath) throws BackupException { + if (LOG.isDebugEnabled()) { + LOG.debug("Loading manifest from: " + backupPath.toString()); + } + // The input backupDir may not exactly be the backup table dir. + // It could be the backup log dir where there is also a manifest file stored. 
+ // This variable's purpose is to keep the correct and original location so + // that we can store/persist it. + this.tableBackupDir = backupPath.toString(); + this.config = fs.getConf(); + try { + + FileStatus[] subFiles = BackupClientUtil.listStatus(fs, backupPath, null); + if (subFiles == null) { + String errorMsg = backupPath.toString() + " does not exist"; + LOG.error(errorMsg); + throw new IOException(errorMsg); + } + for (FileStatus subFile : subFiles) { + if (subFile.getPath().getName().equals(MANIFEST_FILE_NAME)) { + + // load and set manifest field from file content + FSDataInputStream in = fs.open(subFile.getPath()); + long len = subFile.getLen(); + byte[] pbBytes = new byte[(int) len]; + in.readFully(pbBytes); + BackupProtos.BackupManifest proto = null; + try{ + proto = parseFrom(pbBytes); + } catch(Exception e){ + throw new BackupException(e); + } + this.version = proto.getVersion(); + this.backupId = proto.getBackupId(); + this.type = BackupType.valueOf(proto.getType().name()); + // Here the parameter backupDir is where the manifest file is. 
+ // There should always be a manifest file under: + // backupRootDir/namespace/table/backupId/.backup.manifest + this.rootDir = backupPath.getParent().getParent().getParent().toString(); + + Path p = backupPath.getParent(); + if (p.getName().equals(HConstants.HREGION_LOGDIR_NAME)) { + this.rootDir = p.getParent().toString(); + } else { + this.rootDir = p.getParent().getParent().toString(); + } + + loadTableList(proto); + this.startTs = proto.getStartTs(); + this.completeTs = proto.getCompleteTs(); + loadIncrementalTimestampMap(proto); + loadDependency(proto); + //TODO: merge will be implemented by future jira + LOG.debug("Loaded manifest instance from manifest file: " + + BackupClientUtil.getPath(subFile.getPath())); + return; + } + } + String errorMsg = "No manifest file found in: " + backupPath.toString(); + throw new IOException(errorMsg); + + } catch (IOException e) { + throw new BackupException(e.getMessage()); + } + } + + private void loadIncrementalTimestampMap(BackupProtos.BackupManifest proto) { + List<BackupProtos.TableServerTimestamp> list = proto.getTstMapList(); + if(list == null || list.size() == 0) return; + this.incrTimeRanges = new HashMap<TableName, HashMap<String, Long>>(); + for(BackupProtos.TableServerTimestamp tst: list){ + TableName tn = ProtobufUtil.toTableName(tst.getTable()); + HashMap<String, Long> map = this.incrTimeRanges.get(tn); + if(map == null){ + map = new HashMap<String, Long>(); + this.incrTimeRanges.put(tn, map); + } + List<BackupProtos.ServerTimestamp> listSt = tst.getServerTimestampList(); + for(BackupProtos.ServerTimestamp stm: listSt) { + map.put(stm.getServer(), stm.getTimestamp()); + } + } + } + + private void loadDependency(BackupProtos.BackupManifest proto) { + if(LOG.isDebugEnabled()) { + LOG.debug("load dependency for: "+proto.getBackupId()); + } + + dependency = new HashMap<String, BackupImage>(); + List<BackupProtos.BackupImage> list = proto.getDependentBackupImageList(); + for (BackupProtos.BackupImage im : list) { 
+ BackupImage bim = BackupImage.fromProto(im); + if(im.getBackupId() != null){ + dependency.put(im.getBackupId(), bim); + } else{ + LOG.warn("Load dependency for backup manifest: "+ backupId+ + ". Null backup id in dependent image"); + } + } + } + + private void loadTableList(BackupProtos.BackupManifest proto) { + this.tableList = new ArrayList<TableName>(); + List<HBaseProtos.TableName> list = proto.getTableListList(); + for (HBaseProtos.TableName name: list) { + this.tableList.add(ProtobufUtil.toTableName(name)); + } + } + + public BackupType getType() { + return type; + } + + public void setType(BackupType type) { + this.type = type; + } + + /** + * Loads table list. + * @param tableList Table list + */ + private void loadTableList(List<TableName> tableList) { + + this.tableList = this.getTableList(); + if (this.tableList.size() > 0) { + this.tableList.clear(); + } + for (int i = 0; i < tableList.size(); i++) { + this.tableList.add(tableList.get(i)); + } + + LOG.debug(tableList.size() + " tables exist in table set."); + } + + /** + * Get the table set of this image. + * @return The table set list + */ + public ArrayList<TableName> getTableList() { + if (this.tableList == null) { + this.tableList = new ArrayList<TableName>(); + } + return this.tableList; + } + + /** + * Persist the manifest file. + * @throws IOException IOException when storing the manifest file. + */ + + public void store(Configuration conf) throws BackupException { + byte[] data = toByteArray(); + + // write the file, overwrite if already exist + Path manifestFilePath = + new Path(new Path((this.tableBackupDir != null ? 
this.tableBackupDir : this.logBackupDir)) + ,MANIFEST_FILE_NAME); + try { + FSDataOutputStream out = + manifestFilePath.getFileSystem(conf).create(manifestFilePath, true); + out.write(data); + out.close(); + } catch (IOException e) { + throw new BackupException(e.getMessage()); + } + + LOG.info("Manifest file stored to " + manifestFilePath); + } + + /** + * Protobuf serialization + * @return The filter serialized using pb + */ + public byte[] toByteArray() { + BackupProtos.BackupManifest.Builder builder = BackupProtos.BackupManifest.newBuilder(); + builder.setVersion(this.version); + builder.setBackupId(this.backupId); + builder.setType(BackupProtos.BackupType.valueOf(this.type.name())); + setTableList(builder); + builder.setStartTs(this.startTs); + builder.setCompleteTs(this.completeTs); + setIncrementalTimestampMap(builder); + setDependencyMap(builder); + return builder.build().toByteArray(); + } + + private void setIncrementalTimestampMap(BackupProtos.BackupManifest.Builder builder) { + if (this.incrTimeRanges == null) { + return; + } + for (Entry<TableName, HashMap<String,Long>> entry: this.incrTimeRanges.entrySet()) { + TableName key = entry.getKey(); + HashMap<String, Long> value = entry.getValue(); + BackupProtos.TableServerTimestamp.Builder tstBuilder = + BackupProtos.TableServerTimestamp.newBuilder(); + tstBuilder.setTable(ProtobufUtil.toProtoTableName(key)); + + for (String s : value.keySet()) { + BackupProtos.ServerTimestamp.Builder stBuilder = BackupProtos.ServerTimestamp.newBuilder(); + stBuilder.setServer(s); + stBuilder.setTimestamp(value.get(s)); + tstBuilder.addServerTimestamp(stBuilder.build()); + } + builder.addTstMap(tstBuilder.build()); + } + } + + private void setDependencyMap(BackupProtos.BackupManifest.Builder builder) { + for (BackupImage image: getDependency().values()) { + builder.addDependentBackupImage(image.toProto()); + } + } + + private void setTableList(BackupProtos.BackupManifest.Builder builder) { + for(TableName name: tableList){ 
+ builder.addTableList(ProtobufUtil.toProtoTableName(name)); + } + } + + /** + * Parse protobuf from byte array + * @param pbBytes A pb serialized BackupManifest instance + * @return An instance of made from <code>bytes</code> + * @throws DeserializationException + */ + private static BackupProtos.BackupManifest parseFrom(final byte[] pbBytes) + throws DeserializationException { + BackupProtos.BackupManifest proto; + try { + proto = BackupProtos.BackupManifest.parseFrom(pbBytes); + } catch (InvalidProtocolBufferException e) { + throw new DeserializationException(e); + } + return proto; + } + + /** + * Get manifest file version + * @return version + */ + public String getVersion() { + return version; + } + + /** + * Get this backup image. + * @return the backup image. + */ + public BackupImage getBackupImage() { + return this.getDependency().get(this.backupId); + } + + /** + * Add dependent backup image for this backup. + * @param image The direct dependent backup image + */ + public void addDependentImage(BackupImage image) { + this.getDependency().get(this.backupId).addAncestor(image); + this.setDependencyMap(this.getDependency(), image); + } + + + + /** + * Get all dependent backup images. The image of this backup is also contained. + * @return The dependent backup images map + */ + public Map<String, BackupImage> getDependency() { + if (this.dependency == null) { + this.dependency = new HashMap<String, BackupImage>(); + LOG.debug(this.rootDir + " " + this.backupId + " " + this.type); + this.dependency.put(this.backupId, + new BackupImage(this.backupId, this.type, this.rootDir, tableList, this.startTs, + this.completeTs)); + } + return this.dependency; + } + + /** + * Set the incremental timestamp map directly. 
+ * @param incrTimestampMap timestamp map + */ + public void setIncrTimestampMap(HashMap<TableName, HashMap<String, Long>> incrTimestampMap) { + this.incrTimeRanges = incrTimestampMap; + } + + + public Map<TableName, HashMap<String, Long>> getIncrTimestampMap() { + if (this.incrTimeRanges == null) { + this.incrTimeRanges = new HashMap<TableName, HashMap<String, Long>>(); + } + return this.incrTimeRanges; + } + + + /** + * Get the image list of this backup for restore in time order. + * @param reverse If true, then output in reverse order, otherwise in time order from old to new + * @return the backup image list for restore in time order + */ + public ArrayList<BackupImage> getRestoreDependentList(boolean reverse) { + TreeMap<Long, BackupImage> restoreImages = new TreeMap<Long, BackupImage>(); + for (BackupImage image : this.getDependency().values()) { + restoreImages.put(Long.valueOf(image.startTs), image); + } + return new ArrayList<BackupImage>(reverse ? (restoreImages.descendingMap().values()) + : (restoreImages.values())); + } + + /** + * Get the dependent image list for a specific table of this backup in time order from old to new + * if want to restore to this backup image level. + * @param table table + * @return the backup image list for a table in time order + */ + public ArrayList<BackupImage> getDependentListByTable(TableName table) { + ArrayList<BackupImage> tableImageList = new ArrayList<BackupImage>(); + ArrayList<BackupImage> imageList = getRestoreDependentList(true); + for (BackupImage image : imageList) { + if (image.hasTable(table)) { + tableImageList.add(image); + if (image.getType() == BackupType.FULL) { + break; + } + } + } + Collections.reverse(tableImageList); + return tableImageList; + } + + /** + * Get the full dependent image list in the whole dependency scope for a specific table of this + * backup in time order from old to new. 
+ * @param table table + * @return the full backup image list for a table in time order in the whole scope of the + * dependency of this image + */ + public ArrayList<BackupImage> getAllDependentListByTable(TableName table) { + ArrayList<BackupImage> tableImageList = new ArrayList<BackupImage>(); + ArrayList<BackupImage> imageList = getRestoreDependentList(false); + for (BackupImage image : imageList) { + if (image.hasTable(table)) { + tableImageList.add(image); + } + } + return tableImageList; + } + + + /** + * Recursively set the dependency map of the backup images. + * @param map The dependency map + * @param image The backup image + */ + private void setDependencyMap(Map<String, BackupImage> map, BackupImage image) { + if (image == null) { + return; + } else { + map.put(image.getBackupId(), image); + for (BackupImage img : image.getAncestors()) { + setDependencyMap(map, img); + } + } + } + + /** + * Check whether backup image1 could cover backup image2 or not. + * @param image1 backup image 1 + * @param image2 backup image 2 + * @return true if image1 can cover image2, otherwise false + */ + public static boolean canCoverImage(BackupImage image1, BackupImage image2) { + // image1 can cover image2 only when the following conditions are satisfied: + // - image1 must not be an incremental image; + // - image1 must be taken after image2 has been taken; + // - table set of image1 must cover the table set of image2. 
+ if (image1.getType() == BackupType.INCREMENTAL) { + return false; + } + if (image1.getStartTs() < image2.getStartTs()) { + return false; + } + List<TableName> image1TableList = image1.getTableNames(); + List<TableName> image2TableList = image2.getTableNames(); + boolean found = false; + for (int i = 0; i < image2TableList.size(); i++) { + found = false; + for (int j = 0; j < image1TableList.size(); j++) { + if (image2TableList.get(i).equals(image1TableList.get(j))) { + found = true; + break; + } + } + if (!found) { + return false; + } + } + + LOG.debug("Backup image " + image1.getBackupId() + " can cover " + image2.getBackupId()); + return true; + } + + /** + * Check whether backup image set could cover a backup image or not. + * @param fullImages The backup image set + * @param image The target backup image + * @return true if fullImages can cover image, otherwise false + */ + public static boolean canCoverImage(ArrayList<BackupImage> fullImages, BackupImage image) { + // fullImages can cover image only when the following conditions are satisfied: + // - each image of fullImages must not be an incremental image; + // - each image of fullImages must be taken after image has been taken; + // - sum table set of fullImages must cover the table set of image. 
+ for (BackupImage image1 : fullImages) { + if (image1.getType() == BackupType.INCREMENTAL) { + return false; + } + if (image1.getStartTs() < image.getStartTs()) { + return false; + } + } + + ArrayList<String> image1TableList = new ArrayList<String>(); + for (BackupImage image1 : fullImages) { + List<TableName> tableList = image1.getTableNames(); + for (TableName table : tableList) { + image1TableList.add(table.getNameAsString()); + } + } + ArrayList<String> image2TableList = new ArrayList<String>(); + List<TableName> tableList = image.getTableNames(); + for (TableName table : tableList) { + image2TableList.add(table.getNameAsString()); + } + + for (int i = 0; i < image2TableList.size(); i++) { + if (image1TableList.contains(image2TableList.get(i)) == false) { + return false; + } + } + + LOG.debug("Full image set can cover image " + image.getBackupId()); + return true; + } + + public BackupInfo toBackupInfo() + { + BackupInfo info = new BackupInfo(); + info.setType(type); + TableName[] tables = new TableName[tableList.size()]; + info.addTables(getTableList().toArray(tables)); + info.setBackupId(backupId); + info.setStartTs(startTs); + info.setTargetRootDir(rootDir); + if(type == BackupType.INCREMENTAL) { + info.setHlogTargetDir(logBackupDir); + } + return info; + } +}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b14e2ab1/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupRestoreConstants.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupRestoreConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupRestoreConstants.java new file mode 100644 index 0000000..ac1d2bc --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupRestoreConstants.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.backup.impl; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * BackupRestoreConstants holds constants used by HBase Backup and Restore: the table name + * delimiter of the restore command, a staging root key, the backup id prefix and the + * BackupCommand enum. The class is final with a private constructor, so it is never instantiated. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public final class BackupRestoreConstants { + + + // delimiter between table names in the table name list of the restore command + public static final String TABLENAME_DELIMITER_IN_COMMAND = ","; + + public static final String CONF_STAGING_ROOT = "snapshot.export.staging.root"; + + public static final String BACKUPID_PREFIX = "backup_"; + + public static enum BackupCommand { + CREATE, CANCEL, DELETE, DESCRIBE, HISTORY, STATUS, CONVERT, MERGE, STOP, SHOW, HELP, PROGRESS, SET, + SET_ADD, SET_REMOVE, SET_DELETE, SET_DESCRIBE, SET_LIST + } + + private BackupRestoreConstants() { + // Private constructor: this constants holder can't be instantiated. + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/b14e2ab1/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java new file mode 100644 index 0000000..d05d54c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java @@ -0,0 +1,926 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.impl; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupInfo; +import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; +import org.apache.hadoop.hbase.backup.util.BackupClientUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import 
org.apache.hadoop.hbase.protobuf.generated.BackupProtos;

/**
 * This class provides 'hbase:backup' table API.
 * <p>
 * Every operation obtains a {@link Table} handle for the backup system table from the
 * connection passed to the constructor and releases that handle before returning (the only
 * exception is {@link #getWALFilesIterator(String)}, see its documentation). The connection
 * itself is never closed by this class.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class BackupSystemTable implements Closeable {

  /**
   * One WAL file registered in hbase:backup: the backup session that recorded it, the WAL file
   * name as stored, and the backup root (destination) of that session.
   */
  static class WALItem {
    String backupId;
    String walFile;
    String backupRoot;

    WALItem(String backupId, String walFile, String backupRoot) {
      this.backupId = backupId;
      this.walFile = walFile;
      this.backupRoot = backupRoot;
    }

    public String getBackupId() {
      return backupId;
    }

    public String getWalFile() {
      return walFile;
    }

    public String getBackupRoot() {
      return backupRoot;
    }

    @Override
    public String toString() {
      return "/" + backupRoot + "/" + backupId + "/" + walFile;
    }

  }

  private static final Log LOG = LogFactory.getLog(BackupSystemTable.class);
  private final static TableName tableName = TableName.BACKUP_TABLE_NAME;
  // Stores backup sessions (contexts)
  final static byte[] SESSIONS_FAMILY = "session".getBytes();
  // Stores other meta
  final static byte[] META_FAMILY = "meta".getBytes();
  // Connection to HBase cluster, shared
  // among all instances
  private final Connection connection;

  /**
   * Creates an accessor on top of an existing connection.
   * @param conn connection used for every table access; it is stored, not copied
   * @throws IOException declared for callers; this constructor performs no I/O itself
   */
  public BackupSystemTable(Connection conn) throws IOException {
    this.connection = conn;
  }

  /**
   * Does nothing: the connection is shared among instances and is not closed here.
   */
  @Override
  public void close() {
    // do nothing
  }

  /**
   * Updates status (state) of a backup session in hbase:backup table
   * @param context context
   * @throws IOException exception
   */
  public void updateBackupInfo(BackupInfo context) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("update backup status in hbase:backup for: " + context.getBackupId()
          + " set status=" + context.getState());
    }
    try (Table table = connection.getTable(tableName)) {
      table.put(BackupSystemTableHelper.createPutForBackupContext(context));
    }
  }

  /**
   * Deletes backup status from hbase:backup table
   * @param backupId backup id
   * @throws IOException exception
   */
  public void deleteBackupInfo(String backupId) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("delete backup status in hbase:backup for " + backupId);
    }
    try (Table table = connection.getTable(tableName)) {
      table.delete(BackupSystemTableHelper.createDeleteForBackupInfo(backupId));
    }
  }

  /**
   * Reads backup status object (instance of BackupInfo) from hbase:backup table
   * @param backupId - backupId
   * @return Current status of backup session or null if no row exists for the id
   * @throws IOException exception
   */
  public BackupInfo readBackupInfo(String backupId) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("read backup status from hbase:backup for: " + backupId);
    }
    try (Table table = connection.getTable(tableName)) {
      Result res = table.get(BackupSystemTableHelper.createGetForBackupContext(backupId));
      if (res.isEmpty()) {
        return null;
      }
      return BackupSystemTableHelper.resultToBackupInfo(res);
    }
  }

  /**
   * Read the last backup start code (timestamp) of last successful backup. Will return null if
   * there is no start code stored on hbase or the value is of length 0. These two cases indicate
   * there is no successful backup completed so far.
   * @param backupRoot root directory path to backup
   * @return the timestamp of last successful backup
   * @throws IOException exception
   */
  public String readBackupStartCode(String backupRoot) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("read backup start code from hbase:backup");
    }
    try (Table table = connection.getTable(tableName)) {
      Result res = table.get(BackupSystemTableHelper.createGetForStartCode(backupRoot));
      if (res.isEmpty()) {
        return null;
      }
      byte[] val = CellUtil.cloneValue(res.listCells().get(0));
      if (val.length == 0) {
        return null;
      }
      return new String(val);
    }
  }

  /**
   * Write the start code (timestamp) to hbase:backup. If passed in null, then write 0 byte.
   * @param startCode start code, may be null
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("write backup start code to hbase:backup " + startCode);
    }
    // A null start code is stored as a zero-length value, which readBackupStartCode() reports
    // as "no start code". Calling toString() on it directly would throw NullPointerException.
    String value = (startCode == null) ? "" : startCode.toString();
    try (Table table = connection.getTable(tableName)) {
      table.put(BackupSystemTableHelper.createPutForStartCode(value, backupRoot));
    }
  }

  /**
   * Get the Region Servers log information after the last log roll from hbase:backup.
   * @param backupRoot root directory path to backup
   * @return RS log info: server name to timestamp
   * @throws IOException exception
   */
  public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot)
      throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("read region server last roll log result to hbase:backup");
    }

    Scan scan = BackupSystemTableHelper.createScanForReadRegionServerLastLogRollResult(backupRoot);

    try (Table table = connection.getTable(tableName);
        ResultScanner scanner = table.getScanner(scan)) {
      HashMap<String, Long> rsTimestampMap = new HashMap<String, Long>();
      Result res;
      while ((res = scanner.next()) != null) {
        // Only the first cell of each row is used: the row key carries the server name and the
        // cell value carries the timestamp as a decimal string.
        res.advance();
        Cell cell = res.current();
        String server = BackupSystemTableHelper
            .getServerNameForReadRegionServerLastLogRollResult(CellUtil.cloneRow(cell));
        byte[] data = CellUtil.cloneValue(cell);
        rsTimestampMap.put(server, Long.parseLong(new String(data)));
      }
      return rsTimestampMap;
    }
  }

  /**
   * Writes Region Server last roll log result (timestamp) to hbase:backup table
   * @param server - Region Server name
   * @param ts - last log timestamp
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot)
      throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("write region server last roll log result to hbase:backup");
    }
    try (Table table = connection.getTable(tableName)) {
      table.put(
        BackupSystemTableHelper.createPutForRegionServerLastLogRollResult(server, ts, backupRoot));
    }
  }

  /**
   * Get backup history (in desc order by time)
   * @param onlyCompleted true, if only successfully completed sessions
   * @return history info of backup sessions
   * @throws IOException exception
   */
  public ArrayList<BackupInfo> getBackupHistory(boolean onlyCompleted) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("get backup history from hbase:backup");
    }
    BackupState state = onlyCompleted ? BackupState.COMPLETE : BackupState.ANY;
    return BackupClientUtil.sortHistoryListDesc(getBackupContexts(state));
  }

  /**
   * Get all backups history
   * @return list of backup info
   * @throws IOException exception
   */
  public List<BackupInfo> getBackupHistory() throws IOException {
    return getBackupHistory(false);
  }

  /**
   * Get first n backup history records
   * @param n - number of records
   * @return list of records; the whole history when it has at most n entries
   * @throws IOException exception
   */
  public List<BackupInfo> getHistory(int n) throws IOException {
    List<BackupInfo> history = getBackupHistory();
    if (history.size() <= n) {
      return history;
    }
    // A negative n yields an empty list, exactly as a counting loop from 0 to n would.
    return new ArrayList<BackupInfo>(history.subList(0, Math.max(n, 0)));
  }

  /**
   * Get backup history records filtered by list
   * of filters.
   * @param n - max number of records
   * @param filters - list of filters; a record is returned only if every filter accepts it
   * @return backup records
   * @throws IOException exception
   */
  public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter... filters) throws IOException {
    if (filters.length == 0) {
      return getHistory(n);
    }

    List<BackupInfo> result = new ArrayList<BackupInfo>();
    for (BackupInfo bi : getBackupHistory()) {
      if (result.size() == n) {
        break;
      }
      boolean passed = true;
      for (BackupInfo.Filter filter : filters) {
        if (!filter.apply(bi)) {
          passed = false;
          break;
        }
      }
      if (passed) {
        result.add(bi);
      }
    }
    return result;
  }

  /**
   * Get history for backup destination
   * @param backupRoot - backup destination
   * @return List of backup info
   * @throws IOException exception
   */
  public List<BackupInfo> getBackupHistory(String backupRoot) throws IOException {
    ArrayList<BackupInfo> history = getBackupHistory(false);
    for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) {
      if (!backupRoot.equals(iterator.next().getTargetRootDir())) {
        iterator.remove();
      }
    }
    return history;
  }

  /**
   * Get history for a table
   * @param name - table name
   * @return history for a table
   * @throws IOException exception
   */
  public List<BackupInfo> getBackupHistoryForTable(TableName name) throws IOException {
    List<BackupInfo> tableHistory = new ArrayList<BackupInfo>();
    for (BackupInfo info : getBackupHistory()) {
      if (info.getTableNames().contains(name)) {
        tableHistory.add(info);
      }
    }
    return tableHistory;
  }

  /**
   * Get history of a backup destination grouped by table.
   * @param set tables of interest
   * @param backupRoot backup destination
   * @return map from table to its backup sessions; tables of the set without any session for
   *         this destination have no entry
   * @throws IOException exception
   */
  public Map<TableName, ArrayList<BackupInfo>>
      getBackupHistoryForTableSet(Set<TableName> set, String backupRoot) throws IOException {
    Map<TableName, ArrayList<BackupInfo>> tableHistoryMap =
        new HashMap<TableName, ArrayList<BackupInfo>>();
    // getBackupHistory(backupRoot) already drops sessions of other destinations, so no second
    // root check is needed here.
    for (BackupInfo info : getBackupHistory(backupRoot)) {
      for (TableName table : info.getTableNames()) {
        if (!set.contains(table)) {
          continue;
        }
        ArrayList<BackupInfo> list = tableHistoryMap.get(table);
        if (list == null) {
          list = new ArrayList<BackupInfo>();
          tableHistoryMap.put(table, list);
        }
        list.add(info);
      }
    }
    return tableHistoryMap;
  }

  /**
   * Get all backup sessions with a given status, in the order the scan returns them
   * @param status status; {@link BackupState#ANY} accepts every session
   * @return history info of backup contexts
   * @throws IOException exception
   */
  public ArrayList<BackupInfo> getBackupContexts(BackupState status) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("get backup contexts from hbase:backup");
    }

    Scan scan = BackupSystemTableHelper.createScanForBackupHistory();
    ArrayList<BackupInfo> list = new ArrayList<BackupInfo>();

    try (Table table = connection.getTable(tableName);
        ResultScanner scanner = table.getScanner(scan)) {
      Result res;
      while ((res = scanner.next()) != null) {
        res.advance();
        BackupInfo context = BackupSystemTableHelper.cellToBackupInfo(res.current());
        if (status == BackupState.ANY || context.getState() == status) {
          list.add(context);
        }
      }
      return list;
    }
  }

  /**
   * Write the current timestamps for each regionserver to hbase:backup after a successful full or
   * incremental backup. The saved timestamp is of the last log file that was backed up already.
   * @param tables tables
   * @param newTimestamps timestamps
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void writeRegionServerLogTimestamp(Set<TableName> tables,
      HashMap<String, Long> newTimestamps, String backupRoot) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("write RS log time stamps to hbase:backup for tables ["
          + StringUtils.join(tables, ",") + "]");
    }
    // The same server -> timestamp map is stored once per table.
    List<Put> puts = new ArrayList<Put>();
    for (TableName table : tables) {
      byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray();
      puts.add(BackupSystemTableHelper.createPutForWriteRegionServerLogTimestamp(table, smapData,
        backupRoot));
    }
    try (Table table = connection.getTable(tableName)) {
      table.put(puts);
    }
  }

  /**
   * Read the timestamp for each region server log after the last successful backup. Each table has
   * its own set of the timestamps. The info is stored for each table as a serialized
   * TableServerTimestamp message (rs -> timestamp).
   * @param backupRoot root directory path to backup
   * @return the timestamp for each region server. key: tableName value:
   *         RegionServer,PreviousTimeStamp
   * @throws IOException exception
   */
  public HashMap<TableName, HashMap<String, Long>> readLogTimestampMap(String backupRoot)
      throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("read RS log ts from hbase:backup for root=" + backupRoot);
    }

    HashMap<TableName, HashMap<String, Long>> tableTimestampMap =
        new HashMap<TableName, HashMap<String, Long>>();

    Scan scan = BackupSystemTableHelper.createScanForReadLogTimestampMap(backupRoot);
    try (Table table = connection.getTable(tableName);
        ResultScanner scanner = table.getScanner(scan)) {
      Result res;
      while ((res = scanner.next()) != null) {
        res.advance();
        Cell cell = res.current();
        String tabName =
            BackupSystemTableHelper.getTableNameForReadLogTimestampMap(CellUtil.cloneRow(cell));
        TableName tn = TableName.valueOf(tabName);
        byte[] data = CellUtil.cloneValue(cell);
        if (data == null) {
          throw new IOException("Data of last backup data from hbase:backup "
              + "is empty. Create a backup first.");
        }
        // Rows with a zero-length value are skipped.
        if (data.length > 0) {
          tableTimestampMap.put(tn,
            fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data)));
        }
      }
      return tableTimestampMap;
    }
  }

  /**
   * Builds the protobuf message holding the server -> timestamp entries for one table.
   */
  private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table,
      Map<String, Long> map) {
    BackupProtos.TableServerTimestamp.Builder tstBuilder =
        BackupProtos.TableServerTimestamp.newBuilder();
    tstBuilder.setTable(ProtobufUtil.toProtoTableName(table));

    for (Entry<String, Long> entry : map.entrySet()) {
      tstBuilder.addServerTimestamp(BackupProtos.ServerTimestamp.newBuilder()
          .setServer(entry.getKey())
          .setTimestamp(entry.getValue())
          .build());
    }

    return tstBuilder.build();
  }

  /**
   * Converts the protobuf message back into a server -> timestamp map.
   */
  private HashMap<String, Long> fromTableServerTimestampProto(
      BackupProtos.TableServerTimestamp proto) {
    HashMap<String, Long> map = new HashMap<String, Long>();
    for (BackupProtos.ServerTimestamp st : proto.getServerTimestampList()) {
      map.put(st.getServer(), st.getTimestamp());
    }
    return map;
  }

  /**
   * Return the current tables covered by incremental backup.
   * @param backupRoot root directory path to backup
   * @return set of tableNames, empty if nothing is stored for the backup root
   * @throws IOException exception
   */
  public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("get incr backup table set from hbase:backup");
    }
    TreeSet<TableName> set = new TreeSet<>();

    try (Table table = connection.getTable(tableName)) {
      Result res = table.get(BackupSystemTableHelper.createGetForIncrBackupTableSet(backupRoot));
      if (res.isEmpty()) {
        return set;
      }
      for (Cell cell : res.listCells()) {
        // qualifier = table name - we use table names as qualifiers
        set.add(TableName.valueOf(CellUtil.cloneQualifier(cell)));
      }
      return set;
    }
  }

  /**
   * Add tables to global incremental backup set
   * @param tables - set of tables
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot)
      throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Add incremental backup table set to hbase:backup. ROOT=" + backupRoot
          + " tables [" + StringUtils.join(tables, " ") + "]");
      for (TableName table : tables) {
        LOG.debug(table);
      }
    }
    try (Table table = connection.getTable(tableName)) {
      table.put(BackupSystemTableHelper.createPutForIncrBackupTableSet(tables, backupRoot));
    }
  }

  /**
   * Removes incremental backup set
   * @param backupRoot backup root
   * @throws IOException exception
   */
  public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Delete incremental backup table set to hbase:backup. ROOT=" + backupRoot);
    }
    try (Table table = connection.getTable(tableName)) {
      table.delete(BackupSystemTableHelper.createDeleteForIncrBackupTableSet(backupRoot));
    }
  }

  /**
   * Register WAL files as eligible for deletion
   * @param files files
   * @param backupId backup id
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void addWALFiles(List<String> files, String backupId, String backupRoot)
      throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("add WAL files to hbase:backup: " + backupId + " " + backupRoot + " files ["
          + StringUtils.join(files, ",") + "]");
      for (String f : files) {
        LOG.debug("add :" + f);
      }
    }
    try (Table table = connection.getTable(tableName)) {
      table.put(BackupSystemTableHelper.createPutsForAddWALFiles(files, backupId, backupRoot));
    }
  }

  /**
   * Iterate over the WAL files registered in hbase:backup.
   * <p>
   * The table handle and the scanner backing the iterator are released only when
   * {@code hasNext()} returns false, so callers must iterate to the end.
   * @param backupRoot root directory path to backup
   * @return iterator over registered WAL files
   * @throws IOException exception
   */
  public Iterator<WALItem> getWALFilesIterator(String backupRoot) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("get WAL files from hbase:backup");
    }
    Scan scan = BackupSystemTableHelper.createScanForGetWALs(backupRoot);
    final Table table = connection.getTable(tableName);
    final ResultScanner scanner;
    try {
      scanner = table.getScanner(scan);
    } catch (IOException | RuntimeException e) {
      // No iterator will exist to release the table handle later, so release it now.
      try {
        table.close();
      } catch (IOException closeError) {
        LOG.error("Close WAL Iterator", closeError);
      }
      throw e;
    }
    final Iterator<Result> it = scanner.iterator();
    return new Iterator<WALItem>() {

      @Override
      public boolean hasNext() {
        boolean next = it.hasNext();
        if (!next) {
          // close all
          try {
            scanner.close();
            table.close();
          } catch (IOException e) {
            LOG.error("Close WAL Iterator", e);
          }
        }
        return next;
      }

      @Override
      public WALItem next() {
        // Cells are read positionally: 0 = backup id, 1 = WAL file, 2 = backup root.
        // NOTE(review): relies on the scan returning the three columns in this order - confirm
        // against the qualifiers used when the rows are written.
        List<Cell> cells = it.next().listCells();
        String backupId = valueAsString(cells.get(0));
        String walFile = valueAsString(cells.get(1));
        String backupRoot = valueAsString(cells.get(2));
        return new WALItem(backupId, walFile, backupRoot);
      }

      @Override
      public void remove() {
        // not implemented
        throw new UnsupportedOperationException("remove is not supported");
      }
    };
  }

  /**
   * Decodes the value of a cell as a string without copying the backing array first.
   */
  private static String valueAsString(Cell cell) {
    return new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
  }

  /**
   * Check if WAL file is eligible for deletion Future: to support all backup destinations
   * @param file file
   * @return true, if a row for this WAL file exists in hbase:backup
   * @throws IOException exception
   */
  public boolean isWALFileDeletable(String file) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Check if WAL file has been already backed up in hbase:backup " + file);
    }
    try (Table table = connection.getTable(tableName)) {
      Result res = table.get(BackupSystemTableHelper.createGetForCheckWALFile(file));
      return !res.isEmpty();
    }
  }

  /**
   * Checks if we have at least one backup session in hbase:backup This API is used by
   * BackupLogCleaner
   * @return true, if - at least one session exists in hbase:backup table
   * @throws IOException exception
   */
  public boolean hasBackupSessions() throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Has backup sessions from hbase:backup");
    }
    Scan scan = BackupSystemTableHelper.createScanForBackupHistory();
    // One row is enough to answer the question.
    scan.setCaching(1);
    try (Table table = connection.getTable(tableName);
        ResultScanner scanner = table.getScanner(scan)) {
      return scanner.next() != null;
    }
  }

  /**
   * BACKUP SETS
   */

  /**
   * Get backup set list
   * @return backup set list
   * @throws IOException exception
   */
  public List<String> listBackupSets() throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug(" Backup set list");
    }
    Scan scan = BackupSystemTableHelper.createScanForBackupSetList();
    scan.setMaxVersions(1);
    List<String> list = new ArrayList<String>();
    try (Table table = connection.getTable(tableName);
        ResultScanner scanner = table.getScanner(scan)) {
      Result res;
      while ((res = scanner.next()) != null) {
        res.advance();
        list.add(BackupSystemTableHelper.cellKeyToBackupSetName(res.current()));
      }
      return list;
    }
  }

  /**
   * Get backup set description (list of tables)
   * @param name - set's name
   * @return list of tables in a backup set, or null if the set does not exist
   * @throws IOException exception
   */
  public List<TableName> describeBackupSet(String name) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug(" Backup set describe: " + name);
    }
    try (Table table = connection.getTable(tableName)) {
      Result res = table.get(BackupSystemTableHelper.createGetForBackupSet(name));
      if (res.isEmpty()) {
        return null;
      }
      res.advance();
      return toList(BackupSystemTableHelper.cellValueToBackupSet(res.current()));
    }
  }

  /**
   * Converts table name strings into {@link TableName} instances, keeping their order.
   */
  private List<TableName> toList(String[] tables) {
    List<TableName> list = new ArrayList<TableName>(tables.length);
    for (String name : tables) {
      list.add(TableName.valueOf(name));
    }
    return list;
  }

  /**
   * Add tables to a backup set; the set is created if it does not exist yet
   * @param name - set name
   * @param newTables - tables to add
   * @throws IOException exception
   */
  public void addToBackupSet(String name, String[] newTables) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]");
    }
    try (Table table = connection.getTable(tableName)) {
      Result res = table.get(BackupSystemTableHelper.createGetForBackupSet(name));
      String[] union;
      if (res.isEmpty()) {
        union = newTables;
      } else {
        res.advance();
        union = merge(BackupSystemTableHelper.cellValueToBackupSet(res.current()), newTables);
      }
      table.put(BackupSystemTableHelper.createPutForBackupSet(name, union));
    }
  }

  /**
   * Returns {@code tables} followed by those entries of {@code newTables} not already present.
   */
  private String[] merge(String[] tables, String[] newTables) {
    List<String> list = new ArrayList<String>(tables.length + newTables.length);
    for (String t : tables) {
      list.add(t);
    }
    for (String nt : newTables) {
      if (!list.contains(nt)) {
        list.add(nt);
      }
    }
    return list.toArray(new String[list.size()]);
  }

  /**
   * Remove tables from backup set (list of tables). If no table is left afterwards, the backup
   * set itself is deleted; if none of the given tables belongs to the set, nothing is written.
   * @param name - set name
   * @param toRemove - tables to remove
   * @throws IOException exception
   */
  public void removeFromBackupSet(String name, String[] toRemove) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug(" Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ")
          + "]");
    }
    try (Table table = connection.getTable(tableName)) {
      Result res = table.get(BackupSystemTableHelper.createGetForBackupSet(name));
      if (res.isEmpty()) {
        LOG.warn("Backup set '" + name + "' not found.");
        return;
      }
      res.advance();
      String[] tables = BackupSystemTableHelper.cellValueToBackupSet(res.current());
      String[] disjoint = disjoin(tables, toRemove);

      if (disjoint.length == tables.length) {
        // Nothing was removed: none of the requested tables is a member of the set.
        LOG.warn("Backup set '" + name + "' does not contain tables ["
            + StringUtils.join(toRemove, " ") + "]");
      } else if (disjoint.length > 0) {
        table.put(BackupSystemTableHelper.createPutForBackupSet(name, disjoint));
      } else {
        // Every member was removed. Storing nothing would leave the old content in place, so
        // the set is deleted instead.
        LOG.info("Backup set '" + name + "' is empty. Deleting.");
        deleteBackupSet(name);
      }
    }
  }

  /**
   * Returns {@code tables} with the first occurrence of every entry of {@code toRemove} removed.
   */
  private String[] disjoin(String[] tables, String[] toRemove) {
    List<String> list = new ArrayList<String>(tables.length);
    for (String t : tables) {
      list.add(t);
    }
    for (String nt : toRemove) {
      // remove(Object) is a no-op when the entry is absent
      list.remove(nt);
    }
    return list.toArray(new String[list.size()]);
  }

  /**
   * Delete backup set
   * @param name set's name
   * @throws IOException exception
   */
  public void deleteBackupSet(String name) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug(" Backup set delete: " + name);
    }
    try (Table table = connection.getTable(tableName)) {
      table.delete(BackupSystemTableHelper.createDeleteForBackupSet(name));
    }
  }

  /**
   * Get backup system table descriptor: a "session" family limited to one version with a
   * configurable time-to-live, and a "meta" family with default settings.
   * @return descriptor
   */
  public static HTableDescriptor getSystemTableDescriptor() {
    HTableDescriptor tableDesc = new HTableDescriptor(tableName);

    HColumnDescriptor colSessionsDesc = new HColumnDescriptor(SESSIONS_FAMILY);
    colSessionsDesc.setMaxVersions(1);
    // Time to keep backup sessions (secs)
    Configuration config = HBaseConfiguration.create();
    int ttl = config.getInt(HConstants.BACKUP_SYSTEM_TTL_KEY, HConstants.BACKUP_SYSTEM_TTL_DEFAULT);
    colSessionsDesc.setTimeToLive(ttl);
    tableDesc.addFamily(colSessionsDesc);

    HColumnDescriptor colMetaDesc = new HColumnDescriptor(META_FAMILY);
    tableDesc.addFamily(colMetaDesc);
    return tableDesc;
  }

  public static String getTableNameAsString() {
    return tableName.getNameAsString();
  }

  public static TableName getTableName() {
    return tableName;
  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b14e2ab1/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java new file mode 100644 index 0000000..37f29f8 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java @@ -0,0 +1,433 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.backup.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupInfo; +import org.apache.hadoop.hbase.backup.util.BackupClientUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; + + +/** + * A collection for methods used by BackupSystemTable. + */ + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class BackupSystemTableHelper { + + /** + * hbase:backup schema: + * 1. Backup sessions rowkey= "session:" + backupId; value = serialized BackupContext + * 2. Backup start code rowkey = "startcode:" + backupRoot; value = startcode + * 3. Incremental backup set rowkey="incrbackupset:" + backupRoot; value=[list of tables] + * 4. Table-RS-timestamp map rowkey="trslm:"+ backupRoot+table_name; value = map[RS-> last WAL + * timestamp] + * 5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp + * 6. 
WALs recorded rowkey="wals:"+WAL unique file name; value = backupId and full WAL file name + */ + + private final static String BACKUP_INFO_PREFIX = "session:"; + private final static String START_CODE_ROW = "startcode:"; + private final static String INCR_BACKUP_SET = "incrbackupset:"; + private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:"; + private final static String RS_LOG_TS_PREFIX = "rslogts:"; + private final static String WALS_PREFIX = "wals:"; + private final static String SET_KEY_PREFIX = "backupset:"; + + private final static byte[] EMPTY_VALUE = new byte[] {}; + + // Safe delimiter in a string + private final static String NULL = "\u0000"; + + private BackupSystemTableHelper() { + throw new AssertionError("Instantiating utility class..."); + } + + /** + * Creates Put operation for a given backup context object + * @param context backup context + * @return put operation + * @throws IOException exception + */ + static Put createPutForBackupContext(BackupInfo context) throws IOException { + Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId())); + put.addColumn(BackupSystemTable.SESSIONS_FAMILY, "context".getBytes(), context.toByteArray()); + return put; + } + + /** + * Creates Get operation for a given backup id + * @param backupId - backup's ID + * @return get operation + * @throws IOException exception + */ + static Get createGetForBackupContext(String backupId) throws IOException { + Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId)); + get.addFamily(BackupSystemTable.SESSIONS_FAMILY); + get.setMaxVersions(1); + return get; + } + + /** + * Creates Delete operation for a given backup id + * @param backupId - backup's ID + * @return delete operation + * @throws IOException exception + */ + public static Delete createDeleteForBackupInfo(String backupId) { + Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId)); + del.addFamily(BackupSystemTable.SESSIONS_FAMILY); + return del; + } + + /** + * Converts Result to 
BackupContext + * @param res - HBase result + * @return backup context instance + * @throws IOException exception + */ + static BackupInfo resultToBackupInfo(Result res) throws IOException { + res.advance(); + Cell cell = res.current(); + return cellToBackupInfo(cell); + } + + /** + * Creates Get operation to retrieve start code from hbase:backup + * @return get operation + * @throws IOException exception + */ + static Get createGetForStartCode(String rootPath) throws IOException { + Get get = new Get(rowkey(START_CODE_ROW, rootPath)); + get.addFamily(BackupSystemTable.META_FAMILY); + get.setMaxVersions(1); + return get; + } + + /** + * Creates Put operation to store start code to hbase:backup + * @return put operation + * @throws IOException exception + */ + static Put createPutForStartCode(String startCode, String rootPath) { + Put put = new Put(rowkey(START_CODE_ROW, rootPath)); + put.addColumn(BackupSystemTable.META_FAMILY, "startcode".getBytes(), startCode.getBytes()); + return put; + } + + /** + * Creates Get to retrieve incremental backup table set from hbase:backup + * @return get operation + * @throws IOException exception + */ + static Get createGetForIncrBackupTableSet(String backupRoot) throws IOException { + Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot)); + get.addFamily(BackupSystemTable.META_FAMILY); + get.setMaxVersions(1); + return get; + } + + /** + * Creates Put to store incremental backup table set + * @param tables tables + * @return put operation + */ + static Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) { + Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot)); + for (TableName table : tables) { + put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()), + EMPTY_VALUE); + } + return put; + } + + /** + * Creates Delete for incremental backup table set + * @param backupRoot backup root + * @return delete operation + */ + static Delete createDeleteForIncrBackupTableSet(String 
backupRoot) { + Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot)); + delete.addFamily(BackupSystemTable.META_FAMILY); + return delete; + } + + /** + * Creates Scan operation to load backup history + * @return scan operation + */ + static Scan createScanForBackupHistory() { + Scan scan = new Scan(); + byte[] startRow = BACKUP_INFO_PREFIX.getBytes(); + byte[] stopRow = Arrays.copyOf(startRow, startRow.length); + stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); + scan.setStartRow(startRow); + scan.setStopRow(stopRow); + scan.addFamily(BackupSystemTable.SESSIONS_FAMILY); + scan.setMaxVersions(1); + return scan; + } + + /** + * Converts cell to backup context instance. + * @param current - cell + * @return backup context instance + * @throws IOException exception + */ + static BackupInfo cellToBackupInfo(Cell current) throws IOException { + byte[] data = CellUtil.cloneValue(current); + return BackupInfo.fromByteArray(data); + } + + /** + * Creates Put to write RS last roll log timestamp map + * @param table - table + * @param smap - map, containing RS:ts + * @return put operation + */ + static Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap, + String backupRoot) { + Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString())); + put.addColumn(BackupSystemTable.META_FAMILY, "log-roll-map".getBytes(), smap); + return put; + } + + /** + * Creates Scan to load table-> { RS -> ts} map of maps + * @return scan operation + */ + static Scan createScanForReadLogTimestampMap(String backupRoot) { + Scan scan = new Scan(); + byte[] startRow = rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot); + byte[] stopRow = Arrays.copyOf(startRow, startRow.length); + stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); + scan.setStartRow(startRow); + scan.setStopRow(stopRow); + scan.addFamily(BackupSystemTable.META_FAMILY); + + return scan; + } + + /** + * Get table name from rowkey + * 
@param cloneRow rowkey + * @return table name + */ + static String getTableNameForReadLogTimestampMap(byte[] cloneRow) { + String s = new String(cloneRow); + int index = s.lastIndexOf(NULL); + return s.substring(index + 1); + } + + /** + * Creates Put to store RS last log result + * @param server - server name + * @param timestamp - log roll result (timestamp) + * @return put operation + */ + static Put createPutForRegionServerLastLogRollResult(String server, Long timestamp, + String backupRoot) { + Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server)); + put.addColumn(BackupSystemTable.META_FAMILY, "rs-log-ts".getBytes(), timestamp.toString() + .getBytes()); + return put; + } + + /** + * Creates Scan operation to load last RS log roll results + * @return scan operation + */ + static Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) { + Scan scan = new Scan(); + byte[] startRow = rowkey(RS_LOG_TS_PREFIX, backupRoot); + byte[] stopRow = Arrays.copyOf(startRow, startRow.length); + stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); + scan.setStartRow(startRow); + scan.setStopRow(stopRow); + scan.addFamily(BackupSystemTable.META_FAMILY); + scan.setMaxVersions(1); + + return scan; + } + + /** + * Get server's name from rowkey + * @param row - rowkey + * @return server's name + */ + static String getServerNameForReadRegionServerLastLogRollResult(byte[] row) { + String s = new String(row); + int index = s.lastIndexOf(NULL); + return s.substring(index + 1); + } + + /** + * Creates put list for list of WAL files + * @param files list of WAL file paths + * @param backupId backup id + * @return put list + * @throws IOException exception + */ + public static List<Put> createPutsForAddWALFiles(List<String> files, String backupId, + String backupRoot) throws IOException { + + List<Put> puts = new ArrayList<Put>(); + for (String file : files) { + Put put = new Put(rowkey(WALS_PREFIX, 
BackupClientUtil.getUniqueWALFileNamePart(file))); + put.addColumn(BackupSystemTable.META_FAMILY, "backupId".getBytes(), backupId.getBytes()); + put.addColumn(BackupSystemTable.META_FAMILY, "file".getBytes(), file.getBytes()); + put.addColumn(BackupSystemTable.META_FAMILY, "root".getBytes(), backupRoot.getBytes()); + puts.add(put); + } + return puts; + } + + /** + * Creates Scan operation to load WALs TODO: support for backupRoot + * @param backupRoot - path to backup destination + * @return scan operation + */ + public static Scan createScanForGetWALs(String backupRoot) { + Scan scan = new Scan(); + byte[] startRow = WALS_PREFIX.getBytes(); + byte[] stopRow = Arrays.copyOf(startRow, startRow.length); + stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); + scan.setStartRow(startRow); + scan.setStopRow(stopRow); + scan.addFamily(BackupSystemTable.META_FAMILY); + return scan; + } + + /** + * Creates Get operation for a given wal file name TODO: support for backup destination + * @param file file + * @return get operation + * @throws IOException exception + */ + public static Get createGetForCheckWALFile(String file) throws IOException { + Get get = new Get(rowkey(WALS_PREFIX, BackupClientUtil.getUniqueWALFileNamePart(file))); + // add backup root column + get.addFamily(BackupSystemTable.META_FAMILY); + return get; + } + + /** + * Creates Scan operation to load backup set list + * @return scan operation + */ + static Scan createScanForBackupSetList() { + Scan scan = new Scan(); + byte[] startRow = SET_KEY_PREFIX.getBytes(); + byte[] stopRow = Arrays.copyOf(startRow, startRow.length); + stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); + scan.setStartRow(startRow); + scan.setStopRow(stopRow); + scan.addFamily(BackupSystemTable.META_FAMILY); + return scan; + } + + /** + * Creates Get operation to load backup set content + * @return get operation + */ + static Get createGetForBackupSet(String name) { + Get get = new 
Get(rowkey(SET_KEY_PREFIX, name)); + get.addFamily(BackupSystemTable.META_FAMILY); + return get; + } + + /** + * Creates Delete operation to delete backup set content + * @param name - backup set's name + * @return delete operation + */ + static Delete createDeleteForBackupSet(String name) { + Delete del = new Delete(rowkey(SET_KEY_PREFIX, name)); + del.addFamily(BackupSystemTable.META_FAMILY); + return del; + } + + /** + * Creates Put operation to update backup set content + * @param name - backup set's name + * @param tables - list of tables + * @return put operation + */ + static Put createPutForBackupSet(String name, String[] tables) { + Put put = new Put(rowkey(SET_KEY_PREFIX, name)); + byte[] value = convertToByteArray(tables); + put.addColumn(BackupSystemTable.META_FAMILY, "tables".getBytes(), value); + return put; + } + + private static byte[] convertToByteArray(String[] tables) { + return StringUtils.join(tables, ",").getBytes(); + } + + /** + * Converts cell to backup set list. + * @param current - cell + * @return backup set + * @throws IOException + */ + static String[] cellValueToBackupSet(Cell current) throws IOException { + byte[] data = CellUtil.cloneValue(current); + if (data != null && data.length > 0) { + return new String(data).split(","); + } else { + return new String[0]; + } + } + + /** + * Converts cell key to backup set name. + * @param current - cell + * @return backup set name + * @throws IOException + */ + static String cellKeyToBackupSetName(Cell current) throws IOException { + byte[] data = CellUtil.cloneRow(current); + return new String(data).substring(SET_KEY_PREFIX.length()); + } + + static byte[] rowkey(String s, String... other) { + StringBuilder sb = new StringBuilder(s); + for (String ss : other) { + sb.append(ss); + } + return sb.toString().getBytes(); + } + +}