more organization and cleaning and learning.... next up Host and Proxy.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/c69b6c05 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/c69b6c05 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/c69b6c05 Branch: refs/heads/TINKERPOP-1564 Commit: c69b6c059072f8835731768d59bdc1d35fbb2d2f Parents: d74ee61 Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Thu Jan 12 11:13:48 2017 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Mon Jan 23 14:22:53 2017 -0700 ---------------------------------------------------------------------- .../akka/process/actors/AkkaConfigFactory.java | 2 +- .../akka/process/actors/AkkaGraphActors.java | 25 ++++++-------------- .../akka/process/actors/MasterActor.java | 16 +++++++------ .../akka/process/actors/WorkerActor.java | 8 ++++--- .../src/main/resources/application.conf | 4 ++-- .../util/partitioner/GlobalPartitioner.java | 2 -- 6 files changed, 24 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c69b6c05/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java index 7a7c958..ad6a3d6 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java @@ -72,6 +72,6 @@ final class AkkaConfigFactory { static Address getWorkerActorDeployment(final Partition partition) { final String location = partition.location().isSiteLocalAddress() ? "127.0.0.1" : partition.location().getHostAddress().toString(); - return AddressFromURIString.parse("akka.tcp://traversal@" + location + ":2552"); + return AddressFromURIString.parse("akka.tcp://tinkerpop@" + location + ":2552"); } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c69b6c05/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java index 0e80924..d967aed 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java @@ -28,7 +28,6 @@ import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationUtils; import org.apache.tinkerpop.gremlin.process.actors.ActorProgram; import org.apache.tinkerpop.gremlin.process.actors.ActorsResult; -import org.apache.tinkerpop.gremlin.process.actors.Address; import org.apache.tinkerpop.gremlin.process.actors.GraphActors; import org.apache.tinkerpop.gremlin.process.actors.util.DefaultActorsResult; import org.apache.tinkerpop.gremlin.process.actors.util.GraphActorsHelper; @@ -36,8 +35,6 @@ import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; import org.apache.tinkerpop.gremlin.util.config.SerializableConfiguration; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; @@ -47,7 +44,6 @@ import java.util.concurrent.Future; public final class AkkaGraphActors<R> implements GraphActors<R> { private ActorProgram actorProgram; - private int workers = 1; private Configuration configuration; private boolean executed = false; @@ -71,7 +67,6 @@ public final class AkkaGraphActors<R> implements GraphActors<R> { @Override public GraphActors<R> workers(final int workers) { - this.workers = workers; this.configuration.setProperty(GRAPH_ACTORS_WORKERS, workers); return this; } @@ -88,20 +83,14 @@ public final class AkkaGraphActors<R> implements GraphActors<R> { throw new IllegalStateException("Can not execute twice"); this.executed = true; - final ActorSystem system = ActorSystem.create("traversal", AkkaConfigFactory.generateAkkaConfig(this.actorProgram)); + final ActorSystem system = ActorSystem.create("tinkerpop", AkkaConfigFactory.generateAkkaConfig(this.actorProgram)); final ActorsResult<R> result = new DefaultActorsResult<>(); - try { - final Configuration programConfiguration = new SerializableConfiguration(this.configuration); - ConfigurationUtils.copy(graph.configuration(), programConfiguration); - /////// - final akka.actor.Address masterAddress = AkkaConfigFactory.getMasterActorDeployment(); - new Address.Master(system.actorOf( - Props.create(MasterActor.class, programConfiguration, result).withDeploy(new Deploy(new RemoteScope(masterAddress))), - "master").path().toString(), - InetAddress.getByName(masterAddress.host().get())); - } catch (final UnknownHostException e) { - throw new IllegalStateException(e.getMessage(), e); - } + final Configuration finalConfiguration = new SerializableConfiguration(this.configuration); + ConfigurationUtils.copy(graph.configuration(), finalConfiguration); + /////// + final akka.actor.Address masterAddress = AkkaConfigFactory.getMasterActorDeployment(); + system.actorOf(Props.create(MasterActor.class, finalConfiguration, result).withDeploy(new Deploy(new RemoteScope(masterAddress))), "master"); + return CompletableFuture.supplyAsync(() -> { while (!system.isTerminated()) { http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c69b6c05/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java index 3231645..4fbfd94 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java @@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.akka.process.actors; import akka.actor.AbstractActor; import akka.actor.ActorSelection; -import akka.actor.AddressFromURIString; import akka.actor.Deploy; import akka.actor.Props; import akka.dispatch.RequiresMessageQueue; @@ -32,6 +31,7 @@ import org.apache.tinkerpop.gremlin.process.actors.Actor; import org.apache.tinkerpop.gremlin.process.actors.ActorProgram; import org.apache.tinkerpop.gremlin.process.actors.ActorsResult; import org.apache.tinkerpop.gremlin.process.actors.Address; +import org.apache.tinkerpop.gremlin.process.actors.GraphActors; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.Partition; import org.apache.tinkerpop.gremlin.structure.Partitioner; @@ -60,7 +60,8 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ public MasterActor(final Configuration configuration, final ActorsResult<?> result) { final Graph graph = GraphFactory.open(configuration); final ActorProgram actorProgram = ActorProgram.createActorProgram(graph, configuration); - this.partitioner = new HashPartitioner(graph.partitioner(), 5); + final int workers = configuration.getInt(GraphActors.GRAPH_ACTORS_WORKERS, 1); + this.partitioner = workers == 1 ? graph.partitioner() : new HashPartitioner(graph.partitioner(), workers); this.result = result; try { this.master = new Address.Master(self().path().toString(), InetAddress.getLocalHost()); @@ -70,11 +71,12 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ this.workers = new ArrayList<>(); final List<Partition> partitions = partitioner.getPartitions(); for (final Partition partition : partitions) { - final String workerPathString = "worker-" + partition.id(); - this.workers.add(new Address.Worker(workerPathString, partition.location())); - context().actorOf(Props.create(WorkerActor.class, configuration, this.workers.size()-1, this.master) - .withDeploy(new Deploy(new RemoteScope(AkkaConfigFactory.getWorkerActorDeployment(partition)))), - workerPathString); + final Address.Worker workerAddress = new Address.Worker("worker-" + partition.id(), partition.location()); + this.workers.add(workerAddress); + context().actorOf( + Props.create(WorkerActor.class, configuration, partition.id(), this.master) + .withDeploy(new Deploy(new RemoteScope(AkkaConfigFactory.getWorkerActorDeployment(partition)))), + workerAddress.getId()); } this.masterProgram = actorProgram.createMasterProgram(this); receive(ReceiveBuilder.matchAny(this.masterProgram::execute).build()); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c69b6c05/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java index f90f081..02c4e4c 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java @@ -27,6 +27,7 @@ import org.apache.commons.configuration.Configuration; import org.apache.tinkerpop.gremlin.process.actors.Actor; import org.apache.tinkerpop.gremlin.process.actors.ActorProgram; import org.apache.tinkerpop.gremlin.process.actors.Address; +import org.apache.tinkerpop.gremlin.process.actors.GraphActors; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.Partition; import org.apache.tinkerpop.gremlin.structure.Partitioner; @@ -51,11 +52,12 @@ public final class WorkerActor extends AbstractActor implements RequiresMessageQ private final List<Address.Worker> workers; private final Map<Address, ActorSelection> actors = new HashMap<>(); - public WorkerActor(final Configuration configuration, final int workerIndex, final Address.Master master) { + public WorkerActor(final Configuration configuration, final String partitionId, final Address.Master master) { final Graph graph = GraphFactory.open(configuration); final ActorProgram actorProgram = ActorProgram.createActorProgram(graph, configuration); - this.partitioner = new HashPartitioner(graph.partitioner(), 5); - this.localPartition = this.partitioner.getPartitions().get(workerIndex); + final int workers = configuration.getInt(GraphActors.GRAPH_ACTORS_WORKERS, 1); + this.partitioner = workers == 1 ? graph.partitioner() : new HashPartitioner(graph.partitioner(), workers); + this.localPartition = this.partitioner.getPartition(partitionId); this.self = new Address.Worker(this.createWorkerAddress(this.localPartition), this.localPartition.location()); this.master = master; this.workers = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c69b6c05/akka-gremlin/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/resources/application.conf b/akka-gremlin/src/main/resources/application.conf index d722ee3..6d1915f 100644 --- a/akka-gremlin/src/main/resources/application.conf +++ b/akka-gremlin/src/main/resources/application.conf @@ -16,8 +16,8 @@ akka { } cluster { seed-nodes = [ - "akka.tcp://traversal@127.0.0.1:2552" - "akka.tcp://traversal@127.0.0.1:2552"] + "akka.tcp://tinkerpop@127.0.0.1:2552" + "akka.tcp://tinkerpop@127.0.0.1:2552"] auto-down-unreachable-after = 10s } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c69b6c05/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java index af04dbe..73962d3 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java @@ -33,7 +33,6 @@ import org.apache.tinkerpop.gremlin.structure.util.StringFactory; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -78,7 +77,6 @@ public final class GlobalPartitioner implements Partitioner { private class GlobalPartition implements Partition { private final GlobalPartitioner partitioner; - private final Map<String, Object> configuration = new HashMap<>(); private final String id; private final InetAddress location;