moved all the GroupStep work against tp32/

Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/3496402a
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/3496402a
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/3496402a

Branch: refs/heads/TINKERPOP-1602
Commit: 3496402a4e0c2803031d3b88086aabd5c6a2cfd8
Parents: 97cc07d
Author: Marko A. Rodriguez <okramma...@gmail.com>
Authored: Thu Jan 19 04:16:56 2017 -0700
Committer: Marko A. Rodriguez <okramma...@gmail.com>
Committed: Thu Jan 19 04:16:56 2017 -0700

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |   1 +
 .../process/traversal/step/map/GroupStep.java   | 263 +++----------------
 .../step/sideEffect/GroupSideEffectStep.java    |  50 ++--
 .../step/sideEffect/GroovyGroupTest.groovy      |   5 +
 .../traversal/step/sideEffect/GroupTest.java    |  30 ++-
 5 files changed, 91 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3496402a/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 4f3f9ce..74751fa 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,6 +26,7 @@ 
image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 TinkerPop 3.2.4 (Release Date: NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
+* `GroupBiOperator` no longer maintains state and thus, no more side-effect 
related OLAP inconsistencies.
 * SASL negotiation supports both a byte array and Base64 encoded bytes as a 
string for authentication to Gremlin Server.
 * Deprecated `TinkerIoRegistry` replacing it with the more consistently named 
`TinkerIoRegistryV1d0`.
 * Made error messaging more consistent during result iteration timeouts in 
Gremlin Server.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3496402a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
index d6ce421..07ca4ae 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
@@ -19,7 +19,7 @@
 
 package org.apache.tinkerpop.gremlin.process.traversal.step.map;
 
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Operator;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
@@ -29,22 +29,14 @@ import 
org.apache.tinkerpop.gremlin.process.traversal.lambda.IdentityTraversal;
 import org.apache.tinkerpop.gremlin.process.traversal.lambda.TokenTraversal;
 import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
 import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating;
-import org.apache.tinkerpop.gremlin.process.traversal.step.LambdaHolder;
 import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
 import 
org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
 import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import org.javatuples.Pair;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -60,14 +52,14 @@ public final class GroupStep<S, K, V> extends 
ReducingBarrierStep<S, Map<K, V>>
 
     private char state = 'k';
     private Traversal.Admin<S, K> keyTraversal;
-    private Traversal.Admin<S, ?> preTraversal;
     private Traversal.Admin<S, V> valueTraversal;
+    private Barrier barrierStep;
 
     public GroupStep(final Traversal.Admin traversal) {
         super(traversal);
         this.valueTraversal = this.integrateChild(__.fold().asAdmin());
-        this.preTraversal = 
this.integrateChild(generatePreTraversal(this.valueTraversal));
-        this.setReducingBiOperator(new GroupBiOperator<>(this.valueTraversal));
+        this.barrierStep = 
TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, 
this.valueTraversal).orElse(null);
+        this.setReducingBiOperator(new GroupBiOperator<>(null == 
this.barrierStep ? Operator.assign : 
this.barrierStep.getMemoryComputeKey().getReducer()));
         this.setSeedSupplier(HashMapSupplier.instance());
     }
 
