Repository: tinkerpop
Updated Branches:
  refs/heads/tp33 a708cc3bd -> 3891777e4
TINKERPOP-1113 Added spark configuration options as concrete methods CTR

Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/bd85e5fe
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/bd85e5fe
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/bd85e5fe

Branch: refs/heads/tp33
Commit: bd85e5febee56434c4de4e7ab31e3444437a9f5e
Parents: f36eb4f
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Tue May 22 06:55:46 2018 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Tue May 22 06:55:46 2018 -0400

----------------------------------------------------------------------
 CHANGELOG.asciidoc                               |  1 +
 .../process/computer/SparkGraphComputer.java     | 90 +++++++++++++++++---
 .../computer/SparkHadoopGraphProvider.java       |  5 +-
 3 files changed, 80 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/bd85e5fe/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 21fde2c..395bb55 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,6 +26,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 * Removed recursive handling of streaming results from Gremlin-Python driver to avoid max recursion depth errors.
 * Improved performance of `TraversalVertexProgram` and related infrastructure.
 * Fixed bug in `GroovyTranslator` that didn't properly handle empty `Map` objects.
+* Added concrete configuration methods to `SparkGraphComputer` to provide a clearer API for configuring it.
 
 [[release-3-2-9]]
 === TinkerPop 3.2.9 (Release Date: May 8, 2018)


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/bd85e5fe/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 00a2e46..4c896cd 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -30,11 +30,10 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.Partitioner;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.launcher.SparkLauncher;
+import org.apache.spark.serializer.Serializer;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
@@ -79,7 +78,16 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 
+import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL;
+import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_PERSIST_CONTEXT;
+import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL;
+import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE;
+import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_SKIP_PARTITIONER;
+import static org.apache.tinkerpop.gremlin.hadoop.Constants.SPARK_SERIALIZER;
+
 /**
+ * {@link GraphComputer} implementation for Apache Spark.
+ *
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
@@ -107,8 +115,12 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
         ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration);
     }
 
+    /**
+     * Sets the number of workers. If the {@code spark.master} configuration is set to "local" then it will be
+     * changed to use the specified number of worker threads.
+     */
     @Override
-    public GraphComputer workers(final int workers) {
+    public SparkGraphComputer workers(final int workers) {
         super.workers(workers);
         if (this.sparkConfiguration.containsKey(SparkLauncher.SPARK_MASTER) && this.sparkConfiguration.getString(SparkLauncher.SPARK_MASTER).startsWith("local")) {
             this.sparkConfiguration.setProperty(SparkLauncher.SPARK_MASTER, "local[" + this.workers + "]");
@@ -118,11 +130,61 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
     }
 
     @Override
-    public GraphComputer configure(final String key, final Object value) {
+    public SparkGraphComputer configure(final String key, final Object value) {
         this.sparkConfiguration.setProperty(key, value);
         return this;
     }
 
+    /**
+     * Sets the configuration option for {@code spark.master}, the cluster manager to connect to, which may be one
+     * of the <a href="https://spark.apache.org/docs/latest/submitting-applications.html#master-urls">allowed master URLs</a>.
+     */
+    public SparkGraphComputer master(final String clusterManager) {
+        return configure(SparkLauncher.SPARK_MASTER, clusterManager);
+    }
+
+    /**
+     * Determines if the Spark context should be left open, preventing Spark from garbage collecting unreferenced RDDs.
+     */
+    public SparkGraphComputer persistContext(final boolean persist) {
+        return configure(GREMLIN_SPARK_PERSIST_CONTEXT, persist);
+    }
+
+    /**
+     * Specifies the method by which the graph created by the {@link VertexProgram} is persisted. By default, it is
+     * configured to use {@code StorageLevel#MEMORY_ONLY()}.
+     */
+    public SparkGraphComputer graphStorageLevel(final StorageLevel storageLevel) {
+        return configure(GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, storageLevel.description());
+    }
+
+    public SparkGraphComputer persistStorageLevel(final StorageLevel storageLevel) {
+        return configure(GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, storageLevel.description());
+    }
+
+    /**
+     * Determines if partitioning of the loaded graph RDD should be skipped. By default, this value is {@code false}.
+     */
+    public SparkGraphComputer skipPartitioner(final boolean skip) {
+        return configure(GREMLIN_SPARK_SKIP_PARTITIONER, skip);
+    }
+
+    /**
+     * Determines if caching of the loaded graph RDD should be skipped. If {@code true} then
+     * {@link #graphStorageLevel(StorageLevel)} is ignored. By default, this value is {@code false}.
+     */
+    public SparkGraphComputer skipGraphCache(final boolean skip) {
+        return configure(GREMLIN_SPARK_SKIP_GRAPH_CACHE, skip);
+    }
+
+    /**
+     * Specifies the {@code org.apache.spark.serializer.Serializer} implementation to use. By default, this value is
+     * set to {@link GryoSerializer}.
+     */
+    public SparkGraphComputer serializer(final Class<? extends Serializer> serializer) {
+        return configure(SPARK_SERIALIZER, serializer.getCanonicalName());
+    }
+
     @Override
     public Future<ComputerResult> submit() {
         this.validateStatePriorToExecution();
@@ -135,8 +197,8 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
             final long startTime = System.currentTimeMillis();
             // apache and hadoop configurations that are used throughout the graph computer computation
             final org.apache.commons.configuration.Configuration graphComputerConfiguration = new HadoopConfiguration(this.sparkConfiguration);
-            if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER))
-                graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
+            if (!graphComputerConfiguration.containsKey(SPARK_SERIALIZER))
+                graphComputerConfiguration.setProperty(SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
             graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
             final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(graphComputerConfiguration);
             final Storage fileSystemStorage = FileSystemStorage.open(hadoopConfiguration);
@@ -144,8 +206,8 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
             final boolean inputFromSpark = PersistedInputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, Object.class));
             final boolean outputToHDFS = FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class));
             final boolean outputToSpark = PersistedOutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class));
-            final boolean skipPartitioner = graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, false);
-            final boolean skipPersist = graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE, false);
+            final boolean skipPartitioner = graphComputerConfiguration.getBoolean(GREMLIN_SPARK_SKIP_PARTITIONER, false);
+            final boolean skipPersist = graphComputerConfiguration.getBoolean(GREMLIN_SPARK_SKIP_GRAPH_CACHE, false);
             if (inputFromHDFS) {
                 String inputLocation = Constants
                         .getSearchGraphLocation(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION),
@@ -230,7 +292,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                     assert loadedGraphRDD.partitioner().isPresent();
                 } else {
                     assert skipPartitioner == !loadedGraphRDD.partitioner().isPresent(); // no easy way to test this with a test case
-                    this.logger.debug("Partitioning has been skipped for the loaded graphRDD via " + Constants.GREMLIN_SPARK_SKIP_PARTITIONER);
+                    this.logger.debug("Partitioning has been skipped for the loaded graphRDD via " + GREMLIN_SPARK_SKIP_PARTITIONER);
                 }
             }
             // if the loaded graphRDD was already partitioned previously, then this coalesce/repartition will not take place
@@ -242,7 +304,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
             }
             // persist the vertex program loaded graph as specified by configuration or else use default cache() which is MEMORY_ONLY
             if (!skipPersist && (!inputFromSpark || partitioned || filtered))
-                loadedGraphRDD = loadedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
+                loadedGraphRDD = loadedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
 
             // final graph with view (for persisting and/or mapReducing -- may be null and thus, possible to save space/time)
             JavaPairRDD<Object, VertexWritable> computedGraphRDD = null;
@@ -323,7 +385,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 });
                 // if there is only one MapReduce to execute, don't bother wasting the clock cycles.
                 if (this.mapReducers.size() > 1)
-                    mapReduceRDD = mapReduceRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
+                    mapReduceRDD = mapReduceRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
             }
 
             for (final MapReduce mapReduce : this.mapReducers) {
@@ -370,11 +432,11 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 // clear properties that should not be propagated in an OLAP chain
                 graphComputerConfiguration.clearProperty(Constants.GREMLIN_HADOOP_GRAPH_FILTER);
                 graphComputerConfiguration.clearProperty(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR);
-                graphComputerConfiguration.clearProperty(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE);
-                graphComputerConfiguration.clearProperty(Constants.GREMLIN_SPARK_SKIP_PARTITIONER);
+                graphComputerConfiguration.clearProperty(GREMLIN_SPARK_SKIP_GRAPH_CACHE);
+                graphComputerConfiguration.clearProperty(GREMLIN_SPARK_SKIP_PARTITIONER);
                 return new DefaultComputerResult(InputOutputHelper.getOutputGraph(graphComputerConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());
             } finally {
-                if (!graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
+                if (!graphComputerConfiguration.getBoolean(GREMLIN_SPARK_PERSIST_CONTEXT, false))
                     Spark.close();
             }
         });


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/bd85e5fe/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index d4201b5..469c4b1 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.spark.process.computer;
 
+import org.apache.spark.launcher.SparkLauncher;
 import org.apache.tinkerpop.gremlin.GraphProvider;
 import org.apache.tinkerpop.gremlin.LoadGraphWith;
 import org.apache.tinkerpop.gremlin.groovy.util.SugarTestHelper;
@@ -82,8 +83,8 @@ public class SparkHadoopGraphProvider extends HadoopGraphProvider {
         }
 
         config.put(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER, SparkGraphComputer.class.getCanonicalName());
-        config.put("spark.master", "local[4]");
-        config.put("spark.serializer", GryoSerializer.class.getCanonicalName());
+        config.put(SparkLauncher.SPARK_MASTER, "local[4]");
+        config.put(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
         config.put("spark.kryo.registrationRequired", true);
         return config;
     }
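For reference, the change replaces string-keyed calls such as configure("spark.master", "local[4]") with typed methods that return SparkGraphComputer, so calls chain without casting. Below is a minimal usage sketch, not part of the commit: it assumes a HadoopGraph that already carries its input reader/location configuration, and the class name, the run method, the PageRankVertexProgram choice, and the MEMORY_AND_DISK storage level are illustrative only.

    import java.util.concurrent.Future;

    import org.apache.spark.storage.StorageLevel;
    import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
    import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
    import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
    import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
    import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;

    public class SparkComputerConfigSketch {

        // illustrative helper -- "graph" must already be configured with its Hadoop reader/writer settings
        public static Future<ComputerResult> run(final HadoopGraph graph) {
            return graph.compute(SparkGraphComputer.class)
                    .master("local[4]")                                // was: configure("spark.master", "local[4]")
                    .serializer(GryoSerializer.class)                  // explicit here, though GryoSerializer is the default
                    .graphStorageLevel(StorageLevel.MEMORY_AND_DISK()) // storage level for the loaded graph RDD
                    .persistContext(true)                              // leave the Spark context open across jobs
                    .workers(4)                                        // rewrites a "local" master to "local[4]"
                    .program(PageRankVertexProgram.build().create(graph)) // example program only
                    .submit();
        }
    }

Note that workers(int) only rewrites spark.master when that property is already set and starts with "local", per the method's implementation above.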