http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java deleted file mode 100644 index 6330899..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java +++ /dev/null @@ -1,387 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.backup.impl; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.impl.BackupSystemTable.WALItem; -import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; -import org.apache.hadoop.hbase.backup.util.BackupUtils; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; - -/** - * After a full backup was created, the incremental backup will only store the changes made after - * the last full or incremental backup. Creating the backup copies the logfiles in .logs and - * .oldlogs since the last backup timestamp. - */ -@InterfaceAudience.Private -public class IncrementalBackupManager extends BackupManager { - public static final Log LOG = LogFactory.getLog(IncrementalBackupManager.class); - - public IncrementalBackupManager(Connection conn, Configuration conf) throws IOException { - super(conn, conf); - } - - /** - * Obtain the list of logs that need to be copied out for this incremental backup. The list is set - * in BackupInfo. - * @return The new HashMap of RS log time stamps after the log roll for this incremental backup. 
- * @throws IOException exception - */ - public HashMap<String, Long> getIncrBackupLogFileMap() - throws IOException { - List<String> logList; - HashMap<String, Long> newTimestamps; - HashMap<String, Long> previousTimestampMins; - - String savedStartCode = readBackupStartCode(); - - // key: tableName - // value: <RegionServer,PreviousTimeStamp> - HashMap<TableName, HashMap<String, Long>> previousTimestampMap = readLogTimestampMap(); - - previousTimestampMins = BackupUtils.getRSLogTimestampMins(previousTimestampMap); - - if (LOG.isDebugEnabled()) { - LOG.debug("StartCode " + savedStartCode + " for backupID " + backupInfo.getBackupId()); - } - // get all new log files from .logs and .oldlogs after last TS and before new timestamp - if (savedStartCode == null || previousTimestampMins == null - || previousTimestampMins.isEmpty()) { - throw new IOException( - "Cannot read any previous backup timestamps from backup system table. " - + "In order to create an incremental backup, at least one full backup is needed."); - } - - LOG.info("Execute roll log procedure for incremental backup ..."); - HashMap<String, String> props = new HashMap<String, String>(); - props.put("backupRoot", backupInfo.getBackupRootDir()); - - try (Admin admin = conn.getAdmin();) { - - admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE, - LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props); - - } - newTimestamps = readRegionServerLastLogRollResult(); - - logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode); - List<WALItem> logFromSystemTable = - getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps, getBackupInfo() - .getBackupRootDir()); - logList = excludeAlreadyBackedUpWALs(logList, logFromSystemTable); - backupInfo.setIncrBackupFileList(logList); - - return newTimestamps; - } - - /** - * Get list of WAL files eligible for incremental backup - * @return list of WAL files - * @throws IOException - */ - public List<String> getIncrBackupLogFileList() - throws IOException { - List<String> logList; - HashMap<String, Long> newTimestamps; - HashMap<String, Long> previousTimestampMins; - - String savedStartCode = readBackupStartCode(); - - // key: tableName - // value: <RegionServer,PreviousTimeStamp> - HashMap<TableName, HashMap<String, Long>> previousTimestampMap = readLogTimestampMap(); - - previousTimestampMins = BackupUtils.getRSLogTimestampMins(previousTimestampMap); - - if (LOG.isDebugEnabled()) { - LOG.debug("StartCode " + savedStartCode + " for backupID " + backupInfo.getBackupId()); - } - // get all new log files from .logs and .oldlogs after last TS and before new timestamp - if (savedStartCode == null || previousTimestampMins == null - || previousTimestampMins.isEmpty()) { - throw new IOException( - "Cannot read any previous backup timestamps from backup system table. 
" - + "In order to create an incremental backup, at least one full backup is needed."); - } - - newTimestamps = readRegionServerLastLogRollResult(); - - logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode); - List<WALItem> logFromSystemTable = - getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps, getBackupInfo() - .getBackupRootDir()); - - logList = excludeAlreadyBackedUpWALs(logList, logFromSystemTable); - backupInfo.setIncrBackupFileList(logList); - - return logList; - } - - - private List<String> excludeAlreadyBackedUpWALs(List<String> logList, - List<WALItem> logFromSystemTable) { - - Set<String> walFileNameSet = convertToSet(logFromSystemTable); - - List<String> list = new ArrayList<String>(); - for (int i=0; i < logList.size(); i++) { - Path p = new Path(logList.get(i)); - String name = p.getName(); - if (walFileNameSet.contains(name)) continue; - list.add(logList.get(i)); - } - return list; - } - - /** - * Create Set of WAL file names (not full path names) - * @param logFromSystemTable - * @return set of WAL file names - */ - private Set<String> convertToSet(List<WALItem> logFromSystemTable) { - - Set<String> set = new HashSet<String>(); - for (int i=0; i < logFromSystemTable.size(); i++) { - WALItem item = logFromSystemTable.get(i); - set.add(item.walFile); - } - return set; - } - - /** - * For each region server: get all log files newer than the last timestamps, but not newer than - * the newest timestamps. - * @param olderTimestamps timestamp map for each region server of the last backup. - * @param newestTimestamps timestamp map for each region server that the backup should lead to. - * @return list of log files which needs to be added to this backup - * @throws IOException - */ - private List<WALItem> getLogFilesFromBackupSystem(HashMap<String, Long> olderTimestamps, - HashMap<String, Long> newestTimestamps, String backupRoot) throws IOException { - List<WALItem> logFiles = new ArrayList<WALItem>(); - Iterator<WALItem> it = getWALFilesFromBackupSystem(); - while (it.hasNext()) { - WALItem item = it.next(); - String rootDir = item.getBackupRoot(); - if (!rootDir.equals(backupRoot)) { - continue; - } - String walFileName = item.getWalFile(); - String server = BackupUtils.parseHostNameFromLogFile(new Path(walFileName)); - if (server == null) { - continue; - } - Long tss = getTimestamp(walFileName); - Long oldTss = olderTimestamps.get(server); - Long newTss = newestTimestamps.get(server); - if (oldTss == null) { - logFiles.add(item); - continue; - } - if (newTss == null) { - newTss = Long.MAX_VALUE; - } - if (tss > oldTss && tss < newTss) { - logFiles.add(item); - } - } - return logFiles; - } - - private Long getTimestamp(String walFileName) { - int index = walFileName.lastIndexOf(BackupUtils.LOGNAME_SEPARATOR); - return Long.parseLong(walFileName.substring(index + 1)); - } - - /** - * For each region server: get all log files newer than the last timestamps but not newer than the - * newest timestamps. - * @param olderTimestamps the timestamp for each region server of the last backup. - * @param newestTimestamps the timestamp for each region server that the backup should lead to. - * @param conf the Hadoop and Hbase configuration - * @param savedStartCode the startcode (timestamp) of last successful backup. 
- * @return a list of log files to be backed up - * @throws IOException exception - */ - private List<String> getLogFilesForNewBackup(HashMap<String, Long> olderTimestamps, - HashMap<String, Long> newestTimestamps, Configuration conf, String savedStartCode) - throws IOException { - LOG.debug("In getLogFilesForNewBackup()\n" + "olderTimestamps: " + olderTimestamps - + "\n newestTimestamps: " + newestTimestamps); - Path rootdir = FSUtils.getRootDir(conf); - Path logDir = new Path(rootdir, HConstants.HREGION_LOGDIR_NAME); - Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME); - FileSystem fs = rootdir.getFileSystem(conf); - NewestLogFilter pathFilter = new NewestLogFilter(); - - List<String> resultLogFiles = new ArrayList<String>(); - List<String> newestLogs = new ArrayList<String>(); - - /* - * The old region servers and timestamps info we kept in backup system table may be out of sync - * if new region server is added or existing one lost. We'll deal with it here when processing - * the logs. If data in backup system table has more hosts, just ignore it. If the .logs - * directory includes more hosts, the additional hosts will not have old timestamps to compare - * with. We'll just use all the logs in that directory. We always write up-to-date region server - * and timestamp info to backup system table at the end of successful backup. - */ - - FileStatus[] rss; - Path p; - String host; - Long oldTimeStamp; - String currentLogFile; - long currentLogTS; - - // Get the files in .logs. - rss = fs.listStatus(logDir); - for (FileStatus rs : rss) { - p = rs.getPath(); - host = BackupUtils.parseHostNameFromLogFile(p); - if (host == null) { - continue; - } - FileStatus[] logs; - oldTimeStamp = olderTimestamps.get(host); - // It is possible that there is no old timestamp in backup system table for this host if - // this region server is newly added after our last backup. - if (oldTimeStamp == null) { - logs = fs.listStatus(p); - } else { - pathFilter.setLastBackupTS(oldTimeStamp); - logs = fs.listStatus(p, pathFilter); - } - for (FileStatus log : logs) { - LOG.debug("currentLogFile: " + log.getPath().toString()); - if (AbstractFSWALProvider.isMetaFile(log.getPath())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skip hbase:meta log file: " + log.getPath().getName()); - } - continue; - } - currentLogFile = log.getPath().toString(); - resultLogFiles.add(currentLogFile); - currentLogTS = BackupUtils.getCreationTime(log.getPath()); - // newestTimestamps is up-to-date with the current list of hosts - // so newestTimestamps.get(host) will not be null. - if (currentLogTS > newestTimestamps.get(host)) { - newestLogs.add(currentLogFile); - } - } - } - - // Include the .oldlogs files too. - FileStatus[] oldlogs = fs.listStatus(oldLogDir); - for (FileStatus oldlog : oldlogs) { - p = oldlog.getPath(); - currentLogFile = p.toString(); - if (AbstractFSWALProvider.isMetaFile(p)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skip .meta log file: " + currentLogFile); - } - continue; - } - host = BackupUtils.parseHostFromOldLog(p); - if (host == null) { - continue; - } - currentLogTS = BackupUtils.getCreationTime(p); - oldTimeStamp = olderTimestamps.get(host); - /* - * It is possible that there is no old timestamp in backup system table for this host. At the - * time of our last backup operation, this rs did not exist. The reason can be one of the two: - * 1. The rs already left/crashed. Its logs were moved to .oldlogs. 2. The rs was added after - * our last backup. 
- */ - if (oldTimeStamp == null) { - if (currentLogTS < Long.parseLong(savedStartCode)) { - // This log file is really old, its region server was before our last backup. - continue; - } else { - resultLogFiles.add(currentLogFile); - } - } else if (currentLogTS > oldTimeStamp) { - resultLogFiles.add(currentLogFile); - } - - // It is possible that a host in .oldlogs is an obsolete region server - // so newestTimestamps.get(host) here can be null. - // Even if these logs belong to an obsolete region server, we still need - // to include them to avoid loss of edits for backup. - Long newTimestamp = newestTimestamps.get(host); - if (newTimestamp != null && currentLogTS > newTimestamp) { - newestLogs.add(currentLogFile); - } - } - // remove newest log per host because they are still in use - resultLogFiles.removeAll(newestLogs); - return resultLogFiles; - } - - static class NewestLogFilter implements PathFilter { - private Long lastBackupTS = 0L; - - public NewestLogFilter() { - } - - protected void setLastBackupTS(Long ts) { - this.lastBackupTS = ts; - } - - @Override - public boolean accept(Path path) { - // skip meta table log -- ts.meta file - if (AbstractFSWALProvider.isMetaFile(path)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skip .meta log file: " + path.getName()); - } - return false; - } - long timestamp; - try { - timestamp = BackupUtils.getCreationTime(path); - return timestamp > lastBackupTS; - } catch (Exception e) { - LOG.warn("Cannot read timestamp of log file " + path); - return false; - } - } - } - -}
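The deleted IncrementalBackupManager above applies the same per-region-server time window in both getLogFilesFromBackupSystem() and getLogFilesForNewBackup(): a WAL qualifies when its creation time falls strictly between the server's last backed-up log-roll timestamp and its newest one. A minimal, self-contained sketch of that predicate follows; the class and method names are illustrative and were not part of the deleted file:

    import java.util.Map;

    public class IncrementalWindowSketch {
      /**
       * A server missing from the old map is treated as newly added, so all of
       * its WALs qualify; a server missing from the new map (for example, a
       * decommissioned one) imposes no upper bound.
       */
      static boolean shouldBackUp(String server, long walTs,
          Map<String, Long> olderTimestamps, Map<String, Long> newestTimestamps) {
        Long oldTs = olderTimestamps.get(server);
        if (oldTs == null) {
          return true; // region server added after the last backup
        }
        Long newTs = newestTimestamps.get(server);
        if (newTs == null) {
          newTs = Long.MAX_VALUE; // obsolete server: no newer roll recorded
        }
        return walTs > oldTs && walTs < newTs;
      }
    }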
http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java deleted file mode 100644 index 6d48c32..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ /dev/null @@ -1,377 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.backup.impl; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; - -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.BackupCopyJob; -import org.apache.hadoop.hbase.backup.BackupInfo; -import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase; -import org.apache.hadoop.hbase.backup.BackupRequest; -import org.apache.hadoop.hbase.backup.BackupRestoreFactory; -import org.apache.hadoop.hbase.backup.BackupType; -import org.apache.hadoop.hbase.backup.util.BackupUtils; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.mapreduce.WALPlayer; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.HFileArchiveUtil; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; -import org.apache.hadoop.util.Tool; - -/** - * Incremental backup implementation. - * See the {@link #execute() execute} method. 
- * - */ -@InterfaceAudience.Private -public class IncrementalTableBackupClient extends TableBackupClient { - private static final Log LOG = LogFactory.getLog(IncrementalTableBackupClient.class); - - protected IncrementalTableBackupClient() { - } - - public IncrementalTableBackupClient(final Connection conn, final String backupId, - BackupRequest request) throws IOException { - super(conn, backupId, request); - } - - protected List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException { - FileSystem fs = FileSystem.get(conf); - List<String> list = new ArrayList<String>(); - for (String file : incrBackupFileList) { - Path p = new Path(file); - if (fs.exists(p) || isActiveWalPath(p)) { - list.add(file); - } else { - LOG.warn("Can't find file: " + file); - } - } - return list; - } - - /** - * Check if a given path is belongs to active WAL directory - * @param p path - * @return true, if yes - */ - protected boolean isActiveWalPath(Path p) { - return !AbstractFSWALProvider.isArchivedLogFile(p); - } - - protected static int getIndex(TableName tbl, List<TableName> sTableList) { - if (sTableList == null) return 0; - for (int i = 0; i < sTableList.size(); i++) { - if (tbl.equals(sTableList.get(i))) { - return i; - } - } - return -1; - } - - /* - * Reads bulk load records from backup table, iterates through the records and forms the paths - * for bulk loaded hfiles. Copies the bulk loaded hfiles to backup destination - * @param sTableList list of tables to be backed up - * @return map of table to List of files - */ - protected Map<byte[], List<Path>>[] handleBulkLoad(List<TableName> sTableList) throws IOException { - Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()]; - Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair = - backupManager.readBulkloadRows(sTableList); - Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = pair.getFirst(); - FileSystem fs = FileSystem.get(conf); - FileSystem tgtFs; - try { - tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf); - } catch (URISyntaxException use) { - throw new IOException("Unable to get FileSystem", use); - } - Path rootdir = FSUtils.getRootDir(conf); - Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId); - for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> tblEntry : - map.entrySet()) { - TableName srcTable = tblEntry.getKey(); - int srcIdx = getIndex(srcTable, sTableList); - if (srcIdx < 0) { - LOG.warn("Couldn't find " + srcTable + " in source table List"); - continue; - } - if (mapForSrc[srcIdx] == null) { - mapForSrc[srcIdx] = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR); - } - Path tblDir = FSUtils.getTableDir(rootdir, srcTable); - Path tgtTable = new Path(new Path(tgtRoot, srcTable.getNamespaceAsString()), - srcTable.getQualifierAsString()); - for (Map.Entry<String,Map<String,List<Pair<String, Boolean>>>> regionEntry : - tblEntry.getValue().entrySet()){ - String regionName = regionEntry.getKey(); - Path regionDir = new Path(tblDir, regionName); - // map from family to List of hfiles - for (Map.Entry<String,List<Pair<String, Boolean>>> famEntry : - regionEntry.getValue().entrySet()) { - String fam = famEntry.getKey(); - Path famDir = new Path(regionDir, fam); - List<Path> files; - if (!mapForSrc[srcIdx].containsKey(fam.getBytes())) { - files = new ArrayList<Path>(); - mapForSrc[srcIdx].put(fam.getBytes(), files); - } else { - files = 
mapForSrc[srcIdx].get(fam.getBytes()); - } - Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam); - String tblName = srcTable.getQualifierAsString(); - Path tgtFam = new Path(new Path(tgtTable, regionName), fam); - if (!tgtFs.mkdirs(tgtFam)) { - throw new IOException("couldn't create " + tgtFam); - } - for (Pair<String, Boolean> fileWithState : famEntry.getValue()) { - String file = fileWithState.getFirst(); - boolean raw = fileWithState.getSecond(); - int idx = file.lastIndexOf("/"); - String filename = file; - if (idx > 0) { - filename = file.substring(idx+1); - } - Path p = new Path(famDir, filename); - Path tgt = new Path(tgtFam, filename); - Path archive = new Path(archiveDir, filename); - if (fs.exists(p)) { - if (LOG.isTraceEnabled()) { - LOG.trace("found bulk hfile " + file + " in " + famDir + " for " + tblName); - } - try { - if (LOG.isTraceEnabled()) { - LOG.trace("copying " + p + " to " + tgt); - } - FileUtil.copy(fs, p, tgtFs, tgt, false,conf); - } catch (FileNotFoundException e) { - LOG.debug("copying archive " + archive + " to " + tgt); - try { - FileUtil.copy(fs, archive, tgtFs, tgt, false, conf); - } catch (FileNotFoundException fnfe) { - if (!raw) throw fnfe; - } - } - } else { - LOG.debug("copying archive " + archive + " to " + tgt); - try { - FileUtil.copy(fs, archive, tgtFs, tgt, false, conf); - } catch (FileNotFoundException fnfe) { - if (!raw) throw fnfe; - } - } - files.add(tgt); - } - } - } - } - backupManager.writeBulkLoadedFiles(sTableList, mapForSrc); - backupManager.removeBulkLoadedRows(sTableList, pair.getSecond()); - return mapForSrc; - } - - @Override - public void execute() throws IOException { - - try { - // case PREPARE_INCREMENTAL: - beginBackup(backupManager, backupInfo); - backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL); - LOG.debug("For incremental backup, current table set is " - + backupManager.getIncrementalBackupTableSet()); - newTimestamps = - ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap(); - } catch (Exception e) { - // fail the overall backup and return - failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ", - BackupType.INCREMENTAL, conf); - return; - } - - // case INCREMENTAL_COPY: - try { - // copy out the table and region info files for each table - BackupUtils.copyTableRegionInfo(conn, backupInfo, conf); - // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT - convertWALsToHFiles(backupInfo); - incrementalCopyHFiles(backupInfo); - // Save list of WAL files copied - backupManager.recordWALFiles(backupInfo.getIncrBackupFileList()); - } catch (Exception e) { - String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId; - // fail the overall backup and return - failBackup(conn, backupInfo, backupManager, e, msg, BackupType.INCREMENTAL, conf); - return; - } - // case INCR_BACKUP_COMPLETE: - // set overall backup status: complete. Here we make sure to complete the backup. - // After this checkpoint, even if entering cancel process, will let the backup finished - try { - // Set the previousTimestampMap which is before this current log roll to the manifest. - HashMap<TableName, HashMap<String, Long>> previousTimestampMap = - backupManager.readLogTimestampMap(); - backupInfo.setIncrTimestampMap(previousTimestampMap); - - // The table list in backupInfo is good for both full backup and incremental backup. - // For incremental backup, it contains the incremental backup table set. 
- backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps); - - HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap = - backupManager.readLogTimestampMap(); - - Long newStartCode = - BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap)); - backupManager.writeBackupStartCode(newStartCode); - - handleBulkLoad(backupInfo.getTableNames()); - // backup complete - completeBackup(conn, backupInfo, backupManager, BackupType.INCREMENTAL, conf); - - } catch (IOException e) { - failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ", - BackupType.INCREMENTAL, conf); - } - } - - protected void incrementalCopyHFiles(BackupInfo backupInfo) throws Exception { - - try { - LOG.debug("Incremental copy HFiles is starting."); - // set overall backup phase: incremental_copy - backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY); - // get incremental backup file list and prepare parms for DistCp - List<String> incrBackupFileList = new ArrayList<String>(); - // Add Bulk output - incrBackupFileList.add(getBulkOutputDir().toString()); - String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]); - strArr[strArr.length - 1] = backupInfo.getBackupRootDir(); - BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf); - int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr); - if (res != 0) { - LOG.error("Copy incremental HFile files failed with return code: " + res + "."); - throw new IOException("Failed copy from " + StringUtils.join(incrBackupFileList, ',') - + " to " + backupInfo.getHLogTargetDir()); - } - LOG.debug("Incremental copy HFiles from " + StringUtils.join(incrBackupFileList, ',') - + " to " + backupInfo.getBackupRootDir() + " finished."); - } finally { - deleteBulkLoadDirectory(); - } - } - - protected void deleteBulkLoadDirectory() throws IOException { - // delete original bulk load directory on method exit - Path path = getBulkOutputDir(); - FileSystem fs = FileSystem.get(conf); - boolean result = fs.delete(path, true); - if (!result) { - LOG.warn("Could not delete " + path); - } - - } - - protected void convertWALsToHFiles(BackupInfo backupInfo) throws IOException { - // get incremental backup file list and prepare parameters for DistCp - List<String> incrBackupFileList = backupInfo.getIncrBackupFileList(); - // Get list of tables in incremental backup set - Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet(); - // filter missing files out (they have been copied by previous backups) - incrBackupFileList = filterMissingFiles(incrBackupFileList); - for (TableName table : tableSet) { - // Check if table exists - if (tableExists(table, conn)) { - walToHFiles(incrBackupFileList, table); - } else { - LOG.warn("Table " + table + " does not exists. Skipping in WAL converter"); - } - } - } - - - protected boolean tableExists(TableName table, Connection conn) throws IOException { - try (Admin admin = conn.getAdmin();) { - return admin.tableExists(table); - } - } - - protected void walToHFiles(List<String> dirPaths, TableName tableName) throws IOException { - - Tool player = new WALPlayer(); - - // Player reads all files in arbitrary directory structure and creates - // a Map task for each file. 
We use ';' as separator - // because WAL file names contains ',' - String dirs = StringUtils.join(dirPaths, ';'); - - Path bulkOutputPath = getBulkOutputDirForTable(tableName); - conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString()); - conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";"); - String[] playerArgs = { dirs, tableName.getNameAsString() }; - - try { - player.setConf(conf); - int result = player.run(playerArgs); - if(result != 0) { - throw new IOException("WAL Player failed"); - } - conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY); - } catch (IOException e) { - throw e; - } catch (Exception ee) { - throw new IOException("Can not convert from directory " + dirs - + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee); - } - } - - protected Path getBulkOutputDirForTable(TableName table) { - Path tablePath = getBulkOutputDir(); - tablePath = new Path(tablePath, table.getNamespaceAsString()); - tablePath = new Path(tablePath, table.getQualifierAsString()); - return new Path(tablePath, "data"); - } - - protected Path getBulkOutputDir() { - String backupId = backupInfo.getBackupId(); - Path path = new Path(backupInfo.getBackupRootDir()); - path = new Path(path, ".tmp"); - path = new Path(path, backupId); - return path; - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java deleted file mode 100644 index ea7a7b8..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java +++ /dev/null @@ -1,278 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.backup.impl; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; - -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.BackupType; -import org.apache.hadoop.hbase.backup.HBackupFileSystem; -import org.apache.hadoop.hbase.backup.RestoreRequest; -import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; -import org.apache.hadoop.hbase.backup.util.BackupUtils; -import org.apache.hadoop.hbase.backup.util.RestoreTool; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; -import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem; - -/** - * Restore table implementation - * - */ -@InterfaceAudience.Private -public class RestoreTablesClient { - private static final Log LOG = LogFactory.getLog(RestoreTablesClient.class); - - private Configuration conf; - private Connection conn; - private String backupId; - private TableName[] sTableArray; - private TableName[] tTableArray; - private String targetRootDir; - private boolean isOverwrite; - - public RestoreTablesClient(Connection conn, RestoreRequest request) throws IOException { - this.targetRootDir = request.getBackupRootDir(); - this.backupId = request.getBackupId(); - this.sTableArray = request.getFromTables(); - this.tTableArray = request.getToTables(); - if (tTableArray == null || tTableArray.length == 0) { - this.tTableArray = sTableArray; - } - this.isOverwrite = request.isOverwrite(); - this.conn = conn; - this.conf = conn.getConfiguration(); - - } - - /** - * Validate target tables - * @param conn connection - * @param mgr table state manager - * @param tTableArray: target tables - * @param isOverwrite overwrite existing table - * @throws IOException exception - */ - private void checkTargetTables(TableName[] tTableArray, boolean isOverwrite) throws IOException { - ArrayList<TableName> existTableList = new ArrayList<>(); - ArrayList<TableName> disabledTableList = new ArrayList<>(); - - // check if the tables already exist - try (Admin admin = conn.getAdmin();) { - for (TableName tableName : tTableArray) { - if (admin.tableExists(tableName)) { - existTableList.add(tableName); - if (admin.isTableDisabled(tableName)) { - disabledTableList.add(tableName); - } - } else { - LOG.info("HBase table " + tableName - + " does not exist. 
It will be created during restore process"); - } - } - } - - if (existTableList.size() > 0) { - if (!isOverwrite) { - LOG.error("Existing table (" + existTableList - + ") found in the restore target, please add " - + "\"-overwrite\" option in the command if you mean" - + " to restore to these existing tables"); - throw new IOException("Existing table found in target while no \"-overwrite\" " - + "option found"); - } else { - if (disabledTableList.size() > 0) { - LOG.error("Found offline table in the restore target, " - + "please enable them before restore with \"-overwrite\" option"); - LOG.info("Offline table list in restore target: " + disabledTableList); - throw new IOException( - "Found offline table in the target when restore with \"-overwrite\" option"); - } - } - } - } - - /** - * Restore operation handle each backupImage in array - * @param svc: master services - * @param images: array BackupImage - * @param sTable: table to be restored - * @param tTable: table to be restored to - * @param truncateIfExists: truncate table - * @throws IOException exception - */ - - private void restoreImages(BackupImage[] images, TableName sTable, TableName tTable, - boolean truncateIfExists) throws IOException { - - // First image MUST be image of a FULL backup - BackupImage image = images[0]; - String rootDir = image.getRootDir(); - String backupId = image.getBackupId(); - Path backupRoot = new Path(rootDir); - RestoreTool restoreTool = new RestoreTool(conf, backupRoot, backupId); - Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId); - String lastIncrBackupId = images.length == 1 ? null : images[images.length - 1].getBackupId(); - // We need hFS only for full restore (see the code) - BackupManifest manifest = HBackupFileSystem.getManifest(conf, backupRoot, backupId); - if (manifest.getType() == BackupType.FULL) { - LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full" + " backup image " - + tableBackupPath.toString()); - restoreTool.fullRestoreTable(conn, tableBackupPath, sTable, tTable, truncateIfExists, - lastIncrBackupId); - } else { // incremental Backup - throw new IOException("Unexpected backup type " + image.getType()); - } - - if (images.length == 1) { - // full backup restore done - return; - } - - List<Path> dirList = new ArrayList<Path>(); - // add full backup path - // full backup path comes first - for (int i = 1; i < images.length; i++) { - BackupImage im = images[i]; - String fileBackupDir = - HBackupFileSystem.getTableBackupDataDir(im.getRootDir(), im.getBackupId(), sTable); - dirList.add(new Path(fileBackupDir)); - } - - String dirs = StringUtils.join(dirList, ","); - LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from log dirs: " + dirs); - Path[] paths = new Path[dirList.size()]; - dirList.toArray(paths); - restoreTool.incrementalRestoreTable(conn, tableBackupPath, paths, new TableName[] { sTable }, - new TableName[] { tTable }, lastIncrBackupId); - LOG.info(sTable + " has been successfully restored to " + tTable); - } - - /** - * Restore operation. 
Stage 2: resolved Backup Image dependency - * @param backupManifestMap : tableName, Manifest - * @param sTableArray The array of tables to be restored - * @param tTableArray The array of mapping tables to restore to - * @return set of BackupImages restored - * @throws IOException exception - */ - private void restore(HashMap<TableName, BackupManifest> backupManifestMap, - TableName[] sTableArray, TableName[] tTableArray, boolean isOverwrite) throws IOException { - TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>(); - boolean truncateIfExists = isOverwrite; - Set<String> backupIdSet = new HashSet<>(); - - for (int i = 0; i < sTableArray.length; i++) { - TableName table = sTableArray[i]; - - BackupManifest manifest = backupManifestMap.get(table); - // Get the image list of this backup for restore in time order from old - // to new. - List<BackupImage> list = new ArrayList<BackupImage>(); - list.add(manifest.getBackupImage()); - TreeSet<BackupImage> set = new TreeSet<BackupImage>(list); - List<BackupImage> depList = manifest.getDependentListByTable(table); - set.addAll(depList); - BackupImage[] arr = new BackupImage[set.size()]; - set.toArray(arr); - restoreImages(arr, table, tTableArray[i], truncateIfExists); - restoreImageSet.addAll(list); - if (restoreImageSet != null && !restoreImageSet.isEmpty()) { - LOG.info("Restore includes the following image(s):"); - for (BackupImage image : restoreImageSet) { - LOG.info("Backup: " + image.getBackupId() + " " - + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), table)); - if (image.getType() == BackupType.INCREMENTAL) { - backupIdSet.add(image.getBackupId()); - LOG.debug("adding " + image.getBackupId() + " for bulk load"); - } - } - } - } - try (BackupSystemTable table = new BackupSystemTable(conn)) { - List<TableName> sTableList = Arrays.asList(sTableArray); - for (String id : backupIdSet) { - LOG.debug("restoring bulk load for " + id); - Map<byte[], List<Path>>[] mapForSrc = table.readBulkLoadedFiles(id, sTableList); - Map<LoadQueueItem, ByteBuffer> loaderResult; - conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true); - LoadIncrementalHFiles loader = BackupUtils.createLoader(conf); - for (int i = 0; i < sTableList.size(); i++) { - if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) { - loaderResult = loader.run(null, mapForSrc[i], tTableArray[i]); - LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]); - if (loaderResult.isEmpty()) { - String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " + tTableArray[i]; - LOG.error(msg); - throw new IOException(msg); - } - } - } - } - } - LOG.debug("restoreStage finished"); - } - - static long getTsFromBackupId(String backupId) { - if (backupId == null) { - return 0; - } - return Long.parseLong(backupId.substring(backupId.lastIndexOf("_") + 1)); - } - - static boolean withinRange(long a, long lower, long upper) { - if (a < lower || a > upper) { - return false; - } - return true; - } - - public void execute() throws IOException { - - // case VALIDATION: - // check the target tables - checkTargetTables(tTableArray, isOverwrite); - - // case RESTORE_IMAGES: - HashMap<TableName, BackupManifest> backupManifestMap = new HashMap<>(); - // check and load backup image manifest for the tables - Path rootPath = new Path(targetRootDir); - HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath, - backupId); - - restore(backupManifestMap, sTableArray, tTableArray, isOverwrite); - } - -} 
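In the deleted RestoreTablesClient above, restoreImages() relies on an ordering contract: the image array built from the manifest's TreeSet must start with a FULL backup, and any remaining images are INCREMENTAL backups replayed oldest to newest on top of it. A hedged sketch of that contract as a standalone check, reusing the BackupType and BackupImage types from this code base (the helper class and method names are illustrative, not part of the deleted file):

    import java.io.IOException;
    import org.apache.hadoop.hbase.backup.BackupType;
    import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;

    public class RestoreSequenceCheck {
      /** Fails fast when an image chain cannot be replayed by restoreImages(). */
      static void validate(BackupImage[] images) throws IOException {
        if (images.length == 0 || images[0].getType() != BackupType.FULL) {
          throw new IOException("Restore must start from a FULL backup image");
        }
        for (int i = 1; i < images.length; i++) {
          if (images[i].getType() != BackupType.INCREMENTAL) {
            throw new IOException("Image " + images[i].getBackupId()
                + " is not an INCREMENTAL backup");
          }
        }
      }
    }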
http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java deleted file mode 100644 index 6eec460..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java +++ /dev/null @@ -1,436 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.backup.impl; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.BackupInfo; -import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase; -import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; -import org.apache.hadoop.hbase.backup.BackupRequest; -import org.apache.hadoop.hbase.backup.BackupRestoreConstants; -import org.apache.hadoop.hbase.backup.BackupType; -import org.apache.hadoop.hbase.backup.HBackupFileSystem; -import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.FSUtils; - -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; - -/** - * Base class for backup operation. 
Concrete implementation for - * full and incremental backup are delegated to corresponding sub-classes: - * {@link FullTableBackupClient} and {@link IncrementalTableBackupClient} - * - */ -@InterfaceAudience.Private -public abstract class TableBackupClient { - - public static final String BACKUP_CLIENT_IMPL_CLASS = "backup.client.impl.class"; - - @VisibleForTesting - public static final String BACKUP_TEST_MODE_STAGE = "backup.test.mode.stage"; - - private static final Log LOG = LogFactory.getLog(TableBackupClient.class); - - protected Configuration conf; - protected Connection conn; - protected String backupId; - protected List<TableName> tableList; - protected HashMap<String, Long> newTimestamps = null; - - protected BackupManager backupManager; - protected BackupInfo backupInfo; - - public TableBackupClient() { - } - - public TableBackupClient(final Connection conn, final String backupId, BackupRequest request) - throws IOException { - init(conn, backupId, request); - } - - public void init(final Connection conn, final String backupId, BackupRequest request) - throws IOException - { - if (request.getBackupType() == BackupType.FULL) { - backupManager = new BackupManager(conn, conn.getConfiguration()); - } else { - backupManager = new IncrementalBackupManager(conn, conn.getConfiguration()); - } - this.backupId = backupId; - this.tableList = request.getTableList(); - this.conn = conn; - this.conf = conn.getConfiguration(); - backupInfo = - backupManager.createBackupInfo(backupId, request.getBackupType(), tableList, - request.getTargetRootDir(), request.getTotalTasks(), request.getBandwidth()); - if (tableList == null || tableList.isEmpty()) { - this.tableList = new ArrayList<>(backupInfo.getTables()); - } - // Start new session - backupManager.startBackupSession(); - } - - /** - * Begin the overall backup. - * @param backupInfo backup info - * @throws IOException exception - */ - protected void beginBackup(BackupManager backupManager, BackupInfo backupInfo) - throws IOException { - - BackupSystemTable.snapshot(conn); - backupManager.setBackupInfo(backupInfo); - // set the start timestamp of the overall backup - long startTs = EnvironmentEdgeManager.currentTime(); - backupInfo.setStartTs(startTs); - // set overall backup status: ongoing - backupInfo.setState(BackupState.RUNNING); - backupInfo.setPhase(BackupPhase.REQUEST); - LOG.info("Backup " + backupInfo.getBackupId() + " started at " + startTs + "."); - - backupManager.updateBackupInfo(backupInfo); - if (LOG.isDebugEnabled()) { - LOG.debug("Backup session " + backupInfo.getBackupId() + " has been started."); - } - } - - protected String getMessage(Exception e) { - String msg = e.getMessage(); - if (msg == null || msg.equals("")) { - msg = e.getClass().getName(); - } - return msg; - } - - /** - * Delete HBase snapshot for backup. 
- * @param backupInfo backup info - * @throws Exception exception - */ - protected static void deleteSnapshots(final Connection conn, BackupInfo backupInfo, Configuration conf) - throws IOException { - LOG.debug("Trying to delete snapshot for full backup."); - for (String snapshotName : backupInfo.getSnapshotNames()) { - if (snapshotName == null) { - continue; - } - LOG.debug("Trying to delete snapshot: " + snapshotName); - - try (Admin admin = conn.getAdmin();) { - admin.deleteSnapshot(snapshotName); - } - LOG.debug("Deleting the snapshot " + snapshotName + " for backup " + backupInfo.getBackupId() - + " succeeded."); - } - } - - /** - * Clean up directories with prefix "exportSnapshot-", which are generated when exporting - * snapshots. - * @throws IOException exception - */ - protected static void cleanupExportSnapshotLog(Configuration conf) throws IOException { - FileSystem fs = FSUtils.getCurrentFileSystem(conf); - Path stagingDir = - new Path(conf.get(BackupRestoreConstants.CONF_STAGING_ROOT, fs.getWorkingDirectory() - .toString())); - FileStatus[] files = FSUtils.listStatus(fs, stagingDir); - if (files == null) { - return; - } - for (FileStatus file : files) { - if (file.getPath().getName().startsWith("exportSnapshot-")) { - LOG.debug("Delete log files of exporting snapshot: " + file.getPath().getName()); - if (FSUtils.delete(fs, file.getPath(), true) == false) { - LOG.warn("Can not delete " + file.getPath()); - } - } - } - } - - /** - * Clean up the uncompleted data at target directory if the ongoing backup has already entered - * the copy phase. - */ - protected static void cleanupTargetDir(BackupInfo backupInfo, Configuration conf) { - try { - // clean up the uncompleted data at target directory if the ongoing backup has already entered - // the copy phase - LOG.debug("Trying to cleanup up target dir. Current backup phase: " - + backupInfo.getPhase()); - if (backupInfo.getPhase().equals(BackupPhase.SNAPSHOTCOPY) - || backupInfo.getPhase().equals(BackupPhase.INCREMENTAL_COPY) - || backupInfo.getPhase().equals(BackupPhase.STORE_MANIFEST)) { - FileSystem outputFs = - FileSystem.get(new Path(backupInfo.getBackupRootDir()).toUri(), conf); - - // now treat one backup as a transaction, clean up data that has been partially copied at - // table level - for (TableName table : backupInfo.getTables()) { - Path targetDirPath = - new Path(HBackupFileSystem.getTableBackupDir(backupInfo.getBackupRootDir(), - backupInfo.getBackupId(), table)); - if (outputFs.delete(targetDirPath, true)) { - LOG.debug("Cleaning up uncompleted backup data at " + targetDirPath.toString() - + " done."); - } else { - LOG.debug("No data has been copied to " + targetDirPath.toString() + "."); - } - - Path tableDir = targetDirPath.getParent(); - FileStatus[] backups = FSUtils.listStatus(outputFs, tableDir); - if (backups == null || backups.length == 0) { - outputFs.delete(tableDir, true); - LOG.debug(tableDir.toString() + " is empty, remove it."); - } - } - } - - } catch (IOException e1) { - LOG.error("Cleaning up uncompleted backup data of " + backupInfo.getBackupId() + " at " - + backupInfo.getBackupRootDir() + " failed due to " + e1.getMessage() + "."); - } - } - - /** - * Fail the overall backup. 
- * @param backupInfo backup info - * @param e exception - * @throws Exception exception - */ - protected void failBackup(Connection conn, BackupInfo backupInfo, BackupManager backupManager, - Exception e, String msg, BackupType type, Configuration conf) throws IOException { - - try { - LOG.error(msg + getMessage(e), e); - // If this is a cancel exception, then we've already cleaned. - // set the failure timestamp of the overall backup - backupInfo.setCompleteTs(EnvironmentEdgeManager.currentTime()); - // set failure message - backupInfo.setFailedMsg(e.getMessage()); - // set overall backup status: failed - backupInfo.setState(BackupState.FAILED); - // compose the backup failed data - String backupFailedData = - "BackupId=" + backupInfo.getBackupId() + ",startts=" + backupInfo.getStartTs() - + ",failedts=" + backupInfo.getCompleteTs() + ",failedphase=" + backupInfo.getPhase() - + ",failedmessage=" + backupInfo.getFailedMsg(); - LOG.error(backupFailedData); - cleanupAndRestoreBackupSystem(conn, backupInfo, conf); - // If backup session is updated to FAILED state - means we - // processed recovery already. - backupManager.updateBackupInfo(backupInfo); - backupManager.finishBackupSession(); - LOG.error("Backup " + backupInfo.getBackupId() + " failed."); - } catch (IOException ee) { - LOG.error("Please run backup repair tool manually to restore backup system integrity"); - throw ee; - } - } - - public static void cleanupAndRestoreBackupSystem (Connection conn, BackupInfo backupInfo, - Configuration conf) throws IOException - { - BackupType type = backupInfo.getType(); - // if full backup, then delete HBase snapshots if there already are snapshots taken - // and also clean up export snapshot log files if exist - if (type == BackupType.FULL) { - deleteSnapshots(conn, backupInfo, conf); - cleanupExportSnapshotLog(conf); - } - BackupSystemTable.restoreFromSnapshot(conn); - BackupSystemTable.deleteSnapshot(conn); - // clean up the uncompleted data at target directory if the ongoing backup has already entered - // the copy phase - // For incremental backup, DistCp logs will be cleaned with the targetDir. - cleanupTargetDir(backupInfo, conf); - } - - - - /** - * Add manifest for the current backup. The manifest is stored within the table backup directory. - * @param backupInfo The current backup info - * @throws IOException exception - * @throws BackupException exception - */ - protected void addManifest(BackupInfo backupInfo, BackupManager backupManager, BackupType type, - Configuration conf) throws IOException, BackupException { - // set the overall backup phase : store manifest - backupInfo.setPhase(BackupPhase.STORE_MANIFEST); - - BackupManifest manifest; - - // Since we have each table's backup in its own directory structure, - // we'll store its manifest with the table directory. - for (TableName table : backupInfo.getTables()) { - manifest = new BackupManifest(backupInfo, table); - ArrayList<BackupImage> ancestors = backupManager.getAncestors(backupInfo, table); - for (BackupImage image : ancestors) { - manifest.addDependentImage(image); - } - - if (type == BackupType.INCREMENTAL) { - // We'll store the log timestamps for this table only in its manifest. 
- HashMap<TableName, HashMap<String, Long>> tableTimestampMap = - new HashMap<TableName, HashMap<String, Long>>(); - tableTimestampMap.put(table, backupInfo.getIncrTimestampMap().get(table)); - manifest.setIncrTimestampMap(tableTimestampMap); - ArrayList<BackupImage> ancestorss = backupManager.getAncestors(backupInfo); - for (BackupImage image : ancestorss) { - manifest.addDependentImage(image); - } - } - manifest.store(conf); - } - - // For incremental backup, we store a overall manifest in - // <backup-root-dir>/WALs/<backup-id> - // This is used when created the next incremental backup - if (type == BackupType.INCREMENTAL) { - manifest = new BackupManifest(backupInfo); - // set the table region server start and end timestamps for incremental backup - manifest.setIncrTimestampMap(backupInfo.getIncrTimestampMap()); - ArrayList<BackupImage> ancestors = backupManager.getAncestors(backupInfo); - for (BackupImage image : ancestors) { - manifest.addDependentImage(image); - } - manifest.store(conf); - } - } - - /** - * Get backup request meta data dir as string. - * @param backupInfo backup info - * @return meta data dir - */ - protected String obtainBackupMetaDataStr(BackupInfo backupInfo) { - StringBuffer sb = new StringBuffer(); - sb.append("type=" + backupInfo.getType() + ",tablelist="); - for (TableName table : backupInfo.getTables()) { - sb.append(table + ";"); - } - if (sb.lastIndexOf(";") > 0) { - sb.delete(sb.lastIndexOf(";"), sb.lastIndexOf(";") + 1); - } - sb.append(",targetRootDir=" + backupInfo.getBackupRootDir()); - - return sb.toString(); - } - - /** - * Clean up directories with prefix "_distcp_logs-", which are generated when DistCp copying - * hlogs. - * @throws IOException exception - */ - protected void cleanupDistCpLog(BackupInfo backupInfo, Configuration conf) throws IOException { - Path rootPath = new Path(backupInfo.getHLogTargetDir()).getParent(); - FileSystem fs = FileSystem.get(rootPath.toUri(), conf); - FileStatus[] files = FSUtils.listStatus(fs, rootPath); - if (files == null) { - return; - } - for (FileStatus file : files) { - if (file.getPath().getName().startsWith("_distcp_logs")) { - LOG.debug("Delete log files of DistCp: " + file.getPath().getName()); - FSUtils.delete(fs, file.getPath(), true); - } - } - } - - /** - * Complete the overall backup. 
- * @param backupInfo backup info - * @throws Exception exception - */ - protected void completeBackup(final Connection conn, BackupInfo backupInfo, - BackupManager backupManager, BackupType type, Configuration conf) throws IOException { - // set the complete timestamp of the overall backup - backupInfo.setCompleteTs(EnvironmentEdgeManager.currentTime()); - // set overall backup status: complete - backupInfo.setState(BackupState.COMPLETE); - backupInfo.setProgress(100); - // add and store the manifest for the backup - addManifest(backupInfo, backupManager, type, conf); - - // compose the backup complete data - String backupCompleteData = - obtainBackupMetaDataStr(backupInfo) + ",startts=" + backupInfo.getStartTs() - + ",completets=" + backupInfo.getCompleteTs() + ",bytescopied=" - + backupInfo.getTotalBytesCopied(); - if (LOG.isDebugEnabled()) { - LOG.debug("Backup " + backupInfo.getBackupId() + " finished: " + backupCompleteData); - } - - // when full backup is done: - // - delete HBase snapshot - // - clean up directories with prefix "exportSnapshot-", which are generated when exporting - // snapshots - if (type == BackupType.FULL) { - deleteSnapshots(conn, backupInfo, conf); - cleanupExportSnapshotLog(conf); - } else if (type == BackupType.INCREMENTAL) { - cleanupDistCpLog(backupInfo, conf); - } - BackupSystemTable.deleteSnapshot(conn); - backupManager.updateBackupInfo(backupInfo); - - // Finish active session - backupManager.finishBackupSession(); - - LOG.info("Backup " + backupInfo.getBackupId() + " completed."); - } - - /** - * Backup request execution - * @throws IOException - */ - public abstract void execute() throws IOException; - - @VisibleForTesting - protected Stage getTestStage() { - return Stage.valueOf("stage_"+ conf.getInt(BACKUP_TEST_MODE_STAGE, 0)); - } - - @VisibleForTesting - protected void failStageIf(Stage stage) throws IOException { - Stage current = getTestStage(); - if (current == stage) { - throw new IOException("Failed stage " + stage+" in testing"); - } - } - - public static enum Stage { - stage_0, stage_1, stage_2, stage_3, stage_4 - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java deleted file mode 100644 index 016d1a4..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java +++ /dev/null @@ -1,344 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
deleted file mode 100644
index 016d1a4..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.backup.mapreduce;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.math.BigDecimal;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.BackupCopyJob;
-import org.apache.hadoop.hbase.backup.BackupInfo;
-import org.apache.hadoop.hbase.backup.BackupType;
-import org.apache.hadoop.hbase.backup.impl.BackupManager;
-import org.apache.hadoop.hbase.backup.util.BackupUtils;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
-import org.apache.hadoop.mapreduce.Cluster;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.tools.DistCp;
-import org.apache.hadoop.tools.DistCpConstants;
-import org.apache.hadoop.tools.DistCpOptions;
-
-/**
- * MapReduce implementation of {@link BackupCopyJob}. There are two types of copy operation: one
- * copies data from a snapshot, extending ExportSnapshot, and the other copies incremental log
- * files, extending DistCp.
- */
-@InterfaceAudience.Private
-public class MapReduceBackupCopyJob implements BackupCopyJob {
-  private static final Log LOG = LogFactory.getLog(MapReduceBackupCopyJob.class);
-
-  private Configuration conf;
-
-  // Accumulated progress within the whole backup process for the copy operation
-  private float progressDone = 0.1f;
-  private long bytesCopied = 0;
-  private static final float INIT_PROGRESS = 0.1f;
-
-  // The percentage of the current copy task within the whole task, if multiple copies are
-  // needed. The default value is 100%, which means there is only one copy task for the whole
-  // backup.
-  private float subTaskPercntgInWholeTask = 1f;
-
-  public MapReduceBackupCopyJob() {
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  /**
-   * Get the current copy task percentage within the whole task if multiple copies are needed.
-   * @return the current copy task percentage
-   */
-  public float getSubTaskPercntgInWholeTask() {
-    return subTaskPercntgInWholeTask;
-  }
-
-  /**
-   * Set the current copy task percentage within the whole task if multiple copies are needed.
-   * Must be called before calling
-   * {@link #copy(BackupInfo, BackupManager, Configuration, BackupType, String[])}
-   * @param subTaskPercntgInWholeTask The percentage of the copy subtask
-   */
-  public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) {
-    this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask;
-  }
-
-  static class SnapshotCopy extends ExportSnapshot {
-    private BackupInfo backupInfo;
-    private TableName table;
-
-    public SnapshotCopy(BackupInfo backupInfo, TableName table) {
-      super();
-      this.backupInfo = backupInfo;
-      this.table = table;
-    }
-
-    public TableName getTable() {
-      return this.table;
-    }
-
-    public BackupInfo getBackupInfo() {
-      return this.backupInfo;
-    }
-  }
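When one backup session needs several copy passes, the caller gives each pass its share of the overall progress before invoking copy(). A short sketch, given a Configuration conf (the 0.5f split is a hypothetical example):

    MapReduceBackupCopyJob copyJob = new MapReduceBackupCopyJob();
    copyJob.setConf(conf);
    // this pass should account for half of the remaining copy work
    copyJob.setSubTaskPercntgInWholeTask(0.5f);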
-
-  /**
-   * Update the ongoing backup with new progress.
-   * @param backupInfo backup info
-   * @param backupManager backup manager
-   * @param newProgress progress
-   * @param bytesCopied bytes copied
-   * @throws IOException exception
-   */
-  static void updateProgress(BackupInfo backupInfo, BackupManager backupManager,
-      int newProgress, long bytesCopied) throws IOException {
-    // compose the new backup progress data, using fake number for now
-    String backupProgressData = newProgress + "%";
-
-    backupInfo.setProgress(newProgress);
-    backupManager.updateBackupInfo(backupInfo);
-    LOG.debug("Backup progress data \"" + backupProgressData
-        + "\" has been updated to backup system table for " + backupInfo.getBackupId());
-  }
-
-  /**
-   * Extends DistCp for progress updating to the backup system table
-   * during backup, using DistCpV2 (MAPREDUCE-2765).
-   * We simply extend it and override the execute() method to get the
-   * Job reference for progress updating.
-   * Only the arguments "src1, [src2, [...]] dst" are supported;
-   * no other DistCp options.
-   */
-  class BackupDistCp extends DistCp {
-
-    private BackupInfo backupInfo;
-    private BackupManager backupManager;
-
-    public BackupDistCp(Configuration conf, DistCpOptions options, BackupInfo backupInfo,
-        BackupManager backupManager) throws Exception {
-      super(conf, options);
-      this.backupInfo = backupInfo;
-      this.backupManager = backupManager;
-    }
-
-    @Override
-    public Job execute() throws Exception {
-
-      // reflection preparation for private methods and fields
-      Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class;
-      Method methodCreateMetaFolderPath = classDistCp.getDeclaredMethod("createMetaFolderPath");
-      Method methodCreateJob = classDistCp.getDeclaredMethod("createJob");
-      Method methodCreateInputFileListing =
-          classDistCp.getDeclaredMethod("createInputFileListing", Job.class);
-      Method methodCleanup = classDistCp.getDeclaredMethod("cleanup");
-
-      Field fieldInputOptions = classDistCp.getDeclaredField("inputOptions");
-      Field fieldMetaFolder = classDistCp.getDeclaredField("metaFolder");
-      Field fieldJobFS = classDistCp.getDeclaredField("jobFS");
-      Field fieldSubmitted = classDistCp.getDeclaredField("submitted");
-
-      methodCreateMetaFolderPath.setAccessible(true);
-      methodCreateJob.setAccessible(true);
-      methodCreateInputFileListing.setAccessible(true);
-      methodCleanup.setAccessible(true);
-
-      fieldInputOptions.setAccessible(true);
-      fieldMetaFolder.setAccessible(true);
-      fieldJobFS.setAccessible(true);
-      fieldSubmitted.setAccessible(true);
-
-      // execute() logic starts here
-      assert fieldInputOptions.get(this) != null;
-
-      Job job = null;
-      try {
-        synchronized (this) {
-          // Don't cleanup while we are setting up.
-          fieldMetaFolder.set(this, methodCreateMetaFolderPath.invoke(this));
-          fieldJobFS.set(this, ((Path) fieldMetaFolder.get(this)).getFileSystem(super.getConf()));
-          job = (Job) methodCreateJob.invoke(this);
-        }
-        methodCreateInputFileListing.invoke(this, job);
-
-        // Get the total length of the source files
-        List<Path> srcs = ((DistCpOptions) fieldInputOptions.get(this)).getSourcePaths();
-
-        long totalSrcLgth = 0;
-        for (Path aSrc : srcs) {
-          totalSrcLgth +=
-              BackupUtils.getFilesLength(aSrc.getFileSystem(super.getConf()), aSrc);
-        }
-
-        // submit the copy job
-        job.submit();
-        fieldSubmitted.set(this, true);
-
-        // after submitting the MR job, set its handler in the backup handler for the cancel
-        // process
-        // this.backupHandler.copyJob = job;
-
-        // Update the copy progress to the backup system table every 0.5s (configurable) if the
-        // progress value has changed
-        int progressReportFreq =
-            MapReduceBackupCopyJob.this.getConf().getInt("hbase.backup.progressreport.frequency",
-              500);
-        float lastProgress = progressDone;
-        while (!job.isComplete()) {
-          float newProgress =
-              progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
-
-          if (newProgress > lastProgress) {
-            BigDecimal progressData =
-                new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP);
-            String newProgressStr = progressData + "%";
-            LOG.info("Progress: " + newProgressStr);
-            updateProgress(backupInfo, backupManager, progressData.intValue(), bytesCopied);
-            LOG.debug("Backup progress data updated to backup system table: \"Progress: "
-                + newProgressStr + ".\"");
-            lastProgress = newProgress;
-          }
-          Thread.sleep(progressReportFreq);
-        }
-        // update the progress data after the copy job completes
-        float newProgress =
-            progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
-        BigDecimal progressData =
-            new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP);
-
-        String newProgressStr = progressData + "%";
-        LOG.info("Progress: " + newProgressStr + " subTask: " + subTaskPercntgInWholeTask
-            + " mapProgress: " + job.mapProgress());
-
-        // accumulate the overall backup progress
-        progressDone = newProgress;
-        bytesCopied += totalSrcLgth;
-
-        updateProgress(backupInfo, backupManager, progressData.intValue(), bytesCopied);
-        LOG.debug("Backup progress data updated to backup system table: \"Progress: "
-            + newProgressStr + " - " + bytesCopied + " bytes copied.\"");
-      } catch (Throwable t) {
-        LOG.error("distcp " + (job == null ? "" : job.getJobID()) + " encountered error", t);
-        throw t;
-      } finally {
-        if (!fieldSubmitted.getBoolean(this)) {
-          methodCleanup.invoke(this);
-        }
-      }
-
-      String jobID = job.getJobID().toString();
-      job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
-
-      LOG.debug("DistCp job-id: " + jobID + " completed: " + job.isComplete() + " "
-          + job.isSuccessful());
-      Counters ctrs = job.getCounters();
-      LOG.debug(ctrs);
-      if (job.isComplete() && !job.isSuccessful()) {
-        throw new Exception("DistCp job-id: " + jobID + " failed");
-      }
-
-      return job;
-    }
-
-  }
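To make the progress arithmetic in execute() concrete: with the initial values above (progressDone = 0.1, INIT_PROGRESS = 0.1) and a single copy task (subTaskPercntgInWholeTask = 1.0), a map progress of 50% reports as:

    float newProgress = 0.1f + 0.5f * 1.0f * (1 - 0.1f); // 0.55f, logged as "Progress: 55.0%"

so the copy job's map phase is scaled into the 10%-100% band of the overall backup.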
-
-  /**
-   * Do backup copy based on different types.
-   * @param context The backup info
-   * @param backupManager The backup manager
-   * @param conf The hadoop configuration
-   * @param copyType The backup copy type
-   * @param options Options for customized ExportSnapshot or DistCp
-   * @return the exit code of the copy job (0 on success)
-   * @throws IOException exception
-   */
-  @Override
-  public int copy(BackupInfo context, BackupManager backupManager, Configuration conf,
-      BackupType copyType, String[] options) throws IOException {
-    int res = 0;
-
-    try {
-      if (copyType == BackupType.FULL) {
-        SnapshotCopy snapshotCp = new SnapshotCopy(context, context.getTableBySnapshot(options[1]));
-        LOG.debug("Doing SNAPSHOT_COPY");
-        // Make a new instance of conf to be used by the snapshot copy class.
-        snapshotCp.setConf(new Configuration(conf));
-        res = snapshotCp.run(options);
-
-      } else if (copyType == BackupType.INCREMENTAL) {
-        LOG.debug("Doing COPY_TYPE_DISTCP");
-        setSubTaskPercntgInWholeTask(1f);
-
-        BackupDistCp distcp =
-            new BackupDistCp(new Configuration(conf), null, context, backupManager);
-        // Handle a special case where the source is a single file.
-        // In this case, DistCp will not create the target dir. It just takes the
-        // target as a file name and copies the source file to the target (as a file name).
-        // We need to create the target dir before running DistCp.
-        LOG.debug("DistCp options: " + Arrays.toString(options));
-        Path dest = new Path(options[options.length - 1]);
-        FileSystem destfs = dest.getFileSystem(conf);
-        if (!destfs.exists(dest)) {
-          destfs.mkdirs(dest);
-        }
-        res = distcp.run(options);
-      }
-      return res;
-
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public void cancel(String jobId) throws IOException {
-    JobID id = JobID.forName(jobId);
-    Cluster cluster = new Cluster(this.getConf());
-    try {
-      Job job = cluster.getJob(id);
-      if (job == null) {
-        LOG.error("No job found for " + id);
-        // TODO: should we throw an exception?
-        return;
-      }
-      if (job.isComplete() || job.isRetired()) {
-        return;
-      }
-
-      job.killJob();
-      LOG.debug("Killed copy job " + id);
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    }
-  }
-
-}
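A sketch of how a caller might drive the incremental (DistCp) path; the paths are hypothetical, and per the contract above the last element of options is the DistCp destination:

    String[] options = { "hdfs://cluster/hbase/oldWALs/wal.1",
        "hdfs://cluster/backup/backup_1/WALs" };
    int rc = copyJob.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, options);
    // rc == 0 indicates the copy job succeeded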
http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
deleted file mode 100644
index 00c5b83..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.backup.mapreduce;
-
-import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.BackupInfo;
-import org.apache.hadoop.hbase.backup.BackupMergeJob;
-import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
-import org.apache.hadoop.hbase.backup.HBackupFileSystem;
-import org.apache.hadoop.hbase.backup.impl.BackupManifest;
-import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
-import org.apache.hadoop.hbase.backup.util.BackupUtils;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.util.Tool;
-
-/**
- * MapReduce implementation of {@link BackupMergeJob}.
- * Must be initialized with the configuration of a backup destination cluster.
- */
-@InterfaceAudience.Private
-public class MapReduceBackupMergeJob implements BackupMergeJob {
-  public static final Log LOG = LogFactory.getLog(MapReduceBackupMergeJob.class);
-
-  protected Tool player;
-  protected Configuration conf;
-
-  public MapReduceBackupMergeJob() {
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public void run(String[] backupIds) throws IOException {
-    String bulkOutputConfKey;
-
-    // TODO: run player on remote cluster
-    player = new MapReduceHFileSplitterJob();
-    bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
-    // Player reads all files in an arbitrary directory structure and creates
-    // a Map task for each file
-    String bids = StringUtils.join(backupIds, ",");
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Merge backup images " + bids);
-    }
-
-    List<Pair<TableName, Path>> processedTableList = new ArrayList<Pair<TableName, Path>>();
-    boolean finishedTables = false;
-    Connection conn = ConnectionFactory.createConnection(getConf());
-    BackupSystemTable table = new BackupSystemTable(conn);
-    FileSystem fs = FileSystem.get(getConf());
-
-    try {
-      // Get exclusive lock on the backup system
-      table.startBackupExclusiveOperation();
-      // Start merge operation
-      table.startMergeOperation(backupIds);
-
-      // Select the most recent backup id
-      String mergedBackupId = findMostRecentBackupId(backupIds);
-
-      TableName[] tableNames = getTableNamesInBackupImages(backupIds);
-
-      BackupInfo bInfo = table.readBackupInfo(backupIds[0]);
-      String backupRoot = bInfo.getBackupRootDir();
-
-      for (int i = 0; i < tableNames.length; i++) {
-        LOG.info("Merge backup images for " + tableNames[i]);
-
-        // Find input directories for the table
-        Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
-        String dirs = StringUtils.join(dirPaths, ",");
-        Path bulkOutputPath =
-            BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(tableNames[i]),
-              getConf(), false);
-        // Delete content if it exists
-        if (fs.exists(bulkOutputPath)) {
-          if (!fs.delete(bulkOutputPath, true)) {
-            LOG.warn("Cannot delete: " + bulkOutputPath);
-          }
-        }
-        Configuration conf = getConf();
-        conf.set(bulkOutputConfKey, bulkOutputPath.toString());
-        String[] playerArgs = { dirs, tableNames[i].getNameAsString() };
-
-        player.setConf(getConf());
-        int result = player.run(playerArgs);
-        if (!succeeded(result)) {
-          throw new IOException("Cannot merge backup images for " + dirs
-              + " (check Hadoop/MR and HBase logs). Player return code = " + result);
-        }
-        // Add to the processed table list
-        processedTableList.add(new Pair<TableName, Path>(tableNames[i], bulkOutputPath));
-        LOG.debug("Merge Job finished: " + result);
-      }
-      List<TableName> tableList = toTableNameList(processedTableList);
-      table.updateProcessedTablesForMerge(tableList);
-      finishedTables = true;
-
-      // Move data
-      for (Pair<TableName, Path> tn : processedTableList) {
-        moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
-      }
-
-      // Delete old data and update the manifest
-      List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
-      deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
-      updateBackupManifest(backupRoot, mergedBackupId, backupsToDelete);
-      // Finish the merge session
-      table.finishMergeOperation();
-      // Release the lock
-      table.finishBackupExclusiveOperation();
-    } catch (RuntimeException e) {
-      throw e;
-    } catch (Exception e) {
-      LOG.error(e);
-      if (!finishedTables) {
-        // cleanup bulk directories and finish the merge;
-        // the merge MUST be repeated (no need for repair)
-        cleanupBulkLoadDirs(fs, toPathList(processedTableList));
-        table.finishMergeOperation();
-        table.finishBackupExclusiveOperation();
-        throw new IOException("Backup merge operation failed, you should try it again", e);
-      } else {
-        // backup repair must be run
-        throw new IOException(
-            "Backup merge operation failed, run backup repair tool to restore system's integrity",
-            e);
-      }
-    } finally {
-      table.close();
-      conn.close();
-    }
-  }
-
-  protected List<Path> toPathList(List<Pair<TableName, Path>> processedTableList) {
-    ArrayList<Path> list = new ArrayList<Path>();
-    for (Pair<TableName, Path> p : processedTableList) {
-      list.add(p.getSecond());
-    }
-    return list;
-  }
-
-  protected List<TableName> toTableNameList(List<Pair<TableName, Path>> processedTableList) {
-    ArrayList<TableName> list = new ArrayList<TableName>();
-    for (Pair<TableName, Path> p : processedTableList) {
-      list.add(p.getFirst());
-    }
-    return list;
-  }
-
-  protected void cleanupBulkLoadDirs(FileSystem fs, List<Path> pathList) throws IOException {
-    for (Path path : pathList) {
-      if (!fs.delete(path, true)) {
-        LOG.warn("Cannot delete " + path);
-      }
-    }
-  }
-
-  protected void updateBackupManifest(String backupRoot, String mergedBackupId,
-      List<String> backupsToDelete) throws IllegalArgumentException, IOException {
-
-    BackupManifest manifest =
-        HBackupFileSystem.getManifest(conf, new Path(backupRoot), mergedBackupId);
-    manifest.getBackupImage().removeAncestors(backupsToDelete);
-    // save back
-    manifest.store(conf);
-  }
-
-  protected void deleteBackupImages(List<String> backupIds, Connection conn, FileSystem fs,
-      String backupRoot) throws IOException {
-
-    // Delete from the backup system table
-    try (BackupSystemTable table = new BackupSystemTable(conn)) {
-      for (String backupId : backupIds) {
-        table.deleteBackupInfo(backupId);
-      }
-    }
-
-    // Delete from the file system
-    for (String backupId : backupIds) {
-      Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, backupId);
-
-      if (!fs.delete(backupDirPath, true)) {
-        LOG.warn("Could not delete " + backupDirPath);
-      }
-    }
-  }
-
-  protected List<String> getBackupIdsToDelete(String[] backupIds, String mergedBackupId) {
-    List<String> list = new ArrayList<String>();
-    for (String id : backupIds) {
-      if (id.equals(mergedBackupId)) {
-        continue;
-      }
-      list.add(id);
-    }
-    return list;
-  }
-
-  protected void moveData(FileSystem fs, String backupRoot, Path bulkOutputPath,
-      TableName tableName, String mergedBackupId) throws IllegalArgumentException, IOException {
-
-    Path dest =
-        new Path(HBackupFileSystem.getTableBackupDataDir(backupRoot, mergedBackupId, tableName));
-
-    // Delete the destination if it exists (fs.delete returns false for a missing path)
-    if (fs.exists(dest) && !fs.delete(dest, true)) {
-      throw new IOException("Could not delete " + dest);
-    }
-
-    FileStatus[] fsts = fs.listStatus(bulkOutputPath);
-    for (FileStatus fst : fsts) {
-      if (fst.isDirectory()) {
-        fs.rename(fst.getPath().getParent(), dest);
-      }
-    }
-  }
-
-  protected String findMostRecentBackupId(String[] backupIds) {
-    long recentTimestamp = Long.MIN_VALUE;
-    for (String backupId : backupIds) {
-      // backup ids have the form backup_<timestamp>
-      long ts = Long.parseLong(backupId.split("_")[1]);
-      if (ts > recentTimestamp) {
-        recentTimestamp = ts;
-      }
-    }
-    return BackupRestoreConstants.BACKUPID_PREFIX + recentTimestamp;
-  }
-
-  protected TableName[] getTableNamesInBackupImages(String[] backupIds) throws IOException {
-
-    Set<TableName> allSet = new HashSet<TableName>();
-
-    try (Connection conn = ConnectionFactory.createConnection(conf);
-        BackupSystemTable table = new BackupSystemTable(conn)) {
-      for (String backupId : backupIds) {
-        BackupInfo bInfo = table.readBackupInfo(backupId);
-        allSet.addAll(bInfo.getTableNames());
-      }
-    }
-
-    TableName[] ret = new TableName[allSet.size()];
-    return allSet.toArray(ret);
-  }
-
-  protected Path[] findInputDirectories(FileSystem fs, String backupRoot, TableName tableName,
-      String[] backupIds) throws IOException {
-
-    List<Path> dirs = new ArrayList<Path>();
-
-    for (String backupId : backupIds) {
-      Path fileBackupDirPath =
-          new Path(HBackupFileSystem.getTableBackupDataDir(backupRoot, backupId, tableName));
-      if (fs.exists(fileBackupDirPath)) {
-        dirs.add(fileBackupDirPath);
-      } else {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("File: " + fileBackupDirPath + " does not exist.");
-        }
-      }
-    }
-    Path[] ret = new Path[dirs.size()];
-    return dirs.toArray(ret);
-  }
-}
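Because backup ids embed their creation timestamp, findMostRecentBackupId reduces to a parse-and-compare over the id strings; for example (hypothetical ids):

    String merged = findMostRecentBackupId(
        new String[] { "backup_1500000000000", "backup_1500000005000" });
    // merged == "backup_1500000005000"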