Fixed up the MultiScope test to use the MODERN graph so that it works with 
Giraph and Spark. Will backport the tweak to tp31/.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/f1aed80b
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/f1aed80b
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/f1aed80b

Branch: refs/heads/master
Commit: f1aed80b056c2244ce8b23ab077e8b10ee6939d9
Parents: 5ac61b7 3b8c628
Author: Marko A. Rodriguez <okramma...@gmail.com>
Authored: Tue Jun 27 09:50:15 2017 -0600
Committer: Marko A. Rodriguez <okramma...@gmail.com>
Committed: Tue Jun 27 09:50:15 2017 -0600

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |  2 +
 .../process/computer/GraphComputerTest.java     | 75 ++++++++++++++++++--
 .../process/computer/TinkerMessageBoard.java    |  4 +-
 .../process/computer/TinkerMessenger.java       | 22 +++---
 4 files changed, 88 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f1aed80b/CHANGELOG.asciidoc
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f1aed80b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --cc 
gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index 5c66673,02ac5d4..e4b40e8
--- 
a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ 
b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@@ -24,35 -22,18 +24,32 @@@ import org.apache.commons.configuration
  import org.apache.tinkerpop.gremlin.ExceptionCoverage;
  import org.apache.tinkerpop.gremlin.LoadGraphWith;
  import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest;
 +import 
org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgram;
 +import 
org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
 +import 
org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
 +import 
org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
  import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
  import org.apache.tinkerpop.gremlin.process.computer.util.StaticVertexProgram;
 +import org.apache.tinkerpop.gremlin.process.traversal.Operator;
 +import org.apache.tinkerpop.gremlin.process.traversal.P;
- import org.apache.tinkerpop.gremlin.process.traversal.Path;
 +import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 +import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
  import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
 +import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyPath;
 +import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException;
 +import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
 +import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
  import org.apache.tinkerpop.gremlin.structure.Direction;
 -import org.apache.tinkerpop.gremlin.structure.Element;
 +import org.apache.tinkerpop.gremlin.structure.Edge;
  import org.apache.tinkerpop.gremlin.structure.Graph;
 +import org.apache.tinkerpop.gremlin.structure.Property;
  import org.apache.tinkerpop.gremlin.structure.Vertex;
  import org.apache.tinkerpop.gremlin.structure.VertexProperty;
  import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
  import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 +import org.javatuples.Pair;
- import org.junit.Ignore;
  import org.junit.Test;
  
- import java.util.AbstractMap;
  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Collections;
@@@ -70,15 -50,11 +68,14 @@@ import java.util.concurrent.Future
  
  import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.GRATEFUL;
  import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.MODERN;
 +import static 
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.outE;
- import static org.apache.tinkerpop.gremlin.structure.T.id;
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertFalse;
  import static org.junit.Assert.assertTrue;
  import static org.junit.Assert.fail;
 +import static org.junit.Assume.assumeNoException;
 +
+ 
  /**
   * @author Marko A. Rodriguez (http://markorodriguez.com)
   */
@@@ -1588,567 -1468,92 +1585,635 @@@ public class GraphComputerTest extends 
          public void storeState(final Configuration configuration) {
              VertexProgram.super.storeState(configuration);
          }
+     }
+ 
+     /////////////////////////////////////////////
+     @Test
++    @LoadGraphWith(MODERN)
+     public void shouldSupportMultipleScopes() throws ExecutionException, 
InterruptedException {
 -        Vertex a = graph.addVertex("a");
 -        Vertex b = graph.addVertex("b");
 -        Vertex c = graph.addVertex("c");
 -        a.addEdge("edge", b);
 -        b.addEdge("edge", c);
 -
 -        // Simple graph:
 -        // a -> b -> c
 -
 -        // Execute a traversal program that sends an incoming message of "2" 
and an outgoing message of "1" from "b"
 -        // then each vertex sums any received messages
 -        ComputerResult result = graph.compute().program(new 
MultiScopeVertexProgram()).submit().get();
 -
 -        // We expect the results to be {a=2, b=0, c=1}. Instead it is {a=3, 
b=0, c=3}
 -        assertEquals((Long) 
result.graph().traversal().V().hasLabel("a").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(),Long.valueOf(2L));
 -        assertEquals((Long) 
result.graph().traversal().V().hasLabel("b").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(),Long.valueOf(0L));
 -        assertEquals((Long) 
result.graph().traversal().V().hasLabel("c").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(),Long.valueOf(1L));
++        final ComputerResult result = graph.compute().program(new 
MultiScopeVertexProgram()).submit().get();
++        assertEquals(result.graph().traversal().V().has("name", 
"josh").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 0L);
++        assertEquals(result.graph().traversal().V().has("name", 
"lop").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 1L);
++        assertEquals(result.graph().traversal().V().has("name", 
"ripple").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 1L);
++        assertEquals(result.graph().traversal().V().has("name", 
"marko").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 2L);
+     }
+ 
 -    public static class MultiScopeVertexProgram implements 
