http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java deleted file mode 100644 index 49e8c75..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java +++ /dev/null @@ -1,181 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.backup.mapreduce; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.Type; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.RegionLocator; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.HFileInputFormat; -import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; -import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; -import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -/** - * A tool to split HFiles into new region boundaries as a MapReduce job. The tool generates HFiles - * for later bulk importing. - */ -@InterfaceAudience.Private -public class MapReduceHFileSplitterJob extends Configured implements Tool { - private static final Log LOG = LogFactory.getLog(MapReduceHFileSplitterJob.class); - final static String NAME = "HFileSplitterJob"; - public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output"; - public final static String TABLES_KEY = "hfile.input.tables"; - public final static String TABLE_MAP_KEY = "hfile.input.tablesmap"; - private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; - - public MapReduceHFileSplitterJob() { - } - - protected MapReduceHFileSplitterJob(final Configuration c) { - super(c); - } - - /** - * A mapper that just writes out cells. This one can be used together with - * {@link KeyValueSortReducer} - */ - static class HFileCellMapper extends - Mapper<NullWritable, KeyValue, ImmutableBytesWritable, KeyValue> { - - @Override - public void map(NullWritable key, KeyValue value, Context context) throws IOException, - InterruptedException { - // Convert value to KeyValue if subclass - if (!value.getClass().equals(KeyValue.class)) { - value = - new KeyValue(value.getRowArray(), value.getRowOffset(), value.getRowLength(), - value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(), - value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(), - value.getTimestamp(), Type.codeToType(value.getTypeByte()), value.getValueArray(), - value.getValueOffset(), value.getValueLength()); - } - context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value); - } - - @Override - public void setup(Context context) throws IOException { - // do nothing - } - } - - /** - * Sets up the actual job. - * @param args The command line parameters. - * @return The newly created job. - * @throws IOException When setting up the job fails. - */ - public Job createSubmittableJob(String[] args) throws IOException { - Configuration conf = getConf(); - String inputDirs = args[0]; - String tabName = args[1]; - conf.setStrings(TABLES_KEY, tabName); - conf.set(FileInputFormat.INPUT_DIR, inputDirs); - Job job = - Job.getInstance(conf, - conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime())); - job.setJarByClass(MapReduceHFileSplitterJob.class); - job.setInputFormatClass(HFileInputFormat.class); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); - if (hfileOutPath != null) { - LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs); - TableName tableName = TableName.valueOf(tabName); - job.setMapperClass(HFileCellMapper.class); - job.setReducerClass(KeyValueSortReducer.class); - Path outputDir = new Path(hfileOutPath); - FileOutputFormat.setOutputPath(job, outputDir); - job.setMapOutputValueClass(KeyValue.class); - try (Connection conn = ConnectionFactory.createConnection(conf); - Table table = conn.getTable(tableName); - RegionLocator regionLocator = conn.getRegionLocator(tableName)) { - HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); - } - LOG.debug("success configuring load incremental job"); - - TableMapReduceUtil.addDependencyJars(job.getConfiguration(), - org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class); - } else { - throw new IOException("No bulk output directory specified"); - } - return job; - } - - /** - * Print usage - * @param errorMsg Error message. Can be null. - */ - private void usage(final String errorMsg) { - if (errorMsg != null && errorMsg.length() > 0) { - System.err.println("ERROR: " + errorMsg); - } - System.err.println("Usage: " + NAME + " [options] <HFile inputdir(s)> <table>"); - System.err.println("Read all HFile's for <table> and split them to <table> region boundaries."); - System.err.println("<table> table to load.\n"); - System.err.println("To generate HFiles for a bulk data load, pass the option:"); - System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output"); - System.err.println("Other options:"); - System.err.println(" -D " + JOB_NAME_CONF_KEY - + "=jobName - use the specified mapreduce job name for the HFile splitter"); - System.err.println("For performance also consider the following options:\n" - + " -Dmapreduce.map.speculative=false\n" + " -Dmapreduce.reduce.speculative=false"); - } - - /** - * Main entry point. - * @param args The command line parameters. - * @throws Exception When running the job fails. - */ - public static void main(String[] args) throws Exception { - int ret = ToolRunner.run(new MapReduceHFileSplitterJob(HBaseConfiguration.create()), args); - System.exit(ret); - } - - @Override - public int run(String[] args) throws Exception { - if (args.length < 2) { - usage("Wrong number of arguments: " + args.length); - System.exit(-1); - } - Job job = createSubmittableJob(args); - int result = job.waitForCompletion(true) ? 0 : 1; - return result; - } -}
http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java deleted file mode 100644 index 1209e7c..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.backup.mapreduce; - -import static org.apache.hadoop.hbase.backup.util.BackupUtils.failed; -import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded; - -import java.io.IOException; - -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.BackupRestoreConstants; -import org.apache.hadoop.hbase.backup.RestoreJob; -import org.apache.hadoop.hbase.backup.util.BackupUtils; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; -import org.apache.hadoop.util.Tool; - - -/** - * MapReduce implementation of {@link RestoreJob} - * - * For backup restore, it runs {@link MapReduceHFileSplitterJob} job and creates - * HFiles which are aligned with a region boundaries of a table being - * restored. - * - * The resulting HFiles then are loaded using HBase bulk load tool - * {@link LoadIncrementalHFiles} - */ -@InterfaceAudience.Private -public class MapReduceRestoreJob implements RestoreJob { - public static final Log LOG = LogFactory.getLog(MapReduceRestoreJob.class); - - private Tool player; - private Configuration conf; - - public MapReduceRestoreJob() { - } - - @Override - public void run(Path[] dirPaths, TableName[] tableNames, TableName[] newTableNames, - boolean fullBackupRestore) throws IOException { - - String bulkOutputConfKey; - - player = new MapReduceHFileSplitterJob(); - bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY; - // Player reads all files in arbitrary directory structure and creates - // a Map task for each file - String dirs = StringUtils.join(dirPaths, ","); - - if (LOG.isDebugEnabled()) { - LOG.debug("Restore " + (fullBackupRestore ? "full" : "incremental") - + " backup from directory " + dirs + " from hbase tables " - + StringUtils.join(tableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND) - + " to tables " - + StringUtils.join(newTableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND)); - } - - for (int i = 0; i < tableNames.length; i++) { - - LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]); - - Path bulkOutputPath = - BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(newTableNames[i]), - getConf()); - Configuration conf = getConf(); - conf.set(bulkOutputConfKey, bulkOutputPath.toString()); - String[] playerArgs = - { - dirs, - fullBackupRestore ? newTableNames[i].getNameAsString() : tableNames[i] - .getNameAsString() }; - - int result = 0; - int loaderResult = 0; - try { - - player.setConf(getConf()); - result = player.run(playerArgs); - if (succeeded(result)) { - // do bulk load - LoadIncrementalHFiles loader = BackupUtils.createLoader(getConf()); - if (LOG.isDebugEnabled()) { - LOG.debug("Restoring HFiles from directory " + bulkOutputPath); - } - String[] args = { bulkOutputPath.toString(), newTableNames[i].getNameAsString() }; - loaderResult = loader.run(args); - - if (failed(loaderResult)) { - throw new IOException("Can not restore from backup directory " + dirs - + " (check Hadoop and HBase logs). Bulk loader return code =" + loaderResult); - } - } else { - throw new IOException("Can not restore from backup directory " + dirs - + " (check Hadoop/MR and HBase logs). Player return code =" + result); - } - LOG.debug("Restore Job finished:" + result); - } catch (Exception e) { - LOG.error(e); - throw new IOException("Can not restore from backup directory " + dirs - + " (check Hadoop and HBase logs) ", e); - } - } - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java deleted file mode 100644 index b5b887c..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java +++ /dev/null @@ -1,142 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.backup.master; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.backup.BackupRestoreConstants; -import org.apache.hadoop.hbase.backup.impl.BackupManager; -import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; - -/** - * Implementation of a log cleaner that checks if a log is still scheduled for incremental backup - * before deleting it when its TTL is over. - */ -@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) -public class BackupLogCleaner extends BaseLogCleanerDelegate { - private static final Log LOG = LogFactory.getLog(BackupLogCleaner.class); - - private boolean stopped = false; - private Connection conn; - - public BackupLogCleaner() { - } - - @Override - public void init(Map<String, Object> params) { - if (params != null && params.containsKey(HMaster.MASTER)) { - MasterServices master = (MasterServices) params.get(HMaster.MASTER); - conn = master.getConnection(); - if (getConf() == null) { - super.setConf(conn.getConfiguration()); - } - } - if (conn == null) { - try { - conn = ConnectionFactory.createConnection(getConf()); - } catch (IOException ioe) { - throw new RuntimeException("Failed to create connection", ioe); - } - } - } - - @Override - public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) { - // all members of this class are null if backup is disabled, - // so we cannot filter the files - if (this.getConf() == null || !BackupManager.isBackupEnabled(getConf())) { - LOG.warn("Backup is not enabled. Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY - + " setting"); - return files; - } - - List<FileStatus> list = new ArrayList<FileStatus>(); - try (final BackupSystemTable table = new BackupSystemTable(conn)) { - // If we do not have recorded backup sessions - try { - if (!table.hasBackupSessions()) { - LOG.trace("BackupLogCleaner has no backup sessions"); - return files; - } - } catch (TableNotFoundException tnfe) { - LOG.warn("backup system table is not available" + tnfe.getMessage()); - return files; - } - - for (FileStatus file : files) { - String wal = file.getPath().toString(); - boolean logInSystemTable = table.isWALFileDeletable(wal); - if (LOG.isDebugEnabled()) { - if (logInSystemTable) { - LOG.debug("Found log file in backup system table, deleting: " + wal); - list.add(file); - } else { - LOG.debug("Didn't find this log in backup system table, keeping: " + wal); - } - } - } - return list; - } catch (IOException e) { - LOG.error("Failed to get backup system table table, therefore will keep all files", e); - // nothing to delete - return new ArrayList<FileStatus>(); - } - } - - @Override - public void setConf(Configuration config) { - // If backup is disabled, keep all members null - if (!config.getBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, - BackupRestoreConstants.BACKUP_ENABLE_DEFAULT)) { - LOG.warn("Backup is disabled - allowing all wals to be deleted"); - return; - } - super.setConf(config); - } - - @Override - public void stop(String why) { - if (this.stopped) { - return; - } - this.stopped = true; - LOG.info("Stopping BackupLogCleaner"); - } - - @Override - public boolean isStopped() { - return this.stopped; - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java deleted file mode 100644 index 47e428c..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java +++ /dev/null @@ -1,155 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.backup.master; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ThreadPoolExecutor; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.backup.BackupRestoreConstants; -import org.apache.hadoop.hbase.backup.impl.BackupManager; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; -import org.apache.hadoop.hbase.errorhandling.ForeignException; -import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; -import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.MetricsMaster; -import org.apache.hadoop.hbase.procedure.MasterProcedureManager; -import org.apache.hadoop.hbase.procedure.Procedure; -import org.apache.hadoop.hbase.procedure.ProcedureCoordinator; -import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; -import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; -import org.apache.zookeeper.KeeperException; - -/** - * Master procedure manager for coordinated cluster-wide WAL roll operation, which is run during - * backup operation, see {@link MasterProcedureManager} and and {@link RegionServerProcedureManager} - */ -@InterfaceAudience.Private -public class LogRollMasterProcedureManager extends MasterProcedureManager { - - public static final String ROLLLOG_PROCEDURE_SIGNATURE = "rolllog-proc"; - public static final String ROLLLOG_PROCEDURE_NAME = "rolllog"; - private static final Log LOG = LogFactory.getLog(LogRollMasterProcedureManager.class); - - private MasterServices master; - private ProcedureCoordinator coordinator; - private boolean done; - - @Override - public void stop(String why) { - LOG.info("stop: " + why); - } - - @Override - public boolean isStopped() { - return false; - } - - @Override - public void initialize(MasterServices master, MetricsMaster metricsMaster) - throws KeeperException, IOException, UnsupportedOperationException { - this.master = master; - this.done = false; - - // setup the default procedure coordinator - String name = master.getServerName().toString(); - ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1); - BaseCoordinatedStateManager coordManager = - (BaseCoordinatedStateManager) CoordinatedStateManagerFactory - .getCoordinatedStateManager(master.getConfiguration()); - coordManager.initialize(master); - - ProcedureCoordinatorRpcs comms = - coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name); - - this.coordinator = new ProcedureCoordinator(comms, tpool); - } - - @Override - public String getProcedureSignature() { - return ROLLLOG_PROCEDURE_SIGNATURE; - } - - @Override - public void execProcedure(ProcedureDescription desc) throws IOException { - if (!isBackupEnabled()) { - LOG.warn("Backup is not enabled. Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY - + " setting"); - return; - } - this.done = false; - // start the process on the RS - ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance()); - List<ServerName> serverNames = master.getServerManager().getOnlineServersList(); - List<String> servers = new ArrayList<String>(); - for (ServerName sn : serverNames) { - servers.add(sn.toString()); - } - - List<NameStringPair> conf = desc.getConfigurationList(); - byte[] data = new byte[0]; - if (conf.size() > 0) { - // Get backup root path - data = conf.get(0).getValue().getBytes(); - } - Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), data, servers); - if (proc == null) { - String msg = "Failed to submit distributed procedure for '" + desc.getInstance() + "'"; - LOG.error(msg); - throw new IOException(msg); - } - - try { - // wait for the procedure to complete. A timer thread is kicked off that should cancel this - // if it takes too long. - proc.waitForCompleted(); - LOG.info("Done waiting - exec procedure for " + desc.getInstance()); - LOG.info("Distributed roll log procedure is successful!"); - this.done = true; - } catch (InterruptedException e) { - ForeignException ee = - new ForeignException("Interrupted while waiting for roll log procdure to finish", e); - monitor.receive(ee); - Thread.currentThread().interrupt(); - } catch (ForeignException e) { - ForeignException ee = - new ForeignException("Exception while waiting for roll log procdure to finish", e); - monitor.receive(ee); - } - monitor.rethrowException(); - } - - private boolean isBackupEnabled() { - return BackupManager.isBackupEnabled(master.getConfiguration()); - } - - @Override - public boolean isProcedureDone(ProcedureDescription desc) throws IOException { - return done; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java deleted file mode 100644 index 8fc644c..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java +++ /dev/null @@ -1,168 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.backup.regionserver; - -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.Callable; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; -import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.errorhandling.ForeignException; -import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; -import org.apache.hadoop.hbase.procedure.ProcedureMember; -import org.apache.hadoop.hbase.procedure.Subprocedure; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; -import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; -import org.apache.hadoop.hbase.regionserver.wal.FSHLog; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.wal.WAL; - -/** - * This backup sub-procedure implementation forces a WAL rolling on a RS. - */ -@InterfaceAudience.Private -public class LogRollBackupSubprocedure extends Subprocedure { - private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedure.class); - - private final RegionServerServices rss; - private final LogRollBackupSubprocedurePool taskManager; - private FSHLog hlog; - private String backupRoot; - - public LogRollBackupSubprocedure(RegionServerServices rss, ProcedureMember member, - ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout, - LogRollBackupSubprocedurePool taskManager, byte[] data) { - - super(member, LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, errorListener, - wakeFrequency, timeout); - LOG.info("Constructing a LogRollBackupSubprocedure."); - this.rss = rss; - this.taskManager = taskManager; - if (data != null) { - backupRoot = new String(data); - } - } - - /** - * Callable task. TODO. We don't need a thread pool to execute roll log. This can be simplified - * with no use of subprocedurepool. - */ - class RSRollLogTask implements Callable<Void> { - RSRollLogTask() { - } - - @Override - public Void call() throws Exception { - if (LOG.isDebugEnabled()) { - LOG.debug("++ DRPC started: " + rss.getServerName()); - } - hlog = (FSHLog) rss.getWAL(null); - long filenum = hlog.getFilenum(); - List<WAL> wals = rss.getWALs(); - long highest = -1; - for (WAL wal : wals) { - if (wal == null) continue; - if (((AbstractFSWAL) wal).getFilenum() > highest) { - highest = ((AbstractFSWAL) wal).getFilenum(); - } - } - - LOG.info("Trying to roll log in backup subprocedure, current log number: " + filenum - + " highest: " + highest + " on " + rss.getServerName()); - ((HRegionServer) rss).getWalRoller().requestRollAll(); - long start = EnvironmentEdgeManager.currentTime(); - while (!((HRegionServer) rss).getWalRoller().walRollFinished()) { - Thread.sleep(20); - } - LOG.debug("log roll took " + (EnvironmentEdgeManager.currentTime() - start)); - LOG.info("After roll log in backup subprocedure, current log number: " + hlog.getFilenum() - + " on " + rss.getServerName()); - - Connection connection = rss.getConnection(); - try (final BackupSystemTable table = new BackupSystemTable(connection)) { - // sanity check, good for testing - HashMap<String, Long> serverTimestampMap = - table.readRegionServerLastLogRollResult(backupRoot); - String host = rss.getServerName().getHostname(); - int port = rss.getServerName().getPort(); - String server = host + ":" + port; - Long sts = serverTimestampMap.get(host); - if (sts != null && sts > highest) { - LOG.warn("Won't update server's last roll log result: current=" + sts + " new=" + highest); - return null; - } - // write the log number to backup system table. - table.writeRegionServerLastLogRollResult(server, highest, backupRoot); - return null; - } catch (Exception e) { - LOG.error(e); - throw e; - } - } - } - - private void rolllog() throws ForeignException { - monitor.rethrowException(); - - taskManager.submitTask(new RSRollLogTask()); - monitor.rethrowException(); - - // wait for everything to complete. - taskManager.waitForOutstandingTasks(); - monitor.rethrowException(); - - } - - @Override - public void acquireBarrier() throws ForeignException { - // do nothing, executing in inside barrier step. - } - - /** - * do a log roll. - * @return some bytes - */ - @Override - public byte[] insideBarrier() throws ForeignException { - rolllog(); - return null; - } - - /** - * Cancel threads if they haven't finished. - */ - @Override - public void cleanup(Exception e) { - taskManager.abort("Aborting log roll subprocedure tasks for backup due to error", e); - } - - /** - * Hooray! - */ - public void releaseBarrier() { - // NO OP - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java deleted file mode 100644 index 65a1fa3..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java +++ /dev/null @@ -1,139 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.backup.regionserver; - -import java.io.Closeable; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.DaemonThreadFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.errorhandling.ForeignException; - -/** - * Handle running each of the individual tasks for completing a backup procedure on a region - * server. - */ -@InterfaceAudience.Private -public class LogRollBackupSubprocedurePool implements Closeable, Abortable { - private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedurePool.class); - - /** Maximum number of concurrent snapshot region tasks that can run concurrently */ - private static final String CONCURENT_BACKUP_TASKS_KEY = "hbase.backup.region.concurrentTasks"; - private static final int DEFAULT_CONCURRENT_BACKUP_TASKS = 3; - - private final ExecutorCompletionService<Void> taskPool; - private final ThreadPoolExecutor executor; - private volatile boolean aborted; - private final List<Future<Void>> futures = new ArrayList<Future<Void>>(); - private final String name; - - public LogRollBackupSubprocedurePool(String name, Configuration conf) { - // configure the executor service - long keepAlive = - conf.getLong(LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_KEY, - LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_DEFAULT); - int threads = conf.getInt(CONCURENT_BACKUP_TASKS_KEY, DEFAULT_CONCURRENT_BACKUP_TASKS); - this.name = name; - executor = - new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("rs(" + name - + ")-backup-pool")); - taskPool = new ExecutorCompletionService<Void>(executor); - } - - /** - * Submit a task to the pool. - */ - public void submitTask(final Callable<Void> task) { - Future<Void> f = this.taskPool.submit(task); - futures.add(f); - } - - /** - * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)} - * @return <tt>true</tt> on success, <tt>false</tt> otherwise - * @throws ForeignException exception - */ - public boolean waitForOutstandingTasks() throws ForeignException { - LOG.debug("Waiting for backup procedure to finish."); - - try { - for (Future<Void> f : futures) { - f.get(); - } - return true; - } catch (InterruptedException e) { - if (aborted) { - throw new ForeignException("Interrupted and found to be aborted while waiting for tasks!", - e); - } - Thread.currentThread().interrupt(); - } catch (ExecutionException e) { - if (e.getCause() instanceof ForeignException) { - throw (ForeignException) e.getCause(); - } - throw new ForeignException(name, e.getCause()); - } finally { - // close off remaining tasks - for (Future<Void> f : futures) { - if (!f.isDone()) { - f.cancel(true); - } - } - } - return false; - } - - /** - * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly - * finish - */ - @Override - public void close() { - executor.shutdown(); - } - - @Override - public void abort(String why, Throwable e) { - if (this.aborted) { - return; - } - - this.aborted = true; - LOG.warn("Aborting because: " + why, e); - this.executor.shutdownNow(); - } - - @Override - public boolean isAborted() { - return this.aborted; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java deleted file mode 100644 index 9d5a858..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.backup.regionserver; - -import java.io.IOException; -import java.util.concurrent.ThreadPoolExecutor; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; -import org.apache.hadoop.hbase.backup.BackupRestoreConstants; -import org.apache.hadoop.hbase.backup.impl.BackupManager; -import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; -import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; -import org.apache.hadoop.hbase.procedure.ProcedureMember; -import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; -import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager; -import org.apache.hadoop.hbase.procedure.Subprocedure; -import org.apache.hadoop.hbase.procedure.SubprocedureFactory; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; -import org.apache.zookeeper.KeeperException; - -/** - * This manager class handles the work dealing with distributed WAL roll request. - * <p> - * This provides the mechanism necessary to kick off a backup specific {@link Subprocedure} that is - * responsible by this region server. If any failures occur with the sub-procedure, the manager's - * procedure member notifies the procedure coordinator to abort all others. - * <p> - * On startup, requires {@link #start()} to be called. - * <p> - * On shutdown, requires org.apache.hadoop.hbase.procedure.ProcedureMember.close() to be called - */ -@InterfaceAudience.Private -public class LogRollRegionServerProcedureManager extends RegionServerProcedureManager { - - private static final Log LOG = LogFactory.getLog(LogRollRegionServerProcedureManager.class); - - /** Conf key for number of request threads to start backup on region servers */ - public static final String BACKUP_REQUEST_THREADS_KEY = "hbase.backup.region.pool.threads"; - /** # of threads for backup work on the rs. */ - public static final int BACKUP_REQUEST_THREADS_DEFAULT = 10; - - public static final String BACKUP_TIMEOUT_MILLIS_KEY = "hbase.backup.timeout"; - public static final long BACKUP_TIMEOUT_MILLIS_DEFAULT = 60000; - - /** Conf key for millis between checks to see if backup work completed or if there are errors */ - public static final String BACKUP_REQUEST_WAKE_MILLIS_KEY = "hbase.backup.region.wakefrequency"; - /** Default amount of time to check for errors while regions finish backup work */ - private static final long BACKUP_REQUEST_WAKE_MILLIS_DEFAULT = 500; - - private RegionServerServices rss; - private ProcedureMemberRpcs memberRpcs; - private ProcedureMember member; - private boolean started = false; - - /** - * Create a default backup procedure manager - */ - public LogRollRegionServerProcedureManager() { - } - - /** - * Start accepting backup procedure requests. - */ - @Override - public void start() { - if (!BackupManager.isBackupEnabled(rss.getConfiguration())) { - LOG.warn("Backup is not enabled. Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY - + " setting"); - return; - } - this.memberRpcs.start(rss.getServerName().toString(), member); - started = true; - LOG.info("Started region server backup manager."); - } - - /** - * Close <tt>this</tt> and all running backup procedure tasks - * @param force forcefully stop all running tasks - * @throws IOException exception - */ - @Override - public void stop(boolean force) throws IOException { - if (!started) { - return; - } - String mode = force ? "abruptly" : "gracefully"; - LOG.info("Stopping RegionServerBackupManager " + mode + "."); - - try { - this.member.close(); - } finally { - this.memberRpcs.close(); - } - } - - /** - * If in a running state, creates the specified subprocedure for handling a backup procedure. - * @return Subprocedure to submit to the ProcedureMemeber. - */ - public Subprocedure buildSubprocedure(byte[] data) { - - // don't run a backup if the parent is stop(ping) - if (rss.isStopping() || rss.isStopped()) { - throw new IllegalStateException("Can't start backup procedure on RS: " + rss.getServerName() - + ", because stopping/stopped!"); - } - - LOG.info("Attempting to run a roll log procedure for backup."); - ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher(); - Configuration conf = rss.getConfiguration(); - long timeoutMillis = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT); - long wakeMillis = - conf.getLong(BACKUP_REQUEST_WAKE_MILLIS_KEY, BACKUP_REQUEST_WAKE_MILLIS_DEFAULT); - - LogRollBackupSubprocedurePool taskManager = - new LogRollBackupSubprocedurePool(rss.getServerName().toString(), conf); - return new LogRollBackupSubprocedure(rss, member, errorDispatcher, wakeMillis, timeoutMillis, - taskManager, data); - - } - - /** - * Build the actual backup procedure runner that will do all the 'hard' work - */ - public class BackupSubprocedureBuilder implements SubprocedureFactory { - - @Override - public Subprocedure buildSubprocedure(String name, byte[] data) { - return LogRollRegionServerProcedureManager.this.buildSubprocedure(data); - } - } - - @Override - public void initialize(RegionServerServices rss) throws KeeperException { - this.rss = rss; - if (!BackupManager.isBackupEnabled(rss.getConfiguration())) { - LOG.warn("Backup is not enabled. Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY - + " setting"); - return; - } - BaseCoordinatedStateManager coordManager = - (BaseCoordinatedStateManager) CoordinatedStateManagerFactory. - getCoordinatedStateManager(rss.getConfiguration()); - coordManager.initialize(rss); - this.memberRpcs = - coordManager - .getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE); - - // read in the backup handler configuration properties - Configuration conf = rss.getConfiguration(); - long keepAlive = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT); - int opThreads = conf.getInt(BACKUP_REQUEST_THREADS_KEY, BACKUP_REQUEST_THREADS_DEFAULT); - // create the actual cohort member - ThreadPoolExecutor pool = - ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive); - this.member = new ProcedureMember(memberRpcs, pool, new BackupSubprocedureBuilder()); - } - - @Override - public String getProcedureSignature() { - return "backup-proc"; - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java deleted file mode 100644 index 0da6fc4..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.backup.util; - -import java.util.List; - -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -/** - * Backup set is a named group of HBase tables, which are managed together by Backup/Restore - * framework. Instead of using list of tables in backup or restore operation, one can use set's name - * instead. - */ -@InterfaceAudience.Private -public class BackupSet { - private final String name; - private final List<TableName> tables; - - public BackupSet(String name, List<TableName> tables) { - this.name = name; - this.tables = tables; - } - - public String getName() { - return name; - } - - public List<TableName> getTables() { - return tables; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(name).append("={"); - sb.append(StringUtils.join(tables, ',')); - sb.append("}"); - return sb.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java deleted file mode 100644 index ce77645..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java +++ /dev/null @@ -1,747 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.backup.util; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URLDecoder; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map.Entry; -import java.util.TreeMap; -import java.util.TreeSet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.BackupInfo; -import org.apache.hadoop.hbase.backup.BackupRestoreConstants; -import org.apache.hadoop.hbase.backup.HBackupFileSystem; -import org.apache.hadoop.hbase.backup.RestoreRequest; -import org.apache.hadoop.hbase.backup.impl.BackupManifest; -import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.FSTableDescriptors; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; - -/** - * A collection for methods used by multiple classes to backup HBase tables. - */ -@InterfaceAudience.Private -public final class BackupUtils { - protected static final Log LOG = LogFactory.getLog(BackupUtils.class); - public static final String LOGNAME_SEPARATOR = "."; - public static final int MILLISEC_IN_HOUR = 3600000; - - private BackupUtils() { - throw new AssertionError("Instantiating utility class..."); - } - - /** - * Loop through the RS log timestamp map for the tables, for each RS, find the min timestamp value - * for the RS among the tables. - * @param rsLogTimestampMap timestamp map - * @return the min timestamp of each RS - */ - public static HashMap<String, Long> getRSLogTimestampMins( - HashMap<TableName, HashMap<String, Long>> rsLogTimestampMap) { - - if (rsLogTimestampMap == null || rsLogTimestampMap.isEmpty()) { - return null; - } - - HashMap<String, Long> rsLogTimestampMins = new HashMap<String, Long>(); - HashMap<String, HashMap<TableName, Long>> rsLogTimestampMapByRS = - new HashMap<String, HashMap<TableName, Long>>(); - - for (Entry<TableName, HashMap<String, Long>> tableEntry : rsLogTimestampMap.entrySet()) { - TableName table = tableEntry.getKey(); - HashMap<String, Long> rsLogTimestamp = tableEntry.getValue(); - for (Entry<String, Long> rsEntry : rsLogTimestamp.entrySet()) { - String rs = rsEntry.getKey(); - Long ts = rsEntry.getValue(); - if (!rsLogTimestampMapByRS.containsKey(rs)) { - rsLogTimestampMapByRS.put(rs, new HashMap<TableName, Long>()); - rsLogTimestampMapByRS.get(rs).put(table, ts); - } else { - rsLogTimestampMapByRS.get(rs).put(table, ts); - } - } - } - - for (Entry<String, HashMap<TableName, Long>> entry : rsLogTimestampMapByRS.entrySet()) { - String rs = entry.getKey(); - rsLogTimestampMins.put(rs, BackupUtils.getMinValue(entry.getValue())); - } - - return rsLogTimestampMins; - } - - /** - * copy out Table RegionInfo into incremental backup image need to consider move this logic into - * HBackupFileSystem - * @param conn connection - * @param backupInfo backup info - * @param conf configuration - * @throws IOException exception - * @throws InterruptedException exception - */ - public static void - copyTableRegionInfo(Connection conn, BackupInfo backupInfo, Configuration conf) - throws IOException, InterruptedException { - Path rootDir = FSUtils.getRootDir(conf); - FileSystem fs = rootDir.getFileSystem(conf); - - // for each table in the table set, copy out the table info and region - // info files in the correct directory structure - for (TableName table : backupInfo.getTables()) { - - if (!MetaTableAccessor.tableExists(conn, table)) { - LOG.warn("Table " + table + " does not exists, skipping it."); - continue; - } - HTableDescriptor orig = FSTableDescriptors.getTableDescriptorFromFs(fs, rootDir, table); - - // write a copy of descriptor to the target directory - Path target = new Path(backupInfo.getTableBackupDir(table)); - FileSystem targetFs = target.getFileSystem(conf); - FSTableDescriptors descriptors = - new FSTableDescriptors(conf, targetFs, FSUtils.getRootDir(conf)); - descriptors.createTableDescriptorForTableDirectory(target, orig, false); - LOG.debug("Attempting to copy table info for:" + table + " target: " + target - + " descriptor: " + orig); - LOG.debug("Finished copying tableinfo."); - List<HRegionInfo> regions = null; - regions = MetaTableAccessor.getTableRegions(conn, table); - // For each region, write the region info to disk - LOG.debug("Starting to write region info for table " + table); - for (HRegionInfo regionInfo : regions) { - Path regionDir = - HRegion.getRegionDir(new Path(backupInfo.getTableBackupDir(table)), regionInfo); - regionDir = new Path(backupInfo.getTableBackupDir(table), regionDir.getName()); - writeRegioninfoOnFilesystem(conf, targetFs, regionDir, regionInfo); - } - LOG.debug("Finished writing region info for table " + table); - } - } - - /** - * Write the .regioninfo file on-disk. - */ - public static void writeRegioninfoOnFilesystem(final Configuration conf, final FileSystem fs, - final Path regionInfoDir, HRegionInfo regionInfo) throws IOException { - final byte[] content = regionInfo.toDelimitedByteArray(); - Path regionInfoFile = new Path(regionInfoDir, "." + HConstants.REGIONINFO_QUALIFIER_STR); - // First check to get the permissions - FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY); - // Write the RegionInfo file content - FSDataOutputStream out = FSUtils.create(conf, fs, regionInfoFile, perms, null); - try { - out.write(content); - } finally { - out.close(); - } - } - - /** - * Parses hostname:port from WAL file path - * @param p path to WAL file - * @return hostname:port - */ - public static String parseHostNameFromLogFile(Path p) { - try { - if (AbstractFSWALProvider.isArchivedLogFile(p)) { - return BackupUtils.parseHostFromOldLog(p); - } else { - ServerName sname = AbstractFSWALProvider.getServerNameFromWALDirectoryName(p); - if (sname != null) { - return sname.getAddress().toString(); - } else { - LOG.error("Skip log file (can't parse): " + p); - return null; - } - } - } catch (Exception e) { - LOG.error("Skip log file (can't parse): " + p, e); - return null; - } - } - - /** - * Returns WAL file name - * @param walFileName WAL file name - * @return WAL file name - * @throws IOException exception - * @throws IllegalArgumentException exception - */ - public static String getUniqueWALFileNamePart(String walFileName) throws IOException { - return getUniqueWALFileNamePart(new Path(walFileName)); - } - - /** - * Returns WAL file name - * @param p WAL file path - * @return WAL file name - * @throws IOException exception - */ - public static String getUniqueWALFileNamePart(Path p) throws IOException { - return p.getName(); - } - - /** - * Get the total length of files under the given directory recursively. - * @param fs The hadoop file system - * @param dir The target directory - * @return the total length of files - * @throws IOException exception - */ - public static long getFilesLength(FileSystem fs, Path dir) throws IOException { - long totalLength = 0; - FileStatus[] files = FSUtils.listStatus(fs, dir); - if (files != null) { - for (FileStatus fileStatus : files) { - if (fileStatus.isDirectory()) { - totalLength += getFilesLength(fs, fileStatus.getPath()); - } else { - totalLength += fileStatus.getLen(); - } - } - } - return totalLength; - } - - /** - * Get list of all old WAL files (WALs and archive) - * @param c configuration - * @param hostTimestampMap {host,timestamp} map - * @return list of WAL files - * @throws IOException exception - */ - public static List<String> getWALFilesOlderThan(final Configuration c, - final HashMap<String, Long> hostTimestampMap) throws IOException { - Path rootDir = FSUtils.getRootDir(c); - Path logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); - Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); - List<String> logFiles = new ArrayList<String>(); - - PathFilter filter = new PathFilter() { - - @Override - public boolean accept(Path p) { - try { - if (AbstractFSWALProvider.isMetaFile(p)) { - return false; - } - String host = parseHostNameFromLogFile(p); - if (host == null) { - return false; - } - Long oldTimestamp = hostTimestampMap.get(host); - Long currentLogTS = BackupUtils.getCreationTime(p); - return currentLogTS <= oldTimestamp; - } catch (Exception e) { - LOG.warn("Can not parse" + p, e); - return false; - } - } - }; - FileSystem fs = FileSystem.get(c); - logFiles = BackupUtils.getFiles(fs, logDir, logFiles, filter); - logFiles = BackupUtils.getFiles(fs, oldLogDir, logFiles, filter); - return logFiles; - } - - public static TableName[] parseTableNames(String tables) { - if (tables == null) { - return null; - } - String[] tableArray = tables.split(BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND); - - TableName[] ret = new TableName[tableArray.length]; - for (int i = 0; i < tableArray.length; i++) { - ret[i] = TableName.valueOf(tableArray[i]); - } - return ret; - } - - /** - * Check whether the backup path exist - * @param backupStr backup - * @param conf configuration - * @return Yes if path exists - * @throws IOException exception - */ - public static boolean checkPathExist(String backupStr, Configuration conf) throws IOException { - boolean isExist = false; - Path backupPath = new Path(backupStr); - FileSystem fileSys = backupPath.getFileSystem(conf); - String targetFsScheme = fileSys.getUri().getScheme(); - if (LOG.isTraceEnabled()) { - LOG.trace("Schema of given url: " + backupStr + " is: " + targetFsScheme); - } - if (fileSys.exists(backupPath)) { - isExist = true; - } - return isExist; - } - - /** - * Check target path first, confirm it doesn't exist before backup - * @param backupRootPath backup destination path - * @param conf configuration - * @throws IOException exception - */ - public static void checkTargetDir(String backupRootPath, Configuration conf) throws IOException { - boolean targetExists = false; - try { - targetExists = checkPathExist(backupRootPath, conf); - } catch (IOException e) { - String expMsg = e.getMessage(); - String newMsg = null; - if (expMsg.contains("No FileSystem for scheme")) { - newMsg = - "Unsupported filesystem scheme found in the backup target url. Error Message: " - + newMsg; - LOG.error(newMsg); - throw new IOException(newMsg); - } else { - throw e; - } - } - - if (targetExists) { - LOG.info("Using existing backup root dir: " + backupRootPath); - } else { - LOG.info("Backup root dir " + backupRootPath + " does not exist. Will be created."); - } - } - - /** - * Get the min value for all the Values a map. - * @param map map - * @return the min value - */ - public static <T> Long getMinValue(HashMap<T, Long> map) { - Long minTimestamp = null; - if (map != null) { - ArrayList<Long> timestampList = new ArrayList<Long>(map.values()); - Collections.sort(timestampList); - // The min among all the RS log timestamps will be kept in backup system table table. - minTimestamp = timestampList.get(0); - } - return minTimestamp; - } - - /** - * Parses host name:port from archived WAL path - * @param p path - * @return host name - * @throws IOException exception - */ - public static String parseHostFromOldLog(Path p) { - try { - String n = p.getName(); - int idx = n.lastIndexOf(LOGNAME_SEPARATOR); - String s = URLDecoder.decode(n.substring(0, idx), "UTF8"); - return ServerName.parseHostname(s) + ":" + ServerName.parsePort(s); - } catch (Exception e) { - LOG.warn("Skip log file (can't parse): " + p); - return null; - } - } - - /** - * Given the log file, parse the timestamp from the file name. The timestamp is the last number. - * @param p a path to the log file - * @return the timestamp - * @throws IOException exception - */ - public static Long getCreationTime(Path p) throws IOException { - int idx = p.getName().lastIndexOf(LOGNAME_SEPARATOR); - if (idx < 0) { - throw new IOException("Cannot parse timestamp from path " + p); - } - String ts = p.getName().substring(idx + 1); - return Long.parseLong(ts); - } - - public static List<String> getFiles(FileSystem fs, Path rootDir, List<String> files, - PathFilter filter) throws FileNotFoundException, IOException { - RemoteIterator<LocatedFileStatus> it = fs.listFiles(rootDir, true); - - while (it.hasNext()) { - LocatedFileStatus lfs = it.next(); - if (lfs.isDirectory()) { - continue; - } - // apply filter - if (filter.accept(lfs.getPath())) { - files.add(lfs.getPath().toString()); - } - } - return files; - } - - public static void cleanupBackupData(BackupInfo context, Configuration conf) throws IOException { - cleanupHLogDir(context, conf); - cleanupTargetDir(context, conf); - } - - /** - * Clean up directories which are generated when DistCp copying hlogs - * @param backupInfo backup info - * @param conf configuration - * @throws IOException exception - */ - private static void cleanupHLogDir(BackupInfo backupInfo, Configuration conf) throws IOException { - - String logDir = backupInfo.getHLogTargetDir(); - if (logDir == null) { - LOG.warn("No log directory specified for " + backupInfo.getBackupId()); - return; - } - - Path rootPath = new Path(logDir).getParent(); - FileSystem fs = FileSystem.get(rootPath.toUri(), conf); - FileStatus[] files = listStatus(fs, rootPath, null); - if (files == null) { - return; - } - for (FileStatus file : files) { - LOG.debug("Delete log files: " + file.getPath().getName()); - fs.delete(file.getPath(), true); - } - } - - private static void cleanupTargetDir(BackupInfo backupInfo, Configuration conf) { - try { - // clean up the data at target directory - LOG.debug("Trying to cleanup up target dir : " + backupInfo.getBackupId()); - String targetDir = backupInfo.getBackupRootDir(); - if (targetDir == null) { - LOG.warn("No target directory specified for " + backupInfo.getBackupId()); - return; - } - - FileSystem outputFs = FileSystem.get(new Path(backupInfo.getBackupRootDir()).toUri(), conf); - - for (TableName table : backupInfo.getTables()) { - Path targetDirPath = - new Path(getTableBackupDir(backupInfo.getBackupRootDir(), backupInfo.getBackupId(), - table)); - if (outputFs.delete(targetDirPath, true)) { - LOG.info("Cleaning up backup data at " + targetDirPath.toString() + " done."); - } else { - LOG.info("No data has been found in " + targetDirPath.toString() + "."); - } - - Path tableDir = targetDirPath.getParent(); - FileStatus[] backups = listStatus(outputFs, tableDir, null); - if (backups == null || backups.length == 0) { - outputFs.delete(tableDir, true); - LOG.debug(tableDir.toString() + " is empty, remove it."); - } - } - outputFs.delete(new Path(targetDir, backupInfo.getBackupId()), true); - } catch (IOException e1) { - LOG.error("Cleaning up backup data of " + backupInfo.getBackupId() + " at " - + backupInfo.getBackupRootDir() + " failed due to " + e1.getMessage() + "."); - } - } - - /** - * Given the backup root dir, backup id and the table name, return the backup image location, - * which is also where the backup manifest file is. return value look like: - * "hdfs://backup.hbase.org:9000/user/biadmin/backup1/backup_1396650096738/default/t1_dn/" - * @param backupRootDir backup root directory - * @param backupId backup id - * @param tableName table name - * @return backupPath String for the particular table - */ - public static String - getTableBackupDir(String backupRootDir, String backupId, TableName tableName) { - return backupRootDir + Path.SEPARATOR + backupId + Path.SEPARATOR - + tableName.getNamespaceAsString() + Path.SEPARATOR + tableName.getQualifierAsString() - + Path.SEPARATOR; - } - - /** - * Sort history list by start time in descending order. - * @param historyList history list - * @return sorted list of BackupCompleteData - */ - public static ArrayList<BackupInfo> sortHistoryListDesc(ArrayList<BackupInfo> historyList) { - ArrayList<BackupInfo> list = new ArrayList<BackupInfo>(); - TreeMap<String, BackupInfo> map = new TreeMap<String, BackupInfo>(); - for (BackupInfo h : historyList) { - map.put(Long.toString(h.getStartTs()), h); - } - Iterator<String> i = map.descendingKeySet().iterator(); - while (i.hasNext()) { - list.add(map.get(i.next())); - } - return list; - } - - /** - * Calls fs.listStatus() and treats FileNotFoundException as non-fatal This accommodates - * differences between hadoop versions, where hadoop 1 does not throw a FileNotFoundException, and - * return an empty FileStatus[] while Hadoop 2 will throw FileNotFoundException. - * @param fs file system - * @param dir directory - * @param filter path filter - * @return null if dir is empty or doesn't exist, otherwise FileStatus array - */ - public static FileStatus[] - listStatus(final FileSystem fs, final Path dir, final PathFilter filter) throws IOException { - FileStatus[] status = null; - try { - status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter); - } catch (FileNotFoundException fnfe) { - // if directory doesn't exist, return null - if (LOG.isTraceEnabled()) { - LOG.trace(dir + " doesn't exist"); - } - } - if (status == null || status.length < 1) return null; - return status; - } - - /** - * Return the 'path' component of a Path. In Hadoop, Path is an URI. This method returns the - * 'path' component of a Path's URI: e.g. If a Path is - * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>, this method returns - * <code>/hbase_trunk/TestTable/compaction.dir</code>. This method is useful if you want to print - * out a Path without qualifying Filesystem instance. - * @param p file system Path whose 'path' component we are to return. - * @return Path portion of the Filesystem - */ - public static String getPath(Path p) { - return p.toUri().getPath(); - } - - /** - * Given the backup root dir and the backup id, return the log file location for an incremental - * backup. - * @param backupRootDir backup root directory - * @param backupId backup id - * @return logBackupDir: ".../user/biadmin/backup1/WALs/backup_1396650096738" - */ - public static String getLogBackupDir(String backupRootDir, String backupId) { - return backupRootDir + Path.SEPARATOR + backupId + Path.SEPARATOR - + HConstants.HREGION_LOGDIR_NAME; - } - - private static List<BackupInfo> getHistory(Configuration conf, Path backupRootPath) - throws IOException { - // Get all (n) history from backup root destination - FileSystem fs = FileSystem.get(conf); - RemoteIterator<LocatedFileStatus> it = fs.listLocatedStatus(backupRootPath); - - List<BackupInfo> infos = new ArrayList<BackupInfo>(); - while (it.hasNext()) { - LocatedFileStatus lfs = it.next(); - if (!lfs.isDirectory()) continue; - String backupId = lfs.getPath().getName(); - try { - BackupInfo info = loadBackupInfo(backupRootPath, backupId, fs); - infos.add(info); - } catch (IOException e) { - LOG.error("Can not load backup info from: " + lfs.getPath(), e); - } - } - // Sort - Collections.sort(infos, new Comparator<BackupInfo>() { - - @Override - public int compare(BackupInfo o1, BackupInfo o2) { - long ts1 = getTimestamp(o1.getBackupId()); - long ts2 = getTimestamp(o2.getBackupId()); - if (ts1 == ts2) return 0; - return ts1 < ts2 ? 1 : -1; - } - - private long getTimestamp(String backupId) { - String[] split = backupId.split("_"); - return Long.parseLong(split[1]); - } - }); - return infos; - } - - public static List<BackupInfo> getHistory(Configuration conf, int n, Path backupRootPath, - BackupInfo.Filter... filters) throws IOException { - List<BackupInfo> infos = getHistory(conf, backupRootPath); - List<BackupInfo> ret = new ArrayList<BackupInfo>(); - for (BackupInfo info : infos) { - if (ret.size() == n) { - break; - } - boolean passed = true; - for (int i = 0; i < filters.length; i++) { - if (!filters[i].apply(info)) { - passed = false; - break; - } - } - if (passed) { - ret.add(info); - } - } - return ret; - } - - public static BackupInfo loadBackupInfo(Path backupRootPath, String backupId, FileSystem fs) - throws IOException { - Path backupPath = new Path(backupRootPath, backupId); - - RemoteIterator<LocatedFileStatus> it = fs.listFiles(backupPath, true); - while (it.hasNext()) { - LocatedFileStatus lfs = it.next(); - if (lfs.getPath().getName().equals(BackupManifest.MANIFEST_FILE_NAME)) { - // Load BackupManifest - BackupManifest manifest = new BackupManifest(fs, lfs.getPath().getParent()); - BackupInfo info = manifest.toBackupInfo(); - return info; - } - } - return null; - } - - /** - * Create restore request. - * @param backupRootDir backup root dir - * @param backupId backup id - * @param check check only - * @param fromTables table list from - * @param toTables table list to - * @param isOverwrite overwrite data - * @return request obkect - */ - public static RestoreRequest createRestoreRequest(String backupRootDir, String backupId, - boolean check, TableName[] fromTables, TableName[] toTables, boolean isOverwrite) { - RestoreRequest.Builder builder = new RestoreRequest.Builder(); - RestoreRequest request = - builder.withBackupRootDir(backupRootDir).withBackupId(backupId).withCheck(check) - .withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build(); - return request; - } - - public static boolean validate(HashMap<TableName, BackupManifest> backupManifestMap, - Configuration conf) throws IOException { - boolean isValid = true; - - for (Entry<TableName, BackupManifest> manifestEntry : backupManifestMap.entrySet()) { - TableName table = manifestEntry.getKey(); - TreeSet<BackupImage> imageSet = new TreeSet<BackupImage>(); - - ArrayList<BackupImage> depList = manifestEntry.getValue().getDependentListByTable(table); - if (depList != null && !depList.isEmpty()) { - imageSet.addAll(depList); - } - - LOG.info("Dependent image(s) from old to new:"); - for (BackupImage image : imageSet) { - String imageDir = - HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), table); - if (!BackupUtils.checkPathExist(imageDir, conf)) { - LOG.error("ERROR: backup image does not exist: " + imageDir); - isValid = false; - break; - } - LOG.info("Backup image: " + image.getBackupId() + " for '" + table + "' is available"); - } - } - return isValid; - } - - public static Path getBulkOutputDir(String tableName, Configuration conf, boolean deleteOnExit) - throws IOException { - FileSystem fs = FileSystem.get(conf); - String tmp = - conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); - Path path = - new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-" - + EnvironmentEdgeManager.currentTime()); - if (deleteOnExit) { - fs.deleteOnExit(path); - } - return path; - } - - public static Path getBulkOutputDir(String tableName, Configuration conf) throws IOException { - return getBulkOutputDir(tableName, conf, true); - } - - public static String getFileNameCompatibleString(TableName table) { - return table.getNamespaceAsString() + "-" + table.getQualifierAsString(); - } - - public static boolean failed(int result) { - return result != 0; - } - - public static boolean succeeded(int result) { - return result == 0; - } - - public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException { - // set configuration for restore: - // LoadIncrementalHFile needs more time - // <name>hbase.rpc.timeout</name> <value>600000</value> - // calculates - Configuration conf = new Configuration(config); - conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, MILLISEC_IN_HOUR); - - // By default, it is 32 and loader will fail if # of files in any region exceed this - // limit. Bad for snapshot restore. - conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE); - conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes"); - LoadIncrementalHFiles loader = null; - try { - loader = new LoadIncrementalHFiles(conf); - } catch (Exception e) { - throw new IOException(e); - } - return loader; - } -}