Repository: tinkerpop Updated Branches: refs/heads/tp32 5ac61b7af -> f1aed80b0
bugfix Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/28c514da Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/28c514da Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/28c514da Branch: refs/heads/tp32 Commit: 28c514da9065683ed90ea6aabc66ffdcbab99c11 Parents: b1c0723 Author: Sheldon <shel...@mindmaps.io> Authored: Thu Jun 15 18:00:32 2017 +0100 Committer: Sheldon <shel...@mindmaps.io> Committed: Mon Jun 19 11:20:20 2017 +0100 ---------------------------------------------------------------------- .../process/computer/TinkerMessageBoard.java | 4 ++-- .../process/computer/TinkerMessenger.java | 22 +++++++++++++------- 2 files changed, 16 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/28c514da/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessageBoard.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessageBoard.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessageBoard.java index b217801..422ab86 100644 --- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessageBoard.java +++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessageBoard.java @@ -32,8 +32,8 @@ import java.util.concurrent.ConcurrentHashMap; */ final class TinkerMessageBoard<M> { - public Map<Vertex, Queue<M>> sendMessages = new ConcurrentHashMap<>(); - public Map<Vertex, Queue<M>> receiveMessages = new ConcurrentHashMap<>(); + public Map<MessageScope, Map<Vertex,Queue<M>>> sendMessages = new ConcurrentHashMap<>(); + public Map<MessageScope, Map<Vertex, Queue<M>>> receiveMessages = new ConcurrentHashMap<>(); public Set<MessageScope> previousMessageScopes = new HashSet<>(); public Set<MessageScope> currentMessageScopes = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/28c514da/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java index 3298aff..d3f2de5 100644 --- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java +++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java @@ -37,6 +37,7 @@ import java.util.Optional; import java.util.Queue; import java.util.Spliterator; import java.util.Spliterators; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -59,14 +60,15 @@ public final class TinkerMessenger<M> implements Messenger<M> { @Override public Iterator<M> receiveMessages() { final MultiIterator<M> multiIterator = new MultiIterator<>(); - for (final MessageScope messageScope : this.messageBoard.previousMessageScopes) { + for (final MessageScope messageScope : this.messageBoard.receiveMessages.keySet()) { +// for (final MessageScope messageScope : this.messageBoard.previousMessageScopes) { if (messageScope instanceof MessageScope.Local) { final MessageScope.Local<M> localMessageScope = (MessageScope.Local<M>) messageScope; final Traversal.Admin<Vertex, Edge> incidentTraversal = TinkerMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get().asAdmin(), this.vertex); final Direction direction = TinkerMessenger.getDirection(incidentTraversal); final Edge[] edge = new Edge[1]; // simulates storage side-effects available in Gremlin, but not Java8 streams multiIterator.addIterator(StreamSupport.stream(Spliterators.spliteratorUnknownSize(VertexProgramHelper.reverse(incidentTraversal.asAdmin()), Spliterator.IMMUTABLE | Spliterator.SIZED), false) - .map(e -> this.messageBoard.receiveMessages.get((edge[0] = e).vertices(direction).next())) + .map(e -> this.messageBoard.receiveMessages.get(messageScope).get((edge[0] = e).vertices(direction).next())) .filter(q -> null != q) .flatMap(Queue::stream) .map(message -> localMessageScope.getEdgeFunction().apply(message, edge[0])) @@ -74,7 +76,7 @@ public final class TinkerMessenger<M> implements Messenger<M> { } else { multiIterator.addIterator(Stream.of(this.vertex) - .map(this.messageBoard.receiveMessages::get) + .map(this.messageBoard.receiveMessages.get(messageScope)::get) .filter(q -> null != q) .flatMap(Queue::stream) .iterator()); @@ -85,16 +87,20 @@ public final class TinkerMessenger<M> implements Messenger<M> { @Override public void sendMessage(final MessageScope messageScope, final M message) { - this.messageBoard.currentMessageScopes.add(messageScope); +// this.messageBoard.currentMessageScopes.add(messageScope); if (messageScope instanceof MessageScope.Local) { - addMessage(this.vertex, message); + addMessage(this.vertex, message, messageScope); } else { - ((MessageScope.Global) messageScope).vertices().forEach(v -> addMessage(v, message)); + ((MessageScope.Global) messageScope).vertices().forEach(v -> addMessage(v, message, messageScope)); } } - private void addMessage(final Vertex vertex, final M message) { - this.messageBoard.sendMessages.compute(vertex, (v, queue) -> { + private void addMessage(final Vertex vertex, final M message, MessageScope messageScope) { + this.messageBoard.sendMessages.compute(messageScope, (ms, messages) -> { + if(null==messages) messages = new ConcurrentHashMap<>(); + return messages; + }); + this.messageBoard.sendMessages.get(messageScope).compute(vertex, (v, queue) -> { if (null == queue) queue = new ConcurrentLinkedQueue<>(); queue.add(null != this.combiner && !queue.isEmpty() ? this.combiner.combine(queue.remove(), message) : message); return queue;