http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
----------------------------------------------------------------------
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
new file mode 100644
index 0000000..4dab046
--- /dev/null
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -0,0 +1,2051 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
+import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.SnapshotDescription;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * This class provides API to access backup system table<br>
+ *
+ * Backup system table schema:<br>
+ * <p><ul>
+ * <li>1. Backup sessions rowkey= "session:"+backupId; value =serialized BackupInfo</li>
+ * <li>2. Backup start code rowkey = "startcode:"+backupRoot; value = startcode</li>
+ * <li>3. Incremental backup set rowkey="incrbackupset:"+backupRoot; value=[list of tables]</li>
+ * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name;
+ * value = map[RS-> last WAL timestamp]</li>
+ * <li>5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp</li>
+ * <li>6. WALs recorded rowkey="wals:"+WAL unique file name;
+ * value = backupId and full WAL file name</li>
+ * </ul></p>
+ */
+
+@InterfaceAudience.Private
+public final class BackupSystemTable implements Closeable {
+  private static final Log LOG = LogFactory.getLog(BackupSystemTable.class);
+
+  static class WALItem {
+    String backupId;
+    String walFile;
+    String backupRoot;
+
+    WALItem(String backupId, String walFile, String backupRoot) {
+      this.backupId = backupId;
+      this.walFile = walFile;
+      this.backupRoot = backupRoot;
+    }
+
+    public String getBackupId() {
+      return backupId;
+    }
+
+    public String getWalFile() {
+      return walFile;
+    }
+
+    public String getBackupRoot() {
+      return backupRoot;
+    }
+
+    @Override
+    public String toString() {
+      return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + 
Path.SEPARATOR + walFile;
+    }
+
+  }
+
+  /**
+   * Name of the backup system table; resolved in the constructor from the connection's
+   * configuration.
+   */
+  private TableName tableName;
+  /**
+   * Stores backup sessions (contexts)
+   */
+  final static byte[] SESSIONS_FAMILY = Bytes.toBytes("session");
+  /**
+   * Stores other meta
+   */
+  final static byte[] META_FAMILY = Bytes.toBytes("meta");
+  // NOTE(review): presumably the family holding bulk load records - confirm against its users
+  final static byte[] BULK_LOAD_FAMILY = Bytes.toBytes("bulk");
+  /**
+   * Connection to HBase cluster, shared among all instances
+   */
+  private final Connection connection;
+
+  // Row key prefixes. All byte[] constants below are encoded with Bytes.toBytes (UTF-8) rather
+  // than String.getBytes(), so the persisted keys do not depend on the JVM default charset.
+  private final static String BACKUP_INFO_PREFIX = "session:";
+  private final static String START_CODE_ROW = "startcode:";
+  // Row/column used as the exclusive-operation flag (see startBackupExclusiveOperation)
+  private final static byte[] ACTIVE_SESSION_ROW = Bytes.toBytes("activesession:");
+  private final static byte[] ACTIVE_SESSION_COL = Bytes.toBytes("c");
+
+  // The two values stored in the exclusive-operation flag column
+  private final static byte[] ACTIVE_SESSION_YES = Bytes.toBytes("yes");
+  private final static byte[] ACTIVE_SESSION_NO = Bytes.toBytes("no");
+
+  private final static String INCR_BACKUP_SET = "incrbackupset:";
+  private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
+  private final static String RS_LOG_TS_PREFIX = "rslogts:";
+
+  private final static String BULK_LOAD_PREFIX = "bulk:";
+  private final static byte[] BULK_LOAD_PREFIX_BYTES = Bytes.toBytes(BULK_LOAD_PREFIX);
+  private final static byte[] DELETE_OP_ROW = Bytes.toBytes("delete_op_row");
+  private final static byte[] MERGE_OP_ROW = Bytes.toBytes("merge_op_row");
+
+  // Column qualifiers of a bulk load record
+  final static byte[] TBL_COL = Bytes.toBytes("tbl");
+  final static byte[] FAM_COL = Bytes.toBytes("fam");
+  final static byte[] PATH_COL = Bytes.toBytes("path");
+  final static byte[] STATE_COL = Bytes.toBytes("state");
+  // the two states a bulk loaded file can be
+  final static byte[] BL_PREPARE = Bytes.toBytes("R");
+  final static byte[] BL_COMMIT = Bytes.toBytes("D");
+
+  private final static String WALS_PREFIX = "wals:";
+  private final static String SET_KEY_PREFIX = "backupset:";
+
+  // separator between BULK_LOAD_PREFIX and ordinals
+  protected final static String BLK_LD_DELIM = ":";
+  private final static byte[] EMPTY_VALUE = new byte[] {};
+
+  // Safe delimiter in a string
+  private final static String NULL = "\u0000";
+
+  /**
+   * Binds this instance to the given connection, resolves the backup system table name from
+   * the connection's configuration and runs the system table check.
+   * @param conn connection to HBase cluster
+   * @throws IOException if the system table check fails
+   */
+  public BackupSystemTable(Connection conn) throws IOException {
+    connection = conn;
+    Configuration conf = conn.getConfiguration();
+    this.tableName = BackupSystemTable.getTableName(conf);
+    checkSystemTable();
+  }
+
+  /**
+   * Makes sure the namespace and the backup system table exist (creating them when missing)
+   * and then waits until the table is reported available.
+   * @throws IOException on admin operation failure or wait timeout
+   */
+  private void checkSystemTable() throws IOException {
+    try (Admin admin = connection.getAdmin()) {
+      verifyNamespaceExists(admin);
+
+      boolean tablePresent = admin.tableExists(tableName);
+      if (!tablePresent) {
+        Configuration conf = connection.getConfiguration();
+        HTableDescriptor descriptor = BackupSystemTable.getSystemTableDescriptor(conf);
+        admin.createTable(descriptor);
+      }
+      waitForSystemTable(admin);
+    }
+  }
+
+  /**
+   * Creates the namespace of the backup system table unless a namespace with the same name is
+   * already listed by the admin.
+   * @param admin admin used to list and create namespaces
+   * @throws IOException on admin operation failure
+   */
+  private void verifyNamespaceExists(Admin admin) throws IOException {
+    NamespaceDescriptor wanted =
+        NamespaceDescriptor.create(tableName.getNamespaceAsString()).build();
+    for (NamespaceDescriptor existing : admin.listNamespaceDescriptors()) {
+      if (existing.getName().equals(wanted.getName())) {
+        // already there, nothing to create
+        return;
+      }
+    }
+    admin.createNamespace(wanted);
+  }
+
+  private void waitForSystemTable(Admin admin) throws IOException {
+    long TIMEOUT = 60000;
+    long startTime = EnvironmentEdgeManager.currentTime();
+    while (!admin.tableExists(tableName) || 
!admin.isTableAvailable(tableName)) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+      }
+      if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) {
+        throw new IOException("Failed to create backup system table after " + 
TIMEOUT + "ms");
+      }
+    }
+    LOG.debug("Backup table exists and available");
+
+  }
+
+  /**
+   * No-op. Nothing is released here; in particular the connection passed to the constructor
+   * is not closed by this method.
+   */
+  @Override
+  public void close() {
+    // do nothing
+  }
+
+  /**
+   * Writes the given backup session info into the backup system table.
+   * @param info backup info
+   * @throws IOException exception
+   */
+  public void updateBackupInfo(BackupInfo info) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      String msg = "update backup status in backup system table for: " + info.getBackupId()
+          + " set status=" + info.getState();
+      LOG.trace(msg);
+    }
+    try (Table table = connection.getTable(tableName)) {
+      Put sessionPut = createPutForBackupInfo(info);
+      table.put(sessionPut);
+    }
+  }
+
+  /**
+   * Scans the bulk loaded file records of a backup and collects the value of the "path"
+   * column per row.
+   * @param backupId the backup Id
+   * @return Map of rows to path of bulk loaded hfile
+   * @throws IOException on table access failure
+   */
+  Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException {
+    Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
+    try (Table table = connection.getTable(tableName);
+        ResultScanner scanner = table.getScanner(scan)) {
+      Map<byte[], String> rowToPath = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+      for (Result res = scanner.next(); res != null; res = scanner.next()) {
+        res.advance();
+        // the row key is taken from the first cell of the result
+        byte[] row = CellUtil.cloneRow(res.listCells().get(0));
+        for (Cell cell : res.listCells()) {
+          boolean isPathColumn = CellComparator.compareQualifiers(cell,
+            BackupSystemTable.PATH_COL, 0, BackupSystemTable.PATH_COL.length) == 0;
+          if (isPathColumn) {
+            rowToPath.put(row, Bytes.toString(CellUtil.cloneValue(cell)));
+          }
+        }
+      }
+      return rowToPath;
+    }
+  }
+
+  /*
+   * Used during restore
+   * @param backupId the backup Id
+   * @param sTableList List of tables
+   * @return array of Map of family to List of Paths
+   */
+  public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, 
List<TableName> sTableList)
+      throws IOException {
+    Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
+    Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : 
sTableList.size()];
+    try (Table table = connection.getTable(tableName);
+        ResultScanner scanner = table.getScanner(scan)) {
+      Result res = null;
+      while ((res = scanner.next()) != null) {
+        res.advance();
+        TableName tbl = null;
+        byte[] fam = null;
+        String path = null;
+        for (Cell cell : res.listCells()) {
+          if (CellComparator.compareQualifiers(cell, 
BackupSystemTable.TBL_COL, 0,
+            BackupSystemTable.TBL_COL.length) == 0) {
+            tbl = TableName.valueOf(CellUtil.cloneValue(cell));
+          } else if (CellComparator.compareQualifiers(cell, 
BackupSystemTable.FAM_COL, 0,
+            BackupSystemTable.FAM_COL.length) == 0) {
+            fam = CellUtil.cloneValue(cell);
+          } else if (CellComparator.compareQualifiers(cell, 
BackupSystemTable.PATH_COL, 0,
+            BackupSystemTable.PATH_COL.length) == 0) {
+            path = Bytes.toString(CellUtil.cloneValue(cell));
+          }
+        }
+        int srcIdx = IncrementalTableBackupClient.getIndex(tbl, sTableList);
+        if (srcIdx == -1) {
+          // the table is not among the query
+          continue;
+        }
+        if (mapForSrc[srcIdx] == null) {
+          mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+        }
+        List<Path> files;
+        if (!mapForSrc[srcIdx].containsKey(fam)) {
+          files = new ArrayList<Path>();
+          mapForSrc[srcIdx].put(fam, files);
+        } else {
+          files = mapForSrc[srcIdx].get(fam);
+        }
+        files.add(new Path(path));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("found bulk loaded file : " + tbl + " " + 
Bytes.toString(fam) + " " + path);
+        }
+      }
+      ;
+      return mapForSrc;
+    }
+  }
+
+  /**
+   * Deletes the META_FAMILY family of every row that is a key of the given map.
+   * @param map Map of row keys to path of bulk loaded hfile
+   * @throws IOException on table access failure
+   */
+  void deleteBulkLoadedFiles(Map<byte[], String> map) throws IOException {
+    try (Table table = connection.getTable(tableName)) {
+      List<Delete> deletes = new ArrayList<>();
+      Iterator<byte[]> rowKeys = map.keySet().iterator();
+      while (rowKeys.hasNext()) {
+        Delete familyDelete = new Delete(rowKeys.next()).addFamily(BackupSystemTable.META_FAMILY);
+        deletes.add(familyDelete);
+      }
+      table.delete(deletes);
+    }
+  }
+
+  /**
+   * Removes the record of a backup session from the backup system table.
+   * @param backupId backup id
+   * @throws IOException exception
+   */
+  public void deleteBackupInfo(String backupId) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      String msg = "delete backup status in backup system table for " + backupId;
+      LOG.trace(msg);
+    }
+    try (Table table = connection.getTable(tableName)) {
+      Delete sessionDelete = createDeleteForBackupInfo(backupId);
+      table.delete(sessionDelete);
+    }
+  }
+
+  /**
+   * For postBulkLoadHFile() hook. Stores the puts built by createPutForCommittedBulkload for
+   * the given table, region and paths.
+   * @param tabName table name
+   * @param region the region receiving hfile
+   * @param finalPaths family and associated hfiles
+   * @throws IOException on table access failure
+   */
+  public void writePathsPostBulkLoad(TableName tabName, byte[] region,
+      Map<byte[], List<Path>> finalPaths) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      int entries = finalPaths.size();
+      LOG.debug("write bulk load descriptor to backup " + tabName + " with " + entries
+          + " entries");
+    }
+    try (Table table = connection.getTable(tableName)) {
+      List<Put> committed =
+          BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths);
+      table.put(committed);
+      LOG.debug("written " + committed.size() + " rows for bulk load of " + tabName);
+    }
+  }
+
+  /**
+   * For preCommitStoreFile() hook. Stores the puts built by createPutForPreparedBulkload for
+   * the given table, region, family and path pairs.
+   * @param tabName table name
+   * @param region the region receiving hfile
+   * @param family column family
+   * @param pairs list of paths for hfiles
+   * @throws IOException on table access failure
+   */
+  public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region,
+      final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      int entries = pairs.size();
+      LOG.debug("write bulk load descriptor to backup " + tabName + " with " + entries
+          + " entries");
+    }
+    try (Table table = connection.getTable(tableName)) {
+      List<Put> prepared =
+          BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs);
+      table.put(prepared);
+      LOG.debug("written " + prepared.size() + " rows for bulk load of " + tabName);
+    }
+  }
+
+  /*
+   * Removes rows recording bulk loaded hfiles from backup table
+   * @param lst list of table names
+   * @param rows the rows to be deleted
+   */
+  public void removeBulkLoadedRows(List<TableName> lst, List<byte[]> rows) 
throws IOException {
+    try (Table table = connection.getTable(tableName)) {
+      List<Delete> lstDels = new ArrayList<>();
+      for (byte[] row : rows) {
+        Delete del = new Delete(row);
+        lstDels.add(del);
+        LOG.debug("orig deleting the row: " + Bytes.toString(row));
+      }
+      table.delete(lstDels);
+      LOG.debug("deleted " + rows.size() + " original bulkload rows for " + 
lst.size() + " tables");
+    }
+  }
+
+  /**
+   * Reads the rows from backup table recording bulk loaded hfiles
+   * @param tableList list of table names
+   * @return a pair: first, a map whose keys are table, region and column family and whose
+   *         values list (path, flag) pairs, the flag being true when the record's state column
+   *         equals BL_PREPARE (recorded by preCommitStoreFile hook); second, the row keys of
+   *         all records read, one entry per record
+   * @throws IOException on table access failure
+   */
+  public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
+      readBulkloadRows(List<TableName> tableList) throws IOException {
+    Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
+    List<byte[]> rows = new ArrayList<>();
+    for (TableName tTable : tableList) {
+      Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable);
+      try (Table table = connection.getTable(tableName);
+          ResultScanner scanner = table.getScanner(scan)) {
+        Result res = null;
+        while ((res = scanner.next()) != null) {
+          res.advance();
+          List<Cell> cells = res.listCells();
+          // All cells of one Result share the row key, so record the row once per Result
+          // instead of once per cell (which produced duplicate entries in 'rows').
+          byte[] row = CellUtil.cloneRow(cells.get(0));
+          rows.add(row);
+          String region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(Bytes.toString(row));
+          String fam = null;
+          String path = null;
+          boolean raw = false;
+          for (Cell cell : cells) {
+            if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
+              BackupSystemTable.FAM_COL.length) == 0) {
+              fam = Bytes.toString(CellUtil.cloneValue(cell));
+            } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
+              BackupSystemTable.PATH_COL.length) == 0) {
+              path = Bytes.toString(CellUtil.cloneValue(cell));
+            } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0,
+              BackupSystemTable.STATE_COL.length) == 0) {
+              raw = Bytes.equals(BackupSystemTable.BL_PREPARE, CellUtil.cloneValue(cell));
+            }
+          }
+          // table -> region -> family -> files, creating the missing levels on the way
+          Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable);
+          if (tblMap == null) {
+            tblMap = new HashMap<>();
+            map.put(tTable, tblMap);
+          }
+          Map<String, List<Pair<String, Boolean>>> famMap = tblMap.get(region);
+          if (famMap == null) {
+            famMap = new HashMap<>();
+            tblMap.put(region, famMap);
+          }
+          List<Pair<String, Boolean>> files = famMap.get(fam);
+          if (files == null) {
+            files = new ArrayList<>();
+            famMap.put(fam, files);
+          }
+          files.add(new Pair<>(path, raw));
+          LOG.debug("found orig " + path + " for " + fam + " of table " + tTable + " region "
+              + region);
+        }
+      }
+    }
+    return new Pair<>(map, rows);
+  }
+
+  /**
+   * Stores one put per bulk loaded file path; maps[i] belongs to sTableList.get(i) and null
+   * slots are skipped. All puts share one timestamp and carry a running counter.
+   * @param sTableList List of tables
+   * @param maps array of Map of family to List of Paths
+   * @param backupId the backup Id
+   * @throws IOException on table access failure
+   */
+  public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps,
+      String backupId) throws IOException {
+    try (Table table = connection.getTable(tableName)) {
+      long ts = EnvironmentEdgeManager.currentTime();
+      int cnt = 0;
+      List<Put> puts = new ArrayList<>();
+      int idx = 0;
+      for (Map<byte[], List<Path>> famToPaths : maps) {
+        // the table is looked up before the null check, exactly one lookup per slot
+        TableName tn = sTableList.get(idx++);
+        if (famToPaths == null) {
+          continue;
+        }
+        for (Entry<byte[], List<Path>> famEntry : famToPaths.entrySet()) {
+          byte[] fam = famEntry.getKey();
+          for (Path hfile : famEntry.getValue()) {
+            puts.add(BackupSystemTable.createPutForBulkLoadedFile(tn, fam, hfile.toString(),
+              backupId, ts, cnt++));
+          }
+        }
+      }
+      if (puts.isEmpty()) {
+        return;
+      }
+      table.put(puts);
+    }
+  }
+
+  /**
+   * Looks up the stored backup info of a backup session in the backup system table.
+   * @param backupId backup id
+   * @return Current status of backup session or null
+   * @throws IOException on table access failure
+   */
+  public BackupInfo readBackupInfo(String backupId) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      String msg = "read backup status from backup system table for: " + backupId;
+      LOG.trace(msg);
+    }
+
+    try (Table table = connection.getTable(tableName)) {
+      Result res = table.get(createGetForBackupInfo(backupId));
+      if (!res.isEmpty()) {
+        return resultToBackupInfo(res);
+      }
+      // no record for this backup id
+      return null;
+    }
+  }
+
+  /**
+   * Read the last backup start code (timestamp) of last successful backup. Will return null if
+   * there is no start code stored on hbase or the value is of length 0. These two cases indicate
+   * there is no successful backup completed so far.
+   * @param backupRoot directory path to backup destination
+   * @return the timestamp of last successful backup
+   * @throws IOException exception
+   */
+  public String readBackupStartCode(String backupRoot) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("read backup start code from backup system table");
+    }
+    try (Table table = connection.getTable(tableName)) {
+      Get get = createGetForStartCode(backupRoot);
+      Result res = table.get(get);
+      if (res.isEmpty()) {
+        return null;
+      }
+      Cell cell = res.listCells().get(0);
+      byte[] val = CellUtil.cloneValue(cell);
+      if (val.length == 0) {
+        return null;
+      }
+      // Decode with a fixed charset (Bytes uses UTF-8) instead of the JVM default charset
+      return Bytes.toString(val);
+    }
+  }
+
+  /**
+   * Write the start code (timestamp) to backup system table. If passed in null, then write 0 byte.
+   * @param startCode start code, may be null
+   * @param backupRoot root directory path to backup
+   * @throws IOException exception
+   */
+  public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("write backup start code to backup system table " + startCode);
+    }
+    // A null start code used to fail with a NullPointerException on toString(), contradicting
+    // the documented contract. An empty string is passed instead.
+    // NOTE(review): assumes createPutForStartCode stores "" as a zero-length value - confirm
+    String code = (startCode == null) ? "" : startCode.toString();
+    try (Table table = connection.getTable(tableName)) {
+      Put put = createPutForStartCode(code, backupRoot);
+      table.put(put);
+    }
+  }
+
+  /**
+   * Exclusive operations are:
+   * create, delete, merge
+   * @throws IOException
+   */
+  public void startBackupExclusiveOperation() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Start new backup exclusive operation");
+    }
+    try (Table table = connection.getTable(tableName)) {
+      Put put = createPutForStartBackupSession();
+      // First try to put if row does not exist
+      if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, 
ACTIVE_SESSION_COL, null, put)) {
+        // Row exists, try to put if value == ACTIVE_SESSION_NO
+        if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, 
ACTIVE_SESSION_COL,
+          ACTIVE_SESSION_NO, put)) {
+          throw new IOException("There is an active backup exclusive 
operation");
+        }
+      }
+    }
+  }
+
+  /**
+   * @return put that sets the active-session column of the active-session row to "yes"
+   */
+  private Put createPutForStartBackupSession() {
+    Put markActive = new Put(ACTIVE_SESSION_ROW);
+    markActive.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES);
+    return markActive;
+  }
+
+  public void finishBackupExclusiveOperation() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Finish backup exclusive operation");
+    }
+    try (Table table = connection.getTable(tableName)) {
+      Put put = createPutForStopBackupSession();
+      if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, 
ACTIVE_SESSION_COL,
+        ACTIVE_SESSION_YES, put)) {
+        throw new IOException("There is no active backup exclusive operation");
+      }
+    }
+  }
+
+  /**
+   * @return put that sets the active-session column of the active-session row to "no"
+   */
+  private Put createPutForStopBackupSession() {
+    Put markInactive = new Put(ACTIVE_SESSION_ROW);
+    markInactive.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO);
+    return markInactive;
+  }
+
+  /**
+   * Reads, for the given backup root, the last log roll value stored per Region Server.
+   * @param backupRoot root directory path to backup
+   * @return RS log info: server name to the long value stored for it
+   * @throws IOException exception
+   */
+  public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot)
+      throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("read region server last roll log result to backup system table");
+    }
+
+    Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot);
+
+    try (Table table = connection.getTable(tableName);
+        ResultScanner scanner = table.getScanner(scan)) {
+      HashMap<String, Long> rsTimestampMap = new HashMap<String, Long>();
+      for (Result res = scanner.next(); res != null; res = scanner.next()) {
+        res.advance();
+        Cell cell = res.current();
+        // the server name is derived from the row key, the timestamp is the cell value
+        String server =
+            getServerNameForReadRegionServerLastLogRollResult(CellUtil.cloneRow(cell));
+        long lastRoll = Bytes.toLong(CellUtil.cloneValue(cell));
+        rsTimestampMap.put(server, lastRoll);
+      }
+      return rsTimestampMap;
+    }
+  }
+
+  /**
+   * Stores the last roll log result (timestamp) of a Region Server in the backup system table.
+   * @param server Region Server name
+   * @param ts last log timestamp
+   * @param backupRoot root directory path to backup
+   * @throws IOException exception
+   */
+  public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot)
+      throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("write region server last roll log result to backup system table");
+    }
+    try (Table table = connection.getTable(tableName)) {
+      Put lastRollPut = createPutForRegionServerLastLogRollResult(server, ts, backupRoot);
+      table.put(lastRollPut);
+    }
+  }
+
+  /**
+   * Get backup history, passed through BackupUtils.sortHistoryListDesc (in desc order by time)
+   * @param onlyCompleted true, if only successfully completed sessions
+   * @return history info of BackupCompleteData
+   * @throws IOException exception
+   */
+  public ArrayList<BackupInfo> getBackupHistory(boolean onlyCompleted) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("get backup history from backup system table");
+    }
+    BackupState wanted;
+    if (onlyCompleted) {
+      wanted = BackupState.COMPLETE;
+    } else {
+      wanted = BackupState.ANY;
+    }
+    ArrayList<BackupInfo> sessions = getBackupInfos(wanted);
+    return BackupUtils.sortHistoryListDesc(sessions);
+  }
+
+  /**
+   * Get history of all backups, regardless of their state.
+   * @return list of backup info
+   * @throws IOException on table access failure
+   */
+  public List<BackupInfo> getBackupHistory() throws IOException {
+    final boolean onlyCompleted = false;
+    return getBackupHistory(onlyCompleted);
+  }
+
+  /**
+   * Get first n backup history records
+   * @param n number of records, if n== -1 - max number
+   *        is ignored
+   * @return list of records
+   * @throws IOException
+   */
+  public List<BackupInfo> getHistory(int n) throws IOException {
+
+    List<BackupInfo> history = getBackupHistory();
+    if (n == -1 || history.size() <= n) return history;
+    List<BackupInfo> list = new ArrayList<BackupInfo>();
+    for (int i = 0; i < n; i++) {
+      list.add(history.get(i));
+    }
+    return list;
+
+  }
+
+  /**
+   * Get backup history records filtered by list of filters.
+   * @param n max number of records, if n == -1 , then max number
+   *        is ignored
+   * @param filters list of filters
+   * @return backup records
+   * @throws IOException
+   */
+  public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter... 
filters) throws IOException {
+    if (filters.length == 0) return getHistory(n);
+
+    List<BackupInfo> history = getBackupHistory();
+    List<BackupInfo> result = new ArrayList<BackupInfo>();
+    for (BackupInfo bi : history) {
+      if (n >= 0 && result.size() == n) break;
+      boolean passed = true;
+      for (int i = 0; i < filters.length; i++) {
+        if (!filters[i].apply(bi)) {
+          passed = false;
+          break;
+        }
+      }
+      if (passed) {
+        result.add(bi);
+      }
+    }
+    return result;
+
+  }
+
+  /**
+   * Retrieve TableName's for completed backup of given type
+   * @param type backup type
+   * @return List of table names, without duplicates
+   * @throws IOException on table access failure
+   */
+  public List<TableName> getTablesForBackupType(BackupType type) throws IOException {
+    Set<TableName> names = new HashSet<>();
+    List<BackupInfo> infos = getBackupHistory(true);
+    for (BackupInfo info : infos) {
+      if (info.getType() != type) {
+        continue;
+      }
+      names.addAll(info.getTableNames());
+    }
+    // parameterized instead of the former raw ArrayList, which caused an unchecked conversion
+    return new ArrayList<>(names);
+  }
+
+  /**
+   * Get history for backup destination
+   * @param backupRoot backup destination path
+   * @return List of backup info
+   * @throws IOException
+   */
+  public List<BackupInfo> getBackupHistory(String backupRoot) throws 
IOException {
+    ArrayList<BackupInfo> history = getBackupHistory(false);
+    for (Iterator<BackupInfo> iterator = history.iterator(); 
iterator.hasNext();) {
+      BackupInfo info = iterator.next();
+      if (!backupRoot.equals(info.getBackupRootDir())) {
+        iterator.remove();
+      }
+    }
+    return history;
+  }
+
+  /**
+   * Get history for a table
+   * @param name table name
+   * @return history for a table
+   * @throws IOException
+   */
+  public List<BackupInfo> getBackupHistoryForTable(TableName name) throws 
IOException {
+    List<BackupInfo> history = getBackupHistory();
+    List<BackupInfo> tableHistory = new ArrayList<BackupInfo>();
+    for (BackupInfo info : history) {
+      List<TableName> tables = info.getTableNames();
+      if (tables.contains(name)) {
+        tableHistory.add(info);
+      }
+    }
+    return tableHistory;
+  }
+
+  /**
+   * Groups the backup history of one destination by table, restricted to the given tables.
+   * @param set tables of interest
+   * @param backupRoot backup destination path
+   * @return map of table name to the backups that contain the table; tables without any
+   *         backup have no entry
+   * @throws IOException on table access failure
+   */
+  public Map<TableName, ArrayList<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set,
+      String backupRoot) throws IOException {
+    List<BackupInfo> history = getBackupHistory(backupRoot);
+    Map<TableName, ArrayList<BackupInfo>> tableHistoryMap =
+        new HashMap<TableName, ArrayList<BackupInfo>>();
+    for (BackupInfo info : history) {
+      if (!backupRoot.equals(info.getBackupRootDir())) {
+        continue;
+      }
+      for (TableName backedUpTable : info.getTableNames()) {
+        if (!set.contains(backedUpTable)) {
+          continue;
+        }
+        ArrayList<BackupInfo> perTable = tableHistoryMap.get(backedUpTable);
+        if (perTable == null) {
+          perTable = new ArrayList<BackupInfo>();
+          tableHistoryMap.put(backedUpTable, perTable);
+        }
+        perTable.add(info);
+      }
+    }
+    return tableHistoryMap;
+  }
+
+  /**
+   * Get all backup sessions with a given state, in the order the scan returns them.
+   * @param state backup session state; BackupState.ANY accepts every session
+   * @return history info of backup info objects
+   * @throws IOException exception
+   */
+  public ArrayList<BackupInfo> getBackupInfos(BackupState state) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("get backup infos from backup system table");
+    }
+
+    Scan scan = createScanForBackupHistory();
+    ArrayList<BackupInfo> sessions = new ArrayList<BackupInfo>();
+
+    try (Table table = connection.getTable(tableName);
+        ResultScanner scanner = table.getScanner(scan)) {
+      for (Result res = scanner.next(); res != null; res = scanner.next()) {
+        res.advance();
+        BackupInfo session = cellToBackupInfo(res.current());
+        if (state == BackupState.ANY || session.getState() == state) {
+          sessions.add(session);
+        }
+      }
+      return sessions;
+    }
+  }
+
+  /**
+   * Write the current timestamps for each regionserver to backup system table after a successful
+   * full or incremental backup. The saved timestamp is of the last log file that was backed up
+   * already. One put is written per table.
+   * @param tables tables
+   * @param newTimestamps timestamps
+   * @param backupRoot root directory path to backup
+   * @throws IOException exception
+   */
+  public void writeRegionServerLogTimestamp(Set<TableName> tables,
+      HashMap<String, Long> newTimestamps, String backupRoot) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      String joined = StringUtils.join(tables, ",");
+      LOG.trace("write RS log time stamps to backup system table for tables [" + joined + "]");
+    }
+    List<Put> puts = new ArrayList<Put>();
+    Iterator<TableName> it = tables.iterator();
+    while (it.hasNext()) {
+      TableName backedUpTable = it.next();
+      byte[] smapData = toTableServerTimestampProto(backedUpTable, newTimestamps).toByteArray();
+      puts.add(createPutForWriteRegionServerLogTimestamp(backedUpTable, smapData, backupRoot));
+    }
+    try (Table systemTable = connection.getTable(tableName)) {
+      systemTable.put(puts);
+    }
+  }
+
+  /**
+   * Read the timestamp for each region server log after the last successful backup. Each
+   * table has its own set of timestamps, stored per table as a serialized
+   * region server -> timestamp map.
+   * <p>
+   * Rows whose stored value is empty are skipped and do not appear in the result.
+   * @param backupRoot root directory path to backup
+   * @return the timestamps for each table. key: table name, value: map of
+   *         region server (host:port) to previous timestamp
+   * @throws IOException if the backup system table cannot be read or a value cannot be parsed
+   */
+  public HashMap<TableName, HashMap<String, Long>> readLogTimestampMap(String backupRoot)
+      throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("read RS log ts from backup system table for root=" + backupRoot);
+    }
+
+    HashMap<TableName, HashMap<String, Long>> tableTimestampMap =
+        new HashMap<TableName, HashMap<String, Long>>();
+
+    Scan scan = createScanForReadLogTimestampMap(backupRoot);
+    try (Table table = connection.getTable(tableName);
+        ResultScanner scanner = table.getScanner(scan)) {
+      Result res = null;
+      while ((res = scanner.next()) != null) {
+        res.advance();
+        Cell cell = res.current();
+        // The table name is encoded as the last component of the row key.
+        byte[] row = CellUtil.cloneRow(cell);
+        TableName tn = TableName.valueOf(getTableNameForReadLogTimestampMap(row));
+        // CellUtil.cloneValue always returns a freshly allocated array, never null, so
+        // only an empty value has to be guarded against here.
+        byte[] data = CellUtil.cloneValue(cell);
+        if (data.length > 0) {
+          HashMap<String, Long> lastBackup = fromTableServerTimestampProto(
+            BackupProtos.TableServerTimestamp.parseFrom(data));
+          tableTimestampMap.put(tn, lastBackup);
+        }
+      }
+      return tableTimestampMap;
+    }
+  }
+
+  /**
+   * Builds the protobuf message that records, for one table, the timestamp of every
+   * region server in the map. Only the host name and port of each server are copied.
+   * @param table table the timestamps belong to
+   * @param map region server name to timestamp
+   * @return the assembled protobuf message
+   */
+  private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table,
+      Map<String, Long> map) {
+    BackupProtos.TableServerTimestamp.Builder result =
+        BackupProtos.TableServerTimestamp.newBuilder();
+    result.setTableName(
+      org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toProtoTableName(table));
+
+    for (Entry<String, Long> serverAndTs : map.entrySet()) {
+      ServerName server = ServerName.parseServerName(serverAndTs.getKey());
+      HBaseProtos.ServerName protoServer = HBaseProtos.ServerName.newBuilder()
+          .setHostName(server.getHostname())
+          .setPort(server.getPort())
+          .build();
+      result.addServerTimestamp(BackupProtos.ServerTimestamp.newBuilder()
+          .setServerName(protoServer)
+          .setTimestamp(serverAndTs.getValue())
+          .build());
+    }
+
+    return result.build();
+  }
+
+  /**
+   * Converts a per-table protobuf timestamp message back into a map.
+   * @param proto message to decode
+   * @return map keyed by {@code host:port} of each region server, valued by its timestamp
+   */
+  private HashMap<String, Long> fromTableServerTimestampProto(
+      BackupProtos.TableServerTimestamp proto) {
+    HashMap<String, Long> timestampsByServer = new HashMap<String, Long>();
+    for (BackupProtos.ServerTimestamp entry : proto.getServerTimestampList()) {
+      ServerName server = org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil
+          .toServerName(entry.getServerName());
+      String hostAndPort = server.getHostname() + ":" + server.getPort();
+      timestampsByServer.put(hostAndPort, entry.getTimestamp());
+    }
+    return timestampsByServer;
+  }
+
+  /**
+   * Returns the tables currently covered by incremental backup for a backup root.
+   * @param backupRoot root directory path to backup
+   * @return sorted set of table names; empty if nothing is registered for the root
+   * @throws IOException if the backup system table cannot be read
+   */
+  public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("get incremental backup table set from backup system table");
+    }
+    TreeSet<TableName> tableSet = new TreeSet<>();
+
+    try (Table table = connection.getTable(tableName)) {
+      Result row = table.get(createGetForIncrBackupTableSet(backupRoot));
+      if (!row.isEmpty()) {
+        // Table names are stored as the column qualifiers of the row.
+        for (Cell cell : row.listCells()) {
+          tableSet.add(TableName.valueOf(CellUtil.cloneQualifier(cell)));
+        }
+      }
+    }
+    return tableSet;
+  }
+
+  /**
+   * Add tables to the global incremental backup set of a backup root.
+   * @param tables set of tables
+   * @param backupRoot root directory path to backup
+   * @throws IOException if the row cannot be written
+   */
+  public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot)
+      throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot
+          + " tables [" + StringUtils.join(tables, " ") + "]");
+      // Per-table lines belong to the same TRACE guard, so log them at TRACE as well
+      // (they were previously emitted at DEBUG from inside this guard).
+      for (TableName table : tables) {
+        LOG.trace(table);
+      }
+    }
+    try (Table table = connection.getTable(tableName)) {
+      Put put = createPutForIncrBackupTableSet(tables, backupRoot);
+      table.put(put);
+    }
+  }
+
+  /**
+   * Removes the incremental backup table set stored for a backup destination.
+   * @param backupRoot backup root
+   * @throws IOException if the delete cannot be applied
+   */
+  public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Delete incremental backup table set to backup system table. ROOT=" + backupRoot);
+    }
+    try (Table systemTable = connection.getTable(tableName)) {
+      systemTable.delete(createDeleteForIncrBackupTableSet(backupRoot));
+    }
+  }
+
+  /**
+   * Register WAL files as eligible for deletion.
+   * @param files files
+   * @param backupId backup id
+   * @param backupRoot root directory path to backup destination
+   * @throws IOException if the rows cannot be written
+   */
+  public void addWALFiles(List<String> files, String backupId, String backupRoot)
+      throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("add WAL files to backup system table: " + backupId + " " + backupRoot
+          + " files [" + StringUtils.join(files, ",") + "]");
+      // Per-file lines belong to the same TRACE guard, so log them at TRACE as well
+      // (they were previously emitted at DEBUG from inside this guard).
+      for (String f : files) {
+        LOG.trace("add :" + f);
+      }
+    }
+    try (Table table = connection.getTable(tableName)) {
+      List<Put> puts = createPutsForAddWALFiles(files, backupId, backupRoot);
+      table.put(puts);
+    }
+  }
+
+  /**
+   * Returns an iterator over the WAL files registered in the backup system table for a
+   * backup root.
+   * <p>
+   * The iterator owns the underlying table and scanner. Both are released the first time
+   * {@code hasNext()} reports {@code false}, so callers should iterate to exhaustion.
+   * @param backupRoot root directory path to backup
+   * @return iterator over the registered WAL items
+   * @throws IOException if the table or the scanner cannot be opened
+   */
+  public Iterator<WALItem> getWALFilesIterator(String backupRoot) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("get WAL files from backup system table");
+    }
+    final Table table = connection.getTable(tableName);
+    final ResultScanner scanner;
+    try {
+      scanner = table.getScanner(createScanForGetWALs(backupRoot));
+    } catch (IOException | RuntimeException e) {
+      // Do not leak the table handle when the scanner cannot be opened.
+      try {
+        table.close();
+      } catch (IOException closeError) {
+        e.addSuppressed(closeError);
+      }
+      throw e;
+    }
+    final Iterator<Result> it = scanner.iterator();
+    return new Iterator<WALItem>() {
+
+      /** Set once scanner and table have been released, so that they are closed only once. */
+      private boolean closed;
+
+      @Override
+      public boolean hasNext() {
+        boolean next = it.hasNext();
+        if (!next && !closed) {
+          closed = true;
+          // close all
+          try {
+            scanner.close();
+            table.close();
+          } catch (IOException e) {
+            LOG.error("Close WAL Iterator", e);
+          }
+        }
+        return next;
+      }
+
+      @Override
+      public WALItem next() {
+        List<Cell> cells = it.next().listCells();
+        // The row's cells hold, in order: backup id, WAL file, backup root.
+        String backupId = valueAsString(cells.get(0));
+        String walFile = valueAsString(cells.get(1));
+        String backupRoot = valueAsString(cells.get(2));
+        return new WALItem(backupId, walFile, backupRoot);
+      }
+
+      @Override
+      public void remove() {
+        // not implemented
+        throw new UnsupportedOperationException("remove is not supported");
+      }
+
+      /** Decodes a cell value as UTF-8 instead of relying on the platform charset. */
+      private String valueAsString(Cell cell) {
+        return Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
+          cell.getValueLength());
+      }
+    };
+  }
+
+  /**
+   * Tells whether a WAL file is eligible for deletion, i.e. whether the backup system
+   * table holds an entry for it.
+   * TODO: multiple backup destination support
+   * @param file name of a file to check
+   * @return true, if deletable, false otherwise.
+   * @throws IOException if the backup system table cannot be read
+   */
+  public boolean isWALFileDeletable(String file) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Check if WAL file has been already backed up in backup system table " + file);
+    }
+    try (Table systemTable = connection.getTable(tableName)) {
+      Result entry = systemTable.get(createGetForCheckWALFile(file));
+      // Any stored entry means the file has been registered.
+      return !entry.isEmpty();
+    }
+  }
+
+  /**
+   * Checks whether the backup system table holds at least one backup session. This API is
+   * used by BackupLogCleaner.
+   * @return true if at least one session exists in the backup system table
+   * @throws IOException if the backup system table cannot be read
+   */
+  public boolean hasBackupSessions() throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Has backup sessions from backup system table");
+    }
+    Scan historyScan = createScanForBackupHistory();
+    // A single row is enough to answer the question.
+    historyScan.setCaching(1);
+    try (Table systemTable = connection.getTable(tableName);
+        ResultScanner scanner = systemTable.getScanner(historyScan)) {
+      return scanner.next() != null;
+    }
+  }
+
+  /**
+   * BACKUP SETS
+   */
+
+  /**
+   * Get backup set list
+   * @return backup set list
+   * @throws IOException
+   */
+  public List<String> listBackupSets() throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(" Backup set list");
+    }
+    List<String> list = new ArrayList<String>();
+    Table table = null;
+    ResultScanner scanner = null;
+    try {
+      table = connection.getTable(tableName);
+      Scan scan = createScanForBackupSetList();
+      scan.setMaxVersions(1);
+      scanner = table.getScanner(scan);
+      Result res = null;
+      while ((res = scanner.next()) != null) {
+        res.advance();
+        list.add(cellKeyToBackupSetName(res.current()));
+      }
+      return list;
+    } finally {
+      if (scanner != null) {
+        scanner.close();
+      }
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+
+  /**
+   * Get backup set description (list of tables).
+   * @param name set's name
+   * @return list of tables in the backup set, or {@code null} if no set with this name exists
+   * @throws IOException if the backup system table cannot be read
+   */
+  public List<TableName> describeBackupSet(String name) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(" Backup set describe: " + name);
+    }
+    // try-with-resources keeps the primary exception if close() fails as well and matches
+    // the other accessors of this class.
+    try (Table table = connection.getTable(tableName)) {
+      Get get = createGetForBackupSet(name);
+      Result res = table.get(get);
+      if (res.isEmpty()) {
+        // Callers distinguish "no such set" by the null return value.
+        return null;
+      }
+      res.advance();
+      String[] tables = cellValueToBackupSet(res.current());
+      return toList(tables);
+    }
+  }
+
+  /**
+   * Converts table name strings into {@link TableName} instances, preserving their order.
+   * @param tables table names
+   * @return list with one entry per input element
+   */
+  private List<TableName> toList(String[] tables) {
+    List<TableName> converted = new ArrayList<TableName>(tables.length);
+    for (int i = 0; i < tables.length; i++) {
+      converted.add(TableName.valueOf(tables[i]));
+    }
+    return converted;
+  }
+
+  /**
+   * Add tables to a backup set, creating the set if it does not exist yet. Tables that are
+   * already part of the set, or that are listed more than once, are stored only once.
+   * @param name set name
+   * @param newTables names of the tables to add
+   * @throws IOException if the backup system table cannot be read or written
+   */
+  public void addToBackupSet(String name, String[] newTables) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ")
+          + "]");
+    }
+    // try-with-resources keeps the primary exception if close() fails as well and matches
+    // the other accessors of this class.
+    try (Table table = connection.getTable(tableName)) {
+      Get get = createGetForBackupSet(name);
+      Result res = table.get(get);
+      String[] existing;
+      if (res.isEmpty()) {
+        existing = new String[0];
+      } else {
+        res.advance();
+        existing = cellValueToBackupSet(res.current());
+      }
+      // merge() also drops duplicates inside newTables, so a brand-new set is stored
+      // de-duplicated exactly like an extended one.
+      String[] union = merge(existing, newTables);
+      Put put = createPutForBackupSet(name, union);
+      table.put(put);
+    }
+  }
+
+  /**
+   * Appends to {@code tables} every element of {@code newTables} that is not present yet.
+   * @param tables existing entries, kept as they are and in their order
+   * @param newTables candidates to append
+   * @return new array with the existing entries followed by the missing candidates
+   */
+  private String[] merge(String[] tables, String[] newTables) {
+    List<String> merged = new ArrayList<String>(Arrays.asList(tables));
+    for (String candidate : newTables) {
+      if (!merged.contains(candidate)) {
+        merged.add(candidate);
+      }
+    }
+    return merged.toArray(new String[merged.size()]);
+  }
+
+  /**
+   * Remove tables from a backup set. If no table is left afterwards, the whole set is
+   * deleted; if the set does not exist or contains none of the tables, only a warning is
+   * logged.
+   * @param name set name
+   * @param toRemove names of the tables to remove
+   * @throws IOException if the backup system table cannot be read or written
+   */
+  public void removeFromBackupSet(String name, String[] toRemove) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(" Backup set remove from : " + name + " tables ["
+          + StringUtils.join(toRemove, " ") + "]");
+    }
+    // try-with-resources keeps the primary exception if close() fails as well and matches
+    // the other accessors of this class.
+    try (Table table = connection.getTable(tableName)) {
+      Get get = createGetForBackupSet(name);
+      Result res = table.get(get);
+      if (res.isEmpty()) {
+        LOG.warn("Backup set '" + name + "' not found.");
+        return;
+      }
+      res.advance();
+      String[] tables = cellValueToBackupSet(res.current());
+      String[] disjoint = disjoin(tables, toRemove);
+      if (disjoint.length == tables.length) {
+        // Nothing was removed: none of the requested tables is part of the set.
+        LOG.warn("Backup set '" + name + "' does not contain tables ["
+            + StringUtils.join(toRemove, " ") + "]");
+      } else if (disjoint.length > 0) {
+        // Some tables remain: store the reduced set.
+        Put put = createPutForBackupSet(name, disjoint);
+        table.put(put);
+      } else {
+        // Every table was removed: drop the set itself.
+        LOG.info("Backup set '" + name + "' is empty. Deleting.");
+        deleteBackupSet(name);
+      }
+    }
+  }
+
+  /**
+   * Returns the entries of {@code tables} that survive removing {@code toRemove}. Each
+   * element of {@code toRemove} removes at most one (the first) matching entry.
+   * @param tables existing entries
+   * @param toRemove entries to take out
+   * @return new array with the remaining entries in their original order
+   */
+  private String[] disjoin(String[] tables, String[] toRemove) {
+    List<String> remaining = new ArrayList<String>(Arrays.asList(tables));
+    for (String unwanted : toRemove) {
+      // List.remove(Object) is a no-op when the element is absent.
+      remaining.remove(unwanted);
+    }
+    return remaining.toArray(new String[remaining.size()]);
+  }
+
+  /**
+   * Delete backup set.
+   * @param name set's name
+   * @throws IOException if the delete cannot be applied
+   */
+  public void deleteBackupSet(String name) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(" Backup set delete: " + name);
+    }
+    // try-with-resources keeps the primary exception if close() fails as well and matches
+    // the other accessors of this class.
+    try (Table table = connection.getTable(tableName)) {
+      Delete del = createDeleteForBackupSet(name);
+      table.delete(del);
+    }
+  }
+
+  /**
+   * Get backup system table descriptor.
+   * <p>
+   * The table consists of a sessions family (one version, time-to-live taken from
+   * {@code BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY}) and a meta family.
+   * @param conf configuration supplying the table name and the sessions time-to-live
+   * @return table's descriptor
+   */
+  public static HTableDescriptor getSystemTableDescriptor(Configuration conf) {
+
+    HTableDescriptor tableDesc = new HTableDescriptor(getTableName(conf));
+    HColumnDescriptor colSessionsDesc = new HColumnDescriptor(SESSIONS_FAMILY);
+    colSessionsDesc.setMaxVersions(1);
+    // Time to keep backup sessions (secs). Read it from the caller's configuration, like
+    // the table name above, so that a TTL set by the caller is honoured instead of being
+    // replaced by the value of a freshly created default configuration.
+    int ttl =
+        conf.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
+          BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
+    colSessionsDesc.setTimeToLive(ttl);
+    tableDesc.addFamily(colSessionsDesc);
+    HColumnDescriptor colMetaDesc = new HColumnDescriptor(META_FAMILY);
+    tableDesc.addFamily(colMetaDesc);
+    return tableDesc;
+  }
+
+  /**
+   * Resolves the name of the backup system table from the configuration, falling back to
+   * the default name when none is configured.
+   * @param conf configuration to read
+   * @return backup system table name
+   */
+  public static TableName getTableName(Configuration conf) {
+    return TableName.valueOf(conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
+      BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT));
+  }
+
+  public static String getTableNameAsString(Configuration conf) {
+    return getTableName(conf).getNameAsString();
+  }
+
+  public static String getSnapshotName(Configuration conf) {
+    return "snapshot_" + getTableNameAsString(conf).replace(":", "_");
+  }
+
+  /**
+   * Builds the Put that stores a backup info object under its backup id.
+   * @param context backup info
+   * @return put operation
+   * @throws IOException exception
+   */
+  private Put createPutForBackupInfo(BackupInfo context) throws IOException {
+    byte[] row = rowkey(BACKUP_INFO_PREFIX, context.getBackupId());
+    Put sessionPut = new Put(row);
+    byte[] qualifier = Bytes.toBytes("context");
+    sessionPut.addColumn(BackupSystemTable.SESSIONS_FAMILY, qualifier, context.toByteArray());
+    return sessionPut;
+  }
+
+  /**
+   * Builds the Get that fetches the latest stored session for a backup id.
+   * @param backupId backup's ID
+   * @return get operation
+   * @throws IOException exception
+   */
+  private Get createGetForBackupInfo(String backupId) throws IOException {
+    byte[] row = rowkey(BACKUP_INFO_PREFIX, backupId);
+    Get sessionGet = new Get(row);
+    sessionGet.addFamily(BackupSystemTable.SESSIONS_FAMILY);
+    sessionGet.setMaxVersions(1);
+    return sessionGet;
+  }
+
+  /**
+   * Builds the Delete that removes the stored session of a backup id.
+   * @param backupId backup's ID
+   * @return delete operation
+   */
+  private Delete createDeleteForBackupInfo(String backupId) {
+    byte[] row = rowkey(BACKUP_INFO_PREFIX, backupId);
+    Delete sessionDelete = new Delete(row);
+    sessionDelete.addFamily(BackupSystemTable.SESSIONS_FAMILY);
+    return sessionDelete;
+  }
+
+  /**
+   * Decodes the first cell of a result into a backup info object.
+   * @param res HBase result
+   * @return backup info instance
+   * @throws IOException exception
+   */
+  private BackupInfo resultToBackupInfo(Result res) throws IOException {
+    // Move onto the first cell before reading it.
+    res.advance();
+    return cellToBackupInfo(res.current());
+  }
+
+  /**
+   * Builds the Get that retrieves the start code of a backup root from the backup system
+   * table.
+   * @param rootPath backup root the start code belongs to
+   * @return get operation
+   * @throws IOException exception
+   */
+  private Get createGetForStartCode(String rootPath) throws IOException {
+    byte[] row = rowkey(START_CODE_ROW, rootPath);
+    Get startCodeGet = new Get(row);
+    startCodeGet.addFamily(BackupSystemTable.META_FAMILY);
+    startCodeGet.setMaxVersions(1);
+    return startCodeGet;
+  }
+
+  /**
+   * Builds the Put that stores the start code of a backup root in the backup system table.
+   * @param startCode start code to store
+   * @param rootPath backup root the start code belongs to
+   * @return put operation
+   */
+  private Put createPutForStartCode(String startCode, String rootPath) {
+    byte[] row = rowkey(START_CODE_ROW, rootPath);
+    byte[] qualifier = Bytes.toBytes("startcode");
+    Put startCodePut = new Put(row);
+    startCodePut.addColumn(BackupSystemTable.META_FAMILY, qualifier, Bytes.toBytes(startCode));
+    return startCodePut;
+  }
+
+  /**
+   * Builds the Get that retrieves the incremental backup table set of a backup root.
+   * @param backupRoot backup root
+   * @return get operation
+   * @throws IOException exception
+   */
+  private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException {
+    byte[] row = rowkey(INCR_BACKUP_SET, backupRoot);
+    Get tableSetGet = new Get(row);
+    tableSetGet.addFamily(BackupSystemTable.META_FAMILY);
+    tableSetGet.setMaxVersions(1);
+    return tableSetGet;
+  }
+
+  /**
+   * Builds the Put that stores the incremental backup table set of a backup root. Each
+   * table becomes one column whose qualifier is the table name and whose value is empty.
+   * @param tables tables
+   * @param backupRoot backup root
+   * @return put operation
+   */
+  private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) {
+    Put tableSetPut = new Put(rowkey(INCR_BACKUP_SET, backupRoot));
+    for (TableName tn : tables) {
+      byte[] qualifier = Bytes.toBytes(tn.getNameAsString());
+      tableSetPut.addColumn(BackupSystemTable.META_FAMILY, qualifier, EMPTY_VALUE);
+    }
+    return tableSetPut;
+  }
+
+  /**
+   * Builds the Delete that removes the incremental backup table set of a backup root.
+   * @param backupRoot backup root
+   * @return delete operation
+   */
+  private Delete createDeleteForIncrBackupTableSet(String backupRoot) {
+    byte[] row = rowkey(INCR_BACKUP_SET, backupRoot);
+    Delete tableSetDelete = new Delete(row);
+    tableSetDelete.addFamily(BackupSystemTable.META_FAMILY);
+    return tableSetDelete;
+  }
+
+  /**
+   * Builds the Scan that loads the backup history: all rows starting with the backup info
+   * prefix, sessions family only, one version per cell.
+   * @return scan operation
+   */
+  private Scan createScanForBackupHistory() {
+    byte[] firstRow = Bytes.toBytes(BACKUP_INFO_PREFIX);
+    // The exclusive upper bound is the prefix with its last byte incremented by one.
+    byte[] endRow = firstRow.clone();
+    int last = endRow.length - 1;
+    endRow[last]++;
+
+    Scan historyScan = new Scan();
+    historyScan.setStartRow(firstRow);
+    historyScan.setStopRow(endRow);
+    historyScan.addFamily(BackupSystemTable.SESSIONS_FAMILY);
+    historyScan.setMaxVersions(1);
+    return historyScan;
+  }
+
+  /**
+   * Deserializes the value of a cell into a backup info instance.
+   * @param current current cell
+   * @return backup info instance
+   * @throws IOException exception
+   */
+  private BackupInfo cellToBackupInfo(Cell current) throws IOException {
+    return BackupInfo.fromByteArray(CellUtil.cloneValue(current));
+  }
+
+  /**
+   * Builds the Put that writes the region server last-roll-log timestamp map of a table.
+   * The row key is composed of the map prefix, the backup root, the NULL delimiter and the
+   * table name.
+   * @param table table
+   * @param smap serialized map, containing RS:ts
+   * @param backupRoot backup root
+   * @return put operation
+   */
+  private Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap,
+      String backupRoot) {
+    byte[] row = rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString());
+    Put mapPut = new Put(row);
+    mapPut.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("log-roll-map"), smap);
+    return mapPut;
+  }
+
+  /**
+   * Creates Scan to load the table-> { RS -> ts} map of maps of one backup root.
+   * @param backupRoot backup root whose rows are to be scanned
+   * @return scan operation
+   */
+  private Scan createScanForReadLogTimestampMap(String backupRoot) {
+    Scan scan = new Scan();
+    // Rows are written as prefix + backupRoot + NULL + table name. Terminate the scan
+    // prefix with the NULL delimiter so that only rows of exactly this backup root match,
+    // not rows of other roots that merely start with the same characters.
+    byte[] startRow = rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL);
+    // The exclusive upper bound is the prefix with its last byte incremented by one.
+    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+    scan.setStartRow(startRow);
+    scan.setStopRow(stopRow);
+    scan.addFamily(BackupSystemTable.META_FAMILY);
+
+    return scan;
+  }
+
+  /**
+   * Extracts the table name from a row key: everything after the last NULL delimiter.
+   * @param cloneRow rowkey
+   * @return table name
+   */
+  private String getTableNameForReadLogTimestampMap(byte[] cloneRow) {
+    String rowAsString = Bytes.toString(cloneRow);
+    int tableStart = rowAsString.lastIndexOf(NULL) + 1;
+    return rowAsString.substring(tableStart);
+  }
+
+  /**
+   * Builds the Put that stores the last log roll result of a region server. The row key
+   * is composed of the prefix, the backup root, the NULL delimiter and the server name.
+   * @param server server name
+   * @param timestamp log roll result (timestamp)
+   * @param backupRoot backup root
+   * @return put operation
+   */
+  private Put createPutForRegionServerLastLogRollResult(String server, Long timestamp,
+      String backupRoot) {
+    byte[] row = rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server);
+    byte[] qualifier = Bytes.toBytes("rs-log-ts");
+    Put rollPut = new Put(row);
+    rollPut.addColumn(BackupSystemTable.META_FAMILY, qualifier, Bytes.toBytes(timestamp));
+    return rollPut;
+  }
+
+  /**
+   * Creates Scan operation to load the last RS log roll results of one backup root.
+   * @param backupRoot backup root whose rows are to be scanned
+   * @return scan operation
+   */
+  private Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) {
+    Scan scan = new Scan();
+    // Rows are written as prefix + backupRoot + NULL + server name. Terminate the scan
+    // prefix with the NULL delimiter so that only rows of exactly this backup root match,
+    // not rows of other roots that merely start with the same characters.
+    byte[] startRow = rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL);
+    // The exclusive upper bound is the prefix with its last byte incremented by one.
+    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+    scan.setStartRow(startRow);
+    scan.setStopRow(stopRow);
+    scan.addFamily(BackupSystemTable.META_FAMILY);
+    scan.setMaxVersions(1);
+
+    return scan;
+  }
+
+  /**
+   * Extracts the server name from a row key: everything after the last NULL delimiter.
+   * @param row rowkey
+   * @return server's name
+   */
+  private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) {
+    String rowAsString = Bytes.toString(row);
+    int serverStart = rowAsString.lastIndexOf(NULL) + 1;
+    return rowAsString.substring(serverStart);
+  }
+
+  /*
+   * Creates Put's for bulk load resulting from running LoadIncrementalHFiles.
+   * One Put is created per loaded file; it records the table, the column family, the full
+   * file path and the committed state. The row key is built from the table, the region
+   * and the file name (the part of the path after the last '/').
+   * @param table table the files were loaded into
+   * @param region region name
+   * @param finalPaths loaded file paths, keyed by column family
+   * @return one put operation per loaded file
+   */
+  static List<Put> createPutForCommittedBulkload(TableName table, byte[] region,
+      Map<byte[], List<Path>> finalPaths) {
+    List<Put> puts = new ArrayList<>();
+    String regionName = Bytes.toString(region);
+    for (Map.Entry<byte[], List<Path>> entry : finalPaths.entrySet()) {
+      for (Path path : entry.getValue()) {
+        String file = path.toString();
+        int lastSlash = file.lastIndexOf("/");
+        String filename = file.substring(lastSlash + 1);
+        Put put =
+            new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, regionName,
+              BLK_LD_DELIM, filename));
+        put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
+        put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
+        // Encode the path as UTF-8 via Bytes rather than with the platform default
+        // charset, so the stored bytes do not depend on the JVM's locale settings.
+        put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
+        put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT);
+        puts.add(put);
+        LOG.debug("writing done bulk path " + file + " for " + table + " " + regionName);
+      }
+    }
+    return puts;
+  }
+
+  /**
+   * Takes a snapshot of the backup system table under the name returned by
+   * {@link #getSnapshotName(Configuration)}.
+   * @param conn connection to the cluster
+   * @throws IOException if the snapshot cannot be taken
+   */
+  public static void snapshot(Connection conn) throws IOException {
+    try (Admin admin = conn.getAdmin()) {
+      Configuration conf = conn.getConfiguration();
+      String snapshotName = BackupSystemTable.getSnapshotName(conf);
+      TableName systemTable = BackupSystemTable.getTableName(conf);
+      admin.snapshot(snapshotName, systemTable);
+    }
+  }
+
+  /**
+   * Restores the backup system table from its snapshot. The table is disabled, restored
+   * and enabled again. If the snapshot does not exist, a warning is logged and nothing
+   * else happens.
+   * @param conn connection to the cluster
+   * @throws IOException if an admin operation fails
+   */
+  public static void restoreFromSnapshot(Connection conn) throws IOException {
+    Configuration conf = conn.getConfiguration();
+    LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot");
+    try (Admin admin = conn.getAdmin()) {
+      String snapshotName = BackupSystemTable.getSnapshotName(conf);
+      if (!snapshotExists(admin, snapshotName)) {
+        // Snapshot does not exists, i.e completeBackup failed after
+        // deleting backup system table snapshot
+        // In this case we log WARN and proceed
+        LOG.warn("Could not restore backup system table. Snapshot " + snapshotName
+            + " does not exists.");
+        return;
+      }
+      admin.disableTable(BackupSystemTable.getTableName(conf));
+      admin.restoreSnapshot(snapshotName);
+      admin.enableTable(BackupSystemTable.getTableName(conf));
+      LOG.debug("Done restoring backup system table");
+    }
+  }
+
+  /**
+   * Checks whether a snapshot with the given name is among the snapshots listed by the Admin.
+   * @param admin admin used to list snapshots
+   * @param snapshotName name to look for
+   * @return true if a listed snapshot has exactly this name
+   * @throws IOException if listing snapshots fails
+   */
+  protected static boolean snapshotExists(Admin admin, String snapshotName)
+      throws IOException {
+    boolean found = false;
+    Iterator<SnapshotDescription> snapshots = admin.listSnapshots().iterator();
+    while (!found && snapshots.hasNext()) {
+      found = snapshots.next().getName().equals(snapshotName);
+    }
+    return found;
+  }
+
+  /**
+   * Checks whether the backup system table snapshot exists.
+   * @param conn connection used to obtain the Admin and the configuration
+   * @return true if the snapshot named by the configuration is listed
+   * @throws IOException if the Admin cannot be obtained or listing snapshots fails
+   */
+  public static boolean snapshotExists(Connection conn) throws IOException {
+    // Close the Admin when done; the other static helpers here do the same.
+    try (Admin admin = conn.getAdmin()) {
+      return snapshotExists(admin, getSnapshotName(conn.getConfiguration()));
+    }
+  }
+
+  /**
+   * Deletes the backup system table snapshot if it exists; logs an error otherwise.
+   * @param conn connection used to obtain the Admin and the configuration
+   * @throws IOException if any admin operation fails
+   */
+  public static void deleteSnapshot(Connection conn) throws IOException {
+    Configuration conf = conn.getConfiguration();
+    LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system");
+    try (Admin admin = conn.getAdmin()) {
+      String snapshotName = BackupSystemTable.getSnapshotName(conf);
+      if (!snapshotExists(admin, snapshotName)) {
+        LOG.error("Snapshot " + snapshotName + " does not exists");
+        return;
+      }
+      admin.deleteSnapshot(snapshotName);
+      LOG.debug("Done deleting backup system table snapshot");
+    }
+  }
+
+  /**
+   * Creates Put's for bulk load resulting from running LoadIncrementalHFiles.
+   * One Put is produced per pair, using the pair's second path. Its row key is built from
+   * the bulk load prefix, the table name, the region name and the file name (last path
+   * component), and its state column is set to BL_PREPARE.
+   * @param table table the files are loaded into
+   * @param region region name bytes, rendered as a string inside the row key
+   * @param family value stored in the family column of every Put
+   * @param pairs path pairs; only the second element of each pair is recorded
+   * @return one Put per pair
+   */
+  static List<Put> createPutForPreparedBulkload(TableName table, byte[] region,
+      final byte[] family, final List<Pair<Path, Path>> pairs) {
+    List<Put> puts = new ArrayList<>();
+    // The region part of the row key is identical for every file, so decode it once.
+    String regionName = Bytes.toString(region);
+    for (Pair<Path, Path> pair : pairs) {
+      Path path = pair.getSecond();
+      String file = path.toString();
+      int lastSlash = file.lastIndexOf("/");
+      String filename = file.substring(lastSlash + 1);
+      Put put =
+          new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, regionName,
+            BLK_LD_DELIM, filename));
+      put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
+      put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family);
+      // Bytes.toBytes encodes as UTF-8; String.getBytes() would use the platform charset.
+      put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
+      put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE);
+      puts.add(put);
+      LOG.debug("writing raw bulk path " + file + " for " + table + " " + regionName);
+    }
+    return puts;
+  }
+
+  public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) {
+    List<Delete> lstDels = new ArrayList<>();
+    for (TableName table : lst) {
+      Delete del = new Delete(rowkey(BULK_LOAD_PREFIX, table.toString(), 
BLK_LD_DELIM));
+      del.addFamily(BackupSystemTable.META_FAMILY);
+      lstDels.add(del);
+    }
+    return lstDels;
+  }
+
+  /**
+   * Creates the Put that records a delete operation: the backup ids are stored
+   * comma-separated in the FAM_COL column of the delete operation row.
+   * @param backupIdList backup ids being deleted
+   * @return put operation
+   */
+  private Put createPutForDeleteOperation(String[] backupIdList) {
+    String joinedIds = StringUtils.join(backupIdList, ",");
+    byte[] encodedIds = Bytes.toBytes(joinedIds);
+    Put marker = new Put(DELETE_OP_ROW);
+    marker.addColumn(META_FAMILY, FAM_COL, encodedIds);
+    return marker;
+  }
+
+  /**
+   * Creates the Delete that removes the meta family of the delete operation row.
+   * @return delete operation
+   */
+  private Delete createDeleteForBackupDeleteOperation() {
+    Delete removeMarker = new Delete(DELETE_OP_ROW);
+    removeMarker.addFamily(META_FAMILY);
+    return removeMarker;
+  }
+
+  /**
+   * Creates the Get that reads the meta family of the delete operation row.
+   * @return get operation
+   */
+  private Get createGetForDeleteOperation() {
+    Get readMarker = new Get(DELETE_OP_ROW);
+    readMarker.addFamily(META_FAMILY);
+    return readMarker;
+  }
+
+  /**
+   * Records the start of a delete operation by writing the backup ids to the
+   * delete operation row of the backup system table.
+   * @param backupIdList backup ids being deleted
+   * @throws IOException if the table cannot be opened or the write fails
+   */
+  public void startDeleteOperation(String[] backupIdList) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      // join(array) without a separator would run the ids together in the log.
+      LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList, ","));
+    }
+    Put put = createPutForDeleteOperation(backupIdList);
+    try (Table table = connection.getTable(tableName)) {
+      table.put(put);
+    }
+  }
+
+  /**
+   * Records the end of a delete operation by removing the delete operation row's
+   * meta family from the backup system table.
+   * @throws IOException if the table cannot be opened or the delete fails
+   */
+  public void finishDeleteOperation() throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Finish delete operation for backup ids ");
+    }
+    Delete delete = createDeleteForBackupDeleteOperation();
+    try (Table table = connection.getTable(tableName)) {
+      table.delete(delete);
+    }
+  }
+
+  /**
+   * Reads the backup ids recorded for a delete operation.
+   * @return the recorded backup ids, or null when the delete operation row is absent
+   *         or holds no ids
+   * @throws IOException if the table cannot be opened or the read fails
+   */
+  public String[] getListOfBackupIdsFromDeleteOperation() throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Get delete operation for backup ids ");
+    }
+    Get get = createGetForDeleteOperation();
+    try (Table table = connection.getTable(tableName)) {
+      Result res = table.get(get);
+      if (res.isEmpty()) {
+        return null;
+      }
+      // Read the column the ids were written to instead of relying on cell order.
+      byte[] val = res.getValue(META_FAMILY, FAM_COL);
+      if (val == null || val.length == 0) {
+        return null;
+      }
+      // The value was written with Bytes.toBytes (UTF-8); decode it the same way
+      // rather than with the platform default charset.
+      return Bytes.toString(val).split(",");
+    }
+  }
+
+  /**
+   * Creates the Put that records a merge operation: the backup ids are stored
+   * comma-separated in the FAM_COL column of the merge operation row.
+   * @param backupIdList backup ids being merged
+   * @return put operation
+   */
+  private Put createPutForMergeOperation(String[] backupIdList) {
+    String joinedIds = StringUtils.join(backupIdList, ",");
+    byte[] encodedIds = Bytes.toBytes(joinedIds);
+    Put marker = new Put(MERGE_OP_ROW);
+    marker.addColumn(META_FAMILY, FAM_COL, encodedIds);
+    return marker;
+  }
+
+  /**
+   * Tells whether a merge operation is recorded in the backup system table.
+   * @return true if the merge operation row has any content
+   * @throws IOException if the table cannot be opened or the read fails
+   */
+  public boolean isMergeInProgress() throws IOException {
+    Get mergeRow = new Get(MERGE_OP_ROW);
+    try (Table table = connection.getTable(tableName)) {
+      return !table.get(mergeRow).isEmpty();
+    }
+  }
+
+  /**
+   * Creates the Put that stores the given tables, comma-separated, in the PATH_COL
+   * column of the merge operation row.
+   * @param tables tables to record
+   * @return put operation
+   */
+  private Put createPutForUpdateTablesForMerge(List<TableName> tables) {
+    String joinedTables = StringUtils.join(tables, ",");
+    byte[] encodedTables = Bytes.toBytes(joinedTables);
+    Put update = new Put(MERGE_OP_ROW);
+    update.addColumn(META_FAMILY, PATH_COL, encodedTables);
+    return update;
+  }
+
+  /**
+   * Creates the Delete that removes the meta family of the merge operation row.
+   * @return delete operation
+   */
+  private Delete createDeleteForBackupMergeOperation() {
+    Delete removeMarker = new Delete(MERGE_OP_ROW);
+    removeMarker.addFamily(META_FAMILY);
+    return removeMarker;
+  }
+
+  /**
+   * Creates the Get that reads the meta family of the merge operation row.
+   * @return get operation
+   */
+  private Get createGetForMergeOperation() {
+    Get readMarker = new Get(MERGE_OP_ROW);
+    readMarker.addFamily(META_FAMILY);
+    return readMarker;
+  }
+
+  /**
+   * Records the start of a merge operation by writing the backup ids to the
+   * merge operation row of the backup system table.
+   * @param backupIdList backup ids being merged
+   * @throws IOException if the table cannot be opened or the write fails
+   */
+  public void startMergeOperation(String[] backupIdList) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      // join(array) without a separator would run the ids together in the log.
+      LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList, ","));
+    }
+    Put put = createPutForMergeOperation(backupIdList);
+    try (Table table = connection.getTable(tableName)) {
+      table.put(put);
+    }
+  }
+
+  /**
+   * Writes the given tables to the merge operation row of the backup system table.
+   * @param tables tables to record
+   * @throws IOException if the table cannot be opened or the write fails
+   */
+  public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      String joinedTables = StringUtils.join(tables, ",");
+      LOG.trace("Update tables for merge : " + joinedTables);
+    }
+    Put update = createPutForUpdateTablesForMerge(tables);
+    try (Table systemTable = connection.getTable(tableName)) {
+      systemTable.put(update);
+    }
+  }
+
+  /**
+   * Records the end of a merge operation by removing the merge operation row's
+   * meta family from the backup system table.
+   * @throws IOException if the table cannot be opened or the delete fails
+   */
+  public void finishMergeOperation() throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Finish merge operation for backup ids ");
+    }
+    Delete delete = createDeleteForBackupMergeOperation();
+    try (Table table = connection.getTable(tableName)) {
+      table.delete(delete);
+    }
+  }
+
+  /**
+   * Reads the backup ids recorded for a merge operation.
+   * @return the recorded backup ids, or null when the merge operation row is absent
+   *         or holds no ids
+   * @throws IOException if the table cannot be opened or the read fails
+   */
+  public String[] getListOfBackupIdsFromMergeOperation() throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Get backup ids for merge operation");
+    }
+    Get get = createGetForMergeOperation();
+    try (Table table = connection.getTable(tableName)) {
+      Result res = table.get(get);
+      if (res.isEmpty()) {
+        return null;
+      }
+      // The merge row can also carry the processed-tables column (PATH_COL), so the first
+      // cell is not guaranteed to be the id list; read the id column (FAM_COL) by name.
+      byte[] val = res.getValue(META_FAMILY, FAM_COL);
+      if (val == null || val.length == 0) {
+        return null;
+      }
+      // The value was written with Bytes.toBytes (UTF-8); decode it the same way
+      // rather than with the platform default charset.
+      return Bytes.toString(val).split(",");
+    }
+  }
+
+  /**
+   * Creates a Scan over all bulk load rows of a table. The start row is the bulk load
+   * prefix followed by the table name and the delimiter; the stop row is the same key
+   * with its last byte incremented by one.
+   * @param table table whose bulk load rows are scanned
+   * @return scan operation limited to the meta family and one version
+   * @throws IOException declared by the signature
+   */
+  static Scan createScanForOrigBulkLoadedFiles(TableName table) throws IOException {
+    byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM);
+    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+    int last = stopRow.length - 1;
+    stopRow[last]++;
+    Scan scan = new Scan();
+    scan.withStartRow(startRow);
+    scan.withStopRow(stopRow);
+    scan.addFamily(BackupSystemTable.META_FAMILY);
+    scan.setMaxVersions(1);
+    return scan;
+  }
+
+  /**
+   * Returns the second delimiter-separated component of a bulk load row key.
+   * NOTE(review): for a table outside the default namespace the second component
+   * may be the namespace rather than the table name - confirm against callers.
+   * @param rowStr bulk load row key as a string
+   * @return second component of the row key
+   */
+  static String getTableNameFromOrigBulkLoadRow(String rowStr) {
+    return rowStr.split(BLK_LD_DELIM)[1];
+  }
+
+  /**
+   * Extracts the region component from a bulk load row key.
+   * @param rowStr bulk load row key as a string
+   * @return region component of the row key
+   */
+  static String getRegionNameFromOrigBulkLoadRow(String rowStr) {
+    // format is bulk : namespace : table : region : file
+    String[] parts = rowStr.split(BLK_LD_DELIM);
+    // with only four components the table is in the default namespace (no namespace part)
+    int regionIdx = parts.length == 4 ? 2 : 3;
+    String regionName = parts[regionIdx];
+    LOG.debug("bulk row string " + rowStr + " region " + regionName);
+    return regionName;
+  }
+
+  /**
+   * Used to query bulk loaded hfiles which have been copied by incremental backup.
+   * The scan starts at the bulk load prefix (optionally narrowed by backup id) and stops
+   * at the same key with its last byte incremented by one.
+   * @param backupId the backup Id. It can be null when querying for all tables
+   * @return the Scan object
+   * @throws IOException declared by the signature
+   */
+  static Scan createScanForBulkLoadedFiles(String backupId) throws IOException {
+    byte[] startRow;
+    if (backupId == null) {
+      startRow = BULK_LOAD_PREFIX_BYTES;
+    } else {
+      startRow = rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM);
+    }
+    // work on a copy so the start row (possibly a shared constant) is left untouched
+    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+    int last = stopRow.length - 1;
+    stopRow[last]++;
+    Scan scan = new Scan();
+    scan.setStartRow(startRow);
+    scan.setStopRow(stopRow);
+    scan.addFamily(BackupSystemTable.META_FAMILY);
+    scan.setMaxVersions(1);
+    return scan;
+  }
+
+  /**
+   * Creates the Put recording one bulk loaded file for a backup. The row key is the
+   * bulk load prefix followed by the backup id, timestamp and index, delimiter-separated.
+   * @param tn table name, stored in the table column
+   * @param fam value stored in the family column
+   * @param p file path, stored in the path column
+   * @param backupId backup id used in the row key
+   * @param ts timestamp used in the row key
+   * @param idx index used in the row key
+   * @return put operation
+   */
+  static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId,
+      long ts, int idx) {
+    Put put =
+        new Put(rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM + ts + BLK_LD_DELIM + idx));
+    put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName());
+    put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam);
+    // Bytes.toBytes encodes as UTF-8; String.getBytes() would use the platform charset.
+    put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(p));
+    return put;
+  }
+
+  /**
+   * Creates put list for list of WAL files. Each Put is keyed by the WAL prefix plus the
+   * unique part of the file name and stores the backup id, the file and the backup root.
+   * @param files list of WAL file paths
+   * @param backupId backup id
+   * @param backupRoot backup root, stored in the "root" column
+   * @return put list
+   * @throws IOException exception
+   */
+  private List<Put> createPutsForAddWALFiles(List<String> files, String backupId,
+      String backupRoot) throws IOException {
+    List<Put> walPuts = new ArrayList<>();
+    for (String walFile : files) {
+      byte[] row = rowkey(WALS_PREFIX, BackupUtils.getUniqueWALFileNamePart(walFile));
+      Put walPut = new Put(row);
+      walPut.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("backupId"),
+        Bytes.toBytes(backupId));
+      walPut.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("file"),
+        Bytes.toBytes(walFile));
+      walPut.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("root"),
+        Bytes.toBytes(backupRoot));
+      walPuts.add(walPut);
+    }
+    return walPuts;
+  }
+
+  /**
+   * Creates Scan operation to load WALs. The scan covers every row starting with the
+   * WAL prefix.
+   * @param backupRoot path to backup destination (currently not used by the scan)
+   * @return scan operation
+   */
+  private Scan createScanForGetWALs(String backupRoot) {
+    // TODO: support for backupRoot
+    byte[] startRow = Bytes.toBytes(WALS_PREFIX);
+    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+    int last = stopRow.length - 1;
+    stopRow[last]++;
+    Scan scan = new Scan();
+    scan.setStartRow(startRow);
+    scan.setStopRow(stopRow);
+    scan.addFamily(BackupSystemTable.META_FAMILY);
+    return scan;
+  }
+
+  /**
+   * Creates Get operation for a given wal file name.
+   * TODO: support for backup destination
+   * @param file file
+   * @return get operation
+   * @throws IOException exception
+   */
+  private Get createGetForCheckWALFile(String file) throws IOException {
+    byte[] row = rowkey(WALS_PREFIX, BackupUtils.getUniqueWALFileNamePart(file));
+    Get walGet = new Get(row);
+    // the whole meta family is fetched
+    walGet.addFamily(BackupSystemTable.META_FAMILY);
+    return walGet;
+  }
+
+  /**
+   * Creates Scan operation to load backup set list. The scan covers every row starting
+   * with the backup set key prefix.
+   * @return scan operation
+   */
+  private Scan createScanForBackupSetList() {
+    byte[] startRow = Bytes.toBytes(SET_KEY_PREFIX);
+    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+    int last = stopRow.length - 1;
+    stopRow[last]++;
+    Scan scan = new Scan();
+    scan.setStartRow(startRow);
+    scan.setStopRow(stopRow);
+    scan.addFamily(BackupSystemTable.META_FAMILY);
+    return scan;
+  }
+
+  /**
+   * Creates Get operation to load backup set content
+   * @param name backup set's name
+   * @return get operation
+   */
+  private Get createGetForBackupSet(String name) {
+    byte[] row = rowkey(SET_KEY_PREFIX, name);
+    Get setGet = new Get(row);
+    setGet.addFamily(BackupSystemTable.META_FAMILY);
+    return setGet;
+  }
+
+  /**
+   * Creates Delete operation to delete backup set content
+   * @param name backup set's name
+   * @return delete operation
+   */
+  private Delete createDeleteForBackupSet(String name) {
+    byte[] row = rowkey(SET_KEY_PREFIX, name);
+    Delete setDelete = new Delete(row);
+    setDelete.addFamily(BackupSystemTable.META_FAMILY);
+    return setDelete;
+  }
+
+  /**
+   * Creates Put operation to update backup set content. The tables are stored as a single
+   * value in the "tables" column of the backup set row.
+   * @param name backup set's name
+   * @param tables list of tables
+   * @return put operation
+   */
+  private Put createPutForBackupSet(String name, String[] tables) {
+    byte[] row = rowkey(SET_KEY_PREFIX, name);
+    Put setPut = new Put(row);
+    byte[] encodedTables = convertToByteArray(tables);
+    setPut.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("tables"), encodedTables);
+    return setPut;
+  }
+
+  /**
+   * Joins the table names with commas and encodes the result as bytes.
+   * @param tables table names
+   * @return UTF-8 bytes of the comma-separated list
+   */
+  private byte[] convertToByteArray(String[] tables) {
+    // Encode with Bytes.toBytes (UTF-8) so the value round-trips through Bytes.toString
+    // when the backup set is read back; String.getBytes() uses the platform charset.
+    return Bytes.toBytes(StringUtils.join(tables, ","));
+  }
+
+  /**
+   * Converts cell to backup set list.
+   * @param current current cell
+   * @return backup set as array of table names; an empty array when the cell has no value
+   * @throws IOException declared by the signature
+   */
+  private String[] cellValueToBackupSet(Cell current) throws IOException {
+    byte[] value = CellUtil.cloneValue(current);
+    if (value == null || value.length == 0) {
+      return new String[0];
+    }
+    return Bytes.toString(value).split(",");
+  }
+
+  /**
+   * Converts cell key to backup set name by dropping the backup set key prefix from the
+   * cell's row.
+   * @param current current cell
+   * @return backup set name
+   * @throws IOException declared by the signature
+   */
+  private String cellKeyToBackupSetName(Cell current) throws IOException {
+    String rowKey = Bytes.toString(CellUtil.cloneRow(current));
+    int prefixLength = SET_KEY_PREFIX.length();
+    return rowKey.substring(prefixLength);
+  }
+
+  /**
+   * Builds a row key by concatenating the given strings in order.
+   * @param s first component of the key
+   * @param other remaining components, appended without any separator
+   * @return UTF-8 bytes of the concatenated key
+   */
+  private static byte[] rowkey(String s, String... other) {
+    StringBuilder sb = new StringBuilder(s);
+    for (String ss : other) {
+      sb.append(ss);
+    }
+    // Bytes.toBytes always encodes as UTF-8, matching the row prefixes built elsewhere in
+    // this class with Bytes.toBytes; String.getBytes() depends on the platform charset.
+    return Bytes.toBytes(sb.toString());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
new file mode 100644
index 0000000..e323e96
--- /dev/null
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.impl;
+
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_ATTEMPTS_PAUSE_MS_KEY;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_MAX_ATTEMPTS_KEY;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_BACKUP_ATTEMPTS_PAUSE_MS;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_BACKUP_MAX_ATTEMPTS;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupCopyJob;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
+import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
+import org.apache.hadoop.hbase.backup.BackupRequest;
+import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Full table backup implementation
+ *
+ */
+@InterfaceAudience.Private
+public class FullTableBackupClient extends TableBackupClient {
+  private static final Log LOG = 
LogFactory.getLog(FullTableBackupClient.class);
+
+  public FullTableBackupClient() {
+  }
+
+  public FullTableBackupClient(final Connection conn, final String backupId, 
BackupRequest request)
+      throws IOException {
+    super(conn, backupId, request);
+  }
+
+  /**
+   * Do snapshot copy.
+   * @param backupInfo backup info
+   * @throws Exception exception
+   */
+  protected void snapshotCopy(BackupInfo backupInfo) throws Exception {
+    LOG.info("Snapshot copy is starting.");
+
+    // set overall backup phase: snapshot_copy
+    backupInfo.setPhase(BackupPhase.SNAPSHOTCOPY);
+
+    // call ExportSnapshot to copy files based on hbase snapshot for backup
+    // ExportSnapshot only support single snapshot export, need loop for 
multiple tables case
+    BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
+
+    // number of snapshots matches number of tables
+    float numOfSnapshots = backupInfo.getSnapshotNames().size();
+
+    LOG.debug("There are " + (int) numOfSnapshots + " snapshots to be 
copied.");
+
+    for (TableName table : backupInfo.getTables()) {
+      // Currently we simply set the sub copy tasks by counting the table 
snapshot number, we can
+      // calculate the real files' size for the percentage in the future.
+      // backupCopier.setSubTaskPercntgInWholeTask(1f / numOfSnapshots);
+      int res = 0;
+      String[] args = new String[4];
+      args[0] = "-snapshot";
+      args[1] = backupInfo.getSnapshotName(table);
+      args[2] = "-copy-to";
+      args[3] = backupInfo.getTableBackupDir(table);
+
+      LOG.debug("Copy snapshot " + args[1] + " to " + args[3]);
+      res = copyService.copy(backupInfo, backupManager, conf, BackupType.FULL, 
args);
+      // if one snapshot export failed, do not continue for remained snapshots
+      if (res != 0) {
+        LOG.error("Exporting Snapshot " + args[1] + " failed with return code: 
" + res + ".");
+
+        throw new IOException("Failed of exporting snapshot " + args[1] + " to 
" + args[3]
+            + " with reason code " + res);
+      }
+      LOG.info("Snapshot copy " + args[1] + " finished.");
+    }
+  }
+
+  /**
+   * Backup request execution
+   * @throws IOException
+   */
+  @Override
+  public void execute() throws IOException {
+    try (Admin admin = conn.getAdmin();) {
+
+      // Begin BACKUP
+      beginBackup(backupManager, backupInfo);
+      String savedStartCode = null;
+      boolean firstBackup = false;
+      // do snapshot for full table backup
+
+      savedStartCode = backupManager.readBackupStartCode();
+      firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) 
== 0L;
+      if (firstBackup) {
+        // This is our first backup. Let's put some marker to system table so 
that we can hold the logs
+        // while we do the backup.
+        backupManager.writeBackupStartCode(0L);
+      }
+      // We roll log here before we do the snapshot. It is possible there is 
duplicate data
+      // in the log that is already in the snapshot. But if we do it after the 
snapshot, we
+      // could have data loss.
+      // A better approach is to do the roll log on each RS in the same global 
procedure as
+      // the snapshot.
+      LOG.info("Execute roll log procedure for full backup ...");
+
+      Map<String, String> props = new HashMap<String, String>();
+      props.put("backupRoot", backupInfo.getBackupRootDir());
+      
admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
+        LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
+
+      newTimestamps = backupManager.readRegionServerLastLogRollResult();
+      if (firstBackup) {
+        // Updates registered log files
+        // We record ALL old WAL files as registered, because
+        // this is a first full backup in the system and these
+        // files are not needed for next incremental backup
+        List<String> logFiles = BackupUtils.getWALFilesOlderThan(conf, 
newTimestamps);
+        backupManager.recordWALFiles(logFiles);
+      }
+
+      // SNAPSHOT_TABLES:
+      backupInfo.setPhase(BackupPhase.SNAPSHOT);
+      for (TableName tableName : tableList) {
+        String snapshotName =
+            "snapshot_" + Long.toString(EnvironmentEdgeManager.currentTime()) 
+ "_"
+                + tableName.getNamespaceAsString() + "_" + 
tableName.getQualifierAsString();
+
+        snapshotTable(admin, tableName, snapshotName);
+        backupInfo.setSnapshotName(tableName, snapshotName);
+      }
+
+      // SNAPSHOT_COPY:
+      // do snapshot copy
+      LOG.debug("snapshot copy for " + backupId);
+      snapshotCopy(backupInfo);
+      // Updates incremental backup table set
+      backupManager.addIncrementalBackupTableSet(backupInfo.getTables());
+
+      // BACKUP_COMPLETE:
+      // set overall backup status: complete. Here we make sure to complete 
the backup.
+      // After this checkpoint, even if entering cancel process, will let the 
backup finished
+      backupInfo.setState(BackupState.COMPLETE);
+      // The table list in backupInfo is good for both full backup and 
incremental backup.
+      // For incremental backup, it contains the incremental backup table set.
+      backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), 
newTimestamps);
+
+      HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
+          backupManager.readLogTimestampMap();
+
+      Long newStartCode =
+          BackupUtils.getMinValue(BackupUtils
+              .getRSLogTimestampMins(newTableSetTimestampMap));
+      backupManager.writeBackupStartCode(newStartCode);
+
+      // backup complete
+      completeBackup(conn, backupInfo, backupManager, BackupType.FULL, conf);
+    } catch (Exception e) {
+      failBackup(conn, backupInfo, backupManager, e, "Unexpected 
BackupException : ",
+        BackupType.FULL, conf);
+      throw new IOException(e);
+    }
+
+  }
+
+
+  protected void snapshotTable(Admin admin, TableName tableName, String 
snapshotName)
+      throws IOException {
+
+    int maxAttempts =
+        conf.getInt(BACKUP_MAX_ATTEMPTS_KEY, DEFAULT_BACKUP_MAX_ATTEMPTS);
+    int pause =
+        conf.getInt(BACKUP_ATTEMPTS_PAUSE_MS_KEY, 
DEFAULT_BACKUP_ATTEMPTS_PAUSE_MS);
+    int attempts = 0;
+
+    while (attempts++ < maxAttempts) {
+      try {
+        admin.snapshot(snapshotName, tableName);
+        return;
+      } catch (IOException ee) {
+        LOG.warn("Snapshot attempt " + attempts + " failed for table " + 
tableName
+            + ", sleeping for " + pause + "ms", ee);
+        if (attempts < maxAttempts) {
+          try {
+            Thread.sleep(pause);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            break;
+          }
+        }
+      }
+    }
+    throw new IOException("Failed to snapshot table "+ tableName);
+  }
+}

Reply via email to