This is an automated email from the ASF dual-hosted git repository.

mwalch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo-examples.git
The following commit(s) were added to refs/heads/master by this push:
     new 8c3264c  Created Accumulo/Spark example (#39)
8c3264c is described below

commit 8c3264ce46500ab328297a0122e9ede669980938
Author: Mike Walch <mwa...@apache.org>
AuthorDate: Thu Mar 28 12:22:23 2019 -0400

    Created Accumulo/Spark example (#39)
---
 README.md                                          |   2 +
 spark/.gitignore                                   |   6 +
 spark/README.md                                    |  45 ++++++
 spark/pom.xml                                      | 117 +++++++++++++++
 spark/run.sh                                       |  28 ++++
 .../java/org/apache/accumulo/spark/CopyPlus5K.java | 157 +++++++++++++++++++++
 6 files changed, 355 insertions(+)

diff --git a/README.md b/README.md
index 77c91bc..0be1400 100644
--- a/README.md
+++ b/README.md
@@ -83,6 +83,7 @@ Each example below highlights a feature of Apache Accumulo.
 | [rowhash] | Using MapReduce to read a table and write to a new column in the same table. |
 | [sample] | Building and using sample data in Accumulo. |
 | [shard] | Using the intersecting iterator with a term index partitioned by document. |
+| [spark] | Using Accumulo as input and output for Apache Spark jobs. |
 | [tabletofile] | Using MapReduce to read a table and write one of its columns to a file in HDFS. |
 | [terasort] | Generating random data and sorting it using Accumulo. |
 | [uniquecols] | Use MapReduce to count unique columns in Accumulo |
@@ -120,6 +121,7 @@ This repository can be used to test Accumulo release candidates. See
 [rowhash]: docs/rowhash.md
 [sample]: docs/sample.md
 [shard]: docs/shard.md
+[spark]: spark/README.md
 [tabletofile]: docs/tabletofile.md
 [terasort]: docs/terasort.md
 [uniquecols]: docs/uniquecols.md

diff --git a/spark/.gitignore b/spark/.gitignore
new file mode 100644
index 0000000..f534230
--- /dev/null
+++ b/spark/.gitignore
@@ -0,0 +1,6 @@
+/.classpath
+/.project
+/.settings/
+/target/
+/*.iml
+/.idea

diff --git a/spark/README.md b/spark/README.md
new file mode 100644
index 0000000..af19029
--- /dev/null
+++ b/spark/README.md
@@ -0,0 +1,45 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+# Apache Accumulo Spark Example
+
+## Requirements
+
+* Accumulo 2.0+
+* Hadoop YARN installed & `HADOOP_CONF_DIR` set in environment
+* Spark installed & `SPARK_HOME` set in environment
+
+## Spark example
+
+The [CopyPlus5K] example will create an Accumulo table called `spark_example_input`
+and write 100 key/value entries into Accumulo with the values `0..99`. It then launches
+a Spark application that does the following:
+
+* Read data from the `spark_example_input` table using `AccumuloInputFormat`
+* Add 5000 to each value
+* Write the data to a new Accumulo table (called `spark_example_output`) using one of
+  two methods:
+  1. **Bulk import** - Write data to an RFile in HDFS using `AccumuloFileOutputFormat`,
+     then bulk import it into the Accumulo table
+  2. **Batch writer** - Create a `BatchWriter` in the Spark code to write to the table
+
+This application can be run using the command:
+
+    ./run.sh batch /path/to/accumulo-client.properties
+
+Change `batch` to `bulk` to use the bulk import method.
+
+[CopyPlus5K]: src/main/java/org/apache/accumulo/spark/CopyPlus5K.java
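Once the job finishes, the new table can be spot-checked from the Accumulo shell: values `0..99` in `spark_example_input` should come back as `5000..5099` in `spark_example_output`. A minimal check (the `-u` value is a placeholder for a user that can read the table; the scan output shown is approximate):

    accumulo shell -u <username> -e "scan -t spark_example_output"
    000 cf1:cq1 []    5000
    001 cf1:cq1 []    5001
    ...
    099 cf1:cq1 []    5099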
diff --git a/spark/pom.xml b/spark/pom.xml
new file mode 100644
index 0000000..67f5de2
--- /dev/null
+++ b/spark/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache</groupId>
+    <artifactId>apache</artifactId>
+    <version>21</version>
+  </parent>
+  <groupId>org.apache.accumulo</groupId>
+  <artifactId>accumulo-spark</artifactId>
+  <version>2.0.0-SNAPSHOT</version>
+  <name>Apache Accumulo Spark Example</name>
+  <description>Example Spark Application for Apache Accumulo</description>
+  <properties>
+    <accumulo.version>2.0.0-SNAPSHOT</accumulo.version>
+    <hadoop.version>3.2.0</hadoop.version>
+    <maven.compiler.source>1.8</maven.compiler.source>
+    <maven.compiler.target>1.8</maven.compiler.target>
+    <zookeeper.version>3.4.13</zookeeper.version>
+  </properties>
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.apache.zookeeper</groupId>
+        <artifactId>zookeeper</artifactId>
+        <version>${zookeeper.version}</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-core</artifactId>
+      <version>${accumulo.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-hadoop-mapreduce</artifactId>
+      <version>${accumulo.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client-api</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_2.11</artifactId>
+      <version>2.4.0</version>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+  <profiles>
+    <profile>
+      <id>create-shade-jar</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-shade-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>spark-shade-jar</id>
+                <goals>
+                  <goal>shade</goal>
+                </goals>
+                <phase>package</phase>
+                <configuration>
+                  <finalName>${project.artifactId}-shaded</finalName>
+                  <shadedArtifactAttached>true</shadedArtifactAttached>
+                  <shadedClassifierName>shaded</shadedClassifierName>
+                  <artifactSet>
+                    <excludes>
+                      <exclude>org.apache.accumulo:accumulo-native</exclude>
+                      <exclude>org.apache.hadoop:*</exclude>
+                      <exclude>org.apache.spark:*</exclude>
+                    </excludes>
+                  </artifactSet>
+                  <relocations>
+                    <relocation>
+                      <!-- Required as Accumulo uses a different version of Guava than Hadoop -->
+                      <pattern>com.google.common</pattern>
+                      <shadedPattern>shaded.com.google.common</shadedPattern>
+                    </relocation>
+                  </relocations>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+</project>
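With the `create-shade-jar` profile active, the build writes `target/accumulo-spark-shaded.jar` (the path `run.sh` looks for), with Guava relocated under `shaded.com.google.common` per the relocation above. A quick, optional way to confirm the relocation took effect:

    mvn clean package -P create-shade-jar
    jar tf target/accumulo-spark-shaded.jar | grep shaded/com/google/common | head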
diff --git a/spark/run.sh b/spark/run.sh
new file mode 100755
index 0000000..e1ab9c0
--- /dev/null
+++ b/spark/run.sh
@@ -0,0 +1,28 @@
+#!/bin/bash
+
+if [[ -z "$1" || -z "$2" ]]; then
+  echo "Usage: ./run.sh [bulk|batch] /path/to/accumulo-client.properties"
+  exit 1
+fi
+
+JAR=./target/accumulo-spark-shaded.jar
+if [[ ! -f "$JAR" ]]; then
+  mvn clean package -P create-shade-jar
+fi
+
+if [[ -z "$SPARK_HOME" ]]; then
+  echo "SPARK_HOME must be set!"
+  exit 1
+fi
+
+if [[ -z "$HADOOP_CONF_DIR" ]]; then
+  echo "HADOOP_CONF_DIR must be set!"
+  exit 1
+fi
+
+"$SPARK_HOME"/bin/spark-submit \
+  --class org.apache.accumulo.spark.CopyPlus5K \
+  --master yarn \
+  --deploy-mode client \
+  "$JAR" \
+  "$1" "$2"
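For orientation before the full class below, here is a condensed sketch of the two write paths the README describes. It is adapted from `CopyPlus5K` and is not standalone code: `props`, `job`, `outputDir`, `partitioner`, and `dataPlus5K` are as defined in the class, and table setup, Kryo registration, and argument handling are omitted.

    // Batch writer: build one Accumulo client per Spark partition (clients are
    // not serializable) and write one mutation per entry
    dataPlus5K.foreachPartition(iter -> {
      try (AccumuloClient client = Accumulo.newClient().from(props).build();
          BatchWriter bw = client.createBatchWriter("spark_example_output")) {
        iter.forEachRemaining(kv -> {
          Mutation m = new Mutation(kv._1.getRow());
          m.at().family(kv._1.getColumnFamily())
              .qualifier(kv._1.getColumnQualifier()).put(kv._2);
          try {
            bw.addMutation(m);
          } catch (MutationsRejectedException e) {
            throw new RuntimeException(e);
          }
        });
      }
    });

    // Bulk import: range-partition and sort the data, write RFiles to HDFS with
    // AccumuloFileOutputFormat, then import the directory into the table
    AccumuloFileOutputFormat.configure().outputPath(outputDir).store(job);
    dataPlus5K.repartitionAndSortWithinPartitions(partitioner)
        .saveAsNewAPIHadoopFile(outputDir.toString(), Key.class, Value.class,
            AccumuloFileOutputFormat.class);
    try (AccumuloClient client = Accumulo.newClient().from(props).build()) {
      client.tableOperations().importDirectory(outputDir.toString())
          .to("spark_example_output").load();
    }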
diff --git a/spark/src/main/java/org/apache/accumulo/spark/CopyPlus5K.java b/spark/src/main/java/org/apache/accumulo/spark/CopyPlus5K.java
new file mode 100644
index 0000000..4443a70
--- /dev/null
+++ b/spark/src/main/java/org/apache/accumulo/spark/CopyPlus5K.java
@@ -0,0 +1,157 @@
+package org.apache.accumulo.spark;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloFileOutputFormat;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.Partitioner;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class CopyPlus5K {
+
+  public static class AccumuloRangePartitioner extends Partitioner {
+
+    private static final long serialVersionUID = 1L;
+    private List<String> splits;
+
+    AccumuloRangePartitioner(String... listSplits) {
+      this.splits = Arrays.asList(listSplits);
+    }
+
+    @Override
+    public int getPartition(Object o) {
+      int index = Collections.binarySearch(splits, ((Key) o).getRow().toString());
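+      // Collections.binarySearch returns -(insertionPoint) - 1 when the row is
+      // not an exact split, so (index + 1) * -1 recovers the insertion point,
+      // i.e. the index of the split range the row falls into.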
+      index = index < 0 ? (index + 1) * -1 : index;
+      return index;
+    }
+
+    @Override
+    public int numPartitions() {
+      return splits.size() + 1;
+    }
+  }
+
+  private static final String inputTable = "spark_example_input";
+  private static final String outputTable = "spark_example_output";
+  private static final Path rootPath = new Path("/spark_example/");
+
+  private static void cleanupAndCreateTables(Properties props) throws Exception {
+    FileSystem hdfs = FileSystem.get(new Configuration());
+    if (hdfs.exists(rootPath)) {
+      hdfs.delete(rootPath, true);
+    }
+    try (AccumuloClient client = Accumulo.newClient().from(props).build()) {
+      if (client.tableOperations().exists(inputTable)) {
+        client.tableOperations().delete(inputTable);
+      }
+      if (client.tableOperations().exists(outputTable)) {
+        client.tableOperations().delete(outputTable);
+      }
+      // Create tables
+      client.tableOperations().create(inputTable);
+      client.tableOperations().create(outputTable);
+
+      // Write data to input table
+      try (BatchWriter bw = client.createBatchWriter(inputTable)) {
+        for (int i = 0; i < 100; i++) {
+          Mutation m = new Mutation(String.format("%03d", i));
+          m.at().family("cf1").qualifier("cq1").put("" + i);
+          bw.addMutation(m);
+        }
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+
+    if (args.length < 2 || (!args[0].equals("batch") && !args[0].equals("bulk"))
+        || args[1].isEmpty()) {
+      System.out.println("Usage: ./run.sh [batch|bulk] /path/to/accumulo-client.properties");
+      System.exit(1);
+    }
+
+    // Read client properties from file
+    final Properties props = Accumulo.newClientProperties().from(args[1]).build();
+
+    cleanupAndCreateTables(props);
+
+    SparkConf conf = new SparkConf();
+    conf.setAppName("CopyPlus5K");
+    // KryoSerializer is needed for serializing Accumulo Key when partitioning data for bulk import
+    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+    conf.registerKryoClasses(new Class[] {Key.class, Value.class, Properties.class});
+
+    JavaSparkContext sc = new JavaSparkContext(conf);
+
+    Job job = Job.getInstance();
+
+    // Read input from Accumulo
+    AccumuloInputFormat.configure().clientProperties(props).table(inputTable).store(job);
+    JavaPairRDD<Key,Value> data = sc.newAPIHadoopRDD(job.getConfiguration(),
+        AccumuloInputFormat.class, Key.class, Value.class);
+
+    // Add 5K to all values
+    JavaPairRDD<Key,Value> dataPlus5K = data.mapValues(v ->
+        new Value("" + (Integer.parseInt(v.toString()) + 5_000)));
+
+    if (args[0].equals("batch")) {
+      // Write output using a batch writer
+      dataPlus5K.foreachPartition(iter -> {
+        // Intentionally create an Accumulo client for each partition to avoid attempting to
+        // serialize it and send it to each remote process.
+        try (AccumuloClient client = Accumulo.newClient().from(props).build();
+            BatchWriter bw = client.createBatchWriter(outputTable)) {
+          iter.forEachRemaining(kv -> {
+            Key key = kv._1;
+            Value val = kv._2;
+            Mutation m = new Mutation(key.getRow());
+            m.at().family(key.getColumnFamily()).qualifier(key.getColumnQualifier())
+                .visibility(key.getColumnVisibility()).timestamp(key.getTimestamp()).put(val);
+            try {
+              bw.addMutation(m);
+            } catch (MutationsRejectedException e) {
+              e.printStackTrace();
+            }
+          });
+        }
+      });
+    } else if (args[0].equals("bulk")) {
+      // Write output using bulk import
+
+      // Create HDFS directory for bulk import
+      FileSystem hdfs = FileSystem.get(new Configuration());
+      hdfs.mkdirs(rootPath);
+      Path outputDir = new Path(rootPath.toString() + "/output");
+
+      // Write Spark output to HDFS
+      AccumuloFileOutputFormat.configure().outputPath(outputDir).store(job);
+      Partitioner partitioner = new AccumuloRangePartitioner("3", "7");
+      JavaPairRDD<Key,Value> partData =
+          dataPlus5K.repartitionAndSortWithinPartitions(partitioner);
+      partData.saveAsNewAPIHadoopFile(outputDir.toString(), Key.class, Value.class,
+          AccumuloFileOutputFormat.class);
+
+      // Bulk import into Accumulo
+      try (AccumuloClient client = Accumulo.newClient().from(props).build()) {
+        client.tableOperations().importDirectory(outputDir.toString()).to(outputTable).load();
+      }
+    } else {
+      System.out.println("Unknown method to write output: " + args[0]);
+      System.exit(1);
+    }
+  }
+}