keith-turner commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r758567995



##########
File path: 
core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the 
filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = 
LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = 
ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"}, description = "The number of splits to 
generate")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"}, description = "The split size 
desired in bytes")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 
encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the 
splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split 
size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split 
size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider 
providing URI "
+            + "instead of path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be 
specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no size specified look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if there weren't enough splits indexed, try again with size = 0
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on 
files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, 
encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, 
encode);
+    }
+
+    int numFound = splits.size();
+    // its possible we found too many indexed so take every (numFound / 
numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      long factor = numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", 
numFound, opts.numSplits,
+          factor);
+      for (int i = 0; i < numFound; i++) {
+        String next = iter.next();
+        if (i % factor == 0 && desiredSplits.size() < numSplits) {
+          desiredSplits.add(next);
+        }
+      }

Review comment:
       For a case where numFound is 150 and numSplits is 100, the integer 
division 150/100 yields 1, so factor is 1.  `i % 1` is always zero, so I think 
the first 100 splits out of the 150 will be taken, which is probably not 
desired.  I think an approach similar to the following will yield better 
results.  The intent is for the following code to take splits 
`0,1,3,4,6,7,9,10, etc`, but I am not sure the code is correct.
   
   ```suggestion
         // Ideal spacing between kept splits. Computed in floating point so that a ratio such as
         // 150/100 stays 1.5 instead of truncating to 1.
         double targetFactor = (double) numFound / numSplits;
         log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
             targetFactor);

         for (int i = 0; i < numFound; i++) {
           // Candidates seen so far per split already kept. While desiredSplits is still empty this
           // is a floating-point division by zero, which evaluates to Infinity rather than
           // throwing, so the first candidate is always kept.
           double currFactor = (double) (i + 1) / desiredSplits.size();
           String next = iter.next();
           // unsure if this should be currFactor >= targetFactor .. need to think through some
           // edge cases
           if (currFactor > targetFactor && desiredSplits.size() < numSplits) {
             desiredSplits.add(next);
           }
         }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to