@@ -78,8 +70,8 @@ public final class GroupStep<S, K, V> extends 
ReducingBarrierStep<S, Map<K, V>>
             this.state = 'v';
         } else if ('v' == this.state) {
             this.valueTraversal = 
this.integrateChild(convertValueTraversal(kvTraversal));
-            this.preTraversal = 
this.integrateChild(generatePreTraversal(this.valueTraversal));
-            this.setReducingBiOperator(new 
GroupBiOperator<>(this.valueTraversal));
+            this.barrierStep = 
TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, 
this.valueTraversal).orElse(null);
+            this.setReducingBiOperator(new GroupBiOperator<>(null == 
this.barrierStep ? Operator.assign : 
this.barrierStep.getMemoryComputeKey().getReducer()));
             this.state = 'x';
         } else {
             throw new IllegalStateException("The key and value traversals for 
group()-step have already been set: " + this);
@@ -89,17 +81,13 @@ public final class GroupStep<S, K, V> extends 
ReducingBarrierStep<S, Map<K, V>>
     @Override
     public Map<K, V> projectTraverser(final Traverser.Admin<S> traverser) {
         final Map<K, V> map = new HashMap<>(1);
-        if (null == this.preTraversal) {
-            map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), 
(V) traverser);
-        } else {
-            final TraverserSet traverserSet = new TraverserSet<>();
-            this.preTraversal.reset();
-            this.preTraversal.addStart(traverser);
-            while (this.preTraversal.hasNext()) {
-                traverserSet.add(this.preTraversal.nextTraverser());
-            }
-            map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), 
(V) traverserSet);
-        }
+        this.valueTraversal.reset();
+        this.valueTraversal.addStart(traverser);
+        if (null == this.barrierStep) {
+            if (this.valueTraversal.hasNext())
+                map.put(TraversalUtil.applyNullable(traverser, 
this.keyTraversal), (V) this.valueTraversal.next());
+        } else if (this.barrierStep.hasNextBarrier())
+            map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), 
(V) this.barrierStep.nextBarrier());
         return map;
     }
 
@@ -110,12 +98,10 @@ public final class GroupStep<S, K, V> extends 
ReducingBarrierStep<S, Map<K, V>>
 
     @Override
     public List<Traversal.Admin<?, ?>> getLocalChildren() {
-        final List<Traversal.Admin<?, ?>> children = new ArrayList<>(3);
+        final List<Traversal.Admin<?, ?>> children = new ArrayList<>(2);
         if (null != this.keyTraversal)
             children.add(this.keyTraversal);
         children.add(this.valueTraversal);
-        if (null != this.preTraversal)
-            children.add(this.preTraversal);
         return children;
     }
 
@@ -130,8 +116,7 @@ public final class GroupStep<S, K, V> extends 
ReducingBarrierStep<S, Map<K, V>>
         if (null != this.keyTraversal)
             clone.keyTraversal = this.keyTraversal.clone();
         clone.valueTraversal = this.valueTraversal.clone();
-        clone.preTraversal = (Traversal.Admin<S, ?>) 
GroupStep.generatePreTraversal(clone.valueTraversal);
-        clone.setReducingBiOperator(new 
GroupBiOperator<>(clone.valueTraversal));
+        clone.barrierStep = 
TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, 
clone.valueTraversal).orElse(null);
         return clone;
     }
 
@@ -140,7 +125,6 @@ public final class GroupStep<S, K, V> extends 
ReducingBarrierStep<S, Map<K, V>>
         super.setTraversal(parentTraversal);
         integrateChild(this.keyTraversal);
         integrateChild(this.valueTraversal);
-        integrateChild(this.preTraversal);
     }
 
     @Override
@@ -158,180 +142,31 @@ public final class GroupStep<S, K, V> extends 
ReducingBarrierStep<S, Map<K, V>>
 
     ///////////////////////
 
