HBASE-14135 Merge backup images (Vladimir Rodionov)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/05e6e569 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/05e6e569 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/05e6e569 Branch: refs/heads/HBASE-14070.HLC Commit: 05e6e5695089640006d06c2f74126b50a73363b7 Parents: c6ac04a Author: Josh Elser <els...@apache.org> Authored: Sun Aug 13 20:55:58 2017 -0400 Committer: Josh Elser <els...@apache.org> Committed: Sun Aug 13 20:55:58 2017 -0400 ---------------------------------------------------------------------- .../apache/hadoop/hbase/backup/BackupAdmin.java | 20 +- .../hadoop/hbase/backup/BackupDriver.java | 2 + .../apache/hadoop/hbase/backup/BackupInfo.java | 5 + .../hadoop/hbase/backup/BackupMergeJob.java | 40 +++ .../hbase/backup/BackupRestoreFactory.java | 20 +- .../hadoop/hbase/backup/HBackupFileSystem.java | 57 ++-- .../hbase/backup/impl/BackupAdminImpl.java | 213 +++++++++--- .../hbase/backup/impl/BackupCommands.java | 163 ++++++--- .../hadoop/hbase/backup/impl/BackupManager.java | 21 +- .../hbase/backup/impl/BackupManifest.java | 24 +- .../hbase/backup/impl/BackupSystemTable.java | 314 ++++++++++------- .../hbase/backup/impl/RestoreTablesClient.java | 32 +- .../backup/mapreduce/HFileSplitterJob.java | 181 ---------- .../mapreduce/MapReduceBackupMergeJob.java | 321 ++++++++++++++++++ .../mapreduce/MapReduceHFileSplitterJob.java | 181 ++++++++++ .../backup/mapreduce/MapReduceRestoreJob.java | 84 ++--- .../hadoop/hbase/backup/util/BackupUtils.java | 93 +++-- .../TestIncrementalBackupMergeWithFailures.java | 336 +++++++++++++++++++ .../backup/TestRepairAfterFailedDelete.java | 2 +- 19 files changed, 1574 insertions(+), 535 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java index 6f642a4..9dc6382 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java @@ -38,8 +38,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; public interface BackupAdmin extends Closeable { /** - * Backup given list of tables fully. This is a synchronous operation. - * It returns backup id on success or throw exception on failure. + * Backup given list of tables fully. This is a synchronous operation. It returns backup id on + * success or throw exception on failure. * @param userRequest BackupRequest instance * @return the backup Id */ @@ -61,16 +61,24 @@ public interface BackupAdmin extends Closeable { */ BackupInfo getBackupInfo(String backupId) throws IOException; - /** * Delete backup image command - * @param backupIds backup id list + * @param backupIds array of backup ids * @return total number of deleted sessions * @throws IOException exception */ int deleteBackups(String[] backupIds) throws IOException; /** + * Merge backup images command + * @param backupIds array of backup ids of images to be merged + * The resulting backup image will have the same backup id as the most + * recent image from a list of images to be merged + * @throws IOException exception + */ + void mergeBackups(String[] backupIds) throws IOException; + + /** * Show backup history command * @param n last n backup sessions * @return list of backup info objects @@ -113,7 +121,7 @@ public interface BackupAdmin extends Closeable { /** * Add tables to backup set command * @param name name of backup set. - * @param tables list of tables to be added to this set. + * @param tables array of tables to be added to this set. * @throws IOException exception */ void addToBackupSet(String name, TableName[] tables) throws IOException; @@ -121,7 +129,7 @@ public interface BackupAdmin extends Closeable { /** * Remove tables from backup set * @param name name of backup set. - * @param tables list of tables to be removed from this set. + * @param tables array of tables to be removed from this set. * @throws IOException exception */ void removeFromBackupSet(String name, TableName[] tables) throws IOException; http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java index e2cdb2f..9dd8531 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java @@ -111,6 +111,8 @@ public class BackupDriver extends AbstractHBaseTool { type = BackupCommand.SET; } else if (BackupCommand.REPAIR.name().equalsIgnoreCase(cmd)) { type = BackupCommand.REPAIR; + } else if (BackupCommand.MERGE.name().equalsIgnoreCase(cmd)) { + type = BackupCommand.MERGE; } else { System.out.println("Unsupported command for backup: " + cmd); printToolUsage(); http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java index f6a1fe4..1765bf3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java @@ -433,6 +433,11 @@ public class BackupInfo implements Comparable<BackupInfo> { } } + @Override + public String toString() { + return backupId; + } + public byte[] toByteArray() throws IOException { return toProtosBackupInfo().toByteArray(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupMergeJob.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupMergeJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupMergeJob.java new file mode 100644 index 0000000..136782f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupMergeJob.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.backup; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Backup merge operation job interface. Concrete implementation is provided by backup provider, see + * {@link BackupRestoreFactory} + */ + +@InterfaceAudience.Private +public interface BackupMergeJob extends Configurable { + + /** + * Run backup merge operation + * @param backupIds backup image ids + * @throws IOException + */ + void run(String[] backupIds) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java index 6d8967a..d72c884 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.backup; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob; +import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob; import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreJob; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.util.ReflectionUtils; @@ -32,6 +33,7 @@ public final class BackupRestoreFactory { public final static String HBASE_INCR_RESTORE_IMPL_CLASS = "hbase.incremental.restore.class"; public final static String HBASE_BACKUP_COPY_IMPL_CLASS = "hbase.backup.copy.class"; + public final static String HBASE_BACKUP_MERGE_IMPL_CLASS = "hbase.backup.merge.class"; private BackupRestoreFactory() { throw new AssertionError("Instantiating utility class..."); @@ -40,7 +42,7 @@ public final class BackupRestoreFactory { /** * Gets backup restore job * @param conf configuration - * @return backup restore task instance + * @return backup restore job instance */ public static RestoreJob getRestoreJob(Configuration conf) { Class<? extends RestoreJob> cls = @@ -53,7 +55,7 @@ public final class BackupRestoreFactory { /** * Gets backup copy job * @param conf configuration - * @return backup copy task + * @return backup copy job instance */ public static BackupCopyJob getBackupCopyJob(Configuration conf) { Class<? extends BackupCopyJob> cls = @@ -63,4 +65,18 @@ public final class BackupRestoreFactory { service.setConf(conf); return service; } + + /** + * Gets backup merge job + * @param conf configuration + * @return backup merge job instance + */ + public static BackupMergeJob getBackupMergeJob(Configuration conf) { + Class<? extends BackupMergeJob> cls = + conf.getClass(HBASE_BACKUP_MERGE_IMPL_CLASS, MapReduceBackupMergeJob.class, + BackupMergeJob.class); + BackupMergeJob service = ReflectionUtils.newInstance(cls, conf); + service.setConf(conf); + return service; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java index 46044db..1c43e88 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java @@ -49,8 +49,8 @@ public class HBackupFileSystem { /** * Given the backup root dir, backup id and the table name, return the backup image location, * which is also where the backup manifest file is. return value look like: - * "hdfs://backup.hbase.org:9000/user/biadmin/backup/backup_1396650096738/default/t1_dn/", - * where "hdfs://backup.hbase.org:9000/user/biadmin/backup" is a backup root directory + * "hdfs://backup.hbase.org:9000/user/biadmin/backup/backup_1396650096738/default/t1_dn/", where + * "hdfs://backup.hbase.org:9000/user/biadmin/backup" is a backup root directory * @param backupRootDir backup root directory * @param backupId backup id * @param tableName table name @@ -63,18 +63,26 @@ public class HBackupFileSystem { + Path.SEPARATOR; } + public static String getTableBackupDataDir(String backupRootDir, String backupId, + TableName tableName) { + return getTableBackupDir(backupRootDir, backupId, tableName) + Path.SEPARATOR + "data"; + } + + public static Path getBackupPath(String backupRootDir, String backupId) { + return new Path(backupRootDir + Path.SEPARATOR + backupId); + } + /** * Given the backup root dir, backup id and the table name, return the backup image location, * which is also where the backup manifest file is. return value look like: - * "hdfs://backup.hbase.org:9000/user/biadmin/backup/backup_1396650096738/default/t1_dn/", - * where "hdfs://backup.hbase.org:9000/user/biadmin/backup" is a backup root directory + * "hdfs://backup.hbase.org:9000/user/biadmin/backup/backup_1396650096738/default/t1_dn/", where + * "hdfs://backup.hbase.org:9000/user/biadmin/backup" is a backup root directory * @param backupRootPath backup root path * @param tableName table name * @param backupId backup Id * @return backupPath for the particular table */ - public static Path getTableBackupPath(TableName tableName, - Path backupRootPath, String backupId) { + public static Path getTableBackupPath(TableName tableName, Path backupRootPath, String backupId) { return new Path(getTableBackupDir(backupRootPath.toString(), backupId, tableName)); } @@ -94,33 +102,30 @@ public class HBackupFileSystem { return new Path(getLogBackupDir(backupRootDir, backupId)); } - private static Path getManifestPath(TableName tableName, Configuration conf, Path backupRootPath, - String backupId) throws IOException { - Path manifestPath = - new Path(getTableBackupPath(tableName, backupRootPath, backupId), - BackupManifest.MANIFEST_FILE_NAME); + // TODO we do not keep WAL files anymore + // Move manifest file to other place + private static Path getManifestPath(Configuration conf, Path backupRootPath, String backupId) + throws IOException { + Path manifestPath = null; FileSystem fs = backupRootPath.getFileSystem(conf); + manifestPath = + new Path(getBackupPath(backupRootPath.toString(), backupId) + Path.SEPARATOR + + BackupManifest.MANIFEST_FILE_NAME); if (!fs.exists(manifestPath)) { - // check log dir for incremental backup case - manifestPath = - new Path(getLogBackupDir(backupRootPath.toString(), backupId) + Path.SEPARATOR - + BackupManifest.MANIFEST_FILE_NAME); - if (!fs.exists(manifestPath)) { - String errorMsg = - "Could not find backup manifest " + BackupManifest.MANIFEST_FILE_NAME + " for " - + backupId + ". File " + manifestPath + " does not exists. Did " + backupId - + " correspond to previously taken backup ?"; - throw new IOException(errorMsg); - } + String errorMsg = + "Could not find backup manifest " + BackupManifest.MANIFEST_FILE_NAME + " for " + + backupId + ". File " + manifestPath + " does not exists. Did " + backupId + + " correspond to previously taken backup ?"; + throw new IOException(errorMsg); } return manifestPath; } - public static BackupManifest getManifest(TableName tableName, Configuration conf, - Path backupRootPath, String backupId) throws IOException { + public static BackupManifest + getManifest(Configuration conf, Path backupRootPath, String backupId) throws IOException { BackupManifest manifest = - new BackupManifest(conf, getManifestPath(tableName, conf, backupRootPath, backupId)); + new BackupManifest(conf, getManifestPath(conf, backupRootPath, backupId)); return manifest; } @@ -134,7 +139,7 @@ public class HBackupFileSystem { TableName[] tableArray, Configuration conf, Path backupRootPath, String backupId) throws IOException { for (TableName tableName : tableArray) { - BackupManifest manifest = getManifest(tableName, conf, backupRootPath, backupId); + BackupManifest manifest = getManifest(conf, backupRootPath, backupId); backupManifestMap.put(tableName, manifest); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java index 6e35d92..99fb06c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.backup.impl; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -36,8 +37,10 @@ import org.apache.hadoop.hbase.backup.BackupAdmin; import org.apache.hadoop.hbase.backup.BackupClientFactory; import org.apache.hadoop.hbase.backup.BackupInfo; import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; +import org.apache.hadoop.hbase.backup.BackupMergeJob; import org.apache.hadoop.hbase.backup.BackupRequest; import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.BackupRestoreFactory; import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.RestoreRequest; @@ -46,9 +49,8 @@ import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; - import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @InterfaceAudience.Private public class BackupAdminImpl implements BackupAdmin { @@ -65,12 +67,8 @@ public class BackupAdminImpl implements BackupAdmin { @Override public void close() throws IOException { - if (conn != null) { - conn.close(); - } } - @Override public BackupInfo getBackupInfo(String backupId) throws IOException { BackupInfo backupInfo = null; @@ -105,12 +103,12 @@ public class BackupAdminImpl implements BackupAdmin { // is running by using startBackupSession API // If there is an active session in progress, exception will be thrown try { - sysTable.startBackupSession(); + sysTable.startBackupExclusiveOperation(); deleteSessionStarted = true; } catch (IOException e) { LOG.warn("You can not run delete command while active backup session is in progress. \n" + "If there is no active backup session running, run backup repair utility to restore \n" - +"backup system integrity."); + + "backup system integrity."); return -1; } @@ -126,7 +124,7 @@ public class BackupAdminImpl implements BackupAdmin { sysTable.startDeleteOperation(backupIds); // Step 4: Snapshot backup system table if (!BackupSystemTable.snapshotExists(conn)) { - BackupSystemTable.snapshot(conn); + BackupSystemTable.snapshot(conn); } else { LOG.warn("Backup system table snapshot exists"); } @@ -154,13 +152,13 @@ public class BackupAdminImpl implements BackupAdmin { // Fail delete operation // Step 1 if (snapshotDone) { - if(BackupSystemTable.snapshotExists(conn)) { + if (BackupSystemTable.snapshotExists(conn)) { BackupSystemTable.restoreFromSnapshot(conn); // delete snapshot BackupSystemTable.deleteSnapshot(conn); // We still have record with unfinished delete operation - LOG.error("Delete operation failed, please run backup repair utility to restore "+ - "backup system integrity", e); + LOG.error("Delete operation failed, please run backup repair utility to restore " + + "backup system integrity", e); throw e; } else { LOG.warn("Delete operation succeeded, there were some errors: ", e); @@ -169,7 +167,7 @@ public class BackupAdminImpl implements BackupAdmin { } finally { if (deleteSessionStarted) { - sysTable.finishBackupSession(); + sysTable.finishBackupExclusiveOperation(); } } } @@ -206,17 +204,17 @@ public class BackupAdminImpl implements BackupAdmin { /** * Delete single backup and all related backups <br> * Algorithm:<br> - * Backup type: FULL or INCREMENTAL <br> - * Is this last backup session for table T: YES or NO <br> - * For every table T from table list 'tables':<br> - * if(FULL, YES) deletes only physical data (PD) <br> - * if(FULL, NO), deletes PD, scans all newer backups and removes T from backupInfo,<br> - * until we either reach the most recent backup for T in the system or FULL backup<br> - * which includes T<br> - * if(INCREMENTAL, YES) deletes only physical data (PD) - * if(INCREMENTAL, NO) deletes physical data and for table T scans all backup images between last<br> - * FULL backup, which is older than the backup being deleted and the next FULL backup (if exists) <br> - * or last one for a particular table T and removes T from list of backup tables. + * Backup type: FULL or INCREMENTAL <br> + * Is this last backup session for table T: YES or NO <br> + * For every table T from table list 'tables':<br> + * if(FULL, YES) deletes only physical data (PD) <br> + * if(FULL, NO), deletes PD, scans all newer backups and removes T from backupInfo,<br> + * until we either reach the most recent backup for T in the system or FULL backup<br> + * which includes T<br> + * if(INCREMENTAL, YES) deletes only physical data (PD) if(INCREMENTAL, NO) deletes physical data + * and for table T scans all backup images between last<br> + * FULL backup, which is older than the backup being deleted and the next FULL backup (if exists) <br> + * or last one for a particular table T and removes T from list of backup tables. * @param backupId backup id * @param sysTable backup system table * @return total number of deleted backup images @@ -285,8 +283,9 @@ public class BackupAdminImpl implements BackupAdmin { return totalDeleted; } - private void removeTableFromBackupImage(BackupInfo info, TableName tn, - BackupSystemTable sysTable) throws IOException { + private void + removeTableFromBackupImage(BackupInfo info, TableName tn, BackupSystemTable sysTable) + throws IOException { List<TableName> tables = info.getTableNames(); LOG.debug("Remove " + tn + " from " + info.getBackupId() + " tables=" + info.getTableListAsString()); @@ -485,7 +484,7 @@ public class BackupAdminImpl implements BackupAdmin { private String[] toStringArray(TableName[] list) { String[] arr = new String[list.length]; - for(int i=0; i < list.length; i++) { + for (int i = 0; i < list.length; i++) { arr[i] = list[i].toString(); } return arr; @@ -521,7 +520,7 @@ public class BackupAdminImpl implements BackupAdmin { String targetRootDir = request.getTargetRootDir(); List<TableName> tableList = request.getTableList(); - String backupId =BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime(); + String backupId = BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime(); if (type == BackupType.INCREMENTAL) { Set<TableName> incrTableSet = null; try (BackupSystemTable table = new BackupSystemTable(conn)) { @@ -529,19 +528,20 @@ public class BackupAdminImpl implements BackupAdmin { } if (incrTableSet.isEmpty()) { - String msg = "Incremental backup table set contains no tables. " - + "You need to run full backup first " + - (tableList != null ? "on "+StringUtils.join(tableList, ","): ""); + String msg = + "Incremental backup table set contains no tables. " + + "You need to run full backup first " + + (tableList != null ? "on " + StringUtils.join(tableList, ",") : ""); throw new IOException(msg); } - if(tableList != null) { + if (tableList != null) { tableList.removeAll(incrTableSet); if (!tableList.isEmpty()) { String extraTables = StringUtils.join(tableList, ","); - String msg = "Some tables (" + extraTables + ") haven't gone through full backup. "+ - "Perform full backup on " + extraTables + " first, " - + "then retry the command"; + String msg = + "Some tables (" + extraTables + ") haven't gone through full backup. " + + "Perform full backup on " + extraTables + " first, " + "then retry the command"; throw new IOException(msg); } } @@ -584,14 +584,13 @@ public class BackupAdminImpl implements BackupAdmin { // update table list BackupRequest.Builder builder = new BackupRequest.Builder(); - request = builder.withBackupType(request.getBackupType()). - withTableList(tableList). - withTargetRootDir(request.getTargetRootDir()). - withBackupSetName(request.getBackupSetName()). - withTotalTasks(request.getTotalTasks()). - withBandwidthPerTasks((int)request.getBandwidth()).build(); - - TableBackupClient client =null; + request = + builder.withBackupType(request.getBackupType()).withTableList(tableList) + .withTargetRootDir(request.getTargetRootDir()) + .withBackupSetName(request.getBackupSetName()).withTotalTasks(request.getTotalTasks()) + .withBandwidthPerTasks((int) request.getBandwidth()).build(); + + TableBackupClient client = null; try { client = BackupClientFactory.create(conn, backupId, request); } catch (IOException e) { @@ -613,4 +612,132 @@ public class BackupAdminImpl implements BackupAdmin { return tableList; } + @Override + public void mergeBackups(String[] backupIds) throws IOException { + try (final BackupSystemTable sysTable = new BackupSystemTable(conn);) { + checkIfValidForMerge(backupIds, sysTable); + BackupMergeJob job = BackupRestoreFactory.getBackupMergeJob(conn.getConfiguration()); + job.run(backupIds); + } + } + + /** + * Verifies that backup images are valid for merge. + * + * <ul> + * <li>All backups MUST be in the same destination + * <li>No FULL backups are allowed - only INCREMENTAL + * <li>All backups must be in COMPLETE state + * <li>No holes in backup list are allowed + * </ul> + * <p> + * @param backupIds list of backup ids + * @param table backup system table + * @throws IOException + */ + private void checkIfValidForMerge(String[] backupIds, BackupSystemTable table) throws IOException { + String backupRoot = null; + + final Set<TableName> allTables = new HashSet<TableName>(); + final Set<String> allBackups = new HashSet<String>(); + long minTime = Long.MAX_VALUE, maxTime = Long.MIN_VALUE; + for (String backupId : backupIds) { + BackupInfo bInfo = table.readBackupInfo(backupId); + if (bInfo == null) { + String msg = "Backup session " + backupId + " not found"; + throw new IOException(msg); + } + if (backupRoot == null) { + backupRoot = bInfo.getBackupRootDir(); + } else if (!bInfo.getBackupRootDir().equals(backupRoot)) { + throw new IOException("Found different backup destinations in a list of a backup sessions \n" + + "1. " + backupRoot + "\n" + "2. " + bInfo.getBackupRootDir()); + } + if (bInfo.getType() == BackupType.FULL) { + throw new IOException("FULL backup image can not be merged for: \n" + bInfo); + } + + if (bInfo.getState() != BackupState.COMPLETE) { + throw new IOException("Backup image " + backupId + + " can not be merged becuase of its state: " + bInfo.getState()); + } + allBackups.add(backupId); + allTables.addAll(bInfo.getTableNames()); + long time = bInfo.getStartTs(); + if (time < minTime) { + minTime = time; + } + if (time > maxTime) { + maxTime = time; + } + } + + + final long startRangeTime = minTime; + final long endRangeTime = maxTime; + final String backupDest = backupRoot; + // Check we have no 'holes' in backup id list + // Filter 1 : backupRoot + // Filter 2 : time range filter + // Filter 3 : table filter + + BackupInfo.Filter destinationFilter = new BackupInfo.Filter() { + + @Override + public boolean apply(BackupInfo info) { + return info.getBackupRootDir().equals(backupDest); + } + }; + + BackupInfo.Filter timeRangeFilter = new BackupInfo.Filter() { + + @Override + public boolean apply(BackupInfo info) { + long time = info.getStartTs(); + return time >= startRangeTime && time <= endRangeTime ; + } + }; + + BackupInfo.Filter tableFilter = new BackupInfo.Filter() { + + @Override + public boolean apply(BackupInfo info) { + List<TableName> tables = info.getTableNames(); + return !Collections.disjoint(allTables, tables); + } + }; + + BackupInfo.Filter typeFilter = new BackupInfo.Filter() { + + @Override + public boolean apply(BackupInfo info) { + return info.getType() == BackupType.INCREMENTAL; + } + }; + + BackupInfo.Filter stateFilter = new BackupInfo.Filter() { + @Override + public boolean apply(BackupInfo info) { + return info.getState() == BackupState.COMPLETE; + } + }; + + List<BackupInfo> allInfos = + table.getBackupHistory( -1, destinationFilter, + timeRangeFilter, tableFilter, typeFilter, stateFilter); + if (allInfos.size() != allBackups.size()) { + // Yes we have at least one hole in backup image sequence + List<String> missingIds = new ArrayList<String>(); + for(BackupInfo info: allInfos) { + if(allBackups.contains(info.getBackupId())) { + continue; + } + missingIds.add(info.getBackupId()); + } + String errMsg = + "Sequence of backup ids has 'holes'. The following backup images must be added:" + + org.apache.hadoop.util.StringUtils.join(",", missingIds); + throw new IOException(errMsg); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java index aa15fba..650ba2e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java @@ -59,16 +59,15 @@ import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; - import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * General backup commands, options and usage messages */ @InterfaceAudience.Private -public final class BackupCommands { +public final class BackupCommands { public final static String INCORRECT_USAGE = "Incorrect usage"; @@ -79,7 +78,8 @@ public final class BackupCommands { + " history show history of all successful backups\n" + " progress show the progress of the latest backup request\n" + " set backup set management\n" - + " repair repair backup system table" + + " repair repair backup system table\n" + + " merge merge backup images\n" + "Run \'hbase backup COMMAND -h\' to see help message for each command\n"; public static final String CREATE_CMD_USAGE = @@ -109,17 +109,20 @@ public final class BackupCommands { public static final String SET_CMD_USAGE = "Usage: hbase backup set COMMAND [name] [tables]\n" + " name Backup set name\n" - + " tables Comma separated list of tables.\n" - + "COMMAND is one of:\n" + " add add tables to a set, create a set if needed\n" + + " tables Comma separated list of tables.\n" + "COMMAND is one of:\n" + + " add add tables to a set, create a set if needed\n" + " remove remove tables from a set\n" + " list list all backup sets in the system\n" + " describe describe set\n" + " delete delete backup set\n"; + public static final String MERGE_CMD_USAGE = "Usage: hbase backup merge [backup_ids]\n" + + " backup_ids Comma separated list of backup image ids.\n"; public static final String USAGE_FOOTER = ""; public static abstract class Command extends Configured { CommandLine cmdline; Connection conn; + Command(Configuration conf) { if (conf == null) { conf = HBaseConfiguration.create(); @@ -140,7 +143,7 @@ public final class BackupCommands { try (BackupSystemTable table = new BackupSystemTable(conn);) { List<BackupInfo> sessions = table.getBackupInfos(BackupState.RUNNING); - if(sessions.size() > 0) { + if (sessions.size() > 0) { System.err.println("Found backup session in a RUNNING state: "); System.err.println(sessions.get(0)); System.err.println("This may indicate that a previous session has failed abnormally."); @@ -154,11 +157,19 @@ public final class BackupCommands { try (BackupSystemTable table = new BackupSystemTable(conn);) { String[] ids = table.getListOfBackupIdsFromDeleteOperation(); - if(ids !=null && ids.length > 0) { - System.err.println("Found failed backup delete coommand. "); + if (ids != null && ids.length > 0) { + System.err.println("Found failed backup DELETE coommand. "); System.err.println("Backup system recovery is required."); - throw new IOException("Failed backup delete found, aborted command execution"); + throw new IOException("Failed backup DELETE found, aborted command execution"); } + + ids = table.getListOfBackupIdsFromMergeOperation(); + if (ids != null && ids.length > 0) { + System.err.println("Found failed backup MERGE coommand. "); + System.err.println("Backup system recovery is required."); + throw new IOException("Failed backup MERGE found, aborted command execution"); + } + } } } @@ -178,10 +189,10 @@ public final class BackupCommands { protected boolean requiresNoActiveSession() { return false; } + /** - * Command requires consistent state of a backup system - * Backup system may become inconsistent because of an abnormal - * termination of a backup session or delete command + * Command requires consistent state of a backup system Backup system may become inconsistent + * because of an abnormal termination of a backup session or delete command * @return true, if yes */ protected boolean requiresConsistentState() { @@ -220,6 +231,9 @@ public final class BackupCommands { case REPAIR: cmd = new RepairCommand(conf, cmdline); break; + case MERGE: + cmd = new MergeCommand(conf, cmdline); + break; case HELP: default: cmd = new HelpCommand(conf, cmdline); @@ -257,7 +271,7 @@ public final class BackupCommands { throw new IOException(INCORRECT_USAGE); } String[] args = cmdline.getArgs(); - if (args.length !=3) { + if (args.length != 3) { printUsage(); throw new IOException(INCORRECT_USAGE); } @@ -274,7 +288,6 @@ public final class BackupCommands { throw new IOException(INCORRECT_USAGE); } - String tables = null; // Check if we have both: backup set and list of tables @@ -310,14 +323,14 @@ public final class BackupCommands { try (BackupAdminImpl admin = new BackupAdminImpl(conn);) { - BackupRequest.Builder builder = new BackupRequest.Builder(); - BackupRequest request = builder.withBackupType(BackupType.valueOf(args[1].toUpperCase())) - .withTableList(tables != null ? - Lists.newArrayList(BackupUtils.parseTableNames(tables)) : null) - .withTargetRootDir(args[2]) - .withTotalTasks(workers) - .withBandwidthPerTasks(bandwidth) - .withBackupSetName(setName).build(); + BackupRequest.Builder builder = new BackupRequest.Builder(); + BackupRequest request = + builder + .withBackupType(BackupType.valueOf(args[1].toUpperCase())) + .withTableList( + tables != null ? Lists.newArrayList(BackupUtils.parseTableNames(tables)) : null) + .withTargetRootDir(args[2]).withTotalTasks(workers) + .withBandwidthPerTasks(bandwidth).withBackupSetName(setName).build(); String backupId = admin.backupTables(request); System.out.println("Backup session " + backupId + " finished. Status: SUCCESS"); } catch (IOException e) { @@ -544,7 +557,8 @@ public final class BackupCommands { int deleted = admin.deleteBackups(backupIds); System.out.println("Deleted " + deleted + " backups. Total requested: " + args.length); } catch (IOException e) { - System.err.println("Delete command FAILED. Please run backup repair tool to restore backup system integrity"); + System.err + .println("Delete command FAILED. Please run backup repair tool to restore backup system integrity"); throw e; } @@ -584,8 +598,9 @@ public final class BackupCommands { if (list.size() == 0) { // No failed sessions found System.out.println("REPAIR status: no failed sessions found." - +" Checking failed delete backup operation ..."); + + " Checking failed delete backup operation ..."); repairFailedBackupDeletionIfAny(conn, sysTable); + repairFailedBackupMergeIfAny(conn, sysTable); return; } backupInfo = list.get(0); @@ -606,32 +621,55 @@ public final class BackupCommands { // If backup session is updated to FAILED state - means we // processed recovery already. sysTable.updateBackupInfo(backupInfo); - sysTable.finishBackupSession(); - System.out.println("REPAIR status: finished repair failed session:\n "+ backupInfo); + sysTable.finishBackupExclusiveOperation(); + System.out.println("REPAIR status: finished repair failed session:\n " + backupInfo); } } private void repairFailedBackupDeletionIfAny(Connection conn, BackupSystemTable sysTable) - throws IOException - { + throws IOException { String[] backupIds = sysTable.getListOfBackupIdsFromDeleteOperation(); - if (backupIds == null ||backupIds.length == 0) { - System.out.println("No failed backup delete operation found"); + if (backupIds == null || backupIds.length == 0) { + System.out.println("No failed backup DELETE operation found"); // Delete backup table snapshot if exists BackupSystemTable.deleteSnapshot(conn); return; } - System.out.println("Found failed delete operation for: " + StringUtils.join(backupIds)); - System.out.println("Running delete again ..."); + System.out.println("Found failed DELETE operation for: " + StringUtils.join(backupIds)); + System.out.println("Running DELETE again ..."); // Restore table from snapshot BackupSystemTable.restoreFromSnapshot(conn); // Finish previous failed session - sysTable.finishBackupSession(); - try(BackupAdmin admin = new BackupAdminImpl(conn);) { + sysTable.finishBackupExclusiveOperation(); + try (BackupAdmin admin = new BackupAdminImpl(conn);) { admin.deleteBackups(backupIds); } - System.out.println("Delete operation finished OK: "+ StringUtils.join(backupIds)); + System.out.println("DELETE operation finished OK: " + StringUtils.join(backupIds)); + + } + + private void repairFailedBackupMergeIfAny(Connection conn, BackupSystemTable sysTable) + throws IOException { + String[] backupIds = sysTable.getListOfBackupIdsFromMergeOperation(); + if (backupIds == null || backupIds.length == 0) { + System.out.println("No failed backup MERGE operation found"); + // Delete backup table snapshot if exists + BackupSystemTable.deleteSnapshot(conn); + return; + } + System.out.println("Found failed MERGE operation for: " + StringUtils.join(backupIds)); + System.out.println("Running MERGE again ..."); + // Restore table from snapshot + BackupSystemTable.restoreFromSnapshot(conn); + // Unlock backupo system + sysTable.finishBackupExclusiveOperation(); + // Finish previous failed session + sysTable.finishMergeOperation(); + try (BackupAdmin admin = new BackupAdminImpl(conn);) { + admin.mergeBackups(backupIds); + } + System.out.println("MERGE operation finished OK: " + StringUtils.join(backupIds)); } @@ -641,6 +679,56 @@ public final class BackupCommands { } } + private static class MergeCommand extends Command { + + MergeCommand(Configuration conf, CommandLine cmdline) { + super(conf); + this.cmdline = cmdline; + } + + @Override + protected boolean requiresNoActiveSession() { + return true; + } + + @Override + protected boolean requiresConsistentState() { + return true; + } + + @Override + public void execute() throws IOException { + super.execute(); + + String[] args = cmdline == null ? null : cmdline.getArgs(); + if (args == null || (args.length != 2)) { + System.err.println("ERROR: wrong number of arguments: " + + (args == null ? null : args.length)); + printUsage(); + throw new IOException(INCORRECT_USAGE); + } + + String[] backupIds = args[1].split(","); + if (backupIds.length < 2) { + String msg = "ERROR: can not merge a single backup image. "+ + "Number of images must be greater than 1."; + System.err.println(msg); + throw new IOException(msg); + + } + Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create(); + try (final Connection conn = ConnectionFactory.createConnection(conf); + final BackupAdminImpl admin = new BackupAdminImpl(conn);) { + admin.mergeBackups(backupIds); + } + } + + @Override + protected void printUsage() { + System.out.println(MERGE_CMD_USAGE); + } + } + // TODO Cancel command private static class CancelCommand extends Command { @@ -672,7 +760,6 @@ public final class BackupCommands { @Override public void execute() throws IOException { - int n = parseHistoryLength(); final TableName tableName = getTableName(); final String setName = getTableSetName(); @@ -883,7 +970,7 @@ public final class BackupCommands { private TableName[] toTableNames(String[] tables) { TableName[] arr = new TableName[tables.length]; - for (int i=0; i < tables.length; i++) { + for (int i = 0; i < tables.length; i++) { arr[i] = TableName.valueOf(tables[i]); } return arr; http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java index bf80506..8fe5eaf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java @@ -115,8 +115,8 @@ public class BackupManager implements Closeable { } if (LOG.isDebugEnabled()) { - LOG.debug("Added log cleaner: " + cleanerClass +"\n" + - "Added master procedure manager: " + masterProcedureClass); + LOG.debug("Added log cleaner: " + cleanerClass + "\n" + "Added master procedure manager: " + + masterProcedureClass); } } @@ -185,9 +185,8 @@ public class BackupManager implements Closeable { * @return BackupInfo * @throws BackupException exception */ - public BackupInfo createBackupInfo(String backupId, BackupType type, - List<TableName> tableList, String targetRootDir, int workers, long bandwidth) - throws BackupException { + public BackupInfo createBackupInfo(String backupId, BackupType type, List<TableName> tableList, + String targetRootDir, int workers, long bandwidth) throws BackupException { if (targetRootDir == null) { throw new BackupException("Wrong backup request parameter: target backup root directory"); } @@ -313,7 +312,7 @@ public class BackupManager implements Closeable { } } else { Path logBackupPath = - HBackupFileSystem.getLogBackupPath(backup.getBackupRootDir(), backup.getBackupId()); + HBackupFileSystem.getBackupPath(backup.getBackupRootDir(), backup.getBackupId()); LOG.debug("Current backup has an incremental backup ancestor, " + "touching its image manifest in " + logBackupPath.toString() + " to construct the dependency."); @@ -371,7 +370,7 @@ public class BackupManager implements Closeable { * @throws IOException if active session already exists */ public void startBackupSession() throws IOException { - systemTable.startBackupSession(); + systemTable.startBackupExclusiveOperation(); } /** @@ -379,10 +378,9 @@ public class BackupManager implements Closeable { * @throws IOException if no active session */ public void finishBackupSession() throws IOException { - systemTable.finishBackupSession(); + systemTable.finishBackupExclusiveOperation(); } - /** * Read the last backup start code (timestamp) of last successful backup. Will return null if * there is no startcode stored in backup system table or the value is of length 0. These two @@ -413,7 +411,7 @@ public class BackupManager implements Closeable { } public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> - readBulkloadRows(List<TableName> tableList) throws IOException { + readBulkloadRows(List<TableName> tableList) throws IOException { return systemTable.readBulkloadRows(tableList); } @@ -448,8 +446,7 @@ public class BackupManager implements Closeable { */ public void writeRegionServerLogTimestamp(Set<TableName> tables, HashMap<String, Long> newTimestamps) throws IOException { - systemTable.writeRegionServerLogTimestamp(tables, newTimestamps, - backupInfo.getBackupRootDir()); + systemTable.writeRegionServerLogTimestamp(tables, newTimestamps, backupInfo.getBackupRootDir()); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java index b8adac9..7e3201e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupInfo; import org.apache.hadoop.hbase.backup.BackupType; +import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -61,9 +62,8 @@ public class BackupManifest { public static final String MANIFEST_FILE_NAME = ".backup.manifest"; /** - * Backup image, the dependency graph is made up by series of backup images - * BackupImage contains all the relevant information to restore the backup and - * is used during restore operation + * Backup image, the dependency graph is made up by series of backup images BackupImage contains + * all the relevant information to restore the backup and is used during restore operation */ public static class BackupImage implements Comparable<BackupImage> { @@ -294,6 +294,16 @@ public class BackupManifest { return this.ancestors; } + public void removeAncestors(List<String> backupIds) { + List<BackupImage> toRemove = new ArrayList<BackupImage>(); + for (BackupImage im : this.ancestors) { + if (backupIds.contains(im.getBackupId())) { + toRemove.add(im); + } + } + this.ancestors.removeAll(toRemove); + } + private void addAncestor(BackupImage backupImage) { this.getAncestors().add(backupImage); } @@ -464,18 +474,16 @@ public class BackupManifest { } /** - * Persist the manifest file. + * TODO: fix it. Persist the manifest file. * @throws IOException IOException when storing the manifest file. */ public void store(Configuration conf) throws BackupException { byte[] data = backupImage.toProto().toByteArray(); // write the file, overwrite if already exist - String logBackupDir = - BackupUtils.getLogBackupDir(backupImage.getRootDir(), backupImage.getBackupId()); Path manifestFilePath = - new Path(new Path((tableBackupDir != null ? tableBackupDir : logBackupDir)), - MANIFEST_FILE_NAME); + new Path(HBackupFileSystem.getBackupPath(backupImage.getRootDir(), + backupImage.getBackupId()), MANIFEST_FILE_NAME); try (FSDataOutputStream out = manifestFilePath.getFileSystem(conf).create(manifestFilePath, true);) { out.write(data); http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java index e5a3daa..4dab046 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java @@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.util.Pair; * value = backupId and full WAL file name</li> * </ul></p> */ + @InterfaceAudience.Private public final class BackupSystemTable implements Closeable { private static final Log LOG = LogFactory.getLog(BackupSystemTable.class); @@ -118,7 +119,7 @@ public final class BackupSystemTable implements Closeable { private TableName tableName; /** - * Stores backup sessions (contexts) + * Stores backup sessions (contexts) */ final static byte[] SESSIONS_FAMILY = "session".getBytes(); /** @@ -127,11 +128,10 @@ public final class BackupSystemTable implements Closeable { final static byte[] META_FAMILY = "meta".getBytes(); final static byte[] BULK_LOAD_FAMILY = "bulk".getBytes(); /** - * Connection to HBase cluster, shared among all instances + * Connection to HBase cluster, shared among all instances */ private final Connection connection; - private final static String BACKUP_INFO_PREFIX = "session:"; private final static String START_CODE_ROW = "startcode:"; private final static byte[] ACTIVE_SESSION_ROW = "activesession:".getBytes(); @@ -147,6 +147,7 @@ public final class BackupSystemTable implements Closeable { private final static String BULK_LOAD_PREFIX = "bulk:"; private final static byte[] BULK_LOAD_PREFIX_BYTES = BULK_LOAD_PREFIX.getBytes(); private final static byte[] DELETE_OP_ROW = "delete_op_row".getBytes(); + private final static byte[] MERGE_OP_ROW = "merge_op_row".getBytes(); final static byte[] TBL_COL = Bytes.toBytes("tbl"); final static byte[] FAM_COL = Bytes.toBytes("fam"); @@ -160,7 +161,7 @@ public final class BackupSystemTable implements Closeable { private final static String SET_KEY_PREFIX = "backupset:"; // separator between BULK_LOAD_PREFIX and ordinals - protected final static String BLK_LD_DELIM = ":"; + protected final static String BLK_LD_DELIM = ":"; private final static byte[] EMPTY_VALUE = new byte[] {}; // Safe delimiter in a string @@ -187,19 +188,19 @@ public final class BackupSystemTable implements Closeable { } private void verifyNamespaceExists(Admin admin) throws IOException { - String namespaceName = tableName.getNamespaceAsString(); - NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build(); - NamespaceDescriptor[] list = admin.listNamespaceDescriptors(); - boolean exists = false; - for( NamespaceDescriptor nsd: list) { - if (nsd.getName().equals(ns.getName())) { - exists = true; - break; - } - } - if (!exists) { - admin.createNamespace(ns); + String namespaceName = tableName.getNamespaceAsString(); + NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build(); + NamespaceDescriptor[] list = admin.listNamespaceDescriptors(); + boolean exists = false; + for (NamespaceDescriptor nsd : list) { + if (nsd.getName().equals(ns.getName())) { + exists = true; + break; } + } + if (!exists) { + admin.createNamespace(ns); + } } private void waitForSystemTable(Admin admin) throws IOException { @@ -211,15 +212,13 @@ public final class BackupSystemTable implements Closeable { } catch (InterruptedException e) { } if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) { - throw new IOException("Failed to create backup system table after "+ TIMEOUT+"ms"); + throw new IOException("Failed to create backup system table after " + TIMEOUT + "ms"); } } LOG.debug("Backup table exists and available"); } - - @Override public void close() { // do nothing @@ -257,7 +256,7 @@ public final class BackupSystemTable implements Closeable { byte[] row = CellUtil.cloneRow(res.listCells().get(0)); for (Cell cell : res.listCells()) { if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, - BackupSystemTable.PATH_COL.length) == 0) { + BackupSystemTable.PATH_COL.length) == 0) { map.put(row, Bytes.toString(CellUtil.cloneValue(cell))); } } @@ -286,13 +285,13 @@ public final class BackupSystemTable implements Closeable { String path = null; for (Cell cell : res.listCells()) { if (CellComparator.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0, - BackupSystemTable.TBL_COL.length) == 0) { + BackupSystemTable.TBL_COL.length) == 0) { tbl = TableName.valueOf(CellUtil.cloneValue(cell)); } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, - BackupSystemTable.FAM_COL.length) == 0) { + BackupSystemTable.FAM_COL.length) == 0) { fam = CellUtil.cloneValue(cell); } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, - BackupSystemTable.PATH_COL.length) == 0) { + BackupSystemTable.PATH_COL.length) == 0) { path = Bytes.toString(CellUtil.cloneValue(cell)); } } @@ -313,9 +312,10 @@ public final class BackupSystemTable implements Closeable { } files.add(new Path(path)); if (LOG.isDebugEnabled()) { - LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path); + LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path); } - }; + } + ; return mapForSrc; } } @@ -359,16 +359,16 @@ public final class BackupSystemTable implements Closeable { public void writePathsPostBulkLoad(TableName tabName, byte[] region, Map<byte[], List<Path>> finalPaths) throws IOException { if (LOG.isDebugEnabled()) { - LOG.debug("write bulk load descriptor to backup " + tabName + " with " + - finalPaths.size() + " entries"); + LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size() + + " entries"); } try (Table table = connection.getTable(tableName)) { - List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, - finalPaths); + List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths); table.put(puts); LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); } } + /* * For preCommitStoreFile() hook * @param tabName table name @@ -376,15 +376,15 @@ public final class BackupSystemTable implements Closeable { * @param family column family * @param pairs list of paths for hfiles */ - public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, - final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException { + public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family, + final List<Pair<Path, Path>> pairs) throws IOException { if (LOG.isDebugEnabled()) { - LOG.debug("write bulk load descriptor to backup " + tabName + " with " + - pairs.size() + " entries"); + LOG.debug("write bulk load descriptor to backup " + tabName + " with " + pairs.size() + + " entries"); } try (Table table = connection.getTable(tableName)) { - List<Put> puts = BackupSystemTable.createPutForPreparedBulkload(tabName, region, - family, pairs); + List<Put> puts = + BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs); table.put(puts); LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); } @@ -411,11 +411,11 @@ public final class BackupSystemTable implements Closeable { /* * Reads the rows from backup table recording bulk loaded hfiles * @param tableList list of table names - * @return The keys of the Map are table, region and column family. - * Value of the map reflects whether the hfile was recorded by preCommitStoreFile hook (true) + * @return The keys of the Map are table, region and column family. Value of the map reflects + * whether the hfile was recorded by preCommitStoreFile hook (true) */ public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> - readBulkloadRows(List<TableName> tableList) throws IOException { + readBulkloadRows(List<TableName> tableList) throws IOException { Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>(); List<byte[]> rows = new ArrayList<>(); for (TableName tTable : tableList) { @@ -437,13 +437,13 @@ public final class BackupSystemTable implements Closeable { String rowStr = Bytes.toString(row); region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr); if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, - BackupSystemTable.FAM_COL.length) == 0) { + BackupSystemTable.FAM_COL.length) == 0) { fam = Bytes.toString(CellUtil.cloneValue(cell)); } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, - BackupSystemTable.PATH_COL.length) == 0) { + BackupSystemTable.PATH_COL.length) == 0) { path = Bytes.toString(CellUtil.cloneValue(cell)); } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0, - BackupSystemTable.STATE_COL.length) == 0) { + BackupSystemTable.STATE_COL.length) == 0) { byte[] state = CellUtil.cloneValue(cell); if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) { raw = true; @@ -484,12 +484,13 @@ public final class BackupSystemTable implements Closeable { Map<byte[], List<Path>> map = maps[idx]; TableName tn = sTableList.get(idx); if (map == null) continue; - for (Map.Entry<byte[], List<Path>> entry: map.entrySet()) { + for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) { byte[] fam = entry.getKey(); List<Path> paths = entry.getValue(); for (Path p : paths) { - Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), - backupId, ts, cnt++); + Put put = + BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId, ts, + cnt++); puts.add(put); } } @@ -564,18 +565,23 @@ public final class BackupSystemTable implements Closeable { } } - public void startBackupSession() throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Start new backup session"); + /** + * Exclusive operations are: + * create, delete, merge + * @throws IOException + */ + public void startBackupExclusiveOperation() throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Start new backup exclusive operation"); } try (Table table = connection.getTable(tableName)) { Put put = createPutForStartBackupSession(); - //First try to put if row does not exist + // First try to put if row does not exist if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, null, put)) { // Row exists, try to put if value == ACTIVE_SESSION_NO if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO, put)) { - throw new IOException("There is an active backup session"); + throw new IOException("There is an active backup exclusive operation"); } } } @@ -587,17 +593,15 @@ public final class BackupSystemTable implements Closeable { return put; } - public void finishBackupSession() throws IOException - { - if (LOG.isTraceEnabled()) { - LOG.trace("Stop backup session"); + public void finishBackupExclusiveOperation() throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Finish backup exclusive operation"); } try (Table table = connection.getTable(tableName)) { Put put = createPutForStopBackupSession(); - if(!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, - ACTIVE_SESSION_YES, put)) - { - throw new IOException("There is no active backup session"); + if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, + ACTIVE_SESSION_YES, put)) { + throw new IOException("There is no active backup exclusive operation"); } } } @@ -630,8 +634,7 @@ public final class BackupSystemTable implements Closeable { res.advance(); Cell cell = res.current(); byte[] row = CellUtil.cloneRow(cell); - String server = - getServerNameForReadRegionServerLastLogRollResult(row); + String server = getServerNameForReadRegionServerLastLogRollResult(row); byte[] data = CellUtil.cloneValue(cell); rsTimestampMap.put(server, Bytes.toLong(data)); } @@ -652,8 +655,7 @@ public final class BackupSystemTable implements Closeable { LOG.trace("write region server last roll log result to backup system table"); } try (Table table = connection.getTable(tableName)) { - Put put = - createPutForRegionServerLastLogRollResult(server, ts, backupRoot); + Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot); table.put(put); } } @@ -685,14 +687,15 @@ public final class BackupSystemTable implements Closeable { /** * Get first n backup history records - * @param n number of records + * @param n number of records, if n== -1 - max number + * is ignored * @return list of records * @throws IOException */ public List<BackupInfo> getHistory(int n) throws IOException { List<BackupInfo> history = getBackupHistory(); - if (history.size() <= n) return history; + if (n == -1 || history.size() <= n) return history; List<BackupInfo> list = new ArrayList<BackupInfo>(); for (int i = 0; i < n; i++) { list.add(history.get(i)); @@ -703,7 +706,8 @@ public final class BackupSystemTable implements Closeable { /** * Get backup history records filtered by list of filters. - * @param n max number of records + * @param n max number of records, if n == -1 , then max number + * is ignored * @param filters list of filters * @return backup records * @throws IOException @@ -714,7 +718,7 @@ public final class BackupSystemTable implements Closeable { List<BackupInfo> history = getBackupHistory(); List<BackupInfo> result = new ArrayList<BackupInfo>(); for (BackupInfo bi : history) { - if (result.size() == n) break; + if (n >= 0 && result.size() == n) break; boolean passed = true; for (int i = 0; i < filters.length; i++) { if (!filters[i].apply(bi)) { @@ -852,9 +856,7 @@ public final class BackupSystemTable implements Closeable { List<Put> puts = new ArrayList<Put>(); for (TableName table : tables) { byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray(); - Put put = - createPutForWriteRegionServerLogTimestamp(table, smapData, - backupRoot); + Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot); puts.add(put); } try (Table table = connection.getTable(tableName)) { @@ -1018,8 +1020,7 @@ public final class BackupSystemTable implements Closeable { } } try (Table table = connection.getTable(tableName)) { - List<Put> puts = - createPutsForAddWALFiles(files, backupId, backupRoot); + List<Put> puts = createPutsForAddWALFiles(files, backupId, backupRoot); table.put(puts); } } @@ -1087,6 +1088,7 @@ public final class BackupSystemTable implements Closeable { * @param file name of a file to check * @return true, if deletable, false otherwise. * @throws IOException exception + * TODO: multiple backup destination support */ public boolean isWALFileDeletable(String file) throws IOException { if (LOG.isTraceEnabled()) { @@ -1271,12 +1273,12 @@ public final class BackupSystemTable implements Closeable { if (disjoint.length > 0 && disjoint.length != tables.length) { Put put = createPutForBackupSet(name, disjoint); table.put(put); - } else if(disjoint.length == tables.length) { + } else if (disjoint.length == tables.length) { LOG.warn("Backup set '" + name + "' does not contain tables [" + StringUtils.join(toRemove, " ") + "]"); } else { // disjoint.length == 0 and tables.length >0 - // Delete backup set - LOG.info("Backup set '"+name+"' is empty. Deleting."); + // Delete backup set + LOG.info("Backup set '" + name + "' is empty. Deleting."); deleteBackupSet(name); } } finally { @@ -1356,7 +1358,7 @@ public final class BackupSystemTable implements Closeable { } public static String getSnapshotName(Configuration conf) { - return "snapshot_"+getTableNameAsString(conf).replace(":", "_"); + return "snapshot_" + getTableNameAsString(conf).replace(":", "_"); } /** @@ -1589,17 +1591,16 @@ public final class BackupSystemTable implements Closeable { for (Path path : entry.getValue()) { String file = path.toString(); int lastSlash = file.lastIndexOf("/"); - String filename = file.substring(lastSlash+1); - Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, - Bytes.toString(region), BLK_LD_DELIM, filename)); + String filename = file.substring(lastSlash + 1); + Put put = + new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, + Bytes.toString(region), BLK_LD_DELIM, filename)); put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey()); - put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, - file.getBytes()); + put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes()); put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT); puts.add(put); - LOG.debug("writing done bulk path " + file + " for " + table + " " + - Bytes.toString(region)); + LOG.debug("writing done bulk path " + file + " for " + table + " " + Bytes.toString(region)); } } return puts; @@ -1607,19 +1608,16 @@ public final class BackupSystemTable implements Closeable { public static void snapshot(Connection conn) throws IOException { - try (Admin admin = conn.getAdmin();){ + try (Admin admin = conn.getAdmin();) { Configuration conf = conn.getConfiguration(); - admin.snapshot(BackupSystemTable.getSnapshotName(conf), - BackupSystemTable.getTableName(conf)); + admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf)); } } - public static void restoreFromSnapshot(Connection conn) - throws IOException { + public static void restoreFromSnapshot(Connection conn) throws IOException { Configuration conf = conn.getConfiguration(); - LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + - " from snapshot"); + LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot"); try (Admin admin = conn.getAdmin();) { String snapshotName = BackupSystemTable.getSnapshotName(conf); if (snapshotExists(admin, snapshotName)) { @@ -1631,8 +1629,8 @@ public final class BackupSystemTable implements Closeable { // Snapshot does not exists, i.e completeBackup failed after // deleting backup system table snapshot // In this case we log WARN and proceed - LOG.warn("Could not restore backup system table. Snapshot " + snapshotName+ - " does not exists."); + LOG.warn("Could not restore backup system table. Snapshot " + snapshotName + + " does not exists."); } } } @@ -1640,7 +1638,7 @@ public final class BackupSystemTable implements Closeable { protected static boolean snapshotExists(Admin admin, String snapshotName) throws IOException { List<SnapshotDescription> list = admin.listSnapshots(); - for (SnapshotDescription desc: list) { + for (SnapshotDescription desc : list) { if (desc.getName().equals(snapshotName)) { return true; } @@ -1648,26 +1646,25 @@ public final class BackupSystemTable implements Closeable { return false; } - public static boolean snapshotExists (Connection conn) throws IOException { + public static boolean snapshotExists(Connection conn) throws IOException { return snapshotExists(conn.getAdmin(), getSnapshotName(conn.getConfiguration())); } - public static void deleteSnapshot(Connection conn) - throws IOException { + public static void deleteSnapshot(Connection conn) throws IOException { Configuration conf = conn.getConfiguration(); - LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + - " from the system"); + LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system"); try (Admin admin = conn.getAdmin();) { String snapshotName = BackupSystemTable.getSnapshotName(conf); if (snapshotExists(admin, snapshotName)) { admin.deleteSnapshot(snapshotName); LOG.debug("Done deleting backup system table snapshot"); } else { - LOG.error("Snapshot "+snapshotName+" does not exists"); + LOG.error("Snapshot " + snapshotName + " does not exists"); } } } + /* * Creates Put's for bulk load resulting from running LoadIncrementalHFiles */ @@ -1678,17 +1675,16 @@ public final class BackupSystemTable implements Closeable { Path path = pair.getSecond(); String file = path.toString(); int lastSlash = file.lastIndexOf("/"); - String filename = file.substring(lastSlash+1); - Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, - Bytes.toString(region), BLK_LD_DELIM, filename)); + String filename = file.substring(lastSlash + 1); + Put put = + new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, Bytes.toString(region), + BLK_LD_DELIM, filename)); put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family); - put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, - file.getBytes()); + put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes()); put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE); puts.add(put); - LOG.debug("writing raw bulk path " + file + " for " + table + " " + - Bytes.toString(region)); + LOG.debug("writing raw bulk path " + file + " for " + table + " " + Bytes.toString(region)); } return puts; } @@ -1725,7 +1721,6 @@ public final class BackupSystemTable implements Closeable { return get; } - public void startDeleteOperation(String[] backupIdList) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList)); @@ -1765,6 +1760,96 @@ public final class BackupSystemTable implements Closeable { } } + private Put createPutForMergeOperation(String[] backupIdList) { + + byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); + Put put = new Put(MERGE_OP_ROW); + put.addColumn(META_FAMILY, FAM_COL, value); + return put; + } + + public boolean isMergeInProgress() throws IOException { + Get get = new Get(MERGE_OP_ROW); + try (Table table = connection.getTable(tableName)) { + Result res = table.get(get); + if (res.isEmpty()) { + return false; + } + return true; + } + } + + private Put createPutForUpdateTablesForMerge(List<TableName> tables) { + + byte[] value = Bytes.toBytes(StringUtils.join(tables, ",")); + Put put = new Put(MERGE_OP_ROW); + put.addColumn(META_FAMILY, PATH_COL, value); + return put; + } + + private Delete createDeleteForBackupMergeOperation() { + + Delete delete = new Delete(MERGE_OP_ROW); + delete.addFamily(META_FAMILY); + return delete; + } + + private Get createGetForMergeOperation() { + + Get get = new Get(MERGE_OP_ROW); + get.addFamily(META_FAMILY); + return get; + } + + public void startMergeOperation(String[] backupIdList) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList)); + } + Put put = createPutForMergeOperation(backupIdList); + try (Table table = connection.getTable(tableName)) { + table.put(put); + } + } + + public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Update tables for merge : " + StringUtils.join(tables, ",")); + } + Put put = createPutForUpdateTablesForMerge(tables); + try (Table table = connection.getTable(tableName)) { + table.put(put); + } + } + + public void finishMergeOperation() throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Finsih merge operation for backup ids "); + } + Delete delete = createDeleteForBackupMergeOperation(); + try (Table table = connection.getTable(tableName)) { + table.delete(delete); + } + } + + public String[] getListOfBackupIdsFromMergeOperation() throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Get backup ids for merge operation"); + } + Get get = createGetForMergeOperation(); + try (Table table = connection.getTable(tableName)) { + Result res = table.get(get); + if (res.isEmpty()) { + return null; + } + Cell cell = res.listCells().get(0); + byte[] val = CellUtil.cloneValue(cell); + if (val.length == 0) { + return null; + } + return new String(val).split(","); + } + } + static Scan createScanForOrigBulkLoadedFiles(TableName table) throws IOException { Scan scan = new Scan(); byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM); @@ -1776,10 +1861,12 @@ public final class BackupSystemTable implements Closeable { scan.setMaxVersions(1); return scan; } + static String getTableNameFromOrigBulkLoadRow(String rowStr) { String[] parts = rowStr.split(BLK_LD_DELIM); return parts[1]; } + static String getRegionNameFromOrigBulkLoadRow(String rowStr) { // format is bulk : namespace : table : region : file String[] parts = rowStr.split(BLK_LD_DELIM); @@ -1791,6 +1878,7 @@ public final class BackupSystemTable implements Closeable { LOG.debug("bulk row string " + rowStr + " region " + parts[idx]); return parts[idx]; } + /* * Used to query bulk loaded hfiles which have been copied by incremental backup * @param backupId the backup Id. It can be null when querying for all tables @@ -1798,13 +1886,14 @@ public final class BackupSystemTable implements Closeable { */ static Scan createScanForBulkLoadedFiles(String backupId) throws IOException { Scan scan = new Scan(); - byte[] startRow = backupId == null ? BULK_LOAD_PREFIX_BYTES : - rowkey(BULK_LOAD_PREFIX, backupId+BLK_LD_DELIM); + byte[] startRow = + backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId + + BLK_LD_DELIM); byte[] stopRow = Arrays.copyOf(startRow, startRow.length); stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); scan.setStartRow(startRow); scan.setStopRow(stopRow); - //scan.setTimeRange(lower, Long.MAX_VALUE); + // scan.setTimeRange(lower, Long.MAX_VALUE); scan.addFamily(BackupSystemTable.META_FAMILY); scan.setMaxVersions(1); return scan; @@ -1812,12 +1901,13 @@ public final class BackupSystemTable implements Closeable { static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId, long ts, int idx) { - Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId+BLK_LD_DELIM+ts+BLK_LD_DELIM+idx)); + Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM + ts + BLK_LD_DELIM + idx)); put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName()); put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam); put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, p.getBytes()); return put; } + /** * Creates put list for list of WAL files * @param files list of WAL file paths @@ -1825,8 +1915,9 @@ public final class BackupSystemTable implements Closeable { * @return put list * @throws IOException exception */ - private List<Put> createPutsForAddWALFiles(List<String> files, String backupId, - String backupRoot) throws IOException { + private List<Put> + createPutsForAddWALFiles(List<String> files, String backupId, String backupRoot) + throws IOException { List<Put> puts = new ArrayList<Put>(); for (String file : files) { @@ -1957,5 +2048,4 @@ public final class BackupSystemTable implements Closeable { return sb.toString().getBytes(); } - } http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java index 381e9b1..ea7a7b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java @@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.RestoreRequest; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; -import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreJob; +import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.backup.util.RestoreTool; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; @@ -58,7 +58,6 @@ public class RestoreTablesClient { private Configuration conf; private Connection conn; private String backupId; - private String fullBackupId; private TableName[] sTableArray; private TableName[] tTableArray; private String targetRootDir; @@ -107,8 +106,7 @@ public class RestoreTablesClient { if (existTableList.size() > 0) { if (!isOverwrite) { - LOG.error("Existing table (" - + existTableList + LOG.error("Existing table (" + existTableList + ") found in the restore target, please add " + "\"-overwrite\" option in the command if you mean" + " to restore to these existing tables"); @@ -148,9 +146,8 @@ public class RestoreTablesClient { Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId); String lastIncrBackupId = images.length == 1 ? null : images[images.length - 1].getBackupId(); // We need hFS only for full restore (see the code) - BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId); + BackupManifest manifest = HBackupFileSystem.getManifest(conf, backupRoot, backupId); if (manifest.getType() == BackupType.FULL) { - fullBackupId = manifest.getBackupImage().getBackupId(); LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full" + " backup image " + tableBackupPath.toString()); restoreTool.fullRestoreTable(conn, tableBackupPath, sTable, tTable, truncateIfExists, @@ -169,8 +166,8 @@ public class RestoreTablesClient { // full backup path comes first for (int i = 1; i < images.length; i++) { BackupImage im = images[i]; - String fileBackupDir = HBackupFileSystem.getTableBackupDir(im.getRootDir(), - im.getBackupId(), sTable)+ Path.SEPARATOR+"data"; + String fileBackupDir = + HBackupFileSystem.getTableBackupDataDir(im.getRootDir(), im.getBackupId(), sTable); dirList.add(new Path(fileBackupDir)); } @@ -196,8 +193,10 @@ public class RestoreTablesClient { TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>(); boolean truncateIfExists = isOverwrite; Set<String> backupIdSet = new HashSet<>(); + for (int i = 0; i < sTableArray.length; i++) { TableName table = sTableArray[i]; + BackupManifest manifest = backupManifestMap.get(table); // Get the image list of this backup for restore in time order from old // to new. @@ -213,11 +212,8 @@ public class RestoreTablesClient { if (restoreImageSet != null && !restoreImageSet.isEmpty()) { LOG.info("Restore includes the following image(s):"); for (BackupImage image : restoreImageSet) { - LOG.info("Backup: " - + image.getBackupId() - + " " - + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), - table)); + LOG.info("Backup: " + image.getBackupId() + " " + + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), table)); if (image.getType() == BackupType.INCREMENTAL) { backupIdSet.add(image.getBackupId()); LOG.debug("adding " + image.getBackupId() + " for bulk load"); @@ -232,13 +228,13 @@ public class RestoreTablesClient { Map<byte[], List<Path>>[] mapForSrc = table.readBulkLoadedFiles(id, sTableList); Map<LoadQueueItem, ByteBuffer> loaderResult; conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true); - LoadIncrementalHFiles loader = MapReduceRestoreJob.createLoader(conf); + LoadIncrementalHFiles loader = BackupUtils.createLoader(conf); for (int i = 0; i < sTableList.size(); i++) { if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) { loaderResult = loader.run(null, mapForSrc[i], tTableArray[i]); LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]); if (loaderResult.isEmpty()) { - String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " +tTableArray[i]; + String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " + tTableArray[i]; LOG.error(msg); throw new IOException(msg); } @@ -253,7 +249,7 @@ public class RestoreTablesClient { if (backupId == null) { return 0; } - return Long.parseLong(backupId.substring(backupId.lastIndexOf("_")+1)); + return Long.parseLong(backupId.substring(backupId.lastIndexOf("_") + 1)); } static boolean withinRange(long a, long lower, long upper) { @@ -268,15 +264,15 @@ public class RestoreTablesClient { // case VALIDATION: // check the target tables checkTargetTables(tTableArray, isOverwrite); + // case RESTORE_IMAGES: HashMap<TableName, BackupManifest> backupManifestMap = new HashMap<>(); // check and load backup image manifest for the tables Path rootPath = new Path(targetRootDir); HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath, backupId); + restore(backupManifestMap, sTableArray, tTableArray, isOverwrite); } - - }