milleruntime closed pull request #20: Update bulkIngest example. Fixes #17
URL: https://github.com/apache/accumulo-examples/pull/20
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/bulkIngest.md b/docs/bulkIngest.md
index b856d83..5dda1d1 100644
--- a/docs/bulkIngest.md
+++ b/docs/bulkIngest.md
@@ -16,18 +16,28 @@ limitations under the License.
 -->
 # Apache Accumulo Bulk Ingest Example
 
-This is an example of how to bulk ingest data into accumulo using map reduce.
+This is an example of how to bulk ingest data into Accumulo using map reduce.
+
+This tutorial uses the following Java classes.
+
+ * [SetupTable.java] - creates the table and some data to ingest
+ * [BulkIngestExample.java] - ingest the data using map reduce
+ * [VerifyIngest.java] - checks that the data was ingested
+ 
+Remember to copy the accumulo-examples-\*.jar to Accumulo's 'lib/ext' directory.
+
+    $ cp target/accumulo-examples-*.jar /path/accumulo/lib/ext
 
 The following commands show how to run this example. This example creates a
 table called test_bulk which has two initial split points. Then 1000 rows of
 test data are created in HDFS. After that the 1000 rows are ingested into
-accumulo. Then we verify the 1000 rows are in accumulo.
+Accumulo. Then we verify the 1000 rows are in Accumulo. 
 
     $ PKG=org.apache.accumulo.examples.mapreduce.bulk
-    $ ARGS="-c examples.conf"
-    $ accumulo $PKG.SetupTable $ARGS -t test_bulk row_00000333 row_00000666
-    $ accumulo $PKG.GenerateTestData --start-row 0 --count 1000 --output bulk/test_1.txt
-    $ accumulo-util hadoop-jar target/accumulo-examples-X.Y.Z.jar $PKG.BulkIngestExample $ARGS -t test_bulk --inputDir bulk --workDir tmp/bulkWork
-    $ accumulo $PKG.VerifyIngest $ARGS -t test_bulk --start-row 0 --count 1000
+    $ accumulo $PKG.SetupTable
+    $ accumulo-util hadoop-jar target/accumulo-examples-*.jar $PKG.BulkIngestExample
+    $ ./bin/runex mapreduce.bulk.VerifyIngest
 
-For a high level discussion of bulk ingest, see the docs dir.
+[SetupTable.java]: ../src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java
+[BulkIngestExample.java]:  ../src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java
+[VerifyIngest.java]: ../src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java
index 9754243..f87a768 100644
--- a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java
@@ -22,11 +22,15 @@
 import java.util.Base64;
 import java.util.Collection;
 
+import org.apache.accumulo.core.client.ConnectionInfo;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 import 
org.apache.accumulo.core.client.mapreduce.lib.partition.RangePartitioner;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.examples.cli.MapReduceClientOnRequiredTable;
 import org.apache.hadoop.conf.Configuration;
@@ -48,7 +52,11 @@
 /**
  * Example map reduce job that bulk ingest data into an accumulo table. The 
expected input is text files containing tab separated key value pairs on each 
line.
  */
+
 public class BulkIngestExample extends Configured implements Tool {
+  static String workDir = "tmp/bulkWork";
+  static String inputDir = "bulk";
+
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text> {
     private Text outputKey = new Text();
     private Text outputValue = new Text();
@@ -94,18 +102,8 @@ public void reduce(Text key, Iterable<Text> values, Context 
output) throws IOExc
     }
   }
 