-    public static final class GroupBiOperator<K, V> implements 
BinaryOperator<Map<K, V>>, Serializable, Cloneable {
-
-        // size limit before Barrier.processAllStarts() to lazy reduce
-        private static final int SIZE_LIMIT = 1000;
-
-        private Traversal.Admin<?, V> valueTraversal;
-        private Barrier barrierStep;
+    public static final class GroupBiOperator<K, V> implements 
BinaryOperator<Map<K, V>>, Serializable {
 
-        public GroupBiOperator(final Traversal.Admin<?, V> valueTraversal) {
-            // if there is a lambda that can not be serialized, then simply 
use TraverserSets
-            if 
(TraversalHelper.hasStepOfAssignableClassRecursively(LambdaHolder.class, 
valueTraversal)) {
-                this.valueTraversal = null;
-                this.barrierStep = null;
-            } else {
-                this.valueTraversal = valueTraversal.clone();
-                this.barrierStep = 
TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, 
this.valueTraversal).orElse(null);
-            }
-        }
+        private BinaryOperator<V> barrierAggregator;
 
         public GroupBiOperator() {
             // no-arg constructor for serialization
         }
 
-        @Override
-        public GroupBiOperator<K, V> clone() {
-            try {
-                final GroupBiOperator<K, V> clone = (GroupBiOperator<K, V>) 
super.clone();
-                if (null != this.valueTraversal) {
-                    clone.valueTraversal = this.valueTraversal.clone();
-                    clone.barrierStep = 
TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, 
clone.valueTraversal).orElse(null);
-                }
-                return clone;
-            } catch (final CloneNotSupportedException e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
+        public GroupBiOperator(final BinaryOperator<V> barrierAggregator) {
+            this.barrierAggregator = barrierAggregator;
         }
 
         @Override
         public Map<K, V> apply(final Map<K, V> mapA, final Map<K, V> mapB) {
             for (final K key : mapB.keySet()) {
-                Object objectA = mapA.get(key);
-                final Object objectB = mapB.get(key);
-                assert null != objectB;
-                if (null == objectA) {
+                V objectA = mapA.get(key);
+                final V objectB = mapB.get(key);
+                if (null == objectA)
                     objectA = objectB;
-                } else {
-                    // TRAVERSER
-                    if (objectA instanceof Traverser.Admin) {
-                        if (objectB instanceof Traverser.Admin) {
-                            final TraverserSet set = new TraverserSet();
-                            set.add((Traverser.Admin) objectA);
-                            set.add((Traverser.Admin) objectB);
-                            objectA = set;
-                        } else if (objectB instanceof TraverserSet) {
-                            final TraverserSet set = (TraverserSet) objectB;
-                            set.add((Traverser.Admin) objectA);
-                            if (null != this.barrierStep && set.size() > 
SIZE_LIMIT) {
-                                this.valueTraversal.reset();
-                                ((Step) 
this.barrierStep).addStarts(set.iterator());
-                                objectA = this.barrierStep.nextBarrier();
-                            } else
-                                objectA = objectB;
-                        } else if (objectB instanceof Pair) {
-                            final TraverserSet set = (TraverserSet) ((Pair) 
objectB).getValue0();
-                            set.add((Traverser.Admin) objectA);
-                            if (set.size() > SIZE_LIMIT) {    // barrier step 
can never be null -- no need to check
-                                this.valueTraversal.reset();
-                                ((Step) 
this.barrierStep).addStarts(set.iterator());
-                                this.barrierStep.addBarrier(((Pair) 
objectB).getValue1());
-                                objectA = this.barrierStep.nextBarrier();
-                            } else
-                                objectA = Pair.with(set, ((Pair) 
objectB).getValue1());
-                        } else
-                            objectA = Pair.with(new 
TraverserSet((Traverser.Admin) objectA), objectB);
-                        // TRAVERSER SET
-                    } else if (objectA instanceof TraverserSet) {
-                        if (objectB instanceof Traverser.Admin) {
-                            final TraverserSet set = (TraverserSet) objectA;
-                            set.add((Traverser.Admin) objectB);
-                            if (null != this.barrierStep && set.size() > 
SIZE_LIMIT) {
-                                this.valueTraversal.reset();
-                                ((Step) 
this.barrierStep).addStarts(set.iterator());
-                                objectA = this.barrierStep.nextBarrier();
-                            }
-                        } else if (objectB instanceof TraverserSet) {
-                            final TraverserSet set = (TraverserSet) objectA;
-                            set.addAll((TraverserSet) objectB);
-                            if (null != this.barrierStep && set.size() > 
SIZE_LIMIT) {
-                                this.valueTraversal.reset();
-                                ((Step) 
this.barrierStep).addStarts(set.iterator());
-                                objectA = this.barrierStep.nextBarrier();
-                            }
-                        } else if (objectB instanceof Pair) {
-                            final TraverserSet set = (TraverserSet) objectA;
-                            set.addAll((TraverserSet) ((Pair) 
objectB).getValue0());
-                            if (set.size() > SIZE_LIMIT) {  // barrier step 
can never be null -- no need to check
-                                this.valueTraversal.reset();
-                                ((Step) 
this.barrierStep).addStarts(set.iterator());
-                                this.barrierStep.addBarrier(((Pair) 
objectB).getValue1());
-                                objectA = this.barrierStep.nextBarrier();
-                            } else
-                                objectA = Pair.with(set, ((Pair) 
objectB).getValue1());
-                        } else
-                            objectA = Pair.with(objectA, objectB);
-                        // TRAVERSER SET + BARRIER
-                    } else if (objectA instanceof Pair) {
-                        if (objectB instanceof Traverser.Admin) {
-                            final TraverserSet set = ((TraverserSet) ((Pair) 
objectA).getValue0());
-                            set.add((Traverser.Admin) objectB);
-                            if (set.size() > SIZE_LIMIT) { // barrier step can 
never be null -- no need to check
-                                this.valueTraversal.reset();
-                                ((Step) 
this.barrierStep).addStarts(set.iterator());
-                                this.barrierStep.addBarrier(((Pair) 
objectA).getValue1());
-                                objectA = this.barrierStep.nextBarrier();
-                            }
-                        } else if (objectB instanceof TraverserSet) {
-                            final TraverserSet set = (TraverserSet) ((Pair) 
objectA).getValue0();
-                            set.addAll((TraverserSet) objectB);
-                            if (set.size() > SIZE_LIMIT) {   // barrier step 
can never be null -- no need to check
-                                this.valueTraversal.reset();
-                                ((Step) 
this.barrierStep).addStarts(set.iterator());
-                                this.barrierStep.addBarrier(((Pair) 
objectA).getValue1());
-                                objectA = this.barrierStep.nextBarrier();
-                            }
-                        } else if (objectB instanceof Pair) {
-                            this.valueTraversal.reset();
-                            this.barrierStep.addBarrier(((Pair) 
objectA).getValue1());
-                            this.barrierStep.addBarrier(((Pair) 
objectB).getValue1());
-                            ((Step) 
this.barrierStep).addStarts(((TraverserSet) ((Pair) 
objectA).getValue0()).iterator());
-                            ((Step) 
this.barrierStep).addStarts(((TraverserSet) ((Pair) 
objectB).getValue0()).iterator());
-                            objectA = this.barrierStep.nextBarrier();
-                        } else {
-                            this.valueTraversal.reset();
-                            this.barrierStep.addBarrier(((Pair) 
objectA).getValue1());
-                            this.barrierStep.addBarrier(objectB);
-                            ((Step) 
this.barrierStep).addStarts(((TraverserSet) ((Pair) 
objectA).getValue0()).iterator());
-                            objectA = this.barrierStep.nextBarrier();
-                        }
-                        // BARRIER
-                    } else {
-                        if (objectB instanceof Traverser.Admin) {
-                            objectA = Pair.with(new 
TraverserSet<>((Traverser.Admin) objectB), objectA);
-                        } else if (objectB instanceof TraverserSet) {
-                            objectA = Pair.with(objectB, objectA);
-                        } else if (objectB instanceof Pair) {
-                            this.valueTraversal.reset();
-                            this.barrierStep.addBarrier(objectA);
-                            this.barrierStep.addBarrier(((Pair) 
objectB).getValue1());
-                            ((Step) 
this.barrierStep).addStarts(((TraverserSet) ((Pair) 
objectB).getValue0()).iterator());
-                            objectA = this.barrierStep.nextBarrier();
-                        } else {
-                            this.valueTraversal.reset();
-                            this.barrierStep.addBarrier(objectA);
-                            this.barrierStep.addBarrier(objectB);
-                            objectA = this.barrierStep.nextBarrier();
-                        }
-                    }
-                }
-                mapA.put(key, (V) objectA);
+                else if (null != objectB)
+                    objectA = this.barrierAggregator.apply(objectA, objectB);
+                mapA.put(key, objectA);
             }
             return mapA;
         }
-
-        // necessary to control Java Serialization to ensure proper clearing 
of internal traverser data
-        private void writeObject(final ObjectOutputStream outputStream) throws 
IOException {
-            // necessary as a non-root child is being sent over the wire
-            if (null != this.valueTraversal) 
this.valueTraversal.setParent(EmptyStep.instance());
-            outputStream.writeObject(null == this.valueTraversal ? null : 
this.valueTraversal.clone()); // todo: reset() instead?
-        }
-
-        private void readObject(final ObjectInputStream inputStream) throws 
IOException, ClassNotFoundException {
-            this.valueTraversal = (Traversal.Admin<?, V>) 
inputStream.readObject();
-            this.barrierStep = null == this.valueTraversal ? null : 
TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, 
this.valueTraversal).orElse(null);
-        }
     }
 
 
