TINKERPOP-1967 Fixed handling of halted traversers in ConnectedComponentVertexProgram
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/b2cb1874 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/b2cb1874 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/b2cb1874 Branch: refs/heads/master Commit: b2cb187470794bfca352c8a9c7d0c444d102e46b Parents: 8954c27 Author: Stephen Mallette <sp...@genoprime.com> Authored: Mon Jul 30 10:51:35 2018 -0400 Committer: Stephen Mallette <sp...@genoprime.com> Committed: Thu Aug 9 10:54:41 2018 -0400 ---------------------------------------------------------------------- .../ConnectedComponentVertexProgram.java | 42 +++++++++++++++----- .../ConnectedComponentVertexProgramStep.java | 26 ++++++++++-- .../step/map/ConnectedComponentTest.java | 2 +- 3 files changed, 57 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2cb1874/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/connected/ConnectedComponentVertexProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/connected/ConnectedComponentVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/connected/ConnectedComponentVertexProgram.java index de718f1..82907eb 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/connected/ConnectedComponentVertexProgram.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/connected/ConnectedComponentVertexProgram.java @@ -32,7 +32,10 @@ import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexPr import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder; import 
org.apache.tinkerpop.gremlin.process.traversal.Operator; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.Traverser; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.IndexedTraverserSet; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; import org.apache.tinkerpop.gremlin.process.traversal.util.PureTraversal; import org.apache.tinkerpop.gremlin.structure.Direction; import org.apache.tinkerpop.gremlin.structure.Edge; @@ -40,6 +43,8 @@ import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.VertexProperty; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; @@ -61,12 +66,14 @@ public class ConnectedComponentVertexProgram implements VertexProgram<String> { private static final String VOTE_TO_HALT = "gremlin.connectedComponentVertexProgram.voteToHalt"; private static final Set<MemoryComputeKey> MEMORY_COMPUTE_KEYS = Collections.singleton(MemoryComputeKey.of(VOTE_TO_HALT, Operator.and, false, true)); + private MessageScope.Local<?> scope = MessageScope.Local.of(__::bothE); private Set<MessageScope> scopes; private String property = COMPONENT; - private boolean hasHalted = false; private PureTraversal<Vertex, Edge> edgeTraversal = null; private Configuration configuration; + private TraverserSet<Vertex> haltedTraversers; + private IndexedTraverserSet<Vertex, Vertex> haltedTraversersIndex; private ConnectedComponentVertexProgram() {} @@ -85,7 +92,12 @@ public class ConnectedComponentVertexProgram implements VertexProgram<String> { scopes = new HashSet<>(Collections.singletonList(scope)); this.property = configuration.getString(PROPERTY, COMPONENT); - this.hasHalted = 
configuration.getBoolean(HAS_HALTED, false); + + this.haltedTraversers = TraversalVertexProgram.loadHaltedTraversers(configuration); + this.haltedTraversersIndex = new IndexedTraverserSet<>(v -> v); + for (final Traverser.Admin<Vertex> traverser : this.haltedTraversers) { + this.haltedTraversersIndex.add(traverser.split()); + } } @Override @@ -104,6 +116,8 @@ public class ConnectedComponentVertexProgram implements VertexProgram<String> { @Override public void execute(final Vertex vertex, final Messenger<String> messenger, final Memory memory) { if (memory.isInitialIteration()) { + copyHaltedTraversersFromMemory(vertex); + // on the first pass, just initialize the component to its own id then pass it to all adjacent vertices // for evaluation vertex.property(VertexProperty.Cardinality.single, property, vertex.id().toString()); @@ -113,7 +127,7 @@ public class ConnectedComponentVertexProgram implements VertexProgram<String> { // halting traversers. only want to send messages from traversers that are still hanging about after // the filter. the unfiltered vertices can only react to messages sent to them. of course, this can // lead to weirdness in results. - if (vertex.edges(Direction.BOTH).hasNext() && !(hasHalted && !vertex.property(TraversalVertexProgram.HALTED_TRAVERSERS).isPresent())) { + if (vertex.edges(Direction.BOTH).hasNext()) { // since there was message passing we don't want to halt on the first round. 
this should only trigger // a single pass finish if the graph is completely disconnected (technically, it won't even really // work in cases where halted traversers come into play @@ -148,7 +162,9 @@ public class ConnectedComponentVertexProgram implements VertexProgram<String> { @Override public Set<VertexComputeKey> getVertexComputeKeys() { - return new HashSet<>(Collections.singletonList(VertexComputeKey.of(property, false))); + return new HashSet<>(Arrays.asList( + VertexComputeKey.of(property, false), + VertexComputeKey.of(TraversalVertexProgram.HALTED_TRAVERSERS, false))); } @Override @@ -158,6 +174,10 @@ public class ConnectedComponentVertexProgram implements VertexProgram<String> { @Override public boolean terminate(final Memory memory) { + if (memory.isInitialIteration() && this.haltedTraversersIndex != null) { + this.haltedTraversersIndex.clear(); + } + final boolean voteToHalt = memory.<Boolean>get(VOTE_TO_HALT); if (voteToHalt) { return true; @@ -206,6 +226,15 @@ public class ConnectedComponentVertexProgram implements VertexProgram<String> { }; } + private void copyHaltedTraversersFromMemory(final Vertex vertex) { + final Collection<Traverser.Admin<Vertex>> traversers = this.haltedTraversersIndex.get(vertex); + if (traversers != null) { + final TraverserSet<Vertex> newHaltedTraversers = new TraverserSet<>(); + newHaltedTraversers.addAll(traversers); + vertex.property(VertexProperty.Cardinality.single, TraversalVertexProgram.HALTED_TRAVERSERS, newHaltedTraversers); + } + } + public static ConnectedComponentVertexProgram.Builder build() { return new ConnectedComponentVertexProgram.Builder(); } @@ -216,11 +245,6 @@ public class ConnectedComponentVertexProgram implements VertexProgram<String> { super(ConnectedComponentVertexProgram.class); } - public ConnectedComponentVertexProgram.Builder hasHalted(final boolean hasHalted) { - this.configuration.setProperty(HAS_HALTED, hasHalted); - return this; - } - public ConnectedComponentVertexProgram.Builder 
edges(final Traversal.Admin<Vertex, Edge> edgeTraversal) { PureTraversal.storeState(this.configuration, EDGE_TRAVERSAL, edgeTraversal); return this; http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2cb1874/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ConnectedComponentVertexProgramStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ConnectedComponentVertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ConnectedComponentVertexProgramStep.java index edeb497..c222cfa 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ConnectedComponentVertexProgramStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ConnectedComponentVertexProgramStep.java @@ -29,11 +29,16 @@ import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; import org.apache.tinkerpop.gremlin.process.traversal.step.Configuring; import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent; import org.apache.tinkerpop.gremlin.process.traversal.step.util.Parameters; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; import org.apache.tinkerpop.gremlin.process.traversal.util.PureTraversal; import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; +import org.apache.tinkerpop.gremlin.util.Serializer; + +import java.io.IOException; +import java.util.Base64; /** * @author Stephen Mallette (http://stephen.genoprime.com) @@ -84,10 +89,25 @@ public final class ConnectedComponentVertexProgramStep extends VertexProgramStep public 
ConnectedComponentVertexProgram generateProgram(final Graph graph, final Memory memory) { final Traversal.Admin<Vertex, Edge> detachedTraversal = this.edgeTraversal.getPure(); detachedTraversal.setStrategies(TraversalStrategies.GlobalCache.getStrategies(graph.getClass())); - return ConnectedComponentVertexProgram.build(). - hasHalted(memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS)). + + final ConnectedComponentVertexProgram.Builder builder = ConnectedComponentVertexProgram.build(). edges(detachedTraversal). - property(this.clusterProperty).create(graph); + property(this.clusterProperty); + + if (memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS)) { + final TraverserSet<?> haltedTraversers = memory.get(TraversalVertexProgram.HALTED_TRAVERSERS); + if (!haltedTraversers.isEmpty()) { + Object haltedTraversersValue; + try { + haltedTraversersValue = Base64.getEncoder().encodeToString(Serializer.serializeObject(haltedTraversers)); + } catch (final IOException ignored) { + haltedTraversersValue = haltedTraversers; + } + builder.configure(TraversalVertexProgram.HALTED_TRAVERSERS, haltedTraversersValue); + } + } + + return builder.create(graph); } @Override http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2cb1874/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ConnectedComponentTest.java ---------------------------------------------------------------------- diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ConnectedComponentTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ConnectedComponentTest.java index 25e618a..8b1904f 100644 --- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ConnectedComponentTest.java +++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ConnectedComponentTest.java @@ -67,7 +67,7 @@ public abstract class ConnectedComponentTest extends 
AbstractGremlinProcessTest switch (name) { case "lop": case "ripple": - assertEquals("3", vertex.value(ConnectedComponentVertexProgram.COMPONENT)); + assertEquals("1", vertex.value(ConnectedComponentVertexProgram.COMPONENT)); break; } counter++;