I have Master/Worker serialization fully working now. Previously, the master was sent to an Akka server and then executed its workers locally. Now the workers are distributed to wherever their physical Partition is located. Furthermore, ActorPrograms are fully serializable via apache.commons.Configuration. AND -- best of all -- the Traversal to execute in TraversalActorProgram is represented as Bytecode, NOT as a serialized traversal like we have in TraversalVertexProgram. Good stuff. Next up -- how to specify a Partitioner via configuration.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/a079f788 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/a079f788 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/a079f788 Branch: refs/heads/TINKERPOP-1564 Commit: a079f788fe9dde502332bf5c6388cde8b01cc701 Parents: 4319420 Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Thu Jan 12 09:04:55 2017 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Thu Jan 19 13:01:41 2017 -0700 ---------------------------------------------------------------------- .../akka/process/actors/AkkaConfigFactory.java | 14 ++- .../akka/process/actors/AkkaGraphActors.java | 15 ++- .../akka/process/actors/MasterActor.java | 18 +++- .../akka/process/actors/WorkerActor.java | 14 ++- .../akka/process/actors/io/GryoSerializer.java | 102 ------------------- .../process/actors/io/gryo/GryoSerializer.java | 102 +++++++++++++++++++ .../src/main/resources/application.conf | 4 +- .../akka/process/actors/AkkaActorsProvider.java | 57 ++++++++++- .../akka/process/actors/AkkaPlayTest.java | 10 +- .../actors/traversal/TraversalActorProgram.java | 24 ++++- .../traversal/message/BarrierAddMessage.java | 2 +- 11 files changed, 236 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a079f788/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java index a85e25a..adbcf03 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java +++ 
b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java @@ -19,14 +19,21 @@ package org.apache.tinkerpop.gremlin.akka.process.actors; +import akka.actor.Address; +import akka.actor.AddressFromURIString; +import akka.actor.Deploy; +import akka.actor.Props; +import akka.remote.RemoteScope; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; -import org.apache.tinkerpop.gremlin.akka.process.actors.io.GryoSerializer; +import org.apache.tinkerpop.gremlin.akka.process.actors.io.gryo.GryoSerializer; import org.apache.tinkerpop.gremlin.process.actors.ActorProgram; +import java.net.InetAddress; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -59,4 +66,9 @@ final class AkkaConfigFactory { map(Class::getCanonicalName). collect(Collectors.toList()).toString())); } + + static Address getMasterActorDeployment() { + final List<String> seedNodes = ConfigFactory.defaultApplication().getStringList("akka.cluster.seed-nodes"); + return AddressFromURIString.parse(seedNodes.get(0)); + } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a079f788/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java index 2638bfa..9601320 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java @@ -20,9 +20,12 @@ package org.apache.tinkerpop.gremlin.akka.process.actors; import akka.actor.ActorSystem; +import 
akka.actor.Deploy; import akka.actor.Props; +import akka.remote.RemoteScope; import org.apache.commons.configuration.BaseConfiguration; import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationUtils; import org.apache.tinkerpop.gremlin.process.actors.ActorProgram; import org.apache.tinkerpop.gremlin.process.actors.ActorsResult; import org.apache.tinkerpop.gremlin.process.actors.Address; @@ -30,9 +33,7 @@ import org.apache.tinkerpop.gremlin.process.actors.GraphActors; import org.apache.tinkerpop.gremlin.process.actors.util.DefaultActorsResult; import org.apache.tinkerpop.gremlin.process.actors.util.GraphActorsHelper; import org.apache.tinkerpop.gremlin.structure.Graph; -import org.apache.tinkerpop.gremlin.structure.Partitioner; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; -import org.apache.tinkerpop.gremlin.structure.util.partitioner.HashPartitioner; import org.apache.tinkerpop.gremlin.util.config.SerializableConfiguration; import java.net.InetAddress; @@ -88,9 +89,15 @@ public final class AkkaGraphActors<R> implements GraphActors<R> { final ActorSystem system = ActorSystem.create("traversal", AkkaConfigFactory.generateAkkaConfig(this.actorProgram)); final ActorsResult<R> result = new DefaultActorsResult<>(); - final Partitioner partitioner = this.workers == 1 ? 
graph.partitioner() : new HashPartitioner(graph.partitioner(), this.workers); try { - new Address.Master(system.actorOf(Props.create(MasterActor.class, this.actorProgram, partitioner, result), "master").path().toString(), InetAddress.getLocalHost()); + final Configuration programConfiguration = new SerializableConfiguration(this.configuration); + this.actorProgram.storeState(programConfiguration); + ConfigurationUtils.copy(graph.configuration(), programConfiguration); + final akka.actor.Address masterAddress = AkkaConfigFactory.getMasterActorDeployment(); + new Address.Master(system.actorOf( + Props.create(MasterActor.class, programConfiguration, result).withDeploy(new Deploy(new RemoteScope(masterAddress))), + "master").path().toString(), + InetAddress.getByName(masterAddress.host().get())); } catch (final UnknownHostException e) { throw new IllegalStateException(e.getMessage(), e); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a079f788/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java index b9c30bf..2b4d6b5 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java @@ -27,12 +27,16 @@ import akka.actor.Props; import akka.dispatch.RequiresMessageQueue; import akka.japi.pf.ReceiveBuilder; import akka.remote.RemoteScope; +import org.apache.commons.configuration.Configuration; import org.apache.tinkerpop.gremlin.process.actors.Actor; import org.apache.tinkerpop.gremlin.process.actors.ActorProgram; import org.apache.tinkerpop.gremlin.process.actors.ActorsResult; import 
org.apache.tinkerpop.gremlin.process.actors.Address; +import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.Partition; import org.apache.tinkerpop.gremlin.structure.Partitioner; +import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; +import org.apache.tinkerpop.gremlin.structure.util.partitioner.HashPartitioner; import java.net.InetAddress; import java.net.UnknownHostException; @@ -53,8 +57,10 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ private final ActorsResult<?> result; private final Partitioner partitioner; - public MasterActor(final ActorProgram program, final Partitioner partitioner, final ActorsResult<?> result) { - this.partitioner = partitioner; + public MasterActor(final Configuration configuration, final ActorsResult<?> result) { + final Graph graph = GraphFactory.open(configuration); + final ActorProgram actorProgram = ActorProgram.createActorProgram(graph, configuration); + this.partitioner = new HashPartitioner(graph.partitioner(), 5); this.result = result; try { this.master = new Address.Master(self().path().toString(), InetAddress.getLocalHost()); @@ -64,12 +70,14 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ this.workers = new ArrayList<>(); final List<Partition> partitions = partitioner.getPartitions(); for (final Partition partition : partitions) { - akka.actor.Address addr = AddressFromURIString.parse("akka.tcp://traversal@127.0.0.1:2552"); + akka.actor.Address addr = AkkaConfigFactory.getMasterActorDeployment(); final String workerPathString = "worker-" + partition.id(); this.workers.add(new Address.Worker(workerPathString, partition.location())); - context().actorOf(Props.create(WorkerActor.class, program, this.master, partition, partitioner).withDeploy(new Deploy(new RemoteScope(addr))), workerPathString); + context().actorOf(Props.create(WorkerActor.class, configuration, this.workers.size()-1, this.master) + 
.withDeploy(new Deploy(new RemoteScope(addr))), + workerPathString); } - this.masterProgram = program.createMasterProgram(this); + this.masterProgram = actorProgram.createMasterProgram(this); receive(ReceiveBuilder.matchAny(this.masterProgram::execute).build()); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a079f788/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java index 7520ce4..73623f5 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java @@ -23,11 +23,15 @@ import akka.actor.AbstractActor; import akka.actor.ActorSelection; import akka.dispatch.RequiresMessageQueue; import akka.japi.pf.ReceiveBuilder; +import org.apache.commons.configuration.Configuration; import org.apache.tinkerpop.gremlin.process.actors.Actor; import org.apache.tinkerpop.gremlin.process.actors.ActorProgram; import org.apache.tinkerpop.gremlin.process.actors.Address; +import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.Partition; import org.apache.tinkerpop.gremlin.structure.Partitioner; +import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; +import org.apache.tinkerpop.gremlin.structure.util.partitioner.HashPartitioner; import java.util.ArrayList; import java.util.HashMap; @@ -47,16 +51,18 @@ public final class WorkerActor extends AbstractActor implements RequiresMessageQ private final List<Address.Worker> workers; private final Map<Address, ActorSelection> actors = new HashMap<>(); - public WorkerActor(final ActorProgram program, final Address.Master master, final 
Partition localPartition, final Partitioner partitioner) { - this.localPartition = localPartition; - this.partitioner = partitioner; + public WorkerActor(final Configuration configuration, final int workerIndex, final Address.Master master) { + final Graph graph = GraphFactory.open(configuration); + final ActorProgram actorProgram = ActorProgram.createActorProgram(graph, configuration); + this.partitioner = new HashPartitioner(graph.partitioner(), 5); + this.localPartition = this.partitioner.getPartitions().get(workerIndex); this.self = new Address.Worker(this.createWorkerAddress(localPartition), localPartition.location()); this.master = master; this.workers = new ArrayList<>(); for (final Partition partition : partitioner.getPartitions()) { this.workers.add(new Address.Worker(this.createWorkerAddress(partition), partition.location())); } - this.workerProgram = program.createWorkerProgram(this); + this.workerProgram = actorProgram.createWorkerProgram(this); receive(ReceiveBuilder.matchAny(this.workerProgram::execute).build()); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a079f788/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java deleted file mode 100644 index c567497..0000000 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.akka.process.actors.io; - -import akka.serialization.Serializer; -import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierAddMessage; -import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierDoneMessage; -import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectAddMessage; -import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectSetMessage; -import org.apache.tinkerpop.gremlin.process.actors.traversal.message.StartMessage; -import org.apache.tinkerpop.gremlin.process.actors.traversal.message.Terminate; -import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper; -import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool; -import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoVersion; -import org.apache.tinkerpop.shaded.kryo.io.Input; -import org.apache.tinkerpop.shaded.kryo.io.Output; -import scala.Option; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class GryoSerializer implements Serializer { - - private final GryoPool gryoPool; - - public GryoSerializer() { - this.gryoPool = GryoPool.build(). - poolSize(1). - initializeMapper(builder -> - builder.referenceTracking(true). - registrationRequired(true). 
- version(GryoVersion.V3_0). - addCustom( - Terminate.class, - StartMessage.class, - BarrierAddMessage.class, - BarrierDoneMessage.class, - SideEffectSetMessage.class, - SideEffectAddMessage.class)).create(); - } - - public GryoMapper getGryoMapper() { - return this.gryoPool.getMapper(); - } - - @Override - public int identifier() { - return GryoVersion.V3_0.hashCode(); - } - - @Override - public byte[] toBinary(final Object object) { - final Output output = new Output(new ByteArrayOutputStream()); - this.gryoPool.writeWithKryo(kryo -> kryo.writeObject(output, object)); - output.flush(); - return output.getBuffer(); - } - - @Override - public boolean includeManifest() { - return true; - } - - @Override - public Object fromBinary(byte[] bytes, Option<Class<?>> option) { - return option.isEmpty() ? this.fromBinary(bytes) : this.fromBinary(bytes, option.get()); - } - - @Override - public Object fromBinary(byte[] bytes) { - final ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); - final Input input = new Input(inputStream); - return this.gryoPool.readWithKryo(kryo -> kryo.readClassAndObject(input)); - } - - @Override - public Object fromBinary(byte[] bytes, Class<?> aClass) { - final Input input = new Input(); - input.setBuffer(bytes); - return this.gryoPool.readWithKryo(kryo -> kryo.readObject(input, aClass)); // todo: be smart about just reading object - } -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a079f788/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/gryo/GryoSerializer.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/gryo/GryoSerializer.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/gryo/GryoSerializer.java new file mode 100644 index 0000000..a8ded2e --- /dev/null +++ 
b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/gryo/GryoSerializer.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.actors.io.gryo; + +import akka.serialization.Serializer; +import org.apache.tinkerpop.gremlin.process.actors.Address; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierAddMessage; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierDoneMessage; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectAddMessage; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectSetMessage; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.StartMessage; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.Terminate; +import org.apache.tinkerpop.gremlin.process.actors.util.DefaultActorsResult; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoVersion; +import org.apache.tinkerpop.shaded.kryo.io.Input; +import 
org.apache.tinkerpop.shaded.kryo.io.Output; +import scala.Option; + +import java.io.ByteArrayOutputStream; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class GryoSerializer implements Serializer { + + private final GryoPool gryoPool; + + public GryoSerializer() { + this.gryoPool = GryoPool.build(). + poolSize(10). + initializeMapper(builder -> + builder.referenceTracking(true). + registrationRequired(true). + version(GryoVersion.V3_0). + addCustom( + Terminate.class, + StartMessage.class, + BarrierAddMessage.class, + BarrierDoneMessage.class, + SideEffectSetMessage.class, + SideEffectAddMessage.class, + DefaultActorsResult.class, + Address.Master.class)).create(); + } + + public GryoMapper getGryoMapper() { + return this.gryoPool.getMapper(); + } + + @Override + public int identifier() { + return GryoVersion.V3_0.hashCode(); + } + + @Override + public boolean includeManifest() { + return false; + } + + @Override + public byte[] toBinary(final Object object) { + final Output output = new Output(new ByteArrayOutputStream()); + this.gryoPool.writeWithKryo(kryo -> kryo.writeClassAndObject(output, object)); + return output.getBuffer(); + } + + @Override + public Object fromBinary(final byte[] bytes) { + final Input input = new Input(); + input.setBuffer(bytes); + return this.gryoPool.readWithKryo(kryo -> kryo.readClassAndObject(input)); + } + + @Override + public Object fromBinary(final byte[] bytes, final Class<?> aClass) { + return fromBinary(bytes); + } + + @Override + public Object fromBinary(final byte[] bytes, final Option<Class<?>> option) { + return option.isEmpty() ? 
this.fromBinary(bytes) : this.fromBinary(bytes, option.get()); + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a079f788/akka-gremlin/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/resources/application.conf b/akka-gremlin/src/main/resources/application.conf index cd8b190..188a821 100644 --- a/akka-gremlin/src/main/resources/application.conf +++ b/akka-gremlin/src/main/resources/application.conf @@ -4,7 +4,7 @@ akka { provider = remote serialize-messages = on serializers { - gryo = "org.apache.tinkerpop.gremlin.akka.process.actors.io.GryoSerializer" + gryo = "org.apache.tinkerpop.gremlin.akka.process.actors.io.gryo.GryoSerializer" } } remote { @@ -16,7 +16,7 @@ akka { } cluster { seed-nodes = [ - "akka.tcp://traversal@127.0.0.1:2551", + "akka.tcp://traversal@127.0.0.1:2552" "akka.tcp://traversal@127.0.0.1:2552"] auto-down-unreachable-after = 10s http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a079f788/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java index 4168445..bf52089 100644 --- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java +++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java @@ -22,7 +22,7 @@ package org.apache.tinkerpop.gremlin.akka.process.actors; import org.apache.commons.configuration.Configuration; import org.apache.tinkerpop.gremlin.AbstractGraphProvider; import org.apache.tinkerpop.gremlin.LoadGraphWith; -import org.apache.tinkerpop.gremlin.akka.process.actors.AkkaGraphActors; +import 
org.apache.tinkerpop.gremlin.TestHelper; import org.apache.tinkerpop.gremlin.process.actors.GraphActors; import org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionTest; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; @@ -39,6 +39,8 @@ import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.Partit import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.SubgraphStrategyProcessTest; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.VertexProperty; +import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONResourceAccess; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoResourceAccess; import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerEdge; import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerElement; import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; @@ -50,6 +52,7 @@ import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertexProperty; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; @@ -89,6 +92,32 @@ public class AkkaActorsProvider extends AbstractGraphProvider { add(TinkerVertexProperty.class); }}; + private static Map<String, String> PATHS = new HashMap<>(); + + static { + try { + final List<String> kryoResources = Arrays.asList( + "tinkerpop-modern.kryo", + "grateful-dead.kryo", + "tinkerpop-classic.kryo", + "tinkerpop-crew.kryo"); + for (final String fileName : kryoResources) { + PATHS.put(fileName, TestHelper.generateTempFileFromResource(GryoResourceAccess.class, fileName, "").getAbsolutePath().replace('\\', '/')); + } + + final List<String> graphsonResources = Arrays.asList( + "tinkerpop-modern-v2d0-typed.json", + "grateful-dead-v2d0-typed.json", + "tinkerpop-classic-v2d0-typed.json", + "tinkerpop-crew-v2d0-typed.json"); + for 
(final String fileName : graphsonResources) { + PATHS.put(fileName, TestHelper.generateTempFileFromResource(GraphSONResourceAccess.class, fileName, "").getAbsolutePath().replace('\\', '/')); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + @Override public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) { @@ -101,14 +130,38 @@ public class AkkaActorsProvider extends AbstractGraphProvider { put(TinkerGraph.GREMLIN_TINKERGRAPH_EDGE_ID_MANAGER, idMaker); put(TinkerGraph.GREMLIN_TINKERGRAPH_VERTEX_PROPERTY_ID_MANAGER, idMaker); put("skipTest", SKIP_TESTS.contains(testMethodName) || SKIP_TESTS.contains(test.getCanonicalName())); + if (null != loadGraphWith) { + put(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_LOCATION, loadGraphDataViaHadoopConfig(loadGraphWith)); + put(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo"); + } if (loadGraphWith == LoadGraphWith.GraphData.CREW) put(TinkerGraph.GREMLIN_TINKERGRAPH_DEFAULT_VERTEX_PROPERTY_CARDINALITY, VertexProperty.Cardinality.list.name()); }}; } + public String loadGraphDataViaHadoopConfig(final LoadGraphWith.GraphData graphData) { + final String type = ".kryo"; + if (graphData.equals(LoadGraphWith.GraphData.GRATEFUL)) { + return PATHS.get("grateful-dead" + type); + } else if (graphData.equals(LoadGraphWith.GraphData.MODERN)) { + return PATHS.get("tinkerpop-modern" + type); + } else if (graphData.equals(LoadGraphWith.GraphData.CLASSIC)) { + return PATHS.get("tinkerpop-classic" + type); + } else if (graphData.equals(LoadGraphWith.GraphData.CREW)) { + return PATHS.get("tinkerpop-crew" + type); + } else { + throw new RuntimeException("Could not load graph with " + graphData); + } + } + + @Override + public void loadGraphData(final Graph graph, final LoadGraphWith loadGraphWith, final Class testClass, final String testName) { + + } + @Override public void clear(final Graph graph, final Configuration 
configuration) throws Exception { - if (graph != null) graph.close(); + // if (graph != null) graph.close(); } @Override http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a079f788/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java index c95f336..e1c80bd 100644 --- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java +++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java @@ -19,6 +19,8 @@ package org.apache.tinkerpop.gremlin.akka.process.actors; +import org.apache.commons.configuration.BaseConfiguration; +import org.apache.commons.configuration.Configuration; import org.apache.tinkerpop.gremlin.akka.process.actors.AkkaGraphActors; import org.apache.tinkerpop.gremlin.process.actors.GraphActors; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; @@ -41,8 +43,12 @@ public class AkkaPlayTest { @Test @Ignore public void testPlay1() throws Exception { - final Graph graph = TinkerGraph.open(); - graph.io(GryoIo.build()).readGraph("../data/tinkerpop-modern.kryo"); + final Configuration configuration = new BaseConfiguration(); + configuration.setProperty(Graph.GRAPH, TinkerGraph.class.getCanonicalName()); + configuration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_LOCATION, "/Users/marko/software/tinkerpop/data/tinkerpop-modern.kryo"); + configuration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT,"gryo"); + final Graph graph = TinkerGraph.open(configuration); + //graph.io(GryoIo.build()).readGraph("../data/tinkerpop-modern.kryo"); GraphTraversalSource g = 
graph.traversal().withProcessor(GraphActors.open(AkkaGraphActors.class).workers(3)); // System.out.println(g.V().group().by("name").by(outE().values("weight").fold()).toList()); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a079f788/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java index 22add26..d8fe9ef 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java @@ -55,7 +55,7 @@ import java.util.Optional; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class TraversalActorProgram<R> implements ActorProgram, Serializable { +public final class TraversalActorProgram<R> implements ActorProgram { public static boolean DETACH = true; @@ -72,11 +72,13 @@ public final class TraversalActorProgram<R> implements ActorProgram, Serializabl private Traversal.Admin<?, R> traversal; public TraverserSet<R> result = new TraverserSet<>(); - private Configuration configuration; + + public TraversalActorProgram() { + + } public TraversalActorProgram(final Traversal.Admin<?, R> traversal) { this.traversal = traversal; - this.configuration = new SerializableConfiguration(configuration); final TraversalStrategies strategies = this.traversal.getStrategies().clone(); strategies.addStrategies(ActorVerificationStrategy.instance(), ReadOnlyStrategy.instance()); // TODO: make TinkerGraph/etc. 
strategies smart about actors @@ -105,6 +107,22 @@ public final class TraversalActorProgram<R> implements ActorProgram, Serializabl public void loadState(final Graph graph, final Configuration configuration) { final Bytecode bytecode = (Bytecode) configuration.getProperty(TRAVERSAL_ACTOR_PROGRAM_BYTECODE); this.traversal = (Traversal.Admin<?, R>) JavaTranslator.of(graph.traversal()).translate(bytecode); + final TraversalStrategies strategies = this.traversal.getStrategies().clone(); + strategies.addStrategies(ActorVerificationStrategy.instance(), ReadOnlyStrategy.instance()); + // TODO: make TinkerGraph/etc. strategies smart about actors + new ArrayList<>(strategies.toList()).stream(). + filter(s -> s instanceof TraversalStrategy.ProviderOptimizationStrategy). + map(TraversalStrategy::getClass). + forEach(strategies::removeStrategies); + strategies.removeStrategies( + ActorProgramStrategy.class, + LazyBarrierStrategy.class, + RepeatUnrollStrategy.class, + MatchPredicateStrategy.class, + InlineFilterStrategy.class, + PathRetractionStrategy.class); + this.traversal.setStrategies(strategies); + this.traversal.applyStrategies(); } @Override http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a079f788/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java index ade6796..b17e83c 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java @@ -28,7 +28,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; public 
final class BarrierAddMessage { private Object barrier; - private String stepId; + private String stepId; private BarrierAddMessage() { // for serialization