http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java ---------------------------------------------------------------------- diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java new file mode 100644 index 0000000..4dab046 --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java @@ -0,0 +1,2051 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.impl; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupInfo; +import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; +import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.BackupType; +import org.apache.hadoop.hbase.backup.util.BackupUtils; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.SnapshotDescription; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; + +/** + * This class provides API to access backup system table<br> + * + * Backup system table schema:<br> + * <p><ul> + * <li>1. Backup sessions rowkey= "session:"+backupId; value =serialized BackupInfo</li> + * <li>2. Backup start code rowkey = "startcode:"+backupRoot; value = startcode</li> + * <li>3. Incremental backup set rowkey="incrbackupset:"+backupRoot; value=[list of tables]</li> + * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name; + * value = map[RS-> last WAL timestamp]</li> + * <li>5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp</li> + * <li>6. WALs recorded rowkey="wals:"+WAL unique file name; + * value = backupId and full WAL file name</li> + * </ul></p> + */ + +@InterfaceAudience.Private +public final class BackupSystemTable implements Closeable { + private static final Log LOG = LogFactory.getLog(BackupSystemTable.class); + + static class WALItem { + String backupId; + String walFile; + String backupRoot; + + WALItem(String backupId, String walFile, String backupRoot) { + this.backupId = backupId; + this.walFile = walFile; + this.backupRoot = backupRoot; + } + + public String getBackupId() { + return backupId; + } + + public String getWalFile() { + return walFile; + } + + public String getBackupRoot() { + return backupRoot; + } + + @Override + public String toString() { + return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile; + } + + } + + private TableName tableName; + /** + * Stores backup sessions (contexts) + */ + final static byte[] SESSIONS_FAMILY = "session".getBytes(); + /** + * Stores other meta + */ + final static byte[] META_FAMILY = "meta".getBytes(); + final static byte[] BULK_LOAD_FAMILY = "bulk".getBytes(); + /** + * Connection to HBase cluster, shared among all instances + */ + private final Connection connection; + + private final static String BACKUP_INFO_PREFIX = "session:"; + private final static String START_CODE_ROW = "startcode:"; + private final static byte[] ACTIVE_SESSION_ROW = "activesession:".getBytes(); + private final static byte[] ACTIVE_SESSION_COL = "c".getBytes(); + + private final static byte[] ACTIVE_SESSION_YES = "yes".getBytes(); + private final static byte[] ACTIVE_SESSION_NO = "no".getBytes(); + + private final static String INCR_BACKUP_SET = "incrbackupset:"; + private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:"; + private final static String RS_LOG_TS_PREFIX = "rslogts:"; + + private final static String BULK_LOAD_PREFIX = "bulk:"; + private final static byte[] BULK_LOAD_PREFIX_BYTES = BULK_LOAD_PREFIX.getBytes(); + private final static byte[] DELETE_OP_ROW = "delete_op_row".getBytes(); + private final static byte[] MERGE_OP_ROW = "merge_op_row".getBytes(); + + final static byte[] TBL_COL = Bytes.toBytes("tbl"); + final static byte[] FAM_COL = Bytes.toBytes("fam"); + final static byte[] PATH_COL = Bytes.toBytes("path"); + final static byte[] STATE_COL = Bytes.toBytes("state"); + // the two states a bulk loaded file can be + final static byte[] BL_PREPARE = Bytes.toBytes("R"); + final static byte[] BL_COMMIT = Bytes.toBytes("D"); + + private final static String WALS_PREFIX = "wals:"; + private final static String SET_KEY_PREFIX = "backupset:"; + + // separator between BULK_LOAD_PREFIX and ordinals + protected final static String BLK_LD_DELIM = ":"; + private final static byte[] EMPTY_VALUE = new byte[] {}; + + // Safe delimiter in a string + private final static String NULL = "\u0000"; + + public BackupSystemTable(Connection conn) throws IOException { + this.connection = conn; + tableName = BackupSystemTable.getTableName(conn.getConfiguration()); + checkSystemTable(); + } + + private void checkSystemTable() throws IOException { + try (Admin admin = connection.getAdmin();) { + + verifyNamespaceExists(admin); + + if (!admin.tableExists(tableName)) { + HTableDescriptor backupHTD = + BackupSystemTable.getSystemTableDescriptor(connection.getConfiguration()); + admin.createTable(backupHTD); + } + waitForSystemTable(admin); + } + } + + private void verifyNamespaceExists(Admin admin) throws IOException { + String namespaceName = tableName.getNamespaceAsString(); + NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build(); + NamespaceDescriptor[] list = admin.listNamespaceDescriptors(); + boolean exists = false; + for (NamespaceDescriptor nsd : list) { + if (nsd.getName().equals(ns.getName())) { + exists = true; + break; + } + } + if (!exists) { + admin.createNamespace(ns); + } + } + + private void waitForSystemTable(Admin admin) throws IOException { + long TIMEOUT = 60000; + long startTime = EnvironmentEdgeManager.currentTime(); + while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + } + if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) { + throw new IOException("Failed to create backup system table after " + TIMEOUT + "ms"); + } + } + LOG.debug("Backup table exists and available"); + + } + + @Override + public void close() { + // do nothing + } + + /** + * Updates status (state) of a backup session in backup system table table + * @param info backup info + * @throws IOException exception + */ + public void updateBackupInfo(BackupInfo info) throws IOException { + + if (LOG.isTraceEnabled()) { + LOG.trace("update backup status in backup system table for: " + info.getBackupId() + + " set status=" + info.getState()); + } + try (Table table = connection.getTable(tableName)) { + Put put = createPutForBackupInfo(info); + table.put(put); + } + } + + /* + * @param backupId the backup Id + * @return Map of rows to path of bulk loaded hfile + */ + Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException { + Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId); + try (Table table = connection.getTable(tableName); + ResultScanner scanner = table.getScanner(scan)) { + Result res = null; + Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); + while ((res = scanner.next()) != null) { + res.advance(); + byte[] row = CellUtil.cloneRow(res.listCells().get(0)); + for (Cell cell : res.listCells()) { + if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, + BackupSystemTable.PATH_COL.length) == 0) { + map.put(row, Bytes.toString(CellUtil.cloneValue(cell))); + } + } + } + return map; + } + } + + /* + * Used during restore + * @param backupId the backup Id + * @param sTableList List of tables + * @return array of Map of family to List of Paths + */ + public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<TableName> sTableList) + throws IOException { + Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId); + Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()]; + try (Table table = connection.getTable(tableName); + ResultScanner scanner = table.getScanner(scan)) { + Result res = null; + while ((res = scanner.next()) != null) { + res.advance(); + TableName tbl = null; + byte[] fam = null; + String path = null; + for (Cell cell : res.listCells()) { + if (CellComparator.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0, + BackupSystemTable.TBL_COL.length) == 0) { + tbl = TableName.valueOf(CellUtil.cloneValue(cell)); + } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, + BackupSystemTable.FAM_COL.length) == 0) { + fam = CellUtil.cloneValue(cell); + } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, + BackupSystemTable.PATH_COL.length) == 0) { + path = Bytes.toString(CellUtil.cloneValue(cell)); + } + } + int srcIdx = IncrementalTableBackupClient.getIndex(tbl, sTableList); + if (srcIdx == -1) { + // the table is not among the query + continue; + } + if (mapForSrc[srcIdx] == null) { + mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR); + } + List<Path> files; + if (!mapForSrc[srcIdx].containsKey(fam)) { + files = new ArrayList<Path>(); + mapForSrc[srcIdx].put(fam, files); + } else { + files = mapForSrc[srcIdx].get(fam); + } + files.add(new Path(path)); + if (LOG.isDebugEnabled()) { + LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path); + } + } + ; + return mapForSrc; + } + } + + /* + * @param map Map of row keys to path of bulk loaded hfile + */ + void deleteBulkLoadedFiles(Map<byte[], String> map) throws IOException { + try (Table table = connection.getTable(tableName)) { + List<Delete> dels = new ArrayList<>(); + for (byte[] row : map.keySet()) { + dels.add(new Delete(row).addFamily(BackupSystemTable.META_FAMILY)); + } + table.delete(dels); + } + } + + /** + * Deletes backup status from backup system table table + * @param backupId backup id + * @throws IOException exception + */ + + public void deleteBackupInfo(String backupId) throws IOException { + + if (LOG.isTraceEnabled()) { + LOG.trace("delete backup status in backup system table for " + backupId); + } + try (Table table = connection.getTable(tableName)) { + Delete del = createDeleteForBackupInfo(backupId); + table.delete(del); + } + } + + /* + * For postBulkLoadHFile() hook. + * @param tabName table name + * @param region the region receiving hfile + * @param finalPaths family and associated hfiles + */ + public void writePathsPostBulkLoad(TableName tabName, byte[] region, + Map<byte[], List<Path>> finalPaths) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size() + + " entries"); + } + try (Table table = connection.getTable(tableName)) { + List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths); + table.put(puts); + LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); + } + } + + /* + * For preCommitStoreFile() hook + * @param tabName table name + * @param region the region receiving hfile + * @param family column family + * @param pairs list of paths for hfiles + */ + public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family, + final List<Pair<Path, Path>> pairs) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("write bulk load descriptor to backup " + tabName + " with " + pairs.size() + + " entries"); + } + try (Table table = connection.getTable(tableName)) { + List<Put> puts = + BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs); + table.put(puts); + LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); + } + } + + /* + * Removes rows recording bulk loaded hfiles from backup table + * @param lst list of table names + * @param rows the rows to be deleted + */ + public void removeBulkLoadedRows(List<TableName> lst, List<byte[]> rows) throws IOException { + try (Table table = connection.getTable(tableName)) { + List<Delete> lstDels = new ArrayList<>(); + for (byte[] row : rows) { + Delete del = new Delete(row); + lstDels.add(del); + LOG.debug("orig deleting the row: " + Bytes.toString(row)); + } + table.delete(lstDels); + LOG.debug("deleted " + rows.size() + " original bulkload rows for " + lst.size() + " tables"); + } + } + + /* + * Reads the rows from backup table recording bulk loaded hfiles + * @param tableList list of table names + * @return The keys of the Map are table, region and column family. Value of the map reflects + * whether the hfile was recorded by preCommitStoreFile hook (true) + */ + public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> + readBulkloadRows(List<TableName> tableList) throws IOException { + Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>(); + List<byte[]> rows = new ArrayList<>(); + for (TableName tTable : tableList) { + Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable); + Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable); + try (Table table = connection.getTable(tableName); + ResultScanner scanner = table.getScanner(scan)) { + Result res = null; + while ((res = scanner.next()) != null) { + res.advance(); + String fam = null; + String path = null; + boolean raw = false; + byte[] row = null; + String region = null; + for (Cell cell : res.listCells()) { + row = CellUtil.cloneRow(cell); + rows.add(row); + String rowStr = Bytes.toString(row); + region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr); + if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, + BackupSystemTable.FAM_COL.length) == 0) { + fam = Bytes.toString(CellUtil.cloneValue(cell)); + } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, + BackupSystemTable.PATH_COL.length) == 0) { + path = Bytes.toString(CellUtil.cloneValue(cell)); + } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0, + BackupSystemTable.STATE_COL.length) == 0) { + byte[] state = CellUtil.cloneValue(cell); + if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) { + raw = true; + } else raw = false; + } + } + if (map.get(tTable) == null) { + map.put(tTable, new HashMap<String, Map<String, List<Pair<String, Boolean>>>>()); + tblMap = map.get(tTable); + } + if (tblMap.get(region) == null) { + tblMap.put(region, new HashMap<String, List<Pair<String, Boolean>>>()); + } + Map<String, List<Pair<String, Boolean>>> famMap = tblMap.get(region); + if (famMap.get(fam) == null) { + famMap.put(fam, new ArrayList<Pair<String, Boolean>>()); + } + famMap.get(fam).add(new Pair<>(path, raw)); + LOG.debug("found orig " + path + " for " + fam + " of table " + region); + } + } + } + return new Pair<>(map, rows); + } + + /* + * @param sTableList List of tables + * @param maps array of Map of family to List of Paths + * @param backupId the backup Id + */ + public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps, + String backupId) throws IOException { + try (Table table = connection.getTable(tableName)) { + long ts = EnvironmentEdgeManager.currentTime(); + int cnt = 0; + List<Put> puts = new ArrayList<>(); + for (int idx = 0; idx < maps.length; idx++) { + Map<byte[], List<Path>> map = maps[idx]; + TableName tn = sTableList.get(idx); + if (map == null) continue; + for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) { + byte[] fam = entry.getKey(); + List<Path> paths = entry.getValue(); + for (Path p : paths) { + Put put = + BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId, ts, + cnt++); + puts.add(put); + } + } + } + if (!puts.isEmpty()) { + table.put(puts); + } + } + } + + /** + * Reads backup status object (instance of backup info) from backup system table table + * @param backupId backup id + * @return Current status of backup session or null + */ + + public BackupInfo readBackupInfo(String backupId) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("read backup status from backup system table for: " + backupId); + } + + try (Table table = connection.getTable(tableName)) { + Get get = createGetForBackupInfo(backupId); + Result res = table.get(get); + if (res.isEmpty()) { + return null; + } + return resultToBackupInfo(res); + } + } + + /** + * Read the last backup start code (timestamp) of last successful backup. Will return null if + * there is no start code stored on hbase or the value is of length 0. These two cases indicate + * there is no successful backup completed so far. + * @param backupRoot directory path to backup destination + * @return the timestamp of last successful backup + * @throws IOException exception + */ + public String readBackupStartCode(String backupRoot) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("read backup start code from backup system table"); + } + try (Table table = connection.getTable(tableName)) { + Get get = createGetForStartCode(backupRoot); + Result res = table.get(get); + if (res.isEmpty()) { + return null; + } + Cell cell = res.listCells().get(0); + byte[] val = CellUtil.cloneValue(cell); + if (val.length == 0) { + return null; + } + return new String(val); + } + } + + /** + * Write the start code (timestamp) to backup system table. If passed in null, then write 0 byte. + * @param startCode start code + * @param backupRoot root directory path to backup + * @throws IOException exception + */ + public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("write backup start code to backup system table " + startCode); + } + try (Table table = connection.getTable(tableName)) { + Put put = createPutForStartCode(startCode.toString(), backupRoot); + table.put(put); + } + } + + /** + * Exclusive operations are: + * create, delete, merge + * @throws IOException + */ + public void startBackupExclusiveOperation() throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Start new backup exclusive operation"); + } + try (Table table = connection.getTable(tableName)) { + Put put = createPutForStartBackupSession(); + // First try to put if row does not exist + if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, null, put)) { + // Row exists, try to put if value == ACTIVE_SESSION_NO + if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, + ACTIVE_SESSION_NO, put)) { + throw new IOException("There is an active backup exclusive operation"); + } + } + } + } + + private Put createPutForStartBackupSession() { + Put put = new Put(ACTIVE_SESSION_ROW); + put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES); + return put; + } + + public void finishBackupExclusiveOperation() throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Finish backup exclusive operation"); + } + try (Table table = connection.getTable(tableName)) { + Put put = createPutForStopBackupSession(); + if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, + ACTIVE_SESSION_YES, put)) { + throw new IOException("There is no active backup exclusive operation"); + } + } + } + + private Put createPutForStopBackupSession() { + Put put = new Put(ACTIVE_SESSION_ROW); + put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO); + return put; + } + + /** + * Get the Region Servers log information after the last log roll from backup system table. + * @param backupRoot root directory path to backup + * @return RS log info + * @throws IOException exception + */ + public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot) + throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("read region server last roll log result to backup system table"); + } + + Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot); + + try (Table table = connection.getTable(tableName); + ResultScanner scanner = table.getScanner(scan)) { + Result res = null; + HashMap<String, Long> rsTimestampMap = new HashMap<String, Long>(); + while ((res = scanner.next()) != null) { + res.advance(); + Cell cell = res.current(); + byte[] row = CellUtil.cloneRow(cell); + String server = getServerNameForReadRegionServerLastLogRollResult(row); + byte[] data = CellUtil.cloneValue(cell); + rsTimestampMap.put(server, Bytes.toLong(data)); + } + return rsTimestampMap; + } + } + + /** + * Writes Region Server last roll log result (timestamp) to backup system table table + * @param server Region Server name + * @param ts last log timestamp + * @param backupRoot root directory path to backup + * @throws IOException exception + */ + public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot) + throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("write region server last roll log result to backup system table"); + } + try (Table table = connection.getTable(tableName)) { + Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot); + table.put(put); + } + } + + /** + * Get all completed backup information (in desc order by time) + * @param onlyCompleted true, if only successfully completed sessions + * @return history info of BackupCompleteData + * @throws IOException exception + */ + public ArrayList<BackupInfo> getBackupHistory(boolean onlyCompleted) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("get backup history from backup system table"); + } + ArrayList<BackupInfo> list; + BackupState state = onlyCompleted ? BackupState.COMPLETE : BackupState.ANY; + list = getBackupInfos(state); + return BackupUtils.sortHistoryListDesc(list); + } + + /** + * Get all backups history + * @return list of backup info + * @throws IOException + */ + public List<BackupInfo> getBackupHistory() throws IOException { + return getBackupHistory(false); + } + + /** + * Get first n backup history records + * @param n number of records, if n== -1 - max number + * is ignored + * @return list of records + * @throws IOException + */ + public List<BackupInfo> getHistory(int n) throws IOException { + + List<BackupInfo> history = getBackupHistory(); + if (n == -1 || history.size() <= n) return history; + List<BackupInfo> list = new ArrayList<BackupInfo>(); + for (int i = 0; i < n; i++) { + list.add(history.get(i)); + } + return list; + + } + + /** + * Get backup history records filtered by list of filters. + * @param n max number of records, if n == -1 , then max number + * is ignored + * @param filters list of filters + * @return backup records + * @throws IOException + */ + public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter... filters) throws IOException { + if (filters.length == 0) return getHistory(n); + + List<BackupInfo> history = getBackupHistory(); + List<BackupInfo> result = new ArrayList<BackupInfo>(); + for (BackupInfo bi : history) { + if (n >= 0 && result.size() == n) break; + boolean passed = true; + for (int i = 0; i < filters.length; i++) { + if (!filters[i].apply(bi)) { + passed = false; + break; + } + } + if (passed) { + result.add(bi); + } + } + return result; + + } + + /* + * Retrieve TableName's for completed backup of given type + * @param type backup type + * @return List of table names + */ + public List<TableName> getTablesForBackupType(BackupType type) throws IOException { + Set<TableName> names = new HashSet<>(); + List<BackupInfo> infos = getBackupHistory(true); + for (BackupInfo info : infos) { + if (info.getType() != type) continue; + names.addAll(info.getTableNames()); + } + return new ArrayList(names); + } + + /** + * Get history for backup destination + * @param backupRoot backup destination path + * @return List of backup info + * @throws IOException + */ + public List<BackupInfo> getBackupHistory(String backupRoot) throws IOException { + ArrayList<BackupInfo> history = getBackupHistory(false); + for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) { + BackupInfo info = iterator.next(); + if (!backupRoot.equals(info.getBackupRootDir())) { + iterator.remove(); + } + } + return history; + } + + /** + * Get history for a table + * @param name table name + * @return history for a table + * @throws IOException + */ + public List<BackupInfo> getBackupHistoryForTable(TableName name) throws IOException { + List<BackupInfo> history = getBackupHistory(); + List<BackupInfo> tableHistory = new ArrayList<BackupInfo>(); + for (BackupInfo info : history) { + List<TableName> tables = info.getTableNames(); + if (tables.contains(name)) { + tableHistory.add(info); + } + } + return tableHistory; + } + + public Map<TableName, ArrayList<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set, + String backupRoot) throws IOException { + List<BackupInfo> history = getBackupHistory(backupRoot); + Map<TableName, ArrayList<BackupInfo>> tableHistoryMap = + new HashMap<TableName, ArrayList<BackupInfo>>(); + for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) { + BackupInfo info = iterator.next(); + if (!backupRoot.equals(info.getBackupRootDir())) { + continue; + } + List<TableName> tables = info.getTableNames(); + for (TableName tableName : tables) { + if (set.contains(tableName)) { + ArrayList<BackupInfo> list = tableHistoryMap.get(tableName); + if (list == null) { + list = new ArrayList<BackupInfo>(); + tableHistoryMap.put(tableName, list); + } + list.add(info); + } + } + } + return tableHistoryMap; + } + + /** + * Get all backup sessions with a given state (in descending order by time) + * @param state backup session state + * @return history info of backup info objects + * @throws IOException exception + */ + public ArrayList<BackupInfo> getBackupInfos(BackupState state) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("get backup infos from backup system table"); + } + + Scan scan = createScanForBackupHistory(); + ArrayList<BackupInfo> list = new ArrayList<BackupInfo>(); + + try (Table table = connection.getTable(tableName); + ResultScanner scanner = table.getScanner(scan)) { + Result res = null; + while ((res = scanner.next()) != null) { + res.advance(); + BackupInfo context = cellToBackupInfo(res.current()); + if (state != BackupState.ANY && context.getState() != state) { + continue; + } + list.add(context); + } + return list; + } + } + + /** + * Write the current timestamps for each regionserver to backup system table after a successful + * full or incremental backup. The saved timestamp is of the last log file that was backed up + * already. + * @param tables tables + * @param newTimestamps timestamps + * @param backupRoot root directory path to backup + * @throws IOException exception + */ + public void writeRegionServerLogTimestamp(Set<TableName> tables, + HashMap<String, Long> newTimestamps, String backupRoot) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("write RS log time stamps to backup system table for tables [" + + StringUtils.join(tables, ",") + "]"); + } + List<Put> puts = new ArrayList<Put>(); + for (TableName table : tables) { + byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray(); + Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot); + puts.add(put); + } + try (Table table = connection.getTable(tableName)) { + table.put(puts); + } + } + + /** + * Read the timestamp for each region server log after the last successful backup. Each table has + * its own set of the timestamps. The info is stored for each table as a concatenated string of + * rs->timestapmp + * @param backupRoot root directory path to backup + * @return the timestamp for each region server. key: tableName value: + * RegionServer,PreviousTimeStamp + * @throws IOException exception + */ + public HashMap<TableName, HashMap<String, Long>> readLogTimestampMap(String backupRoot) + throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("read RS log ts from backup system table for root=" + backupRoot); + } + + HashMap<TableName, HashMap<String, Long>> tableTimestampMap = + new HashMap<TableName, HashMap<String, Long>>(); + + Scan scan = createScanForReadLogTimestampMap(backupRoot); + try (Table table = connection.getTable(tableName); + ResultScanner scanner = table.getScanner(scan)) { + Result res = null; + while ((res = scanner.next()) != null) { + res.advance(); + Cell cell = res.current(); + byte[] row = CellUtil.cloneRow(cell); + String tabName = getTableNameForReadLogTimestampMap(row); + TableName tn = TableName.valueOf(tabName); + byte[] data = CellUtil.cloneValue(cell); + if (data == null) { + throw new IOException("Data of last backup data from backup system table " + + "is empty. Create a backup first."); + } + if (data != null && data.length > 0) { + HashMap<String, Long> lastBackup = + fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data)); + tableTimestampMap.put(tn, lastBackup); + } + } + return tableTimestampMap; + } + } + + private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table, + Map<String, Long> map) { + BackupProtos.TableServerTimestamp.Builder tstBuilder = + BackupProtos.TableServerTimestamp.newBuilder(); + tstBuilder.setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil + .toProtoTableName(table)); + + for (Entry<String, Long> entry : map.entrySet()) { + BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder(); + HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder(); + ServerName sn = ServerName.parseServerName(entry.getKey()); + snBuilder.setHostName(sn.getHostname()); + snBuilder.setPort(sn.getPort()); + builder.setServerName(snBuilder.build()); + builder.setTimestamp(entry.getValue()); + tstBuilder.addServerTimestamp(builder.build()); + } + + return tstBuilder.build(); + } + + private HashMap<String, Long> fromTableServerTimestampProto( + BackupProtos.TableServerTimestamp proto) { + HashMap<String, Long> map = new HashMap<String, Long>(); + List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList(); + for (BackupProtos.ServerTimestamp st : list) { + ServerName sn = + org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(st.getServerName()); + map.put(sn.getHostname() + ":" + sn.getPort(), st.getTimestamp()); + } + return map; + } + + /** + * Return the current tables covered by incremental backup. + * @param backupRoot root directory path to backup + * @return set of tableNames + * @throws IOException exception + */ + public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("get incremental backup table set from backup system table"); + } + TreeSet<TableName> set = new TreeSet<>(); + + try (Table table = connection.getTable(tableName)) { + Get get = createGetForIncrBackupTableSet(backupRoot); + Result res = table.get(get); + if (res.isEmpty()) { + return set; + } + List<Cell> cells = res.listCells(); + for (Cell cell : cells) { + // qualifier = table name - we use table names as qualifiers + set.add(TableName.valueOf(CellUtil.cloneQualifier(cell))); + } + return set; + } + } + + /** + * Add tables to global incremental backup set + * @param tables set of tables + * @param backupRoot root directory path to backup + * @throws IOException exception + */ + public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot) + throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot + + " tables [" + StringUtils.join(tables, " ") + "]"); + for (TableName table : tables) { + LOG.debug(table); + } + } + try (Table table = connection.getTable(tableName)) { + Put put = createPutForIncrBackupTableSet(tables, backupRoot); + table.put(put); + } + } + + /** + * Deletes incremental backup set for a backup destination + * @param backupRoot backup root + */ + + public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Delete incremental backup table set to backup system table. ROOT=" + backupRoot); + } + try (Table table = connection.getTable(tableName)) { + Delete delete = createDeleteForIncrBackupTableSet(backupRoot); + table.delete(delete); + } + } + + /** + * Register WAL files as eligible for deletion + * @param files files + * @param backupId backup id + * @param backupRoot root directory path to backup destination + * @throws IOException exception + */ + public void addWALFiles(List<String> files, String backupId, String backupRoot) + throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("add WAL files to backup system table: " + backupId + " " + backupRoot + " files [" + + StringUtils.join(files, ",") + "]"); + for (String f : files) { + LOG.debug("add :" + f); + } + } + try (Table table = connection.getTable(tableName)) { + List<Put> puts = createPutsForAddWALFiles(files, backupId, backupRoot); + table.put(puts); + } + } + + /** + * Register WAL files as eligible for deletion + * @param backupRoot root directory path to backup + * @throws IOException exception + */ + public Iterator<WALItem> getWALFilesIterator(String backupRoot) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("get WAL files from backup system table"); + } + final Table table = connection.getTable(tableName); + Scan scan = createScanForGetWALs(backupRoot); + final ResultScanner scanner = table.getScanner(scan); + final Iterator<Result> it = scanner.iterator(); + return new Iterator<WALItem>() { + + @Override + public boolean hasNext() { + boolean next = it.hasNext(); + if (!next) { + // close all + try { + scanner.close(); + table.close(); + } catch (IOException e) { + LOG.error("Close WAL Iterator", e); + } + } + return next; + } + + @Override + public WALItem next() { + Result next = it.next(); + List<Cell> cells = next.listCells(); + byte[] buf = cells.get(0).getValueArray(); + int len = cells.get(0).getValueLength(); + int offset = cells.get(0).getValueOffset(); + String backupId = new String(buf, offset, len); + buf = cells.get(1).getValueArray(); + len = cells.get(1).getValueLength(); + offset = cells.get(1).getValueOffset(); + String walFile = new String(buf, offset, len); + buf = cells.get(2).getValueArray(); + len = cells.get(2).getValueLength(); + offset = cells.get(2).getValueOffset(); + String backupRoot = new String(buf, offset, len); + return new WALItem(backupId, walFile, backupRoot); + } + + @Override + public void remove() { + // not implemented + throw new RuntimeException("remove is not supported"); + } + }; + + } + + /** + * Check if WAL file is eligible for deletion Future: to support all backup destinations + * @param file name of a file to check + * @return true, if deletable, false otherwise. + * @throws IOException exception + * TODO: multiple backup destination support + */ + public boolean isWALFileDeletable(String file) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Check if WAL file has been already backed up in backup system table " + file); + } + try (Table table = connection.getTable(tableName)) { + Get get = createGetForCheckWALFile(file); + Result res = table.get(get); + if (res.isEmpty()) { + return false; + } + return true; + } + } + + /** + * Checks if we have at least one backup session in backup system table This API is used by + * BackupLogCleaner + * @return true, if - at least one session exists in backup system table table + * @throws IOException exception + */ + public boolean hasBackupSessions() throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Has backup sessions from backup system table"); + } + boolean result = false; + Scan scan = createScanForBackupHistory(); + scan.setCaching(1); + try (Table table = connection.getTable(tableName); + ResultScanner scanner = table.getScanner(scan)) { + if (scanner.next() != null) { + result = true; + } + return result; + } + } + + /** + * BACKUP SETS + */ + + /** + * Get backup set list + * @return backup set list + * @throws IOException + */ + public List<String> listBackupSets() throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace(" Backup set list"); + } + List<String> list = new ArrayList<String>(); + Table table = null; + ResultScanner scanner = null; + try { + table = connection.getTable(tableName); + Scan scan = createScanForBackupSetList(); + scan.setMaxVersions(1); + scanner = table.getScanner(scan); + Result res = null; + while ((res = scanner.next()) != null) { + res.advance(); + list.add(cellKeyToBackupSetName(res.current())); + } + return list; + } finally { + if (scanner != null) { + scanner.close(); + } + if (table != null) { + table.close(); + } + } + } + + /** + * Get backup set description (list of tables) + * @param name set's name + * @return list of tables in a backup set + * @throws IOException + */ + public List<TableName> describeBackupSet(String name) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace(" Backup set describe: " + name); + } + Table table = null; + try { + table = connection.getTable(tableName); + Get get = createGetForBackupSet(name); + Result res = table.get(get); + if (res.isEmpty()) return null; + res.advance(); + String[] tables = cellValueToBackupSet(res.current()); + return toList(tables); + } finally { + if (table != null) { + table.close(); + } + } + } + + private List<TableName> toList(String[] tables) { + List<TableName> list = new ArrayList<TableName>(tables.length); + for (String name : tables) { + list.add(TableName.valueOf(name)); + } + return list; + } + + /** + * Add backup set (list of tables) + * @param name set name + * @param newTables list of tables, comma-separated + * @throws IOException + */ + public void addToBackupSet(String name, String[] newTables) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]"); + } + Table table = null; + String[] union = null; + try { + table = connection.getTable(tableName); + Get get = createGetForBackupSet(name); + Result res = table.get(get); + if (res.isEmpty()) { + union = newTables; + } else { + res.advance(); + String[] tables = cellValueToBackupSet(res.current()); + union = merge(tables, newTables); + } + Put put = createPutForBackupSet(name, union); + table.put(put); + } finally { + if (table != null) { + table.close(); + } + } + } + + private String[] merge(String[] tables, String[] newTables) { + List<String> list = new ArrayList<String>(); + // Add all from tables + for (String t : tables) { + list.add(t); + } + for (String nt : newTables) { + if (list.contains(nt)) continue; + list.add(nt); + } + String[] arr = new String[list.size()]; + list.toArray(arr); + return arr; + } + + /** + * Remove tables from backup set (list of tables) + * @param name set name + * @param toRemove list of tables + * @throws IOException + */ + public void removeFromBackupSet(String name, String[] toRemove) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace(" Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ") + + "]"); + } + Table table = null; + String[] disjoint = null; + String[] tables = null; + try { + table = connection.getTable(tableName); + Get get = createGetForBackupSet(name); + Result res = table.get(get); + if (res.isEmpty()) { + LOG.warn("Backup set '" + name + "' not found."); + return; + } else { + res.advance(); + tables = cellValueToBackupSet(res.current()); + disjoint = disjoin(tables, toRemove); + } + if (disjoint.length > 0 && disjoint.length != tables.length) { + Put put = createPutForBackupSet(name, disjoint); + table.put(put); + } else if (disjoint.length == tables.length) { + LOG.warn("Backup set '" + name + "' does not contain tables [" + + StringUtils.join(toRemove, " ") + "]"); + } else { // disjoint.length == 0 and tables.length >0 + // Delete backup set + LOG.info("Backup set '" + name + "' is empty. Deleting."); + deleteBackupSet(name); + } + } finally { + if (table != null) { + table.close(); + } + } + } + + private String[] disjoin(String[] tables, String[] toRemove) { + List<String> list = new ArrayList<String>(); + // Add all from tables + for (String t : tables) { + list.add(t); + } + for (String nt : toRemove) { + if (list.contains(nt)) { + list.remove(nt); + } + } + String[] arr = new String[list.size()]; + list.toArray(arr); + return arr; + } + + /** + * Delete backup set + * @param name set's name + * @throws IOException + */ + public void deleteBackupSet(String name) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace(" Backup set delete: " + name); + } + Table table = null; + try { + table = connection.getTable(tableName); + Delete del = createDeleteForBackupSet(name); + table.delete(del); + } finally { + if (table != null) { + table.close(); + } + } + } + + /** + * Get backup system table descriptor + * @return table's descriptor + */ + public static HTableDescriptor getSystemTableDescriptor(Configuration conf) { + + HTableDescriptor tableDesc = new HTableDescriptor(getTableName(conf)); + HColumnDescriptor colSessionsDesc = new HColumnDescriptor(SESSIONS_FAMILY); + colSessionsDesc.setMaxVersions(1); + // Time to keep backup sessions (secs) + Configuration config = HBaseConfiguration.create(); + int ttl = + config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, + BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); + colSessionsDesc.setTimeToLive(ttl); + tableDesc.addFamily(colSessionsDesc); + HColumnDescriptor colMetaDesc = new HColumnDescriptor(META_FAMILY); + tableDesc.addFamily(colMetaDesc); + return tableDesc; + } + + public static TableName getTableName(Configuration conf) { + String name = + conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, + BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT); + return TableName.valueOf(name); + } + + public static String getTableNameAsString(Configuration conf) { + return getTableName(conf).getNameAsString(); + } + + public static String getSnapshotName(Configuration conf) { + return "snapshot_" + getTableNameAsString(conf).replace(":", "_"); + } + + /** + * Creates Put operation for a given backup info object + * @param context backup info + * @return put operation + * @throws IOException exception + */ + private Put createPutForBackupInfo(BackupInfo context) throws IOException { + Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId())); + put.addColumn(BackupSystemTable.SESSIONS_FAMILY, Bytes.toBytes("context"), + context.toByteArray()); + return put; + } + + /** + * Creates Get operation for a given backup id + * @param backupId backup's ID + * @return get operation + * @throws IOException exception + */ + private Get createGetForBackupInfo(String backupId) throws IOException { + Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId)); + get.addFamily(BackupSystemTable.SESSIONS_FAMILY); + get.setMaxVersions(1); + return get; + } + + /** + * Creates Delete operation for a given backup id + * @param backupId backup's ID + * @return delete operation + * @throws IOException exception + */ + private Delete createDeleteForBackupInfo(String backupId) { + Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId)); + del.addFamily(BackupSystemTable.SESSIONS_FAMILY); + return del; + } + + /** + * Converts Result to BackupInfo + * @param res HBase result + * @return backup info instance + * @throws IOException exception + */ + private BackupInfo resultToBackupInfo(Result res) throws IOException { + res.advance(); + Cell cell = res.current(); + return cellToBackupInfo(cell); + } + + /** + * Creates Get operation to retrieve start code from backup system table + * @return get operation + * @throws IOException exception + */ + private Get createGetForStartCode(String rootPath) throws IOException { + Get get = new Get(rowkey(START_CODE_ROW, rootPath)); + get.addFamily(BackupSystemTable.META_FAMILY); + get.setMaxVersions(1); + return get; + } + + /** + * Creates Put operation to store start code to backup system table + * @return put operation + * @throws IOException exception + */ + private Put createPutForStartCode(String startCode, String rootPath) { + Put put = new Put(rowkey(START_CODE_ROW, rootPath)); + put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("startcode"), + Bytes.toBytes(startCode)); + return put; + } + + /** + * Creates Get to retrieve incremental backup table set from backup system table + * @return get operation + * @throws IOException exception + */ + private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException { + Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot)); + get.addFamily(BackupSystemTable.META_FAMILY); + get.setMaxVersions(1); + return get; + } + + /** + * Creates Put to store incremental backup table set + * @param tables tables + * @return put operation + */ + private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) { + Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot)); + for (TableName table : tables) { + put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()), + EMPTY_VALUE); + } + return put; + } + + /** + * Creates Delete for incremental backup table set + * @param backupRoot backup root + * @return delete operation + */ + private Delete createDeleteForIncrBackupTableSet(String backupRoot) { + Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot)); + delete.addFamily(BackupSystemTable.META_FAMILY); + return delete; + } + + /** + * Creates Scan operation to load backup history + * @return scan operation + */ + private Scan createScanForBackupHistory() { + Scan scan = new Scan(); + byte[] startRow = Bytes.toBytes(BACKUP_INFO_PREFIX); + byte[] stopRow = Arrays.copyOf(startRow, startRow.length); + stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); + scan.setStartRow(startRow); + scan.setStopRow(stopRow); + scan.addFamily(BackupSystemTable.SESSIONS_FAMILY); + scan.setMaxVersions(1); + return scan; + } + + /** + * Converts cell to backup info instance. + * @param current current cell + * @return backup backup info instance + * @throws IOException exception + */ + private BackupInfo cellToBackupInfo(Cell current) throws IOException { + byte[] data = CellUtil.cloneValue(current); + return BackupInfo.fromByteArray(data); + } + + /** + * Creates Put to write RS last roll log timestamp map + * @param table table + * @param smap map, containing RS:ts + * @return put operation + */ + private Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap, + String backupRoot) { + Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString())); + put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("log-roll-map"), smap); + return put; + } + + /** + * Creates Scan to load table-> { RS -> ts} map of maps + * @return scan operation + */ + private Scan createScanForReadLogTimestampMap(String backupRoot) { + Scan scan = new Scan(); + byte[] startRow = rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot); + byte[] stopRow = Arrays.copyOf(startRow, startRow.length); + stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); + scan.setStartRow(startRow); + scan.setStopRow(stopRow); + scan.addFamily(BackupSystemTable.META_FAMILY); + + return scan; + } + + /** + * Get table name from rowkey + * @param cloneRow rowkey + * @return table name + */ + private String getTableNameForReadLogTimestampMap(byte[] cloneRow) { + String s = Bytes.toString(cloneRow); + int index = s.lastIndexOf(NULL); + return s.substring(index + 1); + } + + /** + * Creates Put to store RS last log result + * @param server server name + * @param timestamp log roll result (timestamp) + * @return put operation + */ + private Put createPutForRegionServerLastLogRollResult(String server, Long timestamp, + String backupRoot) { + Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server)); + put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("rs-log-ts"), + Bytes.toBytes(timestamp)); + return put; + } + + /** + * Creates Scan operation to load last RS log roll results + * @return scan operation + */ + private Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) { + Scan scan = new Scan(); + byte[] startRow = rowkey(RS_LOG_TS_PREFIX, backupRoot); + byte[] stopRow = Arrays.copyOf(startRow, startRow.length); + stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); + scan.setStartRow(startRow); + scan.setStopRow(stopRow); + scan.addFamily(BackupSystemTable.META_FAMILY); + scan.setMaxVersions(1); + + return scan; + } + + /** + * Get server's name from rowkey + * @param row rowkey + * @return server's name + */ + private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) { + String s = Bytes.toString(row); + int index = s.lastIndexOf(NULL); + return s.substring(index + 1); + } + + /* + * Creates Put's for bulk load resulting from running LoadIncrementalHFiles + */ + static List<Put> createPutForCommittedBulkload(TableName table, byte[] region, + Map<byte[], List<Path>> finalPaths) { + List<Put> puts = new ArrayList<>(); + for (Map.Entry<byte[], List<Path>> entry : finalPaths.entrySet()) { + for (Path path : entry.getValue()) { + String file = path.toString(); + int lastSlash = file.lastIndexOf("/"); + String filename = file.substring(lastSlash + 1); + Put put = + new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, + Bytes.toString(region), BLK_LD_DELIM, filename)); + put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); + put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey()); + put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes()); + put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT); + puts.add(put); + LOG.debug("writing done bulk path " + file + " for " + table + " " + Bytes.toString(region)); + } + } + return puts; + } + + public static void snapshot(Connection conn) throws IOException { + + try (Admin admin = conn.getAdmin();) { + Configuration conf = conn.getConfiguration(); + admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf)); + } + } + + public static void restoreFromSnapshot(Connection conn) throws IOException { + + Configuration conf = conn.getConfiguration(); + LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot"); + try (Admin admin = conn.getAdmin();) { + String snapshotName = BackupSystemTable.getSnapshotName(conf); + if (snapshotExists(admin, snapshotName)) { + admin.disableTable(BackupSystemTable.getTableName(conf)); + admin.restoreSnapshot(snapshotName); + admin.enableTable(BackupSystemTable.getTableName(conf)); + LOG.debug("Done restoring backup system table"); + } else { + // Snapshot does not exists, i.e completeBackup failed after + // deleting backup system table snapshot + // In this case we log WARN and proceed + LOG.warn("Could not restore backup system table. Snapshot " + snapshotName + + " does not exists."); + } + } + } + + protected static boolean snapshotExists(Admin admin, String snapshotName) throws IOException { + + List<SnapshotDescription> list = admin.listSnapshots(); + for (SnapshotDescription desc : list) { + if (desc.getName().equals(snapshotName)) { + return true; + } + } + return false; + } + + public static boolean snapshotExists(Connection conn) throws IOException { + return snapshotExists(conn.getAdmin(), getSnapshotName(conn.getConfiguration())); + } + + public static void deleteSnapshot(Connection conn) throws IOException { + + Configuration conf = conn.getConfiguration(); + LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system"); + try (Admin admin = conn.getAdmin();) { + String snapshotName = BackupSystemTable.getSnapshotName(conf); + if (snapshotExists(admin, snapshotName)) { + admin.deleteSnapshot(snapshotName); + LOG.debug("Done deleting backup system table snapshot"); + } else { + LOG.error("Snapshot " + snapshotName + " does not exists"); + } + } + } + + /* + * Creates Put's for bulk load resulting from running LoadIncrementalHFiles + */ + static List<Put> createPutForPreparedBulkload(TableName table, byte[] region, + final byte[] family, final List<Pair<Path, Path>> pairs) { + List<Put> puts = new ArrayList<>(); + for (Pair<Path, Path> pair : pairs) { + Path path = pair.getSecond(); + String file = path.toString(); + int lastSlash = file.lastIndexOf("/"); + String filename = file.substring(lastSlash + 1); + Put put = + new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, Bytes.toString(region), + BLK_LD_DELIM, filename)); + put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); + put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family); + put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes()); + put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE); + puts.add(put); + LOG.debug("writing raw bulk path " + file + " for " + table + " " + Bytes.toString(region)); + } + return puts; + } + + public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) { + List<Delete> lstDels = new ArrayList<>(); + for (TableName table : lst) { + Delete del = new Delete(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM)); + del.addFamily(BackupSystemTable.META_FAMILY); + lstDels.add(del); + } + return lstDels; + } + + private Put createPutForDeleteOperation(String[] backupIdList) { + + byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); + Put put = new Put(DELETE_OP_ROW); + put.addColumn(META_FAMILY, FAM_COL, value); + return put; + } + + private Delete createDeleteForBackupDeleteOperation() { + + Delete delete = new Delete(DELETE_OP_ROW); + delete.addFamily(META_FAMILY); + return delete; + } + + private Get createGetForDeleteOperation() { + + Get get = new Get(DELETE_OP_ROW); + get.addFamily(META_FAMILY); + return get; + } + + public void startDeleteOperation(String[] backupIdList) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList)); + } + Put put = createPutForDeleteOperation(backupIdList); + try (Table table = connection.getTable(tableName)) { + table.put(put); + } + } + + public void finishDeleteOperation() throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Finsih delete operation for backup ids "); + } + Delete delete = createDeleteForBackupDeleteOperation(); + try (Table table = connection.getTable(tableName)) { + table.delete(delete); + } + } + + public String[] getListOfBackupIdsFromDeleteOperation() throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Get delete operation for backup ids "); + } + Get get = createGetForDeleteOperation(); + try (Table table = connection.getTable(tableName)) { + Result res = table.get(get); + if (res.isEmpty()) { + return null; + } + Cell cell = res.listCells().get(0); + byte[] val = CellUtil.cloneValue(cell); + if (val.length == 0) { + return null; + } + return new String(val).split(","); + } + } + + private Put createPutForMergeOperation(String[] backupIdList) { + + byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); + Put put = new Put(MERGE_OP_ROW); + put.addColumn(META_FAMILY, FAM_COL, value); + return put; + } + + public boolean isMergeInProgress() throws IOException { + Get get = new Get(MERGE_OP_ROW); + try (Table table = connection.getTable(tableName)) { + Result res = table.get(get); + if (res.isEmpty()) { + return false; + } + return true; + } + } + + private Put createPutForUpdateTablesForMerge(List<TableName> tables) { + + byte[] value = Bytes.toBytes(StringUtils.join(tables, ",")); + Put put = new Put(MERGE_OP_ROW); + put.addColumn(META_FAMILY, PATH_COL, value); + return put; + } + + private Delete createDeleteForBackupMergeOperation() { + + Delete delete = new Delete(MERGE_OP_ROW); + delete.addFamily(META_FAMILY); + return delete; + } + + private Get createGetForMergeOperation() { + + Get get = new Get(MERGE_OP_ROW); + get.addFamily(META_FAMILY); + return get; + } + + public void startMergeOperation(String[] backupIdList) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList)); + } + Put put = createPutForMergeOperation(backupIdList); + try (Table table = connection.getTable(tableName)) { + table.put(put); + } + } + + public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Update tables for merge : " + StringUtils.join(tables, ",")); + } + Put put = createPutForUpdateTablesForMerge(tables); + try (Table table = connection.getTable(tableName)) { + table.put(put); + } + } + + public void finishMergeOperation() throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Finsih merge operation for backup ids "); + } + Delete delete = createDeleteForBackupMergeOperation(); + try (Table table = connection.getTable(tableName)) { + table.delete(delete); + } + } + + public String[] getListOfBackupIdsFromMergeOperation() throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Get backup ids for merge operation"); + } + Get get = createGetForMergeOperation(); + try (Table table = connection.getTable(tableName)) { + Result res = table.get(get); + if (res.isEmpty()) { + return null; + } + Cell cell = res.listCells().get(0); + byte[] val = CellUtil.cloneValue(cell); + if (val.length == 0) { + return null; + } + return new String(val).split(","); + } + } + + static Scan createScanForOrigBulkLoadedFiles(TableName table) throws IOException { + Scan scan = new Scan(); + byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM); + byte[] stopRow = Arrays.copyOf(startRow, startRow.length); + stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); + scan.withStartRow(startRow); + scan.withStopRow(stopRow); + scan.addFamily(BackupSystemTable.META_FAMILY); + scan.setMaxVersions(1); + return scan; + } + + static String getTableNameFromOrigBulkLoadRow(String rowStr) { + String[] parts = rowStr.split(BLK_LD_DELIM); + return parts[1]; + } + + static String getRegionNameFromOrigBulkLoadRow(String rowStr) { + // format is bulk : namespace : table : region : file + String[] parts = rowStr.split(BLK_LD_DELIM); + int idx = 3; + if (parts.length == 4) { + // the table is in default namespace + idx = 2; + } + LOG.debug("bulk row string " + rowStr + " region " + parts[idx]); + return parts[idx]; + } + + /* + * Used to query bulk loaded hfiles which have been copied by incremental backup + * @param backupId the backup Id. It can be null when querying for all tables + * @return the Scan object + */ + static Scan createScanForBulkLoadedFiles(String backupId) throws IOException { + Scan scan = new Scan(); + byte[] startRow = + backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId + + BLK_LD_DELIM); + byte[] stopRow = Arrays.copyOf(startRow, startRow.length); + stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); + scan.setStartRow(startRow); + scan.setStopRow(stopRow); + // scan.setTimeRange(lower, Long.MAX_VALUE); + scan.addFamily(BackupSystemTable.META_FAMILY); + scan.setMaxVersions(1); + return scan; + } + + static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId, + long ts, int idx) { + Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM + ts + BLK_LD_DELIM + idx)); + put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName()); + put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam); + put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, p.getBytes()); + return put; + } + + /** + * Creates put list for list of WAL files + * @param files list of WAL file paths + * @param backupId backup id + * @return put list + * @throws IOException exception + */ + private List<Put> + createPutsForAddWALFiles(List<String> files, String backupId, String backupRoot) + throws IOException { + + List<Put> puts = new ArrayList<Put>(); + for (String file : files) { + Put put = new Put(rowkey(WALS_PREFIX, BackupUtils.getUniqueWALFileNamePart(file))); + put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("backupId"), + Bytes.toBytes(backupId)); + put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("file"), Bytes.toBytes(file)); + put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("root"), Bytes.toBytes(backupRoot)); + puts.add(put); + } + return puts; + } + + /** + * Creates Scan operation to load WALs + * @param backupRoot path to backup destination + * @return scan operation + */ + private Scan createScanForGetWALs(String backupRoot) { + // TODO: support for backupRoot + Scan scan = new Scan(); + byte[] startRow = Bytes.toBytes(WALS_PREFIX); + byte[] stopRow = Arrays.copyOf(startRow, startRow.length); + stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); + scan.setStartRow(startRow); + scan.setStopRow(stopRow); + scan.addFamily(BackupSystemTable.META_FAMILY); + return scan; + } + + /** + * Creates Get operation for a given wal file name TODO: support for backup destination + * @param file file + * @return get operation + * @throws IOException exception + */ + private Get createGetForCheckWALFile(String file) throws IOException { + Get get = new Get(rowkey(WALS_PREFIX, BackupUtils.getUniqueWALFileNamePart(file))); + // add backup root column + get.addFamily(BackupSystemTable.META_FAMILY); + return get; + } + + /** + * Creates Scan operation to load backup set list + * @return scan operation + */ + private Scan createScanForBackupSetList() { + Scan scan = new Scan(); + byte[] startRow = Bytes.toBytes(SET_KEY_PREFIX); + byte[] stopRow = Arrays.copyOf(startRow, startRow.length); + stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); + scan.setStartRow(startRow); + scan.setStopRow(stopRow); + scan.addFamily(BackupSystemTable.META_FAMILY); + return scan; + } + + /** + * Creates Get operation to load backup set content + * @return get operation + */ + private Get createGetForBackupSet(String name) { + Get get = new Get(rowkey(SET_KEY_PREFIX, name)); + get.addFamily(BackupSystemTable.META_FAMILY); + return get; + } + + /** + * Creates Delete operation to delete backup set content + * @param name backup set's name + * @return delete operation + */ + private Delete createDeleteForBackupSet(String name) { + Delete del = new Delete(rowkey(SET_KEY_PREFIX, name)); + del.addFamily(BackupSystemTable.META_FAMILY); + return del; + } + + /** + * Creates Put operation to update backup set content + * @param name backup set's name + * @param tables list of tables + * @return put operation + */ + private Put createPutForBackupSet(String name, String[] tables) { + Put put = new Put(rowkey(SET_KEY_PREFIX, name)); + byte[] value = convertToByteArray(tables); + put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("tables"), value); + return put; + } + + private byte[] convertToByteArray(String[] tables) { + return StringUtils.join(tables, ",").getBytes(); + } + + /** + * Converts cell to backup set list. + * @param current current cell + * @return backup set as array of table names + * @throws IOException + */ + private String[] cellValueToBackupSet(Cell current) throws IOException { + byte[] data = CellUtil.cloneValue(current); + if (data != null && data.length > 0) { + return Bytes.toString(data).split(","); + } else { + return new String[0]; + } + } + + /** + * Converts cell key to backup set name. + * @param current current cell + * @return backup set name + * @throws IOException + */ + private String cellKeyToBackupSetName(Cell current) throws IOException { + byte[] data = CellUtil.cloneRow(current); + return Bytes.toString(data).substring(SET_KEY_PREFIX.length()); + } + + private static byte[] rowkey(String s, String... other) { + StringBuilder sb = new StringBuilder(s); + for (String ss : other) { + sb.append(ss); + } + return sb.toString().getBytes(); + } + +}
http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java ---------------------------------------------------------------------- diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java new file mode 100644 index 0000000..e323e96 --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.backup.impl; + +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_ATTEMPTS_PAUSE_MS_KEY; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_MAX_ATTEMPTS_KEY; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_BACKUP_ATTEMPTS_PAUSE_MS; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_BACKUP_MAX_ATTEMPTS; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupCopyJob; +import org.apache.hadoop.hbase.backup.BackupInfo; +import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase; +import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; +import org.apache.hadoop.hbase.backup.BackupRequest; +import org.apache.hadoop.hbase.backup.BackupRestoreFactory; +import org.apache.hadoop.hbase.backup.BackupType; +import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; +import org.apache.hadoop.hbase.backup.util.BackupUtils; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * Full table backup implementation + * + */ +@InterfaceAudience.Private +public class FullTableBackupClient extends TableBackupClient { + private static final Log LOG = LogFactory.getLog(FullTableBackupClient.class); + + public FullTableBackupClient() { + } + + public FullTableBackupClient(final Connection conn, final String backupId, BackupRequest request) + throws IOException { + super(conn, backupId, request); + } + + /** + * Do snapshot copy. + * @param backupInfo backup info + * @throws Exception exception + */ + protected void snapshotCopy(BackupInfo backupInfo) throws Exception { + LOG.info("Snapshot copy is starting."); + + // set overall backup phase: snapshot_copy + backupInfo.setPhase(BackupPhase.SNAPSHOTCOPY); + + // call ExportSnapshot to copy files based on hbase snapshot for backup + // ExportSnapshot only support single snapshot export, need loop for multiple tables case + BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf); + + // number of snapshots matches number of tables + float numOfSnapshots = backupInfo.getSnapshotNames().size(); + + LOG.debug("There are " + (int) numOfSnapshots + " snapshots to be copied."); + + for (TableName table : backupInfo.getTables()) { + // Currently we simply set the sub copy tasks by counting the table snapshot number, we can + // calculate the real files' size for the percentage in the future. + // backupCopier.setSubTaskPercntgInWholeTask(1f / numOfSnapshots); + int res = 0; + String[] args = new String[4]; + args[0] = "-snapshot"; + args[1] = backupInfo.getSnapshotName(table); + args[2] = "-copy-to"; + args[3] = backupInfo.getTableBackupDir(table); + + LOG.debug("Copy snapshot " + args[1] + " to " + args[3]); + res = copyService.copy(backupInfo, backupManager, conf, BackupType.FULL, args); + // if one snapshot export failed, do not continue for remained snapshots + if (res != 0) { + LOG.error("Exporting Snapshot " + args[1] + " failed with return code: " + res + "."); + + throw new IOException("Failed of exporting snapshot " + args[1] + " to " + args[3] + + " with reason code " + res); + } + LOG.info("Snapshot copy " + args[1] + " finished."); + } + } + + /** + * Backup request execution + * @throws IOException + */ + @Override + public void execute() throws IOException { + try (Admin admin = conn.getAdmin();) { + + // Begin BACKUP + beginBackup(backupManager, backupInfo); + String savedStartCode = null; + boolean firstBackup = false; + // do snapshot for full table backup + + savedStartCode = backupManager.readBackupStartCode(); + firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) == 0L; + if (firstBackup) { + // This is our first backup. Let's put some marker to system table so that we can hold the logs + // while we do the backup. + backupManager.writeBackupStartCode(0L); + } + // We roll log here before we do the snapshot. It is possible there is duplicate data + // in the log that is already in the snapshot. But if we do it after the snapshot, we + // could have data loss. + // A better approach is to do the roll log on each RS in the same global procedure as + // the snapshot. + LOG.info("Execute roll log procedure for full backup ..."); + + Map<String, String> props = new HashMap<String, String>(); + props.put("backupRoot", backupInfo.getBackupRootDir()); + admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE, + LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props); + + newTimestamps = backupManager.readRegionServerLastLogRollResult(); + if (firstBackup) { + // Updates registered log files + // We record ALL old WAL files as registered, because + // this is a first full backup in the system and these + // files are not needed for next incremental backup + List<String> logFiles = BackupUtils.getWALFilesOlderThan(conf, newTimestamps); + backupManager.recordWALFiles(logFiles); + } + + // SNAPSHOT_TABLES: + backupInfo.setPhase(BackupPhase.SNAPSHOT); + for (TableName tableName : tableList) { + String snapshotName = + "snapshot_" + Long.toString(EnvironmentEdgeManager.currentTime()) + "_" + + tableName.getNamespaceAsString() + "_" + tableName.getQualifierAsString(); + + snapshotTable(admin, tableName, snapshotName); + backupInfo.setSnapshotName(tableName, snapshotName); + } + + // SNAPSHOT_COPY: + // do snapshot copy + LOG.debug("snapshot copy for " + backupId); + snapshotCopy(backupInfo); + // Updates incremental backup table set + backupManager.addIncrementalBackupTableSet(backupInfo.getTables()); + + // BACKUP_COMPLETE: + // set overall backup status: complete. Here we make sure to complete the backup. + // After this checkpoint, even if entering cancel process, will let the backup finished + backupInfo.setState(BackupState.COMPLETE); + // The table list in backupInfo is good for both full backup and incremental backup. + // For incremental backup, it contains the incremental backup table set. + backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps); + + HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap = + backupManager.readLogTimestampMap(); + + Long newStartCode = + BackupUtils.getMinValue(BackupUtils + .getRSLogTimestampMins(newTableSetTimestampMap)); + backupManager.writeBackupStartCode(newStartCode); + + // backup complete + completeBackup(conn, backupInfo, backupManager, BackupType.FULL, conf); + } catch (Exception e) { + failBackup(conn, backupInfo, backupManager, e, "Unexpected BackupException : ", + BackupType.FULL, conf); + throw new IOException(e); + } + + } + + + protected void snapshotTable(Admin admin, TableName tableName, String snapshotName) + throws IOException { + + int maxAttempts = + conf.getInt(BACKUP_MAX_ATTEMPTS_KEY, DEFAULT_BACKUP_MAX_ATTEMPTS); + int pause = + conf.getInt(BACKUP_ATTEMPTS_PAUSE_MS_KEY, DEFAULT_BACKUP_ATTEMPTS_PAUSE_MS); + int attempts = 0; + + while (attempts++ < maxAttempts) { + try { + admin.snapshot(snapshotName, tableName); + return; + } catch (IOException ee) { + LOG.warn("Snapshot attempt " + attempts + " failed for table " + tableName + + ", sleeping for " + pause + "ms", ee); + if (attempts < maxAttempts) { + try { + Thread.sleep(pause); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + } + } + throw new IOException("Failed to snapshot table "+ tableName); + } +}