@@ -343,55 +178,19 @@ public final class GroupStep<S, K, V> extends 
ReducingBarrierStep<S, Map<K, V>>
                 valueTraversal instanceof IdentityTraversal ||
                 valueTraversal.getStartStep() instanceof LambdaMapStep && 
((LambdaMapStep) valueTraversal.getStartStep()).getMapFunction() instanceof 
FunctionTraverser) {
             return (Traversal.Admin<S, E>) __.map(valueTraversal).fold();
-        } else {
+        } else
             return valueTraversal;
-        }
-    }
-
-    public static Traversal.Admin<?, ?> generatePreTraversal(final 
Traversal.Admin<?, ?> valueTraversal) {
-        if (!TraversalHelper.hasStepOfAssignableClass(Barrier.class, 
valueTraversal))
-            return valueTraversal.clone();
-        final Traversal.Admin<?, ?> first = __.identity().asAdmin();
-        boolean updated = false;
-        for (final Step step : valueTraversal.getSteps()) {
-            if (step instanceof Barrier)
-                break;
-            first.addStep(step.clone());
-            updated = true;
-        }
-        return updated ? first : null;
     }
 
     public static <K, V> Map<K, V> doFinalReduction(final Map<K, Object> map, 
final Traversal.Admin<?, V> valueTraversal) {
-        final Map<K, V> reducedMap = new HashMap<>(map.size());
-        final Barrier reducingBarrierStep = 
TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, 
valueTraversal).orElse(null);
-        
IteratorUtils.removeOnNext(map.entrySet().iterator()).forEachRemaining(entry -> 
{
-            if (null == reducingBarrierStep) {
-                if (entry.getValue() instanceof TraverserSet) {
-                    if (!((TraverserSet) entry.getValue()).isEmpty())
-                        reducedMap.put(entry.getKey(), ((TraverserSet<V>) 
entry.getValue()).peek().get());
-                } else
-                    reducedMap.put(entry.getKey(), (V) entry.getValue());
-            } else {
+        TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, 
valueTraversal).ifPresent(barrierStep -> {
+            for (final K key : map.keySet()) {
                 valueTraversal.reset();
-                if (entry.getValue() instanceof Traverser.Admin)
-                    ((Step) reducingBarrierStep).addStart((Traverser.Admin) 
entry.getValue());
-                else if (entry.getValue() instanceof TraverserSet)
-                    ((Step) reducingBarrierStep).addStarts(((TraverserSet) 
entry.getValue()).iterator());
-                else if (entry.getValue() instanceof Pair) {
-                    ((Step) reducingBarrierStep).addStarts(((TraverserSet) 
(((Pair) entry.getValue()).getValue0())).iterator());
-                    reducingBarrierStep.addBarrier((((Pair) 
entry.getValue()).getValue1()));
-                } else
-                    reducingBarrierStep.addBarrier(entry.getValue());
+                barrierStep.addBarrier(map.get(key));
                 if (valueTraversal.hasNext())
-                    reducedMap.put(entry.getKey(), valueTraversal.next());
+                    map.put(key, valueTraversal.next());
             }
         });
-        assert map.isEmpty();
-        map.clear();
-        map.putAll(reducedMap);
         return (Map<K, V>) map;
     }
-}
-
-
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3496402a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java
index 0e8a4f5..9847a53 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java
@@ -18,15 +18,17 @@
  */
 package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
 
