http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java ---------------------------------------------------------------------- diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java new file mode 100644 index 0000000..49e8c75 --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.mapreduce; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.HFileInputFormat; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; +import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * A tool to split HFiles into new region boundaries as a MapReduce job. The tool generates HFiles + * for later bulk importing. 
+ */ +@InterfaceAudience.Private +public class MapReduceHFileSplitterJob extends Configured implements Tool { + private static final Log LOG = LogFactory.getLog(MapReduceHFileSplitterJob.class); + final static String NAME = "HFileSplitterJob"; + public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output"; + public final static String TABLES_KEY = "hfile.input.tables"; + public final static String TABLE_MAP_KEY = "hfile.input.tablesmap"; + private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; + + public MapReduceHFileSplitterJob() { + } + + protected MapReduceHFileSplitterJob(final Configuration c) { + super(c); + } + + /** + * A mapper that just writes out cells. This one can be used together with + * {@link KeyValueSortReducer} + */ + static class HFileCellMapper extends + Mapper<NullWritable, KeyValue, ImmutableBytesWritable, KeyValue> { + + @Override + public void map(NullWritable key, KeyValue value, Context context) throws IOException, + InterruptedException { + // Convert value to KeyValue if subclass + if (!value.getClass().equals(KeyValue.class)) { + value = + new KeyValue(value.getRowArray(), value.getRowOffset(), value.getRowLength(), + value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(), + value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(), + value.getTimestamp(), Type.codeToType(value.getTypeByte()), value.getValueArray(), + value.getValueOffset(), value.getValueLength()); + } + context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value); + } + + @Override + public void setup(Context context) throws IOException { + // do nothing + } + } + + /** + * Sets up the actual job. + * @param args The command line parameters. + * @return The newly created job. + * @throws IOException When setting up the job fails. + */ + public Job createSubmittableJob(String[] args) throws IOException { + Configuration conf = getConf(); + String inputDirs = args[0]; + String tabName = args[1]; + conf.setStrings(TABLES_KEY, tabName); + conf.set(FileInputFormat.INPUT_DIR, inputDirs); + Job job = + Job.getInstance(conf, + conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime())); + job.setJarByClass(MapReduceHFileSplitterJob.class); + job.setInputFormatClass(HFileInputFormat.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); + if (hfileOutPath != null) { + LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs); + TableName tableName = TableName.valueOf(tabName); + job.setMapperClass(HFileCellMapper.class); + job.setReducerClass(KeyValueSortReducer.class); + Path outputDir = new Path(hfileOutPath); + FileOutputFormat.setOutputPath(job, outputDir); + job.setMapOutputValueClass(KeyValue.class); + try (Connection conn = ConnectionFactory.createConnection(conf); + Table table = conn.getTable(tableName); + RegionLocator regionLocator = conn.getRegionLocator(tableName)) { + HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); + } + LOG.debug("success configuring load incremental job"); + + TableMapReduceUtil.addDependencyJars(job.getConfiguration(), + org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class); + } else { + throw new IOException("No bulk output directory specified"); + } + return job; + } + + /** + * Print usage + * @param errorMsg Error message. Can be null. 
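+ * When non-null, it is printed to stderr with an "ERROR: " prefix before the usage text.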
+ */ + private void usage(final String errorMsg) { + if (errorMsg != null && errorMsg.length() > 0) { + System.err.println("ERROR: " + errorMsg); + } + System.err.println("Usage: " + NAME + " [options] <HFile inputdir(s)> <table>"); + System.err.println("Read all HFile's for <table> and split them to <table> region boundaries."); + System.err.println("<table> table to load.\n"); + System.err.println("To generate HFiles for a bulk data load, pass the option:"); + System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output"); + System.err.println("Other options:"); + System.err.println(" -D " + JOB_NAME_CONF_KEY + + "=jobName - use the specified mapreduce job name for the HFile splitter"); + System.err.println("For performance also consider the following options:\n" + + " -Dmapreduce.map.speculative=false\n" + " -Dmapreduce.reduce.speculative=false"); + } + + /** + * Main entry point. + * @param args The command line parameters. + * @throws Exception When running the job fails. + */ + public static void main(String[] args) throws Exception { + int ret = ToolRunner.run(new MapReduceHFileSplitterJob(HBaseConfiguration.create()), args); + System.exit(ret); + } + + @Override + public int run(String[] args) throws Exception { + if (args.length < 2) { + usage("Wrong number of arguments: " + args.length); + System.exit(-1); + } + Job job = createSubmittableJob(args); + int result = job.waitForCompletion(true) ? 0 : 1; + return result; + } +}
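For context: MapReduceRestoreJob (below) drives this tool programmatically, but it can also be invoked on its own. A minimal sketch of a programmatic run, assuming an HBase client Configuration; the output path, input directory, and table name here are hypothetical placeholders:

    Configuration conf = HBaseConfiguration.create();
    // createSubmittableJob() throws IOException unless hfile.bulk.output is set
    conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY, "hdfs:///tmp/splitter-out");
    // positional arguments: <HFile inputdir(s)> <table>
    int ret = ToolRunner.run(conf, new MapReduceHFileSplitterJob(),
        new String[] { "hdfs:///backup/data/default/t1", "t1" });

The equivalent command line, matching the usage() text, would be: hbase org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob -Dhfile.bulk.output=hdfs:///tmp/splitter-out hdfs:///backup/data/default/t1 t1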
http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java ---------------------------------------------------------------------- diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java new file mode 100644 index 0000000..1209e7c --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.mapreduce; + +import static org.apache.hadoop.hbase.backup.util.BackupUtils.failed; +import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded; + +import java.io.IOException; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.RestoreJob; +import org.apache.hadoop.hbase.backup.util.BackupUtils; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.util.Tool; + + +/** + * MapReduce implementation of {@link RestoreJob} + * + * For backup restore, it runs {@link MapReduceHFileSplitterJob} job and creates + * HFiles which are aligned with a region boundaries of a table being + * restored. + * + * The resulting HFiles then are loaded using HBase bulk load tool + * {@link LoadIncrementalHFiles} + */ +@InterfaceAudience.Private +public class MapReduceRestoreJob implements RestoreJob { + public static final Log LOG = LogFactory.getLog(MapReduceRestoreJob.class); + + private Tool player; + private Configuration conf; + + public MapReduceRestoreJob() { + } + + @Override + public void run(Path[] dirPaths, TableName[] tableNames, TableName[] newTableNames, + boolean fullBackupRestore) throws IOException { + + String bulkOutputConfKey; + + player = new MapReduceHFileSplitterJob(); + bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY; + // Player reads all files in arbitrary directory structure and creates + // a Map task for each file + String dirs = StringUtils.join(dirPaths, ","); + + if (LOG.isDebugEnabled()) { + LOG.debug("Restore " + (fullBackupRestore ? 
"full" : "incremental") + + " backup from directory " + dirs + " from hbase tables " + + StringUtils.join(tableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND) + + " to tables " + + StringUtils.join(newTableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND)); + } + + for (int i = 0; i < tableNames.length; i++) { + + LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]); + + Path bulkOutputPath = + BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(newTableNames[i]), + getConf()); + Configuration conf = getConf(); + conf.set(bulkOutputConfKey, bulkOutputPath.toString()); + String[] playerArgs = + { + dirs, + fullBackupRestore ? newTableNames[i].getNameAsString() : tableNames[i] + .getNameAsString() }; + + int result = 0; + int loaderResult = 0; + try { + + player.setConf(getConf()); + result = player.run(playerArgs); + if (succeeded(result)) { + // do bulk load + LoadIncrementalHFiles loader = BackupUtils.createLoader(getConf()); + if (LOG.isDebugEnabled()) { + LOG.debug("Restoring HFiles from directory " + bulkOutputPath); + } + String[] args = { bulkOutputPath.toString(), newTableNames[i].getNameAsString() }; + loaderResult = loader.run(args); + + if (failed(loaderResult)) { + throw new IOException("Can not restore from backup directory " + dirs + + " (check Hadoop and HBase logs). Bulk loader return code =" + loaderResult); + } + } else { + throw new IOException("Can not restore from backup directory " + dirs + + " (check Hadoop/MR and HBase logs). Player return code =" + result); + } + LOG.debug("Restore Job finished:" + result); + } catch (Exception e) { + LOG.error(e); + throw new IOException("Can not restore from backup directory " + dirs + + " (check Hadoop and HBase logs) ", e); + } + } + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java new file mode 100644 index 0000000..b5b887c --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java @@ -0,0 +1,142 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.backup.master; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.impl.BackupManager; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; + +/** + * Implementation of a log cleaner that checks if a log is still scheduled for incremental backup + * before deleting it when its TTL is over. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class BackupLogCleaner extends BaseLogCleanerDelegate { + private static final Log LOG = LogFactory.getLog(BackupLogCleaner.class); + + private boolean stopped = false; + private Connection conn; + + public BackupLogCleaner() { + } + + @Override + public void init(Map<String, Object> params) { + if (params != null && params.containsKey(HMaster.MASTER)) { + MasterServices master = (MasterServices) params.get(HMaster.MASTER); + conn = master.getConnection(); + if (getConf() == null) { + super.setConf(conn.getConfiguration()); + } + } + if (conn == null) { + try { + conn = ConnectionFactory.createConnection(getConf()); + } catch (IOException ioe) { + throw new RuntimeException("Failed to create connection", ioe); + } + } + } + + @Override + public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) { + // all members of this class are null if backup is disabled, + // so we cannot filter the files + if (this.getConf() == null || !BackupManager.isBackupEnabled(getConf())) { + LOG.warn("Backup is not enabled. 
Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY + + " setting"); + return files; + } + + List<FileStatus> list = new ArrayList<FileStatus>(); + try (final BackupSystemTable table = new BackupSystemTable(conn)) { + // If we do not have recorded backup sessions + try { + if (!table.hasBackupSessions()) { + LOG.trace("BackupLogCleaner has no backup sessions"); + return files; + } + } catch (TableNotFoundException tnfe) { + LOG.warn("backup system table is not available" + tnfe.getMessage()); + return files; + } + + for (FileStatus file : files) { + String wal = file.getPath().toString(); + boolean logInSystemTable = table.isWALFileDeletable(wal); + if (LOG.isDebugEnabled()) { + if (logInSystemTable) { + LOG.debug("Found log file in backup system table, deleting: " + wal); + list.add(file); + } else { + LOG.debug("Didn't find this log in backup system table, keeping: " + wal); + } + } + } + return list; + } catch (IOException e) { + LOG.error("Failed to get backup system table table, therefore will keep all files", e); + // nothing to delete + return new ArrayList<FileStatus>(); + } + } + + @Override + public void setConf(Configuration config) { + // If backup is disabled, keep all members null + if (!config.getBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, + BackupRestoreConstants.BACKUP_ENABLE_DEFAULT)) { + LOG.warn("Backup is disabled - allowing all wals to be deleted"); + return; + } + super.setConf(config); + } + + @Override + public void stop(String why) { + if (this.stopped) { + return; + } + this.stopped = true; + LOG.info("Stopping BackupLogCleaner"); + } + + @Override + public boolean isStopped() { + return this.stopped; + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java ---------------------------------------------------------------------- diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java new file mode 100644 index 0000000..47e428c --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.backup.master; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadPoolExecutor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.impl.BackupManager; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; +import org.apache.hadoop.hbase.errorhandling.ForeignException; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.MetricsMaster; +import org.apache.hadoop.hbase.procedure.MasterProcedureManager; +import org.apache.hadoop.hbase.procedure.Procedure; +import org.apache.hadoop.hbase.procedure.ProcedureCoordinator; +import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; +import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; +import org.apache.zookeeper.KeeperException; + +/** + * Master procedure manager for coordinated cluster-wide WAL roll operation, which is run during + * backup operation, see {@link MasterProcedureManager} and and {@link RegionServerProcedureManager} + */ +@InterfaceAudience.Private +public class LogRollMasterProcedureManager extends MasterProcedureManager { + + public static final String ROLLLOG_PROCEDURE_SIGNATURE = "rolllog-proc"; + public static final String ROLLLOG_PROCEDURE_NAME = "rolllog"; + private static final Log LOG = LogFactory.getLog(LogRollMasterProcedureManager.class); + + private MasterServices master; + private ProcedureCoordinator coordinator; + private boolean done; + + @Override + public void stop(String why) { + LOG.info("stop: " + why); + } + + @Override + public boolean isStopped() { + return false; + } + + @Override + public void initialize(MasterServices master, MetricsMaster metricsMaster) + throws KeeperException, IOException, UnsupportedOperationException { + this.master = master; + this.done = false; + + // setup the default procedure coordinator + String name = master.getServerName().toString(); + ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1); + BaseCoordinatedStateManager coordManager = + (BaseCoordinatedStateManager) CoordinatedStateManagerFactory + .getCoordinatedStateManager(master.getConfiguration()); + coordManager.initialize(master); + + ProcedureCoordinatorRpcs comms = + coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name); + + this.coordinator = new ProcedureCoordinator(comms, tpool); + } + + @Override + public String getProcedureSignature() { + return ROLLLOG_PROCEDURE_SIGNATURE; + } + + @Override + public void execProcedure(ProcedureDescription desc) throws IOException { + if (!isBackupEnabled()) { + LOG.warn("Backup is not enabled. 
Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY + + " setting"); + return; + } + this.done = false; + // start the process on the RS + ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance()); + List<ServerName> serverNames = master.getServerManager().getOnlineServersList(); + List<String> servers = new ArrayList<String>(); + for (ServerName sn : serverNames) { + servers.add(sn.toString()); + } + + List<NameStringPair> conf = desc.getConfigurationList(); + byte[] data = new byte[0]; + if (conf.size() > 0) { + // Get backup root path + data = conf.get(0).getValue().getBytes(); + } + Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), data, servers); + if (proc == null) { + String msg = "Failed to submit distributed procedure for '" + desc.getInstance() + "'"; + LOG.error(msg); + throw new IOException(msg); + } + + try { + // wait for the procedure to complete. A timer thread is kicked off that should cancel this + // if it takes too long. + proc.waitForCompleted(); + LOG.info("Done waiting - exec procedure for " + desc.getInstance()); + LOG.info("Distributed roll log procedure is successful!"); + this.done = true; + } catch (InterruptedException e) { + ForeignException ee = + new ForeignException("Interrupted while waiting for roll log procdure to finish", e); + monitor.receive(ee); + Thread.currentThread().interrupt(); + } catch (ForeignException e) { + ForeignException ee = + new ForeignException("Exception while waiting for roll log procdure to finish", e); + monitor.receive(ee); + } + monitor.rethrowException(); + } + + private boolean isBackupEnabled() { + return BackupManager.isBackupEnabled(master.getConfiguration()); + } + + @Override + public boolean isProcedureDone(ProcedureDescription desc) throws IOException { + return done; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java ---------------------------------------------------------------------- diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java new file mode 100644 index 0000000..8fc644c --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.backup.regionserver; + +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.Callable; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.errorhandling.ForeignException; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.procedure.ProcedureMember; +import org.apache.hadoop.hbase.procedure.Subprocedure; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; +import org.apache.hadoop.hbase.regionserver.wal.FSHLog; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.WAL; + +/** + * This backup sub-procedure implementation forces a WAL rolling on a RS. + */ +@InterfaceAudience.Private +public class LogRollBackupSubprocedure extends Subprocedure { + private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedure.class); + + private final RegionServerServices rss; + private final LogRollBackupSubprocedurePool taskManager; + private FSHLog hlog; + private String backupRoot; + + public LogRollBackupSubprocedure(RegionServerServices rss, ProcedureMember member, + ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout, + LogRollBackupSubprocedurePool taskManager, byte[] data) { + + super(member, LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, errorListener, + wakeFrequency, timeout); + LOG.info("Constructing a LogRollBackupSubprocedure."); + this.rss = rss; + this.taskManager = taskManager; + if (data != null) { + backupRoot = new String(data); + } + } + + /** + * Callable task. TODO. We don't need a thread pool to execute roll log. This can be simplified + * with no use of subprocedurepool. 
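+ * The task forces a roll of the WAL(s) on this region server, waits for the roll to complete, and + * then records the server's highest WAL number in the backup system table under the backup root.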
+ */ + class RSRollLogTask implements Callable<Void> { + RSRollLogTask() { + } + + @Override + public Void call() throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("++ DRPC started: " + rss.getServerName()); + } + hlog = (FSHLog) rss.getWAL(null); + long filenum = hlog.getFilenum(); + List<WAL> wals = rss.getWALs(); + long highest = -1; + for (WAL wal : wals) { + if (wal == null) continue; + if (((AbstractFSWAL) wal).getFilenum() > highest) { + highest = ((AbstractFSWAL) wal).getFilenum(); + } + } + + LOG.info("Trying to roll log in backup subprocedure, current log number: " + filenum + + " highest: " + highest + " on " + rss.getServerName()); + ((HRegionServer) rss).getWalRoller().requestRollAll(); + long start = EnvironmentEdgeManager.currentTime(); + while (!((HRegionServer) rss).getWalRoller().walRollFinished()) { + Thread.sleep(20); + } + LOG.debug("log roll took " + (EnvironmentEdgeManager.currentTime() - start)); + LOG.info("After roll log in backup subprocedure, current log number: " + hlog.getFilenum() + + " on " + rss.getServerName()); + + Connection connection = rss.getConnection(); + try (final BackupSystemTable table = new BackupSystemTable(connection)) { + // sanity check, good for testing + HashMap<String, Long> serverTimestampMap = + table.readRegionServerLastLogRollResult(backupRoot); + String host = rss.getServerName().getHostname(); + int port = rss.getServerName().getPort(); + String server = host + ":" + port; + Long sts = serverTimestampMap.get(host); + if (sts != null && sts > highest) { + LOG.warn("Won't update server's last roll log result: current=" + sts + " new=" + highest); + return null; + } + // write the log number to backup system table. + table.writeRegionServerLastLogRollResult(server, highest, backupRoot); + return null; + } catch (Exception e) { + LOG.error(e); + throw e; + } + } + } + + private void rolllog() throws ForeignException { + monitor.rethrowException(); + + taskManager.submitTask(new RSRollLogTask()); + monitor.rethrowException(); + + // wait for everything to complete. + taskManager.waitForOutstandingTasks(); + monitor.rethrowException(); + + } + + @Override + public void acquireBarrier() throws ForeignException { + // do nothing, executing in inside barrier step. + } + + /** + * do a log roll. + * @return some bytes + */ + @Override + public byte[] insideBarrier() throws ForeignException { + rolllog(); + return null; + } + + /** + * Cancel threads if they haven't finished. + */ + @Override + public void cleanup(Exception e) { + taskManager.abort("Aborting log roll subprocedure tasks for backup due to error", e); + } + + /** + * Hooray! + */ + public void releaseBarrier() { + // NO OP + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java ---------------------------------------------------------------------- diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java new file mode 100644 index 0000000..65a1fa3 --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.backup.regionserver; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.DaemonThreadFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.errorhandling.ForeignException; + +/** + * Handle running each of the individual tasks for completing a backup procedure on a region + * server. + */ +@InterfaceAudience.Private +public class LogRollBackupSubprocedurePool implements Closeable, Abortable { + private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedurePool.class); + + /** Maximum number of concurrent snapshot region tasks that can run concurrently */ + private static final String CONCURENT_BACKUP_TASKS_KEY = "hbase.backup.region.concurrentTasks"; + private static final int DEFAULT_CONCURRENT_BACKUP_TASKS = 3; + + private final ExecutorCompletionService<Void> taskPool; + private final ThreadPoolExecutor executor; + private volatile boolean aborted; + private final List<Future<Void>> futures = new ArrayList<Future<Void>>(); + private final String name; + + public LogRollBackupSubprocedurePool(String name, Configuration conf) { + // configure the executor service + long keepAlive = + conf.getLong(LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_KEY, + LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_DEFAULT); + int threads = conf.getInt(CONCURENT_BACKUP_TASKS_KEY, DEFAULT_CONCURRENT_BACKUP_TASKS); + this.name = name; + executor = + new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("rs(" + name + + ")-backup-pool")); + taskPool = new ExecutorCompletionService<Void>(executor); + } + + /** + * Submit a task to the pool. 
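+ * The returned future is tracked internally so that {@link #waitForOutstandingTasks()} can + * block until the task completes.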
+ */ + public void submitTask(final Callable<Void> task) { + Future<Void> f = this.taskPool.submit(task); + futures.add(f); + } + + /** + * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)} + * @return <tt>true</tt> on success, <tt>false</tt> otherwise + * @throws ForeignException exception + */ + public boolean waitForOutstandingTasks() throws ForeignException { + LOG.debug("Waiting for backup procedure to finish."); + + try { + for (Future<Void> f : futures) { + f.get(); + } + return true; + } catch (InterruptedException e) { + if (aborted) { + throw new ForeignException("Interrupted and found to be aborted while waiting for tasks!", + e); + } + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + if (e.getCause() instanceof ForeignException) { + throw (ForeignException) e.getCause(); + } + throw new ForeignException(name, e.getCause()); + } finally { + // close off remaining tasks + for (Future<Void> f : futures) { + if (!f.isDone()) { + f.cancel(true); + } + } + } + return false; + } + + /** + * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly + * finish + */ + @Override + public void close() { + executor.shutdown(); + } + + @Override + public void abort(String why, Throwable e) { + if (this.aborted) { + return; + } + + this.aborted = true; + LOG.warn("Aborting because: " + why, e); + this.executor.shutdownNow(); + } + + @Override + public boolean isAborted() { + return this.aborted; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java ---------------------------------------------------------------------- diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java new file mode 100644 index 0000000..9d5a858 --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.backup.regionserver; + +import java.io.IOException; +import java.util.concurrent.ThreadPoolExecutor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; +import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.impl.BackupManager; +import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.procedure.ProcedureMember; +import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; +import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager; +import org.apache.hadoop.hbase.procedure.Subprocedure; +import org.apache.hadoop.hbase.procedure.SubprocedureFactory; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.zookeeper.KeeperException; + +/** + * This manager class handles the work dealing with distributed WAL roll request. + * <p> + * This provides the mechanism necessary to kick off a backup specific {@link Subprocedure} that is + * responsible by this region server. If any failures occur with the sub-procedure, the manager's + * procedure member notifies the procedure coordinator to abort all others. + * <p> + * On startup, requires {@link #start()} to be called. + * <p> + * On shutdown, requires org.apache.hadoop.hbase.procedure.ProcedureMember.close() to be called + */ +@InterfaceAudience.Private +public class LogRollRegionServerProcedureManager extends RegionServerProcedureManager { + + private static final Log LOG = LogFactory.getLog(LogRollRegionServerProcedureManager.class); + + /** Conf key for number of request threads to start backup on region servers */ + public static final String BACKUP_REQUEST_THREADS_KEY = "hbase.backup.region.pool.threads"; + /** # of threads for backup work on the rs. */ + public static final int BACKUP_REQUEST_THREADS_DEFAULT = 10; + + public static final String BACKUP_TIMEOUT_MILLIS_KEY = "hbase.backup.timeout"; + public static final long BACKUP_TIMEOUT_MILLIS_DEFAULT = 60000; + + /** Conf key for millis between checks to see if backup work completed or if there are errors */ + public static final String BACKUP_REQUEST_WAKE_MILLIS_KEY = "hbase.backup.region.wakefrequency"; + /** Default amount of time to check for errors while regions finish backup work */ + private static final long BACKUP_REQUEST_WAKE_MILLIS_DEFAULT = 500; + + private RegionServerServices rss; + private ProcedureMemberRpcs memberRpcs; + private ProcedureMember member; + private boolean started = false; + + /** + * Create a default backup procedure manager + */ + public LogRollRegionServerProcedureManager() { + } + + /** + * Start accepting backup procedure requests. + */ + @Override + public void start() { + if (!BackupManager.isBackupEnabled(rss.getConfiguration())) { + LOG.warn("Backup is not enabled. 
Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY + + " setting"); + return; + } + this.memberRpcs.start(rss.getServerName().toString(), member); + started = true; + LOG.info("Started region server backup manager."); + } + + /** + * Close <tt>this</tt> and all running backup procedure tasks + * @param force forcefully stop all running tasks + * @throws IOException exception + */ + @Override + public void stop(boolean force) throws IOException { + if (!started) { + return; + } + String mode = force ? "abruptly" : "gracefully"; + LOG.info("Stopping RegionServerBackupManager " + mode + "."); + + try { + this.member.close(); + } finally { + this.memberRpcs.close(); + } + } + + /** + * If in a running state, creates the specified subprocedure for handling a backup procedure. + * @return Subprocedure to submit to the ProcedureMemeber. + */ + public Subprocedure buildSubprocedure(byte[] data) { + + // don't run a backup if the parent is stop(ping) + if (rss.isStopping() || rss.isStopped()) { + throw new IllegalStateException("Can't start backup procedure on RS: " + rss.getServerName() + + ", because stopping/stopped!"); + } + + LOG.info("Attempting to run a roll log procedure for backup."); + ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher(); + Configuration conf = rss.getConfiguration(); + long timeoutMillis = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT); + long wakeMillis = + conf.getLong(BACKUP_REQUEST_WAKE_MILLIS_KEY, BACKUP_REQUEST_WAKE_MILLIS_DEFAULT); + + LogRollBackupSubprocedurePool taskManager = + new LogRollBackupSubprocedurePool(rss.getServerName().toString(), conf); + return new LogRollBackupSubprocedure(rss, member, errorDispatcher, wakeMillis, timeoutMillis, + taskManager, data); + + } + + /** + * Build the actual backup procedure runner that will do all the 'hard' work + */ + public class BackupSubprocedureBuilder implements SubprocedureFactory { + + @Override + public Subprocedure buildSubprocedure(String name, byte[] data) { + return LogRollRegionServerProcedureManager.this.buildSubprocedure(data); + } + } + + @Override + public void initialize(RegionServerServices rss) throws KeeperException { + this.rss = rss; + if (!BackupManager.isBackupEnabled(rss.getConfiguration())) { + LOG.warn("Backup is not enabled. Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY + + " setting"); + return; + } + BaseCoordinatedStateManager coordManager = + (BaseCoordinatedStateManager) CoordinatedStateManagerFactory. 
+ getCoordinatedStateManager(rss.getConfiguration()); + coordManager.initialize(rss); + this.memberRpcs = + coordManager + .getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE); + + // read in the backup handler configuration properties + Configuration conf = rss.getConfiguration(); + long keepAlive = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT); + int opThreads = conf.getInt(BACKUP_REQUEST_THREADS_KEY, BACKUP_REQUEST_THREADS_DEFAULT); + // create the actual cohort member + ThreadPoolExecutor pool = + ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive); + this.member = new ProcedureMember(memberRpcs, pool, new BackupSubprocedureBuilder()); + } + + @Override + public String getProcedureSignature() { + return "backup-proc"; + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java ---------------------------------------------------------------------- diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java new file mode 100644 index 0000000..0da6fc4 --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.util; + +import java.util.List; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Backup set is a named group of HBase tables, which are managed together by Backup/Restore + * framework. Instead of using list of tables in backup or restore operation, one can use set's name + * instead. 
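+ * <p> + * For example, a set named "green" over tables t1 and t2 (illustrative names) renders via + * {@link #toString()} as green={t1,t2}.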
+ */ +@InterfaceAudience.Private +public class BackupSet { + private final String name; + private final List<TableName> tables; + + public BackupSet(String name, List<TableName> tables) { + this.name = name; + this.tables = tables; + } + + public String getName() { + return name; + } + + public List<TableName> getTables() { + return tables; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(name).append("={"); + sb.append(StringUtils.join(tables, ',')); + sb.append("}"); + return sb.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java ---------------------------------------------------------------------- diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java new file mode 100644 index 0000000..ce77645 --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java @@ -0,0 +1,747 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.backup.util; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupInfo; +import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.HBackupFileSystem; +import org.apache.hadoop.hbase.backup.RestoreRequest; +import org.apache.hadoop.hbase.backup.impl.BackupManifest; +import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSTableDescriptors; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; + +/** + * A collection for methods used by multiple classes to backup HBase tables. + */ +@InterfaceAudience.Private +public final class BackupUtils { + protected static final Log LOG = LogFactory.getLog(BackupUtils.class); + public static final String LOGNAME_SEPARATOR = "."; + public static final int MILLISEC_IN_HOUR = 3600000; + + private BackupUtils() { + throw new AssertionError("Instantiating utility class..."); + } + + /** + * Loop through the RS log timestamp map for the tables, for each RS, find the min timestamp value + * for the RS among the tables. 
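+ * For example, given {t1 -> {rs1: 100, rs2: 50}, t2 -> {rs1: 80}} (illustrative names), the + * result is {rs1: 80, rs2: 50}.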
+ * @param rsLogTimestampMap timestamp map + * @return the min timestamp of each RS + */ + public static HashMap<String, Long> getRSLogTimestampMins( + HashMap<TableName, HashMap<String, Long>> rsLogTimestampMap) { + + if (rsLogTimestampMap == null || rsLogTimestampMap.isEmpty()) { + return null; + } + + HashMap<String, Long> rsLogTimestampMins = new HashMap<String, Long>(); + HashMap<String, HashMap<TableName, Long>> rsLogTimestampMapByRS = + new HashMap<String, HashMap<TableName, Long>>(); + + for (Entry<TableName, HashMap<String, Long>> tableEntry : rsLogTimestampMap.entrySet()) { + TableName table = tableEntry.getKey(); + HashMap<String, Long> rsLogTimestamp = tableEntry.getValue(); + for (Entry<String, Long> rsEntry : rsLogTimestamp.entrySet()) { + String rs = rsEntry.getKey(); + Long ts = rsEntry.getValue(); + if (!rsLogTimestampMapByRS.containsKey(rs)) { + rsLogTimestampMapByRS.put(rs, new HashMap<TableName, Long>()); + } + rsLogTimestampMapByRS.get(rs).put(table, ts); + } + } + + for (Entry<String, HashMap<TableName, Long>> entry : rsLogTimestampMapByRS.entrySet()) { + String rs = entry.getKey(); + rsLogTimestampMins.put(rs, BackupUtils.getMinValue(entry.getValue())); + } + + return rsLogTimestampMins; + } + + /** + * Copy out a table's region info into the incremental backup image. TODO: consider moving this + * logic into HBackupFileSystem. + * @param conn connection + * @param backupInfo backup info + * @param conf configuration + * @throws IOException exception + * @throws InterruptedException exception + */ + public static void + copyTableRegionInfo(Connection conn, BackupInfo backupInfo, Configuration conf) + throws IOException, InterruptedException { + Path rootDir = FSUtils.getRootDir(conf); + FileSystem fs = rootDir.getFileSystem(conf); + + // for each table in the table set, copy out the table info and region + // info files in the correct directory structure + for (TableName table : backupInfo.getTables()) { + + if (!MetaTableAccessor.tableExists(conn, table)) { + LOG.warn("Table " + table + " does not exist, skipping it."); + continue; + } + HTableDescriptor orig = FSTableDescriptors.getTableDescriptorFromFs(fs, rootDir, table); + + // write a copy of descriptor to the target directory + Path target = new Path(backupInfo.getTableBackupDir(table)); + FileSystem targetFs = target.getFileSystem(conf); + FSTableDescriptors descriptors = + new FSTableDescriptors(conf, targetFs, FSUtils.getRootDir(conf)); + descriptors.createTableDescriptorForTableDirectory(target, orig, false); + LOG.debug("Attempting to copy table info for: " + table + " target: " + target + + " descriptor: " + orig); + LOG.debug("Finished copying tableinfo."); + List<HRegionInfo> regions = MetaTableAccessor.getTableRegions(conn, table); + // For each region, write the region info to disk + LOG.debug("Starting to write region info for table " + table); + for (HRegionInfo regionInfo : regions) { + Path regionDir = + HRegion.getRegionDir(new Path(backupInfo.getTableBackupDir(table)), regionInfo); + regionDir = new Path(backupInfo.getTableBackupDir(table), regionDir.getName()); + writeRegioninfoOnFilesystem(conf, targetFs, regionDir, regionInfo); + } + LOG.debug("Finished writing region info for table " + table); + } + } + + /** + * Write the .regioninfo file on-disk.
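+ * The content is the delimited serialization of the passed HRegionInfo; the file is created as + * "regionInfoDir/.regioninfo" with permissions resolved from {@link HConstants#DATA_FILE_UMASK_KEY}.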
+ */ + public static void writeRegioninfoOnFilesystem(final Configuration conf, final FileSystem fs, + final Path regionInfoDir, HRegionInfo regionInfo) throws IOException { + final byte[] content = regionInfo.toDelimitedByteArray(); + Path regionInfoFile = new Path(regionInfoDir, "." + HConstants.REGIONINFO_QUALIFIER_STR); + // First check to get the permissions + FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY); + // Write the RegionInfo file content + FSDataOutputStream out = FSUtils.create(conf, fs, regionInfoFile, perms, null); + try { + out.write(content); + } finally { + out.close(); + } + } + + /** + * Parses hostname:port from WAL file path + * @param p path to WAL file + * @return hostname:port + */ + public static String parseHostNameFromLogFile(Path p) { + try { + if (AbstractFSWALProvider.isArchivedLogFile(p)) { + return BackupUtils.parseHostFromOldLog(p); + } else { + ServerName sname = AbstractFSWALProvider.getServerNameFromWALDirectoryName(p); + if (sname != null) { + return sname.getAddress().toString(); + } else { + LOG.error("Skip log file (can't parse): " + p); + return null; + } + } + } catch (Exception e) { + LOG.error("Skip log file (can't parse): " + p, e); + return null; + } + } + + /** + * Returns WAL file name + * @param walFileName WAL file name + * @return WAL file name + * @throws IOException exception + * @throws IllegalArgumentException exception + */ + public static String getUniqueWALFileNamePart(String walFileName) throws IOException { + return getUniqueWALFileNamePart(new Path(walFileName)); + } + + /** + * Returns WAL file name + * @param p WAL file path + * @return WAL file name + * @throws IOException exception + */ + public static String getUniqueWALFileNamePart(Path p) throws IOException { + return p.getName(); + } + + /** + * Get the total length of files under the given directory recursively. 
+ * @param fs The hadoop file system + * @param dir The target directory + * @return the total length of files + * @throws IOException exception + */ + public static long getFilesLength(FileSystem fs, Path dir) throws IOException { + long totalLength = 0; + FileStatus[] files = FSUtils.listStatus(fs, dir); + if (files != null) { + for (FileStatus fileStatus : files) { + if (fileStatus.isDirectory()) { + totalLength += getFilesLength(fs, fileStatus.getPath()); + } else { + totalLength += fileStatus.getLen(); + } + } + } + return totalLength; + } + + /** + * Get list of all old WAL files (WALs and archive) + * @param c configuration + * @param hostTimestampMap {host,timestamp} map + * @return list of WAL files + * @throws IOException exception + */ + public static List<String> getWALFilesOlderThan(final Configuration c, + final HashMap<String, Long> hostTimestampMap) throws IOException { + Path rootDir = FSUtils.getRootDir(c); + Path logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); + Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); + List<String> logFiles = new ArrayList<String>(); + + PathFilter filter = new PathFilter() { + + @Override + public boolean accept(Path p) { + try { + if (AbstractFSWALProvider.isMetaFile(p)) { + return false; + } + String host = parseHostNameFromLogFile(p); + if (host == null) { + return false; + } + Long oldTimestamp = hostTimestampMap.get(host); + Long currentLogTS = BackupUtils.getCreationTime(p); + return currentLogTS <= oldTimestamp; + } catch (Exception e) { + LOG.warn("Can not parse" + p, e); + return false; + } + } + }; + FileSystem fs = FileSystem.get(c); + logFiles = BackupUtils.getFiles(fs, logDir, logFiles, filter); + logFiles = BackupUtils.getFiles(fs, oldLogDir, logFiles, filter); + return logFiles; + } + + public static TableName[] parseTableNames(String tables) { + if (tables == null) { + return null; + } + String[] tableArray = tables.split(BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND); + + TableName[] ret = new TableName[tableArray.length]; + for (int i = 0; i < tableArray.length; i++) { + ret[i] = TableName.valueOf(tableArray[i]); + } + return ret; + } + + /** + * Check whether the backup path exist + * @param backupStr backup + * @param conf configuration + * @return Yes if path exists + * @throws IOException exception + */ + public static boolean checkPathExist(String backupStr, Configuration conf) throws IOException { + boolean isExist = false; + Path backupPath = new Path(backupStr); + FileSystem fileSys = backupPath.getFileSystem(conf); + String targetFsScheme = fileSys.getUri().getScheme(); + if (LOG.isTraceEnabled()) { + LOG.trace("Schema of given url: " + backupStr + " is: " + targetFsScheme); + } + if (fileSys.exists(backupPath)) { + isExist = true; + } + return isExist; + } + + /** + * Check target path first, confirm it doesn't exist before backup + * @param backupRootPath backup destination path + * @param conf configuration + * @throws IOException exception + */ + public static void checkTargetDir(String backupRootPath, Configuration conf) throws IOException { + boolean targetExists = false; + try { + targetExists = checkPathExist(backupRootPath, conf); + } catch (IOException e) { + String expMsg = e.getMessage(); + String newMsg = null; + if (expMsg.contains("No FileSystem for scheme")) { + newMsg = + "Unsupported filesystem scheme found in the backup target url. 
+
+  /**
+   * Check the backup target path: verify that its filesystem is reachable and report whether the
+   * backup root directory already exists.
+   * @param backupRootPath backup destination path
+   * @param conf configuration
+   * @throws IOException exception
+   */
+  public static void checkTargetDir(String backupRootPath, Configuration conf) throws IOException {
+    boolean targetExists = false;
+    try {
+      targetExists = checkPathExist(backupRootPath, conf);
+    } catch (IOException e) {
+      String expMsg = e.getMessage();
+      if (expMsg.contains("No FileSystem for scheme")) {
+        String newMsg =
+            "Unsupported filesystem scheme found in the backup target url. Error Message: "
+                + expMsg;
+        LOG.error(newMsg);
+        throw new IOException(newMsg);
+      } else {
+        throw e;
+      }
+    }
+
+    if (targetExists) {
+      LOG.info("Using existing backup root dir: " + backupRootPath);
+    } else {
+      LOG.info("Backup root dir " + backupRootPath + " does not exist. Will be created.");
+    }
+  }
+
+  /**
+   * Get the min value of all the values in a map.
+   * @param map map
+   * @return the min value, or null if the map is null or empty
+   */
+  public static <T> Long getMinValue(HashMap<T, Long> map) {
+    Long minTimestamp = null;
+    if (map != null && !map.isEmpty()) {
+      ArrayList<Long> timestampList = new ArrayList<Long>(map.values());
+      Collections.sort(timestampList);
+      // The min among all the RS log timestamps will be kept in the backup system table.
+      minTimestamp = timestampList.get(0);
+    }
+    return minTimestamp;
+  }
+
+  /**
+   * Parses hostname:port from an archived WAL path
+   * @param p path
+   * @return hostname:port, or null if the name cannot be parsed
+   */
+  public static String parseHostFromOldLog(Path p) {
+    try {
+      String n = p.getName();
+      int idx = n.lastIndexOf(LOGNAME_SEPARATOR);
+      String s = URLDecoder.decode(n.substring(0, idx), "UTF8");
+      return ServerName.parseHostname(s) + ":" + ServerName.parsePort(s);
+    } catch (Exception e) {
+      LOG.warn("Skip log file (can't parse): " + p);
+      return null;
+    }
+  }
+
+  /**
+   * Given the log file, parse the timestamp from the file name. The timestamp is the last number.
+   * @param p a path to the log file
+   * @return the timestamp
+   * @throws IOException exception
+   */
+  public static Long getCreationTime(Path p) throws IOException {
+    int idx = p.getName().lastIndexOf(LOGNAME_SEPARATOR);
+    if (idx < 0) {
+      throw new IOException("Cannot parse timestamp from path " + p);
+    }
+    String ts = p.getName().substring(idx + 1);
+    return Long.parseLong(ts);
+  }
+
+  public static List<String> getFiles(FileSystem fs, Path rootDir, List<String> files,
+      PathFilter filter) throws FileNotFoundException, IOException {
+    RemoteIterator<LocatedFileStatus> it = fs.listFiles(rootDir, true);
+
+    while (it.hasNext()) {
+      LocatedFileStatus lfs = it.next();
+      if (lfs.isDirectory()) {
+        continue;
+      }
+      // apply filter
+      if (filter.accept(lfs.getPath())) {
+        files.add(lfs.getPath().toString());
+      }
+    }
+    return files;
+  }
+
+  public static void cleanupBackupData(BackupInfo context, Configuration conf) throws IOException {
+    cleanupHLogDir(context, conf);
+    cleanupTargetDir(context, conf);
+  }
+
+  /**
+   * Clean up directories generated by DistCp when copying WALs
+   * @param backupInfo backup info
+   * @param conf configuration
+   * @throws IOException exception
+   */
+  private static void cleanupHLogDir(BackupInfo backupInfo, Configuration conf) throws IOException {
+    String logDir = backupInfo.getHLogTargetDir();
+    if (logDir == null) {
+      LOG.warn("No log directory specified for " + backupInfo.getBackupId());
+      return;
+    }
+
+    Path rootPath = new Path(logDir).getParent();
+    FileSystem fs = FileSystem.get(rootPath.toUri(), conf);
+    FileStatus[] files = listStatus(fs, rootPath, null);
+    if (files == null) {
+      return;
+    }
+    for (FileStatus file : files) {
+      LOG.debug("Delete log file: " + file.getPath().getName());
+      fs.delete(file.getPath(), true);
+    }
+  }
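To make the parsing contract of parseHostFromOldLog and getCreationTime concrete, a hedged sketch against an invented archived WAL name (the name encodes URL-escaped host, port, and start code, followed by the creation timestamp):

    // Archived WAL name layout (hypothetical values):
    //   <host>%2C<port>%2C<startcode>.<timestamp>
    Path oldLog = new Path("hdfs://nn:8020/hbase/oldWALs/"
        + "rs1.example.com%2C16020%2C1396650096738.1396650097000");
    String hostPort = BackupUtils.parseHostFromOldLog(oldLog); // "rs1.example.com:16020"
    Long creationTs = BackupUtils.getCreationTime(oldLog);     // 1396650097000L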
+
+  private static void cleanupTargetDir(BackupInfo backupInfo, Configuration conf) {
+    try {
+      // clean up the data at the target directory
+      LOG.debug("Trying to clean up target dir: " + backupInfo.getBackupId());
+      String targetDir = backupInfo.getBackupRootDir();
+      if (targetDir == null) {
+        LOG.warn("No target directory specified for " + backupInfo.getBackupId());
+        return;
+      }
+
+      FileSystem outputFs = FileSystem.get(new Path(backupInfo.getBackupRootDir()).toUri(), conf);
+
+      for (TableName table : backupInfo.getTables()) {
+        Path targetDirPath =
+            new Path(getTableBackupDir(backupInfo.getBackupRootDir(), backupInfo.getBackupId(),
+                table));
+        if (outputFs.delete(targetDirPath, true)) {
+          LOG.info("Cleaning up backup data at " + targetDirPath.toString() + " done.");
+        } else {
+          LOG.info("No data has been found in " + targetDirPath.toString() + ".");
+        }
+
+        Path tableDir = targetDirPath.getParent();
+        FileStatus[] backups = listStatus(outputFs, tableDir, null);
+        if (backups == null || backups.length == 0) {
+          outputFs.delete(tableDir, true);
+          LOG.debug(tableDir.toString() + " is empty, remove it.");
+        }
+      }
+      outputFs.delete(new Path(targetDir, backupInfo.getBackupId()), true);
+    } catch (IOException e1) {
+      LOG.error("Cleaning up backup data of " + backupInfo.getBackupId() + " at "
+          + backupInfo.getBackupRootDir() + " failed due to " + e1.getMessage() + ".");
+    }
+  }
+
+  /**
+   * Given the backup root dir, backup id and the table name, return the backup image location,
+   * which is also where the backup manifest file is. The return value looks like:
+   * "hdfs://backup.hbase.org:9000/user/biadmin/backup1/backup_1396650096738/default/t1_dn/"
+   * @param backupRootDir backup root directory
+   * @param backupId backup id
+   * @param tableName table name
+   * @return backupPath String for the particular table
+   */
+  public static String
+      getTableBackupDir(String backupRootDir, String backupId, TableName tableName) {
+    return backupRootDir + Path.SEPARATOR + backupId + Path.SEPARATOR
+        + tableName.getNamespaceAsString() + Path.SEPARATOR + tableName.getQualifierAsString()
+        + Path.SEPARATOR;
+  }
+
+  /**
+   * Sort history list by start time in descending order.
+   * @param historyList history list
+   * @return sorted list of BackupCompleteData
+   */
+  public static ArrayList<BackupInfo> sortHistoryListDesc(ArrayList<BackupInfo> historyList) {
+    ArrayList<BackupInfo> list = new ArrayList<BackupInfo>();
+    // Sort numerically by start timestamp; a String-keyed map would compare
+    // timestamps lexicographically.
+    TreeMap<Long, BackupInfo> map = new TreeMap<Long, BackupInfo>();
+    for (BackupInfo h : historyList) {
+      map.put(h.getStartTs(), h);
+    }
+    Iterator<Long> i = map.descendingKeySet().iterator();
+    while (i.hasNext()) {
+      list.add(map.get(i.next()));
+    }
+    return list;
+  }
+
+  /**
+   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
+   * differences between Hadoop versions: Hadoop 1 does not throw a FileNotFoundException and
+   * returns an empty FileStatus[], while Hadoop 2 will throw a FileNotFoundException.
+   * @param fs file system
+   * @param dir directory
+   * @param filter path filter
+   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
+   */
+  public static FileStatus[]
+      listStatus(final FileSystem fs, final Path dir, final PathFilter filter) throws IOException {
+    FileStatus[] status = null;
+    try {
+      status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
+    } catch (FileNotFoundException fnfe) {
+      // if the directory doesn't exist, return null
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(dir + " doesn't exist");
+      }
+    }
+    if (status == null || status.length < 1) return null;
+    return status;
+  }
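For illustration, the directory convention above yields paths like the following (root dir and backup id invented):

    String dir = BackupUtils.getTableBackupDir(
        "hdfs://nn:8020/user/biadmin/backup1", "backup_1396650096738",
        TableName.valueOf("default", "t1_dn"));
    // -> "hdfs://nn:8020/user/biadmin/backup1/backup_1396650096738/default/t1_dn/"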
+
+  /**
+   * Return the 'path' component of a Path. In Hadoop, Path is a URI. This method returns the
+   * 'path' component of a Path's URI: e.g. if a Path is
+   * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>, this method returns
+   * <code>/hbase_trunk/TestTable/compaction.dir</code>. This method is useful if you want to print
+   * out a Path without the qualifying Filesystem instance.
+   * @param p file system Path whose 'path' component we are to return.
+   * @return the 'path' component of the Path's URI
+   */
+  public static String getPath(Path p) {
+    return p.toUri().getPath();
+  }
+
+  /**
+   * Given the backup root dir and the backup id, return the log file location for an incremental
+   * backup.
+   * @param backupRootDir backup root directory
+   * @param backupId backup id
+   * @return logBackupDir: ".../user/biadmin/backup1/backup_1396650096738/WALs"
+   */
+  public static String getLogBackupDir(String backupRootDir, String backupId) {
+    return backupRootDir + Path.SEPARATOR + backupId + Path.SEPARATOR
+        + HConstants.HREGION_LOGDIR_NAME;
+  }
+
+  private static List<BackupInfo> getHistory(Configuration conf, Path backupRootPath)
+      throws IOException {
+    // Get all history from the backup root destination
+    FileSystem fs = FileSystem.get(conf);
+    RemoteIterator<LocatedFileStatus> it = fs.listLocatedStatus(backupRootPath);
+
+    List<BackupInfo> infos = new ArrayList<BackupInfo>();
+    while (it.hasNext()) {
+      LocatedFileStatus lfs = it.next();
+      if (!lfs.isDirectory()) continue;
+      String backupId = lfs.getPath().getName();
+      try {
+        BackupInfo info = loadBackupInfo(backupRootPath, backupId, fs);
+        // Skip directories that contain no manifest
+        if (info != null) {
+          infos.add(info);
+        }
+      } catch (IOException e) {
+        LOG.error("Cannot load backup info from: " + lfs.getPath(), e);
+      }
+    }
+    // Sort by backup id timestamp, newest first
+    Collections.sort(infos, new Comparator<BackupInfo>() {
+
+      @Override
+      public int compare(BackupInfo o1, BackupInfo o2) {
+        long ts1 = getTimestamp(o1.getBackupId());
+        long ts2 = getTimestamp(o2.getBackupId());
+        if (ts1 == ts2) return 0;
+        return ts1 < ts2 ? 1 : -1;
+      }
+
+      private long getTimestamp(String backupId) {
+        String[] split = backupId.split("_");
+        return Long.parseLong(split[1]);
+      }
+    });
+    return infos;
+  }
+
+  public static List<BackupInfo> getHistory(Configuration conf, int n, Path backupRootPath,
+      BackupInfo.Filter... filters) throws IOException {
+    List<BackupInfo> infos = getHistory(conf, backupRootPath);
+    List<BackupInfo> ret = new ArrayList<BackupInfo>();
+    for (BackupInfo info : infos) {
+      if (ret.size() == n) {
+        break;
+      }
+      boolean passed = true;
+      for (int i = 0; i < filters.length; i++) {
+        if (!filters[i].apply(info)) {
+          passed = false;
+          break;
+        }
+      }
+      if (passed) {
+        ret.add(info);
+      }
+    }
+    return ret;
+  }
+
+  public static BackupInfo loadBackupInfo(Path backupRootPath, String backupId, FileSystem fs)
+      throws IOException {
+    Path backupPath = new Path(backupRootPath, backupId);
+
+    RemoteIterator<LocatedFileStatus> it = fs.listFiles(backupPath, true);
+    while (it.hasNext()) {
+      LocatedFileStatus lfs = it.next();
+      if (lfs.getPath().getName().equals(BackupManifest.MANIFEST_FILE_NAME)) {
+        // Load BackupManifest
+        BackupManifest manifest = new BackupManifest(fs, lfs.getPath().getParent());
+        BackupInfo info = manifest.toBackupInfo();
+        return info;
+      }
+    }
+    return null;
+  }
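As a hedged sketch of the filtered history API (the root path is invented), fetching the three most recent COMPLETE backups could look like this:

    Path root = new Path("hdfs://nn:8020/user/biadmin/backup1");
    List<BackupInfo> lastThree = BackupUtils.getHistory(conf, 3, root,
        new BackupInfo.Filter() {
          @Override
          public boolean apply(BackupInfo info) {
            return info.getState() == BackupInfo.BackupState.COMPLETE;
          }
        });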
+
+  /**
+   * Create a restore request.
+   * @param backupRootDir backup root dir
+   * @param backupId backup id
+   * @param check check only, do not execute
+   * @param fromTables list of tables to restore from
+   * @param toTables list of tables to restore to
+   * @param isOverwrite overwrite existing data
+   * @return request object
+   */
+  public static RestoreRequest createRestoreRequest(String backupRootDir, String backupId,
+      boolean check, TableName[] fromTables, TableName[] toTables, boolean isOverwrite) {
+    RestoreRequest.Builder builder = new RestoreRequest.Builder();
+    RestoreRequest request =
+        builder.withBackupRootDir(backupRootDir).withBackupId(backupId).withCheck(check)
+            .withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build();
+    return request;
+  }
+
+  public static boolean validate(HashMap<TableName, BackupManifest> backupManifestMap,
+      Configuration conf) throws IOException {
+    boolean isValid = true;
+
+    for (Entry<TableName, BackupManifest> manifestEntry : backupManifestMap.entrySet()) {
+      TableName table = manifestEntry.getKey();
+      TreeSet<BackupImage> imageSet = new TreeSet<BackupImage>();
+
+      ArrayList<BackupImage> depList = manifestEntry.getValue().getDependentListByTable(table);
+      if (depList != null && !depList.isEmpty()) {
+        imageSet.addAll(depList);
+      }
+
+      LOG.info("Dependent image(s) from old to new:");
+      for (BackupImage image : imageSet) {
+        String imageDir =
+            HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), table);
+        if (!BackupUtils.checkPathExist(imageDir, conf)) {
+          LOG.error("ERROR: backup image does not exist: " + imageDir);
+          isValid = false;
+          break;
+        }
+        LOG.info("Backup image: " + image.getBackupId() + " for '" + table + "' is available");
+      }
+    }
+    return isValid;
+  }
+
+  public static Path getBulkOutputDir(String tableName, Configuration conf, boolean deleteOnExit)
+      throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    String tmp =
+        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
+    Path path =
+        new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-"
+            + EnvironmentEdgeManager.currentTime());
+    if (deleteOnExit) {
+      fs.deleteOnExit(path);
+    }
+    return path;
+  }
+
+  public static Path getBulkOutputDir(String tableName, Configuration conf) throws IOException {
+    return getBulkOutputDir(tableName, conf, true);
+  }
+
+  public static String getFileNameCompatibleString(TableName table) {
+    return table.getNamespaceAsString() + "-" + table.getQualifierAsString();
+  }
+
+  public static boolean failed(int result) {
+    return result != 0;
+  }
+
+  public static boolean succeeded(int result) {
+    return result == 0;
+  }
+
+  public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException {
+    // Set configuration for restore: LoadIncrementalHFiles needs more time than the
+    // default hbase.rpc.timeout on large restores, so raise it to one hour.
+    Configuration conf = new Configuration(config);
+    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, MILLISEC_IN_HOUR);
+
+    // By default the limit is 32, and the loader will fail if the number of hfiles in any
+    // region exceeds it. That is bad for snapshot restore, so lift the limit.
+    conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
+    conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
+    LoadIncrementalHFiles loader = null;
+    try {
+      loader = new LoadIncrementalHFiles(conf);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    return loader;
+  }
+}
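Finally, a hedged end-to-end sketch of building a restore request with the helpers above; the filesystem URI, backup id, and table name are invented for illustration:

    // Hypothetical values; restores table t1_dn in place, overwriting existing data.
    TableName[] tables = BackupUtils.parseTableNames("t1_dn");
    RestoreRequest request = BackupUtils.createRestoreRequest(
        "hdfs://nn:8020/user/biadmin/backup1", "backup_1396650096738",
        false /* check only */, tables, tables /* restore in place */, true /* overwrite */);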