http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java ---------------------------------------------------------------------- diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java new file mode 100644 index 0000000..6330899 --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java @@ -0,0 +1,387 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.backup.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable.WALItem; +import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; +import org.apache.hadoop.hbase.backup.util.BackupUtils; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; + +/** + * After a full backup was created, the incremental backup will only store the changes made after + * the last full or incremental backup. Creating the backup copies the logfiles in .logs and + * .oldlogs since the last backup timestamp. + */ +@InterfaceAudience.Private +public class IncrementalBackupManager extends BackupManager { + public static final Log LOG = LogFactory.getLog(IncrementalBackupManager.class); + + public IncrementalBackupManager(Connection conn, Configuration conf) throws IOException { + super(conn, conf); + } + + /** + * Obtain the list of logs that need to be copied out for this incremental backup. The list is set + * in BackupInfo. + * @return The new HashMap of RS log time stamps after the log roll for this incremental backup. 
+ * @throws IOException exception + */ + public HashMap<String, Long> getIncrBackupLogFileMap() + throws IOException { + List<String> logList; + HashMap<String, Long> newTimestamps; + HashMap<String, Long> previousTimestampMins; + + String savedStartCode = readBackupStartCode(); + + // key: tableName + // value: <RegionServer,PreviousTimeStamp> + HashMap<TableName, HashMap<String, Long>> previousTimestampMap = readLogTimestampMap(); + + previousTimestampMins = BackupUtils.getRSLogTimestampMins(previousTimestampMap); + + if (LOG.isDebugEnabled()) { + LOG.debug("StartCode " + savedStartCode + "for backupID " + backupInfo.getBackupId()); + } + // get all new log files from .logs and .oldlogs after last TS and before new timestamp + if (savedStartCode == null || previousTimestampMins == null + || previousTimestampMins.isEmpty()) { + throw new IOException( + "Cannot read any previous back up timestamps from backup system table. " + + "In order to create an incremental backup, at least one full backup is needed."); + } + + LOG.info("Execute roll log procedure for incremental backup ..."); + HashMap<String, String> props = new HashMap<String, String>(); + props.put("backupRoot", backupInfo.getBackupRootDir()); + + try (Admin admin = conn.getAdmin();) { + + admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE, + LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props); + + } + newTimestamps = readRegionServerLastLogRollResult(); + + logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode); + List<WALItem> logFromSystemTable = + getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps, getBackupInfo() + .getBackupRootDir()); + logList = excludeAlreadyBackedUpWALs(logList, logFromSystemTable); + backupInfo.setIncrBackupFileList(logList); + + return newTimestamps; + } + + /** + * Get list of WAL files eligible for incremental backup + * @return list of WAL files + * @throws IOException + */ + public 
List<String> getIncrBackupLogFileList() + throws IOException { + List<String> logList; + HashMap<String, Long> newTimestamps; + HashMap<String, Long> previousTimestampMins; + + String savedStartCode = readBackupStartCode(); + + // key: tableName + // value: <RegionServer,PreviousTimeStamp> + HashMap<TableName, HashMap<String, Long>> previousTimestampMap = readLogTimestampMap(); + + previousTimestampMins = BackupUtils.getRSLogTimestampMins(previousTimestampMap); + + if (LOG.isDebugEnabled()) { + LOG.debug("StartCode " + savedStartCode + "for backupID " + backupInfo.getBackupId()); + } + // get all new log files from .logs and .oldlogs after last TS and before new timestamp + if (savedStartCode == null || previousTimestampMins == null + || previousTimestampMins.isEmpty()) { + throw new IOException( + "Cannot read any previous back up timestamps from backup system table. " + + "In order to create an incremental backup, at least one full backup is needed."); + } + + newTimestamps = readRegionServerLastLogRollResult(); + + logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode); + List<WALItem> logFromSystemTable = + getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps, getBackupInfo() + .getBackupRootDir()); + + logList = excludeAlreadyBackedUpWALs(logList, logFromSystemTable); + backupInfo.setIncrBackupFileList(logList); + + return logList; + } + + + private List<String> excludeAlreadyBackedUpWALs(List<String> logList, + List<WALItem> logFromSystemTable) { + + Set<String> walFileNameSet = convertToSet(logFromSystemTable); + + List<String> list = new ArrayList<String>(); + for (int i=0; i < logList.size(); i++) { + Path p = new Path(logList.get(i)); + String name = p.getName(); + if (walFileNameSet.contains(name)) continue; + list.add(logList.get(i)); + } + return list; + } + + /** + * Create Set of WAL file names (not full path names) + * @param logFromSystemTable + * @return set of WAL file names + */ + private 
Set<String> convertToSet(List<WALItem> logFromSystemTable) { + + Set<String> set = new HashSet<String>(); + for (int i=0; i < logFromSystemTable.size(); i++) { + WALItem item = logFromSystemTable.get(i); + set.add(item.walFile); + } + return set; + } + + /** + * For each region server: get all log files newer than the last timestamps, but not newer than + * the newest timestamps. + * @param olderTimestamps timestamp map for each region server of the last backup. + * @param newestTimestamps timestamp map for each region server that the backup should lead to. + * @return list of log files which needs to be added to this backup + * @throws IOException + */ + private List<WALItem> getLogFilesFromBackupSystem(HashMap<String, Long> olderTimestamps, + HashMap<String, Long> newestTimestamps, String backupRoot) throws IOException { + List<WALItem> logFiles = new ArrayList<WALItem>(); + Iterator<WALItem> it = getWALFilesFromBackupSystem(); + while (it.hasNext()) { + WALItem item = it.next(); + String rootDir = item.getBackupRoot(); + if (!rootDir.equals(backupRoot)) { + continue; + } + String walFileName = item.getWalFile(); + String server = BackupUtils.parseHostNameFromLogFile(new Path(walFileName)); + if (server == null) { + continue; + } + Long tss = getTimestamp(walFileName); + Long oldTss = olderTimestamps.get(server); + Long newTss = newestTimestamps.get(server); + if (oldTss == null) { + logFiles.add(item); + continue; + } + if (newTss == null) { + newTss = Long.MAX_VALUE; + } + if (tss > oldTss && tss < newTss) { + logFiles.add(item); + } + } + return logFiles; + } + + private Long getTimestamp(String walFileName) { + int index = walFileName.lastIndexOf(BackupUtils.LOGNAME_SEPARATOR); + return Long.parseLong(walFileName.substring(index + 1)); + } + + /** + * For each region server: get all log files newer than the last timestamps but not newer than the + * newest timestamps. + * @param olderTimestamps the timestamp for each region server of the last backup. 
+ * @param newestTimestamps the timestamp for each region server that the backup should lead to. + * @param conf the Hadoop and Hbase configuration + * @param savedStartCode the startcode (timestamp) of last successful backup. + * @return a list of log files to be backed up + * @throws IOException exception + */ + private List<String> getLogFilesForNewBackup(HashMap<String, Long> olderTimestamps, + HashMap<String, Long> newestTimestamps, Configuration conf, String savedStartCode) + throws IOException { + LOG.debug("In getLogFilesForNewBackup()\n" + "olderTimestamps: " + olderTimestamps + + "\n newestTimestamps: " + newestTimestamps); + Path rootdir = FSUtils.getRootDir(conf); + Path logDir = new Path(rootdir, HConstants.HREGION_LOGDIR_NAME); + Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME); + FileSystem fs = rootdir.getFileSystem(conf); + NewestLogFilter pathFilter = new NewestLogFilter(); + + List<String> resultLogFiles = new ArrayList<String>(); + List<String> newestLogs = new ArrayList<String>(); + + /* + * The old region servers and timestamps info we kept in backup system table may be out of sync + * if new region server is added or existing one lost. We'll deal with it here when processing + * the logs. If data in backup system table has more hosts, just ignore it. If the .logs + * directory includes more hosts, the additional hosts will not have old timestamps to compare + * with. We'll just use all the logs in that directory. We always write up-to-date region server + * and timestamp info to backup system table at the end of successful backup. + */ + + FileStatus[] rss; + Path p; + String host; + Long oldTimeStamp; + String currentLogFile; + long currentLogTS; + + // Get the files in .logs. 
+ rss = fs.listStatus(logDir); + for (FileStatus rs : rss) { + p = rs.getPath(); + host = BackupUtils.parseHostNameFromLogFile(p); + if (host == null) { + continue; + } + FileStatus[] logs; + oldTimeStamp = olderTimestamps.get(host); + // It is possible that there is no old timestamp in backup system table for this host if + // this region server is newly added after our last backup. + if (oldTimeStamp == null) { + logs = fs.listStatus(p); + } else { + pathFilter.setLastBackupTS(oldTimeStamp); + logs = fs.listStatus(p, pathFilter); + } + for (FileStatus log : logs) { + LOG.debug("currentLogFile: " + log.getPath().toString()); + if (AbstractFSWALProvider.isMetaFile(log.getPath())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip hbase:meta log file: " + log.getPath().getName()); + } + continue; + } + currentLogFile = log.getPath().toString(); + resultLogFiles.add(currentLogFile); + currentLogTS = BackupUtils.getCreationTime(log.getPath()); + // newestTimestamps is up-to-date with the current list of hosts + // so newestTimestamps.get(host) will not be null. + if (currentLogTS > newestTimestamps.get(host)) { + newestLogs.add(currentLogFile); + } + } + } + + // Include the .oldlogs files too. + FileStatus[] oldlogs = fs.listStatus(oldLogDir); + for (FileStatus oldlog : oldlogs) { + p = oldlog.getPath(); + currentLogFile = p.toString(); + if (AbstractFSWALProvider.isMetaFile(p)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip .meta log file: " + currentLogFile); + } + continue; + } + host = BackupUtils.parseHostFromOldLog(p); + if (host == null) { + continue; + } + currentLogTS = BackupUtils.getCreationTime(p); + oldTimeStamp = olderTimestamps.get(host); + /* + * It is possible that there is no old timestamp in backup system table for this host. At the + * time of our last backup operation, this rs did not exist. The reason can be one of the two: + * 1. The rs already left/crashed. Its logs were moved to .oldlogs. 2. The rs was added after + * our last backup. 
+ */ + if (oldTimeStamp == null) { + if (currentLogTS < Long.parseLong(savedStartCode)) { + // This log file is really old, its region server was before our last backup. + continue; + } else { + resultLogFiles.add(currentLogFile); + } + } else if (currentLogTS > oldTimeStamp) { + resultLogFiles.add(currentLogFile); + } + + // It is possible that a host in .oldlogs is an obsolete region server + // so newestTimestamps.get(host) here can be null. + // Even if these logs belong to a obsolete region server, we still need + // to include they to avoid loss of edits for backup. + Long newTimestamp = newestTimestamps.get(host); + if (newTimestamp != null && currentLogTS > newTimestamp) { + newestLogs.add(currentLogFile); + } + } + // remove newest log per host because they are still in use + resultLogFiles.removeAll(newestLogs); + return resultLogFiles; + } + + static class NewestLogFilter implements PathFilter { + private Long lastBackupTS = 0L; + + public NewestLogFilter() { + } + + protected void setLastBackupTS(Long ts) { + this.lastBackupTS = ts; + } + + @Override + public boolean accept(Path path) { + // skip meta table log -- ts.meta file + if (AbstractFSWALProvider.isMetaFile(path)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip .meta log file: " + path.getName()); + } + return false; + } + long timestamp; + try { + timestamp = BackupUtils.getCreationTime(path); + return timestamp > lastBackupTS; + } catch (Exception e) { + LOG.warn("Cannot read timestamp of log file " + path); + return false; + } + } + } + +}
http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java ---------------------------------------------------------------------- diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java new file mode 100644 index 0000000..6d48c32 --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -0,0 +1,377 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.backup.impl; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupCopyJob; +import org.apache.hadoop.hbase.backup.BackupInfo; +import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase; +import org.apache.hadoop.hbase.backup.BackupRequest; +import org.apache.hadoop.hbase.backup.BackupRestoreFactory; +import org.apache.hadoop.hbase.backup.BackupType; +import org.apache.hadoop.hbase.backup.util.BackupUtils; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.mapreduce.WALPlayer; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.HFileArchiveUtil; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.util.Tool; + +/** + * Incremental backup implementation. + * See the {@link #execute() execute} method. 
+ * + */ +@InterfaceAudience.Private +public class IncrementalTableBackupClient extends TableBackupClient { + private static final Log LOG = LogFactory.getLog(IncrementalTableBackupClient.class); + + protected IncrementalTableBackupClient() { + } + + public IncrementalTableBackupClient(final Connection conn, final String backupId, + BackupRequest request) throws IOException { + super(conn, backupId, request); + } + + protected List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException { + FileSystem fs = FileSystem.get(conf); + List<String> list = new ArrayList<String>(); + for (String file : incrBackupFileList) { + Path p = new Path(file); + if (fs.exists(p) || isActiveWalPath(p)) { + list.add(file); + } else { + LOG.warn("Can't find file: " + file); + } + } + return list; + } + + /** + * Check if a given path is belongs to active WAL directory + * @param p path + * @return true, if yes + */ + protected boolean isActiveWalPath(Path p) { + return !AbstractFSWALProvider.isArchivedLogFile(p); + } + + protected static int getIndex(TableName tbl, List<TableName> sTableList) { + if (sTableList == null) return 0; + for (int i = 0; i < sTableList.size(); i++) { + if (tbl.equals(sTableList.get(i))) { + return i; + } + } + return -1; + } + + /* + * Reads bulk load records from backup table, iterates through the records and forms the paths + * for bulk loaded hfiles. 
Copies the bulk loaded hfiles to backup destination + * @param sTableList list of tables to be backed up + * @return map of table to List of files + */ + protected Map<byte[], List<Path>>[] handleBulkLoad(List<TableName> sTableList) throws IOException { + Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()]; + Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair = + backupManager.readBulkloadRows(sTableList); + Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = pair.getFirst(); + FileSystem fs = FileSystem.get(conf); + FileSystem tgtFs; + try { + tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf); + } catch (URISyntaxException use) { + throw new IOException("Unable to get FileSystem", use); + } + Path rootdir = FSUtils.getRootDir(conf); + Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId); + for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> tblEntry : + map.entrySet()) { + TableName srcTable = tblEntry.getKey(); + int srcIdx = getIndex(srcTable, sTableList); + if (srcIdx < 0) { + LOG.warn("Couldn't find " + srcTable + " in source table List"); + continue; + } + if (mapForSrc[srcIdx] == null) { + mapForSrc[srcIdx] = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR); + } + Path tblDir = FSUtils.getTableDir(rootdir, srcTable); + Path tgtTable = new Path(new Path(tgtRoot, srcTable.getNamespaceAsString()), + srcTable.getQualifierAsString()); + for (Map.Entry<String,Map<String,List<Pair<String, Boolean>>>> regionEntry : + tblEntry.getValue().entrySet()){ + String regionName = regionEntry.getKey(); + Path regionDir = new Path(tblDir, regionName); + // map from family to List of hfiles + for (Map.Entry<String,List<Pair<String, Boolean>>> famEntry : + regionEntry.getValue().entrySet()) { + String fam = famEntry.getKey(); + Path famDir = new Path(regionDir, fam); + List<Path> files; + if 
(!mapForSrc[srcIdx].containsKey(fam.getBytes())) { + files = new ArrayList<Path>(); + mapForSrc[srcIdx].put(fam.getBytes(), files); + } else { + files = mapForSrc[srcIdx].get(fam.getBytes()); + } + Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam); + String tblName = srcTable.getQualifierAsString(); + Path tgtFam = new Path(new Path(tgtTable, regionName), fam); + if (!tgtFs.mkdirs(tgtFam)) { + throw new IOException("couldn't create " + tgtFam); + } + for (Pair<String, Boolean> fileWithState : famEntry.getValue()) { + String file = fileWithState.getFirst(); + boolean raw = fileWithState.getSecond(); + int idx = file.lastIndexOf("/"); + String filename = file; + if (idx > 0) { + filename = file.substring(idx+1); + } + Path p = new Path(famDir, filename); + Path tgt = new Path(tgtFam, filename); + Path archive = new Path(archiveDir, filename); + if (fs.exists(p)) { + if (LOG.isTraceEnabled()) { + LOG.trace("found bulk hfile " + file + " in " + famDir + " for " + tblName); + } + try { + if (LOG.isTraceEnabled()) { + LOG.trace("copying " + p + " to " + tgt); + } + FileUtil.copy(fs, p, tgtFs, tgt, false,conf); + } catch (FileNotFoundException e) { + LOG.debug("copying archive " + archive + " to " + tgt); + try { + FileUtil.copy(fs, archive, tgtFs, tgt, false, conf); + } catch (FileNotFoundException fnfe) { + if (!raw) throw fnfe; + } + } + } else { + LOG.debug("copying archive " + archive + " to " + tgt); + try { + FileUtil.copy(fs, archive, tgtFs, tgt, false, conf); + } catch (FileNotFoundException fnfe) { + if (!raw) throw fnfe; + } + } + files.add(tgt); + } + } + } + } + backupManager.writeBulkLoadedFiles(sTableList, mapForSrc); + backupManager.removeBulkLoadedRows(sTableList, pair.getSecond()); + return mapForSrc; + } + + @Override + public void execute() throws IOException { + + try { + // case PREPARE_INCREMENTAL: + beginBackup(backupManager, backupInfo); + backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL); + LOG.debug("For 
incremental backup, current table set is " + + backupManager.getIncrementalBackupTableSet()); + newTimestamps = + ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap(); + } catch (Exception e) { + // fail the overall backup and return + failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ", + BackupType.INCREMENTAL, conf); + return; + } + + // case INCREMENTAL_COPY: + try { + // copy out the table and region info files for each table + BackupUtils.copyTableRegionInfo(conn, backupInfo, conf); + // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT + convertWALsToHFiles(backupInfo); + incrementalCopyHFiles(backupInfo); + // Save list of WAL files copied + backupManager.recordWALFiles(backupInfo.getIncrBackupFileList()); + } catch (Exception e) { + String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId; + // fail the overall backup and return + failBackup(conn, backupInfo, backupManager, e, msg, BackupType.INCREMENTAL, conf); + return; + } + // case INCR_BACKUP_COMPLETE: + // set overall backup status: complete. Here we make sure to complete the backup. + // After this checkpoint, even if entering cancel process, will let the backup finished + try { + // Set the previousTimestampMap which is before this current log roll to the manifest. + HashMap<TableName, HashMap<String, Long>> previousTimestampMap = + backupManager.readLogTimestampMap(); + backupInfo.setIncrTimestampMap(previousTimestampMap); + + // The table list in backupInfo is good for both full backup and incremental backup. + // For incremental backup, it contains the incremental backup table set. 
+ backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps); + + HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap = + backupManager.readLogTimestampMap(); + + Long newStartCode = + BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap)); + backupManager.writeBackupStartCode(newStartCode); + + handleBulkLoad(backupInfo.getTableNames()); + // backup complete + completeBackup(conn, backupInfo, backupManager, BackupType.INCREMENTAL, conf); + + } catch (IOException e) { + failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ", + BackupType.INCREMENTAL, conf); + } + } + + protected void incrementalCopyHFiles(BackupInfo backupInfo) throws Exception { + + try { + LOG.debug("Incremental copy HFiles is starting."); + // set overall backup phase: incremental_copy + backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY); + // get incremental backup file list and prepare parms for DistCp + List<String> incrBackupFileList = new ArrayList<String>(); + // Add Bulk output + incrBackupFileList.add(getBulkOutputDir().toString()); + String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]); + strArr[strArr.length - 1] = backupInfo.getBackupRootDir(); + BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf); + int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr); + if (res != 0) { + LOG.error("Copy incremental HFile files failed with return code: " + res + "."); + throw new IOException("Failed copy from " + StringUtils.join(incrBackupFileList, ',') + + " to " + backupInfo.getHLogTargetDir()); + } + LOG.debug("Incremental copy HFiles from " + StringUtils.join(incrBackupFileList, ',') + + " to " + backupInfo.getBackupRootDir() + " finished."); + } finally { + deleteBulkLoadDirectory(); + } + } + + protected void deleteBulkLoadDirectory() throws IOException { + // delete original bulk load directory on method exit + Path 
path = getBulkOutputDir(); + FileSystem fs = FileSystem.get(conf); + boolean result = fs.delete(path, true); + if (!result) { + LOG.warn("Could not delete " + path); + } + + } + + protected void convertWALsToHFiles(BackupInfo backupInfo) throws IOException { + // get incremental backup file list and prepare parameters for DistCp + List<String> incrBackupFileList = backupInfo.getIncrBackupFileList(); + // Get list of tables in incremental backup set + Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet(); + // filter missing files out (they have been copied by previous backups) + incrBackupFileList = filterMissingFiles(incrBackupFileList); + for (TableName table : tableSet) { + // Check if table exists + if (tableExists(table, conn)) { + walToHFiles(incrBackupFileList, table); + } else { + LOG.warn("Table " + table + " does not exists. Skipping in WAL converter"); + } + } + } + + + protected boolean tableExists(TableName table, Connection conn) throws IOException { + try (Admin admin = conn.getAdmin();) { + return admin.tableExists(table); + } + } + + protected void walToHFiles(List<String> dirPaths, TableName tableName) throws IOException { + + Tool player = new WALPlayer(); + + // Player reads all files in arbitrary directory structure and creates + // a Map task for each file. 
We use ';' as separator + // because WAL file names contains ',' + String dirs = StringUtils.join(dirPaths, ';'); + + Path bulkOutputPath = getBulkOutputDirForTable(tableName); + conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString()); + conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";"); + String[] playerArgs = { dirs, tableName.getNameAsString() }; + + try { + player.setConf(conf); + int result = player.run(playerArgs); + if(result != 0) { + throw new IOException("WAL Player failed"); + } + conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY); + } catch (IOException e) { + throw e; + } catch (Exception ee) { + throw new IOException("Can not convert from directory " + dirs + + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee); + } + } + + protected Path getBulkOutputDirForTable(TableName table) { + Path tablePath = getBulkOutputDir(); + tablePath = new Path(tablePath, table.getNamespaceAsString()); + tablePath = new Path(tablePath, table.getQualifierAsString()); + return new Path(tablePath, "data"); + } + + protected Path getBulkOutputDir() { + String backupId = backupInfo.getBackupId(); + Path path = new Path(backupInfo.getBackupRootDir()); + path = new Path(path, ".tmp"); + path = new Path(path, backupId); + return path; + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java ---------------------------------------------------------------------- diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java new file mode 100644 index 0000000..ea7a7b8 --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java @@ -0,0 +1,278 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.backup.impl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupType; +import org.apache.hadoop.hbase.backup.HBackupFileSystem; +import org.apache.hadoop.hbase.backup.RestoreRequest; +import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; +import org.apache.hadoop.hbase.backup.util.BackupUtils; +import org.apache.hadoop.hbase.backup.util.RestoreTool; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem; + +/** + * Restore table implementation + * + */ 
+@InterfaceAudience.Private +public class RestoreTablesClient { + private static final Log LOG = LogFactory.getLog(RestoreTablesClient.class); + + private Configuration conf; + private Connection conn; + private String backupId; + private TableName[] sTableArray; + private TableName[] tTableArray; + private String targetRootDir; + private boolean isOverwrite; + + public RestoreTablesClient(Connection conn, RestoreRequest request) throws IOException { + this.targetRootDir = request.getBackupRootDir(); + this.backupId = request.getBackupId(); + this.sTableArray = request.getFromTables(); + this.tTableArray = request.getToTables(); + if (tTableArray == null || tTableArray.length == 0) { + this.tTableArray = sTableArray; + } + this.isOverwrite = request.isOverwrite(); + this.conn = conn; + this.conf = conn.getConfiguration(); + + } + + /** + * Validate target tables + * @param conn connection + * @param mgr table state manager + * @param tTableArray: target tables + * @param isOverwrite overwrite existing table + * @throws IOException exception + */ + private void checkTargetTables(TableName[] tTableArray, boolean isOverwrite) throws IOException { + ArrayList<TableName> existTableList = new ArrayList<>(); + ArrayList<TableName> disabledTableList = new ArrayList<>(); + + // check if the tables already exist + try (Admin admin = conn.getAdmin();) { + for (TableName tableName : tTableArray) { + if (admin.tableExists(tableName)) { + existTableList.add(tableName); + if (admin.isTableDisabled(tableName)) { + disabledTableList.add(tableName); + } + } else { + LOG.info("HBase table " + tableName + + " does not exist. 
It will be created during restore process"); + } + } + } + + if (existTableList.size() > 0) { + if (!isOverwrite) { + LOG.error("Existing table (" + existTableList + + ") found in the restore target, please add " + + "\"-overwrite\" option in the command if you mean" + + " to restore to these existing tables"); + throw new IOException("Existing table found in target while no \"-overwrite\" " + + "option found"); + } else { + if (disabledTableList.size() > 0) { + LOG.error("Found offline table in the restore target, " + + "please enable them before restore with \"-overwrite\" option"); + LOG.info("Offline table list in restore target: " + disabledTableList); + throw new IOException( + "Found offline table in the target when restore with \"-overwrite\" option"); + } + } + } + } + + /** + * Restore operation handle each backupImage in array + * @param svc: master services + * @param images: array BackupImage + * @param sTable: table to be restored + * @param tTable: table to be restored to + * @param truncateIfExists: truncate table + * @throws IOException exception + */ + + private void restoreImages(BackupImage[] images, TableName sTable, TableName tTable, + boolean truncateIfExists) throws IOException { + + // First image MUST be image of a FULL backup + BackupImage image = images[0]; + String rootDir = image.getRootDir(); + String backupId = image.getBackupId(); + Path backupRoot = new Path(rootDir); + RestoreTool restoreTool = new RestoreTool(conf, backupRoot, backupId); + Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId); + String lastIncrBackupId = images.length == 1 ? 
null : images[images.length - 1].getBackupId(); + // We need hFS only for full restore (see the code) + BackupManifest manifest = HBackupFileSystem.getManifest(conf, backupRoot, backupId); + if (manifest.getType() == BackupType.FULL) { + LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full" + " backup image " + + tableBackupPath.toString()); + restoreTool.fullRestoreTable(conn, tableBackupPath, sTable, tTable, truncateIfExists, + lastIncrBackupId); + } else { // incremental Backup + throw new IOException("Unexpected backup type " + image.getType()); + } + + if (images.length == 1) { + // full backup restore done + return; + } + + List<Path> dirList = new ArrayList<Path>(); + // add full backup path + // full backup path comes first + for (int i = 1; i < images.length; i++) { + BackupImage im = images[i]; + String fileBackupDir = + HBackupFileSystem.getTableBackupDataDir(im.getRootDir(), im.getBackupId(), sTable); + dirList.add(new Path(fileBackupDir)); + } + + String dirs = StringUtils.join(dirList, ","); + LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from log dirs: " + dirs); + Path[] paths = new Path[dirList.size()]; + dirList.toArray(paths); + restoreTool.incrementalRestoreTable(conn, tableBackupPath, paths, new TableName[] { sTable }, + new TableName[] { tTable }, lastIncrBackupId); + LOG.info(sTable + " has been successfully restored to " + tTable); + } + + /** + * Restore operation. 
Stage 2: resolved Backup Image dependency + * @param backupManifestMap : tableName, Manifest + * @param sTableArray The array of tables to be restored + * @param tTableArray The array of mapping tables to restore to + * @return set of BackupImages restored + * @throws IOException exception + */ + private void restore(HashMap<TableName, BackupManifest> backupManifestMap, + TableName[] sTableArray, TableName[] tTableArray, boolean isOverwrite) throws IOException { + TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>(); + boolean truncateIfExists = isOverwrite; + Set<String> backupIdSet = new HashSet<>(); + + for (int i = 0; i < sTableArray.length; i++) { + TableName table = sTableArray[i]; + + BackupManifest manifest = backupManifestMap.get(table); + // Get the image list of this backup for restore in time order from old + // to new. + List<BackupImage> list = new ArrayList<BackupImage>(); + list.add(manifest.getBackupImage()); + TreeSet<BackupImage> set = new TreeSet<BackupImage>(list); + List<BackupImage> depList = manifest.getDependentListByTable(table); + set.addAll(depList); + BackupImage[] arr = new BackupImage[set.size()]; + set.toArray(arr); + restoreImages(arr, table, tTableArray[i], truncateIfExists); + restoreImageSet.addAll(list); + if (restoreImageSet != null && !restoreImageSet.isEmpty()) { + LOG.info("Restore includes the following image(s):"); + for (BackupImage image : restoreImageSet) { + LOG.info("Backup: " + image.getBackupId() + " " + + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), table)); + if (image.getType() == BackupType.INCREMENTAL) { + backupIdSet.add(image.getBackupId()); + LOG.debug("adding " + image.getBackupId() + " for bulk load"); + } + } + } + } + try (BackupSystemTable table = new BackupSystemTable(conn)) { + List<TableName> sTableList = Arrays.asList(sTableArray); + for (String id : backupIdSet) { + LOG.debug("restoring bulk load for " + id); + Map<byte[], List<Path>>[] mapForSrc = 
table.readBulkLoadedFiles(id, sTableList); + Map<LoadQueueItem, ByteBuffer> loaderResult; + conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true); + LoadIncrementalHFiles loader = BackupUtils.createLoader(conf); + for (int i = 0; i < sTableList.size(); i++) { + if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) { + loaderResult = loader.run(null, mapForSrc[i], tTableArray[i]); + LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]); + if (loaderResult.isEmpty()) { + String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " + tTableArray[i]; + LOG.error(msg); + throw new IOException(msg); + } + } + } + } + } + LOG.debug("restoreStage finished"); + } + + static long getTsFromBackupId(String backupId) { + if (backupId == null) { + return 0; + } + return Long.parseLong(backupId.substring(backupId.lastIndexOf("_") + 1)); + } + + static boolean withinRange(long a, long lower, long upper) { + if (a < lower || a > upper) { + return false; + } + return true; + } + + public void execute() throws IOException { + + // case VALIDATION: + // check the target tables + checkTargetTables(tTableArray, isOverwrite); + + // case RESTORE_IMAGES: + HashMap<TableName, BackupManifest> backupManifestMap = new HashMap<>(); + // check and load backup image manifest for the tables + Path rootPath = new Path(targetRootDir); + HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath, + backupId); + + restore(backupManifestMap, sTableArray, tTableArray, isOverwrite); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java ---------------------------------------------------------------------- diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java new file mode 100644 index 0000000..6eec460 --- 
/dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java @@ -0,0 +1,436 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupInfo; +import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase; +import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; +import org.apache.hadoop.hbase.backup.BackupRequest; +import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.BackupType; +import org.apache.hadoop.hbase.backup.HBackupFileSystem; +import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; +import 
org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; + +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; + +/** + * Base class for backup operation. Concrete implementation for + * full and incremental backup are delegated to corresponding sub-classes: + * {@link FullTableBackupClient} and {@link IncrementalTableBackupClient} + * + */ +@InterfaceAudience.Private +public abstract class TableBackupClient { + + public static final String BACKUP_CLIENT_IMPL_CLASS = "backup.client.impl.class"; + + @VisibleForTesting + public static final String BACKUP_TEST_MODE_STAGE = "backup.test.mode.stage"; + + private static final Log LOG = LogFactory.getLog(TableBackupClient.class); + + protected Configuration conf; + protected Connection conn; + protected String backupId; + protected List<TableName> tableList; + protected HashMap<String, Long> newTimestamps = null; + + protected BackupManager backupManager; + protected BackupInfo backupInfo; + + public TableBackupClient() { + } + + public TableBackupClient(final Connection conn, final String backupId, BackupRequest request) + throws IOException { + init(conn, backupId, request); + } + + public void init(final Connection conn, final String backupId, BackupRequest request) + throws IOException + { + if (request.getBackupType() == BackupType.FULL) { + backupManager = new BackupManager(conn, conn.getConfiguration()); + } else { + backupManager = new IncrementalBackupManager(conn, conn.getConfiguration()); + } + this.backupId = backupId; + this.tableList = request.getTableList(); + this.conn = conn; + this.conf = conn.getConfiguration(); + backupInfo = + backupManager.createBackupInfo(backupId, request.getBackupType(), tableList, + request.getTargetRootDir(), request.getTotalTasks(), request.getBandwidth()); + if (tableList == null || tableList.isEmpty()) { + this.tableList = new 
ArrayList<>(backupInfo.getTables()); + } + // Start new session + backupManager.startBackupSession(); + } + + /** + * Begin the overall backup. + * @param backupInfo backup info + * @throws IOException exception + */ + protected void beginBackup(BackupManager backupManager, BackupInfo backupInfo) + throws IOException { + + BackupSystemTable.snapshot(conn); + backupManager.setBackupInfo(backupInfo); + // set the start timestamp of the overall backup + long startTs = EnvironmentEdgeManager.currentTime(); + backupInfo.setStartTs(startTs); + // set overall backup status: ongoing + backupInfo.setState(BackupState.RUNNING); + backupInfo.setPhase(BackupPhase.REQUEST); + LOG.info("Backup " + backupInfo.getBackupId() + " started at " + startTs + "."); + + backupManager.updateBackupInfo(backupInfo); + if (LOG.isDebugEnabled()) { + LOG.debug("Backup session " + backupInfo.getBackupId() + " has been started."); + } + } + + protected String getMessage(Exception e) { + String msg = e.getMessage(); + if (msg == null || msg.equals("")) { + msg = e.getClass().getName(); + } + return msg; + } + + /** + * Delete HBase snapshot for backup. + * @param backupInfo backup info + * @throws Exception exception + */ + protected static void deleteSnapshots(final Connection conn, BackupInfo backupInfo, Configuration conf) + throws IOException { + LOG.debug("Trying to delete snapshot for full backup."); + for (String snapshotName : backupInfo.getSnapshotNames()) { + if (snapshotName == null) { + continue; + } + LOG.debug("Trying to delete snapshot: " + snapshotName); + + try (Admin admin = conn.getAdmin();) { + admin.deleteSnapshot(snapshotName); + } + LOG.debug("Deleting the snapshot " + snapshotName + " for backup " + backupInfo.getBackupId() + + " succeeded."); + } + } + + /** + * Clean up directories with prefix "exportSnapshot-", which are generated when exporting + * snapshots. 
+ * @throws IOException exception + */ + protected static void cleanupExportSnapshotLog(Configuration conf) throws IOException { + FileSystem fs = FSUtils.getCurrentFileSystem(conf); + Path stagingDir = + new Path(conf.get(BackupRestoreConstants.CONF_STAGING_ROOT, fs.getWorkingDirectory() + .toString())); + FileStatus[] files = FSUtils.listStatus(fs, stagingDir); + if (files == null) { + return; + } + for (FileStatus file : files) { + if (file.getPath().getName().startsWith("exportSnapshot-")) { + LOG.debug("Delete log files of exporting snapshot: " + file.getPath().getName()); + if (FSUtils.delete(fs, file.getPath(), true) == false) { + LOG.warn("Can not delete " + file.getPath()); + } + } + } + } + + /** + * Clean up the uncompleted data at target directory if the ongoing backup has already entered + * the copy phase. + */ + protected static void cleanupTargetDir(BackupInfo backupInfo, Configuration conf) { + try { + // clean up the uncompleted data at target directory if the ongoing backup has already entered + // the copy phase + LOG.debug("Trying to cleanup up target dir. 
Current backup phase: " + + backupInfo.getPhase()); + if (backupInfo.getPhase().equals(BackupPhase.SNAPSHOTCOPY) + || backupInfo.getPhase().equals(BackupPhase.INCREMENTAL_COPY) + || backupInfo.getPhase().equals(BackupPhase.STORE_MANIFEST)) { + FileSystem outputFs = + FileSystem.get(new Path(backupInfo.getBackupRootDir()).toUri(), conf); + + // now treat one backup as a transaction, clean up data that has been partially copied at + // table level + for (TableName table : backupInfo.getTables()) { + Path targetDirPath = + new Path(HBackupFileSystem.getTableBackupDir(backupInfo.getBackupRootDir(), + backupInfo.getBackupId(), table)); + if (outputFs.delete(targetDirPath, true)) { + LOG.debug("Cleaning up uncompleted backup data at " + targetDirPath.toString() + + " done."); + } else { + LOG.debug("No data has been copied to " + targetDirPath.toString() + "."); + } + + Path tableDir = targetDirPath.getParent(); + FileStatus[] backups = FSUtils.listStatus(outputFs, tableDir); + if (backups == null || backups.length == 0) { + outputFs.delete(tableDir, true); + LOG.debug(tableDir.toString() + " is empty, remove it."); + } + } + } + + } catch (IOException e1) { + LOG.error("Cleaning up uncompleted backup data of " + backupInfo.getBackupId() + " at " + + backupInfo.getBackupRootDir() + " failed due to " + e1.getMessage() + "."); + } + } + + /** + * Fail the overall backup. + * @param backupInfo backup info + * @param e exception + * @throws Exception exception + */ + protected void failBackup(Connection conn, BackupInfo backupInfo, BackupManager backupManager, + Exception e, String msg, BackupType type, Configuration conf) throws IOException { + + try { + LOG.error(msg + getMessage(e), e); + // If this is a cancel exception, then we've already cleaned. 
+ // set the failure timestamp of the overall backup + backupInfo.setCompleteTs(EnvironmentEdgeManager.currentTime()); + // set failure message + backupInfo.setFailedMsg(e.getMessage()); + // set overall backup status: failed + backupInfo.setState(BackupState.FAILED); + // compose the backup failed data + String backupFailedData = + "BackupId=" + backupInfo.getBackupId() + ",startts=" + backupInfo.getStartTs() + + ",failedts=" + backupInfo.getCompleteTs() + ",failedphase=" + backupInfo.getPhase() + + ",failedmessage=" + backupInfo.getFailedMsg(); + LOG.error(backupFailedData); + cleanupAndRestoreBackupSystem(conn, backupInfo, conf); + // If backup session is updated to FAILED state - means we + // processed recovery already. + backupManager.updateBackupInfo(backupInfo); + backupManager.finishBackupSession(); + LOG.error("Backup " + backupInfo.getBackupId() + " failed."); + } catch (IOException ee) { + LOG.error("Please run backup repair tool manually to restore backup system integrity"); + throw ee; + } + } + + public static void cleanupAndRestoreBackupSystem (Connection conn, BackupInfo backupInfo, + Configuration conf) throws IOException + { + BackupType type = backupInfo.getType(); + // if full backup, then delete HBase snapshots if there already are snapshots taken + // and also clean up export snapshot log files if exist + if (type == BackupType.FULL) { + deleteSnapshots(conn, backupInfo, conf); + cleanupExportSnapshotLog(conf); + } + BackupSystemTable.restoreFromSnapshot(conn); + BackupSystemTable.deleteSnapshot(conn); + // clean up the uncompleted data at target directory if the ongoing backup has already entered + // the copy phase + // For incremental backup, DistCp logs will be cleaned with the targetDir. + cleanupTargetDir(backupInfo, conf); + } + + + + /** + * Add manifest for the current backup. The manifest is stored within the table backup directory. 
+ * @param backupInfo The current backup info + * @throws IOException exception + * @throws BackupException exception + */ + protected void addManifest(BackupInfo backupInfo, BackupManager backupManager, BackupType type, + Configuration conf) throws IOException, BackupException { + // set the overall backup phase : store manifest + backupInfo.setPhase(BackupPhase.STORE_MANIFEST); + + BackupManifest manifest; + + // Since we have each table's backup in its own directory structure, + // we'll store its manifest with the table directory. + for (TableName table : backupInfo.getTables()) { + manifest = new BackupManifest(backupInfo, table); + ArrayList<BackupImage> ancestors = backupManager.getAncestors(backupInfo, table); + for (BackupImage image : ancestors) { + manifest.addDependentImage(image); + } + + if (type == BackupType.INCREMENTAL) { + // We'll store the log timestamps for this table only in its manifest. + HashMap<TableName, HashMap<String, Long>> tableTimestampMap = + new HashMap<TableName, HashMap<String, Long>>(); + tableTimestampMap.put(table, backupInfo.getIncrTimestampMap().get(table)); + manifest.setIncrTimestampMap(tableTimestampMap); + ArrayList<BackupImage> ancestorss = backupManager.getAncestors(backupInfo); + for (BackupImage image : ancestorss) { + manifest.addDependentImage(image); + } + } + manifest.store(conf); + } + + // For incremental backup, we store a overall manifest in + // <backup-root-dir>/WALs/<backup-id> + // This is used when created the next incremental backup + if (type == BackupType.INCREMENTAL) { + manifest = new BackupManifest(backupInfo); + // set the table region server start and end timestamps for incremental backup + manifest.setIncrTimestampMap(backupInfo.getIncrTimestampMap()); + ArrayList<BackupImage> ancestors = backupManager.getAncestors(backupInfo); + for (BackupImage image : ancestors) { + manifest.addDependentImage(image); + } + manifest.store(conf); + } + } + + /** + * Get backup request meta data dir as string. 
+ * @param backupInfo backup info + * @return meta data dir + */ + protected String obtainBackupMetaDataStr(BackupInfo backupInfo) { + StringBuffer sb = new StringBuffer(); + sb.append("type=" + backupInfo.getType() + ",tablelist="); + for (TableName table : backupInfo.getTables()) { + sb.append(table + ";"); + } + if (sb.lastIndexOf(";") > 0) { + sb.delete(sb.lastIndexOf(";"), sb.lastIndexOf(";") + 1); + } + sb.append(",targetRootDir=" + backupInfo.getBackupRootDir()); + + return sb.toString(); + } + + /** + * Clean up directories with prefix "_distcp_logs-", which are generated when DistCp copying + * hlogs. + * @throws IOException exception + */ + protected void cleanupDistCpLog(BackupInfo backupInfo, Configuration conf) throws IOException { + Path rootPath = new Path(backupInfo.getHLogTargetDir()).getParent(); + FileSystem fs = FileSystem.get(rootPath.toUri(), conf); + FileStatus[] files = FSUtils.listStatus(fs, rootPath); + if (files == null) { + return; + } + for (FileStatus file : files) { + if (file.getPath().getName().startsWith("_distcp_logs")) { + LOG.debug("Delete log files of DistCp: " + file.getPath().getName()); + FSUtils.delete(fs, file.getPath(), true); + } + } + } + + /** + * Complete the overall backup. 
+ * @param backupInfo backup info + * @throws Exception exception + */ + protected void completeBackup(final Connection conn, BackupInfo backupInfo, + BackupManager backupManager, BackupType type, Configuration conf) throws IOException { + // set the complete timestamp of the overall backup + backupInfo.setCompleteTs(EnvironmentEdgeManager.currentTime()); + // set overall backup status: complete + backupInfo.setState(BackupState.COMPLETE); + backupInfo.setProgress(100); + // add and store the manifest for the backup + addManifest(backupInfo, backupManager, type, conf); + + // compose the backup complete data + String backupCompleteData = + obtainBackupMetaDataStr(backupInfo) + ",startts=" + backupInfo.getStartTs() + + ",completets=" + backupInfo.getCompleteTs() + ",bytescopied=" + + backupInfo.getTotalBytesCopied(); + if (LOG.isDebugEnabled()) { + LOG.debug("Backup " + backupInfo.getBackupId() + " finished: " + backupCompleteData); + } + + // when full backup is done: + // - delete HBase snapshot + // - clean up directories with prefix "exportSnapshot-", which are generated when exporting + // snapshots + if (type == BackupType.FULL) { + deleteSnapshots(conn, backupInfo, conf); + cleanupExportSnapshotLog(conf); + } else if (type == BackupType.INCREMENTAL) { + cleanupDistCpLog(backupInfo, conf); + } + BackupSystemTable.deleteSnapshot(conn); + backupManager.updateBackupInfo(backupInfo); + + // Finish active session + backupManager.finishBackupSession(); + + LOG.info("Backup " + backupInfo.getBackupId() + " completed."); + } + + /** + * Backup request execution + * @throws IOException + */ + public abstract void execute() throws IOException; + + @VisibleForTesting + protected Stage getTestStage() { + return Stage.valueOf("stage_"+ conf.getInt(BACKUP_TEST_MODE_STAGE, 0)); + } + + @VisibleForTesting + protected void failStageIf(Stage stage) throws IOException { + Stage current = getTestStage(); + if (current == stage) { + throw new IOException("Failed stage " + stage+" 
in testing"); + } + } + + public static enum Stage { + stage_0, stage_1, stage_2, stage_3, stage_4 + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java ---------------------------------------------------------------------- diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java new file mode 100644 index 0000000..016d1a4 --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java @@ -0,0 +1,344 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.backup.mapreduce; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupCopyJob; +import org.apache.hadoop.hbase.backup.BackupInfo; +import org.apache.hadoop.hbase.backup.BackupType; +import org.apache.hadoop.hbase.backup.impl.BackupManager; +import org.apache.hadoop.hbase.backup.util.BackupUtils; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.snapshot.ExportSnapshot; +import org.apache.hadoop.mapreduce.Cluster; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.tools.DistCp; +import org.apache.hadoop.tools.DistCpConstants; +import org.apache.hadoop.tools.DistCpOptions; +import org.apache.zookeeper.KeeperException.NoNodeException; + +/** + * Map-Reduce implementation of {@link BackupCopyJob}. Basically, there are 2 types of copy + * operation: one is copying from snapshot, which bases on extending ExportSnapshot's function, the + * other is copying for incremental log files, which bases on extending DistCp's function. 
+ */ +@InterfaceAudience.Private +public class MapReduceBackupCopyJob implements BackupCopyJob { + private static final Log LOG = LogFactory.getLog(MapReduceBackupCopyJob.class); + + private Configuration conf; + + // Accumulated progress within the whole backup process for the copy operation + private float progressDone = 0.1f; + private long bytesCopied = 0; + private static float INIT_PROGRESS = 0.1f; + + // The percentage of the current copy task within the whole task if multiple time copies are + // needed. The default value is 100%, which means only 1 copy task for the whole. + private float subTaskPercntgInWholeTask = 1f; + + public MapReduceBackupCopyJob() { + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + /** + * Get the current copy task percentage within the whole task if multiple copies are needed. + * @return the current copy task percentage + */ + public float getSubTaskPercntgInWholeTask() { + return subTaskPercntgInWholeTask; + } + + /** + * Set the current copy task percentage within the whole task if multiple copies are needed. Must + * be called before calling + * {@link #copy(BackupInfo, BackupManager, Configuration, BackupType, String[])} + * @param subTaskPercntgInWholeTask The percentage of the copy subtask + */ + public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) { + this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask; + } + + static class SnapshotCopy extends ExportSnapshot { + private BackupInfo backupInfo; + private TableName table; + + public SnapshotCopy(BackupInfo backupInfo, TableName table) { + super(); + this.backupInfo = backupInfo; + this.table = table; + } + + public TableName getTable() { + return this.table; + } + + public BackupInfo getBackupInfo() { + return this.backupInfo; + } + } + + /** + * Update the ongoing backup with new progress. 
+ * @param backupInfo backup info + * @param newProgress progress + * @param bytesCopied bytes copied + * @throws NoNodeException exception + */ + static void updateProgress(BackupInfo backupInfo, BackupManager backupManager, + int newProgress, long bytesCopied) throws IOException { + // compose the new backup progress data, using fake number for now + String backupProgressData = newProgress + "%"; + + backupInfo.setProgress(newProgress); + backupManager.updateBackupInfo(backupInfo); + LOG.debug("Backup progress data \"" + backupProgressData + + "\" has been updated to backup system table for " + backupInfo.getBackupId()); + } + + /** + * Extends DistCp for progress updating to backup system table + * during backup. Using DistCpV2 (MAPREDUCE-2765). + * Simply extend it and override execute() method to get the + * Job reference for progress updating. + * Only the argument "src1, [src2, [...]] dst" is supported, + * no more DistCp options. + */ + class BackupDistCp extends DistCp { + + private BackupInfo backupInfo; + private BackupManager backupManager; + + public BackupDistCp(Configuration conf, DistCpOptions options, BackupInfo backupInfo, + BackupManager backupManager) throws Exception { + super(conf, options); + this.backupInfo = backupInfo; + this.backupManager = backupManager; + } + + @Override + public Job execute() throws Exception { + + // reflection preparation for private methods and fields + Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class; + Method methodCreateMetaFolderPath = classDistCp.getDeclaredMethod("createMetaFolderPath"); + Method methodCreateJob = classDistCp.getDeclaredMethod("createJob"); + Method methodCreateInputFileListing = + classDistCp.getDeclaredMethod("createInputFileListing", Job.class); + Method methodCleanup = classDistCp.getDeclaredMethod("cleanup"); + + Field fieldInputOptions = classDistCp.getDeclaredField("inputOptions"); + Field fieldMetaFolder = classDistCp.getDeclaredField("metaFolder"); + Field fieldJobFS = 
classDistCp.getDeclaredField("jobFS"); + Field fieldSubmitted = classDistCp.getDeclaredField("submitted"); + + methodCreateMetaFolderPath.setAccessible(true); + methodCreateJob.setAccessible(true); + methodCreateInputFileListing.setAccessible(true); + methodCleanup.setAccessible(true); + + fieldInputOptions.setAccessible(true); + fieldMetaFolder.setAccessible(true); + fieldJobFS.setAccessible(true); + fieldSubmitted.setAccessible(true); + + // execute() logic starts here + assert fieldInputOptions.get(this) != null; + + Job job = null; + try { + synchronized (this) { + // Don't cleanup while we are setting up. + fieldMetaFolder.set(this, methodCreateMetaFolderPath.invoke(this)); + fieldJobFS.set(this, ((Path) fieldMetaFolder.get(this)).getFileSystem(super.getConf())); + job = (Job) methodCreateJob.invoke(this); + } + methodCreateInputFileListing.invoke(this, job); + + // Get the total length of the source files + List<Path> srcs = ((DistCpOptions) fieldInputOptions.get(this)).getSourcePaths(); + + long totalSrcLgth = 0; + for (Path aSrc : srcs) { + totalSrcLgth += + BackupUtils.getFilesLength(aSrc.getFileSystem(super.getConf()), aSrc); + } + + // submit the copy job + job.submit(); + fieldSubmitted.set(this, true); + + // after submit the MR job, set its handler in backup handler for cancel process + // this.backupHandler.copyJob = job; + + // Update the copy progress to ZK every 0.5s if progress value changed + int progressReportFreq = + MapReduceBackupCopyJob.this.getConf().getInt("hbase.backup.progressreport.frequency", + 500); + float lastProgress = progressDone; + while (!job.isComplete()) { + float newProgress = + progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS); + + if (newProgress > lastProgress) { + + BigDecimal progressData = + new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP); + String newProgressStr = progressData + "%"; + LOG.info("Progress: " + newProgressStr); + updateProgress(backupInfo, 
backupManager, progressData.intValue(), bytesCopied); + LOG.debug("Backup progress data updated to backup system table: \"Progress: " + + newProgressStr + ".\""); + lastProgress = newProgress; + } + Thread.sleep(progressReportFreq); + } + // update the progress data after copy job complete + float newProgress = + progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS); + BigDecimal progressData = + new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP); + + String newProgressStr = progressData + "%"; + LOG.info("Progress: " + newProgressStr + " subTask: " + subTaskPercntgInWholeTask + + " mapProgress: " + job.mapProgress()); + + // accumulate the overall backup progress + progressDone = newProgress; + bytesCopied += totalSrcLgth; + + updateProgress(backupInfo, backupManager, progressData.intValue(), bytesCopied); + LOG.debug("Backup progress data updated to backup system table: \"Progress: " + + newProgressStr + " - " + bytesCopied + " bytes copied.\""); + } catch (Throwable t) { + LOG.error("distcp " + job == null ? "" : job.getJobID() + " encountered error", t); + throw t; + } finally { + if (!fieldSubmitted.getBoolean(this)) { + methodCleanup.invoke(this); + } + } + + String jobID = job.getJobID().toString(); + job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID); + + LOG.debug("DistCp job-id: " + jobID + " completed: " + job.isComplete() + " " + + job.isSuccessful()); + Counters ctrs = job.getCounters(); + LOG.debug(ctrs); + if (job.isComplete() && !job.isSuccessful()) { + throw new Exception("DistCp job-id: " + jobID + " failed"); + } + + return job; + } + + } + + /** + * Do backup copy based on different types. 
+ * @param context The backup info + * @param conf The hadoop configuration + * @param copyType The backup copy type + * @param options Options for customized ExportSnapshot or DistCp + * @throws Exception exception + */ + @Override + public int copy(BackupInfo context, BackupManager backupManager, Configuration conf, + BackupType copyType, String[] options) throws IOException { + int res = 0; + + try { + if (copyType == BackupType.FULL) { + SnapshotCopy snapshotCp = new SnapshotCopy(context, context.getTableBySnapshot(options[1])); + LOG.debug("Doing SNAPSHOT_COPY"); + // Make a new instance of conf to be used by the snapshot copy class. + snapshotCp.setConf(new Configuration(conf)); + res = snapshotCp.run(options); + + } else if (copyType == BackupType.INCREMENTAL) { + LOG.debug("Doing COPY_TYPE_DISTCP"); + setSubTaskPercntgInWholeTask(1f); + + BackupDistCp distcp = + new BackupDistCp(new Configuration(conf), null, context, backupManager); + // Handle a special case where the source file is a single file. + // In this case, distcp will not create the target dir. It just take the + // target as a file name and copy source file to the target (as a file name). + // We need to create the target dir before run distcp. 
+ LOG.debug("DistCp options: " + Arrays.toString(options)); + Path dest = new Path(options[options.length - 1]); + FileSystem destfs = dest.getFileSystem(conf); + if (!destfs.exists(dest)) { + destfs.mkdirs(dest); + } + res = distcp.run(options); + } + return res; + + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public void cancel(String jobId) throws IOException { + JobID id = JobID.forName(jobId); + Cluster cluster = new Cluster(this.getConf()); + try { + Job job = cluster.getJob(id); + if (job == null) { + LOG.error("No job found for " + id); + // should we throw exception + return; + } + if (job.isComplete() || job.isRetired()) { + return; + } + + job.killJob(); + LOG.debug("Killed copy job " + id); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java ---------------------------------------------------------------------- diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java new file mode 100644 index 0000000..00c5b83 --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java @@ -0,0 +1,321 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupMergeJob;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.impl.BackupManifest;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;

/**
 * MapReduce implementation of {@link BackupMergeJob}.
 * Must be initialized with configuration of a backup destination cluster.
 */
@InterfaceAudience.Private
public class MapReduceBackupMergeJob implements BackupMergeJob {
  public static final Log LOG = LogFactory.getLog(MapReduceBackupMergeJob.class);

  // MR tool that re-splits backup HFiles per table; created and configured in run().
  protected Tool player;
  protected Configuration conf;

  public MapReduceBackupMergeJob() {
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Merges the given backup images into the most recent image of the set.
   * <p>
   * Runs an HFile-splitter MR job per table, moves the merged data into the newest backup's
   * directory, deletes the older images and rewrites the merged image's manifest. The whole
   * operation runs under the backup system's exclusive lock.
   * @param backupIds ids of the backup images to merge (all under the same backup root)
   * @throws IOException if the merge fails; when the failure happens before the system table was
   *           updated the merge can simply be retried, otherwise the backup repair tool must run
   */
  @Override
  public void run(String[] backupIds) throws IOException {
    // TODO : run player on remote cluster
    // Player reads all files in arbitrary directory structure and creates
    // a Map task for each file
    player = new MapReduceHFileSplitterJob();
    String bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
    String bids = StringUtils.join(backupIds, ",");

    if (LOG.isDebugEnabled()) {
      LOG.debug("Merge backup images " + bids);
    }

    List<Pair<TableName, Path>> processedTableList = new ArrayList<>();
    boolean finishedTables = false;
    // FIX: try-with-resources guarantees conn and table are both closed even when
    // BackupSystemTable construction fails or table.close() itself throws; the previous
    // manual finally { table.close(); conn.close(); } leaked conn on those paths.
    try (Connection conn = ConnectionFactory.createConnection(getConf());
        BackupSystemTable table = new BackupSystemTable(conn)) {
      FileSystem fs = FileSystem.get(getConf());
      try {
        // Get exclusive lock on backup system
        table.startBackupExclusiveOperation();
        // Start merge operation
        table.startMergeOperation(backupIds);

        // Select most recent backup id
        String mergedBackupId = findMostRecentBackupId(backupIds);

        TableName[] tableNames = getTableNamesInBackupImages(backupIds);

        // All images share one backup root; read it off the first one.
        BackupInfo bInfo = table.readBackupInfo(backupIds[0]);
        String backupRoot = bInfo.getBackupRootDir();

        for (int i = 0; i < tableNames.length; i++) {
          LOG.info("Merge backup images for " + tableNames[i]);

          // Find input directories for table
          Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
          String dirs = StringUtils.join(dirPaths, ",");
          Path bulkOutputPath = BackupUtils.getBulkOutputDir(
              BackupUtils.getFileNameCompatibleString(tableNames[i]), getConf(), false);
          // Delete content if exists
          if (fs.exists(bulkOutputPath)) {
            if (!fs.delete(bulkOutputPath, true)) {
              LOG.warn("Can not delete: " + bulkOutputPath);
            }
          }
          // Point the splitter job at the per-table staging dir (mutates the shared conf,
          // same object the original code modified through a shadowing local).
          getConf().set(bulkOutputConfKey, bulkOutputPath.toString());
          String[] playerArgs = { dirs, tableNames[i].getNameAsString() };

          player.setConf(getConf());
          int result = player.run(playerArgs);
          if (!succeeded(result)) {
            throw new IOException("Can not merge backup images for " + dirs
                + " (check Hadoop/MR and HBase logs). Player return code =" + result);
          }
          // Add to processed table list
          processedTableList.add(new Pair<>(tableNames[i], bulkOutputPath));
          LOG.debug("Merge Job finished:" + result);
        }
        List<TableName> tableList = toTableNameList(processedTableList);
        table.updateProcessedTablesForMerge(tableList);
        finishedTables = true;

        // Move data
        for (Pair<TableName, Path> tn : processedTableList) {
          moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
        }

        // Delete old data and update manifest
        List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
        deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
        updateBackupManifest(backupRoot, mergedBackupId, backupsToDelete);
        // Finish merge session
        table.finishMergeOperation();
        // Release lock
        table.finishBackupExclusiveOperation();
      } catch (RuntimeException e) {
        throw e;
      } catch (Exception e) {
        LOG.error(e);
        if (!finishedTables) {
          // cleanup bulk directories and finish merge
          // merge MUST be repeated (no need for repair)
          cleanupBulkLoadDirs(fs, toPathList(processedTableList));
          table.finishMergeOperation();
          table.finishBackupExclusiveOperation();
          throw new IOException("Backup merge operation failed, you should try it again", e);
        } else {
          // backup repair must be run
          throw new IOException(
              "Backup merge operation failed, run backup repair tool to restore system's integrity",
              e);
        }
      }
    }
  }

  /** Extracts the staging-dir paths from a (table, path) pair list. */
  protected List<Path> toPathList(List<Pair<TableName, Path>> processedTableList) {
    ArrayList<Path> list = new ArrayList<>();
    for (Pair<TableName, Path> p : processedTableList) {
      list.add(p.getSecond());
    }
    return list;
  }

  /** Extracts the table names from a (table, path) pair list. */
  protected List<TableName> toTableNameList(List<Pair<TableName, Path>> processedTableList) {
    ArrayList<TableName> list = new ArrayList<>();
    for (Pair<TableName, Path> p : processedTableList) {
      list.add(p.getFirst());
    }
    return list;
  }

  /** Best-effort recursive delete of the bulk-load staging directories. */
  protected void cleanupBulkLoadDirs(FileSystem fs, List<Path> pathList) throws IOException {
    for (Path path : pathList) {
      if (!fs.delete(path, true)) {
        LOG.warn("Can't delete " + path);
      }
    }
  }

  /**
   * Rewrites the merged image's manifest after the older images have been removed:
   * the deleted backups are dropped from the ancestor list and the manifest is stored back.
   */
  protected void updateBackupManifest(String backupRoot, String mergedBackupId,
      List<String> backupsToDelete) throws IllegalArgumentException, IOException {

    BackupManifest manifest =
        HBackupFileSystem.getManifest(conf, new Path(backupRoot), mergedBackupId);
    manifest.getBackupImage().removeAncestors(backupsToDelete);
    // save back
    manifest.store(conf);
  }

  /**
   * Deletes the given backup images from both the backup system table and the file system.
   * A failed file-system delete is logged, not rethrown.
   */
  protected void deleteBackupImages(List<String> backupIds, Connection conn, FileSystem fs,
      String backupRoot) throws IOException {

    // Delete from backup system table
    try (BackupSystemTable table = new BackupSystemTable(conn)) {
      for (String backupId : backupIds) {
        table.deleteBackupInfo(backupId);
      }
    }

    // Delete from file system
    for (String backupId : backupIds) {
      Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, backupId);

      if (!fs.delete(backupDirPath, true)) {
        LOG.warn("Could not delete " + backupDirPath);
      }
    }
  }

  /** Returns all ids except the merge target, i.e. the images that become obsolete. */
  protected List<String> getBackupIdsToDelete(String[] backupIds, String mergedBackupId) {
    List<String> list = new ArrayList<>();
    for (String id : backupIds) {
      if (id.equals(mergedBackupId)) {
        continue;
      }
      list.add(id);
    }
    return list;
  }

  /**
   * Replaces the merged image's table data dir with the content produced by the splitter job.
   * The existing destination is deleted first; a failed delete aborts the move.
   */
  protected void moveData(FileSystem fs, String backupRoot, Path bulkOutputPath,
      TableName tableName, String mergedBackupId) throws IllegalArgumentException, IOException {

    Path dest =
        new Path(HBackupFileSystem.getTableBackupDataDir(backupRoot, mergedBackupId, tableName));

    // Delete all in dest
    if (!fs.delete(dest, true)) {
      throw new IOException("Could not delete " + dest);
    }

    FileStatus[] fsts = fs.listStatus(bulkOutputPath);
    for (FileStatus fst : fsts) {
      if (fst.isDirectory()) {
        // NOTE(review): this renames the entry's PARENT — i.e. bulkOutputPath itself —
        // on the first directory child, not the child. Verify this is intended rather
        // than fs.rename(fst.getPath(), dest).
        fs.rename(fst.getPath().getParent(), dest);
      }
    }
  }

  /**
   * Picks the id with the largest embedded timestamp (the part after the first '_')
   * and rebuilds it from the standard backup-id prefix.
   */
  protected String findMostRecentBackupId(String[] backupIds) {
    long recentTimestamp = Long.MIN_VALUE;
    for (String backupId : backupIds) {
      long ts = Long.parseLong(backupId.split("_")[1]);
      if (ts > recentTimestamp) {
        recentTimestamp = ts;
      }
    }
    return BackupRestoreConstants.BACKUPID_PREFIX + recentTimestamp;
  }

  /** Returns the de-duplicated union of table names covered by the given backup images. */
  protected TableName[] getTableNamesInBackupImages(String[] backupIds) throws IOException {

    Set<TableName> allSet = new HashSet<>();

    try (Connection conn = ConnectionFactory.createConnection(conf);
        BackupSystemTable table = new BackupSystemTable(conn)) {
      for (String backupId : backupIds) {
        BackupInfo bInfo = table.readBackupInfo(backupId);
        allSet.addAll(bInfo.getTableNames());
      }
    }

    TableName[] ret = new TableName[allSet.size()];
    return allSet.toArray(ret);
  }

  /**
   * Collects the existing per-image data dirs of the given table; images that have no data
   * for the table are skipped (traced, not an error).
   */
  protected Path[] findInputDirectories(FileSystem fs, String backupRoot, TableName tableName,
      String[] backupIds) throws IOException {

    List<Path> dirs = new ArrayList<>();

    for (String backupId : backupIds) {
      Path fileBackupDirPath =
          new Path(HBackupFileSystem.getTableBackupDataDir(backupRoot, backupId, tableName));
      if (fs.exists(fileBackupDirPath)) {
        dirs.add(fileBackupDirPath);
      } else {
        if (LOG.isTraceEnabled()) {
          LOG.trace("File: " + fileBackupDirPath + " does not exist.");
        }
      }
    }
    Path[] ret = new Path[dirs.size()];
    return dirs.toArray(ret);
  }

}