Repository: hbase
Updated Branches:
  refs/heads/0.98 38deb4b82 -> 333ea48ae


HBASE-11326 Use an InputFormat for ExportSnapshot


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/333ea48a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/333ea48a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/333ea48a

Branch: refs/heads/0.98
Commit: 333ea48ae2fef96a9d55ba5f920e4db2274614f7
Parents: 488afeb
Author: Matteo Bertozzi <matteo.berto...@cloudera.com>
Authored: Thu Jun 12 09:06:00 2014 +0100
Committer: Matteo Bertozzi <matteo.berto...@cloudera.com>
Committed: Fri Aug 22 10:17:20 2014 +0100

----------------------------------------------------------------------
 .../hadoop/hbase/snapshot/ExportSnapshot.java   | 326 ++++++++++++-------
 .../hbase/snapshot/TestExportSnapshot.java      |  21 +-
 2 files changed, 215 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/333ea48a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
index 0639218..4a25a3c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.snapshot;
 
 import java.io.BufferedInputStream;
 import java.io.FileNotFoundException;
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
@@ -63,8 +65,14 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.security.TokenCache;
@@ -84,6 +92,10 @@ import org.apache.hadoop.util.ToolRunner;
 public class ExportSnapshot extends Configured implements Tool {
   private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);
 
+  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
+  private static final String CONF_NUM_SPLITS = 
"snapshot.export.format.splits";
+  private static final String CONF_SNAPSHOT_NAME = 
"snapshot.export.format.snapshot.name";
+  private static final String CONF_SNAPSHOT_DIR = 
"snapshot.export.format.snapshot.dir";
   private static final String CONF_FILES_USER = 
"snapshot.export.files.attributes.user";
   private static final String CONF_FILES_GROUP = 
"snapshot.export.files.attributes.group";
   private static final String CONF_FILES_MODE = 
"snapshot.export.files.attributes.mode";
@@ -462,19 +474,23 @@ public class ExportSnapshot extends Configured implements Tool {
     }
   }
 
+  // ==========================================================================
+  //  Input Format
+  // ==========================================================================
+
   /**
    * Extract the list of files (HFiles/HLogs) to copy using Map-Reduce.
    * @return list of files referenced by the snapshot (pair of path and size)
    */