+import org.apache.tinkerpop.gremlin.process.traversal.Operator;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
 import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating;
 import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable;
 import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStep;
 import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier;
@@ -44,8 +46,8 @@ public final class GroupSideEffectStep<S, K, V> extends 
SideEffectStep<S> implem
 
     private char state = 'k';
     private Traversal.Admin<S, K> keyTraversal;
-    private Traversal.Admin<S, ?> preTraversal;
     private Traversal.Admin<S, V> valueTraversal;
+    private Barrier barrierStep;
     ///
     private String sideEffectKey;
 
@@ -53,8 +55,11 @@ public final class GroupSideEffectStep<S, K, V> extends 
SideEffectStep<S> implem
         super(traversal);
         this.sideEffectKey = sideEffectKey;
         this.valueTraversal = this.integrateChild(__.fold().asAdmin());
-        this.preTraversal = 
this.integrateChild(GroupStep.generatePreTraversal(this.valueTraversal));
-        
this.getTraversal().getSideEffects().registerIfAbsent(this.sideEffectKey, 
HashMapSupplier.instance(), new 
GroupStep.GroupBiOperator<>(this.valueTraversal));
+        this.barrierStep = 
TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, 
this.valueTraversal).orElse(null);
+        
this.getTraversal().getSideEffects().registerIfAbsent(this.sideEffectKey, 
HashMapSupplier.instance(),
+                new GroupStep.GroupBiOperator<>(null == this.barrierStep ?
+                        Operator.assign :
+                        this.barrierStep.getMemoryComputeKey().getReducer()));
     }
 
     @Override
