Came up with a much cleaner way for GiraphGraphComputer to handle worker configuration.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/a170ad97 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/a170ad97 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/a170ad97 Branch: refs/heads/TINKERPOP-1564 Commit: a170ad9700c5e6e0ec95379c255d6e752e1abbb5 Parents: d5a906b Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Mon Dec 19 14:31:29 2016 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Mon Jan 23 14:22:53 2017 -0700 ---------------------------------------------------------------------- .../process/computer/GiraphGraphComputer.java | 44 +++++++++----------- 1 file changed, 20 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a170ad97/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java ---------------------------------------------------------------------- diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java index db4d6da..b316220 100644 --- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java +++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java @@ -58,7 +58,6 @@ import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey; import org.apache.tinkerpop.gremlin.process.computer.VertexProgram; import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult; import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory; -import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.io.Storage; import 
org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import org.apache.tinkerpop.gremlin.util.Gremlin; @@ -81,26 +80,14 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple protected GiraphConfiguration giraphConfiguration = new GiraphConfiguration(); private MapMemory memory = new MapMemory(); - private boolean useWorkerThreadsInConfiguration; private Set<String> vertexProgramConfigurationKeys = new HashSet<>(); public GiraphGraphComputer(final HadoopGraph hadoopGraph) { - this(hadoopGraph.configuration()); + this(hadoopGraph.configuration()); } private GiraphGraphComputer(final Configuration configuration) { super(configuration); - this.giraphConfiguration.setMasterComputeClass(GiraphMemory.class); - this.giraphConfiguration.setVertexClass(GiraphVertex.class); - this.giraphConfiguration.setComputationClass(GiraphComputation.class); - this.giraphConfiguration.setWorkerContextClass(GiraphWorkerContext.class); - this.giraphConfiguration.setOutEdgesClass(EmptyOutEdges.class); - this.giraphConfiguration.setClass(GiraphConstants.VERTEX_ID_CLASS.getKey(), ObjectWritable.class, ObjectWritable.class); - this.giraphConfiguration.setClass(GiraphConstants.VERTEX_VALUE_CLASS.getKey(), VertexWritable.class, VertexWritable.class); - this.giraphConfiguration.setBoolean(GiraphConstants.STATIC_GRAPH.getKey(), true); - this.giraphConfiguration.setVertexInputFormatClass(GiraphVertexInputFormat.class); - this.giraphConfiguration.setVertexOutputFormatClass(GiraphVertexOutputFormat.class); - this.useWorkerThreadsInConfiguration = this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 || this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666; } public static GiraphGraphComputer open(final org.apache.commons.configuration.Configuration configuration) { @@ -108,14 +95,6 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple } @Override - public 
Future<ComputerResult> submit(final Graph graph) { - final Configuration configuration = graph.configuration(); - this.configuration.copy(configuration); - configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, configuration.getProperty(key).toString())); - return this.submit(); - } - - @Override public GraphComputer program(final VertexProgram vertexProgram) { super.program(vertexProgram); this.memory.addVertexProgramMemoryComputeKeys(this.vertexProgram); @@ -129,6 +108,13 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple } @Override + public GraphComputer workers(final int workers) { + this.configuration.clearProperty(GiraphConstants.MAX_WORKERS); + this.configuration.clearProperty(GiraphConstants.NUM_COMPUTE_THREADS.getKey()); + return super.workers(workers); + } + + @Override public Future<ComputerResult> submit() { super.validateStatePriorToExecution(); return ComputerSubmissionHelper.runWithBackgroundThread(this::submitWithExecutor, "GiraphSubmitter"); @@ -137,7 +123,16 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple private Future<ComputerResult> submitWithExecutor(final Executor exec) { final long startTime = System.currentTimeMillis(); this.configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, this.configuration.getProperty(key).toString())); - this.useWorkerThreadsInConfiguration = this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 || this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666; + this.giraphConfiguration.setMasterComputeClass(GiraphMemory.class); + this.giraphConfiguration.setVertexClass(GiraphVertex.class); + this.giraphConfiguration.setComputationClass(GiraphComputation.class); + this.giraphConfiguration.setWorkerContextClass(GiraphWorkerContext.class); + this.giraphConfiguration.setOutEdgesClass(EmptyOutEdges.class); + 
this.giraphConfiguration.setClass(GiraphConstants.VERTEX_ID_CLASS.getKey(), ObjectWritable.class, ObjectWritable.class); + this.giraphConfiguration.setClass(GiraphConstants.VERTEX_VALUE_CLASS.getKey(), VertexWritable.class, VertexWritable.class); + this.giraphConfiguration.setBoolean(GiraphConstants.STATIC_GRAPH.getKey(), true); + this.giraphConfiguration.setVertexInputFormatClass(GiraphVertexInputFormat.class); + this.giraphConfiguration.setVertexOutputFormatClass(GiraphVertexOutputFormat.class); final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.giraphConfiguration); ConfigurationUtils.copy(this.configuration, apacheConfiguration); return CompletableFuture.<ComputerResult>supplyAsync(() -> { @@ -181,7 +176,8 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple if (!this.vertexProgram.getMessageCombiner().isPresent()) this.giraphConfiguration.unset(GiraphConstants.MESSAGE_COMBINER_CLASS.getKey()); // split required workers across system (open map slots + max threads per machine = total amount of TinkerPop workers) - if (!this.useWorkerThreadsInConfiguration) { + if (!(this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 || + this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666)) { final Cluster cluster = new Cluster(GiraphGraphComputer.this.giraphConfiguration); int totalMappers = cluster.getClusterStatus().getMapSlotCapacity() - 1; // 1 is needed for master cluster.close();