-  private List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final FileSystem 
fs,
-      final Path snapshotDir) throws IOException {
+  private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final 
Configuration conf,
+      final FileSystem fs, final Path snapshotDir) throws IOException {
     SnapshotDescription snapshotDesc = 
SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
 
     final List<Pair<SnapshotFileInfo, Long>> files = new 
ArrayList<Pair<SnapshotFileInfo, Long>>();
     final TableName table = TableName.valueOf(snapshotDesc.getTable());
-    final Configuration conf = getConf();
 
     // Get snapshot files
+    LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
     SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, 
snapshotDesc,
       new SnapshotReferenceUtil.SnapshotVisitor() {
         @Override
@@ -492,7 +508,12 @@ public class ExportSnapshot extends Configured implements Tool {
               .setHfile(path.toString())
               .build();
 
-            long size = new HFileLink(conf, path).getFileStatus(fs).getLen();
+            long size;
+            if (storeFile.hasFileSize()) {
+              size = storeFile.getFileSize();
+            } else {
+              size = new HFileLink(conf, path).getFileStatus(fs).getLen();
+            }
             files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
           }
         }
@@ -522,7 +543,7 @@ public class ExportSnapshot extends Configured implements Tool {
    * and then each group fetch the bigger file available, iterating through 
groups
    * alternating the direction.
    */
-  static List<List<SnapshotFileInfo>> getBalancedSplits(
+  static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
       final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
     // Sort files by size, from small to big
     Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
@@ -533,18 +554,19 @@ public class ExportSnapshot extends Configured implements Tool {
     });
 
     // create balanced groups
-    List<List<SnapshotFileInfo>> fileGroups = new 
LinkedList<List<SnapshotFileInfo>>();
+    List<List<Pair<SnapshotFileInfo, Long>>> fileGroups =
+      new LinkedList<List<Pair<SnapshotFileInfo, Long>>>();
     long[] sizeGroups = new long[ngroups];
     int hi = files.size() - 1;
     int lo = 0;
 
-    List<SnapshotFileInfo> group;
+    List<Pair<SnapshotFileInfo, Long>> group;
     int dir = 1;
     int g = 0;
 
     while (hi >= lo) {
       if (g == fileGroups.size()) {
-        group = new LinkedList<SnapshotFileInfo>();
+        group = new LinkedList<Pair<SnapshotFileInfo, Long>>();
         fileGroups.add(group);
       } else {
         group = fileGroups.get(g);
@@ -554,7 +576,7 @@ public class ExportSnapshot extends Configured implements Tool {
 
       // add the hi one
       sizeGroups[g] += fileInfo.getSecond();
-      group.add(fileInfo.getFirst());
+      group.add(fileInfo);
 
       // change direction when at the end or the beginning
       g += dir;
@@ -576,102 +598,177 @@ public class ExportSnapshot extends Configured implements Tool {
     return fileGroups;
   }
 
-  private static Path getInputFolderPath(Configuration conf)
-      throws IOException, InterruptedException {
-    Path stagingDir = JobUtil.getStagingDir(conf);
-    return new Path(stagingDir, INPUT_FOLDER_PREFIX +
-      String.valueOf(EnvironmentEdgeManager.currentTimeMillis()));
-  }
+  private static class ExportSnapshotInputFormat extends 
InputFormat<BytesWritable, NullWritable> {
+    @Override
+    public RecordReader<BytesWritable, NullWritable> 
createRecordReader(InputSplit split,
+        TaskAttemptContext tac) throws IOException, InterruptedException {
+      return new 
ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
+    }
 
-  /**
-   * Create the input files, with the path to copy, for the MR job.
-   * Each input files contains n files, and each input file has a similar 
amount data to copy.
-   * The number of input files created are based on the number of mappers 
provided as argument
-   * and the number of the files to copy.
-   */
-  private static Path[] createInputFiles(final Configuration conf, final Path 
inputFolderPath,
-      final List<Pair<SnapshotFileInfo, Long>> snapshotFiles, int mappers)
-      throws IOException, InterruptedException {
-    FileSystem fs = inputFolderPath.getFileSystem(conf);
-    LOG.debug("Input folder location: " + inputFolderPath);
-
-    List<List<SnapshotFileInfo>> splits = getBalancedSplits(snapshotFiles, 
mappers);
-    Path[] inputFiles = new Path[splits.size()];
-
-    BytesWritable key = new BytesWritable();
-    for (int i = 0; i < inputFiles.length; i++) {
-      List<SnapshotFileInfo> files = splits.get(i);
-      inputFiles[i] = new Path(inputFolderPath, String.format("export-%d.seq", 
i));
-      SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, 
inputFiles[i],
-        BytesWritable.class, NullWritable.class);
-      LOG.debug("Input split: " + i);
-      try {
-        for (SnapshotFileInfo file: files) {
-          byte[] pbFileInfo = file.toByteArray();
-          key.set(pbFileInfo, 0, pbFileInfo.length);
-          writer.append(key, NullWritable.get());
+    @Override
+    public List<InputSplit> getSplits(JobContext context) throws IOException, 
InterruptedException {
+      Configuration conf = context.getConfiguration();
+      String snapshotName = conf.get(CONF_SNAPSHOT_NAME);
+      Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
+      FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
+
+      List<Pair<SnapshotFileInfo, Long>> snapshotFiles = 
getSnapshotFiles(conf, fs, snapshotDir);
+      int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
+      if (mappers == 0 && snapshotFiles.size() > 0) {
+        mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
+        mappers = Math.min(mappers, snapshotFiles.size());
+        conf.setInt(CONF_NUM_SPLITS, mappers);
+        conf.setInt(MR_NUM_MAPS, mappers);
+      }
+
+      List<List<Pair<SnapshotFileInfo, Long>>> groups = 
getBalancedSplits(snapshotFiles, mappers);
+      List<InputSplit> splits = new ArrayList(groups.size());
+      for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
+        splits.add(new ExportSnapshotInputSplit(files));
+      }
+      return splits;
+    }
+
+    private static class ExportSnapshotInputSplit extends InputSplit 
implements Writable {
+      private List<Pair<BytesWritable, Long>> files;
+      private long length;
+
+      public ExportSnapshotInputSplit() {
+        this.files = null;
+      }
+
+      public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> 
snapshotFiles) {
+        this.files = new ArrayList(snapshotFiles.size());
+        for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
+          this.files.add(new Pair<BytesWritable, Long>(
+            new BytesWritable(fileInfo.getFirst().toByteArray()), 
fileInfo.getSecond()));
+          this.length += fileInfo.getSecond();
+        }
+      }
+
+      private List<Pair<BytesWritable, Long>> getSplitKeys() {
+        return files;
+      }
+
+      @Override
+      public long getLength() throws IOException, InterruptedException {
+        return length;
+      }
+
+      @Override
+      public String[] getLocations() throws IOException, InterruptedException {
+        return new String[] {};
+      }
+
+      @Override
+      public void readFields(DataInput in) throws IOException {
+        int count = in.readInt();
+        files = new ArrayList<Pair<BytesWritable, Long>>(count);
+        length = 0;
+        for (int i = 0; i < count; ++i) {
+          BytesWritable fileInfo = new BytesWritable();
+          fileInfo.readFields(in);
+          long size = in.readLong();
+          files.add(new Pair<BytesWritable, Long>(fileInfo, size));
+          length += size;
+        }
+      }
+
+      @Override
+      public void write(DataOutput out) throws IOException {
+        out.writeInt(files.size());
+        for (final Pair<BytesWritable, Long> fileInfo: files) {
+          fileInfo.getFirst().write(out);
+          out.writeLong(fileInfo.getSecond());
         }
-      } finally {
-        writer.close();
       }
     }
 
-    return inputFiles;
+    private static class ExportSnapshotRecordReader
+        extends RecordReader<BytesWritable, NullWritable> {
+      private final List<Pair<BytesWritable, Long>> files;
+      private long totalSize = 0;
+      private long procSize = 0;
+      private int index = -1;
+
+      ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
+        this.files = files;
+        for (Pair<BytesWritable, Long> fileInfo: files) {
+          totalSize += fileInfo.getSecond();
+        }
+      }
+
+      @Override
+      public void close() { }
+
+      @Override
+      public BytesWritable getCurrentKey() { return 
files.get(index).getFirst(); }
+
+      @Override
+      public NullWritable getCurrentValue() { return NullWritable.get(); }
+
+      @Override
+      public float getProgress() { return (float)procSize / totalSize; }
+
+      @Override
+      public void initialize(InputSplit split, TaskAttemptContext tac) { }
+
+      @Override
+      public boolean nextKeyValue() {
+        if (index >= 0) {
+          procSize += files.get(index).getSecond();
+        }
+        return(++index < files.size());
+      }
+    }
   }
 
+  // ==========================================================================
+  //  Tool
+  // ==========================================================================
+
   /**
    * Run Map-Reduce Job to perform the files copy.
    */
   private void runCopyJob(final Path inputRoot, final Path outputRoot,
-      final List<Pair<SnapshotFileInfo, Long>> snapshotFiles, final boolean 
verifyChecksum,
+      final String snapshotName, final Path snapshotDir, final boolean 
verifyChecksum,
       final String filesUser, final String filesGroup, final int filesMode,
       final int mappers, final int bandwidthMB)
           throws IOException, InterruptedException, ClassNotFoundException {
     Configuration conf = getConf();
     if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
     if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
+    if (mappers > 0) {
+      conf.setInt(CONF_NUM_SPLITS, mappers);
+      conf.setInt(MR_NUM_MAPS, mappers);
+    }
     conf.setInt(CONF_FILES_MODE, filesMode);
     conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
     conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
     conf.set(CONF_INPUT_ROOT, inputRoot.toString());
-    conf.setInt("mapreduce.job.maps", mappers);
     conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
+    conf.set(CONF_SNAPSHOT_NAME, snapshotName);
+    conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
 
     Job job = new Job(conf);
-    job.setJobName("ExportSnapshot");
+    job.setJobName("ExportSnapshot-" + snapshotName);
     job.setJarByClass(ExportSnapshot.class);
     TableMapReduceUtil.addDependencyJars(job);
     job.setMapperClass(ExportMapper.class);
-    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setInputFormatClass(ExportSnapshotInputFormat.class);
     job.setOutputFormatClass(NullOutputFormat.class);
     job.setMapSpeculativeExecution(false);
     job.setNumReduceTasks(0);
 
-    // Create MR Input
-    Path inputFolderPath = getInputFolderPath(conf);
-    for (Path path: createInputFiles(conf, inputFolderPath, snapshotFiles, 
mappers)) {
-      LOG.debug("Add Input Path=" + path);
-      SequenceFileInputFormat.addInputPath(job, path);
-    }
+    // Acquire the delegation Tokens
+    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
+      new Path[] { inputRoot, outputRoot }, conf);
 
-    try {
-      // Acquire the delegation Tokens
-      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
-        new Path[] { inputRoot, outputRoot }, conf);
-
-      // Run the MR Job
-      if (!job.waitForCompletion(true)) {
-        // TODO: Replace the fixed string with job.getStatus().getFailureInfo()
-        // when it will be available on all the supported versions.
-        throw new ExportSnapshotException("Copy Files Map-Reduce Job failed");
-      }
-    } finally {
-      // Remove MR Input
-      try {
-        inputFolderPath.getFileSystem(conf).delete(inputFolderPath, true);
-      } catch (IOException e) {
-        LOG.warn("Unable to remove MR input folder: " + inputFolderPath, e);
-      }
+    // Run the MR Job
+    if (!job.waitForCompletion(true)) {
+      // TODO: Replace the fixed string with job.getStatus().getFailureInfo()
+      // when it will be available on all the supported versions.
+      throw new ExportSnapshotException("Copy Files Map-Reduce Job failed");
     }
   }
 
@@ -704,45 +801,40 @@ public class ExportSnapshot extends Configured implements Tool {
     int mappers = 0;
 
     Configuration conf = getConf();
+    Path inputRoot = FSUtils.getRootDir(conf);
 
     // Process command line args
     for (int i = 0; i < args.length; i++) {
       String cmd = args[i];
-      try {
-        if (cmd.equals("-snapshot")) {
-          snapshotName = args[++i];
-        } else if (cmd.equals("-target")) {
-          targetName = args[++i];
-        } else if (cmd.equals("-copy-to")) {
-          outputRoot = new Path(args[++i]);
-        } else if (cmd.equals("-copy-from")) {
-          Path sourceDir = new Path(args[++i]);
-          URI defaultFs = sourceDir.getFileSystem(conf).getUri();
-          FSUtils.setFsDefault(conf, new Path(defaultFs));
-          FSUtils.setRootDir(conf, sourceDir);
-        } else if (cmd.equals("-no-checksum-verify")) {
-          verifyChecksum = false;
-        } else if (cmd.equals("-no-target-verify")) {
-          verifyTarget = false;
-        } else if (cmd.equals("-mappers")) {
-          mappers = Integer.parseInt(args[++i]);
-        } else if (cmd.equals("-chuser")) {
-          filesUser = args[++i];
-        } else if (cmd.equals("-chgroup")) {
-          filesGroup = args[++i];
-        } else if (cmd.equals("-bandwidth")) {
-          bandwidthMB = Integer.parseInt(args[++i]);
-        } else if (cmd.equals("-chmod")) {
-          filesMode = Integer.parseInt(args[++i], 8);
-        } else if (cmd.equals("-overwrite")) {
-          overwrite = true;
-        } else if (cmd.equals("-h") || cmd.equals("--help")) {
-          printUsageAndExit();
-        } else {
-          System.err.println("UNEXPECTED: " + cmd);
-          printUsageAndExit();
-        }
-      } catch (IOException e) {
+      if (cmd.equals("-snapshot")) {
+        snapshotName = args[++i];
+      } else if (cmd.equals("-target")) {
+        targetName = args[++i];
+      } else if (cmd.equals("-copy-to")) {
+        outputRoot = new Path(args[++i]);
+      } else if (cmd.equals("-copy-from")) {
+        inputRoot = new Path(args[++i]);
+        FSUtils.setRootDir(conf, inputRoot);
+      } else if (cmd.equals("-no-checksum-verify")) {
+        verifyChecksum = false;
+      } else if (cmd.equals("-no-target-verify")) {
+        verifyTarget = false;
+      } else if (cmd.equals("-mappers")) {
+        mappers = Integer.parseInt(args[++i]);
+      } else if (cmd.equals("-chuser")) {
+        filesUser = args[++i];
+      } else if (cmd.equals("-chgroup")) {
+        filesGroup = args[++i];
+      } else if (cmd.equals("-bandwidth")) {
+        bandwidthMB = Integer.parseInt(args[++i]);
+      } else if (cmd.equals("-chmod")) {
+        filesMode = Integer.parseInt(args[++i], 8);
+      } else if (cmd.equals("-overwrite")) {
+        overwrite = true;
+      } else if (cmd.equals("-h") || cmd.equals("--help")) {
+        printUsageAndExit();
+      } else {
+        System.err.println("UNEXPECTED: " + cmd);
         printUsageAndExit();
       }
     }
@@ -762,7 +854,6 @@ public class ExportSnapshot extends Configured implements Tool {
       targetName = snapshotName;
     }
 
-    Path inputRoot = FSUtils.getRootDir(conf);
     FileSystem inputFs = FileSystem.get(inputRoot.toUri(), conf);
     LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + 
inputRoot);
     FileSystem outputFs = FileSystem.get(outputRoot.toUri(), conf);
@@ -806,14 +897,6 @@ public class ExportSnapshot extends Configured implements Tool {
       }
     }
 
-    // Step 0 - Extract snapshot files to copy
-    LOG.info("Loading Snapshot hfile list");
-    final List<Pair<SnapshotFileInfo, Long>> files = getSnapshotFiles(inputFs, 
snapshotDir);
-    if (mappers == 0 && files.size() > 0) {
-      mappers = 1 + (files.size() / conf.getInt(CONF_MAP_GROUP, 10));
-      mappers = Math.min(mappers, files.size());
-    }
-
     // Step 1 - Copy fs1:/.snapshot/<snapshot> to  
fs2:/.snapshot/.tmp/<snapshot>
     // The snapshot references must be copied before the hfiles otherwise the 
cleaner
     // will remove them because they are unreferenced.
@@ -839,13 +922,8 @@ public class ExportSnapshot extends Configured implements Tool {
     // The snapshot references must be copied before the files otherwise the 
files gets removed
     // by the HFileArchiver, since they have no references.
     try {
-      if (files.size() == 0) {
-        LOG.warn("There are 0 store file to be copied. There may be no data in 
the table.");
-      } else {
-        runCopyJob(inputRoot, outputRoot, files, verifyChecksum,
-                   filesUser, filesGroup, filesMode, mappers, bandwidthMB);
-      }
-
+      runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, 
verifyChecksum,
+                 filesUser, filesGroup, filesMode, mappers, bandwidthMB);
 
       LOG.info("Finalize the Snapshot Export");
       if (!skipTmp) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/333ea48a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java
index ce74d91..95268e9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java
@@ -161,26 +161,31 @@ public class TestExportSnapshot {
     //    group 2: 18, 13,  8,  3 (total size: 42)
     //    group 3: 17, 12,  7,  4 (total size: 42)
     //    group 4: 16, 11,  6,  5 (total size: 42)
-    List<List<SnapshotFileInfo>> splits = 
ExportSnapshot.getBalancedSplits(files, 5);
+    List<List<Pair<SnapshotFileInfo, Long>>> splits = 
ExportSnapshot.getBalancedSplits(files, 5);
     assertEquals(5, splits.size());
 
     String[] split0 = new String[] {"file-20", "file-11", "file-10", "file-1", 
"file-0"};
-    verifyBalanceSplit(splits.get(0), split0);
+    verifyBalanceSplit(splits.get(0), split0, 42);
     String[] split1 = new String[] {"file-19", "file-12", "file-9",  "file-2"};
-    verifyBalanceSplit(splits.get(1), split1);
+    verifyBalanceSplit(splits.get(1), split1, 42);
     String[] split2 = new String[] {"file-18", "file-13", "file-8",  "file-3"};
-    verifyBalanceSplit(splits.get(2), split2);
+    verifyBalanceSplit(splits.get(2), split2, 42);
     String[] split3 = new String[] {"file-17", "file-14", "file-7",  "file-4"};
-    verifyBalanceSplit(splits.get(3), split3);
+    verifyBalanceSplit(splits.get(3), split3, 42);
     String[] split4 = new String[] {"file-16", "file-15", "file-6",  "file-5"};
-    verifyBalanceSplit(splits.get(4), split4);
+    verifyBalanceSplit(splits.get(4), split4, 42);
   }
 
-  private void verifyBalanceSplit(final List<SnapshotFileInfo> split, final 
String[] expected) {
+  private void verifyBalanceSplit(final List<Pair<SnapshotFileInfo, Long>> 
split,
+      final String[] expected, final long expectedSize) {
     assertEquals(expected.length, split.size());
+    long totalSize = 0;
     for (int i = 0; i < expected.length; ++i) {
-      assertEquals(expected[i], split.get(i).getHfile());
+      Pair<SnapshotFileInfo, Long> fileInfo = split.get(i);
+      assertEquals(expected[i], fileInfo.getFirst().getHfile());
+      totalSize += fileInfo.getSecond();
     }
+    assertEquals(expectedSize, totalSize);
   }
 
   /**

Reply via email to