Repository: hbase
Updated Branches:
  refs/heads/HBASE-7912 909c4efa8 -> 914d162ca


HBASE-16861 Rename Service to Task (Vladimir Rodionov)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/914d162c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/914d162c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/914d162c

Branch: refs/heads/HBASE-7912
Commit: 914d162ca163c818654eb912fd2c42e6ea58fa72
Parents: 909c4ef
Author: tedyu <yuzhih...@gmail.com>
Authored: Mon Oct 17 11:59:54 2016 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Mon Oct 17 11:59:54 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/backup/BackupCopyService.java  |  53 ---
 .../hadoop/hbase/backup/BackupCopyTask.java     |  53 +++
 .../backup/BackupRestoreServerFactory.java      |  24 +-
 .../hadoop/hbase/backup/RestoreService.java     |  50 ---
 .../apache/hadoop/hbase/backup/RestoreTask.java |  50 +++
 .../backup/impl/FullTableBackupClient.java      |   4 +-
 .../impl/IncrementalTableBackupClient.java      |   4 +-
 .../mapreduce/MapReduceBackupCopyService.java   | 349 -------------------
 .../mapreduce/MapReduceBackupCopyTask.java      | 349 +++++++++++++++++++
 .../mapreduce/MapReduceRestoreService.java      | 171 ---------
 .../backup/mapreduce/MapReduceRestoreTask.java  | 171 +++++++++
 .../hbase/backup/util/RestoreServerUtil.java    |   6 +-
 12 files changed, 642 insertions(+), 642 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/914d162c/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupCopyService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupCopyService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupCopyService.java
deleted file mode 100644
index 6c70123..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupCopyService.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.backup;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.backup.impl.BackupManager;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public interface BackupCopyService extends Configurable {
-
-  /**
-   * Copy backup data
-   * @param backupContext - context
-   * @param backupManager  - manager
-   * @param conf - configuration
-   * @param copyType - copy type
-   * @param options - array of options (implementation-specific)
-   * @return result (0 - success)
-   * @throws IOException
-   */
-  public int copy(BackupInfo backupContext, BackupManager backupManager, Configuration conf,
-      BackupType copyType, String[] options) throws IOException;
-  
-
-   /**
-    * Cancel copy job
-    * @param jobHandler - copy job handler
-    * @throws IOException
-    */
-   public void cancelCopyJob(String jobHandler) throws IOException;  
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/914d162c/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupCopyTask.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupCopyTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupCopyTask.java
new file mode 100644
index 0000000..ba23bd4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupCopyTask.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface BackupCopyTask extends Configurable {
+
+  /**
+   * Copy backup data task
+   * @param backupContext - context
+   * @param backupManager  - manager
+   * @param conf - configuration
+   * @param copyType - copy type
+   * @param options - array of options (implementation-specific)
+   * @return result (0 - success)
+   * @throws IOException
+   */
+  public int copy(BackupInfo backupContext, BackupManager backupManager, Configuration conf,
+      BackupType copyType, String[] options) throws IOException;
+  
+
+   /**
+    * Cancel copy job
+    * @param jobHandler - copy job handler
+    * @throws IOException
+    */
+   public void cancelCopyJob(String jobHandler) throws IOException;  
+}
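
For reference, a minimal sketch of an implementation of the renamed BackupCopyTask
interface; the class name NoOpBackupCopyTask and its do-nothing behavior are
illustrative assumptions, not part of the patch:

    package org.apache.hadoop.hbase.backup;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.backup.impl.BackupManager;

    // Hypothetical no-op task: shows the contract of BackupCopyTask only.
    public class NoOpBackupCopyTask implements BackupCopyTask {
      private Configuration conf;

      @Override
      public Configuration getConf() {
        return conf;
      }

      @Override
      public void setConf(Configuration conf) {
        this.conf = conf;
      }

      @Override
      public int copy(BackupInfo backupContext, BackupManager backupManager, Configuration conf,
          BackupType copyType, String[] options) throws IOException {
        // A real task exports snapshots (full backup) or runs DistCp (incremental backup).
        return 0; // 0 means success, per the interface javadoc
      }

      @Override
      public void cancelCopyJob(String jobHandler) throws IOException {
        // Nothing to cancel in this sketch.
      }
    }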

http://git-wip-us.apache.org/repos/asf/hbase/blob/914d162c/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java
index 7644a4d..06e367f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java
@@ -18,8 +18,8 @@
 package org.apache.hadoop.hbase.backup;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyService;
-import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreService;
+import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyTask;
+import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreTask;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -40,11 +40,11 @@ public final class BackupRestoreServerFactory {
    * @param conf - configuration
    * @return backup restore service instance
    */
-  public static RestoreService getRestoreService(Configuration conf) {
-    Class<? extends RestoreService> cls =
-        conf.getClass(HBASE_INCR_RESTORE_IMPL_CLASS, MapReduceRestoreService.class,
-          RestoreService.class);
-    RestoreService service =  ReflectionUtils.newInstance(cls, conf);
+  public static RestoreTask getRestoreService(Configuration conf) {
+    Class<? extends RestoreTask> cls =
+        conf.getClass(HBASE_INCR_RESTORE_IMPL_CLASS, MapReduceRestoreTask.class,
+          RestoreTask.class);
+    RestoreTask service =  ReflectionUtils.newInstance(cls, conf);
     service.setConf(conf);
     return service;
   }
@@ -54,11 +54,11 @@ public final class BackupRestoreServerFactory {
    * @param conf - configuration
    * @return backup copy service
    */
-  public static BackupCopyService getBackupCopyService(Configuration conf) {
-    Class<? extends BackupCopyService> cls =
-        conf.getClass(HBASE_BACKUP_COPY_IMPL_CLASS, MapReduceBackupCopyService.class,
-          BackupCopyService.class);
-    BackupCopyService service = ReflectionUtils.newInstance(cls, conf);;
+  public static BackupCopyTask getBackupCopyService(Configuration conf) {
+    Class<? extends BackupCopyTask> cls =
+        conf.getClass(HBASE_BACKUP_COPY_IMPL_CLASS, MapReduceBackupCopyTask.class,
+          BackupCopyTask.class);
+    BackupCopyTask service = ReflectionUtils.newInstance(cls, conf);;
     service.setConf(conf);
     return service;
   }
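
For reference, a hypothetical usage sketch (not part of the patch) showing how callers
obtain the renamed task implementations from this factory; the class name
BackupTaskFactoryExample is an assumption for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.backup.BackupCopyTask;
    import org.apache.hadoop.hbase.backup.BackupRestoreServerFactory;
    import org.apache.hadoop.hbase.backup.RestoreTask;

    public class BackupTaskFactoryExample {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();

        // Defaults to MapReduceBackupCopyTask unless HBASE_BACKUP_COPY_IMPL_CLASS
        // points the factory at another BackupCopyTask implementation.
        BackupCopyTask copyTask = BackupRestoreServerFactory.getBackupCopyService(conf);

        // Defaults to MapReduceRestoreTask unless HBASE_INCR_RESTORE_IMPL_CLASS
        // points the factory at another RestoreTask implementation.
        RestoreTask restoreTask = BackupRestoreServerFactory.getRestoreService(conf);

        System.out.println(copyTask.getClass().getName());
        System.out.println(restoreTask.getClass().getName());
      }
    }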

http://git-wip-us.apache.org/repos/asf/hbase/blob/914d162c/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreService.java
deleted file mode 100644
index 2da98c2..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreService.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.backup;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-
-/**
- * Backup restore service interface
- * Concrete implementation is provided by backup provider.
- */
-
-public interface RestoreService extends Configurable{
-
-  /**
-   * Run restore operation
-   * @param dirPaths - path array of WAL log directories
-   * @param fromTables - from tables
-   * @param toTables - to tables
-   * @param fullBackupRestore - full backup restore
-   * @throws IOException
-   */
-  public void run(Path[] dirPaths, TableName[] fromTables, 
-      TableName[] toTables, boolean fullBackupRestore)
-    throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/914d162c/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreTask.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreTask.java
new file mode 100644
index 0000000..fa4ed3a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreTask.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+
+/**
+ * Backup restore task interface
+ * Concrete implementation is provided by backup provider.
+ */
+
+public interface RestoreTask extends Configurable{
+
+  /**
+   * Run restore task
+   * @param dirPaths - path array of WAL log directories
+   * @param fromTables - from tables
+   * @param toTables - to tables
+   * @param fullBackupRestore - full backup restore
+   * @throws IOException
+   */
+  public void run(Path[] dirPaths, TableName[] fromTables, 
+      TableName[] toTables, boolean fullBackupRestore)
+    throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/914d162c/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
index 5a8b941..ce2aea9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.BackupCopyService;
+import org.apache.hadoop.hbase.backup.BackupCopyTask;
 import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
 import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
@@ -266,7 +266,7 @@ public class FullTableBackupClient {
 
     // call ExportSnapshot to copy files based on hbase snapshot for backup
     // ExportSnapshot only support single snapshot export, need loop for multiple tables case
-    BackupCopyService copyService = BackupRestoreServerFactory.getBackupCopyService(conf);
+    BackupCopyTask copyService = BackupRestoreServerFactory.getBackupCopyService(conf);
 
     // number of snapshots matches number of tables
     float numOfSnapshots = backupContext.getSnapshotNames().size();

http://git-wip-us.apache.org/repos/asf/hbase/blob/914d162c/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 0a8d14d..8acace0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.BackupCopyService;
+import org.apache.hadoop.hbase.backup.BackupCopyTask;
 import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
 import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
@@ -114,7 +114,7 @@ public class IncrementalTableBackupClient {
     String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]);
     strArr[strArr.length - 1] = backupContext.getHLogTargetDir();
 
-    BackupCopyService copyService = BackupRestoreServerFactory.getBackupCopyService(conf);
+    BackupCopyTask copyService = BackupRestoreServerFactory.getBackupCopyService(conf);
     int counter = 0;
     int MAX_ITERAIONS = 2;
     while (counter++ < MAX_ITERAIONS) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/914d162c/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
deleted file mode 100644
index a058ebc..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.backup.mapreduce;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.math.BigDecimal;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.BackupCopyService;
-import org.apache.hadoop.hbase.backup.BackupInfo;
-import org.apache.hadoop.hbase.backup.BackupType;
-import org.apache.hadoop.hbase.backup.impl.BackupManager;
-import org.apache.hadoop.hbase.backup.util.BackupServerUtil;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
-import org.apache.hadoop.mapreduce.Cluster;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.tools.DistCp;
-import org.apache.hadoop.tools.DistCpConstants;
-import org.apache.hadoop.tools.DistCpOptions;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-/**
- * Copier for backup operation. Basically, there are 2 types of copy. One is copying from snapshot,
- * which bases on extending ExportSnapshot's function with copy progress reporting to ZooKeeper
- * implementation. The other is copying for incremental log files, which bases on extending
- * DistCp's function with copy progress reporting to ZooKeeper implementation.
- *
- * For now this is only a wrapper. The other features such as progress and increment backup will be
- * implemented in future jira
- */
-
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class MapReduceBackupCopyService implements BackupCopyService {
-  private static final Log LOG = 
LogFactory.getLog(MapReduceBackupCopyService.class);
-
-  private Configuration conf;
-  // private static final long BYTES_PER_MAP = 2 * 256 * 1024 * 1024;
-
-  // Accumulated progress within the whole backup process for the copy 
operation
-  private float progressDone = 0.1f;
-  private long bytesCopied = 0;
-  private static float INIT_PROGRESS = 0.1f;
-
-  // The percentage of the current copy task within the whole task if multiple 
time copies are
-  // needed. The default value is 100%, which means only 1 copy task for the 
whole.
-  private float subTaskPercntgInWholeTask = 1f;
-
-  public MapReduceBackupCopyService() {
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  /**
-   * Get the current copy task percentage within the whole task if multiple 
copies are needed.
-   * @return the current copy task percentage
-   */
-  public float getSubTaskPercntgInWholeTask() {
-    return subTaskPercntgInWholeTask;
-  }
-
-  /**
-   * Set the current copy task percentage within the whole task if multiple 
copies are needed. Must
-   * be called before calling
-   * {@link #copy(BackupHandler, Configuration, Type, String[])}
-   * @param subTaskPercntgInWholeTask The percentage of the copy subtask
-   */
-  public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) {
-    this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask;
-  }
-
-  class SnapshotCopy extends ExportSnapshot {
-    private BackupInfo backupContext;
-    private TableName table;
-
-    public SnapshotCopy(BackupInfo backupContext, TableName table) {
-      super();
-      this.backupContext = backupContext;
-      this.table = table;
-    }
-
-    public TableName getTable() {
-      return this.table;
-    }
-  }
-
-  /**
-   * Update the ongoing backup with new progress.
-   * @param backupContext backup context
-   * 
-   * @param newProgress progress
-   * @param bytesCopied bytes copied
-   * @throws NoNodeException exception
-   */
-  static void updateProgress(BackupInfo backupContext, BackupManager 
backupManager,
-      int newProgress, long bytesCopied) throws IOException {
-    // compose the new backup progress data, using fake number for now
-    String backupProgressData = newProgress + "%";
-
-    backupContext.setProgress(newProgress);
-    backupManager.updateBackupInfo(backupContext);
-    LOG.debug("Backup progress data \"" + backupProgressData
-      + "\" has been updated to hbase:backup for " + 
backupContext.getBackupId());
-  }
-
-  // Extends DistCp for progress updating to hbase:backup
-  // during backup. Using DistCpV2 (MAPREDUCE-2765).
-  // Simply extend it and override execute() method to get the
-  // Job reference for progress updating.
-  // Only the argument "src1, [src2, [...]] dst" is supported,
-  // no more DistCp options.
-  class BackupDistCp extends DistCp {
-
-    private BackupInfo backupContext;
-    private BackupManager backupManager;
-
-    public BackupDistCp(Configuration conf, DistCpOptions options, BackupInfo 
backupContext,
-        BackupManager backupManager)
-        throws Exception {
-      super(conf, options);
-      this.backupContext = backupContext;
-      this.backupManager = backupManager;
-    }
-
-    @Override
-    public Job execute() throws Exception {
-
-      // reflection preparation for private methods and fields
-      Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class;
-      Method methodCreateMetaFolderPath = 
classDistCp.getDeclaredMethod("createMetaFolderPath");
-      Method methodCreateJob = classDistCp.getDeclaredMethod("createJob");
-      Method methodCreateInputFileListing =
-          classDistCp.getDeclaredMethod("createInputFileListing", Job.class);
-      Method methodCleanup = classDistCp.getDeclaredMethod("cleanup");
-
-      Field fieldInputOptions = classDistCp.getDeclaredField("inputOptions");
-      Field fieldMetaFolder = classDistCp.getDeclaredField("metaFolder");
-      Field fieldJobFS = classDistCp.getDeclaredField("jobFS");
-      Field fieldSubmitted = classDistCp.getDeclaredField("submitted");
-
-      methodCreateMetaFolderPath.setAccessible(true);
-      methodCreateJob.setAccessible(true);
-      methodCreateInputFileListing.setAccessible(true);
-      methodCleanup.setAccessible(true);
-
-      fieldInputOptions.setAccessible(true);
-      fieldMetaFolder.setAccessible(true);
-      fieldJobFS.setAccessible(true);
-      fieldSubmitted.setAccessible(true);
-
-      // execute() logic starts here
-      assert fieldInputOptions.get(this) != null;
-      assert getConf() != null;
-
-      Job job = null;
-      try {
-        synchronized (this) {
-          // Don't cleanup while we are setting up.
-          fieldMetaFolder.set(this, methodCreateMetaFolderPath.invoke(this));
-          fieldJobFS.set(this, ((Path) 
fieldMetaFolder.get(this)).getFileSystem(getConf()));
-          job = (Job) methodCreateJob.invoke(this);
-        }
-        methodCreateInputFileListing.invoke(this, job);
-
-        // Get the total length of the source files
-        List<Path> srcs = ((DistCpOptions) 
fieldInputOptions.get(this)).getSourcePaths();
-        
-        long totalSrcLgth = 0;
-        for (Path aSrc : srcs) {
-          totalSrcLgth += 
BackupServerUtil.getFilesLength(aSrc.getFileSystem(getConf()), aSrc);
-        }
-
-        // submit the copy job
-        job.submit();
-        fieldSubmitted.set(this, true);
-
-        // after submit the MR job, set its handler in backup handler for 
cancel process
-        // this.backupHandler.copyJob = job;
-
-        // Update the copy progress to ZK every 0.5s if progress value changed
-        int progressReportFreq =
-            this.getConf().getInt("hbase.backup.progressreport.frequency", 
500);
-        float lastProgress = progressDone;
-        while (!job.isComplete()) {
-          float newProgress =
-              progressDone + job.mapProgress() * subTaskPercntgInWholeTask * 
(1 - INIT_PROGRESS);
-
-          if (newProgress > lastProgress) {
-
-            BigDecimal progressData =
-                new BigDecimal(newProgress * 100).setScale(1, 
BigDecimal.ROUND_HALF_UP);
-            String newProgressStr = progressData + "%";
-            LOG.info("Progress: " + newProgressStr);
-            updateProgress(backupContext, backupManager, 
progressData.intValue(),
-              bytesCopied);
-            LOG.debug("Backup progress data updated to hbase:backup: 
\"Progress: " + newProgressStr
-              + ".\"");
-            lastProgress = newProgress;
-          }
-          Thread.sleep(progressReportFreq);
-        }
-        // update the progress data after copy job complete
-        float newProgress =
-            progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 
- INIT_PROGRESS);
-        BigDecimal progressData =
-            new BigDecimal(newProgress * 100).setScale(1, 
BigDecimal.ROUND_HALF_UP);
-
-        String newProgressStr = progressData + "%";
-        LOG.info("Progress: " + newProgressStr + " subTask: " + 
subTaskPercntgInWholeTask +
-            " mapProgress: " + job.mapProgress());
-
-        // accumulate the overall backup progress
-        progressDone = newProgress;
-        bytesCopied += totalSrcLgth;
-
-        updateProgress(backupContext, backupManager, progressData.intValue(),
-          bytesCopied);
-        LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " 
+ newProgressStr
-          + " - " + bytesCopied + " bytes copied.\"");
-      } catch (Throwable t) {
-        LOG.error("distcp " + job.getJobID() + " encountered error", t);
-        throw t;
-      } finally {
-        if (!fieldSubmitted.getBoolean(this)) {
-          methodCleanup.invoke(this);
-        }
-      }
-
-      String jobID = job.getJobID().toString();
-      job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, 
jobID);
-
-      LOG.debug("DistCp job-id: " + jobID + " completed: " + job.isComplete() 
+ " " +
-          job.isSuccessful());
-      Counters ctrs = job.getCounters();
-      LOG.debug(ctrs);
-      if (job.isComplete() && !job.isSuccessful()) {
-        throw new Exception("DistCp job-id: " + jobID + " failed");
-      }
-
-      return job;
-    }
-
-  }
-
-  
-  /**
-   * Do backup copy based on different types.
-   * @param context The backup context
-   * @param conf The hadoop configuration
-   * @param copyType The backup copy type
-   * @param options Options for customized ExportSnapshot or DistCp
-   * @throws Exception exception
-   */
-  @Override
-  public int copy(BackupInfo context, BackupManager backupManager, 
Configuration conf,
-      BackupType copyType, String[] options) throws IOException {
-    int res = 0;
-
-    try {
-      if (copyType == BackupType.FULL) {
-        SnapshotCopy snapshotCp =
-            new SnapshotCopy(context, context.getTableBySnapshot(options[1]));
-        LOG.debug("Doing SNAPSHOT_COPY");
-        // Make a new instance of conf to be used by the snapshot copy class.
-        snapshotCp.setConf(new Configuration(conf));
-        res = snapshotCp.run(options);
-
-      } else if (copyType == BackupType.INCREMENTAL) {
-        LOG.debug("Doing COPY_TYPE_DISTCP");
-        setSubTaskPercntgInWholeTask(1f);
-
-        BackupDistCp distcp = new BackupDistCp(new Configuration(conf), null, 
context,
-          backupManager);
-        // Handle a special case where the source file is a single file.
-        // In this case, distcp will not create the target dir. It just take 
the
-        // target as a file name and copy source file to the target (as a file 
name).
-        // We need to create the target dir before run distcp.
-        LOG.debug("DistCp options: " + Arrays.toString(options));
-        Path dest = new Path(options[options.length-1]);
-        FileSystem destfs = dest.getFileSystem(conf);
-        if (!destfs.exists(dest)) {
-          destfs.mkdirs(dest);
-        }
-        res = distcp.run(options);
-      }
-      return res;
-
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-   @Override
-   public void cancelCopyJob(String jobId) throws IOException {
-     JobID id = JobID.forName(jobId);     
-     Cluster cluster = new Cluster(getConf());
-     try {
-       Job job = cluster.getJob(id);
-       if (job == null) {
-         LOG.error("No job found for " + id);
-         // should we throw exception
-       }
-       if (job.isComplete() || job.isRetired()) {
-         return;
-       }
- 
-       job.killJob();
-       LOG.debug("Killed job " + id);
-     } catch (InterruptedException e) {
-       throw new IOException(e);
-     }
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/914d162c/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyTask.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyTask.java
new file mode 100644
index 0000000..cdde89e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyTask.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupCopyTask;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.util.BackupServerUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+/**
+ * Copier for backup operation. Basically, there are 2 types of copy. One is copying from snapshot,
+ * which bases on extending ExportSnapshot's function with copy progress reporting to ZooKeeper
+ * implementation. The other is copying for incremental log files, which bases on extending
+ * DistCp's function with copy progress reporting to ZooKeeper implementation.
+ *
+ * For now this is only a wrapper. The other features such as progress and increment backup will be
+ * implemented in future jira
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MapReduceBackupCopyTask implements BackupCopyTask {
+  private static final Log LOG = 
LogFactory.getLog(MapReduceBackupCopyTask.class);
+
+  private Configuration conf;
+  // private static final long BYTES_PER_MAP = 2 * 256 * 1024 * 1024;
+
+  // Accumulated progress within the whole backup process for the copy 
operation
+  private float progressDone = 0.1f;
+  private long bytesCopied = 0;
+  private static float INIT_PROGRESS = 0.1f;
+
+  // The percentage of the current copy task within the whole task if multiple 
time copies are
+  // needed. The default value is 100%, which means only 1 copy task for the 
whole.
+  private float subTaskPercntgInWholeTask = 1f;
+
+  public MapReduceBackupCopyTask() {
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Get the current copy task percentage within the whole task if multiple 
copies are needed.
+   * @return the current copy task percentage
+   */
+  public float getSubTaskPercntgInWholeTask() {
+    return subTaskPercntgInWholeTask;
+  }
+
+  /**
+   * Set the current copy task percentage within the whole task if multiple 
copies are needed. Must
+   * be called before calling
+   * {@link #copy(BackupHandler, Configuration, Type, String[])}
+   * @param subTaskPercntgInWholeTask The percentage of the copy subtask
+   */
+  public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) {
+    this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask;
+  }
+
+  class SnapshotCopy extends ExportSnapshot {
+    private BackupInfo backupContext;
+    private TableName table;
+
+    public SnapshotCopy(BackupInfo backupContext, TableName table) {
+      super();
+      this.backupContext = backupContext;
+      this.table = table;
+    }
+
+    public TableName getTable() {
+      return this.table;
+    }
+  }
+
+  /**
+   * Update the ongoing backup with new progress.
+   * @param backupContext backup context
+   * 
+   * @param newProgress progress
+   * @param bytesCopied bytes copied
+   * @throws NoNodeException exception
+   */
+  static void updateProgress(BackupInfo backupContext, BackupManager 
backupManager,
+      int newProgress, long bytesCopied) throws IOException {
+    // compose the new backup progress data, using fake number for now
+    String backupProgressData = newProgress + "%";
+
+    backupContext.setProgress(newProgress);
+    backupManager.updateBackupInfo(backupContext);
+    LOG.debug("Backup progress data \"" + backupProgressData
+      + "\" has been updated to hbase:backup for " + 
backupContext.getBackupId());
+  }
+
+  // Extends DistCp for progress updating to hbase:backup
+  // during backup. Using DistCpV2 (MAPREDUCE-2765).
+  // Simply extend it and override execute() method to get the
+  // Job reference for progress updating.
+  // Only the argument "src1, [src2, [...]] dst" is supported,
+  // no more DistCp options.
+  class BackupDistCp extends DistCp {
+
+    private BackupInfo backupContext;
+    private BackupManager backupManager;
+
+    public BackupDistCp(Configuration conf, DistCpOptions options, BackupInfo 
backupContext,
+        BackupManager backupManager)
+        throws Exception {
+      super(conf, options);
+      this.backupContext = backupContext;
+      this.backupManager = backupManager;
+    }
+
+    @Override
+    public Job execute() throws Exception {
+
+      // reflection preparation for private methods and fields
+      Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class;
+      Method methodCreateMetaFolderPath = 
classDistCp.getDeclaredMethod("createMetaFolderPath");
+      Method methodCreateJob = classDistCp.getDeclaredMethod("createJob");
+      Method methodCreateInputFileListing =
+          classDistCp.getDeclaredMethod("createInputFileListing", Job.class);
+      Method methodCleanup = classDistCp.getDeclaredMethod("cleanup");
+
+      Field fieldInputOptions = classDistCp.getDeclaredField("inputOptions");
+      Field fieldMetaFolder = classDistCp.getDeclaredField("metaFolder");
+      Field fieldJobFS = classDistCp.getDeclaredField("jobFS");
+      Field fieldSubmitted = classDistCp.getDeclaredField("submitted");
+
+      methodCreateMetaFolderPath.setAccessible(true);
+      methodCreateJob.setAccessible(true);
+      methodCreateInputFileListing.setAccessible(true);
+      methodCleanup.setAccessible(true);
+
+      fieldInputOptions.setAccessible(true);
+      fieldMetaFolder.setAccessible(true);
+      fieldJobFS.setAccessible(true);
+      fieldSubmitted.setAccessible(true);
+
+      // execute() logic starts here
+      assert fieldInputOptions.get(this) != null;
+      assert getConf() != null;
+
+      Job job = null;
+      try {
+        synchronized (this) {
+          // Don't cleanup while we are setting up.
+          fieldMetaFolder.set(this, methodCreateMetaFolderPath.invoke(this));
+          fieldJobFS.set(this, ((Path) 
fieldMetaFolder.get(this)).getFileSystem(getConf()));
+          job = (Job) methodCreateJob.invoke(this);
+        }
+        methodCreateInputFileListing.invoke(this, job);
+
+        // Get the total length of the source files
+        List<Path> srcs = ((DistCpOptions) 
fieldInputOptions.get(this)).getSourcePaths();
+        
+        long totalSrcLgth = 0;
+        for (Path aSrc : srcs) {
+          totalSrcLgth += 
BackupServerUtil.getFilesLength(aSrc.getFileSystem(getConf()), aSrc);
+        }
+
+        // submit the copy job
+        job.submit();
+        fieldSubmitted.set(this, true);
+
+        // after submit the MR job, set its handler in backup handler for 
cancel process
+        // this.backupHandler.copyJob = job;
+
+        // Update the copy progress to ZK every 0.5s if progress value changed
+        int progressReportFreq =
+            this.getConf().getInt("hbase.backup.progressreport.frequency", 
500);
+        float lastProgress = progressDone;
+        while (!job.isComplete()) {
+          float newProgress =
+              progressDone + job.mapProgress() * subTaskPercntgInWholeTask * 
(1 - INIT_PROGRESS);
+
+          if (newProgress > lastProgress) {
+
+            BigDecimal progressData =
+                new BigDecimal(newProgress * 100).setScale(1, 
BigDecimal.ROUND_HALF_UP);
+            String newProgressStr = progressData + "%";
+            LOG.info("Progress: " + newProgressStr);
+            updateProgress(backupContext, backupManager, 
progressData.intValue(),
+              bytesCopied);
+            LOG.debug("Backup progress data updated to hbase:backup: 
\"Progress: " + newProgressStr
+              + ".\"");
+            lastProgress = newProgress;
+          }
+          Thread.sleep(progressReportFreq);
+        }
+        // update the progress data after copy job complete
+        float newProgress =
+            progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 
- INIT_PROGRESS);
+        BigDecimal progressData =
+            new BigDecimal(newProgress * 100).setScale(1, 
BigDecimal.ROUND_HALF_UP);
+
+        String newProgressStr = progressData + "%";
+        LOG.info("Progress: " + newProgressStr + " subTask: " + 
subTaskPercntgInWholeTask +
+            " mapProgress: " + job.mapProgress());
+
+        // accumulate the overall backup progress
+        progressDone = newProgress;
+        bytesCopied += totalSrcLgth;
+
+        updateProgress(backupContext, backupManager, progressData.intValue(),
+          bytesCopied);
+        LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " 
+ newProgressStr
+          + " - " + bytesCopied + " bytes copied.\"");
+      } catch (Throwable t) {
+        LOG.error("distcp " + job.getJobID() + " encountered error", t);
+        throw t;
+      } finally {
+        if (!fieldSubmitted.getBoolean(this)) {
+          methodCleanup.invoke(this);
+        }
+      }
+
+      String jobID = job.getJobID().toString();
+      job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, 
jobID);
+
+      LOG.debug("DistCp job-id: " + jobID + " completed: " + job.isComplete() 
+ " " +
+          job.isSuccessful());
+      Counters ctrs = job.getCounters();
+      LOG.debug(ctrs);
+      if (job.isComplete() && !job.isSuccessful()) {
+        throw new Exception("DistCp job-id: " + jobID + " failed");
+      }
+
+      return job;
+    }
+
+  }
+
+  
+  /**
+   * Do backup copy based on different types.
+   * @param context The backup context
+   * @param conf The hadoop configuration
+   * @param copyType The backup copy type
+   * @param options Options for customized ExportSnapshot or DistCp
+   * @throws Exception exception
+   */
+  @Override
+  public int copy(BackupInfo context, BackupManager backupManager, 
Configuration conf,
+      BackupType copyType, String[] options) throws IOException {
+    int res = 0;
+
+    try {
+      if (copyType == BackupType.FULL) {
+        SnapshotCopy snapshotCp =
+            new SnapshotCopy(context, context.getTableBySnapshot(options[1]));
+        LOG.debug("Doing SNAPSHOT_COPY");
+        // Make a new instance of conf to be used by the snapshot copy class.
+        snapshotCp.setConf(new Configuration(conf));
+        res = snapshotCp.run(options);
+
+      } else if (copyType == BackupType.INCREMENTAL) {
+        LOG.debug("Doing COPY_TYPE_DISTCP");
+        setSubTaskPercntgInWholeTask(1f);
+
+        BackupDistCp distcp = new BackupDistCp(new Configuration(conf), null, 
context,
+          backupManager);
+        // Handle a special case where the source file is a single file.
+        // In this case, distcp will not create the target dir. It just take 
the
+        // target as a file name and copy source file to the target (as a file 
name).
+        // We need to create the target dir before run distcp.
+        LOG.debug("DistCp options: " + Arrays.toString(options));
+        Path dest = new Path(options[options.length-1]);
+        FileSystem destfs = dest.getFileSystem(conf);
+        if (!destfs.exists(dest)) {
+          destfs.mkdirs(dest);
+        }
+        res = distcp.run(options);
+      }
+      return res;
+
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+   @Override
+   public void cancelCopyJob(String jobId) throws IOException {
+     JobID id = JobID.forName(jobId);     
+     Cluster cluster = new Cluster(getConf());
+     try {
+       Job job = cluster.getJob(id);
+       if (job == null) {
+         LOG.error("No job found for " + id);
+         // should we throw exception
+       }
+       if (job.isComplete() || job.isRetired()) {
+         return;
+       }
+ 
+       job.killJob();
+       LOG.debug("Killed job " + id);
+     } catch (InterruptedException e) {
+       throw new IOException(e);
+     }
+   }
+
+}
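
For reference, a small worked example (made-up numbers, not part of the patch) of the
progress arithmetic used in BackupDistCp.execute() above:

    public class ProgressFormulaExample {
      public static void main(String[] args) {
        float progressDone = 0.1f;               // accumulated progress so far (starts at INIT_PROGRESS)
        float initProgress = 0.1f;               // mirrors the INIT_PROGRESS constant in the class above
        float subTaskPercntgInWholeTask = 1.0f;  // default: one copy task covers the whole backup
        float mapProgress = 0.5f;                // pretend job.mapProgress() reports 50%

        float newProgress =
            progressDone + mapProgress * subTaskPercntgInWholeTask * (1 - initProgress);
        // 0.1 + 0.5 * 1.0 * 0.9 = 0.55, reported to hbase:backup as roughly "55.0%"
        System.out.println(newProgress);
      }
    }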

http://git-wip-us.apache.org/repos/asf/hbase/blob/914d162c/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java
deleted file mode 100644
index 18c1f86..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.backup.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.RestoreService;
-import org.apache.hadoop.hbase.backup.util.BackupServerUtil;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.mapreduce.WALPlayer;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.util.Tool;
-
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class MapReduceRestoreService implements RestoreService {
-  public static final Log LOG = 
LogFactory.getLog(MapReduceRestoreService.class);
-
-  private Tool player;
-  private Configuration conf;
-
-  public MapReduceRestoreService() {
-  }
-
-  @Override
-  public void run(Path[] dirPaths, TableName[] tableNames, TableName[] 
newTableNames,
-      boolean fullBackupRestore) throws IOException {
-
-    String bulkOutputConfKey;
-
-    if (fullBackupRestore) {
-      player = new HFileSplitter();
-      bulkOutputConfKey = HFileSplitter.BULK_OUTPUT_CONF_KEY;
-    } else {
-      player = new WALPlayer();
-      bulkOutputConfKey = WALPlayer.BULK_OUTPUT_CONF_KEY;
-    }
-    // Player reads all files in arbitrary directory structure and creates 
-    // a Map task for each file
-    String dirs = StringUtils.join(dirPaths, ",");
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Restore " + (fullBackupRestore ? "full" : "incremental")
-          + " backup from directory " + dirs + " from hbase tables "
-          + BackupServerUtil.join(tableNames) + " to tables "
-          + BackupServerUtil.join(newTableNames));
-    }
-
-    for (int i = 0; i < tableNames.length; i++) {
-
-      LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]);
-
-      Path bulkOutputPath = 
getBulkOutputDir(getFileNameCompatibleString(newTableNames[i]));
-      Configuration conf = getConf();
-      conf.set(bulkOutputConfKey, bulkOutputPath.toString());
-      String[] playerArgs = { dirs, tableNames[i].getNameAsString() };
-
-      int result = 0;
-      int loaderResult = 0;
-      try {
-
-        player.setConf(getConf());
-        result = player.run(playerArgs);
-        if (succeeded(result)) {
-          // do bulk load
-          LoadIncrementalHFiles loader = createLoader();
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Restoring HFiles from directory " + bulkOutputPath);
-          }
-          String[] args = { bulkOutputPath.toString(), 
newTableNames[i].getNameAsString() };
-          loaderResult = loader.run(args);
-
-          if (failed(loaderResult)) {
-            throw new IOException("Can not restore from backup directory " + 
dirs
-                + " (check Hadoop and HBase logs). Bulk loader return code =" 
+ loaderResult);
-          }
-        } else {
-          throw new IOException("Can not restore from backup directory " + dirs
-              + " (check Hadoop/MR and HBase logs). Player return code =" + 
result);
-        }
-        LOG.debug("Restore Job finished:" + result);
-      } catch (Exception e) {
-        throw new IOException("Can not restore from backup directory " + dirs
-            + " (check Hadoop and HBase logs) ", e);
-      }
-
-    }
-  }
-
-  private String getFileNameCompatibleString(TableName table) {
-    return table.getNamespaceAsString() + "-" + table.getQualifierAsString();
-  }
-
-  private boolean failed(int result) {
-    return result != 0;
-  }
-
-  private boolean succeeded(int result) {
-    return result == 0;
-  }
-
-  private LoadIncrementalHFiles createLoader() throws IOException {
-    // set configuration for restore:
-    // LoadIncrementalHFile needs more time
-    // <name>hbase.rpc.timeout</name> <value>600000</value>
-    // calculates
-    Integer milliSecInHour = 3600000;
-    Configuration conf = new Configuration(getConf());
-    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour);
-
-    // By default, it is 32 and loader will fail if # of files in any region 
exceed this
-    // limit. Bad for snapshot restore.
-    conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, 
Integer.MAX_VALUE);
-    conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
-    LoadIncrementalHFiles loader = null;
-    try {
-      loader = new LoadIncrementalHFiles(conf);
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-    return loader;
-  }
-
-  private Path getBulkOutputDir(String tableName) throws IOException {
-    Configuration conf = getConf();
-    FileSystem fs = FileSystem.get(conf);
-    String tmp =
-        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, 
HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
-    Path path =
-        new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-"
-            + EnvironmentEdgeManager.currentTime());
-    fs.deleteOnExit(path);
-    return path;
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/914d162c/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreTask.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreTask.java
new file mode 100644
index 0000000..8d9f5b4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreTask.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.RestoreTask;
+import org.apache.hadoop.hbase.backup.util.BackupServerUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.Tool;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MapReduceRestoreTask implements RestoreTask {
+  public static final Log LOG = LogFactory.getLog(MapReduceRestoreTask.class);
+
+  private Tool player;
+  private Configuration conf;
+
+  public MapReduceRestoreTask() {
+  }
+
+  @Override
+  public void run(Path[] dirPaths, TableName[] tableNames, TableName[] 
newTableNames,
+      boolean fullBackupRestore) throws IOException {
+
+    String bulkOutputConfKey;
+
+    if (fullBackupRestore) {
+      player = new HFileSplitter();
+      bulkOutputConfKey = HFileSplitter.BULK_OUTPUT_CONF_KEY;
+    } else {
+      player = new WALPlayer();
+      bulkOutputConfKey = WALPlayer.BULK_OUTPUT_CONF_KEY;
+    }
+    // Player reads all files in arbitrary directory structure and creates 
+    // a Map task for each file
+    String dirs = StringUtils.join(dirPaths, ",");
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Restore " + (fullBackupRestore ? "full" : "incremental")
+          + " backup from directory " + dirs + " from hbase tables "
+          + BackupServerUtil.join(tableNames) + " to tables "
+          + BackupServerUtil.join(newTableNames));
+    }
+
+    for (int i = 0; i < tableNames.length; i++) {
+
+      LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]);
+
+      Path bulkOutputPath = 
getBulkOutputDir(getFileNameCompatibleString(newTableNames[i]));
+      Configuration conf = getConf();
+      conf.set(bulkOutputConfKey, bulkOutputPath.toString());
+      String[] playerArgs = { dirs, tableNames[i].getNameAsString() };
+
+      int result = 0;
+      int loaderResult = 0;
+      try {
+
+        player.setConf(getConf());
+        result = player.run(playerArgs);
+        if (succeeded(result)) {
+          // do bulk load
+          LoadIncrementalHFiles loader = createLoader();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Restoring HFiles from directory " + bulkOutputPath);
+          }
+          String[] args = { bulkOutputPath.toString(), 
newTableNames[i].getNameAsString() };
+          loaderResult = loader.run(args);
+
+          if (failed(loaderResult)) {
+            throw new IOException("Can not restore from backup directory " + 
dirs
+                + " (check Hadoop and HBase logs). Bulk loader return code =" 
+ loaderResult);
+          }
+        } else {
+          throw new IOException("Can not restore from backup directory " + dirs
+              + " (check Hadoop/MR and HBase logs). Player return code =" + 
result);
+        }
+        LOG.debug("Restore Job finished:" + result);
+      } catch (Exception e) {
+        throw new IOException("Can not restore from backup directory " + dirs
+            + " (check Hadoop and HBase logs) ", e);
+      }
+
+    }
+  }
+
+  private String getFileNameCompatibleString(TableName table) {
+    return table.getNamespaceAsString() + "-" + table.getQualifierAsString();
+  }
+
+  private boolean failed(int result) {
+    return result != 0;
+  }
+
+  private boolean succeeded(int result) {
+    return result == 0;
+  }
+
+  private LoadIncrementalHFiles createLoader() throws IOException {
+    // set configuration for restore:
+    // LoadIncrementalHFile needs more time
+    // <name>hbase.rpc.timeout</name> <value>600000</value>
+    // calculates
+    Integer milliSecInHour = 3600000;
+    Configuration conf = new Configuration(getConf());
+    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour);
+
+    // By default, it is 32 and loader will fail if # of files in any region 
exceed this
+    // limit. Bad for snapshot restore.
+    conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, 
Integer.MAX_VALUE);
+    conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
+    LoadIncrementalHFiles loader = null;
+    try {
+      loader = new LoadIncrementalHFiles(conf);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    return loader;
+  }
+
+  private Path getBulkOutputDir(String tableName) throws IOException {
+    Configuration conf = getConf();
+    FileSystem fs = FileSystem.get(conf);
+    String tmp =
+        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, 
HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
+    Path path =
+        new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-"
+            + EnvironmentEdgeManager.currentTime());
+    fs.deleteOnExit(path);
+    return path;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+}
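
For reference, a hypothetical driver sketch (not part of the patch) invoking the renamed
RestoreTask the same way RestoreServerUtil does below; the WAL directory and table names
are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.backup.BackupRestoreServerFactory;
    import org.apache.hadoop.hbase.backup.RestoreTask;

    public class RestoreTaskExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        RestoreTask restore = BackupRestoreServerFactory.getRestoreService(conf);

        Path[] walDirs = { new Path("/backup/WALs/backup_12345") };  // placeholder directory
        TableName[] from = { TableName.valueOf("t1") };              // table in the backup image
        TableName[] to = { TableName.valueOf("t1_restored") };       // table to restore into

        // false selects the incremental path (WALPlayer); true selects HFileSplitter.
        restore.run(walDirs, from, to, false);
      }
    }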

http://git-wip-us.apache.org/repos/asf/hbase/blob/914d162c/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
index cc2ecdf..c317844 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
@@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupRestoreServerFactory;
 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
 import org.apache.hadoop.hbase.backup.RestoreRequest;
-import org.apache.hadoop.hbase.backup.RestoreService;
+import org.apache.hadoop.hbase.backup.RestoreTask;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -241,7 +241,7 @@ public class RestoreServerUtil {
         LOG.info("Changed " + newTableDescriptor.getTableName() + " to: " + 
newTableDescriptor);
       }
     }
-    RestoreService restoreService =
+    RestoreTask restoreService =
         BackupRestoreServerFactory.getRestoreService(conf);
 
     restoreService.run(logDirs, tableNames, newTableNames, false);
@@ -478,7 +478,7 @@ public class RestoreServerUtil {
         // Run restore service
         Path[] dirs = new Path[regionPathList.size()];
         regionPathList.toArray(dirs);
-        RestoreService restoreService =
+        RestoreTask restoreService =
             BackupRestoreServerFactory.getRestoreService(conf);
 
         restoreService.run(dirs, new TableName[] { tableName }, new TableName[] { newTableName },
