Fixed up the MultiScope test to use the MODERN graph so that it works with Giraph and Spark. Will backport this tweak to tp31/.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/f1aed80b Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/f1aed80b Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/f1aed80b Branch: refs/heads/master Commit: f1aed80b056c2244ce8b23ab077e8b10ee6939d9 Parents: 5ac61b7 3b8c628 Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Tue Jun 27 09:50:15 2017 -0600 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Tue Jun 27 09:50:15 2017 -0600 ---------------------------------------------------------------------- CHANGELOG.asciidoc | 2 + .../process/computer/GraphComputerTest.java | 75 ++++++++++++++++++-- .../process/computer/TinkerMessageBoard.java | 4 +- .../process/computer/TinkerMessenger.java | 22 +++--- 4 files changed, 88 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f1aed80b/CHANGELOG.asciidoc ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f1aed80b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java ---------------------------------------------------------------------- diff --cc gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java index 5c66673,02ac5d4..e4b40e8 --- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java +++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java @@@ -24,35 -22,18 +24,32 @@@ import org.apache.commons.configuration import org.apache.tinkerpop.gremlin.ExceptionCoverage; import org.apache.tinkerpop.gremlin.LoadGraphWith; import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest; +import 
org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgram; +import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram; +import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram; +import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder; import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce; import org.apache.tinkerpop.gremlin.process.computer.util.StaticVertexProgram; +import org.apache.tinkerpop.gremlin.process.traversal.Operator; +import org.apache.tinkerpop.gremlin.process.traversal.P; - import org.apache.tinkerpop.gremlin.process.traversal.Path; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.Traverser; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyPath; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; import org.apache.tinkerpop.gremlin.structure.Direction; -import org.apache.tinkerpop.gremlin.structure.Element; +import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.Property; import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.VertexProperty; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; +import org.javatuples.Pair; - import org.junit.Ignore; import org.junit.Test; - import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@@ -70,15 -50,11 +68,14 @@@ 
import java.util.concurrent.Future import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.GRATEFUL; import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.MODERN; +import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.outE; - import static org.apache.tinkerpop.gremlin.structure.T.id; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.junit.Assume.assumeNoException; + + /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ @@@ -1588,567 -1468,92 +1585,635 @@@ public class GraphComputerTest extends public void storeState(final Configuration configuration) { VertexProgram.super.storeState(configuration); } + } + + ///////////////////////////////////////////// + @Test ++ @LoadGraphWith(MODERN) + public void shouldSupportMultipleScopes() throws ExecutionException, InterruptedException { - Vertex a = graph.addVertex("a"); - Vertex b = graph.addVertex("b"); - Vertex c = graph.addVertex("c"); - a.addEdge("edge", b); - b.addEdge("edge", c); - - // Simple graph: - // a -> b -> c - - // Execute a traversal program that sends an incoming message of "2" and an outgoing message of "1" from "b" - // then each vertex sums any received messages - ComputerResult result = graph.compute().program(new MultiScopeVertexProgram()).submit().get(); - - // We expect the results to be {a=2, b=0, c=1}. 
Instead it is {a=3, b=0, c=3} - assertEquals((Long) result.graph().traversal().V().hasLabel("a").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(),Long.valueOf(2L)); - assertEquals((Long) result.graph().traversal().V().hasLabel("b").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(),Long.valueOf(0L)); - assertEquals((Long) result.graph().traversal().V().hasLabel("c").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(),Long.valueOf(1L)); ++ final ComputerResult result = graph.compute().program(new MultiScopeVertexProgram()).submit().get(); ++ assertEquals(result.graph().traversal().V().has("name", "josh").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 0L); ++ assertEquals(result.graph().traversal().V().has("name", "lop").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 1L); ++ assertEquals(result.graph().traversal().V().has("name", "ripple").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 1L); ++ assertEquals(result.graph().traversal().V().has("name", "marko").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 2L); + } + - public static class MultiScopeVertexProgram implements VertexProgram<Long> { ++ public static class MultiScopeVertexProgram extends StaticVertexProgram<Long> { + + private final MessageScope.Local<Long> countMessageScopeIn = MessageScope.Local.of(__::inE); + private final MessageScope.Local<Long> countMessageScopeOut = MessageScope.Local.of(__::outE); + + private static final String MEMORY_KEY = "count"; + - private static final Set<String> COMPUTE_KEYS = Collections.singleton(MEMORY_KEY); + + @Override - public void setup(final Memory memory) {} ++ public void setup(final Memory memory) { ++ } + + @Override + public GraphComputer.Persist getPreferredPersist() { + return GraphComputer.Persist.VERTEX_PROPERTIES; + } + + @Override - public Set<String> getElementComputeKeys() { - return COMPUTE_KEYS; ++ public Set<VertexComputeKey> getVertexComputeKeys() { ++ return 
Collections.singleton(VertexComputeKey.of(MEMORY_KEY, false)); + } + + @Override + public Set<MessageScope> getMessageScopes(final Memory memory) { + HashSet<MessageScope> scopes = new HashSet<>(); + scopes.add(countMessageScopeIn); + scopes.add(countMessageScopeOut); + return scopes; + } + + @Override + public void execute(Vertex vertex, Messenger<Long> messenger, Memory memory) { + switch (memory.getIteration()) { + case 0: - if (vertex.label().equals("b")) { ++ if (vertex.value("name").equals("josh")) { + messenger.sendMessage(this.countMessageScopeIn, 2L); + messenger.sendMessage(this.countMessageScopeOut, 1L); + } + break; + case 1: + long edgeCount = IteratorUtils.reduce(messenger.receiveMessages(), 0L, (a, b) -> a + b); + vertex.property(MEMORY_KEY, edgeCount); + break; + } + } + + @Override + public boolean terminate(final Memory memory) { + return memory.getIteration() == 1; + } + + @Override + public GraphComputer.ResultGraph getPreferredResultGraph() { + return GraphComputer.ResultGraph.NEW; + } - @Override - public MultiScopeVertexProgram clone() { + } + + ///////////////////////////////////////////// + + @Test + @LoadGraphWith(MODERN) + public void shouldSupportGraphFilter() throws Exception { + // if the graph computer does not support graph filter, then make sure its exception handling is correct + if (!graphProvider.getGraphComputer(graph).features().supportsGraphFilter()) { try { - return (MultiScopeVertexProgram) super.clone(); - } catch (final CloneNotSupportedException e) { - throw new RuntimeException(e); + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("software")); + fail("Should throw an unsupported operation exception"); + } catch (final UnsupportedOperationException e) { + assertEquals(GraphComputer.Exceptions.graphFilterNotSupported().getMessage(), e.getMessage()); + } + try { + graphProvider.getGraphComputer(graph).edges(__.<Vertex>outE().limit(10)); + fail("Should throw an unsupported operation exception"); + } catch (final 
UnsupportedOperationException e) { + assertEquals(GraphComputer.Exceptions.graphFilterNotSupported().getMessage(), e.getMessage()); + } + return; + } + /// VERTEX PROGRAM + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("software")).program(new VertexProgramM(VertexProgramM.SOFTWARE_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).program(new VertexProgramM(VertexProgramM.PEOPLE_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.KNOWS_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.<Vertex>bothE("knows").has("weight", P.gt(0.5f))).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).edges(__.<Vertex>bothE().limit(0)).program(new VertexProgramM(VertexProgramM.VERTICES_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).edges(__.<Vertex>outE().limit(1)).program(new VertexProgramM(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).edges(outE()).program(new VertexProgramM(VertexProgramM.OUT_EDGES_ONLY)).submit().get(); + + /// VERTEX PROGRAM + MAP REDUCE + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("software")).program(new VertexProgramM(VertexProgramM.SOFTWARE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.SOFTWARE_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).program(new VertexProgramM(VertexProgramM.PEOPLE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.KNOWS_ONLY)).mapReduce(new 
MapReduceJ(VertexProgramM.KNOWS_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.<Vertex>bothE("knows").has("weight", P.gt(0.5f))).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).edges(__.<Vertex>bothE().limit(0)).program(new VertexProgramM(VertexProgramM.VERTICES_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.VERTICES_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).edges(__.<Vertex>outE().limit(1)).program(new VertexProgramM(VertexProgramM.ONE_OUT_EDGE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).edges(outE()).program(new VertexProgramM(VertexProgramM.OUT_EDGES_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.OUT_EDGES_ONLY)).submit().get(); + + /// MAP REDUCE ONLY + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("software")).mapReduce(new MapReduceJ(VertexProgramM.SOFTWARE_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).edges(__.bothE("knows")).mapReduce(new MapReduceJ(VertexProgramM.KNOWS_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.bothE("knows")).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.<Vertex>bothE("knows").has("weight", P.gt(0.5f))).mapReduce(new 
MapReduceJ(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).edges(__.<Vertex>bothE().limit(0)).mapReduce(new MapReduceJ(VertexProgramM.VERTICES_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).edges(__.<Vertex>outE().limit(1)).mapReduce(new MapReduceJ(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).edges(outE()).mapReduce(new MapReduceJ(VertexProgramM.OUT_EDGES_ONLY)).submit().get(); + + // EXCEPTION HANDLING + try { + graphProvider.getGraphComputer(graph).vertices(__.out()); + fail(); + } catch (final IllegalArgumentException e) { + assertEquals(e.getMessage(), GraphComputer.Exceptions.vertexFilterAccessesIncidentEdges(__.out()).getMessage()); + } + try { + graphProvider.getGraphComputer(graph).edges(__.<Vertex>out().outE()); + fail(); + } catch (final IllegalArgumentException e) { + assertEquals(e.getMessage(), GraphComputer.Exceptions.edgeFilterAccessesAdjacentVertices(__.<Vertex>out().outE()).getMessage()); + } + } + + public static class VertexProgramM implements VertexProgram { + + public static final String SOFTWARE_ONLY = "softwareOnly"; + public static final String PEOPLE_ONLY = "peopleOnly"; + public static final String KNOWS_ONLY = "knowsOnly"; + public static final String PEOPLE_KNOWS_ONLY = "peopleKnowsOnly"; + public static final String PEOPLE_KNOWS_WELL_ONLY = "peopleKnowsWellOnly"; + public static final String VERTICES_ONLY = "verticesOnly"; + public static final String ONE_OUT_EDGE_ONLY = "oneOutEdgeOnly"; + public static final String OUT_EDGES_ONLY = "outEdgesOnly"; + + private String state; + + public VertexProgramM() { + + } + + public VertexProgramM(final String state) { + this.state = state; + } + + @Override + public void setup(final Memory memory) { + + } + + @Override + public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) { + switch (this.state) { + case SOFTWARE_ONLY: { + assertEquals("software", 
vertex.label()); + assertFalse(vertex.edges(Direction.OUT).hasNext()); + assertTrue(vertex.edges(Direction.IN).hasNext()); + assertTrue(vertex.edges(Direction.IN, "created").hasNext()); + assertFalse(vertex.edges(Direction.IN, "knows").hasNext()); + break; + } + case PEOPLE_ONLY: { + assertEquals("person", vertex.label()); + assertFalse(vertex.edges(Direction.IN, "created").hasNext()); + assertTrue(IteratorUtils.count(vertex.edges(Direction.BOTH)) > 0); + break; + } + case KNOWS_ONLY: { + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "created"))); + if (vertex.value("name").equals("marko")) + assertEquals(2, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows"))); + else if (vertex.value("name").equals("vadas")) + assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows"))); + else if (vertex.value("name").equals("josh")) + assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows"))); + else { + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows"))); + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH))); + } + break; + } + case PEOPLE_KNOWS_ONLY: { + assertEquals("person", vertex.label()); + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "created"))); + if (vertex.value("name").equals("marko")) + assertEquals(2, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows"))); + else if (vertex.value("name").equals("vadas")) + assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows"))); + else if (vertex.value("name").equals("josh")) + assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows"))); + else { + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows"))); + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH))); + } + break; + } + case PEOPLE_KNOWS_WELL_ONLY: { + assertEquals("person", vertex.label()); + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "created"))); + if 
(vertex.value("name").equals("marko")) { + assertEquals(1, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows"))); + assertEquals(1.0, vertex.edges(Direction.OUT, "knows").next().value("weight"), 0.001); + } else if (vertex.value("name").equals("vadas")) + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.IN, "knows"))); + else if (vertex.value("name").equals("josh")) { + assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows"))); + assertEquals(1.0, vertex.edges(Direction.IN, "knows").next().value("weight"), 0.001); + } else { + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows"))); + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH))); + } + break; + } + case VERTICES_ONLY: { + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH))); + break; + } + case ONE_OUT_EDGE_ONLY: { + if (vertex.label().equals("software") || vertex.value("name").equals("vadas")) + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH))); + else { + assertEquals(1, IteratorUtils.count(vertex.edges(Direction.OUT))); + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.IN))); + assertEquals(1, IteratorUtils.count(vertex.edges(Direction.BOTH))); + } + break; + } + case OUT_EDGES_ONLY: { + if (vertex.label().equals("software") || vertex.value("name").equals("vadas")) + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH))); + else { + assertTrue(IteratorUtils.count(vertex.edges(Direction.OUT)) > 0); + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.IN))); + assertEquals(IteratorUtils.count(vertex.edges(Direction.OUT)), IteratorUtils.count(vertex.edges(Direction.BOTH))); + } + break; + } + default: + throw new IllegalStateException("This is an illegal state for this test case: " + this.state); + } + } + + @Override + public boolean terminate(final Memory memory) { + return true; + } + + @Override + public Set<MessageScope> getMessageScopes(Memory memory) { + return 
Collections.emptySet(); + } + + @Override + public GraphComputer.ResultGraph getPreferredResultGraph() { + return GraphComputer.ResultGraph.NEW; + } + + @Override + public GraphComputer.Persist getPreferredPersist() { + return GraphComputer.Persist.NOTHING; + } + + @Override + @SuppressWarnings("CloneDoesntCallSuperClone,CloneDoesntDeclareCloneNotSupportedException") + public VertexProgramM clone() { + return new VertexProgramM(this.state); + } + + @Override + public void loadState(final Graph graph, final Configuration configuration) { + this.state = configuration.getString("state"); + } + + @Override + public void storeState(final Configuration configuration) { + configuration.setProperty("state", this.state); + VertexProgram.super.storeState(configuration); + } + + } + + private static class MapReduceJ implements MapReduce<MapReduce.NullObject, Integer, MapReduce.NullObject, Integer, Integer> { + + private String state; + + public MapReduceJ() { + } + + public MapReduceJ(final String state) { + this.state = state; + } + + @Override + public void loadState(final Graph graph, final Configuration configuration) { + this.state = configuration.getString("state"); + } + + @Override + public void storeState(final Configuration configuration) { + configuration.setProperty("state", this.state); + MapReduce.super.storeState(configuration); + } + + @Override + @SuppressWarnings("CloneDoesntCallSuperClone,CloneDoesntDeclareCloneNotSupportedException") + public MapReduceJ clone() { + return new MapReduceJ(this.state); + } + + @Override + public boolean doStage(final Stage stage) { + return true; + } + + @Override + public void map(final Vertex vertex, final MapEmitter<NullObject, Integer> emitter) { + emitter.emit(1); + switch (this.state) { + case VertexProgramM.SOFTWARE_ONLY: { + assertEquals("software", vertex.label()); + break; + } + case VertexProgramM.PEOPLE_ONLY: { + assertEquals("person", vertex.label()); + break; + } + case VertexProgramM.KNOWS_ONLY: { + 
assertTrue(vertex.label().equals("person") || vertex.label().equals("software")); + break; + } + case VertexProgramM.PEOPLE_KNOWS_ONLY: { + assertEquals("person", vertex.label()); + break; + } + case VertexProgramM.PEOPLE_KNOWS_WELL_ONLY: { + assertEquals("person", vertex.label()); + break; + } + case VertexProgramM.VERTICES_ONLY: { + assertTrue(vertex.label().equals("person") || vertex.label().equals("software")); + break; + } + case VertexProgramM.ONE_OUT_EDGE_ONLY: { + assertTrue(vertex.label().equals("person") || vertex.label().equals("software")); + break; + } + case VertexProgramM.OUT_EDGES_ONLY: { + assertTrue(vertex.label().equals("person") || vertex.label().equals("software")); + break; + } + default: + throw new IllegalStateException("This is an illegal state for this test case: " + this.state); + } + } + + @Override + public void combine(final NullObject key, final Iterator<Integer> values, final ReduceEmitter<NullObject, Integer> emitter) { + this.reduce(key, values, emitter); + } + + @Override + public void reduce(final NullObject key, final Iterator<Integer> values, final ReduceEmitter<NullObject, Integer> emitter) { + int count = 0; + while (values.hasNext()) { + count = count + values.next(); + } + emitter.emit(count); + } + + @Override + public Integer generateFinalResult(final Iterator<KeyValue<NullObject, Integer>> keyValues) { + int counter = keyValues.next().getValue(); + assertFalse(keyValues.hasNext()); + + switch (this.state) { + case VertexProgramM.SOFTWARE_ONLY: { + assertEquals(2, counter); + break; + } + case VertexProgramM.PEOPLE_ONLY: { + assertEquals(4, counter); + break; + } + case VertexProgramM.KNOWS_ONLY: { + assertEquals(6, counter); + break; + } + case VertexProgramM.PEOPLE_KNOWS_ONLY: { + assertEquals(4, counter); + break; + } + case VertexProgramM.PEOPLE_KNOWS_WELL_ONLY: { + assertEquals(4, counter); + break; + } + case VertexProgramM.VERTICES_ONLY: { + assertEquals(6, counter); + break; + } + case 
VertexProgramM.ONE_OUT_EDGE_ONLY: { + assertEquals(6, counter); + break; + } + case VertexProgramM.OUT_EDGES_ONLY: { + assertEquals(6, counter); + break; + } + default: + throw new IllegalStateException("This is an illegal state for this test case: " + this.state); + } + return counter; + } + + @Override + public String getMemoryKey() { + return "a"; + } + } + + @Test + @LoadGraphWith(MODERN) + public void shouldSupportJobChaining() throws Exception { + final ComputerResult result1 = graphProvider.getGraphComputer(graph) + .program(PageRankVertexProgram.build().iterations(5).create(graph)).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW).submit().get(); + final Graph graph1 = result1.graph(); + final Memory memory1 = result1.memory(); + assertEquals(5, memory1.getIteration()); + assertEquals(6, graph1.traversal().V().count().next().intValue()); + assertEquals(6, graph1.traversal().E().count().next().intValue()); + assertEquals(6, graph1.traversal().V().values(PageRankVertexProgram.PAGE_RANK).count().next().intValue()); + assertEquals(18, graph1.traversal().V().values().count().next().intValue()); + // + final ComputerResult result2 = graph1.compute(graphProvider.getGraphComputer(graph1).getClass()) + .program(PeerPressureVertexProgram.build().maxIterations(4).create(graph1)).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW).submit().get(); + final Graph graph2 = result2.graph(); + final Memory memory2 = result2.memory(); + assertTrue(memory2.getIteration() <= 4); + assertEquals(6, graph2.traversal().V().count().next().intValue()); + assertEquals(6, graph2.traversal().E().count().next().intValue()); + assertEquals(6, graph2.traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().intValue()); + assertEquals(6, graph2.traversal().V().values(PageRankVertexProgram.PAGE_RANK).count().next().intValue()); + assertEquals(24, graph2.traversal().V().values().count().next().intValue()); + // + final 
ComputerResult result3 = graph2.compute(graphProvider.getGraphComputer(graph2).getClass()) + .program(TraversalVertexProgram.build().traversal(g.V().groupCount("m").by(__.values(PageRankVertexProgram.PAGE_RANK).count()).label().asAdmin()).create(graph2)).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW).submit().get(); + final Graph graph3 = result3.graph(); + final Memory memory3 = result3.memory(); + assertTrue(memory3.keys().contains("m")); + assertTrue(memory3.keys().contains(TraversalVertexProgram.HALTED_TRAVERSERS)); + assertEquals(1, memory3.<Map<Long, Long>>get("m").size()); + assertEquals(6, memory3.<Map<Long, Long>>get("m").get(1l).intValue()); + List<Traverser<String>> traversers = IteratorUtils.list(memory3.<TraverserSet>get(TraversalVertexProgram.HALTED_TRAVERSERS).iterator()); + assertEquals(6l, traversers.stream().map(Traverser::bulk).reduce((a, b) -> a + b).get().longValue()); + assertEquals(4l, traversers.stream().filter(s -> s.get().equals("person")).map(Traverser::bulk).reduce((a, b) -> a + b).get().longValue()); + assertEquals(2l, traversers.stream().filter(s -> s.get().equals("software")).map(Traverser::bulk).reduce((a, b) -> a + b).get().longValue()); + assertEquals(6, graph3.traversal().V().count().next().intValue()); + assertEquals(6, graph3.traversal().E().count().next().intValue()); + assertEquals(0, graph3.traversal().V().values(TraversalVertexProgram.HALTED_TRAVERSERS).count().next().intValue()); + assertEquals(6, graph3.traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().intValue()); + assertEquals(6, graph3.traversal().V().values(PageRankVertexProgram.PAGE_RANK).count().next().intValue()); + assertEquals(24, graph3.traversal().V().values().count().next().intValue()); // no halted traversers + + // TODO: add a test the shows DAG behavior -- splitting another TraversalVertexProgram off of the PeerPressureVertexProgram job. 
+ } + + /////////////////////////////////// + + @Test + @LoadGraphWith(MODERN) + public void shouldSupportPreExistingComputeKeys() throws Exception { + final ComputerResult result = graphProvider.getGraphComputer(graph).program(new VertexProgramN()).submit().get(); + result.graph().vertices().forEachRemaining(vertex -> { + if (vertex.label().equals("person")) { + if (vertex.value("name").equals("marko")) + assertEquals(32, vertex.<Integer>value("age").intValue()); + else if (vertex.value("name").equals("peter")) + assertEquals(38, vertex.<Integer>value("age").intValue()); + else if (vertex.value("name").equals("vadas")) + assertEquals(30, vertex.<Integer>value("age").intValue()); + else if (vertex.value("name").equals("josh")) + assertEquals(35, vertex.<Integer>value("age").intValue()); + else + throw new IllegalStateException("This vertex should not have been accessed: " + vertex); + } + }); + } + + private static class VertexProgramN extends StaticVertexProgram { + + @Override + public void setup(final Memory memory) { + + } + + @Override + public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) { + if (vertex.label().equals("person")) + vertex.property(VertexProperty.Cardinality.single, "age", vertex.<Integer>value("age") + 1); + } + + @Override + public boolean terminate(final Memory memory) { + return memory.getIteration() > 1; + } + + @Override + public Set<MessageScope> getMessageScopes(final Memory memory) { + return Collections.emptySet(); + } + + @Override + public Set<VertexComputeKey> getVertexComputeKeys() { + return Collections.singleton(VertexComputeKey.of("age", false)); + } + + @Override + public GraphComputer.ResultGraph getPreferredResultGraph() { + return GraphComputer.ResultGraph.NEW; + } + + @Override + public GraphComputer.Persist getPreferredPersist() { + return GraphComputer.Persist.VERTEX_PROPERTIES; + } + } + + /////////////////////////////////// + + @Test + @LoadGraphWith(MODERN) + public void 
shouldSupportTransientKeys() throws Exception { + final ComputerResult result = graphProvider.getGraphComputer(graph).program(new VertexProgramO()).mapReduce(new MapReduceK()).submit().get(); + result.graph().vertices().forEachRemaining(vertex -> { + assertFalse(vertex.property("v1").isPresent()); + assertFalse(vertex.property("v2").isPresent()); + assertTrue(vertex.property("v3").isPresent()); + assertEquals("shouldExist", vertex.value("v3")); + assertTrue(vertex.property("name").isPresent()); + if (vertex.label().equals("software")) + assertTrue(vertex.property("lang").isPresent()); + else + assertTrue(vertex.property("age").isPresent()); + assertEquals(3, IteratorUtils.count(vertex.properties())); + assertEquals(0, IteratorUtils.count(vertex.properties("v1"))); + assertEquals(0, IteratorUtils.count(vertex.properties("v2"))); + assertEquals(1, IteratorUtils.count(vertex.properties("v3"))); + assertEquals(1, IteratorUtils.count(vertex.properties("name"))); + }); + assertEquals(6l, result.graph().traversal().V().properties("name").count().next().longValue()); + assertEquals(0l, result.graph().traversal().V().properties("v1").count().next().longValue()); + assertEquals(0l, result.graph().traversal().V().properties("v2").count().next().longValue()); + assertEquals(6l, result.graph().traversal().V().properties("v3").count().next().longValue()); + assertEquals(6l, result.graph().traversal().V().<String>values("name").dedup().count().next().longValue()); + assertEquals(1l, result.graph().traversal().V().<String>values("v3").dedup().count().next().longValue()); + assertEquals("shouldExist", result.graph().traversal().V().<String>values("v3").dedup().next()); + /// + assertFalse(result.memory().exists("m1")); + assertFalse(result.memory().exists("m2")); + assertTrue(result.memory().exists("m3")); + assertEquals(24l, result.memory().<Long>get("m3").longValue()); + assertEquals(2, result.memory().keys().size()); // mapReduceK + } + + private static class VertexProgramO 
extends StaticVertexProgram { + + @Override + public void setup(final Memory memory) { + assertFalse(memory.exists("m1")); + assertFalse(memory.exists("m2")); + assertFalse(memory.exists("m3")); + } + + @Override + public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) { + if (memory.isInitialIteration()) { + assertFalse(vertex.property("v1").isPresent()); + assertFalse(vertex.property("v2").isPresent()); + assertFalse(vertex.property("v3").isPresent()); + vertex.property("v1", "shouldNotExist"); + vertex.property("v2", "shouldNotExist"); + vertex.property("v3", "shouldExist"); + assertTrue(vertex.property("v1").isPresent()); + assertTrue(vertex.property("v2").isPresent()); + assertTrue(vertex.property("v3").isPresent()); + assertEquals("shouldNotExist", vertex.value("v1")); + assertEquals("shouldNotExist", vertex.value("v2")); + assertEquals("shouldExist", vertex.value("v3")); + // + assertFalse(memory.exists("m1")); + assertFalse(memory.exists("m2")); + assertFalse(memory.exists("m3")); + memory.add("m1", false); + memory.add("m2", true); + memory.add("m3", 2l); + // should still not exist as this pulls from the master memory + assertFalse(memory.exists("m1")); + assertFalse(memory.exists("m2")); + assertFalse(memory.exists("m3")); + + } else { + assertTrue(vertex.property("v1").isPresent()); + assertTrue(vertex.property("v2").isPresent()); + assertTrue(vertex.property("v3").isPresent()); + assertEquals("shouldNotExist", vertex.value("v1")); + assertEquals("shouldNotExist", vertex.value("v2")); + assertEquals("shouldExist", vertex.value("v3")); + // + assertTrue(memory.exists("m1")); + assertTrue(memory.exists("m2")); + assertTrue(memory.exists("m3")); + assertFalse(memory.get("m1")); + assertTrue(memory.get("m2")); + assertEquals(12l, memory.<Long>get("m3").longValue()); + memory.add("m1", true); + memory.add("m2", true); + memory.add("m3", 2l); } } 
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f1aed80b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java ----------------------------------------------------------------------