@@ -64,8 +69,11 @@ public final class GroupSideEffectStep<S, K, V> extends 
SideEffectStep<S> implem
             this.state = 'v';
         } else if ('v' == this.state) {
             this.valueTraversal = 
this.integrateChild(GroupStep.convertValueTraversal(kvTraversal));
-            this.preTraversal = 
this.integrateChild(GroupStep.generatePreTraversal(this.valueTraversal));
-            this.getTraversal().getSideEffects().register(this.sideEffectKey, 
null, new GroupStep.GroupBiOperator<>(this.valueTraversal));
+            this.barrierStep = 
TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, 
this.valueTraversal).orElse(null);
+            this.getTraversal().getSideEffects().register(this.sideEffectKey, 
null,
+                    new GroupStep.GroupBiOperator<>(null == this.barrierStep ?
+                            Operator.assign :
+                            
this.barrierStep.getMemoryComputeKey().getReducer()));
             this.state = 'x';
         } else {
             throw new IllegalStateException("The key and value traversals for 
group()-step have already been set: " + this);
@@ -75,18 +83,15 @@ public final class GroupSideEffectStep<S, K, V> extends 
SideEffectStep<S> implem
     @Override
     protected void sideEffect(final Traverser.Admin<S> traverser) {
         final Map<K, V> map = new HashMap<>(1);
-        if (null == this.preTraversal) {
-            map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), 
(V) traverser.split());
-        } else {
-            final TraverserSet traverserSet = new TraverserSet<>();
-            this.preTraversal.reset();
-            this.preTraversal.addStart(traverser.split());
-            while(this.preTraversal.hasNext()) {
-                traverserSet.add(this.preTraversal.nextTraverser());
-            }
-            map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), 
(V) traverserSet);
-        }
-        this.getTraversal().getSideEffects().add(this.sideEffectKey, map);
+        this.valueTraversal.reset();
+        this.valueTraversal.addStart(traverser);
+        if (null == this.barrierStep) {
+            if (this.valueTraversal.hasNext())
+                map.put(TraversalUtil.applyNullable(traverser, 
this.keyTraversal), (V) this.valueTraversal.next());
+        } else if (this.barrierStep.hasNextBarrier())
+            map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), 
(V) this.barrierStep.nextBarrier());
+        if (!map.isEmpty())
+            this.getTraversal().getSideEffects().add(this.sideEffectKey, map);
     }
 
     @Override
