This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new cf017f1c63 Add FrequentStringsSketch and FrequentLongsSketch 
aggregation functions (#11098)
cf017f1c63 is described below

commit cf017f1c63de8703a71dcaaab4506ffee0672950
Author: Caner Balci <ba...@uber.com>
AuthorDate: Tue Sep 19 16:49:59 2023 -0700

    Add FrequentStringsSketch and FrequentLongsSketch aggregation functions 
(#11098)
    
    Squashed commit:
    
    Fix checkstyle validations
    Split FrequentItems sketch into FrequentStrings and FrequentLongs sketches
    Fix datasketch utility import
    Add documentation and do some cleanups
    Whitespace and typo fixes
    Add missing license header
---
 .../apache/pinot/core/common/ObjectSerDeUtils.java |  53 ++-
 .../function/AggregationFunctionFactory.java       |   4 +
 .../FrequentLongsSketchAggregationFunction.java    | 268 ++++++++++++++
 .../FrequentStringsSketchAggregationFunction.java  | 252 ++++++++++++++
 .../queries/FrequentItemsSketchQueriesTest.java    | 387 +++++++++++++++++++++
 .../SerializedFrequentLongsSketch.java             |  44 +++
 .../SerializedFrequentStringsSketch.java           |  44 +++
 .../pinot/segment/spi/AggregationFunctionType.java |   3 +
 8 files changed, 1054 insertions(+), 1 deletion(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index a18a23df35..69c00ea5b8 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -57,6 +57,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.datasketches.common.ArrayOfStringsSerDe;
+import org.apache.datasketches.frequencies.ItemsSketch;
+import org.apache.datasketches.frequencies.LongsSketch;
 import org.apache.datasketches.kll.KllDoublesSketch;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.theta.Sketch;
@@ -134,7 +137,9 @@ public class ObjectSerDeUtils {
     PinotFourthMoment(34),
     ArgMinMaxObject(35),
     KllDataSketch(36),
-    IntegerTupleSketch(37);
+    IntegerTupleSketch(37),
+    FrequentStringsSketch(38),
+    FrequentLongsSketch(39);
 
     private final int _value;
 
@@ -226,6 +231,10 @@ public class ObjectSerDeUtils {
         return ObjectType.IntegerTupleSketch;
       } else if (value instanceof ExprMinMaxObject) {
         return ObjectType.ArgMinMaxObject;
+      } else if (value instanceof ItemsSketch) {
+        return ObjectType.FrequentStringsSketch;
+      } else if (value instanceof LongsSketch) {
+        return ObjectType.FrequentLongsSketch;
       } else {
         throw new IllegalArgumentException("Unsupported type of value: " + 
value.getClass().getSimpleName());
       }
@@ -1285,6 +1294,46 @@ public class ObjectSerDeUtils {
         }
       };
 
+  /**
+   * SerDe for string-valued frequent-items sketches ({@link ItemsSketch} of {@code String}); the tracked items are
+   * encoded and decoded with {@link ArrayOfStringsSerDe}.
+   */
+  public static final ObjectSerDe<ItemsSketch<String>> FREQUENT_STRINGS_SKETCH_SER_DE =
+      new ObjectSerDe<>() {
+        @Override
+        public byte[] serialize(ItemsSketch<String> sketch) {
+          ArrayOfStringsSerDe itemSerDe = new ArrayOfStringsSerDe();
+          return sketch.toByteArray(itemSerDe);
+        }
+
+        @Override
+        public ItemsSketch<String> deserialize(byte[] bytes) {
+          Memory memory = Memory.wrap(bytes);
+          return ItemsSketch.getInstance(memory, new ArrayOfStringsSerDe());
+        }
+
+        @Override
+        public ItemsSketch<String> deserialize(ByteBuffer byteBuffer) {
+          // Copy out the remaining bytes (this advances the buffer position) and reuse the byte[] path.
+          byte[] remainingBytes = new byte[byteBuffer.remaining()];
+          byteBuffer.get(remainingBytes);
+          return deserialize(remainingBytes);
+        }
+      };
+
+  /**
+   * SerDe for long-valued frequent-items sketches ({@link LongsSketch}), based on {@link LongsSketch#toByteArray()}
+   * and {@link LongsSketch#getInstance(Memory)}.
+   */
+  public static final ObjectSerDe<LongsSketch> FREQUENT_LONGS_SKETCH_SER_DE =
+      new ObjectSerDe<>() {
+        @Override
+        public byte[] serialize(LongsSketch sketch) {
+          byte[] serialized = sketch.toByteArray();
+          return serialized;
+        }
+
+        @Override
+        public LongsSketch deserialize(byte[] bytes) {
+          Memory memory = Memory.wrap(bytes);
+          return LongsSketch.getInstance(memory);
+        }
+
+        @Override
+        public LongsSketch deserialize(ByteBuffer byteBuffer) {
+          // Copy out the remaining bytes (this advances the buffer position) and reuse the byte[] path.
+          byte[] remainingBytes = new byte[byteBuffer.remaining()];
+          byteBuffer.get(remainingBytes);
+          return deserialize(remainingBytes);
+        }
+      };
+
   // NOTE: DO NOT change the order, it has to be the same order as the 
ObjectType
   //@formatter:off
   private static final ObjectSerDe[] SER_DES = {
@@ -1326,6 +1375,8 @@ public class ObjectSerDeUtils {
       ARG_MIN_MAX_OBJECT_SER_DE,
       KLL_SKETCH_SER_DE,
       DATA_SKETCH_INT_TUPLE_SER_DE,
+      FREQUENT_STRINGS_SKETCH_SER_DE,
+      FREQUENT_LONGS_SKETCH_SER_DE,
   };
   //@formatter:on
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 7f5695aa6c..2d69c093f2 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -351,6 +351,10 @@ public class AggregationFunctionFactory {
                 "Aggregation function: " + functionType + " is only supported 
in selection without alias.");
           case FUNNELCOUNT:
             return new FunnelCountAggregationFunctionFactory(arguments).get();
+          case FREQUENTSTRINGSSKETCH:
+            return new FrequentStringsSketchAggregationFunction(arguments);
+          case FREQUENTLONGSSKETCH:
+            return new FrequentLongsSketchAggregationFunction(arguments);
 
           default:
             throw new IllegalArgumentException("Unsupported aggregation 
function type: " + functionType);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FrequentLongsSketchAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FrequentLongsSketchAggregationFunction.java
new file mode 100644
index 0000000000..64761096df
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FrequentLongsSketchAggregationFunction.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.datasketches.frequencies.LongsSketch;
+import org.apache.datasketches.memory.Memory;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import 
org.apache.pinot.segment.local.customobject.SerializedFrequentLongsSketch;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * <p>
+ *  {@code FrequentLongsSketchAggregationFunction} provides an approximate FrequentItems aggregation function based on
+ *  <a href="https://datasketches.apache.org/docs/Frequency/FrequentItemsOverview.html">Apache
+ *  DataSketches library</a>.
+ *  It is memory efficient compared to exact counting.
+ * </p>
+ * <p>
+ *   The function takes an INT or LONG column as input and returns a Base64 encoded sketch object which can be
+ *   deserialized and used to estimate the frequency of items in the dataset (how many times they appear).
+ * </p>
+ * <p><b>FREQUENT_LONGS_SKETCH(col, maxMapSize=256)</b></p>
+ * <p>E.g.:</p>
+ * <ul>
+ *   <li><b>FREQUENT_LONGS_SKETCH(col)</b></li>
+ *   <li><b>FREQUENT_LONGS_SKETCH(col, 1024)</b></li>
+ * </ul>
+ *
+ * <p>
+ *   If the column type is BYTES, the aggregation function will assume it is a serialized FrequentItems data sketch
+ *   of type {@code LongsSketch} and will attempt to deserialize it for merging with other sketch objects.
+ * </p>
+ *
+ * <p>
+ *   Second argument, maxMapSize, refers to the size of the physical length of the hashmap which stores counts. It
+ *   influences the accuracy of the sketch and should be a power of 2.
+ * </p>
+ *
+ * <p>
+ *   There is a variation of the function (<b>FREQUENT_STRINGS_SKETCH</b>) which accepts STRING type input columns.
+ * </p>
+ */
+public class FrequentLongsSketchAggregationFunction
+    extends BaseSingleInputAggregationFunction<LongsSketch, Comparable<?>> {
+  protected static final int DEFAULT_MAX_MAP_SIZE = 256;
+
+  protected int _maxMapSize;
+
+  /**
+   * @param arguments the input column expression, optionally followed by an integer literal maxMapSize
+   *                  (defaults to {@value #DEFAULT_MAX_MAP_SIZE} when absent)
+   * @throws IllegalArgumentException if the number of arguments is not 1 or 2
+   */
+  public FrequentLongsSketchAggregationFunction(List<ExpressionContext> arguments) {
+    super(arguments.get(0));
+    int numArguments = arguments.size();
+    Preconditions.checkArgument(numArguments == 1 || numArguments == 2,
+        "Expecting 1 or 2 arguments for FrequentLongsSketch function: FREQUENTLONGSSKETCH(column, maxMapSize)");
+    _maxMapSize = numArguments == 2 ? arguments.get(1).getLiteral().getIntValue() : DEFAULT_MAX_MAP_SIZE;
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.FREQUENTLONGSSKETCH;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  /**
+   * Folds the first {@code length} values of the block into the single (non group-by) sketch. BYTES values are
+   * treated as serialized {@link LongsSketch}es and merged; INT/LONG values are counted.
+   *
+   * @throws UnsupportedOperationException for any other value type
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueSet = blockValSetMap.get(_expression);
+    FieldSpec.DataType valueType = valueSet.getValueType();
+
+    LongsSketch sketch = getOrCreateSketch(aggregationResultHolder);
+
+    // Only the first `length` entries of the value arrays belong to this block, so every loop is bounded by `length`
+    // rather than by the array length.
+    switch (valueType) {
+      case BYTES:
+        // Assuming the column contains serialized data sketch
+        LongsSketch[] deserializedSketches = deserializeSketches(valueSet.getBytesValuesSV(), length);
+        for (LongsSketch colSketch : deserializedSketches) {
+          sketch.merge(colSketch);
+        }
+        break;
+      case INT:
+      case LONG:
+        long[] values = valueSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          sketch.update(values[i]);
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException("Cannot aggregate on non int/long types");
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueSet = blockValSetMap.get(_expression);
+    FieldSpec.DataType valueType = valueSet.getValueType();
+
+    switch (valueType) {
+      case BYTES:
+        // serialized sketch
+        LongsSketch[] deserializedSketches = deserializeSketches(valueSet.getBytesValuesSV(), length);
+        for (int i = 0; i < length; i++) {
+          LongsSketch sketch = getOrCreateSketch(groupByResultHolder, groupKeyArray[i]);
+          sketch.merge(deserializedSketches[i]);
+        }
+        break;
+      case INT:
+      case LONG:
+        long[] values = valueSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          LongsSketch sketch = getOrCreateSketch(groupByResultHolder, groupKeyArray[i]);
+          sketch.update(values[i]);
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException("Cannot aggregate on non int/long types");
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueSet = blockValSetMap.get(_expression);
+    FieldSpec.DataType valueType = valueSet.getValueType();
+
+    switch (valueType) {
+      case BYTES:
+        // serialized sketch
+        LongsSketch[] deserializedSketches = deserializeSketches(valueSet.getBytesValuesSV(), length);
+        for (int i = 0; i < length; i++) {
+          for (int groupKey : groupKeysArray[i]) {
+            LongsSketch sketch = getOrCreateSketch(groupByResultHolder, groupKey);
+            sketch.merge(deserializedSketches[i]);
+          }
+        }
+        break;
+      case INT:
+      case LONG:
+        long[] values = valueSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          for (int groupKey : groupKeysArray[i]) {
+            LongsSketch sketch = getOrCreateSketch(groupByResultHolder, groupKey);
+            sketch.update(values[i]);
+          }
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException("Cannot aggregate on non int/long types");
+    }
+  }
+
+  /**
+   * Extracts the sketch from the result holder or creates a new one if it does not exist.
+   */
+  protected LongsSketch getOrCreateSketch(AggregationResultHolder aggregationResultHolder) {
+    LongsSketch sketch = aggregationResultHolder.getResult();
+    if (sketch == null) {
+      sketch = new LongsSketch(_maxMapSize);
+      aggregationResultHolder.setValue(sketch);
+    }
+    return sketch;
+  }
+
+  /**
+   * Extracts the sketch from the group by result holder for key
+   * or creates a new one if it does not exist.
+   */
+  protected LongsSketch getOrCreateSketch(GroupByResultHolder groupByResultHolder, int groupKey) {
+    LongsSketch sketch = groupByResultHolder.getResult(groupKey);
+    if (sketch == null) {
+      sketch = new LongsSketch(_maxMapSize);
+      groupByResultHolder.setValueForKey(groupKey, sketch);
+    }
+    return sketch;
+  }
+
+  /**
+   * Deserializes every sketch in the given array.
+   */
+  protected LongsSketch[] deserializeSketches(byte[][] serializedSketches) {
+    return deserializeSketches(serializedSketches, serializedSketches.length);
+  }
+
+  /**
+   * Deserializes the first {@code length} sketches of the given array; entries past {@code length} are not read.
+   */
+  protected LongsSketch[] deserializeSketches(byte[][] serializedSketches, int length) {
+    LongsSketch[] sketches = new LongsSketch[length];
+    for (int i = 0; i < length; i++) {
+      sketches[i] = LongsSketch.getInstance(Memory.wrap(serializedSketches[i]));
+    }
+    return sketches;
+  }
+
+  @Override
+  public LongsSketch extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    return aggregationResultHolder.getResult();
+  }
+
+  @Override
+  public LongsSketch extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    return groupByResultHolder.getResult(groupKey);
+  }
+
+  /**
+   * Merges two (possibly null) sketches into a fresh sketch sized with this function's maxMapSize; the inputs are
+   * not modified.
+   */
+  @Override
+  public LongsSketch merge(LongsSketch sketch1, LongsSketch sketch2) {
+    LongsSketch union = new LongsSketch(_maxMapSize);
+    if (sketch1 != null) {
+      union.merge(sketch1);
+    }
+    if (sketch2 != null) {
+      union.merge(sketch2);
+    }
+    return union;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getIntermediateResultColumnType() {
+    return DataSchema.ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.STRING;
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return AggregationFunctionType.FREQUENTLONGSSKETCH.getName().toLowerCase()
+        + "(" + _expression + ")";
+  }
+
+  @Override
+  public Comparable<?> extractFinalResult(LongsSketch sketch) {
+    return new SerializedFrequentLongsSketch(sketch);
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FrequentStringsSketchAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FrequentStringsSketchAggregationFunction.java
new file mode 100644
index 0000000000..2424f223cf
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FrequentStringsSketchAggregationFunction.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.datasketches.common.ArrayOfStringsSerDe;
+import org.apache.datasketches.frequencies.ItemsSketch;
+import org.apache.datasketches.memory.Memory;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import 
org.apache.pinot.segment.local.customobject.SerializedFrequentStringsSketch;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * <p>
+ *  {@code FrequentStringsSketchAggregationFunction} provides an approximate FrequentItems aggregation function based on
+ *  <a href="https://datasketches.apache.org/docs/Frequency/FrequentItemsOverview.html">Apache
+ *  DataSketches library</a>.
+ *  It is memory efficient compared to exact counting.
+ * </p>
+ * <p>
+ *   The function takes a STRING column as input and returns a Base64 encoded sketch object which can be
+ *   deserialized and used to estimate the frequency of items in the dataset (how many times they appear).
+ * </p>
+ * <p><b>FREQUENT_STRINGS_SKETCH(col, maxMapSize=256)</b></p>
+ * <p>E.g.:</p>
+ * <ul>
+ *   <li><b>FREQUENT_STRINGS_SKETCH(col)</b></li>
+ *   <li><b>FREQUENT_STRINGS_SKETCH(col, 1024)</b></li>
+ * </ul>
+ *
+ * <p>
+ *   If the column type is BYTES, the aggregation function will assume it is a serialized FrequentItems data sketch
+ *   of type {@code ItemsSketch<String>} and will attempt to deserialize it for merging with other sketch objects.
+ * </p>
+ *
+ * <p>
+ *   Second argument, maxMapSize, refers to the size of the physical length of the hashmap which stores counts. It
+ *   influences the accuracy of the sketch and should be a power of 2.
+ * </p>
+ *
+ * <p>
+ *   There is a variation of the function (<b>FREQUENT_LONGS_SKETCH</b>) which accepts INT and LONG type input columns.
+ * </p>
+ */
+public class FrequentStringsSketchAggregationFunction
+    extends BaseSingleInputAggregationFunction<ItemsSketch<String>, Comparable<?>> {
+  protected static final int DEFAULT_MAX_MAP_SIZE = 256;
+
+  protected int _maxMapSize;
+
+  /**
+   * @param arguments the input column expression, optionally followed by an integer literal maxMapSize
+   *                  (defaults to {@value #DEFAULT_MAX_MAP_SIZE} when absent)
+   * @throws IllegalArgumentException if the number of arguments is not 1 or 2
+   */
+  public FrequentStringsSketchAggregationFunction(List<ExpressionContext> arguments) {
+    super(arguments.get(0));
+    int numArguments = arguments.size();
+    Preconditions.checkArgument(numArguments == 1 || numArguments == 2,
+        "Expecting 1 or 2 arguments for FrequentStringsSketch function: FREQUENTSTRINGSSKETCH(column, maxMapSize)");
+    _maxMapSize = numArguments == 2 ? arguments.get(1).getLiteral().getIntValue() : DEFAULT_MAX_MAP_SIZE;
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.FREQUENTSTRINGSSKETCH;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  /**
+   * Folds the first {@code length} values of the block into the single (non group-by) sketch. BYTES values are
+   * treated as serialized {@code ItemsSketch<String>}s and merged; any other type is read via
+   * {@code getStringValuesSV()} and counted.
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueSet = blockValSetMap.get(_expression);
+    FieldSpec.DataType valueType = valueSet.getValueType();
+
+    ItemsSketch<String> sketch = getOrCreateSketch(aggregationResultHolder);
+
+    // Only the first `length` entries of the value arrays belong to this block, so every loop is bounded by `length`
+    // rather than by the array length.
+    if (valueType == FieldSpec.DataType.BYTES) {
+      // Assuming the column contains serialized data sketch
+      ItemsSketch<String>[] deserializedSketches = deserializeSketches(valueSet.getBytesValuesSV(), length);
+      for (ItemsSketch<String> colSketch : deserializedSketches) {
+        sketch.merge(colSketch);
+      }
+    } else {
+      String[] values = valueSet.getStringValuesSV();
+      for (int i = 0; i < length; i++) {
+        sketch.update(values[i]);
+      }
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueSet = blockValSetMap.get(_expression);
+    FieldSpec.DataType valueType = valueSet.getValueType();
+
+    if (valueType == FieldSpec.DataType.BYTES) {
+      // serialized sketch
+      ItemsSketch<String>[] deserializedSketches = deserializeSketches(valueSet.getBytesValuesSV(), length);
+      for (int i = 0; i < length; i++) {
+        ItemsSketch<String> sketch = getOrCreateSketch(groupByResultHolder, groupKeyArray[i]);
+        sketch.merge(deserializedSketches[i]);
+      }
+    } else {
+      String[] values = valueSet.getStringValuesSV();
+      for (int i = 0; i < length; i++) {
+        ItemsSketch<String> sketch = getOrCreateSketch(groupByResultHolder, groupKeyArray[i]);
+        sketch.update(values[i]);
+      }
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueSet = blockValSetMap.get(_expression);
+    FieldSpec.DataType valueType = valueSet.getValueType();
+
+    if (valueType == FieldSpec.DataType.BYTES) {
+      // serialized sketch
+      ItemsSketch<String>[] deserializedSketches = deserializeSketches(valueSet.getBytesValuesSV(), length);
+      for (int i = 0; i < length; i++) {
+        for (int groupKey : groupKeysArray[i]) {
+          ItemsSketch<String> sketch = getOrCreateSketch(groupByResultHolder, groupKey);
+          sketch.merge(deserializedSketches[i]);
+        }
+      }
+    } else {
+      String[] values = valueSet.getStringValuesSV();
+      for (int i = 0; i < length; i++) {
+        for (int groupKey : groupKeysArray[i]) {
+          ItemsSketch<String> sketch = getOrCreateSketch(groupByResultHolder, groupKey);
+          sketch.update(values[i]);
+        }
+      }
+    }
+  }
+
+  /**
+   * Extracts the sketch from the result holder or creates a new one if it does not exist.
+   */
+  protected ItemsSketch<String> getOrCreateSketch(AggregationResultHolder aggregationResultHolder) {
+    ItemsSketch<String> sketch = aggregationResultHolder.getResult();
+    if (sketch == null) {
+      sketch = new ItemsSketch<>(_maxMapSize);
+      aggregationResultHolder.setValue(sketch);
+    }
+    return sketch;
+  }
+
+  /**
+   * Extracts the sketch from the group by result holder for key
+   * or creates a new one if it does not exist.
+   */
+  protected ItemsSketch<String> getOrCreateSketch(GroupByResultHolder groupByResultHolder, int groupKey) {
+    ItemsSketch<String> sketch = groupByResultHolder.getResult(groupKey);
+    if (sketch == null) {
+      sketch = new ItemsSketch<>(_maxMapSize);
+      groupByResultHolder.setValueForKey(groupKey, sketch);
+    }
+    return sketch;
+  }
+
+  /**
+   * Deserializes every sketch in the given array.
+   */
+  protected ItemsSketch<String>[] deserializeSketches(byte[][] serializedSketches) {
+    return deserializeSketches(serializedSketches, serializedSketches.length);
+  }
+
+  /**
+   * Deserializes the first {@code length} sketches of the given array; entries past {@code length} are not read.
+   */
+  @SuppressWarnings("unchecked") // generic array creation; every element stored is an ItemsSketch<String>
+  protected ItemsSketch<String>[] deserializeSketches(byte[][] serializedSketches, int length) {
+    ItemsSketch<String>[] sketches = new ItemsSketch[length];
+    for (int i = 0; i < length; i++) {
+      sketches[i] = ItemsSketch.getInstance(Memory.wrap(serializedSketches[i]), new ArrayOfStringsSerDe());
+    }
+    return sketches;
+  }
+
+  @Override
+  public ItemsSketch<String> extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    return aggregationResultHolder.getResult();
+  }
+
+  @Override
+  public ItemsSketch<String> extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    return groupByResultHolder.getResult(groupKey);
+  }
+
+  /**
+   * Merges two (possibly null) sketches into a fresh sketch sized with this function's maxMapSize; the inputs are
+   * not modified.
+   */
+  @Override
+  public ItemsSketch<String> merge(ItemsSketch<String> sketch1, ItemsSketch<String> sketch2) {
+    ItemsSketch<String> union = new ItemsSketch<>(_maxMapSize);
+    if (sketch1 != null) {
+      union.merge(sketch1);
+    }
+    if (sketch2 != null) {
+      union.merge(sketch2);
+    }
+    return union;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getIntermediateResultColumnType() {
+    return DataSchema.ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.STRING;
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return AggregationFunctionType.FREQUENTSTRINGSSKETCH.getName().toLowerCase()
+        + "(" + _expression + ")";
+  }
+
+  @Override
+  public Comparable<?> extractFinalResult(ItemsSketch<String> sketch) {
+    return new SerializedFrequentStringsSketch(sketch);
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/FrequentItemsSketchQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/FrequentItemsSketchQueriesTest.java
new file mode 100644
index 0000000000..deee8f64a3
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/FrequentItemsSketchQueriesTest.java
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.datasketches.common.ArrayOfStringsSerDe;
+import org.apache.datasketches.frequencies.ErrorType;
+import org.apache.datasketches.frequencies.ItemsSketch;
+import org.apache.datasketches.frequencies.LongsSketch;
+import org.apache.datasketches.memory.Memory;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
+import org.apache.pinot.core.operator.query.AggregationOperator;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+
+/**
+ * Tests for the FREQUENTSTRINGSSKETCH and FREQUENTLONGSSKETCH aggregation functions.
+ *
+ * <ul>
+ *   <li>Generates a segment with LONG, STRING, sketch (BYTES) and group-by columns</li>
+ *   <li>Runs aggregation and group-by queries on the generated segment</li>
+ *   <li>Compares the results from sketches to exact calculations via COUNT()</li>
+ * </ul>
+ */
+public class FrequentItemsSketchQueriesTest extends BaseQueriesTest {
+  protected static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "FrequentItemsQueriesTest");
+  protected static final String TABLE_NAME = "testTable";
+  protected static final String SEGMENT_NAME = "testSegment";
+
+  protected static final int MAX_MAP_SIZE = 64;
+  protected static final String LONG_COLUMN = "longColumn";
+  protected static final String STRING_COLUMN = "stringColumn";
+  protected static final String STRING_SKETCH_COLUMN = "stringSketchColumn";
+  protected static final String LONG_SKETCH_COLUMN = "longSketchColumn";
+  protected static final String GROUP_BY_COLUMN = "groupByColumn";
+
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @Override
+  protected String getFilter() {
+    return ""; // No filtering required for this test.
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteQuietly(INDEX_DIR);
+
+    buildSegment();
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+  }
+
+  /** Builds one segment whose rows hold raw LONG/STRING values, single-item sketches of them, and a group key. */
+  protected void buildSegment()
+      throws Exception {
+    // Values chosen with distinct frequencies so that the expected ordering is unambiguous
+    String[] strValues = new String[] {"a", "a", "a", "b", "b", "a", "d", "d", "c", "d"};
+    Long[] longValues = new Long[] {1L, 2L, 1L, 1L, 1L, 2L, 5L, 4L, 4L, 4L};
+    String[] groups = new String[] {"g1", "g1", "g1", "g1", "g1", "g1", "g2", "g2", "g2", "g2"};
+
+    List<GenericRow> rows = new ArrayList<>(strValues.length);
+    for (int i = 0; i < strValues.length; i++) {
+      GenericRow row = new GenericRow();
+
+      row.putValue(LONG_COLUMN, longValues[i]);
+      row.putValue(STRING_COLUMN, strValues[i]);
+
+      // Per-row sketches of the same values, used to test aggregation over pre-built sketches
+      LongsSketch longSketch = new LongsSketch(MAX_MAP_SIZE);
+      longSketch.update(longValues[i]);
+      row.putValue(LONG_SKETCH_COLUMN, longSketch.toByteArray());
+
+      ItemsSketch<String> strSketch = new ItemsSketch<>(MAX_MAP_SIZE);
+      strSketch.update(strValues[i]);
+      row.putValue(STRING_SKETCH_COLUMN, strSketch.toByteArray(new ArrayOfStringsSerDe()));
+
+      row.putValue(GROUP_BY_COLUMN, groups[i]);
+
+      rows.add(row);
+    }
+
+    Schema schema = new Schema();
+    schema.addField(new DimensionFieldSpec(LONG_COLUMN, FieldSpec.DataType.LONG, true));
+    schema.addField(new DimensionFieldSpec(STRING_COLUMN, FieldSpec.DataType.STRING, true));
+    schema.addField(new MetricFieldSpec(LONG_SKETCH_COLUMN, FieldSpec.DataType.BYTES));
+    schema.addField(new MetricFieldSpec(STRING_SKETCH_COLUMN, FieldSpec.DataType.BYTES));
+    schema.addField(new DimensionFieldSpec(GROUP_BY_COLUMN, FieldSpec.DataType.STRING, true));
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
+    config.setOutDir(INDEX_DIR.getPath());
+    config.setTableName(TABLE_NAME);
+    config.setSegmentName(SEGMENT_NAME);
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
+      driver.init(config, recordReader);
+      driver.build();
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testAggregationForStringValues() {
+    // Fetch the sketch object which collects Frequent Items
+    String query = String.format(
+        "SELECT FREQUENTSTRINGSSKETCH(%1$s) FROM %2$s", STRING_COLUMN, TABLE_NAME);
+    AggregationOperator aggregationOperator = getOperator(query);
+    AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
+    List<Object> aggregationResult = resultsBlock.getResults();
+
+    assertNotNull(aggregationResult);
+    assertEquals(aggregationResult.size(), 1);
+
+    // Fetch the exact list by count/group-by and compare
+    String[] exactOrdered = getExactOrderedStrings();
+    assertStringsSketch((ItemsSketch<String>) aggregationResult.get(0), exactOrdered);
+  }
+
+  @Test
+  public void testAggregationForLongValues() {
+    // Fetch the sketch object which collects Frequent Items
+    String query = String.format(
+        "SELECT FREQUENTLONGSSKETCH(%1$s) FROM %2$s", LONG_COLUMN, TABLE_NAME);
+    AggregationOperator aggregationOperator = getOperator(query);
+    AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
+    List<Object> aggregationResult = resultsBlock.getResults();
+
+    assertNotNull(aggregationResult);
+    assertEquals(aggregationResult.size(), 1);
+
+    // Fetch the exact list by count/group-by and compare
+    Long[] exactOrdered = getExactOrderedLongs();
+    assertLongsSketch((LongsSketch) aggregationResult.get(0), exactOrdered);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testAggregationForStringSketches() {
+    // Retrieve sketches calculated by: 1) merger of sketches, 2) from plain values
+    String query = String.format(
+        "SELECT FREQUENTSTRINGSSKETCH(%1$s), FREQUENTSTRINGSSKETCH(%2$s) FROM %3$s",
+        STRING_SKETCH_COLUMN, STRING_COLUMN, TABLE_NAME);
+    AggregationOperator aggregationOperator = getOperator(query);
+    AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
+    List<Object> aggregationResult = resultsBlock.getResults();
+
+    assertNotNull(aggregationResult);
+    assertEquals(aggregationResult.size(), 2);
+
+    // Assert the sketches are equivalent
+    ItemsSketch<String> sketch1 = (ItemsSketch<String>) aggregationResult.get(0);
+    ItemsSketch<String> sketch2 = (ItemsSketch<String>) aggregationResult.get(1);
+    assertEquals(
+        sketch1.getFrequentItems(ErrorType.NO_FALSE_NEGATIVES),
+        sketch2.getFrequentItems(ErrorType.NO_FALSE_NEGATIVES));
+  }
+
+  @Test
+  public void testAggregationForLongSketches() {
+    // Retrieve sketches calculated by: 1) merger of sketches, 2) from plain values
+    String query = String.format(
+        "SELECT FREQUENTLONGSSKETCH(%1$s), FREQUENTLONGSSKETCH(%2$s) FROM %3$s",
+        LONG_SKETCH_COLUMN, LONG_COLUMN, TABLE_NAME);
+    AggregationOperator aggregationOperator = getOperator(query);
+    AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
+    List<Object> aggregationResult = resultsBlock.getResults();
+
+    assertNotNull(aggregationResult);
+    assertEquals(aggregationResult.size(), 2);
+
+    // Assert the sketches are equivalent
+    LongsSketch sketch1 = (LongsSketch) aggregationResult.get(0);
+    LongsSketch sketch2 = (LongsSketch) aggregationResult.get(1);
+    assertEquals(
+        sketch1.getFrequentItems(ErrorType.NO_FALSE_NEGATIVES),
+        sketch2.getFrequentItems(ErrorType.NO_FALSE_NEGATIVES));
+  }
+
+  @Test
+  public void testGroupByStringSketches() {
+    // Fetch the sketch object which collects Frequent Items
+    String query = String.format(
+        "SELECT %1$s, FREQUENTSTRINGSSKETCH(%2$s) FROM %3$s GROUP BY 1",
+        GROUP_BY_COLUMN, STRING_COLUMN, TABLE_NAME);
+    BrokerResponse brokerResponse = getBrokerResponse(query);
+    ResultTable resultTable = brokerResponse.getResultTable();
+    List<Object[]> rows = resultTable.getRows();
+
+    assertNotNull(rows);
+    assertEquals(rows.size(), 2); // should be 2 groups
+
+    // Fetch the exact list by count/group-by and compare
+    Map<String, List<String>> exactOrdered = getExactOrderedStringGroups();
+    for (Object[] row : rows) {
+      String group = (String) row[0];
+      ItemsSketch<String> sketch = decodeStringsSketch((String) row[1]);
+      assertStringsSketch(sketch, exactOrdered.get(group));
+    }
+  }
+
+  @Test
+  public void testGroupByLongSketches() {
+    // Fetch the sketch object which collects Frequent Items
+    String query = String.format(
+        "SELECT %1$s, FREQUENTLONGSSKETCH(%2$s) FROM %3$s GROUP BY 1",
+        GROUP_BY_COLUMN, LONG_COLUMN, TABLE_NAME);
+    BrokerResponse brokerResponse = getBrokerResponse(query);
+    ResultTable resultTable = brokerResponse.getResultTable();
+    List<Object[]> rows = resultTable.getRows();
+
+    assertNotNull(rows);
+    assertEquals(rows.size(), 2); // should be 2 groups
+
+    // Fetch the exact list by count/group-by and compare
+    Map<String, List<Long>> exactOrdered = getExactOrderedLongGroups();
+    for (Object[] row : rows) {
+      String group = (String) row[0];
+      LongsSketch sketch = decodeLongsSketch((String) row[1]);
+      assertLongsSketch(sketch, exactOrdered.get(group));
+    }
+  }
+
+  private String[] getExactOrderedStrings() {
+    Object[] objects = getExactOrderForColumn(STRING_COLUMN);
+    return Arrays.copyOf(objects, objects.length, String[].class);
+  }
+
+  private Long[] getExactOrderedLongs() {
+    Object[] objects = getExactOrderForColumn(LONG_COLUMN);
+    return Arrays.copyOf(objects, objects.length, Long[].class);
+  }
+
+  /** Returns the distinct values of {@code col}, ordered by descending exact count. */
+  private Object[] getExactOrderForColumn(String col) {
+    String query = String.format(
+        "SELECT %1$s, COUNT(1) FROM %2$s GROUP BY 1 ORDER BY 2 DESC", col, TABLE_NAME);
+    BrokerResponse resp = getBrokerResponse(query);
+    ResultTable results = resp.getResultTable();
+    List<Object[]> rows = results.getRows();
+    return rows.stream().map((Object[] row) -> row[0]).toArray();
+  }
+
+  private Map<String, List<String>> getExactOrderedStringGroups() {
+    return getExactOrderedGroups(STRING_COLUMN, String.class);
+  }
+
+  private Map<String, List<Long>> getExactOrderedLongGroups() {
+    return getExactOrderedGroups(LONG_COLUMN, Long.class);
+  }
+
+  /** Returns, per group, the distinct values of {@code col} ordered by descending exact count. */
+  private <T> Map<String, List<T>> getExactOrderedGroups(String col, Class<T> valueType) {
+    String query = String.format(
+        "SELECT %1$s, %2$s, COUNT(1) FROM %3$s GROUP BY 1,2 ORDER BY 3 DESC",
+        GROUP_BY_COLUMN, col, TABLE_NAME);
+    BrokerResponse resp = getBrokerResponse(query);
+    ResultTable results = resp.getResultTable();
+    List<Object[]> rows = results.getRows();
+    // Rows are sorted by count descending (ORDER BY 3 DESC), so appending keeps each group's list in that order.
+    Map<String, List<T>> order = new HashMap<>();
+    for (Object[] row : rows) {
+      order.computeIfAbsent((String) row[0], k -> new ArrayList<>()).add(valueType.cast(row[1]));
+    }
+    return order;
+  }
+
+  private void assertStringsSketch(ItemsSketch<String> sketch, List<String> exact) {
+    assertStringsSketch(sketch, exact.toArray(new String[0]));
+  }
+
+  /** Asserts the sketch's frequent items (no false negatives) equal {@code exact}, in the same order. */
+  private void assertStringsSketch(ItemsSketch<String> sketch, String[] exact) {
+    ItemsSketch.Row<String>[] items = sketch.getFrequentItems(ErrorType.NO_FALSE_NEGATIVES);
+    // TestNG's assertEquals takes (actual, expected)
+    assertEquals(items.length, exact.length);
+    for (int i = 0; i < exact.length; i++) {
+      assertEquals(items[i].getItem(), exact[i]);
+    }
+  }
+
+  private void assertLongsSketch(LongsSketch sketch, List<Long> exact) {
+    assertLongsSketch(sketch, exact.toArray(new Long[0]));
+  }
+
+  /** Asserts the sketch's frequent items (no false negatives) equal {@code exact}, in the same order. */
+  private void assertLongsSketch(LongsSketch sketch, Long[] exact) {
+    LongsSketch.Row[] items = sketch.getFrequentItems(ErrorType.NO_FALSE_NEGATIVES);
+    // TestNG's assertEquals takes (actual, expected)
+    assertEquals(items.length, exact.length);
+    for (int i = 0; i < exact.length; i++) {
+      assertEquals((Long) items[i].getItem(), exact[i]);
+    }
+  }
+
+  private ItemsSketch<String> decodeStringsSketch(String encodedSketch) {
+    byte[] byteArr = Base64.getDecoder().decode(encodedSketch);
+    return ItemsSketch.getInstance(Memory.wrap(byteArr), new ArrayOfStringsSerDe());
+  }
+
+  private LongsSketch decodeLongsSketch(String encodedSketch) {
+    byte[] byteArr = Base64.getDecoder().decode(encodedSketch);
+    return LongsSketch.getInstance(Memory.wrap(byteArr));
+  }
+
+  @AfterClass
+  public void tearDown() {
+    _indexSegment.destroy();
+    FileUtils.deleteQuietly(INDEX_DIR);
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedFrequentLongsSketch.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedFrequentLongsSketch.java
new file mode 100644
index 0000000000..53124e473b
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedFrequentLongsSketch.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.Base64;
+import javax.validation.constraints.NotNull;
+import org.apache.datasketches.frequencies.LongsSketch;
+
+
+/**
+ * Wraps a {@link LongsSketch} so it can be returned as a {@link Comparable} final result whose string form is the
+ * Base64-encoded serialized sketch.
+ */
+public class SerializedFrequentLongsSketch implements Comparable<SerializedFrequentLongsSketch> {
+  private final LongsSketch _sketch;
+
+  public SerializedFrequentLongsSketch(LongsSketch sketch) {
+    _sketch = sketch;
+  }
+
+  /**
+   * Compares two wrapped sketches.
+   *
+   * <p>Comparable's contract is defined between instances of the same type. With the previous
+   * {@code Comparable<LongsSketch>} declaration, comparing two wrappers (e.g. when sorting results) would throw a
+   * ClassCastException in the compiler-generated bridge method.
+   */
+  @Override
+  public int compareTo(@NotNull SerializedFrequentLongsSketch other) {
+    return compareTo(other._sketch);
+  }
+
+  /**
+   * Compares against a raw sketch. Retained for source compatibility with callers of the previous
+   * {@code compareTo(LongsSketch)} signature.
+   */
+  public int compareTo(@NotNull LongsSketch other) {
+    // There is no well-defined ordering for these sketches
+    // numActiveItems is just a placeholder, which can be changed later
+    return Integer.compare(_sketch.getNumActiveItems(), other.getNumActiveItems());
+  }
+
+  /** Returns the serialized sketch bytes encoded as Base64. */
+  @Override
+  public String toString() {
+    return Base64.getEncoder().encodeToString(_sketch.toByteArray());
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedFrequentStringsSketch.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedFrequentStringsSketch.java
new file mode 100644
index 0000000000..40f89bc83d
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedFrequentStringsSketch.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.Base64;
+import javax.validation.constraints.NotNull;
+import org.apache.datasketches.common.ArrayOfStringsSerDe;
+import org.apache.datasketches.frequencies.ItemsSketch;
+
+/**
+ * Wraps an {@link ItemsSketch} of strings so it can be returned as a {@link Comparable} final result whose string
+ * form is the Base64-encoded sketch, serialized with {@link ArrayOfStringsSerDe}.
+ */
+public class SerializedFrequentStringsSketch implements Comparable<SerializedFrequentStringsSketch> {
+  private final ItemsSketch<String> _sketch;
+
+  public SerializedFrequentStringsSketch(ItemsSketch<String> sketch) {
+    _sketch = sketch;
+  }
+
+  /**
+   * Compares two wrapped sketches.
+   *
+   * <p>Comparable's contract is defined between instances of the same type. With the previous
+   * {@code Comparable<ItemsSketch<String>>} declaration, comparing two wrappers (e.g. when sorting results) would
+   * throw a ClassCastException in the compiler-generated bridge method.
+   */
+  @Override
+  public int compareTo(@NotNull SerializedFrequentStringsSketch other) {
+    return compareTo(other._sketch);
+  }
+
+  /**
+   * Compares against a raw sketch. Retained for source compatibility with callers of the previous
+   * {@code compareTo(ItemsSketch<String>)} signature.
+   */
+  public int compareTo(@NotNull ItemsSketch<String> other) {
+    // There is no well-defined ordering for these sketches
+    // numActiveItems is just a placeholder, which can be changed later
+    return Integer.compare(_sketch.getNumActiveItems(), other.getNumActiveItems());
+  }
+
+  /** Returns the sketch serialized with ArrayOfStringsSerDe, encoded as Base64. */
+  @Override
+  public String toString() {
+    return Base64.getEncoder().encodeToString(_sketch.toByteArray(new ArrayOfStringsSerDe()));
+  }
+}
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 4ebc95926c..4ac3b32af9 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -190,6 +190,9 @@ public enum AggregationFunctionType {
       SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION, 
OperandTypes.BINARY, ReturnTypes.BIGINT,
       ReturnTypes.explicit(SqlTypeName.OTHER)),
 
+  FREQUENTSTRINGSSKETCH("frequentStringsSketch"),
+  FREQUENTLONGSSKETCH("frequentLongsSketch"),
+
   // Geo aggregation functions
   STUNION("STUnion", ImmutableList.of("ST_UNION"), SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.USER_DEFINED_FUNCTION,
       OperandTypes.BINARY, ReturnTypes.explicit(SqlTypeName.VARBINARY), 
ReturnTypes.explicit(SqlTypeName.OTHER)),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to