VertexProgram<Long> {
++    public static class MultiScopeVertexProgram extends 
StaticVertexProgram<Long> {
+ 
+         private final MessageScope.Local<Long> countMessageScopeIn = 
MessageScope.Local.of(__::inE);
+         private final MessageScope.Local<Long> countMessageScopeOut = 
MessageScope.Local.of(__::outE);
+ 
+         private static final String MEMORY_KEY = "count";
+ 
 -        private static final Set<String> COMPUTE_KEYS = 
Collections.singleton(MEMORY_KEY);
+ 
+         @Override
 -        public void setup(final Memory memory) {}
++        public void setup(final Memory memory) {
++        }
+ 
+         @Override
+         public GraphComputer.Persist getPreferredPersist() {
+             return GraphComputer.Persist.VERTEX_PROPERTIES;
+         }
+ 
+         @Override
 -        public Set<String> getElementComputeKeys() {
 -            return COMPUTE_KEYS;
++        public Set<VertexComputeKey> getVertexComputeKeys() {
++            return Collections.singleton(VertexComputeKey.of(MEMORY_KEY, 
false));
+         }
+ 
+         @Override
+         public Set<MessageScope> getMessageScopes(final Memory memory) {
+             HashSet<MessageScope> scopes = new HashSet<>();
+             scopes.add(countMessageScopeIn);
+             scopes.add(countMessageScopeOut);
+             return scopes;
+         }
+ 
+         @Override
+         public void execute(Vertex vertex, Messenger<Long> messenger, Memory 
memory) {
+             switch (memory.getIteration()) {
+                 case 0:
 -                    if (vertex.label().equals("b")) {
++                    if (vertex.value("name").equals("josh")) {
+                         messenger.sendMessage(this.countMessageScopeIn, 2L);
+                         messenger.sendMessage(this.countMessageScopeOut, 1L);
+                     }
+                     break;
+                 case 1:
+                     long edgeCount = 
IteratorUtils.reduce(messenger.receiveMessages(), 0L, (a, b) -> a + b);
+                     vertex.property(MEMORY_KEY, edgeCount);
+                     break;
+             }
+         }
+ 
+         @Override
+         public boolean terminate(final Memory memory) {
+             return memory.getIteration() == 1;
+         }
+ 
+         @Override
+         public GraphComputer.ResultGraph getPreferredResultGraph() {
+             return GraphComputer.ResultGraph.NEW;
+         }
  
 -        @Override
 -        public MultiScopeVertexProgram clone() {
 +    }
 +
 +    /////////////////////////////////////////////
 +
 +    @Test
 +    @LoadGraphWith(MODERN)
 +    public void shouldSupportGraphFilter() throws Exception {
 +        // if the graph computer does not support graph filter, then make 
sure its exception handling is correct
 +        if 
(!graphProvider.getGraphComputer(graph).features().supportsGraphFilter()) {
              try {
 -                return (MultiScopeVertexProgram) super.clone();
 -            } catch (final CloneNotSupportedException e) {
 -                throw new RuntimeException(e);
 +                
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("software"));
 +                fail("Should throw an unsupported operation exception");
 +            } catch (final UnsupportedOperationException e) {
 +                
assertEquals(GraphComputer.Exceptions.graphFilterNotSupported().getMessage(), 
e.getMessage());
 +            }
 +            try {
 +                
graphProvider.getGraphComputer(graph).edges(__.<Vertex>outE().limit(10));
 +                fail("Should throw an unsupported operation exception");
 +            } catch (final UnsupportedOperationException e) {
 +                
assertEquals(GraphComputer.Exceptions.graphFilterNotSupported().getMessage(), 
e.getMessage());
 +            }
 +            return;
 +        }
 +        /// VERTEX PROGRAM
 +        
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("software")).program(new
 VertexProgramM(VertexProgramM.SOFTWARE_ONLY)).submit().get();
 +        
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).program(new
 VertexProgramM(VertexProgramM.PEOPLE_ONLY)).submit().get();
 +        
graphProvider.getGraphComputer(graph).edges(__.bothE("knows")).program(new 
VertexProgramM(VertexProgramM.KNOWS_ONLY)).submit().get();
 +        
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.bothE("knows")).program(new
 VertexProgramM(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get();
 +        
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.<Vertex>bothE("knows").has("weight",
 P.gt(0.5f))).program(new 
VertexProgramM(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get();
 +        
graphProvider.getGraphComputer(graph).edges(__.<Vertex>bothE().limit(0)).program(new
 VertexProgramM(VertexProgramM.VERTICES_ONLY)).submit().get();
 +        
graphProvider.getGraphComputer(graph).edges(__.<Vertex>outE().limit(1)).program(new
 VertexProgramM(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get();
 +        graphProvider.getGraphComputer(graph).edges(outE()).program(new 
VertexProgramM(VertexProgramM.OUT_EDGES_ONLY)).submit().get();
 +
 +        /// VERTEX PROGRAM + MAP REDUCE
 +        
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("software")).program(new
 VertexProgramM(VertexProgramM.SOFTWARE_ONLY)).mapReduce(new 
MapReduceJ(VertexProgramM.SOFTWARE_ONLY)).submit().get();
 +        
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).program(new
 VertexProgramM(VertexProgramM.PEOPLE_ONLY)).mapReduce(new 
MapReduceJ(VertexProgramM.PEOPLE_ONLY)).submit().get();
 +        
graphProvider.getGraphComputer(graph).edges(__.bothE("knows")).program(new 
VertexProgramM(VertexProgramM.KNOWS_ONLY)).mapReduce(new 
MapReduceJ(VertexProgramM.KNOWS_ONLY)).submit().get();
 +        
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.bothE("knows")).program(new
 VertexProgramM(VertexProgramM.PEOPLE_KNOWS_ONLY)).mapReduce(new 
MapReduceJ(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get();
 +        
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.<Vertex>bothE("knows").has("weight",
 P.gt(0.5f))).program(new 
VertexProgramM(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).mapReduce(new 
MapReduceJ(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get();
 +        
graphProvider.getGraphComputer(graph).edges(__.<Vertex>bothE().limit(0)).program(new
 VertexProgramM(VertexProgramM.VERTICES_ONLY)).mapReduce(new 
MapReduceJ(VertexProgramM.VERTICES_ONLY)).submit().get();
 +        
graphProvider.getGraphComputer(graph).edges(__.<Vertex>outE().limit(1)).program(new
 VertexProgramM(VertexProgramM.ONE_OUT_EDGE_ONLY)).mapReduce(new 
MapReduceJ(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get();
 +        graphProvider.getGraphComputer(graph).edges(outE()).program(new 
VertexProgramM(VertexProgramM.OUT_EDGES_ONLY)).mapReduce(new 
MapReduceJ(VertexProgramM.OUT_EDGES_ONLY)).submit().get();
 +
 +        /// MAP REDUCE ONLY
 +        
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("software")).mapReduce(new
 MapReduceJ(VertexProgramM.SOFTWARE_ONLY)).submit().get();
 +        
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).mapReduce(new
 MapReduceJ(VertexProgramM.PEOPLE_ONLY)).submit().get();
 +        
graphProvider.getGraphComputer(graph).edges(__.bothE("knows")).mapReduce(new 
MapReduceJ(VertexProgramM.KNOWS_ONLY)).submit().get();
 +        
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.bothE("knows")).mapReduce(new
 MapReduceJ(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get();
 +        
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.<Vertex>bothE("knows").has("weight",
 P.gt(0.5f))).mapReduce(new 
MapReduceJ(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get();
 +        
graphProvider.getGraphComputer(graph).edges(__.<Vertex>bothE().limit(0)).mapReduce(new
 MapReduceJ(VertexProgramM.VERTICES_ONLY)).submit().get();
 +        
graphProvider.getGraphComputer(graph).edges(__.<Vertex>outE().limit(1)).mapReduce(new
 MapReduceJ(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get();
 +        graphProvider.getGraphComputer(graph).edges(outE()).mapReduce(new 
MapReduceJ(VertexProgramM.OUT_EDGES_ONLY)).submit().get();
 +
 +        // EXCEPTION HANDLING
 +        try {
 +            graphProvider.getGraphComputer(graph).vertices(__.out());
 +            fail();
 +        } catch (final IllegalArgumentException e) {
 +            assertEquals(e.getMessage(), 
GraphComputer.Exceptions.vertexFilterAccessesIncidentEdges(__.out()).getMessage());
 +        }
 +        try {
 +            
graphProvider.getGraphComputer(graph).edges(__.<Vertex>out().outE());
 +            fail();
 +        } catch (final IllegalArgumentException e) {
 +            assertEquals(e.getMessage(), 
GraphComputer.Exceptions.edgeFilterAccessesAdjacentVertices(__.<Vertex>out().outE()).getMessage());
 +        }
 +    }
 +
 +    public static class VertexProgramM implements VertexProgram {
 +
 +        public static final String SOFTWARE_ONLY = "softwareOnly";
 +        public static final String PEOPLE_ONLY = "peopleOnly";
 +        public static final String KNOWS_ONLY = "knowsOnly";
 +        public static final String PEOPLE_KNOWS_ONLY = "peopleKnowsOnly";
 +        public static final String PEOPLE_KNOWS_WELL_ONLY = 
"peopleKnowsWellOnly";
 +        public static final String VERTICES_ONLY = "verticesOnly";
 +        public static final String ONE_OUT_EDGE_ONLY = "oneOutEdgeOnly";
 +        public static final String OUT_EDGES_ONLY = "outEdgesOnly";
 +
 +        private String state;
 +
 +        public VertexProgramM() {
 +
 +        }
 +
 +        public VertexProgramM(final String state) {
 +            this.state = state;
 +        }
 +
 +        @Override
 +        public void setup(final Memory memory) {
 +
 +        }
 +
 +        @Override
 +        public void execute(final Vertex vertex, final Messenger messenger, 
final Memory memory) {
 +            switch (this.state) {
 +                case SOFTWARE_ONLY: {
 +                    assertEquals("software", vertex.label());
 +                    assertFalse(vertex.edges(Direction.OUT).hasNext());
 +                    assertTrue(vertex.edges(Direction.IN).hasNext());
 +                    assertTrue(vertex.edges(Direction.IN, 
"created").hasNext());
 +                    assertFalse(vertex.edges(Direction.IN, 
"knows").hasNext());
 +                    break;
 +                }
 +                case PEOPLE_ONLY: {
 +                    assertEquals("person", vertex.label());
 +                    assertFalse(vertex.edges(Direction.IN, 
"created").hasNext());
 +                    
assertTrue(IteratorUtils.count(vertex.edges(Direction.BOTH)) > 0);
 +                    break;
 +                }
 +                case KNOWS_ONLY: {
 +                    assertEquals(0, 
IteratorUtils.count(vertex.edges(Direction.BOTH, "created")));
 +                    if (vertex.value("name").equals("marko"))
 +                        assertEquals(2, 
IteratorUtils.count(vertex.edges(Direction.BOTH, "knows")));
 +                    else if (vertex.value("name").equals("vadas"))
 +                        assertEquals(1, 
IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
 +                    else if (vertex.value("name").equals("josh"))
 +                        assertEquals(1, 
IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
 +                    else {
 +                        assertEquals(0, 
IteratorUtils.count(vertex.edges(Direction.BOTH, "knows")));
 +                        assertEquals(0, 
IteratorUtils.count(vertex.edges(Direction.BOTH)));
 +                    }
 +                    break;
 +                }
 +                case PEOPLE_KNOWS_ONLY: {
 +                    assertEquals("person", vertex.label());
 +                    assertEquals(0, 
IteratorUtils.count(vertex.edges(Direction.BOTH, "created")));
 +                    if (vertex.value("name").equals("marko"))
 +                        assertEquals(2, 
IteratorUtils.count(vertex.edges(Direction.BOTH, "knows")));
 +                    else if (vertex.value("name").equals("vadas"))
 +                        assertEquals(1, 
IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
 +                    else if (vertex.value("name").equals("josh"))
 +                        assertEquals(1, 
IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
 +                    else {
 +                        assertEquals(0, 
IteratorUtils.count(vertex.edges(Direction.BOTH, "knows")));
 +                        assertEquals(0, 
IteratorUtils.count(vertex.edges(Direction.BOTH)));
 +                    }
 +                    break;
 +                }
 +                case PEOPLE_KNOWS_WELL_ONLY: {
 +                    assertEquals("person", vertex.label());
 +                    assertEquals(0, 
IteratorUtils.count(vertex.edges(Direction.BOTH, "created")));
 +                    if (vertex.value("name").equals("marko")) {
 +                        assertEquals(1, 
IteratorUtils.count(vertex.edges(Direction.BOTH, "knows")));
 +                        assertEquals(1.0, vertex.edges(Direction.OUT, 
"knows").next().value("weight"), 0.001);
 +                    } else if (vertex.value("name").equals("vadas"))
 +                        assertEquals(0, 
IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
 +                    else if (vertex.value("name").equals("josh")) {
 +                        assertEquals(1, 
IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
 +                        assertEquals(1.0, vertex.edges(Direction.IN, 
"knows").next().value("weight"), 0.001);
 +                    } else {
 +                        assertEquals(0, 
IteratorUtils.count(vertex.edges(Direction.BOTH, "knows")));
 +                        assertEquals(0, 
IteratorUtils.count(vertex.edges(Direction.BOTH)));
 +                    }
 +                    break;
 +                }
 +                case VERTICES_ONLY: {
 +                    assertEquals(0, 
IteratorUtils.count(vertex.edges(Direction.BOTH)));
 +                    break;
 +                }
 +                case ONE_OUT_EDGE_ONLY: {
 +                    if (vertex.label().equals("software") || 
vertex.value("name").equals("vadas"))
 +                        assertEquals(0, 
IteratorUtils.count(vertex.edges(Direction.BOTH)));
 +                    else {
 +                        assertEquals(1, 
IteratorUtils.count(vertex.edges(Direction.OUT)));
 +                        assertEquals(0, 
IteratorUtils.count(vertex.edges(Direction.IN)));
 +                        assertEquals(1, 
IteratorUtils.count(vertex.edges(Direction.BOTH)));
 +                    }
 +                    break;
 +                }
 +                case OUT_EDGES_ONLY: {
 +                    if (vertex.label().equals("software") || 
vertex.value("name").equals("vadas"))
 +                        assertEquals(0, 
IteratorUtils.count(vertex.edges(Direction.BOTH)));
 +                    else {
 +                        
assertTrue(IteratorUtils.count(vertex.edges(Direction.OUT)) > 0);
 +                        assertEquals(0, 
IteratorUtils.count(vertex.edges(Direction.IN)));
 +                        
assertEquals(IteratorUtils.count(vertex.edges(Direction.OUT)), 
IteratorUtils.count(vertex.edges(Direction.BOTH)));
 +                    }
 +                    break;
 +                }
 +                default:
 +                    throw new IllegalStateException("This is an illegal state 
for this test case: " + this.state);
 +            }
 +        }
 +
 +        @Override
 +        public boolean terminate(final Memory memory) {
 +            return true;
 +        }
 +
 +        @Override
 +        public Set<MessageScope> getMessageScopes(Memory memory) {
 +            return Collections.emptySet();
 +        }
 +
 +        @Override
 +        public GraphComputer.ResultGraph getPreferredResultGraph() {
 +            return GraphComputer.ResultGraph.NEW;
 +        }
 +
 +        @Override
 +        public GraphComputer.Persist getPreferredPersist() {
 +            return GraphComputer.Persist.NOTHING;
 +        }
 +
 +        @Override
 +        
@SuppressWarnings("CloneDoesntCallSuperClone,CloneDoesntDeclareCloneNotSupportedException")
 +        public VertexProgramM clone() {
 +            return new VertexProgramM(this.state);
 +        }
 +
 +        @Override
 +        public void loadState(final Graph graph, final Configuration 
configuration) {
 +            this.state = configuration.getString("state");
 +        }
 +
 +        @Override
 +        public void storeState(final Configuration configuration) {
 +            configuration.setProperty("state", this.state);
 +            VertexProgram.super.storeState(configuration);
 +        }
 +
 +    }
 +
 +    private static class MapReduceJ implements 
MapReduce<MapReduce.NullObject, Integer, MapReduce.NullObject, Integer, 
Integer> {
 +
 +        private String state;
 +
 +        public MapReduceJ() {
 +        }
 +
 +        public MapReduceJ(final String state) {
 +            this.state = state;
 +        }
 +
 +        @Override
 +        public void loadState(final Graph graph, final Configuration 
configuration) {
 +            this.state = configuration.getString("state");
 +        }
 +
 +        @Override
 +        public void storeState(final Configuration configuration) {
 +            configuration.setProperty("state", this.state);
 +            MapReduce.super.storeState(configuration);
 +        }
 +
 +        @Override
 +        
@SuppressWarnings("CloneDoesntCallSuperClone,CloneDoesntDeclareCloneNotSupportedException")
 +        public MapReduceJ clone() {
 +            return new MapReduceJ(this.state);
 +        }
 +
 +        @Override
 +        public boolean doStage(final Stage stage) {
 +            return true;
 +        }
 +
 +        @Override
 +        public void map(final Vertex vertex, final MapEmitter<NullObject, 
Integer> emitter) {
 +            emitter.emit(1);
 +            switch (this.state) {
 +                case VertexProgramM.SOFTWARE_ONLY: {
 +                    assertEquals("software", vertex.label());
 +                    break;
 +                }
 +                case VertexProgramM.PEOPLE_ONLY: {
 +                    assertEquals("person", vertex.label());
 +                    break;
 +                }
 +                case VertexProgramM.KNOWS_ONLY: {
 +                    assertTrue(vertex.label().equals("person") || 
vertex.label().equals("software"));
 +                    break;
 +                }
 +                case VertexProgramM.PEOPLE_KNOWS_ONLY: {
 +                    assertEquals("person", vertex.label());
 +                    break;
 +                }
 +                case VertexProgramM.PEOPLE_KNOWS_WELL_ONLY: {
 +                    assertEquals("person", vertex.label());
 +                    break;
 +                }
 +                case VertexProgramM.VERTICES_ONLY: {
 +                    assertTrue(vertex.label().equals("person") || 
vertex.label().equals("software"));
 +                    break;
 +                }
 +                case VertexProgramM.ONE_OUT_EDGE_ONLY: {
 +                    assertTrue(vertex.label().equals("person") || 
vertex.label().equals("software"));
 +                    break;
 +                }
 +                case VertexProgramM.OUT_EDGES_ONLY: {
 +                    assertTrue(vertex.label().equals("person") || 
vertex.label().equals("software"));
 +                    break;
 +                }
 +                default:
 +                    throw new IllegalStateException("This is an illegal state 
for this test case: " + this.state);
 +            }
 +        }
 +
 +        @Override
 +        public void combine(final NullObject key, final Iterator<Integer> 
values, final ReduceEmitter<NullObject, Integer> emitter) {
 +            this.reduce(key, values, emitter);
 +        }
 +
 +        @Override
 +        public void reduce(final NullObject key, final Iterator<Integer> 
values, final ReduceEmitter<NullObject, Integer> emitter) {
 +            int count = 0;
 +            while (values.hasNext()) {
 +                count = count + values.next();
 +            }
 +            emitter.emit(count);
 +        }
 +
 +        @Override
 +        public Integer generateFinalResult(final 
Iterator<KeyValue<NullObject, Integer>> keyValues) {
 +            int counter = keyValues.next().getValue();
 +            assertFalse(keyValues.hasNext());
 +
 +            switch (this.state) {
 +                case VertexProgramM.SOFTWARE_ONLY: {
 +                    assertEquals(2, counter);
 +                    break;
 +                }
 +                case VertexProgramM.PEOPLE_ONLY: {
 +                    assertEquals(4, counter);
 +                    break;
 +                }
 +                case VertexProgramM.KNOWS_ONLY: {
 +                    assertEquals(6, counter);
 +                    break;
 +                }
 +                case VertexProgramM.PEOPLE_KNOWS_ONLY: {
 +                    assertEquals(4, counter);
 +                    break;
 +                }
 +                case VertexProgramM.PEOPLE_KNOWS_WELL_ONLY: {
 +                    assertEquals(4, counter);
 +                    break;
 +                }
 +                case VertexProgramM.VERTICES_ONLY: {
 +                    assertEquals(6, counter);
 +                    break;
 +                }
 +                case VertexProgramM.ONE_OUT_EDGE_ONLY: {
 +                    assertEquals(6, counter);
 +                    break;
 +                }
 +                case VertexProgramM.OUT_EDGES_ONLY: {
 +                    assertEquals(6, counter);
 +                    break;
 +                }
 +                default:
 +                    throw new IllegalStateException("This is an illegal state 
for this test case: " + this.state);
 +            }
 +            return counter;
 +        }
 +
 +        @Override
 +        public String getMemoryKey() {
 +            return "a";
 +        }
 +    }
 +
 +    @Test
 +    @LoadGraphWith(MODERN)
 +    public void shouldSupportJobChaining() throws Exception {
 +        final ComputerResult result1 = graphProvider.getGraphComputer(graph)
 +                
.program(PageRankVertexProgram.build().iterations(5).create(graph)).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW).submit().get();
 +        final Graph graph1 = result1.graph();
 +        final Memory memory1 = result1.memory();
 +        assertEquals(5, memory1.getIteration());
 +        assertEquals(6, graph1.traversal().V().count().next().intValue());
 +        assertEquals(6, graph1.traversal().E().count().next().intValue());
 +        assertEquals(6, 
graph1.traversal().V().values(PageRankVertexProgram.PAGE_RANK).count().next().intValue());
 +        assertEquals(18, 
graph1.traversal().V().values().count().next().intValue());
 +        //
 +        final ComputerResult result2 = 
graph1.compute(graphProvider.getGraphComputer(graph1).getClass())
 +                
.program(PeerPressureVertexProgram.build().maxIterations(4).create(graph1)).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW).submit().get();
 +        final Graph graph2 = result2.graph();
 +        final Memory memory2 = result2.memory();
 +        assertTrue(memory2.getIteration() <= 4);
 +        assertEquals(6, graph2.traversal().V().count().next().intValue());
 +        assertEquals(6, graph2.traversal().E().count().next().intValue());
 +        assertEquals(6, 
graph2.traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().intValue());
 +        assertEquals(6, 
graph2.traversal().V().values(PageRankVertexProgram.PAGE_RANK).count().next().intValue());
 +        assertEquals(24, 
graph2.traversal().V().values().count().next().intValue());
 +        //
 +        final ComputerResult result3 = 
graph2.compute(graphProvider.getGraphComputer(graph2).getClass())
 +                
.program(TraversalVertexProgram.build().traversal(g.V().groupCount("m").by(__.values(PageRankVertexProgram.PAGE_RANK).count()).label().asAdmin()).create(graph2)).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW).submit().get();
 +        final Graph graph3 = result3.graph();
 +        final Memory memory3 = result3.memory();
 +        assertTrue(memory3.keys().contains("m"));
 +        
assertTrue(memory3.keys().contains(TraversalVertexProgram.HALTED_TRAVERSERS));
 +        assertEquals(1, memory3.<Map<Long, Long>>get("m").size());
 +        assertEquals(6, memory3.<Map<Long, Long>>get("m").get(1l).intValue());
 +        List<Traverser<String>> traversers = 
IteratorUtils.list(memory3.<TraverserSet>get(TraversalVertexProgram.HALTED_TRAVERSERS).iterator());
 +        assertEquals(6l, traversers.stream().map(Traverser::bulk).reduce((a, 
b) -> a + b).get().longValue());
 +        assertEquals(4l, traversers.stream().filter(s -> 
s.get().equals("person")).map(Traverser::bulk).reduce((a, b) -> a + 
b).get().longValue());
 +        assertEquals(2l, traversers.stream().filter(s -> 
s.get().equals("software")).map(Traverser::bulk).reduce((a, b) -> a + 
b).get().longValue());
 +        assertEquals(6, graph3.traversal().V().count().next().intValue());
 +        assertEquals(6, graph3.traversal().E().count().next().intValue());
 +        assertEquals(0, 
graph3.traversal().V().values(TraversalVertexProgram.HALTED_TRAVERSERS).count().next().intValue());
 +        assertEquals(6, 
graph3.traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().intValue());
 +        assertEquals(6, 
graph3.traversal().V().values(PageRankVertexProgram.PAGE_RANK).count().next().intValue());
 +        assertEquals(24, 
graph3.traversal().V().values().count().next().intValue()); // no halted 
traversers
 +
 +        // TODO: add a test that shows DAG behavior -- splitting another 
TraversalVertexProgram off of the PeerPressureVertexProgram job.
 +    }
 +
 +    ///////////////////////////////////
 +
 +    /**
 +     * Submits {@code VertexProgramN} against the MODERN graph and verifies the
 +     * "age" value carried by each "person" vertex of the result graph:
 +     * marko=32, peter=38, vadas=30, josh=35. A "person" vertex with any other
 +     * name fails the test with an {@link IllegalStateException}.
 +     */
 +    @Test
 +    @LoadGraphWith(MODERN)
 +    public void shouldSupportPreExistingComputeKeys() throws Exception {
 +        final ComputerResult result = graphProvider.getGraphComputer(graph)
 +                .program(new VertexProgramN())
 +                .submit()
 +                .get();
 +        result.graph().vertices().forEachRemaining(vertex -> {
 +            // only "person" vertices are inspected; everything else is skipped
 +            if (!vertex.label().equals("person"))
 +                return;
 +            final Object name = vertex.value("name");
 +            final int expectedAge;
 +            if (name.equals("marko"))
 +                expectedAge = 32;
 +            else if (name.equals("peter"))
 +                expectedAge = 38;
 +            else if (name.equals("vadas"))
 +                expectedAge = 30;
 +            else if (name.equals("josh"))
 +                expectedAge = 35;
 +            else
 +                throw new IllegalStateException(
 +                        "This vertex should not have been accessed: " + vertex);
 +            assertEquals(expectedAge, vertex.<Integer>value("age").intValue());
 +        });
 +    }
 +
 +    /**
 +     * Vertex program that declares the "age" property as a non-transient
 +     * vertex compute key and adds one to it on every "person" vertex each
 +     * time {@code execute} is invoked. It sends no messages, terminates once
 +     * the iteration counter exceeds 1, and prefers a NEW result graph with
 +     * VERTEX_PROPERTIES persistence.
 +     */
 +    private static class VertexProgramN extends StaticVertexProgram {
 +
 +        /** Nothing to initialize in memory. */
 +        @Override
 +        public void setup(final Memory memory) {
 +
 +        }
 +
 +        /** Increments "age" by one on "person" vertices; other labels are left alone. */
 +        @Override
 +        public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
 +            if (!vertex.label().equals("person"))
 +                return;
 +            final int currentAge = vertex.<Integer>value("age");
 +            vertex.property(VertexProperty.Cardinality.single, "age", currentAge + 1);
 +        }
 +
 +        /** Stops as soon as the iteration counter is greater than 1. */
 +        @Override
 +        public boolean terminate(final Memory memory) {
 +            final int iteration = memory.getIteration();
 +            return 1 < iteration;
 +        }
 +
 +        /** This program never sends messages, so there are no scopes. */
 +        @Override
 +        public Set<MessageScope> getMessageScopes(final Memory memory) {
 +            return Collections.emptySet();
 +        }
 +
 +        /** The only compute key is "age", registered as non-transient. */
 +        @Override
 +        public Set<VertexComputeKey> getVertexComputeKeys() {
 +            final VertexComputeKey ageKey = VertexComputeKey.of("age", false);
 +            return Collections.singleton(ageKey);
 +        }
 +
 +        @Override
 +        public GraphComputer.ResultGraph getPreferredResultGraph() {
 +            return GraphComputer.ResultGraph.NEW;
 +        }
 +
 +        @Override
 +        public GraphComputer.Persist getPreferredPersist() {
 +            return GraphComputer.Persist.VERTEX_PROPERTIES;
 +        }
 +    }
 +
 +    ///////////////////////////////////
 +
 +    @Test
 +    @LoadGraphWith(MODERN)
 +    public void shouldSupportTransientKeys() throws Exception {
 +        final ComputerResult result = 
graphProvider.getGraphComputer(graph).program(new 
VertexProgramO()).mapReduce(new MapReduceK()).submit().get();
 +        result.graph().vertices().forEachRemaining(vertex -> {
 +            assertFalse(vertex.property("v1").isPresent());
 +            assertFalse(vertex.property("v2").isPresent());
 +            assertTrue(vertex.property("v3").isPresent());
 +            assertEquals("shouldExist", vertex.value("v3"));
 +            assertTrue(vertex.property("name").isPresent());
 +            if (vertex.label().equals("software"))
 +                assertTrue(vertex.property("lang").isPresent());
 +            else
 +                assertTrue(vertex.property("age").isPresent());
 +            assertEquals(3, IteratorUtils.count(vertex.properties()));
 +            assertEquals(0, IteratorUtils.count(vertex.properties("v1")));
 +            assertEquals(0, IteratorUtils.count(vertex.properties("v2")));
 +            assertEquals(1, IteratorUtils.count(vertex.properties("v3")));
 +            assertEquals(1, IteratorUtils.count(vertex.properties("name")));
 +        });
 +        assertEquals(6l, 
result.graph().traversal().V().properties("name").count().next().longValue());
 +        assertEquals(0l, 
result.graph().traversal().V().properties("v1").count().next().longValue());
 +        assertEquals(0l, 
result.graph().traversal().V().properties("v2").count().next().longValue());
 +        assertEquals(6l, 
result.graph().traversal().V().properties("v3").count().next().longValue());
 +        assertEquals(6l, 
result.graph().traversal().V().<String>values("name").dedup().count().next().longValue());
 +        assertEquals(1l, 
result.graph().traversal().V().<String>values("v3").dedup().count().next().longValue());
 +        assertEquals("shouldExist", 
result.graph().traversal().V().<String>values("v3").dedup().next());
 +        ///
 +        assertFalse(result.memory().exists("m1"));
 +        assertFalse(result.memory().exists("m2"));
 +        assertTrue(result.memory().exists("m3"));
 +        assertEquals(24l, result.memory().<Long>get("m3").longValue());
 +        assertEquals(2, result.memory().keys().size());  // mapReduceK
 +    }
 +
 +    private static class VertexProgramO extends StaticVertexProgram {
 +
 +        @Override
 +        public void setup(final Memory memory) {
 +            assertFalse(memory.exists("m1"));
 +            assertFalse(memory.exists("m2"));
 +            assertFalse(memory.exists("m3"));
 +        }
 +
 +        @Override
 +        public void execute(final Vertex vertex, final Messenger messenger, 
final Memory memory) {
 +            if (memory.isInitialIteration()) {
 +                assertFalse(vertex.property("v1").isPresent());
 +                assertFalse(vertex.property("v2").isPresent());
 +                assertFalse(vertex.property("v3").isPresent());
 +                vertex.property("v1", "shouldNotExist");
 +                vertex.property("v2", "shouldNotExist");
 +                vertex.property("v3", "shouldExist");
 +                assertTrue(vertex.property("v1").isPresent());
 +                assertTrue(vertex.property("v2").isPresent());
 +                assertTrue(vertex.property("v3").isPresent());
 +                assertEquals("shouldNotExist", vertex.value("v1"));
 +                assertEquals("shouldNotExist", vertex.value("v2"));
 +                assertEquals("shouldExist", vertex.value("v3"));
 +                //
 +                assertFalse(memory.exists("m1"));
 +                assertFalse(memory.exists("m2"));
 +                assertFalse(memory.exists("m3"));
 +                memory.add("m1", false);
 +                memory.add("m2", true);
 +                memory.add("m3", 2l);
 +                // should still not exist as this pulls from the master memory
 +                assertFalse(memory.exists("m1"));
 +                assertFalse(memory.exists("m2"));
 +                assertFalse(memory.exists("m3"));
 +
 +            } else {
 +                assertTrue(vertex.property("v1").isPresent());
 +                assertTrue(vertex.property("v2").isPresent());
 +                assertTrue(vertex.property("v3").isPresent());
 +                assertEquals("shouldNotExist", vertex.value("v1"));
 +                assertEquals("shouldNotExist", vertex.value("v2"));
 +                assertEquals("shouldExist", vertex.value("v3"));
 +                //
 +                assertTrue(memory.exists("m1"));
 +                assertTrue(memory.exists("m2"));
 +                assertTrue(memory.exists("m3"));
 +                assertFalse(memory.get("m1"));
 +                assertTrue(memory.get("m2"));
 +                assertEquals(12l, memory.<Long>get("m3").longValue());
 +                memory.add("m1", true);
 +                memory.add("m2", true);
 +                memory.add("m3", 2l);
              }
          }
  

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f1aed80b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java
----------------------------------------------------------------------

Reply via email to