http://git-wip-us.apache.org/repos/asf/hbase/blob/37c65946/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java deleted file mode 100644 index 4dab046..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java +++ /dev/null @@ -1,2051 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.backup.impl; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; - -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.NamespaceDescriptor; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.BackupInfo; -import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; -import org.apache.hadoop.hbase.backup.BackupRestoreConstants; -import org.apache.hadoop.hbase.backup.BackupType; -import org.apache.hadoop.hbase.backup.util.BackupUtils; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.SnapshotDescription; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Pair; - -/** - * This class provides API to access backup system table<br> - * - * Backup system table schema:<br> - * <p><ul> - * <li>1. Backup sessions rowkey= "session:"+backupId; value =serialized BackupInfo</li> - * <li>2. Backup start code rowkey = "startcode:"+backupRoot; value = startcode</li> - * <li>3. Incremental backup set rowkey="incrbackupset:"+backupRoot; value=[list of tables]</li> - * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name; - * value = map[RS-> last WAL timestamp]</li> - * <li>5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp</li> - * <li>6. WALs recorded rowkey="wals:"+WAL unique file name; - * value = backupId and full WAL file name</li> - * </ul></p> - */ - -@InterfaceAudience.Private -public final class BackupSystemTable implements Closeable { - private static final Log LOG = LogFactory.getLog(BackupSystemTable.class); - - static class WALItem { - String backupId; - String walFile; - String backupRoot; - - WALItem(String backupId, String walFile, String backupRoot) { - this.backupId = backupId; - this.walFile = walFile; - this.backupRoot = backupRoot; - } - - public String getBackupId() { - return backupId; - } - - public String getWalFile() { - return walFile; - } - - public String getBackupRoot() { - return backupRoot; - } - - @Override - public String toString() { - return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile; - } - - } - - private TableName tableName; - /** - * Stores backup sessions (contexts) - */ - final static byte[] SESSIONS_FAMILY = "session".getBytes(); - /** - * Stores other meta - */ - final static byte[] META_FAMILY = "meta".getBytes(); - final static byte[] BULK_LOAD_FAMILY = "bulk".getBytes(); - /** - * Connection to HBase cluster, shared among all instances - */ - private final Connection connection; - - private final static String BACKUP_INFO_PREFIX = "session:"; - private final static String START_CODE_ROW = "startcode:"; - private final static byte[] ACTIVE_SESSION_ROW = "activesession:".getBytes(); - private final static byte[] ACTIVE_SESSION_COL = "c".getBytes(); - - private final static byte[] ACTIVE_SESSION_YES = "yes".getBytes(); - private final static byte[] ACTIVE_SESSION_NO = "no".getBytes(); - - private final static String INCR_BACKUP_SET = "incrbackupset:"; - private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:"; - private final static String RS_LOG_TS_PREFIX = "rslogts:"; - - private final static String BULK_LOAD_PREFIX = "bulk:"; - private final static byte[] BULK_LOAD_PREFIX_BYTES = BULK_LOAD_PREFIX.getBytes(); - private final static byte[] DELETE_OP_ROW = "delete_op_row".getBytes(); - private final static byte[] MERGE_OP_ROW = "merge_op_row".getBytes(); - - final static byte[] TBL_COL = Bytes.toBytes("tbl"); - final static byte[] FAM_COL = Bytes.toBytes("fam"); - final static byte[] PATH_COL = Bytes.toBytes("path"); - final static byte[] STATE_COL = Bytes.toBytes("state"); - // the two states a bulk loaded file can be - final static byte[] BL_PREPARE = Bytes.toBytes("R"); - final static byte[] BL_COMMIT = Bytes.toBytes("D"); - - private final static String WALS_PREFIX = "wals:"; - private final static String SET_KEY_PREFIX = "backupset:"; - - // separator between BULK_LOAD_PREFIX and ordinals - protected final static String BLK_LD_DELIM = ":"; - private final static byte[] EMPTY_VALUE = new byte[] {}; - - // Safe delimiter in a string - private final static String NULL = "\u0000"; - - public BackupSystemTable(Connection conn) throws IOException { - this.connection = conn; - tableName = BackupSystemTable.getTableName(conn.getConfiguration()); - checkSystemTable(); - } - - private void checkSystemTable() throws IOException { - try (Admin admin = connection.getAdmin();) { - - verifyNamespaceExists(admin); - - if (!admin.tableExists(tableName)) { - HTableDescriptor backupHTD = - BackupSystemTable.getSystemTableDescriptor(connection.getConfiguration()); - admin.createTable(backupHTD); - } - waitForSystemTable(admin); - } - } - - private void verifyNamespaceExists(Admin admin) throws IOException { - String namespaceName = tableName.getNamespaceAsString(); - NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build(); - NamespaceDescriptor[] list = admin.listNamespaceDescriptors(); - boolean exists = false; - for (NamespaceDescriptor nsd : list) { - if (nsd.getName().equals(ns.getName())) { - exists = true; - break; - } - } - if (!exists) { - admin.createNamespace(ns); - } - } - - private void waitForSystemTable(Admin admin) throws IOException { - long TIMEOUT = 60000; - long startTime = EnvironmentEdgeManager.currentTime(); - while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - } - if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) { - throw new IOException("Failed to create backup system table after " + TIMEOUT + "ms"); - } - } - LOG.debug("Backup table exists and available"); - - } - - @Override - public void close() { - // do nothing - } - - /** - * Updates status (state) of a backup session in backup system table table - * @param info backup info - * @throws IOException exception - */ - public void updateBackupInfo(BackupInfo info) throws IOException { - - if (LOG.isTraceEnabled()) { - LOG.trace("update backup status in backup system table for: " + info.getBackupId() - + " set status=" + info.getState()); - } - try (Table table = connection.getTable(tableName)) { - Put put = createPutForBackupInfo(info); - table.put(put); - } - } - - /* - * @param backupId the backup Id - * @return Map of rows to path of bulk loaded hfile - */ - Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException { - Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId); - try (Table table = connection.getTable(tableName); - ResultScanner scanner = table.getScanner(scan)) { - Result res = null; - Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); - while ((res = scanner.next()) != null) { - res.advance(); - byte[] row = CellUtil.cloneRow(res.listCells().get(0)); - for (Cell cell : res.listCells()) { - if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, - BackupSystemTable.PATH_COL.length) == 0) { - map.put(row, Bytes.toString(CellUtil.cloneValue(cell))); - } - } - } - return map; - } - } - - /* - * Used during restore - * @param backupId the backup Id - * @param sTableList List of tables - * @return array of Map of family to List of Paths - */ - public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<TableName> sTableList) - throws IOException { - Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId); - Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()]; - try (Table table = connection.getTable(tableName); - ResultScanner scanner = table.getScanner(scan)) { - Result res = null; - while ((res = scanner.next()) != null) { - res.advance(); - TableName tbl = null; - byte[] fam = null; - String path = null; - for (Cell cell : res.listCells()) { - if (CellComparator.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0, - BackupSystemTable.TBL_COL.length) == 0) { - tbl = TableName.valueOf(CellUtil.cloneValue(cell)); - } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, - BackupSystemTable.FAM_COL.length) == 0) { - fam = CellUtil.cloneValue(cell); - } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, - BackupSystemTable.PATH_COL.length) == 0) { - path = Bytes.toString(CellUtil.cloneValue(cell)); - } - } - int srcIdx = IncrementalTableBackupClient.getIndex(tbl, sTableList); - if (srcIdx == -1) { - // the table is not among the query - continue; - } - if (mapForSrc[srcIdx] == null) { - mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR); - } - List<Path> files; - if (!mapForSrc[srcIdx].containsKey(fam)) { - files = new ArrayList<Path>(); - mapForSrc[srcIdx].put(fam, files); - } else { - files = mapForSrc[srcIdx].get(fam); - } - files.add(new Path(path)); - if (LOG.isDebugEnabled()) { - LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path); - } - } - ; - return mapForSrc; - } - } - - /* - * @param map Map of row keys to path of bulk loaded hfile - */ - void deleteBulkLoadedFiles(Map<byte[], String> map) throws IOException { - try (Table table = connection.getTable(tableName)) { - List<Delete> dels = new ArrayList<>(); - for (byte[] row : map.keySet()) { - dels.add(new Delete(row).addFamily(BackupSystemTable.META_FAMILY)); - } - table.delete(dels); - } - } - - /** - * Deletes backup status from backup system table table - * @param backupId backup id - * @throws IOException exception - */ - - public void deleteBackupInfo(String backupId) throws IOException { - - if (LOG.isTraceEnabled()) { - LOG.trace("delete backup status in backup system table for " + backupId); - } - try (Table table = connection.getTable(tableName)) { - Delete del = createDeleteForBackupInfo(backupId); - table.delete(del); - } - } - - /* - * For postBulkLoadHFile() hook. - * @param tabName table name - * @param region the region receiving hfile - * @param finalPaths family and associated hfiles - */ - public void writePathsPostBulkLoad(TableName tabName, byte[] region, - Map<byte[], List<Path>> finalPaths) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size() - + " entries"); - } - try (Table table = connection.getTable(tableName)) { - List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths); - table.put(puts); - LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); - } - } - - /* - * For preCommitStoreFile() hook - * @param tabName table name - * @param region the region receiving hfile - * @param family column family - * @param pairs list of paths for hfiles - */ - public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family, - final List<Pair<Path, Path>> pairs) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("write bulk load descriptor to backup " + tabName + " with " + pairs.size() - + " entries"); - } - try (Table table = connection.getTable(tableName)) { - List<Put> puts = - BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs); - table.put(puts); - LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); - } - } - - /* - * Removes rows recording bulk loaded hfiles from backup table - * @param lst list of table names - * @param rows the rows to be deleted - */ - public void removeBulkLoadedRows(List<TableName> lst, List<byte[]> rows) throws IOException { - try (Table table = connection.getTable(tableName)) { - List<Delete> lstDels = new ArrayList<>(); - for (byte[] row : rows) { - Delete del = new Delete(row); - lstDels.add(del); - LOG.debug("orig deleting the row: " + Bytes.toString(row)); - } - table.delete(lstDels); - LOG.debug("deleted " + rows.size() + " original bulkload rows for " + lst.size() + " tables"); - } - } - - /* - * Reads the rows from backup table recording bulk loaded hfiles - * @param tableList list of table names - * @return The keys of the Map are table, region and column family. Value of the map reflects - * whether the hfile was recorded by preCommitStoreFile hook (true) - */ - public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> - readBulkloadRows(List<TableName> tableList) throws IOException { - Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>(); - List<byte[]> rows = new ArrayList<>(); - for (TableName tTable : tableList) { - Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable); - Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable); - try (Table table = connection.getTable(tableName); - ResultScanner scanner = table.getScanner(scan)) { - Result res = null; - while ((res = scanner.next()) != null) { - res.advance(); - String fam = null; - String path = null; - boolean raw = false; - byte[] row = null; - String region = null; - for (Cell cell : res.listCells()) { - row = CellUtil.cloneRow(cell); - rows.add(row); - String rowStr = Bytes.toString(row); - region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr); - if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, - BackupSystemTable.FAM_COL.length) == 0) { - fam = Bytes.toString(CellUtil.cloneValue(cell)); - } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, - BackupSystemTable.PATH_COL.length) == 0) { - path = Bytes.toString(CellUtil.cloneValue(cell)); - } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0, - BackupSystemTable.STATE_COL.length) == 0) { - byte[] state = CellUtil.cloneValue(cell); - if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) { - raw = true; - } else raw = false; - } - } - if (map.get(tTable) == null) { - map.put(tTable, new HashMap<String, Map<String, List<Pair<String, Boolean>>>>()); - tblMap = map.get(tTable); - } - if (tblMap.get(region) == null) { - tblMap.put(region, new HashMap<String, List<Pair<String, Boolean>>>()); - } - Map<String, List<Pair<String, Boolean>>> famMap = tblMap.get(region); - if (famMap.get(fam) == null) { - famMap.put(fam, new ArrayList<Pair<String, Boolean>>()); - } - famMap.get(fam).add(new Pair<>(path, raw)); - LOG.debug("found orig " + path + " for " + fam + " of table " + region); - } - } - } - return new Pair<>(map, rows); - } - - /* - * @param sTableList List of tables - * @param maps array of Map of family to List of Paths - * @param backupId the backup Id - */ - public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps, - String backupId) throws IOException { - try (Table table = connection.getTable(tableName)) { - long ts = EnvironmentEdgeManager.currentTime(); - int cnt = 0; - List<Put> puts = new ArrayList<>(); - for (int idx = 0; idx < maps.length; idx++) { - Map<byte[], List<Path>> map = maps[idx]; - TableName tn = sTableList.get(idx); - if (map == null) continue; - for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) { - byte[] fam = entry.getKey(); - List<Path> paths = entry.getValue(); - for (Path p : paths) { - Put put = - BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId, ts, - cnt++); - puts.add(put); - } - } - } - if (!puts.isEmpty()) { - table.put(puts); - } - } - } - - /** - * Reads backup status object (instance of backup info) from backup system table table - * @param backupId backup id - * @return Current status of backup session or null - */ - - public BackupInfo readBackupInfo(String backupId) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("read backup status from backup system table for: " + backupId); - } - - try (Table table = connection.getTable(tableName)) { - Get get = createGetForBackupInfo(backupId); - Result res = table.get(get); - if (res.isEmpty()) { - return null; - } - return resultToBackupInfo(res); - } - } - - /** - * Read the last backup start code (timestamp) of last successful backup. Will return null if - * there is no start code stored on hbase or the value is of length 0. These two cases indicate - * there is no successful backup completed so far. - * @param backupRoot directory path to backup destination - * @return the timestamp of last successful backup - * @throws IOException exception - */ - public String readBackupStartCode(String backupRoot) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("read backup start code from backup system table"); - } - try (Table table = connection.getTable(tableName)) { - Get get = createGetForStartCode(backupRoot); - Result res = table.get(get); - if (res.isEmpty()) { - return null; - } - Cell cell = res.listCells().get(0); - byte[] val = CellUtil.cloneValue(cell); - if (val.length == 0) { - return null; - } - return new String(val); - } - } - - /** - * Write the start code (timestamp) to backup system table. If passed in null, then write 0 byte. - * @param startCode start code - * @param backupRoot root directory path to backup - * @throws IOException exception - */ - public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("write backup start code to backup system table " + startCode); - } - try (Table table = connection.getTable(tableName)) { - Put put = createPutForStartCode(startCode.toString(), backupRoot); - table.put(put); - } - } - - /** - * Exclusive operations are: - * create, delete, merge - * @throws IOException - */ - public void startBackupExclusiveOperation() throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Start new backup exclusive operation"); - } - try (Table table = connection.getTable(tableName)) { - Put put = createPutForStartBackupSession(); - // First try to put if row does not exist - if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, null, put)) { - // Row exists, try to put if value == ACTIVE_SESSION_NO - if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, - ACTIVE_SESSION_NO, put)) { - throw new IOException("There is an active backup exclusive operation"); - } - } - } - } - - private Put createPutForStartBackupSession() { - Put put = new Put(ACTIVE_SESSION_ROW); - put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES); - return put; - } - - public void finishBackupExclusiveOperation() throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Finish backup exclusive operation"); - } - try (Table table = connection.getTable(tableName)) { - Put put = createPutForStopBackupSession(); - if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, - ACTIVE_SESSION_YES, put)) { - throw new IOException("There is no active backup exclusive operation"); - } - } - } - - private Put createPutForStopBackupSession() { - Put put = new Put(ACTIVE_SESSION_ROW); - put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO); - return put; - } - - /** - * Get the Region Servers log information after the last log roll from backup system table. - * @param backupRoot root directory path to backup - * @return RS log info - * @throws IOException exception - */ - public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot) - throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("read region server last roll log result to backup system table"); - } - - Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot); - - try (Table table = connection.getTable(tableName); - ResultScanner scanner = table.getScanner(scan)) { - Result res = null; - HashMap<String, Long> rsTimestampMap = new HashMap<String, Long>(); - while ((res = scanner.next()) != null) { - res.advance(); - Cell cell = res.current(); - byte[] row = CellUtil.cloneRow(cell); - String server = getServerNameForReadRegionServerLastLogRollResult(row); - byte[] data = CellUtil.cloneValue(cell); - rsTimestampMap.put(server, Bytes.toLong(data)); - } - return rsTimestampMap; - } - } - - /** - * Writes Region Server last roll log result (timestamp) to backup system table table - * @param server Region Server name - * @param ts last log timestamp - * @param backupRoot root directory path to backup - * @throws IOException exception - */ - public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot) - throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("write region server last roll log result to backup system table"); - } - try (Table table = connection.getTable(tableName)) { - Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot); - table.put(put); - } - } - - /** - * Get all completed backup information (in desc order by time) - * @param onlyCompleted true, if only successfully completed sessions - * @return history info of BackupCompleteData - * @throws IOException exception - */ - public ArrayList<BackupInfo> getBackupHistory(boolean onlyCompleted) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("get backup history from backup system table"); - } - ArrayList<BackupInfo> list; - BackupState state = onlyCompleted ? BackupState.COMPLETE : BackupState.ANY; - list = getBackupInfos(state); - return BackupUtils.sortHistoryListDesc(list); - } - - /** - * Get all backups history - * @return list of backup info - * @throws IOException - */ - public List<BackupInfo> getBackupHistory() throws IOException { - return getBackupHistory(false); - } - - /** - * Get first n backup history records - * @param n number of records, if n== -1 - max number - * is ignored - * @return list of records - * @throws IOException - */ - public List<BackupInfo> getHistory(int n) throws IOException { - - List<BackupInfo> history = getBackupHistory(); - if (n == -1 || history.size() <= n) return history; - List<BackupInfo> list = new ArrayList<BackupInfo>(); - for (int i = 0; i < n; i++) { - list.add(history.get(i)); - } - return list; - - } - - /** - * Get backup history records filtered by list of filters. - * @param n max number of records, if n == -1 , then max number - * is ignored - * @param filters list of filters - * @return backup records - * @throws IOException - */ - public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter... filters) throws IOException { - if (filters.length == 0) return getHistory(n); - - List<BackupInfo> history = getBackupHistory(); - List<BackupInfo> result = new ArrayList<BackupInfo>(); - for (BackupInfo bi : history) { - if (n >= 0 && result.size() == n) break; - boolean passed = true; - for (int i = 0; i < filters.length; i++) { - if (!filters[i].apply(bi)) { - passed = false; - break; - } - } - if (passed) { - result.add(bi); - } - } - return result; - - } - - /* - * Retrieve TableName's for completed backup of given type - * @param type backup type - * @return List of table names - */ - public List<TableName> getTablesForBackupType(BackupType type) throws IOException { - Set<TableName> names = new HashSet<>(); - List<BackupInfo> infos = getBackupHistory(true); - for (BackupInfo info : infos) { - if (info.getType() != type) continue; - names.addAll(info.getTableNames()); - } - return new ArrayList(names); - } - - /** - * Get history for backup destination - * @param backupRoot backup destination path - * @return List of backup info - * @throws IOException - */ - public List<BackupInfo> getBackupHistory(String backupRoot) throws IOException { - ArrayList<BackupInfo> history = getBackupHistory(false); - for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) { - BackupInfo info = iterator.next(); - if (!backupRoot.equals(info.getBackupRootDir())) { - iterator.remove(); - } - } - return history; - } - - /** - * Get history for a table - * @param name table name - * @return history for a table - * @throws IOException - */ - public List<BackupInfo> getBackupHistoryForTable(TableName name) throws IOException { - List<BackupInfo> history = getBackupHistory(); - List<BackupInfo> tableHistory = new ArrayList<BackupInfo>(); - for (BackupInfo info : history) { - List<TableName> tables = info.getTableNames(); - if (tables.contains(name)) { - tableHistory.add(info); - } - } - return tableHistory; - } - - public Map<TableName, ArrayList<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set, - String backupRoot) throws IOException { - List<BackupInfo> history = getBackupHistory(backupRoot); - Map<TableName, ArrayList<BackupInfo>> tableHistoryMap = - new HashMap<TableName, ArrayList<BackupInfo>>(); - for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) { - BackupInfo info = iterator.next(); - if (!backupRoot.equals(info.getBackupRootDir())) { - continue; - } - List<TableName> tables = info.getTableNames(); - for (TableName tableName : tables) { - if (set.contains(tableName)) { - ArrayList<BackupInfo> list = tableHistoryMap.get(tableName); - if (list == null) { - list = new ArrayList<BackupInfo>(); - tableHistoryMap.put(tableName, list); - } - list.add(info); - } - } - } - return tableHistoryMap; - } - - /** - * Get all backup sessions with a given state (in descending order by time) - * @param state backup session state - * @return history info of backup info objects - * @throws IOException exception - */ - public ArrayList<BackupInfo> getBackupInfos(BackupState state) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("get backup infos from backup system table"); - } - - Scan scan = createScanForBackupHistory(); - ArrayList<BackupInfo> list = new ArrayList<BackupInfo>(); - - try (Table table = connection.getTable(tableName); - ResultScanner scanner = table.getScanner(scan)) { - Result res = null; - while ((res = scanner.next()) != null) { - res.advance(); - BackupInfo context = cellToBackupInfo(res.current()); - if (state != BackupState.ANY && context.getState() != state) { - continue; - } - list.add(context); - } - return list; - } - } - - /** - * Write the current timestamps for each regionserver to backup system table after a successful - * full or incremental backup. The saved timestamp is of the last log file that was backed up - * already. - * @param tables tables - * @param newTimestamps timestamps - * @param backupRoot root directory path to backup - * @throws IOException exception - */ - public void writeRegionServerLogTimestamp(Set<TableName> tables, - HashMap<String, Long> newTimestamps, String backupRoot) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("write RS log time stamps to backup system table for tables [" - + StringUtils.join(tables, ",") + "]"); - } - List<Put> puts = new ArrayList<Put>(); - for (TableName table : tables) { - byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray(); - Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot); - puts.add(put); - } - try (Table table = connection.getTable(tableName)) { - table.put(puts); - } - } - - /** - * Read the timestamp for each region server log after the last successful backup. Each table has - * its own set of the timestamps. The info is stored for each table as a concatenated string of - * rs->timestapmp - * @param backupRoot root directory path to backup - * @return the timestamp for each region server. key: tableName value: - * RegionServer,PreviousTimeStamp - * @throws IOException exception - */ - public HashMap<TableName, HashMap<String, Long>> readLogTimestampMap(String backupRoot) - throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("read RS log ts from backup system table for root=" + backupRoot); - } - - HashMap<TableName, HashMap<String, Long>> tableTimestampMap = - new HashMap<TableName, HashMap<String, Long>>(); - - Scan scan = createScanForReadLogTimestampMap(backupRoot); - try (Table table = connection.getTable(tableName); - ResultScanner scanner = table.getScanner(scan)) { - Result res = null; - while ((res = scanner.next()) != null) { - res.advance(); - Cell cell = res.current(); - byte[] row = CellUtil.cloneRow(cell); - String tabName = getTableNameForReadLogTimestampMap(row); - TableName tn = TableName.valueOf(tabName); - byte[] data = CellUtil.cloneValue(cell); - if (data == null) { - throw new IOException("Data of last backup data from backup system table " - + "is empty. Create a backup first."); - } - if (data != null && data.length > 0) { - HashMap<String, Long> lastBackup = - fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data)); - tableTimestampMap.put(tn, lastBackup); - } - } - return tableTimestampMap; - } - } - - private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table, - Map<String, Long> map) { - BackupProtos.TableServerTimestamp.Builder tstBuilder = - BackupProtos.TableServerTimestamp.newBuilder(); - tstBuilder.setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil - .toProtoTableName(table)); - - for (Entry<String, Long> entry : map.entrySet()) { - BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder(); - HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder(); - ServerName sn = ServerName.parseServerName(entry.getKey()); - snBuilder.setHostName(sn.getHostname()); - snBuilder.setPort(sn.getPort()); - builder.setServerName(snBuilder.build()); - builder.setTimestamp(entry.getValue()); - tstBuilder.addServerTimestamp(builder.build()); - } - - return tstBuilder.build(); - } - - private HashMap<String, Long> fromTableServerTimestampProto( - BackupProtos.TableServerTimestamp proto) { - HashMap<String, Long> map = new HashMap<String, Long>(); - List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList(); - for (BackupProtos.ServerTimestamp st : list) { - ServerName sn = - org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(st.getServerName()); - map.put(sn.getHostname() + ":" + sn.getPort(), st.getTimestamp()); - } - return map; - } - - /** - * Return the current tables covered by incremental backup. - * @param backupRoot root directory path to backup - * @return set of tableNames - * @throws IOException exception - */ - public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("get incremental backup table set from backup system table"); - } - TreeSet<TableName> set = new TreeSet<>(); - - try (Table table = connection.getTable(tableName)) { - Get get = createGetForIncrBackupTableSet(backupRoot); - Result res = table.get(get); - if (res.isEmpty()) { - return set; - } - List<Cell> cells = res.listCells(); - for (Cell cell : cells) { - // qualifier = table name - we use table names as qualifiers - set.add(TableName.valueOf(CellUtil.cloneQualifier(cell))); - } - return set; - } - } - - /** - * Add tables to global incremental backup set - * @param tables set of tables - * @param backupRoot root directory path to backup - * @throws IOException exception - */ - public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot) - throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot - + " tables [" + StringUtils.join(tables, " ") + "]"); - for (TableName table : tables) { - LOG.debug(table); - } - } - try (Table table = connection.getTable(tableName)) { - Put put = createPutForIncrBackupTableSet(tables, backupRoot); - table.put(put); - } - } - - /** - * Deletes incremental backup set for a backup destination - * @param backupRoot backup root - */ - - public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Delete incremental backup table set to backup system table. ROOT=" + backupRoot); - } - try (Table table = connection.getTable(tableName)) { - Delete delete = createDeleteForIncrBackupTableSet(backupRoot); - table.delete(delete); - } - } - - /** - * Register WAL files as eligible for deletion - * @param files files - * @param backupId backup id - * @param backupRoot root directory path to backup destination - * @throws IOException exception - */ - public void addWALFiles(List<String> files, String backupId, String backupRoot) - throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("add WAL files to backup system table: " + backupId + " " + backupRoot + " files [" - + StringUtils.join(files, ",") + "]"); - for (String f : files) { - LOG.debug("add :" + f); - } - } - try (Table table = connection.getTable(tableName)) { - List<Put> puts = createPutsForAddWALFiles(files, backupId, backupRoot); - table.put(puts); - } - } - - /** - * Register WAL files as eligible for deletion - * @param backupRoot root directory path to backup - * @throws IOException exception - */ - public Iterator<WALItem> getWALFilesIterator(String backupRoot) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("get WAL files from backup system table"); - } - final Table table = connection.getTable(tableName); - Scan scan = createScanForGetWALs(backupRoot); - final ResultScanner scanner = table.getScanner(scan); - final Iterator<Result> it = scanner.iterator(); - return new Iterator<WALItem>() { - - @Override - public boolean hasNext() { - boolean next = it.hasNext(); - if (!next) { - // close all - try { - scanner.close(); - table.close(); - } catch (IOException e) { - LOG.error("Close WAL Iterator", e); - } - } - return next; - } - - @Override - public WALItem next() { - Result next = it.next(); - List<Cell> cells = next.listCells(); - byte[] buf = cells.get(0).getValueArray(); - int len = cells.get(0).getValueLength(); - int offset = cells.get(0).getValueOffset(); - String backupId = new String(buf, offset, len); - buf = cells.get(1).getValueArray(); - len = cells.get(1).getValueLength(); - offset = cells.get(1).getValueOffset(); - String walFile = new String(buf, offset, len); - buf = cells.get(2).getValueArray(); - len = cells.get(2).getValueLength(); - offset = cells.get(2).getValueOffset(); - String backupRoot = new String(buf, offset, len); - return new WALItem(backupId, walFile, backupRoot); - } - - @Override - public void remove() { - // not implemented - throw new RuntimeException("remove is not supported"); - } - }; - - } - - /** - * Check if WAL file is eligible for deletion Future: to support all backup destinations - * @param file name of a file to check - * @return true, if deletable, false otherwise. - * @throws IOException exception - * TODO: multiple backup destination support - */ - public boolean isWALFileDeletable(String file) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Check if WAL file has been already backed up in backup system table " + file); - } - try (Table table = connection.getTable(tableName)) { - Get get = createGetForCheckWALFile(file); - Result res = table.get(get); - if (res.isEmpty()) { - return false; - } - return true; - } - } - - /** - * Checks if we have at least one backup session in backup system table This API is used by - * BackupLogCleaner - * @return true, if - at least one session exists in backup system table table - * @throws IOException exception - */ - public boolean hasBackupSessions() throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Has backup sessions from backup system table"); - } - boolean result = false; - Scan scan = createScanForBackupHistory(); - scan.setCaching(1); - try (Table table = connection.getTable(tableName); - ResultScanner scanner = table.getScanner(scan)) { - if (scanner.next() != null) { - result = true; - } - return result; - } - } - - /** - * BACKUP SETS - */ - - /** - * Get backup set list - * @return backup set list - * @throws IOException - */ - public List<String> listBackupSets() throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace(" Backup set list"); - } - List<String> list = new ArrayList<String>(); - Table table = null; - ResultScanner scanner = null; - try { - table = connection.getTable(tableName); - Scan scan = createScanForBackupSetList(); - scan.setMaxVersions(1); - scanner = table.getScanner(scan); - Result res = null; - while ((res = scanner.next()) != null) { - res.advance(); - list.add(cellKeyToBackupSetName(res.current())); - } - return list; - } finally { - if (scanner != null) { - scanner.close(); - } - if (table != null) { - table.close(); - } - } - } - - /** - * Get backup set description (list of tables) - * @param name set's name - * @return list of tables in a backup set - * @throws IOException - */ - public List<TableName> describeBackupSet(String name) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace(" Backup set describe: " + name); - } - Table table = null; - try { - table = connection.getTable(tableName); - Get get = createGetForBackupSet(name); - Result res = table.get(get); - if (res.isEmpty()) return null; - res.advance(); - String[] tables = cellValueToBackupSet(res.current()); - return toList(tables); - } finally { - if (table != null) { - table.close(); - } - } - } - - private List<TableName> toList(String[] tables) { - List<TableName> list = new ArrayList<TableName>(tables.length); - for (String name : tables) { - list.add(TableName.valueOf(name)); - } - return list; - } - - /** - * Add backup set (list of tables) - * @param name set name - * @param newTables list of tables, comma-separated - * @throws IOException - */ - public void addToBackupSet(String name, String[] newTables) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]"); - } - Table table = null; - String[] union = null; - try { - table = connection.getTable(tableName); - Get get = createGetForBackupSet(name); - Result res = table.get(get); - if (res.isEmpty()) { - union = newTables; - } else { - res.advance(); - String[] tables = cellValueToBackupSet(res.current()); - union = merge(tables, newTables); - } - Put put = createPutForBackupSet(name, union); - table.put(put); - } finally { - if (table != null) { - table.close(); - } - } - } - - private String[] merge(String[] tables, String[] newTables) { - List<String> list = new ArrayList<String>(); - // Add all from tables - for (String t : tables) { - list.add(t); - } - for (String nt : newTables) { - if (list.contains(nt)) continue; - list.add(nt); - } - String[] arr = new String[list.size()]; - list.toArray(arr); - return arr; - } - - /** - * Remove tables from backup set (list of tables) - * @param name set name - * @param toRemove list of tables - * @throws IOException - */ - public void removeFromBackupSet(String name, String[] toRemove) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace(" Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ") - + "]"); - } - Table table = null; - String[] disjoint = null; - String[] tables = null; - try { - table = connection.getTable(tableName); - Get get = createGetForBackupSet(name); - Result res = table.get(get); - if (res.isEmpty()) { - LOG.warn("Backup set '" + name + "' not found."); - return; - } else { - res.advance(); - tables = cellValueToBackupSet(res.current()); - disjoint = disjoin(tables, toRemove); - } - if (disjoint.length > 0 && disjoint.length != tables.length) { - Put put = createPutForBackupSet(name, disjoint); - table.put(put); - } else if (disjoint.length == tables.length) { - LOG.warn("Backup set '" + name + "' does not contain tables [" - + StringUtils.join(toRemove, " ") + "]"); - } else { // disjoint.length == 0 and tables.length >0 - // Delete backup set - LOG.info("Backup set '" + name + "' is empty. Deleting."); - deleteBackupSet(name); - } - } finally { - if (table != null) { - table.close(); - } - } - } - - private String[] disjoin(String[] tables, String[] toRemove) { - List<String> list = new ArrayList<String>(); - // Add all from tables - for (String t : tables) { - list.add(t); - } - for (String nt : toRemove) { - if (list.contains(nt)) { - list.remove(nt); - } - } - String[] arr = new String[list.size()]; - list.toArray(arr); - return arr; - } - - /** - * Delete backup set - * @param name set's name - * @throws IOException - */ - public void deleteBackupSet(String name) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace(" Backup set delete: " + name); - } - Table table = null; - try { - table = connection.getTable(tableName); - Delete del = createDeleteForBackupSet(name); - table.delete(del); - } finally { - if (table != null) { - table.close(); - } - } - } - - /** - * Get backup system table descriptor - * @return table's descriptor - */ - public static HTableDescriptor getSystemTableDescriptor(Configuration conf) { - - HTableDescriptor tableDesc = new HTableDescriptor(getTableName(conf)); - HColumnDescriptor colSessionsDesc = new HColumnDescriptor(SESSIONS_FAMILY); - colSessionsDesc.setMaxVersions(1); - // Time to keep backup sessions (secs) - Configuration config = HBaseConfiguration.create(); - int ttl = - config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, - BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); - colSessionsDesc.setTimeToLive(ttl); - tableDesc.addFamily(colSessionsDesc); - HColumnDescriptor colMetaDesc = new HColumnDescriptor(META_FAMILY); - tableDesc.addFamily(colMetaDesc); - return tableDesc; - } - - public static TableName getTableName(Configuration conf) { - String name = - conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, - BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT); - return TableName.valueOf(name); - } - - public static String getTableNameAsString(Configuration conf) { - return getTableName(conf).getNameAsString(); - } - - public static String getSnapshotName(Configuration conf) { - return "snapshot_" + getTableNameAsString(conf).replace(":", "_"); - } - - /** - * Creates Put operation for a given backup info object - * @param context backup info - * @return put operation - * @throws IOException exception - */ - private Put createPutForBackupInfo(BackupInfo context) throws IOException { - Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId())); - put.addColumn(BackupSystemTable.SESSIONS_FAMILY, Bytes.toBytes("context"), - context.toByteArray()); - return put; - } - - /** - * Creates Get operation for a given backup id - * @param backupId backup's ID - * @return get operation - * @throws IOException exception - */ - private Get createGetForBackupInfo(String backupId) throws IOException { - Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId)); - get.addFamily(BackupSystemTable.SESSIONS_FAMILY); - get.setMaxVersions(1); - return get; - } - - /** - * Creates Delete operation for a given backup id - * @param backupId backup's ID - * @return delete operation - * @throws IOException exception - */ - private Delete createDeleteForBackupInfo(String backupId) { - Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId)); - del.addFamily(BackupSystemTable.SESSIONS_FAMILY); - return del; - } - - /** - * Converts Result to BackupInfo - * @param res HBase result - * @return backup info instance - * @throws IOException exception - */ - private BackupInfo resultToBackupInfo(Result res) throws IOException { - res.advance(); - Cell cell = res.current(); - return cellToBackupInfo(cell); - } - - /** - * Creates Get operation to retrieve start code from backup system table - * @return get operation - * @throws IOException exception - */ - private Get createGetForStartCode(String rootPath) throws IOException { - Get get = new Get(rowkey(START_CODE_ROW, rootPath)); - get.addFamily(BackupSystemTable.META_FAMILY); - get.setMaxVersions(1); - return get; - } - - /** - * Creates Put operation to store start code to backup system table - * @return put operation - * @throws IOException exception - */ - private Put createPutForStartCode(String startCode, String rootPath) { - Put put = new Put(rowkey(START_CODE_ROW, rootPath)); - put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("startcode"), - Bytes.toBytes(startCode)); - return put; - } - - /** - * Creates Get to retrieve incremental backup table set from backup system table - * @return get operation - * @throws IOException exception - */ - private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException { - Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot)); - get.addFamily(BackupSystemTable.META_FAMILY); - get.setMaxVersions(1); - return get; - } - - /** - * Creates Put to store incremental backup table set - * @param tables tables - * @return put operation - */ - private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) { - Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot)); - for (TableName table : tables) { - put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()), - EMPTY_VALUE); - } - return put; - } - - /** - * Creates Delete for incremental backup table set - * @param backupRoot backup root - * @return delete operation - */ - private Delete createDeleteForIncrBackupTableSet(String backupRoot) { - Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot)); - delete.addFamily(BackupSystemTable.META_FAMILY); - return delete; - } - - /** - * Creates Scan operation to load backup history - * @return scan operation - */ - private Scan createScanForBackupHistory() { - Scan scan = new Scan(); - byte[] startRow = Bytes.toBytes(BACKUP_INFO_PREFIX); - byte[] stopRow = Arrays.copyOf(startRow, startRow.length); - stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); - scan.setStartRow(startRow); - scan.setStopRow(stopRow); - scan.addFamily(BackupSystemTable.SESSIONS_FAMILY); - scan.setMaxVersions(1); - return scan; - } - - /** - * Converts cell to backup info instance. - * @param current current cell - * @return backup backup info instance - * @throws IOException exception - */ - private BackupInfo cellToBackupInfo(Cell current) throws IOException { - byte[] data = CellUtil.cloneValue(current); - return BackupInfo.fromByteArray(data); - } - - /** - * Creates Put to write RS last roll log timestamp map - * @param table table - * @param smap map, containing RS:ts - * @return put operation - */ - private Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap, - String backupRoot) { - Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString())); - put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("log-roll-map"), smap); - return put; - } - - /** - * Creates Scan to load table-> { RS -> ts} map of maps - * @return scan operation - */ - private Scan createScanForReadLogTimestampMap(String backupRoot) { - Scan scan = new Scan(); - byte[] startRow = rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot); - byte[] stopRow = Arrays.copyOf(startRow, startRow.length); - stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); - scan.setStartRow(startRow); - scan.setStopRow(stopRow); - scan.addFamily(BackupSystemTable.META_FAMILY); - - return scan; - } - - /** - * Get table name from rowkey - * @param cloneRow rowkey - * @return table name - */ - private String getTableNameForReadLogTimestampMap(byte[] cloneRow) { - String s = Bytes.toString(cloneRow); - int index = s.lastIndexOf(NULL); - return s.substring(index + 1); - } - - /** - * Creates Put to store RS last log result - * @param server server name - * @param timestamp log roll result (timestamp) - * @return put operation - */ - private Put createPutForRegionServerLastLogRollResult(String server, Long timestamp, - String backupRoot) { - Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server)); - put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("rs-log-ts"), - Bytes.toBytes(timestamp)); - return put; - } - - /** - * Creates Scan operation to load last RS log roll results - * @return scan operation - */ - private Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) { - Scan scan = new Scan(); - byte[] startRow = rowkey(RS_LOG_TS_PREFIX, backupRoot); - byte[] stopRow = Arrays.copyOf(startRow, startRow.length); - stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); - scan.setStartRow(startRow); - scan.setStopRow(stopRow); - scan.addFamily(BackupSystemTable.META_FAMILY); - scan.setMaxVersions(1); - - return scan; - } - - /** - * Get server's name from rowkey - * @param row rowkey - * @return server's name - */ - private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) { - String s = Bytes.toString(row); - int index = s.lastIndexOf(NULL); - return s.substring(index + 1); - } - - /* - * Creates Put's for bulk load resulting from running LoadIncrementalHFiles - */ - static List<Put> createPutForCommittedBulkload(TableName table, byte[] region, - Map<byte[], List<Path>> finalPaths) { - List<Put> puts = new ArrayList<>(); - for (Map.Entry<byte[], List<Path>> entry : finalPaths.entrySet()) { - for (Path path : entry.getValue()) { - String file = path.toString(); - int lastSlash = file.lastIndexOf("/"); - String filename = file.substring(lastSlash + 1); - Put put = - new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, - Bytes.toString(region), BLK_LD_DELIM, filename)); - put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); - put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey()); - put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes()); - put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT); - puts.add(put); - LOG.debug("writing done bulk path " + file + " for " + table + " " + Bytes.toString(region)); - } - } - return puts; - } - - public static void snapshot(Connection conn) throws IOException { - - try (Admin admin = conn.getAdmin();) { - Configuration conf = conn.getConfiguration(); - admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf)); - } - } - - public static void restoreFromSnapshot(Connection conn) throws IOException { - - Configuration conf = conn.getConfiguration(); - LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot"); - try (Admin admin = conn.getAdmin();) { - String snapshotName = BackupSystemTable.getSnapshotName(conf); - if (snapshotExists(admin, snapshotName)) { - admin.disableTable(BackupSystemTable.getTableName(conf)); - admin.restoreSnapshot(snapshotName); - admin.enableTable(BackupSystemTable.getTableName(conf)); - LOG.debug("Done restoring backup system table"); - } else { - // Snapshot does not exists, i.e completeBackup failed after - // deleting backup system table snapshot - // In this case we log WARN and proceed - LOG.warn("Could not restore backup system table. Snapshot " + snapshotName - + " does not exists."); - } - } - } - - protected static boolean snapshotExists(Admin admin, String snapshotName) throws IOException { - - List<SnapshotDescription> list = admin.listSnapshots(); - for (SnapshotDescription desc : list) { - if (desc.getName().equals(snapshotName)) { - return true; - } - } - return false; - } - - public static boolean snapshotExists(Connection conn) throws IOException { - return snapshotExists(conn.getAdmin(), getSnapshotName(conn.getConfiguration())); - } - - public static void deleteSnapshot(Connection conn) throws IOException { - - Configuration conf = conn.getConfiguration(); - LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system"); - try (Admin admin = conn.getAdmin();) { - String snapshotName = BackupSystemTable.getSnapshotName(conf); - if (snapshotExists(admin, snapshotName)) { - admin.deleteSnapshot(snapshotName); - LOG.debug("Done deleting backup system table snapshot"); - } else { - LOG.error("Snapshot " + snapshotName + " does not exists"); - } - } - } - - /* - * Creates Put's for bulk load resulting from running LoadIncrementalHFiles - */ - static List<Put> createPutForPreparedBulkload(TableName table, byte[] region, - final byte[] family, final List<Pair<Path, Path>> pairs) { - List<Put> puts = new ArrayList<>(); - for (Pair<Path, Path> pair : pairs) { - Path path = pair.getSecond(); - String file = path.toString(); - int lastSlash = file.lastIndexOf("/"); - String filename = file.substring(lastSlash + 1); - Put put = - new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, Bytes.toString(region), - BLK_LD_DELIM, filename)); - put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); - put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family); - put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes()); - put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE); - puts.add(put); - LOG.debug("writing raw bulk path " + file + " for " + table + " " + Bytes.toString(region)); - } - return puts; - } - - public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) { - List<Delete> lstDels = new ArrayList<>(); - for (TableName table : lst) { - Delete del = new Delete(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM)); - del.addFamily(BackupSystemTable.META_FAMILY); - lstDels.add(del); - } - return lstDels; - } - - private Put createPutForDeleteOperation(String[] backupIdList) { - - byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); - Put put = new Put(DELETE_OP_ROW); - put.addColumn(META_FAMILY, FAM_COL, value); - return put; - } - - private Delete createDeleteForBackupDeleteOperation() { - - Delete delete = new Delete(DELETE_OP_ROW); - delete.addFamily(META_FAMILY); - return delete; - } - - private Get createGetForDeleteOperation() { - - Get get = new Get(DELETE_OP_ROW); - get.addFamily(META_FAMILY); - return get; - } - - public void startDeleteOperation(String[] backupIdList) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList)); - } - Put put = createPutForDeleteOperation(backupIdList); - try (Table table = connection.getTable(tableName)) { - table.put(put); - } - } - - public void finishDeleteOperation() throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Finsih delete operation for backup ids "); - } - Delete delete = createDeleteForBackupDeleteOperation(); - try (Table table = connection.getTable(tableName)) { - table.delete(delete); - } - } - - public String[] getListOfBackupIdsFromDeleteOperation() throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Get delete operation for backup ids "); - } - Get get = createGetForDeleteOperation(); - try (Table table = connection.getTable(tableName)) { - Result res = table.get(get); - if (res.isEmpty()) { - return null; - } - Cell cell = res.listCells().get(0); - byte[] val = CellUtil.cloneValue(cell); - if (val.length == 0) { - return null; - } - return new String(val).split(","); - } - } - - private Put createPutForMergeOperation(String[] backupIdList) { - - byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); - Put put = new Put(MERGE_OP_ROW); - put.addColumn(META_FAMILY, FAM_COL, value); - return put; - } - - public boolean isMergeInProgress() throws IOException { - Get get = new Get(MERGE_OP_ROW); - try (Table table = connection.getTable(tableName)) { - Result res = table.get(get); - if (res.isEmpty()) { - return false; - } - return true; - } - } - - private Put createPutForUpdateTablesForMerge(List<TableName> tables) { - - byte[] value = Bytes.toBytes(StringUtils.join(tables, ",")); - Put put = new Put(MERGE_OP_ROW); - put.addColumn(META_FAMILY, PATH_COL, value); - return put; - } - - private Delete createDeleteForBackupMergeOperation() { - - Delete delete = new Delete(MERGE_OP_ROW); - delete.addFamily(META_FAMILY); - return delete; - } - - private Get createGetForMergeOperation() { - - Get get = new Get(MERGE_OP_ROW); - get.addFamily(META_FAMILY); - return get; - } - - public void startMergeOperation(String[] backupIdList) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList)); - } - Put put = createPutForMergeOperation(backupIdList); - try (Table table = connection.getTable(tableName)) { - table.put(put); - } - } - - public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Update tables for merge : " + StringUtils.join(tables, ",")); - } - Put put = createPutForUpdateTablesForMerge(tables); - try (Table table = connection.getTable(tableName)) { - table.put(put); - } - } - - public void finishMergeOperation() throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Finsih merge operation for backup ids "); - } - Delete delete = createDeleteForBackupMergeOperation(); - try (Table table = connection.getTable(tableName)) { - table.delete(delete); - } - } - - public String[] getListOfBackupIdsFromMergeOperation() throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Get backup ids for merge operation"); - } - Get get = createGetForMergeOperation(); - try (Table table = connection.getTable(tableName)) { - Result res = table.get(get); - if (res.isEmpty()) { - return null; - } - Cell cell = res.listCells().get(0); - byte[] val = CellUtil.cloneValue(cell); - if (val.length == 0) { - return null; - } - return new String(val).split(","); - } - } - - static Scan createScanForOrigBulkLoadedFiles(TableName table) throws IOException { - Scan scan = new Scan(); - byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM); - byte[] stopRow = Arrays.copyOf(startRow, startRow.length); - stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); - scan.withStartRow(startRow); - scan.withStopRow(stopRow); - scan.addFamily(BackupSystemTable.META_FAMILY); - scan.setMaxVersions(1); - return scan; - } - - static String getTableNameFromOrigBulkLoadRow(String rowStr) { - String[] parts = rowStr.split(BLK_LD_DELIM); - return parts[1]; - } - - static String getRegionNameFromOrigBulkLoadRow(String rowStr) { - // format is bulk : namespace : table : region : file - String[] parts = rowStr.split(BLK_LD_DELIM); - int idx = 3; - if (parts.length == 4) { - // the table is in default namespace - idx = 2; - } - LOG.debug("bulk row string " + rowStr + " region " + parts[idx]); - return parts[idx]; - } - - /* - * Used to query bulk loaded hfiles which have been copied by incremental backup - * @param backupId the backup Id. It can be null when querying for all tables - * @return the Scan object - */ - static Scan createScanForBulkLoadedFiles(String backupId) throws IOException { - Scan scan = new Scan(); - byte[] startRow = - backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId - + BLK_LD_DELIM); - byte[] stopRow = Arrays.copyOf(startRow, startRow.length); - stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); - scan.setStartRow(startRow); - scan.setStopRow(stopRow); - // scan.setTimeRange(lower, Long.MAX_VALUE); - scan.addFamily(BackupSystemTable.META_FAMILY); - scan.setMaxVersions(1); - return scan; - } - - static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId, - long ts, int idx) { - Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM + ts + BLK_LD_DELIM + idx)); - put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName()); - put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam); - put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, p.getBytes()); - return put; - } - - /** - * Creates put list for list of WAL files - * @param files list of WAL file paths - * @param backupId backup id - * @return put list - * @throws IOException exception - */ - private List<Put> - createPutsForAddWALFiles(List<String> files, String backupId, String backupRoot) - throws IOException { - - List<Put> puts = new ArrayList<Put>(); - for (String file : files) { - Put put = new Put(rowkey(WALS_PREFIX, BackupUtils.getUniqueWALFileNamePart(file))); - put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("backupId"), - Bytes.toBytes(backupId)); - put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("file"), Bytes.toBytes(file)); - put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("root"), Bytes.toBytes(backupRoot)); - puts.add(put); - } - return puts; - } - - /** - * Creates Scan operation to load WALs - * @param backupRoot path to backup destination - * @return scan operation - */ - private Scan createScanForGetWALs(String backupRoot) { - // TODO: support for backupRoot - Scan scan = new Scan(); - byte[] startRow = Bytes.toBytes(WALS_PREFIX); - byte[] stopRow = Arrays.copyOf(startRow, startRow.length); - stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); - scan.setStartRow(startRow); - scan.setStopRow(stopRow); - scan.addFamily(BackupSystemTable.META_FAMILY); - return scan; - } - - /** - * Creates Get operation for a given wal file name TODO: support for backup destination - * @param file file - * @return get operation - * @throws IOException exception - */ - private Get createGetForCheckWALFile(String file) throws IOException { - Get get = new Get(rowkey(WALS_PREFIX, BackupUtils.getUniqueWALFileNamePart(file))); - // add backup root column - get.addFamily(BackupSystemTable.META_FAMILY); - return get; - } - - /** - * Creates Scan operation to load backup set list - * @return scan operation - */ - private Scan createScanForBackupSetList() { - Scan scan = new Scan(); - byte[] startRow = Bytes.toBytes(SET_KEY_PREFIX); - byte[] stopRow = Arrays.copyOf(startRow, startRow.length); - stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); - scan.setStartRow(startRow); - scan.setStopRow(stopRow); - scan.addFamily(BackupSystemTable.META_FAMILY); - return scan; - } - - /** - * Creates Get operation to load backup set content - * @return get operation - */ - private Get createGetForBackupSet(String name) { - Get get = new Get(rowkey(SET_KEY_PREFIX, name)); - get.addFamily(BackupSystemTable.META_FAMILY); - return get; - } - - /** - * Creates Delete operation to delete backup set content - * @param name backup set's name - * @return delete operation - */ - private Delete createDeleteForBackupSet(String name) { - Delete del = new Delete(rowkey(SET_KEY_PREFIX, name)); - del.addFamily(BackupSystemTable.META_FAMILY); - return del; - } - - /** - * Creates Put operation to update backup set content - * @param name backup set's name - * @param tables list of tables - * @return put operation - */ - private Put createPutForBackupSet(String name, String[] tables) { - Put put = new Put(rowkey(SET_KEY_PREFIX, name)); - byte[] value = convertToByteArray(tables); - put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("tables"), value); - return put; - } - - private byte[] convertToByteArray(String[] tables) { - return StringUtils.join(tables, ",").getBytes(); - } - - /** - * Converts cell to backup set list. - * @param current current cell - * @return backup set as array of table names - * @throws IOException - */ - private String[] cellValueToBackupSet(Cell current) throws IOException { - byte[] data = CellUtil.cloneValue(current); - if (data != null && data.length > 0) { - return Bytes.toString(data).split(","); - } else { - return new String[0]; - } - } - - /** - * Converts cell key to backup set name. - * @param current current cell - * @return backup set name - * @throws IOException - */ - private String cellKeyToBackupSetName(Cell current) throws IOException { - byte[] data = CellUtil.cloneRow(current); - return Bytes.toString(data).substring(SET_KEY_PREFIX.length()); - } - - private static byte[] rowkey(String s, String... other) { - StringBuilder sb = new StringBuilder(s); - for (String ss : other) { - sb.append(ss); - } - return sb.toString().getBytes(); - } - -}
http://git-wip-us.apache.org/repos/asf/hbase/blob/37c65946/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java deleted file mode 100644 index e323e96..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java +++ /dev/null @@ -1,224 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.backup.impl; - -import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_ATTEMPTS_PAUSE_MS_KEY; -import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_MAX_ATTEMPTS_KEY; -import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_BACKUP_ATTEMPTS_PAUSE_MS; -import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_BACKUP_MAX_ATTEMPTS; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.BackupCopyJob; -import org.apache.hadoop.hbase.backup.BackupInfo; -import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase; -import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; -import org.apache.hadoop.hbase.backup.BackupRequest; -import org.apache.hadoop.hbase.backup.BackupRestoreFactory; -import org.apache.hadoop.hbase.backup.BackupType; -import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; -import org.apache.hadoop.hbase.backup.util.BackupUtils; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; - -/** - * Full table backup implementation - * - */ -@InterfaceAudience.Private -public class FullTableBackupClient extends TableBackupClient { - private static final Log LOG = LogFactory.getLog(FullTableBackupClient.class); - - public FullTableBackupClient() { - } - - public FullTableBackupClient(final Connection conn, final String backupId, BackupRequest request) - throws IOException { - super(conn, backupId, request); - } - - /** - * Do snapshot copy. - * @param backupInfo backup info - * @throws Exception exception - */ - protected void snapshotCopy(BackupInfo backupInfo) throws Exception { - LOG.info("Snapshot copy is starting."); - - // set overall backup phase: snapshot_copy - backupInfo.setPhase(BackupPhase.SNAPSHOTCOPY); - - // call ExportSnapshot to copy files based on hbase snapshot for backup - // ExportSnapshot only support single snapshot export, need loop for multiple tables case - BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf); - - // number of snapshots matches number of tables - float numOfSnapshots = backupInfo.getSnapshotNames().size(); - - LOG.debug("There are " + (int) numOfSnapshots + " snapshots to be copied."); - - for (TableName table : backupInfo.getTables()) { - // Currently we simply set the sub copy tasks by counting the table snapshot number, we can - // calculate the real files' size for the percentage in the future. - // backupCopier.setSubTaskPercntgInWholeTask(1f / numOfSnapshots); - int res = 0; - String[] args = new String[4]; - args[0] = "-snapshot"; - args[1] = backupInfo.getSnapshotName(table); - args[2] = "-copy-to"; - args[3] = backupInfo.getTableBackupDir(table); - - LOG.debug("Copy snapshot " + args[1] + " to " + args[3]); - res = copyService.copy(backupInfo, backupManager, conf, BackupType.FULL, args); - // if one snapshot export failed, do not continue for remained snapshots - if (res != 0) { - LOG.error("Exporting Snapshot " + args[1] + " failed with return code: " + res + "."); - - throw new IOException("Failed of exporting snapshot " + args[1] + " to " + args[3] - + " with reason code " + res); - } - LOG.info("Snapshot copy " + args[1] + " finished."); - } - } - - /** - * Backup request execution - * @throws IOException - */ - @Override - public void execute() throws IOException { - try (Admin admin = conn.getAdmin();) { - - // Begin BACKUP - beginBackup(backupManager, backupInfo); - String savedStartCode = null; - boolean firstBackup = false; - // do snapshot for full table backup - - savedStartCode = backupManager.readBackupStartCode(); - firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) == 0L; - if (firstBackup) { - // This is our first backup. Let's put some marker to system table so that we can hold the logs - // while we do the backup. - backupManager.writeBackupStartCode(0L); - } - // We roll log here before we do the snapshot. It is possible there is duplicate data - // in the log that is already in the snapshot. But if we do it after the snapshot, we - // could have data loss. - // A better approach is to do the roll log on each RS in the same global procedure as - // the snapshot. - LOG.info("Execute roll log procedure for full backup ..."); - - Map<String, String> props = new HashMap<String, String>(); - props.put("backupRoot", backupInfo.getBackupRootDir()); - admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE, - LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props); - - newTimestamps = backupManager.readRegionServerLastLogRollResult(); - if (firstBackup) { - // Updates registered log files - // We record ALL old WAL files as registered, because - // this is a first full backup in the system and these - // files are not needed for next incremental backup - List<String> logFiles = BackupUtils.getWALFilesOlderThan(conf, newTimestamps); - backupManager.recordWALFiles(logFiles); - } - - // SNAPSHOT_TABLES: - backupInfo.setPhase(BackupPhase.SNAPSHOT); - for (TableName tableName : tableList) { - String snapshotName = - "snapshot_" + Long.toString(EnvironmentEdgeManager.currentTime()) + "_" - + tableName.getNamespaceAsString() + "_" + tableName.getQualifierAsString(); - - snapshotTable(admin, tableName, snapshotName); - backupInfo.setSnapshotName(tableName, snapshotName); - } - - // SNAPSHOT_COPY: - // do snapshot copy - LOG.debug("snapshot copy for " + backupId); - snapshotCopy(backupInfo); - // Updates incremental backup table set - backupManager.addIncrementalBackupTableSet(backupInfo.getTables()); - - // BACKUP_COMPLETE: - // set overall backup status: complete. Here we make sure to complete the backup. - // After this checkpoint, even if entering cancel process, will let the backup finished - backupInfo.setState(BackupState.COMPLETE); - // The table list in backupInfo is good for both full backup and incremental backup. - // For incremental backup, it contains the incremental backup table set. - backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps); - - HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap = - backupManager.readLogTimestampMap(); - - Long newStartCode = - BackupUtils.getMinValue(BackupUtils - .getRSLogTimestampMins(newTableSetTimestampMap)); - backupManager.writeBackupStartCode(newStartCode); - - // backup complete - completeBackup(conn, backupInfo, backupManager, BackupType.FULL, conf); - } catch (Exception e) { - failBackup(conn, backupInfo, backupManager, e, "Unexpected BackupException : ", - BackupType.FULL, conf); - throw new IOException(e); - } - - } - - - protected void snapshotTable(Admin admin, TableName tableName, String snapshotName) - throws IOException { - - int maxAttempts = - conf.getInt(BACKUP_MAX_ATTEMPTS_KEY, DEFAULT_BACKUP_MAX_ATTEMPTS); - int pause = - conf.getInt(BACKUP_ATTEMPTS_PAUSE_MS_KEY, DEFAULT_BACKUP_ATTEMPTS_PAUSE_MS); - int attempts = 0; - - while (attempts++ < maxAttempts) { - try { - admin.snapshot(snapshotName, tableName); - return; - } catch (IOException ee) { - LOG.warn("Snapshot attempt " + attempts + " failed for table " + tableName - + ", sleeping for " + pause + "ms", ee); - if (attempts < maxAttempts) { - try { - Thread.sleep(pause); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - break; - } - } - } - } - throw new IOException("Failed to snapshot table "+ tableName); - } -}