This is an automated email from the ASF dual-hosted git repository. tingchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new cf017f1c63 Add FrequentStringsSketch and FrequentLonsSketch aggregation functions (#11098) cf017f1c63 is described below commit cf017f1c63de8703a71dcaaab4506ffee0672950 Author: Caner Balci <ba...@uber.com> AuthorDate: Tue Sep 19 16:49:59 2023 -0700 Add FrequentStringsSketch and FrequentLonsSketch aggregation functions (#11098) Squashed commit: Fix checkstyle validations Split FrequentItems sketch into FrequentStrings and FrequentLongs sketches Fix datasketch utility import Add documentation and do some cleanups Whitespace and typo fixes Add missing license header --- .../apache/pinot/core/common/ObjectSerDeUtils.java | 53 ++- .../function/AggregationFunctionFactory.java | 4 + .../FrequentLongsSketchAggregationFunction.java | 268 ++++++++++++++ .../FrequentStringsSketchAggregationFunction.java | 252 ++++++++++++++ .../queries/FrequentItemsSketchQueriesTest.java | 387 +++++++++++++++++++++ .../SerializedFrequentLongsSketch.java | 44 +++ .../SerializedFrequentStringsSketch.java | 44 +++ .../pinot/segment/spi/AggregationFunctionType.java | 3 + 8 files changed, 1054 insertions(+), 1 deletion(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java index a18a23df35..69c00ea5b8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java @@ -57,6 +57,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.datasketches.common.ArrayOfStringsSerDe; +import org.apache.datasketches.frequencies.ItemsSketch; +import org.apache.datasketches.frequencies.LongsSketch; import org.apache.datasketches.kll.KllDoublesSketch; import org.apache.datasketches.memory.Memory; import org.apache.datasketches.theta.Sketch; @@ -134,7 +137,9 @@ 
public class ObjectSerDeUtils { PinotFourthMoment(34), ArgMinMaxObject(35), KllDataSketch(36), - IntegerTupleSketch(37); + IntegerTupleSketch(37), + FrequentStringsSketch(38), + FrequentLongsSketch(39); private final int _value; @@ -226,6 +231,10 @@ public class ObjectSerDeUtils { return ObjectType.IntegerTupleSketch; } else if (value instanceof ExprMinMaxObject) { return ObjectType.ArgMinMaxObject; + } else if (value instanceof ItemsSketch) { + return ObjectType.FrequentStringsSketch; + } else if (value instanceof LongsSketch) { + return ObjectType.FrequentLongsSketch; } else { throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName()); } @@ -1285,6 +1294,46 @@ public class ObjectSerDeUtils { } }; + public static final ObjectSerDe<ItemsSketch<String>> FREQUENT_STRINGS_SKETCH_SER_DE = + new ObjectSerDe<>() { + @Override + public byte[] serialize(ItemsSketch<String> sketch) { + return sketch.toByteArray(new ArrayOfStringsSerDe()); + } + + @Override + public ItemsSketch<String> deserialize(byte[] bytes) { + return ItemsSketch.getInstance(Memory.wrap(bytes), new ArrayOfStringsSerDe()); + } + + @Override + public ItemsSketch<String> deserialize(ByteBuffer byteBuffer) { + byte[] arr = new byte[byteBuffer.remaining()]; + byteBuffer.get(arr); + return ItemsSketch.getInstance(Memory.wrap(arr), new ArrayOfStringsSerDe()); + } + }; + + public static final ObjectSerDe<LongsSketch> FREQUENT_LONGS_SKETCH_SER_DE = + new ObjectSerDe<>() { + @Override + public byte[] serialize(LongsSketch sketch) { + return sketch.toByteArray(); + } + + @Override + public LongsSketch deserialize(byte[] bytes) { + return LongsSketch.getInstance(Memory.wrap(bytes)); + } + + @Override + public LongsSketch deserialize(ByteBuffer byteBuffer) { + byte[] arr = new byte[byteBuffer.remaining()]; + byteBuffer.get(arr); + return LongsSketch.getInstance(Memory.wrap(arr)); + } + }; + // NOTE: DO NOT change the order, it has to be the same order as the ObjectType 
//@formatter:off private static final ObjectSerDe[] SER_DES = { @@ -1326,6 +1375,8 @@ public class ObjectSerDeUtils { ARG_MIN_MAX_OBJECT_SER_DE, KLL_SKETCH_SER_DE, DATA_SKETCH_INT_TUPLE_SER_DE, + FREQUENT_STRINGS_SKETCH_SER_DE, + FREQUENT_LONGS_SKETCH_SER_DE, }; //@formatter:on diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java index 7f5695aa6c..2d69c093f2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java @@ -351,6 +351,10 @@ public class AggregationFunctionFactory { "Aggregation function: " + functionType + " is only supported in selection without alias."); case FUNNELCOUNT: return new FunnelCountAggregationFunctionFactory(arguments).get(); + case FREQUENTSTRINGSSKETCH: + return new FrequentStringsSketchAggregationFunction(arguments); + case FREQUENTLONGSSKETCH: + return new FrequentLongsSketchAggregationFunction(arguments); default: throw new IllegalArgumentException("Unsupported aggregation function type: " + functionType); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FrequentLongsSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FrequentLongsSketchAggregationFunction.java new file mode 100644 index 0000000000..64761096df --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FrequentLongsSketchAggregationFunction.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.datasketches.frequencies.LongsSketch;
+import org.apache.datasketches.memory.Memory;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.SerializedFrequentLongsSketch;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * <p>
+ * {@code FrequentLongsSketchAggregationFunction} provides an approximate FrequentItems aggregation function based on
+ * <a href="https://datasketches.apache.org/docs/Frequency/FrequentItemsOverview.html">Apache DataSketches library</a>.
+ * It is memory efficient compared to exact counting.
+ * </p>
+ * <p>
+ * The function takes an INT or LONG column as input and returns a Base64 encoded sketch object which can be
+ * deserialized and used to estimate the frequency of items in the dataset (how many times they appear).
+ * </p>
+ * <p><b>FREQUENT_LONGS_SKETCH(col, maxMapSize=256)</b></p>
+ * <p>E.g.:</p>
+ * <ul>
+ *   <li><b>FREQUENT_LONGS_SKETCH(col)</b></li>
+ *   <li><b>FREQUENT_LONGS_SKETCH(col, 1024)</b></li>
+ * </ul>
+ *
+ * <p>
+ * If the column type is BYTES, the aggregation function will assume it is a serialized FrequentItems data sketch
+ * of type {@code LongsSketch} and will attempt to deserialize it for merging with other sketch objects.
+ * </p>
+ *
+ * <p>
+ * The second argument, maxMapSize, is the physical length of the hash map which stores the counts. It
+ * influences the accuracy of the sketch and must be a positive power of 2.
+ * </p>
+ *
+ * <p>
+ * There is a variation of the function (<b>FREQUENT_STRINGS_SKETCH</b>) which accepts STRING type input columns.
+ * </p>
+ */
+public class FrequentLongsSketchAggregationFunction
+    extends BaseSingleInputAggregationFunction<LongsSketch, Comparable<?>> {
+  protected static final int DEFAULT_MAX_MAP_SIZE = 256;
+
+  protected int _maxMapSize;
+
+  /**
+   * @param arguments the aggregated column expression, optionally followed by an int literal maxMapSize
+   *                  (defaults to {@link #DEFAULT_MAX_MAP_SIZE})
+   * @throws IllegalArgumentException if there are not 1 or 2 arguments, or maxMapSize is not a positive power of 2
+   */
+  public FrequentLongsSketchAggregationFunction(List<ExpressionContext> arguments) {
+    super(arguments.get(0));
+    int numArguments = arguments.size();
+    Preconditions.checkArgument(numArguments == 1 || numArguments == 2,
+        "Expecting 1 or 2 arguments for FrequentLongsSketch function: FREQUENTLONGSSKETCH(column, maxMapSize)");
+    _maxMapSize = numArguments == 2 ? arguments.get(1).getLiteral().getIntValue() : DEFAULT_MAX_MAP_SIZE;
+    // LongsSketch only accepts power-of-2 map sizes; validate here so a bad argument is reported when the query is
+    // compiled instead of when the first sketch is allocated.
+    Preconditions.checkArgument(_maxMapSize > 0 && Integer.bitCount(_maxMapSize) == 1,
+        "maxMapSize for FREQUENTLONGSSKETCH must be a positive power of 2, got: %s", _maxMapSize);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.FREQUENTLONGSSKETCH;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  /**
+   * Adds the first {@code length} values of the block into the single result sketch.
+   *
+   * <p>Only indices {@code [0, length)} of the block's value arrays are valid for this block (the arrays may be
+   * longer), so values are read by index rather than by iterating the whole array.
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueSet = blockValSetMap.get(_expression);
+    FieldSpec.DataType valueType = valueSet.getValueType();
+
+    switch (valueType) {
+      case BYTES: {
+        // Assuming the column contains serialized data sketches
+        LongsSketch[] deserializedSketches = deserializeSketches(valueSet.getBytesValuesSV(), length);
+        LongsSketch sketch = getOrCreateSketch(aggregationResultHolder);
+        for (LongsSketch colSketch : deserializedSketches) {
+          sketch.merge(colSketch);
+        }
+        break;
+      }
+      case INT:
+      case LONG: {
+        long[] values = valueSet.getLongValuesSV();
+        LongsSketch sketch = getOrCreateSketch(aggregationResultHolder);
+        for (int i = 0; i < length; i++) {
+          sketch.update(values[i]);
+        }
+        break;
+      }
+      default:
+        throw new UnsupportedOperationException("Cannot aggregate on non int/long types");
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueSet = blockValSetMap.get(_expression);
+    FieldSpec.DataType valueType = valueSet.getValueType();
+
+    switch (valueType) {
+      case BYTES: {
+        // Serialized sketches: merge each row's sketch into the sketch of its group
+        LongsSketch[] deserializedSketches = deserializeSketches(valueSet.getBytesValuesSV(), length);
+        for (int i = 0; i < length; i++) {
+          getOrCreateSketch(groupByResultHolder, groupKeyArray[i]).merge(deserializedSketches[i]);
+        }
+        break;
+      }
+      case INT:
+      case LONG: {
+        long[] values = valueSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          getOrCreateSketch(groupByResultHolder, groupKeyArray[i]).update(values[i]);
+        }
+        break;
+      }
+      default:
+        throw new UnsupportedOperationException("Cannot aggregate on non int/long types");
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueSet = blockValSetMap.get(_expression);
+    FieldSpec.DataType valueType = valueSet.getValueType();
+
+    switch (valueType) {
+      case BYTES: {
+        // Serialized sketches: merge each row's sketch into the sketch of every group the row belongs to
+        LongsSketch[] deserializedSketches = deserializeSketches(valueSet.getBytesValuesSV(), length);
+        for (int i = 0; i < length; i++) {
+          for (int groupKey : groupKeysArray[i]) {
+            getOrCreateSketch(groupByResultHolder, groupKey).merge(deserializedSketches[i]);
+          }
+        }
+        break;
+      }
+      case INT:
+      case LONG: {
+        long[] values = valueSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          for (int groupKey : groupKeysArray[i]) {
+            getOrCreateSketch(groupByResultHolder, groupKey).update(values[i]);
+          }
+        }
+        break;
+      }
+      default:
+        throw new UnsupportedOperationException("Cannot aggregate on non int/long types");
+    }
+  }
+
+  /**
+   * Extracts the sketch from the result holder or creates a new one (of {@code _maxMapSize}) if it does not exist.
+   */
+  protected LongsSketch getOrCreateSketch(AggregationResultHolder aggregationResultHolder) {
+    LongsSketch sketch = aggregationResultHolder.getResult();
+    if (sketch == null) {
+      sketch = new LongsSketch(_maxMapSize);
+      aggregationResultHolder.setValue(sketch);
+    }
+    return sketch;
+  }
+
+  /**
+   * Extracts the sketch from the group by result holder for key
+   * or creates a new one (of {@code _maxMapSize}) if it does not exist.
+   */
+  protected LongsSketch getOrCreateSketch(GroupByResultHolder groupByResultHolder, int groupKey) {
+    LongsSketch sketch = groupByResultHolder.getResult(groupKey);
+    if (sketch == null) {
+      sketch = new LongsSketch(_maxMapSize);
+      groupByResultHolder.setValueForKey(groupKey, sketch);
+    }
+    return sketch;
+  }
+
+  /**
+   * Deserializes every sketch in the given array.
+   */
+  protected LongsSketch[] deserializeSketches(byte[][] serializedSketches) {
+    return deserializeSketches(serializedSketches, serializedSketches.length);
+  }
+
+  /**
+   * Deserializes the first {@code length} sketches of the given array; entries at or past {@code length} are
+   * ignored.
+   */
+  protected LongsSketch[] deserializeSketches(byte[][] serializedSketches, int length) {
+    LongsSketch[] sketches = new LongsSketch[length];
+    for (int i = 0; i < length; i++) {
+      sketches[i] = LongsSketch.getInstance(Memory.wrap(serializedSketches[i]));
+    }
+    return sketches;
+  }
+
+  @Override
+  public LongsSketch extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    return aggregationResultHolder.getResult();
+  }
+
+  @Override
+  public LongsSketch extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    return groupByResultHolder.getResult(groupKey);
+  }
+
+  /**
+   * Merges both (possibly null) sketches into a freshly allocated sketch of {@code _maxMapSize}.
+   */
+  @Override
+  public LongsSketch merge(LongsSketch sketch1, LongsSketch sketch2) {
+    LongsSketch union = new LongsSketch(_maxMapSize);
+    if (sketch1 != null) {
+      union.merge(sketch1);
+    }
+    if (sketch2 != null) {
+      union.merge(sketch2);
+    }
+    return union;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getIntermediateResultColumnType() {
+    return DataSchema.ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.STRING;
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return AggregationFunctionType.FREQUENTLONGSSKETCH.getName().toLowerCase()
+        + "(" + _expression + ")";
+  }
+
+  @Override
+  public Comparable<?> extractFinalResult(LongsSketch sketch) {
+    return new SerializedFrequentLongsSketch(sketch);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FrequentStringsSketchAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FrequentStringsSketchAggregationFunction.java new file mode 100644 index 0000000000..2424f223cf --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FrequentStringsSketchAggregationFunction.java @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.datasketches.common.ArrayOfStringsSerDe;
+import org.apache.datasketches.frequencies.ItemsSketch;
+import org.apache.datasketches.memory.Memory;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.SerializedFrequentStringsSketch;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * <p>
+ * {@code FrequentStringsSketchAggregationFunction} provides an approximate FrequentItems aggregation function based on
+ * <a href="https://datasketches.apache.org/docs/Frequency/FrequentItemsOverview.html">Apache DataSketches library</a>.
+ * It is memory efficient compared to exact counting.
+ * </p>
+ * <p>
+ * The function takes a STRING column as input and returns a Base64 encoded sketch object which can be
+ * deserialized and used to estimate the frequency of items in the dataset (how many times they appear).
+ * </p>
+ * <p><b>FREQUENT_STRINGS_SKETCH(col, maxMapSize=256)</b></p>
+ * <p>E.g.:</p>
+ * <ul>
+ *   <li><b>FREQUENT_STRINGS_SKETCH(col)</b></li>
+ *   <li><b>FREQUENT_STRINGS_SKETCH(col, 1024)</b></li>
+ * </ul>
+ *
+ * <p>
+ * If the column type is BYTES, the aggregation function will assume it is a serialized FrequentItems data sketch
+ * of type {@code ItemsSketch<String>} and will attempt to deserialize it for merging with other sketch objects.
+ * </p>
+ *
+ * <p>
+ * The second argument, maxMapSize, is the physical length of the hash map which stores the counts. It
+ * influences the accuracy of the sketch and must be a positive power of 2.
+ * </p>
+ *
+ * <p>
+ * There is a variation of the function (<b>FREQUENT_LONGS_SKETCH</b>) which accepts INT and LONG type input columns.
+ * </p>
+ */
+public class FrequentStringsSketchAggregationFunction
+    extends BaseSingleInputAggregationFunction<ItemsSketch<String>, Comparable<?>> {
+  protected static final int DEFAULT_MAX_MAP_SIZE = 256;
+
+  protected int _maxMapSize;
+
+  /**
+   * @param arguments the aggregated column expression, optionally followed by an int literal maxMapSize
+   *                  (defaults to {@link #DEFAULT_MAX_MAP_SIZE})
+   * @throws IllegalArgumentException if there are not 1 or 2 arguments, or maxMapSize is not a positive power of 2
+   */
+  public FrequentStringsSketchAggregationFunction(List<ExpressionContext> arguments) {
+    super(arguments.get(0));
+    int numArguments = arguments.size();
+    Preconditions.checkArgument(numArguments == 1 || numArguments == 2,
+        "Expecting 1 or 2 arguments for FrequentStringsSketch function: FREQUENTSTRINGSSKETCH(column, maxMapSize)");
+    _maxMapSize = numArguments == 2 ? arguments.get(1).getLiteral().getIntValue() : DEFAULT_MAX_MAP_SIZE;
+    // ItemsSketch only accepts power-of-2 map sizes; validate here so a bad argument is reported when the query is
+    // compiled instead of when the first sketch is allocated.
+    Preconditions.checkArgument(_maxMapSize > 0 && Integer.bitCount(_maxMapSize) == 1,
+        "maxMapSize for FREQUENTSTRINGSSKETCH must be a positive power of 2, got: %s", _maxMapSize);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.FREQUENTSTRINGSSKETCH;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  /**
+   * Adds the first {@code length} values of the block into the single result sketch.
+   *
+   * <p>Only indices {@code [0, length)} of the block's value arrays are valid for this block (the arrays may be
+   * longer), so values are read by index rather than by iterating the whole array.
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueSet = blockValSetMap.get(_expression);
+    FieldSpec.DataType valueType = valueSet.getValueType();
+
+    ItemsSketch<String> sketch = getOrCreateSketch(aggregationResultHolder);
+
+    if (valueType == FieldSpec.DataType.BYTES) {
+      // Assuming the column contains serialized data sketches
+      ItemsSketch<String>[] deserializedSketches = deserializeSketches(valueSet.getBytesValuesSV(), length);
+      for (ItemsSketch<String> colSketch : deserializedSketches) {
+        sketch.merge(colSketch);
+      }
+    } else {
+      String[] values = valueSet.getStringValuesSV();
+      for (int i = 0; i < length; i++) {
+        sketch.update(values[i]);
+      }
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueSet = blockValSetMap.get(_expression);
+    FieldSpec.DataType valueType = valueSet.getValueType();
+
+    if (valueType == FieldSpec.DataType.BYTES) {
+      // Serialized sketches: merge each row's sketch into the sketch of its group
+      ItemsSketch<String>[] deserializedSketches = deserializeSketches(valueSet.getBytesValuesSV(), length);
+      for (int i = 0; i < length; i++) {
+        getOrCreateSketch(groupByResultHolder, groupKeyArray[i]).merge(deserializedSketches[i]);
+      }
+    } else {
+      String[] values = valueSet.getStringValuesSV();
+      for (int i = 0; i < length; i++) {
+        getOrCreateSketch(groupByResultHolder, groupKeyArray[i]).update(values[i]);
+      }
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueSet = blockValSetMap.get(_expression);
+    FieldSpec.DataType valueType = valueSet.getValueType();
+
+    if (valueType == FieldSpec.DataType.BYTES) {
+      // Serialized sketches: merge each row's sketch into the sketch of every group the row belongs to
+      ItemsSketch<String>[] deserializedSketches = deserializeSketches(valueSet.getBytesValuesSV(), length);
+      for (int i = 0; i < length; i++) {
+        for (int groupKey : groupKeysArray[i]) {
+          getOrCreateSketch(groupByResultHolder, groupKey).merge(deserializedSketches[i]);
+        }
+      }
+    } else {
+      String[] values = valueSet.getStringValuesSV();
+      for (int i = 0; i < length; i++) {
+        for (int groupKey : groupKeysArray[i]) {
+          getOrCreateSketch(groupByResultHolder, groupKey).update(values[i]);
+        }
+      }
+    }
+  }
+
+  /**
+   * Extracts the sketch from the result holder or creates a new one (of {@code _maxMapSize}) if it does not exist.
+   */
+  protected ItemsSketch<String> getOrCreateSketch(AggregationResultHolder aggregationResultHolder) {
+    ItemsSketch<String> sketch = aggregationResultHolder.getResult();
+    if (sketch == null) {
+      sketch = new ItemsSketch<>(_maxMapSize);
+      aggregationResultHolder.setValue(sketch);
+    }
+    return sketch;
+  }
+
+  /**
+   * Extracts the sketch from the group by result holder for key
+   * or creates a new one (of {@code _maxMapSize}) if it does not exist.
+   */
+  protected ItemsSketch<String> getOrCreateSketch(GroupByResultHolder groupByResultHolder, int groupKey) {
+    ItemsSketch<String> sketch = groupByResultHolder.getResult(groupKey);
+    if (sketch == null) {
+      sketch = new ItemsSketch<>(_maxMapSize);
+      groupByResultHolder.setValueForKey(groupKey, sketch);
+    }
+    return sketch;
+  }
+
+  /**
+   * Deserializes every sketch in the given array.
+   */
+  protected ItemsSketch<String>[] deserializeSketches(byte[][] serializedSketches) {
+    return deserializeSketches(serializedSketches, serializedSketches.length);
+  }
+
+  /**
+   * Deserializes the first {@code length} sketches of the given array; entries at or past {@code length} are
+   * ignored.
+   */
+  // Generic array creation is unavoidable here; every element stored is an ItemsSketch<String>.
+  @SuppressWarnings("unchecked")
+  protected ItemsSketch<String>[] deserializeSketches(byte[][] serializedSketches, int length) {
+    ItemsSketch<String>[] sketches = new ItemsSketch[length];
+    ArrayOfStringsSerDe serDe = new ArrayOfStringsSerDe();
+    for (int i = 0; i < length; i++) {
+      sketches[i] = ItemsSketch.getInstance(Memory.wrap(serializedSketches[i]), serDe);
+    }
+    return sketches;
+  }
+
+  @Override
+  public ItemsSketch<String> extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    return aggregationResultHolder.getResult();
+  }
+
+  @Override
+  public ItemsSketch<String> extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    return groupByResultHolder.getResult(groupKey);
+  }
+
+  /**
+   * Merges both (possibly null) sketches into a freshly allocated sketch of {@code _maxMapSize}.
+   */
+  @Override
+  public ItemsSketch<String> merge(ItemsSketch<String> sketch1, ItemsSketch<String> sketch2) {
+    ItemsSketch<String> union = new ItemsSketch<>(_maxMapSize);
+    if (sketch1 != null) {
+      union.merge(sketch1);
+    }
+    if (sketch2 != null) {
+      union.merge(sketch2);
+    }
+    return union;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getIntermediateResultColumnType() {
+    return DataSchema.ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.STRING;
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return AggregationFunctionType.FREQUENTSTRINGSSKETCH.getName().toLowerCase()
+        + "(" + _expression + ")";
+  }
+
+  @Override
+  public Comparable<?> extractFinalResult(ItemsSketch<String> sketch) {
+    return new SerializedFrequentStringsSketch(sketch);
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/FrequentItemsSketchQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/FrequentItemsSketchQueriesTest.java
new file mode 100644
index 0000000000..deee8f64a3
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/FrequentItemsSketchQueriesTest.java
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.queries; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.datasketches.common.ArrayOfStringsSerDe; +import org.apache.datasketches.frequencies.ErrorType; +import org.apache.datasketches.frequencies.ItemsSketch; +import org.apache.datasketches.frequencies.LongsSketch; +import org.apache.datasketches.memory.Memory; +import org.apache.pinot.common.response.BrokerResponse; +import org.apache.pinot.common.response.broker.ResultTable; +import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock; +import org.apache.pinot.core.operator.query.AggregationOperator; +import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; +import org.apache.pinot.segment.spi.ImmutableSegment; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import 
org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.DimensionFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.MetricFieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.utils.ReadMode; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + + +/** + * Tests for FREQUENT_STRINGS_SKETCH and FREQUENT_LONGS_SKETCH aggregation functions. + * + * <ul> + * <li>Generates a segment with LONG, STRING, SKETCH and a group-by column</li> + * <li>Runs aggregation and group-by queries on the generated segment</li> + * <li>Compares the results from sketches to exact calculations via count()</li> + * </ul> + */ +public class FrequentItemsSketchQueriesTest extends BaseQueriesTest { + protected static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "FrequentItemsQueriesTest"); + protected static final String TABLE_NAME = "testTable"; + protected static final String SEGMENT_NAME = "testSegment"; + + protected static final int MAX_MAP_SIZE = 64; + protected static final String LONG_COLUMN = "longColumn"; + protected static final String STRING_COLUMN = "stringColumn"; + protected static final String STRING_SKETCH_COLUMN = "stringSketchColumn"; + protected static final String LONG_SKETCH_COLUMN = "longSketchColumn"; + protected static final String GROUP_BY_COLUMN = "groupByColumn"; + + private IndexSegment _indexSegment; + private List<IndexSegment> _indexSegments; + + @Override + protected String getFilter() { + return ""; // No filtering required for this test. 
+ } + + @Override + protected IndexSegment getIndexSegment() { + return _indexSegment; + } + + @Override + protected List<IndexSegment> getIndexSegments() { + return _indexSegments; + } + + @BeforeClass + public void setUp() + throws Exception { + FileUtils.deleteQuietly(INDEX_DIR); + + buildSegment(); + ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap); + _indexSegment = immutableSegment; + _indexSegments = Arrays.asList(immutableSegment, immutableSegment); + } + + protected void buildSegment() throws Exception { + + // Values chosen with distinct frequencies not to create ambiguity in testing + String[] strValues = new String[] {"a", "a", "a", "b", "b", "a", "d", "d", "c", "d"}; + Long[] longValues = new Long[] {1L, 2L, 1L, 1L, 1L, 2L, 5L, 4L, 4L, 4L}; + String[] groups = new String[] {"g1", "g1", "g1", "g1", "g1", "g1", "g2", "g2", "g2", "g2"}; + + List<GenericRow> rows = new ArrayList<>(strValues.length); + for (int i = 0; i < strValues.length; i++) { + GenericRow row = new GenericRow(); + + row.putValue(LONG_COLUMN, longValues[i]); + row.putValue(STRING_COLUMN, strValues[i]); + + LongsSketch longSketch = new LongsSketch(MAX_MAP_SIZE); + longSketch.update(longValues[i]); + row.putValue(LONG_SKETCH_COLUMN, longSketch.toByteArray()); + + ItemsSketch<String> strSketch = new ItemsSketch<>(MAX_MAP_SIZE); + strSketch.update(strValues[i]); + row.putValue(STRING_SKETCH_COLUMN, strSketch.toByteArray(new ArrayOfStringsSerDe())); + + row.putValue(GROUP_BY_COLUMN, groups[i]); + + rows.add(row); + } + + Schema schema = new Schema(); + schema.addField(new DimensionFieldSpec(LONG_COLUMN, FieldSpec.DataType.LONG, true)); + schema.addField(new DimensionFieldSpec(STRING_COLUMN, FieldSpec.DataType.STRING, true)); + schema.addField(new MetricFieldSpec(LONG_SKETCH_COLUMN, FieldSpec.DataType.BYTES)); + schema.addField(new MetricFieldSpec(STRING_SKETCH_COLUMN, FieldSpec.DataType.BYTES)); + schema.addField(new 
DimensionFieldSpec(GROUP_BY_COLUMN, FieldSpec.DataType.STRING, true)); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); + + SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema); + config.setOutDir(INDEX_DIR.getPath()); + config.setTableName(TABLE_NAME); + config.setSegmentName(SEGMENT_NAME); + + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + try (RecordReader recordReader = new GenericRowRecordReader(rows)) { + driver.init(config, recordReader); + driver.build(); + } + } + + @Test + public void testAggregationForStringValues() { + // Fetch the sketch object which collects Frequent Items + String query = String.format( + "SELECT FREQUENTSTRINGSSKETCH(%1$s) FROM %2$s", STRING_COLUMN, TABLE_NAME); + AggregationOperator aggregationOperator = getOperator(query); + AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock(); + List<Object> aggregationResult = resultsBlock.getResults(); + + assertNotNull(aggregationResult); + assertEquals(aggregationResult.size(), 1); + + // Fetch the exact list by count/group-by and compare + String[] exactOrdered = getExactOrderedStrings(); + assertStringsSketch((ItemsSketch<String>) aggregationResult.get(0), exactOrdered); + } + + @Test + public void testAggregationForLongValues() { + // Fetch the sketch object which collects Frequent Items + String query = String.format( + "SELECT FREQUENTLONGSSKETCH(%1$s) FROM %2$s", LONG_COLUMN, TABLE_NAME); + AggregationOperator aggregationOperator = getOperator(query); + AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock(); + List<Object> aggregationResult = resultsBlock.getResults(); + + assertNotNull(aggregationResult); + assertEquals(aggregationResult.size(), 1); + + // Fetch the exact list by count/group-by and compare + Long[] exactOrdered = getExactOrderedLongs(); + assertLongsSketch((LongsSketch) aggregationResult.get(0), exactOrdered); + } + + 
@Test + public void testAggregationForStringSketches() { + // Retrieve sketches calculated by: 1) merger of sketches, 2) from plain values + String query = String.format( + "SELECT FREQUENTSTRINGSSKETCH(%1$s), FREQUENTSTRINGSSKETCH(%2$s) FROM %3$s", + STRING_SKETCH_COLUMN, STRING_COLUMN, TABLE_NAME); + AggregationOperator aggregationOperator = getOperator(query); + AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock(); + List<Object> aggregationResult = resultsBlock.getResults(); + + assertNotNull(aggregationResult); + assertEquals(aggregationResult.size(), 2); + + // Assert the sketches are equivalent + ItemsSketch<String> sketch1 = (ItemsSketch<String>) aggregationResult.get(0); + ItemsSketch<String> sketch2 = (ItemsSketch<String>) aggregationResult.get(1); + assertEquals( + sketch1.getFrequentItems(ErrorType.NO_FALSE_NEGATIVES), + sketch2.getFrequentItems(ErrorType.NO_FALSE_NEGATIVES)); + } + + @Test + public void testAggregationForLongSketches() { + // Retrieve sketches calculated by: 1) merger of sketches, 2) from plain values + String query = String.format( + "SELECT FREQUENTLONGSSKETCH(%1$s), FREQUENTLONGSSKETCH(%2$s) FROM %3$s", + LONG_SKETCH_COLUMN, LONG_COLUMN, TABLE_NAME); + AggregationOperator aggregationOperator = getOperator(query); + AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock(); + List<Object> aggregationResult = resultsBlock.getResults(); + + assertNotNull(aggregationResult); + assertEquals(aggregationResult.size(), 2); + + // Assert the sketches are equivalent + LongsSketch sketch1 = (LongsSketch) aggregationResult.get(0); + LongsSketch sketch2 = (LongsSketch) aggregationResult.get(1); + assertEquals( + sketch1.getFrequentItems(ErrorType.NO_FALSE_NEGATIVES), + sketch2.getFrequentItems(ErrorType.NO_FALSE_NEGATIVES)); + } + + @Test + public void testGroupByStringSketches() { + // Fetch the sketch object which collects Frequent Items + String query = String.format( + "SELECT %1$s, FREQUENTSTRINGSSKETCH(%2$s) 
FROM %3$s GROUP BY 1", + GROUP_BY_COLUMN, STRING_COLUMN, TABLE_NAME); + BrokerResponse brokerResponse = getBrokerResponse(query); + ResultTable resultTable = brokerResponse.getResultTable(); + List<Object[]> rows = resultTable.getRows(); + + assertNotNull(rows); + assertEquals(rows.size(), 2); // should be 2 groups + + // Fetch the exact list by count/group-by and compare + Map<String, ArrayList<String>> exactOrdered = getExactOrderedStringGroups(); + for (Object[] row: rows) { + String group = (String) row[0]; + ItemsSketch<String> sketch = decodeStringsSketch((String) row[1]); + List<String> exactOrder = exactOrdered.get(group); + assertStringsSketch(sketch, exactOrder); + } + } + + @Test + public void testGroupByLongSketches() { + // Fetch the sketch object which collects Frequent Items + String query = String.format( + "SELECT %1$s, FREQUENTLONGSSKETCH(%2$s) FROM %3$s GROUP BY 1", + GROUP_BY_COLUMN, LONG_COLUMN, TABLE_NAME); + BrokerResponse brokerResponse = getBrokerResponse(query); + ResultTable resultTable = brokerResponse.getResultTable(); + List<Object[]> rows = resultTable.getRows(); + + assertNotNull(rows); + assertEquals(rows.size(), 2); // should be 2 groups + + // Fetch the exact list by count/group-by and compare + Map<String, ArrayList<Long>> exactOrdered = getExactOrderedLongGroups(); + for (Object[] row: rows) { + String group = (String) row[0]; + LongsSketch sketch = decodeLongsSketch((String) row[1]); + List<Long> exactOrder = exactOrdered.get(group); + assertLongsSketch(sketch, exactOrder); + } + } + + + private String[] getExactOrderedStrings() { + Object[] objects = getExactOrderForColumn(STRING_COLUMN); + return Arrays.copyOf(objects, objects.length, String[].class); + } + + private Long[] getExactOrderedLongs() { + Object[] objects = getExactOrderForColumn(LONG_COLUMN); + return Arrays.copyOf(objects, objects.length, Long[].class); + } + + private Object[] getExactOrderForColumn(String col) { + String query = String.format( + "SELECT %1$s, 
COUNT(1) FROM %2$s GROUP BY 1 ORDER BY 2 DESC", col, TABLE_NAME); + BrokerResponse resp = getBrokerResponse(query); + ResultTable results = resp.getResultTable(); + List<Object[]> rows = results.getRows(); + return rows.stream().map((Object[] row) -> row[0]).toArray(); + } + + private Object[] getExactOrderForColumn2(String query) { + BrokerResponse resp = getBrokerResponse(query); + ResultTable results = resp.getResultTable(); + List<Object[]> rows = results.getRows(); + return rows.stream().map((Object[] row) -> row[0]).toArray(); + } + + private Map<String, ArrayList<String>> getExactOrderedStringGroups() { + String query = String.format( + "SELECT %1$s, %2$s, COUNT(1) FROM %3$s GROUP BY 1,2 ORDER BY 3 DESC", + GROUP_BY_COLUMN, STRING_COLUMN, TABLE_NAME); + BrokerResponse resp = getBrokerResponse(query); + ResultTable results = resp.getResultTable(); + List<Object[]> rows = results.getRows(); + Map<String, ArrayList<String>> order = new HashMap<>(); + for (Object[] row: rows) { + String group = (String) row[0]; + if (!order.containsKey(group)) { + order.put(group, new ArrayList<>()); + } + order.get(group).add((String) row[1]); + } + return order; + } + + private Map<String, ArrayList<Long>> getExactOrderedLongGroups() { + String query = String.format( + "SELECT %1$s, %2$s, COUNT(1) FROM %3$s GROUP BY 1,2 ORDER BY 3 DESC", + GROUP_BY_COLUMN, LONG_COLUMN, TABLE_NAME); + BrokerResponse resp = getBrokerResponse(query); + ResultTable results = resp.getResultTable(); + List<Object[]> rows = results.getRows(); + Map<String, ArrayList<Long>> order = new HashMap<>(); + for (Object[] row: rows) { + String group = (String) row[0]; + if (!order.containsKey(group)) { + order.put(group, new ArrayList<>()); + } + order.get(group).add((Long) row[1]); + } + return order; + } + + private void assertStringsSketch(ItemsSketch<String> sketch, List<String> exact) { + String[] arr = new String[exact.size()]; + exact.toArray(arr); + assertStringsSketch(sketch, arr); + } + + private 
void assertStringsSketch(ItemsSketch<String> sketch, String[] exact) { + ItemsSketch.Row[] items = sketch.getFrequentItems(ErrorType.NO_FALSE_NEGATIVES); + assertEquals(exact.length, items.length); + for (int i = 0; i < exact.length; i++) { + assertEquals((String) items[i].getItem(), exact[i]); + } + } + + private void assertLongsSketch(LongsSketch sketch, List<Long> exact) { + Long[] arr = new Long[exact.size()]; + exact.toArray(arr); + assertLongsSketch(sketch, arr); + } + + private void assertLongsSketch(LongsSketch sketch, Long[] exact) { + LongsSketch.Row[] items = sketch.getFrequentItems(ErrorType.NO_FALSE_NEGATIVES); + assertEquals(exact.length, items.length); + for (int i = 0; i < exact.length; i++) { + assertEquals((Long) items[i].getItem(), exact[i]); + } + } + + private ItemsSketch<String> decodeStringsSketch(String encodedSketch) { + byte[] byteArr = Base64.getDecoder().decode(encodedSketch); + return ItemsSketch.getInstance(Memory.wrap(byteArr), new ArrayOfStringsSerDe()); + } + + private LongsSketch decodeLongsSketch(String encodedSketch) { + byte[] byteArr = Base64.getDecoder().decode(encodedSketch); + return LongsSketch.getInstance(Memory.wrap(byteArr)); + } + + @AfterClass + public void tearDown() { + _indexSegment.destroy(); + FileUtils.deleteQuietly(INDEX_DIR); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedFrequentLongsSketch.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedFrequentLongsSketch.java new file mode 100644 index 0000000000..53124e473b --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedFrequentLongsSketch.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.customobject; + +import java.util.Base64; +import javax.validation.constraints.NotNull; +import org.apache.datasketches.frequencies.LongsSketch; + + +public class SerializedFrequentLongsSketch implements Comparable<LongsSketch> { + private final LongsSketch _sketch; + + public SerializedFrequentLongsSketch(LongsSketch sketch) { + _sketch = sketch; + } + + @Override + public int compareTo(@NotNull LongsSketch other) { + // There is no well-defined ordering for these sketches + // numActiveItems is just a placeholder, which can be changed later + return _sketch.getNumActiveItems() - other.getNumActiveItems(); + } + + @Override + public String toString() { + return Base64.getEncoder().encodeToString(_sketch.toByteArray()); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedFrequentStringsSketch.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedFrequentStringsSketch.java new file mode 100644 index 0000000000..40f89bc83d --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedFrequentStringsSketch.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.customobject; + +import java.util.Base64; +import javax.validation.constraints.NotNull; +import org.apache.datasketches.common.ArrayOfStringsSerDe; +import org.apache.datasketches.frequencies.ItemsSketch; + +public class SerializedFrequentStringsSketch implements Comparable<ItemsSketch<String>> { + private final ItemsSketch<String> _sketch; + + public SerializedFrequentStringsSketch(ItemsSketch<String> sketch) { + _sketch = sketch; + } + + @Override + public int compareTo(@NotNull ItemsSketch<String> other) { + // There is no well-defined ordering for these sketches + // numActiveItems is just a placeholder, which can be changed later + return _sketch.getNumActiveItems() - other.getNumActiveItems(); + } + + @Override + public String toString() { + return Base64.getEncoder().encodeToString(_sketch.toByteArray(new ArrayOfStringsSerDe())); + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java index 4ebc95926c..4ac3b32af9 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java +++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java @@ -190,6 +190,9 @@ public enum AggregationFunctionType { SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.BINARY, ReturnTypes.BIGINT, ReturnTypes.explicit(SqlTypeName.OTHER)), + FREQUENTSTRINGSSKETCH("frequentStringsSketch"), + FREQUENTLONGSSKETCH("frequentLongsSketch"), + // Geo aggregation functions STUNION("STUnion", ImmutableList.of("ST_UNION"), SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.BINARY, ReturnTypes.explicit(SqlTypeName.VARBINARY), ReturnTypes.explicit(SqlTypeName.OTHER)), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org