@@ -101,12 +106,10 @@ public final class GroupSideEffectStep<S, K, V> extends 
SideEffectStep<S> implem
 
     @Override
     public List<Traversal.Admin<?, ?>> getLocalChildren() {
-        final List<Traversal.Admin<?, ?>> children = new ArrayList<>(3);
+        final List<Traversal.Admin<?, ?>> children = new ArrayList<>(2);
         if (null != this.keyTraversal)
             children.add(this.keyTraversal);
         children.add(this.valueTraversal);
-        if (null != this.preTraversal)
-            children.add(this.preTraversal);
         return children;
     }
 
@@ -121,7 +124,7 @@ public final class GroupSideEffectStep<S, K, V> extends 
SideEffectStep<S> implem
         if (null != this.keyTraversal)
             clone.keyTraversal = this.keyTraversal.clone();
         clone.valueTraversal = this.valueTraversal.clone();
-        clone.preTraversal = (Traversal.Admin<S, ?>) 
GroupStep.generatePreTraversal(clone.valueTraversal);
+        clone.barrierStep = 
TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, 
clone.valueTraversal).orElse(null);
         return clone;
     }
 
@@ -130,7 +133,6 @@ public final class GroupSideEffectStep<S, K, V> extends 
SideEffectStep<S> implem
         super.setTraversal(parentTraversal);
         this.integrateChild(this.keyTraversal);
         this.integrateChild(this.valueTraversal);
-        this.integrateChild(this.preTraversal);
     }
 
     @Override
