This is an automated email from the ASF dual-hosted git repository.

himanshug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 176da53  make double sum/min/max agg work on string columns (#8243)
176da53 is described below

commit 176da53996220776979f4ef7672ca4d31acb00b6
Author: Himanshu <g.himan...@gmail.com>
AuthorDate: Tue Aug 13 15:55:14 2019 -0700

    make double sum/min/max agg work on string columns (#8243)
    
    * make double sum/min/max agg work on string columns
    
    * style and compilation fixes
    
    * fix tests
    
    * address review comments
    
    * add comment on SimpleDoubleAggregatorFactory
    
    * make checkstyle happy
---
 .../org/apache/druid/java/util/common/Numbers.java |  24 +++
 .../druid/query/aggregation/AggregatorUtil.java    |   3 +-
 .../query/aggregation/DelegatingAggregator.java    |  74 ++++++++
 .../aggregation/DelegatingBufferAggregator.java    |  95 ++++++++++
 .../aggregation/DoubleMaxAggregatorFactory.java    |  15 +-
 .../aggregation/DoubleMinAggregatorFactory.java    |  15 +-
 .../aggregation/DoubleSumAggregatorFactory.java    |  28 ++-
 .../aggregation/SimpleDoubleAggregatorFactory.java |  62 ++++++-
 .../StringColumnDoubleAggregatorWrapper.java       |  69 +++++++
 .../StringColumnDoubleBufferAggregatorWrapper.java |  70 +++++++
 .../SettableValueDoubleColumnValueSelector.java    |  56 ++++++
 .../apache/druid/query/SchemaEvolutionTest.java    |   8 +-
 .../query/aggregation/AggregationTestHelper.java   |  54 ++++++
 .../aggregation/DoubleMaxAggregationTest.java      |   1 +
 .../aggregation/DoubleMinAggregationTest.java      |   1 +
 .../aggregation/StringColumnAggregationTest.java   | 201 +++++++++++++++++++++
 16 files changed, 728 insertions(+), 48 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/java/util/common/Numbers.java b/core/src/main/java/org/apache/druid/java/util/common/Numbers.java
index c9d40d7..c40cb8e 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/Numbers.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/Numbers.java
@@ -19,6 +19,10 @@
 
 package org.apache.druid.java.util.common;
 
+import com.google.common.primitives.Doubles;
+
+import javax.annotation.Nullable;
+
 public final class Numbers
 {
   /**
@@ -92,6 +96,26 @@ public final class Numbers
     }
   }
 
+  /**
+   * Try parsing the given Number or String object val as double.
+   * @param val
+   * @param nullValue value to return when input was string type but not parseable into double value
+   * @return parsed double value
+   */
+  public static double tryParseDouble(@Nullable Object val, double nullValue)
+  {
+    if (val == null) {
+      return nullValue;
+    } else if (val instanceof Number) {
+      return ((Number) val).doubleValue();
+    } else if (val instanceof String) {
+      Double d = Doubles.tryParse((String) val);
+      return d == null ? nullValue : d.doubleValue();
+    } else {
+      throw new IAE("Unknown object type [%s]", val.getClass().getName());
+    }
+  }
+
   public static int toIntExact(long value, String error)
   {
     if ((int) value != value) {
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
index ca0eb3a..13a16d0 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
@@ -25,7 +25,6 @@ import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.math.expr.Expr;
 import org.apache.druid.math.expr.ExprEval;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
-import org.apache.druid.segment.BaseDoubleColumnValueSelector;
 import org.apache.druid.segment.BaseFloatColumnValueSelector;
 import org.apache.druid.segment.BaseLongColumnValueSelector;
 import org.apache.druid.segment.ColumnSelectorFactory;
@@ -266,7 +265,7 @@ public class AggregatorUtil
   /**
    * Only one of fieldName and fieldExpression should be non-null
    */
-  static BaseDoubleColumnValueSelector makeColumnValueSelectorWithDoubleDefault(
+  static ColumnValueSelector makeColumnValueSelectorWithDoubleDefault(
       final ColumnSelectorFactory metricFactory,
       @Nullable final String fieldName,
       @Nullable final Expr fieldExpression,
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/DelegatingAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/DelegatingAggregator.java
new file mode 100644
index 0000000..c1b4b40
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/DelegatingAggregator.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import javax.annotation.Nullable;
+
+/**
+ * An Aggregator that delegates everything. It is used by Aggregator wrappers e.g.
+ * {@link StringColumnDoubleAggregatorWrapper} that modify some behavior of a delegate.
+ */
+public abstract class DelegatingAggregator implements Aggregator
+{
+  protected Aggregator delegate;
+
+  @Override
+  public void aggregate()
+  {
+    delegate.aggregate();
+  }
+
+  @Nullable
+  @Override
+  public Object get()
+  {
+    return delegate.get();
+  }
+
+  @Override
+  public float getFloat()
+  {
+    return delegate.getFloat();
+  }
+
+  @Override
+  public long getLong()
+  {
+    return delegate.getLong();
+  }
+
+  @Override
+  public double getDouble()
+  {
+    return delegate.getDouble();
+  }
+
+  @Override
+  public boolean isNull()
+  {
+    return delegate.isNull();
+  }
+
+  @Override
+  public void close()
+  {
+    delegate.close();
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/DelegatingBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/DelegatingBufferAggregator.java
new file mode 100644
index 0000000..9b1aa80
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/DelegatingBufferAggregator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * A BufferAggregator that delegates everything. It is used by BufferAggregator wrappers e.g.
+ * {@link StringColumnDoubleBufferAggregatorWrapper} that modify some behavior of a delegate.
+ */
+public abstract class DelegatingBufferAggregator implements BufferAggregator
+{
+  protected BufferAggregator delegate;
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    delegate.init(buf, position);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position)
+  {
+    delegate.aggregate(buf, position);
+  }
+
+  @Nullable
+  @Override
+  public Object get(ByteBuffer buf, int position)
+  {
+    return delegate.get(buf, position);
+  }
+
+  @Override
+  public float getFloat(ByteBuffer buf, int position)
+  {
+    return delegate.getFloat(buf, position);
+  }
+
+  @Override
+  public long getLong(ByteBuffer buf, int position)
+  {
+    return delegate.getLong(buf, position);
+  }
+
+  @Override
+  public double getDouble(ByteBuffer buf, int position)
+  {
+    return delegate.getDouble(buf, position);
+  }
+
+  @Override
+  public void close()
+  {
+    delegate.close();
+  }
+
+  @Override
+  public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+  {
+    delegate.inspectRuntimeShape(inspector);
+  }
+
+  @Override
+  public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
+  {
+    delegate.relocate(oldPosition, newPosition, oldBuffer, newBuffer);
+  }
+
+  @Override
+  public boolean isNull(ByteBuffer buf, int position)
+  {
+    return delegate.isNull(buf, position);
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/DoubleMaxAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/DoubleMaxAggregatorFactory.java
index 21562af..1c697c8 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/DoubleMaxAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/DoubleMaxAggregatorFactory.java
@@ -25,7 +25,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.segment.BaseDoubleColumnValueSelector;
-import org.apache.druid.segment.ColumnSelectorFactory;
 
 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
@@ -53,25 +52,19 @@ public class DoubleMaxAggregatorFactory extends SimpleDoubleAggregatorFactory
   }
 
   @Override
-  protected BaseDoubleColumnValueSelector selector(ColumnSelectorFactory metricFactory)
+  protected double nullValue()
   {
-    return getDoubleColumnSelector(
-        metricFactory,
-        Double.NEGATIVE_INFINITY
-    );
+    return Double.NEGATIVE_INFINITY;
   }
 
   @Override
-  protected Aggregator factorize(ColumnSelectorFactory metricFactory, BaseDoubleColumnValueSelector selector)
+  protected Aggregator buildAggregator(BaseDoubleColumnValueSelector selector)
   {
     return new DoubleMaxAggregator(selector);
   }
 
   @Override
-  protected BufferAggregator factorizeBuffered(
-      ColumnSelectorFactory metricFactory,
-      BaseDoubleColumnValueSelector selector
-  )
+  protected BufferAggregator buildBufferAggregator(BaseDoubleColumnValueSelector selector)
   {
     return new DoubleMaxBufferAggregator(selector);
   }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/DoubleMinAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/DoubleMinAggregatorFactory.java
index f308db4..d56d8ed 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/DoubleMinAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/DoubleMinAggregatorFactory.java
@@ -25,7 +25,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.segment.BaseDoubleColumnValueSelector;
-import org.apache.druid.segment.ColumnSelectorFactory;
 
 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
@@ -53,25 +52,19 @@ public class DoubleMinAggregatorFactory extends SimpleDoubleAggregatorFactory
   }
 
   @Override
-  protected BaseDoubleColumnValueSelector selector(ColumnSelectorFactory metricFactory)
+  protected double nullValue()
   {
-    return getDoubleColumnSelector(
-        metricFactory,
-        Double.POSITIVE_INFINITY
-    );
+    return Double.POSITIVE_INFINITY;
   }
 
   @Override
-  protected Aggregator factorize(ColumnSelectorFactory metricFactory, BaseDoubleColumnValueSelector selector)
+  protected Aggregator buildAggregator(BaseDoubleColumnValueSelector selector)
   {
     return new DoubleMinAggregator(selector);
   }
 
   @Override
-  protected BufferAggregator factorizeBuffered(
-      ColumnSelectorFactory metricFactory,
-      BaseDoubleColumnValueSelector selector
-  )
+  protected BufferAggregator buildBufferAggregator(BaseDoubleColumnValueSelector selector)
   {
     return new DoubleMinBufferAggregator(selector);
   }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/DoubleSumAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/DoubleSumAggregatorFactory.java
index 1a019d7..00bc89b 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/DoubleSumAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/DoubleSumAggregatorFactory.java
@@ -25,7 +25,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.segment.BaseDoubleColumnValueSelector;
-import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 import org.apache.druid.segment.vector.VectorValueSelector;
 
@@ -55,39 +54,34 @@ public class DoubleSumAggregatorFactory extends SimpleDoubleAggregatorFactory
   }
 
   @Override
-  protected BaseDoubleColumnValueSelector selector(ColumnSelectorFactory metricFactory)
+  protected double nullValue()
   {
-    return getDoubleColumnSelector(
-        metricFactory,
-        0.0d
-    );
+    return 0.0d;
   }
 
   @Override
-  protected VectorValueSelector vectorSelector(VectorColumnSelectorFactory columnSelectorFactory)
+  protected Aggregator buildAggregator(BaseDoubleColumnValueSelector selector)
   {
-    return columnSelectorFactory.makeValueSelector(fieldName);
+    return new DoubleSumAggregator(selector);
   }
 
   @Override
-  protected Aggregator factorize(ColumnSelectorFactory metricFactory, BaseDoubleColumnValueSelector selector)
+  protected BufferAggregator buildBufferAggregator(BaseDoubleColumnValueSelector selector)
   {
-    return new DoubleSumAggregator(selector);
+    return new DoubleSumBufferAggregator(selector);
   }
 
+
   @Override
-  public boolean canVectorize()
+  protected VectorValueSelector vectorSelector(VectorColumnSelectorFactory columnSelectorFactory)
   {
-    return expression == null;
+    return columnSelectorFactory.makeValueSelector(fieldName);
   }
 
   @Override
-  protected BufferAggregator factorizeBuffered(
-      ColumnSelectorFactory metricFactory,
-      BaseDoubleColumnValueSelector selector
-  )
+  public boolean canVectorize()
   {
-    return new DoubleSumBufferAggregator(selector);
+    return expression == null;
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java
index 6b3643a..f5586d0 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java
@@ -29,7 +29,10 @@ import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.math.expr.Parser;
 import org.apache.druid.segment.BaseDoubleColumnValueSelector;
 import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ValueType;
 
 import javax.annotation.Nullable;
 import java.util.Collections;
@@ -37,7 +40,13 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
 
-public abstract class SimpleDoubleAggregatorFactory extends NullableAggregatorFactory<BaseDoubleColumnValueSelector>
+/**
+ * This is an abstract class inherited by various {@link AggregatorFactory} implementations that consume double input
+ * and produce double output on aggregation.
+ * It extends "NullableAggregatorFactory<ColumnValueSelector>" instead of "NullableAggregatorFactory<BaseDoubleColumnValueSelector>"
+ * to additionally support aggregation on single/multi value string column types.
+ */
+public abstract class SimpleDoubleAggregatorFactory extends NullableAggregatorFactory<ColumnValueSelector>
 {
   protected final String name;
   @Nullable
@@ -68,16 +77,57 @@ public abstract class SimpleDoubleAggregatorFactory extends NullableAggregatorFa
     );
   }
 
-  protected BaseDoubleColumnValueSelector getDoubleColumnSelector(ColumnSelectorFactory metricFactory, double nullValue)
+  @Override
+  protected Aggregator factorize(ColumnSelectorFactory metricFactory, ColumnValueSelector selector)
+  {
+    if (shouldUseStringColumnAggregatorWrapper(metricFactory)) {
+      return new StringColumnDoubleAggregatorWrapper(
+          selector,
+          SimpleDoubleAggregatorFactory.this::buildAggregator,
+          nullValue()
+      );
+    } else {
+      return buildAggregator(selector);
+    }
+  }
+
+  @Override
+  protected BufferAggregator factorizeBuffered(
+      ColumnSelectorFactory metricFactory,
+      ColumnValueSelector selector
+  )
+  {
+    if (shouldUseStringColumnAggregatorWrapper(metricFactory)) {
+      return new StringColumnDoubleBufferAggregatorWrapper(
+          selector,
+          SimpleDoubleAggregatorFactory.this::buildBufferAggregator,
+          nullValue()
+      );
+    } else {
+      return buildBufferAggregator(selector);
+    }
+  }
+
+  @Override
+  protected ColumnValueSelector selector(ColumnSelectorFactory metricFactory)
   {
     return AggregatorUtil.makeColumnValueSelectorWithDoubleDefault(
         metricFactory,
         fieldName,
         fieldExpression.get(),
-        nullValue
+        nullValue()
     );
   }
 
+  private boolean shouldUseStringColumnAggregatorWrapper(ColumnSelectorFactory columnSelectorFactory)
+  {
+    if (fieldName != null) {
+      ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
+      return capabilities != null && capabilities.getType() == ValueType.STRING;
+    }
+    return false;
+  }
+
   @Override
   public Object deserialize(Object object)
   {
@@ -184,4 +234,10 @@ public abstract class SimpleDoubleAggregatorFactory extends NullableAggregatorFa
   {
     return expression;
   }
+
+  protected abstract double nullValue();
+
+  protected abstract Aggregator buildAggregator(BaseDoubleColumnValueSelector selector);
+
+  protected abstract BufferAggregator buildBufferAggregator(BaseDoubleColumnValueSelector selector);
 }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/StringColumnDoubleAggregatorWrapper.java b/processing/src/main/java/org/apache/druid/query/aggregation/StringColumnDoubleAggregatorWrapper.java
new file mode 100644
index 0000000..e970c94
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/StringColumnDoubleAggregatorWrapper.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.java.util.common.Numbers;
+import org.apache.druid.segment.BaseDoubleColumnValueSelector;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.selector.settable.SettableValueDoubleColumnValueSelector;
+
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * This class can be used to wrap Double Aggregator that consume double type columns to handle String type.
+ */
+public class StringColumnDoubleAggregatorWrapper extends DelegatingAggregator
+{
+  private final BaseObjectColumnValueSelector selector;
+  private final double nullValue;
+  private final SettableValueDoubleColumnValueSelector doubleSelector;
+
+  public StringColumnDoubleAggregatorWrapper(
+      BaseObjectColumnValueSelector selector,
+      Function<BaseDoubleColumnValueSelector, Aggregator> delegateBuilder,
+      double nullValue
+  )
+  {
+    this.doubleSelector = new SettableValueDoubleColumnValueSelector();
+    this.selector = selector;
+    this.nullValue = nullValue;
+    this.delegate = delegateBuilder.apply(doubleSelector);
+  }
+
+  @Override
+  public void aggregate()
+  {
+    Object update = selector.getObject();
+
+    if (update == null) {
+      doubleSelector.setValue(nullValue);
+      delegate.aggregate();
+    } else if (update instanceof List) {
+      for (Object o : (List) update) {
+        doubleSelector.setValue(Numbers.tryParseDouble(o, nullValue));
+        delegate.aggregate();
+      }
+    } else {
+      doubleSelector.setValue(Numbers.tryParseDouble(update, nullValue));
+      delegate.aggregate();
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/StringColumnDoubleBufferAggregatorWrapper.java b/processing/src/main/java/org/apache/druid/query/aggregation/StringColumnDoubleBufferAggregatorWrapper.java
new file mode 100644
index 0000000..fb58ad5
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/StringColumnDoubleBufferAggregatorWrapper.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.java.util.common.Numbers;
+import org.apache.druid.segment.BaseDoubleColumnValueSelector;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.selector.settable.SettableValueDoubleColumnValueSelector;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * This class can be used to wrap Double BufferAggregator that consume double type columns to handle String type.
+ */
+public class StringColumnDoubleBufferAggregatorWrapper extends DelegatingBufferAggregator
+{
+  private final BaseObjectColumnValueSelector selector;
+  private final double nullValue;
+  private final SettableValueDoubleColumnValueSelector doubleSelector;
+
+  public StringColumnDoubleBufferAggregatorWrapper(
+      BaseObjectColumnValueSelector selector,
+      Function<BaseDoubleColumnValueSelector, BufferAggregator> delegateBuilder,
+      double nullValue
+  )
+  {
+    this.doubleSelector = new SettableValueDoubleColumnValueSelector();
+    this.selector = selector;
+    this.nullValue = nullValue;
+    this.delegate = delegateBuilder.apply(doubleSelector);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position)
+  {
+    Object update = selector.getObject();
+
+    if (update == null) {
+      doubleSelector.setValue(nullValue);
+      delegate.aggregate(buf, position);
+    } else if (update instanceof List) {
+      for (Object o : (List) update) {
+        doubleSelector.setValue(Numbers.tryParseDouble(o, nullValue));
+        delegate.aggregate(buf, position);
+      }
+    } else {
+      doubleSelector.setValue(Numbers.tryParseDouble(update, nullValue));
+      delegate.aggregate(buf, position);
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/selector/settable/SettableValueDoubleColumnValueSelector.java b/processing/src/main/java/org/apache/druid/segment/selector/settable/SettableValueDoubleColumnValueSelector.java
new file mode 100644
index 0000000..5b79c52
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/selector/settable/SettableValueDoubleColumnValueSelector.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.selector.settable;
+
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.BaseDoubleColumnValueSelector;
+import org.apache.druid.segment.ColumnValueSelector;
+
+/**
+ * A BaseDoubleColumnValueSelector impl to return settable double value on calls to
+ * {@link ColumnValueSelector#getDouble()}
+ */
+public class SettableValueDoubleColumnValueSelector implements BaseDoubleColumnValueSelector
+{
+  private double value;
+
+  @Override
+  public double getDouble()
+  {
+    return value;
+  }
+
+  @Override
+  public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+  {
+
+  }
+
+  @Override
+  public boolean isNull()
+  {
+    return false;
+  }
+
+  public void setValue(double value)
+  {
+    this.value = value;
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/SchemaEvolutionTest.java b/processing/src/test/java/org/apache/druid/query/SchemaEvolutionTest.java
index 889e80f..157cb3b 100644
--- a/processing/src/test/java/org/apache/druid/query/SchemaEvolutionTest.java
+++ b/processing/src/test/java/org/apache/druid/query/SchemaEvolutionTest.java
@@ -261,7 +261,7 @@ public class SchemaEvolutionTest
     // Only string(1)
     // Note: Expressions implicitly cast strings to numbers, leading to the a/b vs c/d difference.
     Assert.assertEquals(
-        timeseriesResult(ImmutableMap.of("a", 0L, "b", 0.0, "c", 31L, "d", THIRTY_ONE_POINT_ONE)),
+        timeseriesResult(ImmutableMap.of("a", 0L, "b", THIRTY_ONE_POINT_ONE, "c", 31L, "d", THIRTY_ONE_POINT_ONE)),
         runQuery(query, factory, ImmutableList.of(index1))
     );
 
@@ -293,7 +293,7 @@ public class SchemaEvolutionTest
     Assert.assertEquals(
         timeseriesResult(ImmutableMap.of(
             "a", 31L * 2,
-            "b", THIRTY_ONE_POINT_ONE + 31,
+            "b", THIRTY_ONE_POINT_ONE * 2 + 31,
             "c", 31L * 3,
             "d", THIRTY_ONE_POINT_ONE * 2 + 31
         )),
@@ -335,7 +335,7 @@ public class SchemaEvolutionTest
 
     // Only string(1) -- which we can filter but not aggregate
     Assert.assertEquals(
-        timeseriesResult(ImmutableMap.of("a", 0L, "b", 0.0, "c", 2L)),
+        timeseriesResult(ImmutableMap.of("a", 0L, "b", 19.1, "c", 2L)),
         runQuery(query, factory, ImmutableList.of(index1))
     );
 
@@ -368,7 +368,7 @@ public class SchemaEvolutionTest
     Assert.assertEquals(
         timeseriesResult(ImmutableMap.of(
             "a", 38L,
-            "b", 38.1,
+            "b", 57.2,
             "c", 6L
         )),
         runQuery(query, factory, ImmutableList.of(index1, index2, index3, index4))
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
index 7178769..6121281 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
@@ -571,6 +571,60 @@ public class AggregationTestHelper implements Closeable
     }
   }
 
+  public IncrementalIndex createIncrementalIndex(
+      Iterator rows,
+      InputRowParser parser,
+      final AggregatorFactory[] metrics,
+      long minTimestamp,
+      Granularity gran,
+      boolean deserializeComplexMetrics,
+      int maxRowCount,
+      boolean rollup
+  ) throws Exception
+  {
+    IncrementalIndex index = new IncrementalIndex.Builder()
+        .setIndexSchema(
+            new IncrementalIndexSchema.Builder()
+                .withMinTimestamp(minTimestamp)
+                .withQueryGranularity(gran)
+                .withMetrics(metrics)
+                .withRollup(rollup)
+                .build()
+        )
+        .setDeserializeComplexMetrics(deserializeComplexMetrics)
+        .setMaxRowCount(maxRowCount)
+        .buildOnheap();
+
+    while (rows.hasNext()) {
+      Object row = rows.next();
+      if (!index.canAppendRow()) {
+        throw new IAE("Can't add row to index");
+      }
+      if (row instanceof String && parser instanceof StringInputRowParser) {
+        //Note: this is required because StringInputRowParser is InputRowParser<ByteBuffer> as opposed to
+        //InputRowsParser<String>
+        index.add(((StringInputRowParser) parser).parse((String) row));
+      } else {
+        index.add(((List<InputRow>) parser.parseBatch(row)).get(0));
+      }
+    }
+
+    return index;
+  }
+
+  public Segment persistIncrementalIndex(
+      IncrementalIndex index,
+      File outDir
+  ) throws Exception
+  {
+    if (outDir == null) {
+      outDir = tempFolder.newFolder();
+    }
+    indexMerger.persist(index, outDir, new IndexSpec(), null);
+
+    return new QueryableIndexSegment(indexIO.loadIndex(outDir), SegmentId.dummy(""));
+  }
+
   //Simulates running group-by query on individual segments as historicals would do, json serialize the results
   //from each segment, later deserialize and merge and finally return the results
   public Sequence<ResultRow> runQueryOnSegments(final List<File> segmentDirs, final String queryJson)
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMaxAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMaxAggregationTest.java
index dde8fee..7342627 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMaxAggregationTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMaxAggregationTest.java
@@ -50,6 +50,7 @@ public class DoubleMaxAggregationTest
     selector = new TestDoubleColumnSelectorImpl(values);
     colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(selector);
+    EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(null);
     EasyMock.replay(colSelectorFactory);
   }
 
diff --git 
a/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMinAggregationTest.java
 
b/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMinAggregationTest.java
index a35ad33..89a4289 100644
--- 
a/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMinAggregationTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMinAggregationTest.java
@@ -50,6 +50,7 @@ public class DoubleMinAggregationTest
     selector = new TestDoubleColumnSelectorImpl(values);
     colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
     
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(selector);
+    
EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(null);
     EasyMock.replay(colSelectorFactory);
   }
 
diff --git 
a/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java
 
b/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java
new file mode 100644
index 0000000..69c1390
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.data.input.impl.NoopInputRowParser;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.CloseQuietly;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.Result;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.timeseries.TimeseriesQuery;
+import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.segment.IncrementalIndexSegment;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.timeline.SegmentId;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class StringColumnAggregationTest
+{
+  // Fixed, timezone-explicit timestamp for every ingested row. A wall-clock timestamp (DateTime.now()) makes the
+  // test non-deterministic and would push all rows outside the queried "1970/2050" interval after 2050.
+  private static final DateTime ROW_TIMESTAMP = new DateTime(2019, 1, 1, 0, 0, DateTimeZone.UTC);
+
+  @Rule
+  public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+  private final String singleValue = "singleValue";
+  private final String multiValue = "multiValue";
+
+  private final int n = 10;
+
+  // results after aggregation
+  private long numRows;
+  private double singleValueSum;
+  private double multiValueSum;
+  private double singleValueMax;
+  private double multiValueMax;
+  private double singleValueMin;
+  private double multiValueMin;
+
+  private List<Segment> segments;
+
+  private AggregationTestHelper aggregationTestHelper;
+
+  /**
+   * Ingests n rows whose string dimensions hold the values "1.0" .. "n.0". The data is exposed twice: as an
+   * in-memory incremental-index segment and as the persisted segment built from the same index.
+   */
+  @Before
+  public void setup() throws Exception
+  {
+    List<String> dimensions = ImmutableList.of(singleValue, multiValue);
+    List<InputRow> inputRows = new ArrayList<>(n);
+    for (int i = 1; i <= n; i++) {
+      String val = String.valueOf(i * 1.0d);
+
+      inputRows.add(new MapBasedInputRow(
+          ROW_TIMESTAMP,
+          dimensions,
+          ImmutableMap.of(
+              singleValue, val,
+              // Lists.newArrayList because the value list contains a null, which Immutable collections reject;
+              // the null entry must not influence the expected sum/min/max computed below.
+              multiValue, Lists.newArrayList(val, null, val)
+          )
+      ));
+    }
+
+    aggregationTestHelper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
+        Collections.emptyList(),
+        new GroupByQueryConfig(),
+        tempFolder
+    );
+
+    IncrementalIndex index = aggregationTestHelper.createIncrementalIndex(
+        inputRows.iterator(),
+        new NoopInputRowParser(null),
+        new AggregatorFactory[]{new CountAggregatorFactory("count")},
+        0,
+        Granularities.NONE,
+        false,
+        100,
+        false
+    );
+
+    this.segments = ImmutableList.of(
+        new IncrementalIndexSegment(index, SegmentId.dummy("test")),
+        aggregationTestHelper.persistIncrementalIndex(index, null)
+    );
+
+    // Rows hold the arithmetic progression 1..n. For a single copy of the data, the single-value sum is
+    // n(n+1)/2. The multi-value sum is n(n+1), because each row carries two non-null entries.
+    // Queries run over two copies of the same data (the in-memory index and its persisted form), so counts and
+    // sums are doubled while min and max are unchanged.
+    numRows = 2 * n;
+    singleValueSum = n * (n + 1);
+    multiValueSum = 2 * n * (n + 1);
+    singleValueMax = n;
+    multiValueMax = n;
+    singleValueMin = 1;
+    multiValueMin = 1;
+  }
+
+  @After
+  public void tearDown() throws Exception
+  {
+    if (segments != null) {
+      for (Segment seg : segments) {
+        CloseQuietly.close(seg);
+      }
+    }
+  }
+
+  /**
+   * Aggregators shared by both tests: double sum/max/min over each string dimension, plus a row count.
+   */
+  private AggregatorFactory[] makeAggregators()
+  {
+    return new AggregatorFactory[]{
+        new DoubleSumAggregatorFactory("singleDoubleSum", singleValue),
+        new DoubleSumAggregatorFactory("multiDoubleSum", multiValue),
+        new DoubleMaxAggregatorFactory("singleDoubleMax", singleValue),
+        new DoubleMaxAggregatorFactory("multiDoubleMax", multiValue),
+        new DoubleMinAggregatorFactory("singleDoubleMin", singleValue),
+        new DoubleMinAggregatorFactory("multiDoubleMin", multiValue),
+        new LongSumAggregatorFactory("count", "count")
+    };
+  }
+
+  @Test
+  public void testGroupBy() throws Exception
+  {
+    GroupByQuery query = new GroupByQuery.Builder()
+        .setDataSource("test")
+        .setGranularity(Granularities.ALL)
+        .setInterval("1970/2050")
+        .setAggregatorSpecs(makeAggregators())
+        .build();
+
+    Sequence<ResultRow> seq = aggregationTestHelper.runQueryOnSegmentsObjs(segments, query);
+    Row result = Iterables.getOnlyElement(seq.toList()).toMapBasedRow(query);
+
+    Assert.assertEquals(numRows, result.getMetric("count").longValue());
+    Assert.assertEquals(singleValueSum, result.getMetric("singleDoubleSum").doubleValue(), 0.0001d);
+    Assert.assertEquals(multiValueSum, result.getMetric("multiDoubleSum").doubleValue(), 0.0001d);
+    Assert.assertEquals(singleValueMax, result.getMetric("singleDoubleMax").doubleValue(), 0.0001d);
+    Assert.assertEquals(multiValueMax, result.getMetric("multiDoubleMax").doubleValue(), 0.0001d);
+    Assert.assertEquals(singleValueMin, result.getMetric("singleDoubleMin").doubleValue(), 0.0001d);
+    Assert.assertEquals(multiValueMin, result.getMetric("multiDoubleMin").doubleValue(), 0.0001d);
+  }
+
+  @Test
+  public void testTimeseries() throws Exception
+  {
+    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                  .dataSource("test")
+                                  .granularity(Granularities.ALL)
+                                  .intervals("1970/2050")
+                                  .aggregators(makeAggregators())
+                                  .build();
+
+    Sequence seq = AggregationTestHelper.createTimeseriesQueryAggregationTestHelper(Collections.emptyList(), tempFolder)
+                                        .runQueryOnSegmentsObjs(segments, query);
+    TimeseriesResultValue result = ((Result<TimeseriesResultValue>) Iterables.getOnlyElement(seq.toList())).getValue();
+
+    Assert.assertEquals(numRows, result.getLongMetric("count").longValue());
+    Assert.assertEquals(singleValueSum, result.getDoubleMetric("singleDoubleSum").doubleValue(), 0.0001d);
+    Assert.assertEquals(multiValueSum, result.getDoubleMetric("multiDoubleSum").doubleValue(), 0.0001d);
+    Assert.assertEquals(singleValueMax, result.getDoubleMetric("singleDoubleMax").doubleValue(), 0.0001d);
+    Assert.assertEquals(multiValueMax, result.getDoubleMetric("multiDoubleMax").doubleValue(), 0.0001d);
+    Assert.assertEquals(singleValueMin, result.getDoubleMetric("singleDoubleMin").doubleValue(), 0.0001d);
+    Assert.assertEquals(multiValueMin, result.getDoubleMetric("multiDoubleMin").doubleValue(), 0.0001d);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to