Repository: storm
Updated Branches:
  refs/heads/master ce7ba7569 -> 5908b33b4

[STORM-2144] Fix Storm-sql group-by behavior in standalone mode

Fix group-by to not rely on monotonic group-by keys

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/320ac9d3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/320ac9d3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/320ac9d3

Branch: refs/heads/master
Commit: 320ac9d3a138f2dfd227d141fb2e16f355687c09
Parents: ce7ba75
Author: Arun Mahadevan <ar...@apache.org>
Authored: Mon Oct 10 14:56:56 2016 +0530
Committer: Satish Duggana <sdugg...@hortonworks.com>
Committed: Tue Oct 11 15:41:16 2016 +0530

----------------------------------------------------------------------
 .../backends/standalone/PlanCompiler.java | 1 +
 .../backends/standalone/RelNodeCompiler.java | 29 ++++++++++----------
 2 files changed, 15 insertions(+), 15 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/320ac9d3/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
index 8131934..4c69da1 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
@@ -40,6 +40,7 @@ public class PlanCompiler {
       "// GENERATED CODE", "package " + PACKAGE_NAME + ";", "",
       "import java.util.Iterator;", "import java.util.Map;", "import java.util.HashMap;",
       "import java.util.List;", "import java.util.ArrayList;",
+      "import java.util.LinkedHashMap;",
       "import org.apache.storm.tuple.Values;",
       "import org.apache.storm.sql.runtime.AbstractChannelHandler;",
       "import org.apache.storm.sql.runtime.Channels;",

http://git-wip-us.apache.org/repos/asf/storm/blob/320ac9d3/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
index c6adb28..5c674ad 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
@@ -66,8 +66,7 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
     " private static final ChannelHandler %1$s = ",
     " new AbstractChannelHandler() {",
     " private final Values EMPTY_VALUES = new Values();",
-    " private List<Object> prevGroupValues = null;",
-    " private final Map<String, Object> accumulators = new HashMap<>();",
+    " private final Map<List<Object>, Map<String, Object>> state = new LinkedHashMap<>();",
     " private final int[] groupIndices = new int[] {%2$s};",
     " private List<Object> getGroupValues(Values _data) {",
     " List<Object> res = new ArrayList<>();",
@@ -81,11 +80,15 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
     " public void flush(ChannelContext ctx) {",
     " emitAggregateResults(ctx);",
     " super.flush(ctx);",
-    " prevGroupValues = null;",
+    " state.clear();",
     " }",
     "",
     " private void emitAggregateResults(ChannelContext ctx) {",
-    " %3$s",
+    " for (Map.Entry<List<Object>, Map<String, Object>> entry: state.entrySet()) {",
+    " List<Object> groupValues = entry.getKey();",
+    " Map<String, Object> accumulators = entry.getValue();",
+    " %3$s",
+    " }",
     " }",
     "",
     " @Override",
@@ -245,18 +248,16 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
   @Override
   public Void visitAggregate(Aggregate aggregate, List<Void> inputStreams) throws Exception {
     beginAggregateStage(aggregate);
-    pw.println(" List<Object> curGroupValues = _data == null ? null : getGroupValues(_data);");
-    pw.println(" if (prevGroupValues != null && !prevGroupValues.equals(curGroupValues)) {");
-    pw.println(" emitAggregateResults(ctx);");
+    pw.println(" if (_data != null) {");
+    pw.println(" List<Object> curGroupValues = getGroupValues(_data);");
+    pw.println(" if (!state.containsKey(curGroupValues)) {");
+    pw.println(" state.put(curGroupValues, new HashMap<String, Object>());");
     pw.println(" }");
-    pw.println(" if (curGroupValues != null) {");
+    pw.println(" Map<String, Object> accumulators = state.get(curGroupValues);");
     for (AggregateCall call : aggregate.getAggCallList()) {
       aggregate(call);
     }
     pw.println(" }");
-    pw.println(" if (prevGroupValues != curGroupValues) {");
-    pw.println(" prevGroupValues = curGroupValues;");
-    pw.println(" }");
     endStage();
     return null;
   }
@@ -293,10 +294,8 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
     }
     return NEW_LINE_JOINER.join(sw.toString(),
         String.format(" ctx.emit(new Values(%s, %s));",
-            groupValueEmitStr("prevGroupValues", aggregate.getGroupSet().cardinality()),
-            Joiner.on(", ").join(res)),
-        " accumulators.clear();"
-    );
+            groupValueEmitStr("groupValues", aggregate.getGroupSet().cardinality()),
+            Joiner.on(", ").join(res)));
   }
 
   private String aggregateResult(AggregateCall call, PrintWriter pw) {