@@ -145,4 +147,4 @@ public final class GroupSideEffectStep<S, K, V> extends 
SideEffectStep<S> implem
     public Map<K, V> generateFinalResult(final Map<K, ?> object) {
         return GroupStep.doFinalReduction((Map<K, Object>) object, 
this.valueTraversal);
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3496402a/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroovyGroupTest.groovy
----------------------------------------------------------------------
diff --git 
a/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroovyGroupTest.groovy
 
b/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroovyGroupTest.groovy
index 84da296..3ce9efe 100644
--- 
a/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroovyGroupTest.groovy
+++ 
b/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroovyGroupTest.groovy
@@ -123,5 +123,10 @@ public abstract class GroovyGroupTest {
         public Traversal<Vertex, Map<String, String>> 
get_g_V_groupXmX_byXnameX_byXinXknowsX_nameX_capXmX() {
             new ScriptTraversal<>(g, "gremlin-groovy", 
"g.V.group('m').by('name').by(__.in('knows').name).cap('m')")
         }
+
+        @Override
+        public Traversal<Vertex, Map<String, Number>> 
get_g_V_group_byXlabelX_byXbothE_groupXaX_byXlabelX_byXweight_sumX_weight_sumX()
 {
+            new ScriptTraversal<>(g, "gremlin-groovy", 
"g.V.group().by(label).by(bothE().group('a').by(label).by(values('weight').sum).weight.sum)")
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3496402a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupTest.java
----------------------------------------------------------------------
diff --git 
a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupTest.java
 
b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupTest.java
index 036c8c8..71b15a5 100644
--- 
a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupTest.java
+++ 
b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupTest.java
@@ -37,10 +37,12 @@ import java.util.Map;
 import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.GRATEFUL;
 import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.MODERN;
 import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.both;
+import static 
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.bothE;
 import static 
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.constant;
 import static 
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.count;
 import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out;
 import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.outE;
+import static 
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.values;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -90,6 +92,8 @@ public abstract class GroupTest extends 
AbstractGremlinProcessTest {
 
     public abstract Traversal<Vertex, Map<String, String>> 
get_g_V_groupXmX_byXnameX_byXinXknowsX_nameX_capXmX();
 
+    public abstract Traversal<Vertex, Map<String, Number>> 
get_g_V_group_byXlabelX_byXbothE_groupXaX_byXlabelX_byXweight_sumX_weight_sumX();
+
     @Test
     @LoadGraphWith(MODERN)
     public void g_V_group_byXnameX() {
@@ -441,6 +445,23 @@ public abstract class GroupTest extends 
AbstractGremlinProcessTest {
         checkSideEffects(traversal.asAdmin().getSideEffects(), "m", 
HashMap.class);
     }
 
+    @Test
+    @LoadGraphWith(MODERN)
+    public void 
g_V_group_byXlabelX_byXbothE_groupXaX_byXlabelX_byXweight_sumX_weight_sumX() {
+        final Traversal<Vertex, Map<String, Number>> traversal = 
get_g_V_group_byXlabelX_byXbothE_groupXaX_byXlabelX_byXweight_sumX_weight_sumX();
+        printTraversalForm(traversal);
+        final Map<String, Number> map = traversal.next();
+        assertFalse(traversal.hasNext());
+        assertEquals(2, map.size());
+        assertEquals(2.0d, map.get("software").doubleValue(), 0.01d);
+        assertEquals(5.0d, map.get("person").doubleValue(), 0.01d);
+        checkSideEffects(traversal.asAdmin().getSideEffects(), "a", 
HashMap.class);
+        final Map<String, Number> sideEffect = 
traversal.asAdmin().getSideEffects().get("a");
+        assertEquals(2, sideEffect.size());
+        assertEquals(4.0d, sideEffect.get("created").doubleValue(), 0.01d);
+        assertEquals(3.0d, sideEffect.get("knows").doubleValue(), 0.01d);
+    }
+
     public static class Traversals extends GroupTest {
 
         @Override
@@ -525,17 +546,22 @@ public abstract class GroupTest extends 
AbstractGremlinProcessTest {
 
         @Override
         public Traversal<Vertex, Map<Long, Map<String, List<Vertex>>>> 
get_g_V_group_byXbothE_countX_byXgroup_byXlabelXX() {
-            return g.V().<Long, Map<String, 
List<Vertex>>>group().by(__.bothE().count()).by(__.group().by(T.label));
+            return g.V().<Long, Map<String, 
List<Vertex>>>group().by(bothE().count()).by(__.group().by(T.label));
         }
 
         @Override
         public Traversal<Vertex, Map<String, Map<String, Number>>> 
get_g_V_outXfollowedByX_group_byXsongTypeX_byXbothE_group_byXlabelX_byXweight_sumXX()
 {
-            return g.V().out("followedBy").<String, Map<String, 
Number>>group().by("songType").by(__.bothE().group().by(T.label).by(__.values("weight").sum()));
+            return g.V().out("followedBy").<String, Map<String, 
Number>>group().by("songType").by(bothE().group().by(T.label).by(values("weight").sum()));
         }
 
         @Override
         public Traversal<Vertex, Map<String, String>> 
get_g_V_groupXmX_byXnameX_byXinXknowsX_nameX_capXmX() {
             return 
g.V().group("m").by("name").by(__.in("knows").values("name")).cap("m");
         }
+
+        @Override
+        public Traversal<Vertex, Map<String, Number>> 
get_g_V_group_byXlabelX_byXbothE_groupXaX_byXlabelX_byXweight_sumX_weight_sumX()
 {
+            return g.V().<String, 
Number>group().by(T.label).by(bothE().group("a").by(T.label).by(values("weight").sum()).values("weight").sum());
+        }
     }
 }

Reply via email to