-  static class Opts extends MapReduceClientOnRequiredTable {
-    @Parameter(names = "--inputDir", required = true)
-    String inputDir;
-    @Parameter(names = "--workDir", required = true)
-    String workDir;
-  }
-
   @Override
   public int run(String[] args) {
-    Opts opts = new Opts();
-    opts.parseArgs(BulkIngestExample.class.getName(), args);
-
     Configuration conf = getConf();
     PrintStream out = null;
     try {
@@ -121,17 +119,22 @@ public int run(String[] args) {
 
       job.setReducerClass(ReduceClass.class);
       job.setOutputFormatClass(AccumuloFileOutputFormat.class);
-      opts.setAccumuloConfigs(job);
 
-      Connector connector = opts.getConnector();
+      Connector connector = 
Connector.builder().usingProperties("conf/accumulo-client.properties").build();
+      ConnectionInfo connectionInfo = 
Connector.builder().usingProperties("conf/accumulo-client.properties").info();
+      AccumuloInputFormat.setConnectionInfo(job, connectionInfo);
+      AccumuloInputFormat.setInputTableName(job, SetupTable.tableName);
+      AccumuloInputFormat.setScanAuthorizations(job, Authorizations.EMPTY);
+      AccumuloOutputFormat.setCreateTables(job, true);
+      AccumuloOutputFormat.setDefaultTableName(job, SetupTable.tableName);
 
-      TextInputFormat.setInputPaths(job, new Path(opts.inputDir));
-      AccumuloFileOutputFormat.setOutputPath(job, new Path(opts.workDir + 
"/files"));
+      TextInputFormat.setInputPaths(job, new Path(inputDir));
+      AccumuloFileOutputFormat.setOutputPath(job, new Path(workDir + 
"/files"));
 
       FileSystem fs = FileSystem.get(conf);
-      out = new PrintStream(new BufferedOutputStream(fs.create(new 
Path(opts.workDir + "/splits.txt"))));
+      out = new PrintStream(new BufferedOutputStream(fs.create(new 
Path(workDir + "/splits.txt"))));
 
-      Collection<Text> splits = 
connector.tableOperations().listSplits(opts.getTableName(), 100);
+      Collection<Text> splits = 
connector.tableOperations().listSplits(SetupTable.tableName, 100);
       for (Text split : splits)
         
out.println(Base64.getEncoder().encodeToString(TextUtil.getBytes(split)));
 
@@ -139,16 +142,16 @@ public int run(String[] args) {
       out.close();
 
       job.setPartitionerClass(RangePartitioner.class);
-      RangePartitioner.setSplitFile(job, opts.workDir + "/splits.txt");
+      RangePartitioner.setSplitFile(job, workDir + "/splits.txt");
 
       job.waitForCompletion(true);
-      Path failures = new Path(opts.workDir, "failures");
+      Path failures = new Path(workDir, "failures");
       fs.delete(failures, true);
-      fs.mkdirs(new Path(opts.workDir, "failures"));
+      fs.mkdirs(new Path(workDir, "failures"));
       // With HDFS permissions on, we need to make sure the Accumulo user can 
read/move the rfiles
       FsShell fsShell = new FsShell(conf);
-      fsShell.run(new String[] {"-chmod", "-R", "777", opts.workDir});
-      connector.tableOperations().importDirectory(opts.getTableName(), 
opts.workDir + "/files", opts.workDir + "/failures", false);
+      fsShell.run(new String[] {"-chmod", "-R", "777", workDir});
+      connector.tableOperations().importDirectory(SetupTable.tableName, 
workDir + "/files", workDir + "/failures", false);
 
     } catch (Exception e) {
       throw new RuntimeException(e);
diff --git 
a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/GenerateTestData.java
 
b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/GenerateTestData.java
deleted file mode 100644
index 4622ea0..0000000
--- 
a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/GenerateTestData.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.examples.mapreduce.bulk;
-
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.beust.jcommander.Parameter;
-
-public class GenerateTestData {
-
-  static class Opts extends org.apache.accumulo.core.cli.Help {
-    @Parameter(names = "--start-row", required = true)
-    int startRow = 0;
-    @Parameter(names = "--count", required = true)
-    int numRows = 0;
-    @Parameter(names = "--output", required = true)
-    String outputFile;
-  }
-
-  public static void main(String[] args) throws IOException {
-    Opts opts = new Opts();
-    opts.parseArgs(GenerateTestData.class.getName(), args);
-
-    FileSystem fs = FileSystem.get(new Configuration());
-    PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(new 
Path(opts.outputFile))));
-
-    for (int i = 0; i < opts.numRows; i++) {
-      out.println(String.format("row_%010d\tvalue_%010d", i + opts.startRow, i 
+ opts.startRow));
-    }
-    out.close();
-  }
-
-}
diff --git 
a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java 
b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java
index 17e549d..225be47 100644
--- a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java
@@ -16,35 +16,46 @@
  */
 package org.apache.accumulo.examples.mapreduce.bulk;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.io.BufferedOutputStream;
+import java.io.PrintStream;
 import java.util.TreeSet;
 
 import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.examples.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 
-import com.beust.jcommander.Parameter;
 
 public class SetupTable {
 
-  static class Opts extends ClientOnRequiredTable {
-    @Parameter(description = "<split> { <split> ... } ")
-    List<String> splits = new ArrayList<>();
-  }
+  static String[] splits = {"row_00000333", "row_00000666"};
+  static String tableName = "test_bulk";
+  static int numRows = 1000;
+  static String outputFile = "bulk/test_1.txt";
 
   public static void main(String[] args) throws Exception {
-    Opts opts = new Opts();
-    opts.parseArgs(SetupTable.class.getName(), args);
-    Connector conn = opts.getConnector();
-    conn.tableOperations().create(opts.getTableName());
-    if (!opts.splits.isEmpty()) {
-      // create a table with initial partitions
-      TreeSet<Text> intialPartitions = new TreeSet<>();
-      for (String split : opts.splits) {
-        intialPartitions.add(new Text(split));
+    Connector conn = 
Connector.builder().usingProperties("conf/accumulo-client.properties").build();
+    try {
+      conn.tableOperations().create(tableName);
+    } catch (TableExistsException e) {
+      //ignore
+    }
+
+    // create a table with initial partitions
+    TreeSet<Text> intialPartitions = new TreeSet<>();
+    for (String split : splits) {
+      intialPartitions.add(new Text(split));
+    }
+    conn.tableOperations().addSplits(tableName, intialPartitions);
+
+    FileSystem fs = FileSystem.get(new Configuration());
+    try (PrintStream out = new PrintStream(new 
BufferedOutputStream(fs.create(new Path(outputFile))))) {
+      // create some data in outputFile
+      for (int i = 0; i < numRows; i++) {
+        out.println(String.format("row_%010d\tvalue_%010d", i, i));
       }
-      conn.tableOperations().addSplits(opts.getTableName(), intialPartitions);
     }
   }
 }
diff --git 
a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java 
b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java
index 0dead6d..6fd1318 100644
--- 
a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java
+++ 
b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java
@@ -27,37 +27,25 @@
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.examples.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.beust.jcommander.Parameter;
-
 public class VerifyIngest {
   private static final Logger log = 
LoggerFactory.getLogger(VerifyIngest.class);
 
-  static class Opts extends ClientOnRequiredTable {
-    @Parameter(names = "--start-row")
-    int startRow = 0;
-    @Parameter(names = "--count", required = true, description = "number of 
rows to verify")
-    int numRows = 0;
-  }
-
   public static void main(String[] args) throws AccumuloException, 
AccumuloSecurityException, TableNotFoundException {
-    Opts opts = new Opts();
-    opts.parseArgs(VerifyIngest.class.getName(), args);
-
-    Connector connector = opts.getConnector();
-    Scanner scanner = connector.createScanner(opts.getTableName(), opts.auths);
+    Connector connector = 
Connector.builder().usingProperties("conf/accumulo-client.properties").build();
+    Scanner scanner = connector.createScanner(SetupTable.tableName, 
Authorizations.EMPTY);
 
-    scanner.setRange(new Range(new Text(String.format("row_%010d", 
opts.startRow)), null));
+    scanner.setRange(new Range(String.format("row_%010d", 0), null));
 
     Iterator<Entry<Key,Value>> si = scanner.iterator();
 
     boolean ok = true;
 
-    for (int i = opts.startRow; i < opts.numRows; i++) {
+    for (int i = 0; i < SetupTable.numRows; i++) {
 
       if (si.hasNext()) {
         Entry<Key,Value> entry = si.next();
diff --git a/src/test/java/org/apache/accumulo/examples/ExamplesIT.java 
b/src/test/java/org/apache/accumulo/examples/ExamplesIT.java
index 7240304..0087eb4 100644
--- a/src/test/java/org/apache/accumulo/examples/ExamplesIT.java
+++ b/src/test/java/org/apache/accumulo/examples/ExamplesIT.java
@@ -77,7 +77,6 @@
 import org.apache.accumulo.examples.mapreduce.TeraSortIngest;
 import org.apache.accumulo.examples.mapreduce.WordCount;
 import org.apache.accumulo.examples.mapreduce.bulk.BulkIngestExample;
-import org.apache.accumulo.examples.mapreduce.bulk.GenerateTestData;
 import org.apache.accumulo.examples.mapreduce.bulk.SetupTable;
 import org.apache.accumulo.examples.mapreduce.bulk.VerifyIngest;
 import org.apache.accumulo.examples.shard.ContinuousQuery;
@@ -403,32 +402,6 @@ public void testMaxMutationConstraint() throws Exception {
     }
   }
 
-  @Test
-  public void testBulkIngest() throws Exception {
-    // TODO Figure out a way to run M/R with Kerberos
-    assumeTrue(getAdminToken() instanceof PasswordToken);
-    String tableName = getUniqueNames(1)[0];
-    FileSystem fs = getFileSystem();
-    Path p = new Path(dir, "tmp");
-    if (fs.exists(p)) {
-      fs.delete(p, true);
-    }
-    goodExec(GenerateTestData.class, "--start-row", "0", "--count", "10000", 
"--output", dir + "/tmp/input/data");
-
-    List<String> commonArgs = new ArrayList<>(Arrays.asList(new String[] 
{"-c", getConnectionFile(), "--table", tableName}));
-
-    List<String> args = new ArrayList<>(commonArgs);
-    goodExec(SetupTable.class, args.toArray(new String[0]));
-
-    args = new ArrayList<>(commonArgs);
-    args.addAll(Arrays.asList(new String[] {"--inputDir", dir + "/tmp/input", 
"--workDir", dir + "/tmp"}));
-    goodExec(BulkIngestExample.class, args.toArray(new String[0]));
-
-    args = new ArrayList<>(commonArgs);
-    args.addAll(Arrays.asList(new String[] {"--start-row", "0", "--count", 
"10000"}));
-    goodExec(VerifyIngest.class, args.toArray(new String[0]));
-  }
-
   @Test
   public void testTeraSortAndRead() throws Exception {
     // TODO Figure out a way to run M/R with Kerberos


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to