http://git-wip-us.apache.org/repos/asf/hbase/blob/b14e2ab1/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java new file mode 100644 index 0000000..9355d07 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java @@ -0,0 +1,540 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.backup.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupCopyService; +import org.apache.hadoop.hbase.backup.BackupInfo; +import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase; +import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; +import org.apache.hadoop.hbase.backup.BackupRequest; +import org.apache.hadoop.hbase.backup.BackupRestoreServerFactory; +import org.apache.hadoop.hbase.backup.BackupType; +import org.apache.hadoop.hbase.backup.HBackupFileSystem; +import org.apache.hadoop.hbase.backup.impl.BackupException; +import org.apache.hadoop.hbase.backup.impl.BackupManifest; +import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; +import org.apache.hadoop.hbase.backup.impl.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; +import org.apache.hadoop.hbase.backup.util.BackupClientUtil; +import org.apache.hadoop.hbase.backup.util.BackupServerUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; + +@InterfaceAudience.Private +public class FullTableBackupClient { + private static final Log LOG = LogFactory.getLog(FullTableBackupClient.class); + + private 
Configuration conf; + private Connection conn; + private String backupId; + private List<TableName> tableList; + HashMap<String, Long> newTimestamps = null; + + private BackupManager backupManager; + private BackupInfo backupContext; + + public FullTableBackupClient() { + // Required by the Procedure framework to create the procedure on replay + } + + public FullTableBackupClient(final Connection conn, final String backupId, + BackupRequest request) + throws IOException { + backupManager = new BackupManager(conn, conn.getConfiguration()); + this.backupId = backupId; + this.tableList = request.getTableList(); + this.conn = conn; + this.conf = conn.getConfiguration(); + backupContext = + backupManager.createBackupContext(backupId, BackupType.FULL, tableList, + request.getTargetRootDir(), + request.getWorkers(), request.getBandwidth()); + if (tableList == null || tableList.isEmpty()) { + this.tableList = new ArrayList<>(backupContext.getTables()); + } + } + + /** + * Begin the overall backup. + * @param backupContext backup context + * @throws IOException exception + */ + static void beginBackup(BackupManager backupManager, BackupInfo backupContext) throws IOException { + backupManager.setBackupContext(backupContext); + // set the start timestamp of the overall backup + long startTs = EnvironmentEdgeManager.currentTime(); + backupContext.setStartTs(startTs); + // set overall backup status: ongoing + backupContext.setState(BackupState.RUNNING); + LOG.info("Backup " + backupContext.getBackupId() + " started at " + startTs + "."); + + backupManager.updateBackupInfo(backupContext); + if (LOG.isDebugEnabled()) { + LOG.debug("Backup session " + backupContext.getBackupId() + " has been started."); + } + } + + private static String getMessage(Exception e) { + String msg = e.getMessage(); + if (msg == null || msg.equals("")) { + msg = e.getClass().getName(); + } + return msg; + } + + /** + * Delete HBase snapshot for backup. 
+ * @param backupCtx backup context + * @throws Exception exception + */ + private static void + deleteSnapshot(final Connection conn, BackupInfo backupCtx, Configuration conf) + throws IOException { + LOG.debug("Trying to delete snapshot for full backup."); + for (String snapshotName : backupCtx.getSnapshotNames()) { + if (snapshotName == null) { + continue; + } + LOG.debug("Trying to delete snapshot: " + snapshotName); + + try (Admin admin = conn.getAdmin();) { + admin.deleteSnapshot(snapshotName); + } catch (IOException ioe) { + LOG.debug("when deleting snapshot " + snapshotName, ioe); + } + LOG.debug("Deleting the snapshot " + snapshotName + " for backup " + backupCtx.getBackupId() + + " succeeded."); + } + } + + /** + * Clean up directories with prefix "exportSnapshot-", which are generated when exporting + * snapshots. + * @throws IOException exception + */ + private static void cleanupExportSnapshotLog(Configuration conf) throws IOException { + FileSystem fs = FSUtils.getCurrentFileSystem(conf); + Path stagingDir = + new Path(conf.get(BackupRestoreConstants.CONF_STAGING_ROOT, fs.getWorkingDirectory() + .toString())); + FileStatus[] files = FSUtils.listStatus(fs, stagingDir); + if (files == null) { + return; + } + for (FileStatus file : files) { + if (file.getPath().getName().startsWith("exportSnapshot-")) { + LOG.debug("Delete log files of exporting snapshot: " + file.getPath().getName()); + if (FSUtils.delete(fs, file.getPath(), true) == false) { + LOG.warn("Can not delete " + file.getPath()); + } + } + } + } + + /** + * Clean up the uncompleted data at target directory if the ongoing backup has already entered the + * copy phase. + */ + static void cleanupTargetDir(BackupInfo backupContext, Configuration conf) { + try { + // clean up the uncompleted data at target directory if the ongoing backup has already entered + // the copy phase + LOG.debug("Trying to cleanup up target dir. 
Current backup phase: " + + backupContext.getPhase()); + if (backupContext.getPhase().equals(BackupPhase.SNAPSHOTCOPY) + || backupContext.getPhase().equals(BackupPhase.INCREMENTAL_COPY) + || backupContext.getPhase().equals(BackupPhase.STORE_MANIFEST)) { + FileSystem outputFs = + FileSystem.get(new Path(backupContext.getTargetRootDir()).toUri(), conf); + + // now treat one backup as a transaction, clean up data that has been partially copied at + // table level + for (TableName table : backupContext.getTables()) { + Path targetDirPath = + new Path(HBackupFileSystem.getTableBackupDir(backupContext.getTargetRootDir(), + backupContext.getBackupId(), table)); + if (outputFs.delete(targetDirPath, true)) { + LOG.info("Cleaning up uncompleted backup data at " + targetDirPath.toString() + + " done."); + } else { + LOG.info("No data has been copied to " + targetDirPath.toString() + "."); + } + + Path tableDir = targetDirPath.getParent(); + FileStatus[] backups = FSUtils.listStatus(outputFs, tableDir); + if (backups == null || backups.length == 0) { + outputFs.delete(tableDir, true); + LOG.debug(tableDir.toString() + " is empty, remove it."); + } + } + } + + } catch (IOException e1) { + LOG.error("Cleaning up uncompleted backup data of " + backupContext.getBackupId() + " at " + + backupContext.getTargetRootDir() + " failed due to " + e1.getMessage() + "."); + } + } + + /** + * Fail the overall backup. + * @param backupContext backup context + * @param e exception + * @throws Exception exception + */ + static void failBackup(Connection conn, BackupInfo backupContext, BackupManager backupManager, + Exception e, String msg, BackupType type, Configuration conf) throws IOException { + LOG.error(msg + getMessage(e), e); + // If this is a cancel exception, then we've already cleaned. 
+ + // set the failure timestamp of the overall backup + backupContext.setEndTs(EnvironmentEdgeManager.currentTime()); + + // set failure message + backupContext.setFailedMsg(e.getMessage()); + + // set overall backup status: failed + backupContext.setState(BackupState.FAILED); + + // compose the backup failed data + String backupFailedData = + "BackupId=" + backupContext.getBackupId() + ",startts=" + backupContext.getStartTs() + + ",failedts=" + backupContext.getEndTs() + ",failedphase=" + backupContext.getPhase() + + ",failedmessage=" + backupContext.getFailedMsg(); + LOG.error(backupFailedData); + + backupManager.updateBackupInfo(backupContext); + + // if full backup, then delete HBase snapshots if there already are snapshots taken + // and also clean up export snapshot log files if exist + if (type == BackupType.FULL) { + deleteSnapshot(conn, backupContext, conf); + cleanupExportSnapshotLog(conf); + } + + // clean up the uncompleted data at target directory if the ongoing backup has already entered + // the copy phase + // For incremental backup, DistCp logs will be cleaned with the targetDir. + cleanupTargetDir(backupContext, conf); + + LOG.info("Backup " + backupContext.getBackupId() + " failed."); + } + + /** + * Do snapshot copy. 
+ * @param backupContext backup context + * @throws Exception exception + */ + private void snapshotCopy(BackupInfo backupContext) throws Exception { + LOG.info("Snapshot copy is starting."); + + // set overall backup phase: snapshot_copy + backupContext.setPhase(BackupPhase.SNAPSHOTCOPY); + + // call ExportSnapshot to copy files based on hbase snapshot for backup + // ExportSnapshot only support single snapshot export, need loop for multiple tables case + BackupCopyService copyService = BackupRestoreServerFactory.getBackupCopyService(conf); + + // number of snapshots matches number of tables + float numOfSnapshots = backupContext.getSnapshotNames().size(); + + LOG.debug("There are " + (int) numOfSnapshots + " snapshots to be copied."); + + for (TableName table : backupContext.getTables()) { + // Currently we simply set the sub copy tasks by counting the table snapshot number, we can + // calculate the real files' size for the percentage in the future. + // backupCopier.setSubTaskPercntgInWholeTask(1f / numOfSnapshots); + int res = 0; + String[] args = new String[4]; + args[0] = "-snapshot"; + args[1] = backupContext.getSnapshotName(table); + args[2] = "-copy-to"; + args[3] = backupContext.getBackupStatus(table).getTargetDir(); + + LOG.debug("Copy snapshot " + args[1] + " to " + args[3]); + res = copyService.copy(backupContext, backupManager, conf, BackupCopyService.Type.FULL, args); + // if one snapshot export failed, do not continue for remained snapshots + if (res != 0) { + LOG.error("Exporting Snapshot " + args[1] + " failed with return code: " + res + "."); + + throw new IOException("Failed of exporting snapshot " + args[1] + " to " + args[3] + + " with reason code " + res); + } + LOG.info("Snapshot copy " + args[1] + " finished."); + } + } + + /** + * Add manifest for the current backup. The manifest is stored within the table backup directory. 
+ * @param backupContext The current backup context + * @throws IOException exception + * @throws BackupException exception + */ + private static void addManifest(BackupInfo backupContext, BackupManager backupManager, + BackupType type, Configuration conf) throws IOException, BackupException { + // set the overall backup phase : store manifest + backupContext.setPhase(BackupPhase.STORE_MANIFEST); + + BackupManifest manifest; + + // Since we have each table's backup in its own directory structure, + // we'll store its manifest with the table directory. + for (TableName table : backupContext.getTables()) { + manifest = new BackupManifest(backupContext, table); + ArrayList<BackupImage> ancestors = backupManager.getAncestors(backupContext, table); + for (BackupImage image : ancestors) { + manifest.addDependentImage(image); + } + + if (type == BackupType.INCREMENTAL) { + // We'll store the log timestamps for this table only in its manifest. + HashMap<TableName, HashMap<String, Long>> tableTimestampMap = + new HashMap<TableName, HashMap<String, Long>>(); + tableTimestampMap.put(table, backupContext.getIncrTimestampMap().get(table)); + manifest.setIncrTimestampMap(tableTimestampMap); + ArrayList<BackupImage> ancestorss = backupManager.getAncestors(backupContext); + for (BackupImage image : ancestorss) { + manifest.addDependentImage(image); + } + } + manifest.store(conf); + } + + // For incremental backup, we store a overall manifest in + // <backup-root-dir>/WALs/<backup-id> + // This is used when created the next incremental backup + if (type == BackupType.INCREMENTAL) { + manifest = new BackupManifest(backupContext); + // set the table region server start and end timestamps for incremental backup + manifest.setIncrTimestampMap(backupContext.getIncrTimestampMap()); + ArrayList<BackupImage> ancestors = backupManager.getAncestors(backupContext); + for (BackupImage image : ancestors) { + manifest.addDependentImage(image); + } + manifest.store(conf); + } + } + + /** + * Get 
backup request meta data dir as string. + * @param backupContext backup context + * @return meta data dir + */ + private static String obtainBackupMetaDataStr(BackupInfo backupContext) { + StringBuffer sb = new StringBuffer(); + sb.append("type=" + backupContext.getType() + ",tablelist="); + for (TableName table : backupContext.getTables()) { + sb.append(table + ";"); + } + if (sb.lastIndexOf(";") > 0) { + sb.delete(sb.lastIndexOf(";"), sb.lastIndexOf(";") + 1); + } + sb.append(",targetRootDir=" + backupContext.getTargetRootDir()); + + return sb.toString(); + } + + /** + * Clean up directories with prefix "_distcp_logs-", which are generated when DistCp copying + * hlogs. + * @throws IOException exception + */ + private static void cleanupDistCpLog(BackupInfo backupContext, Configuration conf) + throws IOException { + Path rootPath = new Path(backupContext.getHLogTargetDir()).getParent(); + FileSystem fs = FileSystem.get(rootPath.toUri(), conf); + FileStatus[] files = FSUtils.listStatus(fs, rootPath); + if (files == null) { + return; + } + for (FileStatus file : files) { + if (file.getPath().getName().startsWith("_distcp_logs")) { + LOG.debug("Delete log files of DistCp: " + file.getPath().getName()); + FSUtils.delete(fs, file.getPath(), true); + } + } + } + + /** + * Complete the overall backup. 
+ * @param backupContext backup context + * @throws Exception exception + */ + static void completeBackup(final Connection conn, BackupInfo backupContext, + BackupManager backupManager, BackupType type, Configuration conf) throws IOException { + // set the complete timestamp of the overall backup + backupContext.setEndTs(EnvironmentEdgeManager.currentTime()); + // set overall backup status: complete + backupContext.setState(BackupState.COMPLETE); + backupContext.setProgress(100); + // add and store the manifest for the backup + addManifest(backupContext, backupManager, type, conf); + + // after major steps done and manifest persisted, do convert if needed for incremental backup + /* in-fly convert code here, provided by future jira */ + LOG.debug("in-fly convert code here, provided by future jira"); + + // compose the backup complete data + String backupCompleteData = + obtainBackupMetaDataStr(backupContext) + ",startts=" + backupContext.getStartTs() + + ",completets=" + backupContext.getEndTs() + ",bytescopied=" + + backupContext.getTotalBytesCopied(); + if (LOG.isDebugEnabled()) { + LOG.debug("Backup " + backupContext.getBackupId() + " finished: " + backupCompleteData); + } + backupManager.updateBackupInfo(backupContext); + + // when full backup is done: + // - delete HBase snapshot + // - clean up directories with prefix "exportSnapshot-", which are generated when exporting + // snapshots + if (type == BackupType.FULL) { + deleteSnapshot(conn, backupContext, conf); + cleanupExportSnapshotLog(conf); + } else if (type == BackupType.INCREMENTAL) { + cleanupDistCpLog(backupContext, conf); + } + + LOG.info("Backup " + backupContext.getBackupId() + " completed."); + } + + /** + * Wrap a SnapshotDescription for a target table. + * @param table table + * @return a SnapshotDescription especially for backup. 
+ */ + static SnapshotDescription wrapSnapshotDescription(TableName tableName, String snapshotName) { + // Mock a SnapshotDescription from backupContext to call SnapshotManager function, + // Name it in the format "snapshot_<timestamp>_<table>" + HBaseProtos.SnapshotDescription.Builder builder = HBaseProtos.SnapshotDescription.newBuilder(); + builder.setTable(tableName.getNameAsString()); + builder.setName(snapshotName); + HBaseProtos.SnapshotDescription backupSnapshot = builder.build(); + + LOG.debug("Wrapped a SnapshotDescription " + backupSnapshot.getName() + + " from backupContext to request snapshot for backup."); + + return backupSnapshot; + } + + /** + * Backup request execution + * @throws IOException + */ + public void execute() throws IOException { + + try (Admin admin = conn.getAdmin();) { + + // Begin BACKUP + beginBackup(backupManager, backupContext); + String savedStartCode = null; + boolean firstBackup = false; + // do snapshot for full table backup + + savedStartCode = backupManager.readBackupStartCode(); + firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) == 0L; + if (firstBackup) { + // This is our first backup. Let's put some marker on ZK so that we can hold the logs + // while we do the backup. + backupManager.writeBackupStartCode(0L); + } + // We roll log here before we do the snapshot. It is possible there is duplicate data + // in the log that is already in the snapshot. But if we do it after the snapshot, we + // could have data loss. + // A better approach is to do the roll log on each RS in the same global procedure as + // the snapshot. 
+ LOG.info("Execute roll log procedure for full backup ..."); + + Map<String, String> props = new HashMap<String, String>(); + props.put("backupRoot", backupContext.getTargetRootDir()); + admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE, + LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props); + + newTimestamps = backupManager.readRegionServerLastLogRollResult(); + if (firstBackup) { + // Updates registered log files + // We record ALL old WAL files as registered, because + // this is a first full backup in the system and these + // files are not needed for next incremental backup + List<String> logFiles = BackupServerUtil.getWALFilesOlderThan(conf, newTimestamps); + backupManager.recordWALFiles(logFiles); + } + + // SNAPSHOT_TABLES: + for (TableName tableName : tableList) { + String snapshotName = + "snapshot_" + Long.toString(EnvironmentEdgeManager.currentTime()) + "_" + + tableName.getNamespaceAsString() + "_" + tableName.getQualifierAsString(); + + admin.snapshot(snapshotName, tableName); + + backupContext.setSnapshotName(tableName, snapshotName); + } + + // SNAPSHOT_COPY: + // do snapshot copy + LOG.debug("snapshot copy for " + backupId); + snapshotCopy(backupContext); + // Updates incremental backup table set + backupManager.addIncrementalBackupTableSet(backupContext.getTables()); + + // BACKUP_COMPLETE: + // set overall backup status: complete. Here we make sure to complete the backup. + // After this checkpoint, even if entering cancel process, will let the backup finished + backupContext.setState(BackupState.COMPLETE); + // The table list in backupContext is good for both full backup and incremental backup. + // For incremental backup, it contains the incremental backup table set. 
+ backupManager.writeRegionServerLogTimestamp(backupContext.getTables(), newTimestamps); + + HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap = + backupManager.readLogTimestampMap(); + + Long newStartCode = + BackupClientUtil.getMinValue(BackupServerUtil + .getRSLogTimestampMins(newTableSetTimestampMap)); + backupManager.writeBackupStartCode(newStartCode); + + // backup complete + completeBackup(conn, backupContext, backupManager, BackupType.FULL, conf); + } catch (Exception e) { + failBackup(conn, backupContext, backupManager, e, "Unexpected BackupException : ", + BackupType.FULL, conf); + throw new IOException(e); + } + + } + +}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b14e2ab1/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/HBaseBackupAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/HBaseBackupAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/HBaseBackupAdmin.java new file mode 100644 index 0000000..8c63f98 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/HBaseBackupAdmin.java @@ -0,0 +1,555 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.backup.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Future; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupInfo; +import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; +import org.apache.hadoop.hbase.backup.BackupAdmin; +import org.apache.hadoop.hbase.backup.BackupRequest; +import org.apache.hadoop.hbase.backup.BackupType; +import org.apache.hadoop.hbase.backup.HBackupFileSystem; +import org.apache.hadoop.hbase.backup.RestoreRequest; +import org.apache.hadoop.hbase.backup.util.BackupClientUtil; +import org.apache.hadoop.hbase.backup.util.BackupSet; +import org.apache.hadoop.hbase.backup.util.RestoreServerUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import com.google.common.collect.Lists; + +/** + * The administrative API implementation for HBase Backup . Obtain an instance from + * an {@link Admin#getBackupAdmin()} and call {@link #close()} afterwards. + * <p>BackupAdmin can be used to create backups, restore data from backups and for + * other backup-related operations. 
+ * + * @see Admin + * @since 2.0 + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving + +public class HBaseBackupAdmin implements BackupAdmin { + private static final Log LOG = LogFactory.getLog(HBaseBackupAdmin.class); + + private final Connection conn; + + public HBaseBackupAdmin(Connection conn) { + this.conn = conn; + } + + @Override + public void close() throws IOException { + } + + @Override + public BackupInfo getBackupInfo(String backupId) throws IOException { + BackupInfo backupInfo = null; + try (final BackupSystemTable table = new BackupSystemTable(conn)) { + backupInfo = table.readBackupInfo(backupId); + return backupInfo; + } + } + + @Override + public int getProgress(String backupId) throws IOException { + BackupInfo backupInfo = null; + try (final BackupSystemTable table = new BackupSystemTable(conn)) { + if (backupId == null) { + ArrayList<BackupInfo> recentSessions = table.getBackupContexts(BackupState.RUNNING); + if (recentSessions.isEmpty()) { + LOG.warn("No ongoing sessions found."); + return -1; + } + // else show status for ongoing session + // must be one maximum + return recentSessions.get(0).getProgress(); + } else { + + backupInfo = table.readBackupInfo(backupId); + if (backupInfo != null) { + return backupInfo.getProgress(); + } else { + LOG.warn("No information found for backupID=" + backupId); + return -1; + } + } + } + } + + @Override + public int deleteBackups(String[] backupIds) throws IOException { + // TODO: requires FT, failure will leave system + // in non-consistent state + // see HBASE-15227 + + int totalDeleted = 0; + Map<String, HashSet<TableName>> allTablesMap = new HashMap<String, HashSet<TableName>>(); + + try (final BackupSystemTable sysTable = new BackupSystemTable(conn)) { + for (int i = 0; i < backupIds.length; i++) { + BackupInfo info = sysTable.readBackupInfo(backupIds[i]); + if (info != null) { + String rootDir = info.getTargetRootDir(); + HashSet<TableName> allTables = allTablesMap.get(rootDir); + if 
(allTables == null) { + allTables = new HashSet<TableName>(); + allTablesMap.put(rootDir, allTables); + } + allTables.addAll(info.getTableNames()); + totalDeleted += deleteBackup(backupIds[i], sysTable); + } + } + finalizeDelete(allTablesMap, sysTable); + } + return totalDeleted; + } + + /** + * Updates incremental backup set for every backupRoot + * @param tablesMap - Map [backupRoot: Set<TableName>] + * @param table - backup system table + * @throws IOException + */ + + private void finalizeDelete(Map<String, HashSet<TableName>> tablesMap, BackupSystemTable table) + throws IOException { + for (String backupRoot : tablesMap.keySet()) { + Set<TableName> incrTableSet = table.getIncrementalBackupTableSet(backupRoot); + Map<TableName, ArrayList<BackupInfo>> tableMap = + table.getBackupHistoryForTableSet(incrTableSet, backupRoot); + for(Map.Entry<TableName, ArrayList<BackupInfo>> entry: tableMap.entrySet()) { + if(entry.getValue() == null) { + // No more backups for a table + incrTableSet.remove(entry.getKey()); + } + } + if (!incrTableSet.isEmpty()) { + table.addIncrementalBackupTableSet(incrTableSet, backupRoot); + } else { // empty + table.deleteIncrementalBackupTableSet(backupRoot); + } + } + } + + /** + * Delete single backup and all related backups + * Algorithm: + * + * Backup type: FULL or INCREMENTAL + * Is this last backup session for table T: YES or NO + * For every table T from table list 'tables': + * if(FULL, YES) deletes only physical data (PD) + * if(FULL, NO), deletes PD, scans all newer backups and removes T from backupInfo, until + * we either reach the most recent backup for T in the system or FULL backup which + * includes T + * if(INCREMENTAL, YES) deletes only physical data (PD) + * if(INCREMENTAL, NO) deletes physical data and for table T scans all backup images + * between last FULL backup, which is older than the backup being deleted and the next + * FULL backup (if exists) or last one for a particular table T and removes T from list + * of 
backup tables. + * @param backupId - backup id + * @param sysTable - backup system table + * @return total - number of deleted backup images + * @throws IOException + */ + private int deleteBackup(String backupId, BackupSystemTable sysTable) throws IOException { + + BackupInfo backupInfo = sysTable.readBackupInfo(backupId); + + int totalDeleted = 0; + if (backupInfo != null) { + LOG.info("Deleting backup " + backupInfo.getBackupId() + " ..."); + BackupClientUtil.cleanupBackupData(backupInfo, conn.getConfiguration()); + // List of tables in this backup; + List<TableName> tables = backupInfo.getTableNames(); + long startTime = backupInfo.getStartTs(); + for (TableName tn : tables) { + boolean isLastBackupSession = isLastBackupSession(sysTable, tn, startTime); + if (isLastBackupSession) { + continue; + } + // else + List<BackupInfo> affectedBackups = getAffectedBackupInfos(backupInfo, tn, sysTable); + for (BackupInfo info : affectedBackups) { + if (info.equals(backupInfo)) { + continue; + } + removeTableFromBackupImage(info, tn, sysTable); + } + } + LOG.debug("Delete backup info "+ backupInfo.getBackupId()); + + sysTable.deleteBackupInfo(backupInfo.getBackupId()); + LOG.info("Delete backup " + backupInfo.getBackupId() + " completed."); + totalDeleted++; + } else { + LOG.warn("Delete backup failed: no information found for backupID=" + backupId); + } + return totalDeleted; + } + + private void removeTableFromBackupImage(BackupInfo info, TableName tn, BackupSystemTable sysTable) + throws IOException { + List<TableName> tables = info.getTableNames(); + LOG.debug("Remove "+ tn +" from " + info.getBackupId() + " tables=" + + info.getTableListAsString()); + if (tables.contains(tn)) { + tables.remove(tn); + + if (tables.isEmpty()) { + LOG.debug("Delete backup info "+ info.getBackupId()); + + sysTable.deleteBackupInfo(info.getBackupId()); + BackupClientUtil.cleanupBackupData(info, conn.getConfiguration()); + } else { + info.setTables(tables); + 
sysTable.updateBackupInfo(info); + // Now, clean up directory for table + cleanupBackupDir(info, tn, conn.getConfiguration()); + } + } + } + + private List<BackupInfo> getAffectedBackupInfos(BackupInfo backupInfo, TableName tn, + BackupSystemTable table) throws IOException { + LOG.debug("GetAffectedBackupInfos for: " + backupInfo.getBackupId() + " table=" + tn); + long ts = backupInfo.getStartTs(); + List<BackupInfo> list = new ArrayList<BackupInfo>(); + List<BackupInfo> history = table.getBackupHistory(backupInfo.getTargetRootDir()); + // Scan from most recent to backupInfo + // break when backupInfo reached + for (BackupInfo info : history) { + if (info.getStartTs() == ts) { + break; + } + List<TableName> tables = info.getTableNames(); + if (tables.contains(tn)) { + BackupType bt = info.getType(); + if (bt == BackupType.FULL) { + // Clear list if we encounter FULL backup + list.clear(); + } else { + LOG.debug("GetAffectedBackupInfos for: " + backupInfo.getBackupId() + " table=" + tn + + " added " + info.getBackupId() + " tables=" + info.getTableListAsString()); + list.add(info); + } + } + } + return list; + } + + + + /** + * Clean up the data at target directory + * @throws IOException + */ + private void cleanupBackupDir(BackupInfo backupInfo, TableName table, Configuration conf) + throws IOException { + try { + // clean up the data at target directory + String targetDir = backupInfo.getTargetRootDir(); + if (targetDir == null) { + LOG.warn("No target directory specified for " + backupInfo.getBackupId()); + return; + } + + FileSystem outputFs = FileSystem.get(new Path(backupInfo.getTargetRootDir()).toUri(), conf); + + Path targetDirPath = + new Path(BackupClientUtil.getTableBackupDir(backupInfo.getTargetRootDir(), + backupInfo.getBackupId(), table)); + if (outputFs.delete(targetDirPath, true)) { + LOG.info("Cleaning up backup data at " + targetDirPath.toString() + " done."); + } else { + LOG.info("No data has been found in " + targetDirPath.toString() + "."); + 
} + + } catch (IOException e1) { + LOG.error("Cleaning up backup data of " + backupInfo.getBackupId() + " for table " + table + + "at " + backupInfo.getTargetRootDir() + " failed due to " + e1.getMessage() + "."); + throw e1; + } + } + + private boolean isLastBackupSession(BackupSystemTable table, TableName tn, long startTime) + throws IOException { + List<BackupInfo> history = table.getBackupHistory(); + for (BackupInfo info : history) { + List<TableName> tables = info.getTableNames(); + if (!tables.contains(tn)) { + continue; + } + if (info.getStartTs() <= startTime) { + return true; + } else { + return false; + } + } + return false; + } + + @Override + public List<BackupInfo> getHistory(int n) throws IOException { + try (final BackupSystemTable table = new BackupSystemTable(conn)) { + List<BackupInfo> history = table.getBackupHistory(); + if (history.size() <= n) return history; + List<BackupInfo> list = new ArrayList<BackupInfo>(); + for (int i = 0; i < n; i++) { + list.add(history.get(i)); + } + return list; + } + } + + @Override + public List<BackupInfo> getHistory(int n, BackupInfo.Filter ... 
filters) throws IOException { + if (filters.length == 0) return getHistory(n); + try (final BackupSystemTable table = new BackupSystemTable(conn)) { + List<BackupInfo> history = table.getBackupHistory(); + List<BackupInfo> result = new ArrayList<BackupInfo>(); + for(BackupInfo bi: history) { + if(result.size() == n) break; + boolean passed = true; + for(int i=0; i < filters.length; i++) { + if(!filters[i].apply(bi)) { + passed = false; + break; + } + } + if(passed) { + result.add(bi); + } + } + return result; + } + } + + @Override + public List<BackupSet> listBackupSets() throws IOException { + try (final BackupSystemTable table = new BackupSystemTable(conn)) { + List<String> list = table.listBackupSets(); + List<BackupSet> bslist = new ArrayList<BackupSet>(); + for (String s : list) { + List<TableName> tables = table.describeBackupSet(s); + if (tables != null) { + bslist.add(new BackupSet(s, tables)); + } + } + return bslist; + } + } + + @Override + public BackupSet getBackupSet(String name) throws IOException { + try (final BackupSystemTable table = new BackupSystemTable(conn)) { + List<TableName> list = table.describeBackupSet(name); + if (list == null) return null; + return new BackupSet(name, list); + } + } + + @Override + public boolean deleteBackupSet(String name) throws IOException { + try (final BackupSystemTable table = new BackupSystemTable(conn)) { + if (table.describeBackupSet(name) == null) { + return false; + } + table.deleteBackupSet(name); + return true; + } + } + + @Override + public void addToBackupSet(String name, TableName[] tables) throws IOException { + String[] tableNames = new String[tables.length]; + try (final BackupSystemTable table = new BackupSystemTable(conn); + final Admin admin = conn.getAdmin();) { + for (int i = 0; i < tables.length; i++) { + tableNames[i] = tables[i].getNameAsString(); + if (!admin.tableExists(TableName.valueOf(tableNames[i]))) { + throw new IOException("Cannot add " + tableNames[i] + " because it doesn't 
exist"); + } + } + table.addToBackupSet(name, tableNames); + LOG.info("Added tables [" + StringUtils.join(tableNames, " ") + "] to '" + name + + "' backup set"); + } + } + + @Override + public void removeFromBackupSet(String name, String[] tables) throws IOException { + LOG.info("Removing tables [" + StringUtils.join(tables, " ") + "] from '" + name + "'"); + try (final BackupSystemTable table = new BackupSystemTable(conn)) { + table.removeFromBackupSet(name, tables); + LOG.info("Removing tables [" + StringUtils.join(tables, " ") + "] from '" + name + + "' completed."); + } + } + + @Override + public void restore(RestoreRequest request) throws IOException { + if (request.isCheck()) { + HashMap<TableName, BackupManifest> backupManifestMap = new HashMap<>(); + // check and load backup image manifest for the tables + Path rootPath = new Path(request.getBackupRootDir()); + String backupId = request.getBackupId(); + TableName[] sTableArray = request.getFromTables(); + HBackupFileSystem.checkImageManifestExist(backupManifestMap, + sTableArray, conn.getConfiguration(), rootPath, backupId); + + // Check and validate the backup image and its dependencies + + if (RestoreServerUtil.validate(backupManifestMap, conn.getConfiguration())) { + LOG.info("Checking backup images: ok"); + } else { + String errMsg = "Some dependencies are missing for restore"; + LOG.error(errMsg); + throw new IOException(errMsg); + } + + } + // Execute restore request + new RestoreTablesClient(conn, request).execute(); + } + + @Override + public Future<Void> restoreAsync(RestoreRequest request) throws IOException { + // TBI + return null; + } + + @Override + public String backupTables(final BackupRequest request) throws IOException { + String setName = request.getBackupSetName(); + BackupType type = request.getBackupType(); + String targetRootDir = request.getTargetRootDir(); + List<TableName> tableList = request.getTableList(); + + String backupId = + (setName == null || setName.length() == 0 ? 
BackupRestoreConstants.BACKUPID_PREFIX + : setName + "_") + EnvironmentEdgeManager.currentTime(); + if (type == BackupType.INCREMENTAL) { + Set<TableName> incrTableSet = null; + try (BackupSystemTable table = new BackupSystemTable(conn)) { + incrTableSet = table.getIncrementalBackupTableSet(targetRootDir); + } + + if (incrTableSet.isEmpty()) { + System.err.println("Incremental backup table set contains no table.\n" + + "Use 'backup create full' or 'backup stop' to \n " + + "change the tables covered by incremental backup."); + throw new IOException("No table covered by incremental backup."); + } + + tableList.removeAll(incrTableSet); + if (!tableList.isEmpty()) { + String extraTables = StringUtils.join(tableList, ","); + System.err.println("Some tables (" + extraTables + ") haven't gone through full backup"); + throw new IOException("Perform full backup on " + extraTables + " first, " + + "then retry the command"); + } + System.out.println("Incremental backup for the following table set: " + incrTableSet); + tableList = Lists.newArrayList(incrTableSet); + } + if (tableList != null && !tableList.isEmpty()) { + for (TableName table : tableList) { + String targetTableBackupDir = + HBackupFileSystem.getTableBackupDir(targetRootDir, backupId, table); + Path targetTableBackupDirPath = new Path(targetTableBackupDir); + FileSystem outputFs = + FileSystem.get(targetTableBackupDirPath.toUri(), conn.getConfiguration()); + if (outputFs.exists(targetTableBackupDirPath)) { + throw new IOException("Target backup directory " + targetTableBackupDir + + " exists already."); + } + } + ArrayList<TableName> nonExistingTableList = null; + try (Admin admin = conn.getAdmin();) { + for (TableName tableName : tableList) { + if (!admin.tableExists(tableName)) { + if (nonExistingTableList == null) { + nonExistingTableList = new ArrayList<>(); + } + nonExistingTableList.add(tableName); + } + } + } + if (nonExistingTableList != null) { + if (type == BackupType.INCREMENTAL) { + 
System.err.println("Incremental backup table set contains non-exising table: " + + nonExistingTableList); + // Update incremental backup set + tableList = excludeNonExistingTables(tableList, nonExistingTableList); + } else { + // Throw exception only in full mode - we try to backup non-existing table + throw new IOException("Non-existing tables found in the table list: " + + nonExistingTableList); + } + } + } + + // update table list + request.setTableList(tableList); + + if (type == BackupType.FULL) { + new FullTableBackupClient(conn, backupId, request).execute(); + } else { + new IncrementalTableBackupClient(conn, backupId, request).execute(); + } + return backupId; + } + + + private List<TableName> excludeNonExistingTables(List<TableName> tableList, + List<TableName> nonExistingTableList) { + + for (TableName table : nonExistingTableList) { + tableList.remove(table); + } + return tableList; + } + + @Override + public Future<String> backupTablesAsync(final BackupRequest userRequest) throws IOException { + // TBI + return null; + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/b14e2ab1/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java index 8f6aeb8..be5fd23 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,6 +34,7 @@ import org.apache.hadoop.fs.PathFilter; 
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupInfo; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable.WALItem; import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; import org.apache.hadoop.hbase.backup.util.BackupClientUtil; import org.apache.hadoop.hbase.backup.util.BackupServerUtil; @@ -42,13 +42,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; -import org.apache.hadoop.hbase.procedure.MasterProcedureManager; -import org.apache.hadoop.hbase.procedure.ProcedureUtil; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.DefaultWALProvider; -import org.apache.hadoop.hbase.backup.impl.BackupSystemTable.WALItem; /** * After a full backup was created, the incremental backup will only store the changes made @@ -64,12 +59,10 @@ public class IncrementalBackupManager { // parent manager private final BackupManager backupManager; private final Configuration conf; - private final Connection conn; public IncrementalBackupManager(BackupManager bm) { this.backupManager = bm; this.conf = bm.getConf(); - this.conn = bm.getConnection(); } /** @@ -80,7 +73,7 @@ public class IncrementalBackupManager { * @return The new HashMap of RS log timestamps after the log roll for this incremental backup. 
* @throws IOException exception */ - public HashMap<String, Long> getIncrBackupLogFileList(MasterServices svc,BackupInfo backupContext) + public HashMap<String, Long> getIncrBackupLogFileList(Connection conn,BackupInfo backupContext) throws IOException { List<String> logList; HashMap<String, Long> newTimestamps; @@ -109,19 +102,13 @@ public class IncrementalBackupManager { LOG.info("Execute roll log procedure for incremental backup ..."); HashMap<String, String> props = new HashMap<String, String>(); props.put("backupRoot", backupContext.getTargetRootDir()); - MasterProcedureManager mpm = svc.getMasterProcedureManagerHost() - .getProcedureManager(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE); - long waitTime = ProcedureUtil.execProcedure(mpm, - LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE, + + try(Admin admin = conn.getAdmin();) { + + admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE, LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props); - ProcedureUtil.waitForProcedure(mpm, - LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE, - LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props, waitTime, - conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER), - conf.getLong(HConstants.HBASE_CLIENT_PAUSE, - HConstants.DEFAULT_HBASE_CLIENT_PAUSE)); + } newTimestamps = backupManager.readRegionServerLastLogRollResult(); logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode); http://git-wip-us.apache.org/repos/asf/hbase/blob/b14e2ab1/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java new file mode 
100644 index 0000000..d9610a2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -0,0 +1,235 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.backup.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyService;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreServerFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.util.BackupClientUtil;
import org.apache.hadoop.hbase.backup.util.BackupServerUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;

/**
 * Client-side driver of one incremental backup: collects the WAL files to copy, copies them
 * to the backup target and records the new log timestamps and start code.
 */
@InterfaceAudience.Private
public class IncrementalTableBackupClient {
  private static final Log LOG = LogFactory.getLog(IncrementalTableBackupClient.class);

  /** Maximum number of copy runs attempted by {@link #incrementalCopy(BackupInfo)}. */
  private static final int MAX_COPY_ATTEMPTS = 2;

  private Configuration conf;
  private Connection conn;
  HashMap<String, Long> newTimestamps = null;

  private String backupId;
  private BackupManager backupManager;
  private BackupInfo backupContext;

  public IncrementalTableBackupClient() {
    // Required by the Procedure framework to create the procedure on replay
  }

  /**
   * Creates the client and the backup context for an incremental backup.
   * @param conn connection; its configuration is used for all file system access
   * @param backupId id of the backup to create
   * @param request backup request supplying table list, target root dir, workers and bandwidth
   * @throws IOException if the backup manager or backup context cannot be created
   */
  public IncrementalTableBackupClient(final Connection conn, final String backupId,
      BackupRequest request)
      throws IOException {

    this.conn = conn;
    this.conf = conn.getConfiguration();
    backupManager = new BackupManager(conn, conf);
    this.backupId = backupId;
    backupContext =
        backupManager.createBackupContext(backupId, BackupType.INCREMENTAL, request.getTableList(),
          request.getTargetRootDir(), request.getWorkers(), (int) request.getBandwidth());
  }

  /**
   * Returns the files of the list that exist on the configured file system; missing ones are
   * logged and dropped.
   */
  private List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    List<String> list = new ArrayList<String>();
    for (String file : incrBackupFileList) {
      if (fs.exists(new Path(file))) {
        list.add(file);
      } else {
        LOG.warn("Can't find file: " + file);
      }
    }
    return list;
  }

  /** Returns the files of the list that do not exist on the configured file system. */
  private List<String> getMissingFiles(List<String> incrBackupFileList) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    List<String> list = new ArrayList<String>();
    for (String file : incrBackupFileList) {
      if (!fs.exists(new Path(file))) {
        list.add(file);
      }
    }
    return list;

  }

  /**
   * Builds the argument array handed to the copy service: all source files followed by the
   * target directory as last element.
   */
  private static String[] toCopyArguments(List<String> sources, String targetDir) {
    // toArray fills the first size() slots of the over-sized array; the last one is the target.
    String[] args = sources.toArray(new String[sources.size() + 1]);
    args[args.length - 1] = targetDir;
    return args;
  }

  /**
   * Do incremental copy. The copy service is run at most {@link #MAX_COPY_ATTEMPTS} times:
   * files that are gone after a run are looked up under the old-WAL directory name and only
   * those are copied in the next run.
   * @param backupContext backup context
   * @throws Exception if the copy service returns a non-zero code, a missing file is not a WAL
   *           path, or files are still missing after the last attempt
   */
  private void incrementalCopy(BackupInfo backupContext) throws Exception {

    LOG.info("Incremental copy is starting.");
    // set overall backup phase: incremental_copy
    backupContext.setPhase(BackupPhase.INCREMENTAL_COPY);
    // get incremental backup file list and prepare parms for DistCp
    List<String> incrBackupFileList = backupContext.getIncrBackupFileList();
    // filter missing files out (they have been copied by previous backups)
    incrBackupFileList = filterMissingFiles(incrBackupFileList);
    String[] strArr = toCopyArguments(incrBackupFileList, backupContext.getHLogTargetDir());

    BackupCopyService copyService = BackupRestoreServerFactory.getBackupCopyService(conf);
    int counter = 0;
    while (counter++ < MAX_COPY_ATTEMPTS) {
      // We run DistCp maximum 2 times
      // If it fails on a second time, we throw Exception
      int res =
          copyService.copy(backupContext, backupManager, conf, BackupCopyService.Type.INCREMENTAL,
            strArr);

      if (res != 0) {
        LOG.error("Copy incremental log files failed with return code: " + res + ".");
        throw new IOException("Failed of Hadoop Distributed Copy from "
            + StringUtils.join(incrBackupFileList, ",") + " to " + backupContext.getHLogTargetDir());
      }
      List<String> missingFiles = getMissingFiles(incrBackupFileList);

      if (missingFiles.isEmpty()) {
        break;
      } else {
        // Repeat DistCp, some files have been moved from WALs to oldWALs during previous run
        // update backupContext and strAttr
        if (counter == MAX_COPY_ATTEMPTS) {
          String msg =
              "DistCp could not finish the following files: " + StringUtils.join(missingFiles, ",");
          LOG.error(msg);
          throw new IOException(msg);
        }
        List<String> converted = convertFilesFromWALtoOldWAL(missingFiles);
        incrBackupFileList.removeAll(missingFiles);
        incrBackupFileList.addAll(converted);
        backupContext.setIncrBackupFileList(incrBackupFileList);

        // Run DistCp only for missing files (which have been moved from WALs to oldWALs
        // during previous run)
        strArr = toCopyArguments(converted, backupContext.getHLogTargetDir());
      }
    }

    LOG.info("Incremental copy from " + StringUtils.join(incrBackupFileList, ",") + " to "
        + backupContext.getHLogTargetDir() + " finished.");
  }

  /**
   * Rewrites WAL paths to the corresponding old-WAL paths by replacing the WAL directory name
   * with the old-WAL directory name.
   * @param missingFiles paths that could not be found
   * @return rewritten paths, in the same order
   * @throws IOException if a path does not contain the WAL directory name
   */
  private List<String> convertFilesFromWALtoOldWAL(List<String> missingFiles) throws IOException {
    List<String> list = new ArrayList<String>();
    for (String path : missingFiles) {
      if (path.indexOf(Path.SEPARATOR + HConstants.HREGION_LOGDIR_NAME) < 0) {
        LOG.error("Copy incremental log files failed, file is missing : " + path);
        throw new IOException("Failed of Hadoop Distributed Copy to "
            + backupContext.getHLogTargetDir() + ", file is missing " + path);
      }
      list.add(path.replace(Path.SEPARATOR + HConstants.HREGION_LOGDIR_NAME, Path.SEPARATOR
          + HConstants.HREGION_OLDLOGDIR_NAME));
    }
    return list;
  }

  /** Returns the exception itself when it already is an IOException, otherwise wraps it. */
  private static IOException toIOException(Exception e) {
    return (e instanceof IOException) ? (IOException) e : new IOException(e);
  }

  /**
   * Runs the incremental backup: prepare (collect log files), copy, complete.
   * <p>
   * When a phase fails, the backup is marked failed through
   * {@code FullTableBackupClient.failBackup} and the failure is propagated to the caller.
   * Previously execution fell through to the next phase, so a failed backup could still be
   * set to COMPLETE and its id returned to the caller as if it had succeeded.
   * @throws IOException if any phase fails
   */
  public void execute() throws IOException {

    // case PREPARE_INCREMENTAL:
    FullTableBackupClient.beginBackup(backupManager, backupContext);
    LOG.debug("For incremental backup, current table set is "
        + backupManager.getIncrementalBackupTableSet());
    try {
      IncrementalBackupManager incrBackupManager = new IncrementalBackupManager(backupManager);

      newTimestamps = incrBackupManager.getIncrBackupLogFileList(conn, backupContext);
    } catch (Exception e) {
      // fail the overall backup and return
      FullTableBackupClient.failBackup(conn, backupContext, backupManager, e,
        "Unexpected Exception : ", BackupType.INCREMENTAL, conf);
      throw toIOException(e);
    }

    // case INCREMENTAL_COPY:
    try {
      // copy out the table and region info files for each table
      BackupServerUtil.copyTableRegionInfo(conn, backupContext, conf);
      incrementalCopy(backupContext);
      // Save list of WAL files copied
      backupManager.recordWALFiles(backupContext.getIncrBackupFileList());
    } catch (Exception e) {
      String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
      // fail the overall backup and return
      FullTableBackupClient.failBackup(conn, backupContext, backupManager, e, msg,
        BackupType.INCREMENTAL, conf);
      throw toIOException(e);
    }
    // case INCR_BACKUP_COMPLETE:
    // set overall backup status: complete. Here we make sure to complete the backup.
    // After this checkpoint, even if entering cancel process, will let the backup finished
    try {
      backupContext.setState(BackupState.COMPLETE);
      // Set the previousTimestampMap which is before this current log roll to the manifest.
      HashMap<TableName, HashMap<String, Long>> previousTimestampMap =
          backupManager.readLogTimestampMap();
      backupContext.setIncrTimestampMap(previousTimestampMap);

      // The table list in backupContext is good for both full backup and incremental backup.
      // For incremental backup, it contains the incremental backup table set.
      backupManager.writeRegionServerLogTimestamp(backupContext.getTables(), newTimestamps);

      HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
          backupManager.readLogTimestampMap();

      Long newStartCode =
          BackupClientUtil.getMinValue(BackupServerUtil
              .getRSLogTimestampMins(newTableSetTimestampMap));
      backupManager.writeBackupStartCode(newStartCode);
      // backup complete
      FullTableBackupClient.completeBackup(conn, backupContext, backupManager,
        BackupType.INCREMENTAL, conf);

    } catch (IOException e) {
      FullTableBackupClient.failBackup(conn, backupContext, backupManager, e,
        "Unexpected Exception : ", BackupType.INCREMENTAL, conf);
      throw e;
    }
  }

}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b14e2ab1/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java new file mode 100644 index 0000000..91f2d68 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java @@ -0,0 +1,236 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.backup.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.TreeSet;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.impl.BackupManifest;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
import org.apache.hadoop.hbase.backup.util.RestoreServerUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;

/**
 * Client-side driver of a restore: validates the target tables, loads the backup manifests
 * and restores every source table into its target table.
 */
@InterfaceAudience.Private
public class RestoreTablesClient {
  private static final Log LOG = LogFactory.getLog(RestoreTablesClient.class);

  private Configuration conf;
  private Connection conn;
  private String backupId;
  private TableName[] sTableArray;
  private TableName[] tTableArray;
  private String targetRootDir;
  private boolean isOverwrite;

  public RestoreTablesClient() {
    // Required by the Procedure framework to create the procedure on replay
  }

  /**
   * Creates a restore client from a request. When the request names no target tables, the
   * source tables are used as targets.
   * @param conn connection; its configuration is used for the restore
   * @param request restore request
   * @throws IOException declared for callers; not thrown by the visible code
   */
  public RestoreTablesClient(Connection conn, RestoreRequest request)
      throws IOException {
    this.targetRootDir = request.getBackupRootDir();
    this.backupId = request.getBackupId();
    this.sTableArray = request.getFromTables();
    this.tTableArray = request.getToTables();
    if (tTableArray == null || tTableArray.length == 0) {
      this.tTableArray = sTableArray;
    }
    this.isOverwrite = request.isOverwrite();
    this.conn = conn;
    this.conf = conn.getConfiguration();

  }

  /**
   * Validate target tables: an existing target table is only acceptable with overwrite, and
   * then only when it is not disabled.
   * @param tTableArray target tables
   * @param isOverwrite overwrite existing table
   * @throws IOException if an existing table is found without overwrite, or a disabled table
   *           is found with overwrite
   */
  private void checkTargetTables(TableName[] tTableArray, boolean isOverwrite) throws IOException {
    ArrayList<TableName> existTableList = new ArrayList<>();
    ArrayList<TableName> disabledTableList = new ArrayList<>();

    // check if the tables already exist
    try (Admin admin = conn.getAdmin();) {
      for (TableName tableName : tTableArray) {
        if (admin.tableExists(tableName)) {
          existTableList.add(tableName);
          if (admin.isTableDisabled(tableName)) {
            disabledTableList.add(tableName);
          }
        } else {
          LOG.info("HBase table " + tableName
              + " does not exist. It will be created during restore process");
        }
      }
    }

    if (existTableList.size() > 0) {
      if (!isOverwrite) {
        LOG.error("Existing table ("
            + existTableList
            + ") found in the restore target, please add "
            + "\"-overwrite\" option in the command if you mean to restore to these existing tables");
        throw new IOException("Existing table found in target while no \"-overwrite\" "
            + "option found");
      } else {
        if (disabledTableList.size() > 0) {
          LOG.error("Found offline table in the restore target, "
              + "please enable them before restore with \"-overwrite\" option");
          LOG.info("Offline table list in restore target: " + disabledTableList);
          throw new IOException(
              "Found offline table in the target when restore with \"-overwrite\" option");
        }
      }
    }
  }

  /**
   * Restore operation handling each backup image in the array: the first image is restored as
   * a full backup, the log directories of the remaining images are then replayed as one
   * incremental restore.
   * @param images array of BackupImage; the first one must belong to a FULL backup
   * @param sTable table to be restored
   * @param tTable table to be restored to
   * @param truncateIfExists truncate table
   * @throws IOException if the first image's manifest is not of type FULL or a restore step
   *           fails
   */
  private void restoreImages(BackupImage[] images, TableName sTable, TableName tTable,
      boolean truncateIfExists) throws IOException {

    // First image MUST be image of a FULL backup
    BackupImage image = images[0];
    String rootDir = image.getRootDir();
    String backupId = image.getBackupId();
    Path backupRoot = new Path(rootDir);
    RestoreServerUtil restoreTool = new RestoreServerUtil(conf, backupRoot, backupId);
    Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId);
    String lastIncrBackupId = images.length == 1 ? null : images[images.length - 1].getBackupId();
    // We need hFS only for full restore (see the code)
    BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId);
    if (manifest.getType() == BackupType.FULL) {
      LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full" + " backup image "
          + tableBackupPath.toString());
      restoreTool.fullRestoreTable(conn, tableBackupPath, sTable, tTable, truncateIfExists,
        lastIncrBackupId);
    } else { // incremental Backup
      // Report the type that was actually tested above.
      throw new IOException("Unexpected backup type " + manifest.getType());
    }

    if (images.length == 1) {
      // full backup restore done
      return;
    }

    // Log directories of the images that follow the full backup, in image order.
    List<Path> dirList = new ArrayList<Path>();
    for (int i = 1; i < images.length; i++) {
      BackupImage im = images[i];
      String logBackupDir = HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId());
      dirList.add(new Path(logBackupDir));
    }

    String dirs = StringUtils.join(dirList, ",");
    LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from log dirs: " + dirs);
    Path[] paths = new Path[dirList.size()];
    dirList.toArray(paths);
    restoreTool.incrementalRestoreTable(conn, tableBackupPath, paths, new TableName[] { sTable },
      new TableName[] { tTable }, lastIncrBackupId);
    LOG.info(sTable + " has been successfully restored to " + tTable);

  }

  /**
   * Restore operation. Stage 2: resolved Backup Image dependency. For every source table the
   * image of its manifest and the manifest's dependent images are sorted and restored into the
   * target table at the same index.
   * @param backupManifestMap : tableName, Manifest
   * @param sTableArray The array of tables to be restored
   * @param tTableArray The array of mapping tables to restore to
   * @param isOverwrite passed on as "truncate if exists"
   * @throws IOException if a manifest is missing or a restore fails
   */
  private void restore(HashMap<TableName, BackupManifest> backupManifestMap,
      TableName[] sTableArray, TableName[] tTableArray, boolean isOverwrite) throws IOException {
    TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>();
    boolean truncateIfExists = isOverwrite;
    try {
      for (int i = 0; i < sTableArray.length; i++) {
        TableName table = sTableArray[i];
        BackupManifest manifest = backupManifestMap.get(table);
        if (manifest == null) {
          // Previously surfaced as a NullPointerException wrapped in an IOException.
          throw new IOException("No backup manifest loaded for table " + table);
        }
        // Get the image list of this backup for restore in time order from old
        // to new.
        List<BackupImage> list = new ArrayList<BackupImage>();
        list.add(manifest.getBackupImage());
        TreeSet<BackupImage> set = new TreeSet<BackupImage>(list);
        List<BackupImage> depList = manifest.getDependentListByTable(table);
        set.addAll(depList);
        BackupImage[] arr = new BackupImage[set.size()];
        set.toArray(arr);
        restoreImages(arr, table, tTableArray[i], truncateIfExists);
        restoreImageSet.addAll(list);
        if (!restoreImageSet.isEmpty()) {
          LOG.info("Restore includes the following image(s):");
          for (BackupImage image : restoreImageSet) {
            LOG.info("Backup: "
                + image.getBackupId()
                + " "
                + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(),
                  table));
          }
        }
      }
    } catch (IOException e) {
      // Already the declared type: propagate without another layer of wrapping.
      LOG.error("Failed", e);
      throw e;
    } catch (Exception e) {
      LOG.error("Failed", e);
      throw new IOException(e);
    }
    LOG.debug("restoreStage finished");
  }

  /**
   * Executes the restore: validates the target tables, loads the manifests of the source
   * tables from the backup root directory and restores each table.
   * @throws IOException if there are fewer target tables than source tables, target
   *           validation fails, a manifest is missing or a restore fails
   */
  public void execute() throws IOException {

    // Fail before touching any table: a shorter target array used to blow up with an
    // ArrayIndexOutOfBoundsException only after the preceding tables had been restored.
    if (tTableArray.length < sTableArray.length) {
      throw new IOException("Number of target tables (" + tTableArray.length
          + ") is less than number of source tables (" + sTableArray.length + ")");
    }

    // case VALIDATION:
    // check the target tables
    checkTargetTables(tTableArray, isOverwrite);
    // case RESTORE_IMAGES:
    HashMap<TableName, BackupManifest> backupManifestMap = new HashMap<>();
    // check and load backup image manifest for the tables
    Path rootPath = new Path(targetRootDir);
    HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath,
      backupId);
    restore(backupManifestMap, sTableArray, tTableArray, isOverwrite);
  }

}