JackieTien97 commented on code in PR #15338:
URL: https://github.com/apache/iotdb/pull/15338#discussion_r2050416262
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/HyperLogLogState.java:
##########
@@ -0,0 +1,7 @@
+package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+public interface HyperLogLogState {
Review Comment:
What is this interface used for?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/HyperLogLog.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.apache.iotdb.rpc.TSStatusCode.NUMERIC_VALUE_OUT_OF_RANGE;
+
+public class HyperLogLog {
+ private final int[] registers;
+ // Number of registers
+ private final int m;
+ // Number of bits used for register indexing
+ private final int b;
+ private final double maxStandardError;
Review Comment:
```suggestion
```
It seems that we don't need the `maxStandardError` field.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/HyperLogLog.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.apache.iotdb.rpc.TSStatusCode.NUMERIC_VALUE_OUT_OF_RANGE;
+
+public class HyperLogLog {
+ private final int[] registers;
+ // Number of registers
+ private final int m;
+ // Number of bits used for register indexing
+ private final int b;
+ private final double maxStandardError;
+ // Alpha constant for bias correction
+ private final double alpha;
+
+ private static final HashFunction hashFunction = Hashing.murmur3_128();
+
+ public static final double DEFAULT_STANDARD_ERROR = 0.023;
+ private static final double LOWEST_MAX_STANDARD_ERROR = 0.0040625;
+ private static final double HIGHEST_MAX_STANDARD_ERROR = 0.26000;
+
+ /**
+ * Constructs a HyperLogLog with the given precision.
+ *
+ * @param precision The precision parameter (4 <= precision <= 16)
+ */
+ public HyperLogLog(double maxStandardError) {
+ int buckets = standardErrorToBuckets(maxStandardError);
+ int precision = indexBitLength(buckets);
+
+ this.maxStandardError = maxStandardError;
+ this.b = precision;
+ // m = 2^precision, buckets
+ this.m = buckets;
+ this.registers = new int[m];
+
+ // Set alpha based on precision
+ this.alpha = getAlpha(precision, m);
+ }
+
+ public HyperLogLog(byte[] bytes) {
+ // 反序列化
+
+ this.b = BytesUtils.bytesToInt(bytes, 0);
+ this.m = BytesUtils.bytesToInt(bytes, 4);
+ this.maxStandardError = BytesUtils.bytesToDouble(bytes, 8);
+
+ this.registers = new int[m];
+ for (int i = 0; i < m; i++) {
+ int baseIndex = 16 + i * 4;
+ registers[i] =
+ (bytes[baseIndex] & 0xFF)
+ | (bytes[baseIndex + 1] & 0xFF) << 8
+ | (bytes[baseIndex + 2] & 0xFF) << 16
+ | (bytes[baseIndex + 3] & 0xFF) << 24;
+ }
+ this.alpha = getAlpha(b, m);
Review Comment:
```suggestion
// deserialize
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
this.b = ReadWriteIOUtils.readInt(byteBuffer);
this.m = ReadWriteIOUtils.readInt(byteBuffer);
this.registers = new int[m];
for (int i = 0; i < m; i++) {
registers[i] = ReadWriteIOUtils.readInt(byteBuffer);
}
this.alpha = getAlpha(b, m);
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedApproxCountDistinctAccumulator.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;
+
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.HyperLogLog;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.HyperLogLogStateFactory;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.ObjectBigArray;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+
+public class GroupedApproxCountDistinctAccumulator implements
GroupedAccumulator {
+ private static final long INSTANCE_SIZE =
+
RamUsageEstimator.shallowSizeOfInstance(GroupedApproxCountDistinctAccumulator.class);
+ private final TSDataType seriesDataType;
+
+ private final HyperLogLogStateFactory.GroupedHyperLogLogState state =
Review Comment:
You should create a class named `HyperLogLogBigArray`; you can refer to
`BinaryBigArray`.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java:
##########
@@ -630,6 +630,19 @@ && isIntegerNumber(argumentTypes.get(2)))) {
}
break;
+ case SqlConstant.APPROX_COUNT_DISTINCT:
+ if (argumentTypes.size() != 1 && argumentTypes.size() != 2) {
+ throw new SemanticException(
+ String.format(
+ "Aggregate functions [%s] should only have two arguments",
functionName));
+ }
+
+ if (argumentTypes.size() == 2 && !DOUBLE.equals(argumentTypes.get(1)))
{
Review Comment:
The second parameter should only be a literal.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/ApproxCountDistinctAccumulator.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class ApproxCountDistinctAccumulator implements TableAccumulator {
+ private static final long INSTANCE_SIZE =
+
RamUsageEstimator.shallowSizeOfInstance(ApproxCountDistinctAccumulator.class);
+ private final TSDataType seriesDataType;
+ private final HyperLogLogStateFactory.SingleHyperLogLogState state =
+ HyperLogLogStateFactory.createSingleState();
+
+ public ApproxCountDistinctAccumulator(TSDataType seriesDataType) {
+ this.seriesDataType = seriesDataType;
+ }
+
+ @Override
+ public long getEstimatedSize() {
+ return INSTANCE_SIZE;
Review Comment:
You should also add the memory size of the `HyperLogLog`.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/ApproxCountDistinctAccumulator.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class ApproxCountDistinctAccumulator implements TableAccumulator {
+ private static final long INSTANCE_SIZE =
+
RamUsageEstimator.shallowSizeOfInstance(ApproxCountDistinctAccumulator.class);
+ private final TSDataType seriesDataType;
+ private final HyperLogLogStateFactory.SingleHyperLogLogState state =
Review Comment:
There is no need to use `SingleHyperLogLogState` to wrap the `HyperLogLog`; you
can refer to `TableModeAccumulator`.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedApproxCountDistinctAccumulator.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;
+
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.HyperLogLog;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.HyperLogLogStateFactory;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.ObjectBigArray;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+
+public class GroupedApproxCountDistinctAccumulator implements
GroupedAccumulator {
+ private static final long INSTANCE_SIZE =
+
RamUsageEstimator.shallowSizeOfInstance(GroupedApproxCountDistinctAccumulator.class);
+ private final TSDataType seriesDataType;
+
+ private final HyperLogLogStateFactory.GroupedHyperLogLogState state =
+ HyperLogLogStateFactory.createGroupedState();
+
+ public GroupedApproxCountDistinctAccumulator(TSDataType seriesDataType) {
+ this.seriesDataType = seriesDataType;
+ }
+
+ @Override
+ public long getEstimatedSize() {
+ ObjectBigArray<HyperLogLog> hlls = state.getHyperLogLogs();
+ return INSTANCE_SIZE + hlls.sizeOf();
+ }
+
+ @Override
+ public void setGroupCount(long groupCount) {
+ ObjectBigArray<HyperLogLog> hlls = state.getHyperLogLogs();
+ hlls.ensureCapacity(groupCount);
+ }
+
+ @Override
+ public void addInput(int[] groupIds, Column[] arguments, AggregationMask
mask) {
+ ObjectBigArray<HyperLogLog> hlls;
+ if (arguments.length == 1) {
+ hlls = HyperLogLogStateFactory.getOrCreateHyperLogLog(state);
+ } else if (arguments.length == 2) {
+ double maxStandardError = arguments[1].getDouble(0);
+ hlls = HyperLogLogStateFactory.getOrCreateHyperLogLog(state,
maxStandardError);
+ } else {
+ throw new IllegalArgumentException(
+ "argument of APPROX_COUNT_DISTINCT should be one column with Max
Standard Error");
+ }
Review Comment:
Initialize it in the constructor.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/HyperLogLog.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.apache.iotdb.rpc.TSStatusCode.NUMERIC_VALUE_OUT_OF_RANGE;
+
+public class HyperLogLog {
+ private final int[] registers;
+ // Number of registers
+ private final int m;
+ // Number of bits used for register indexing
+ private final int b;
+ private final double maxStandardError;
+ // Alpha constant for bias correction
+ private final double alpha;
+
+ private static final HashFunction hashFunction = Hashing.murmur3_128();
+
+ public static final double DEFAULT_STANDARD_ERROR = 0.023;
+ private static final double LOWEST_MAX_STANDARD_ERROR = 0.0040625;
+ private static final double HIGHEST_MAX_STANDARD_ERROR = 0.26000;
+
+ /**
+ * Constructs a HyperLogLog with the given precision.
+ *
+ * @param precision The precision parameter (4 <= precision <= 16)
+ */
+ public HyperLogLog(double maxStandardError) {
+ int buckets = standardErrorToBuckets(maxStandardError);
+ int precision = indexBitLength(buckets);
+
+ this.maxStandardError = maxStandardError;
+ this.b = precision;
+ // m = 2^precision, buckets
+ this.m = buckets;
+ this.registers = new int[m];
+
+ // Set alpha based on precision
+ this.alpha = getAlpha(precision, m);
+ }
+
+ public HyperLogLog(byte[] bytes) {
+ // 反序列化
+
+ this.b = BytesUtils.bytesToInt(bytes, 0);
+ this.m = BytesUtils.bytesToInt(bytes, 4);
+ this.maxStandardError = BytesUtils.bytesToDouble(bytes, 8);
+
+ this.registers = new int[m];
+ for (int i = 0; i < m; i++) {
+ int baseIndex = 16 + i * 4;
+ registers[i] =
+ (bytes[baseIndex] & 0xFF)
+ | (bytes[baseIndex + 1] & 0xFF) << 8
+ | (bytes[baseIndex + 2] & 0xFF) << 16
+ | (bytes[baseIndex + 3] & 0xFF) << 24;
+ }
+ this.alpha = getAlpha(b, m);
+ }
+
+ public double getMaxStandardError() {
+ return this.maxStandardError;
+ }
+
+ private static double getAlpha(int precision, int m) {
+ switch (precision) {
+ case 4:
+ return 0.673;
+ case 5:
+ return 0.697;
+ case 6:
+ return 0.709;
+ default:
+ return 0.7213 / (1 + 1.079 / m);
+ }
+ }
+
+ private static boolean isPowerOf2(long value) {
+ Preconditions.checkArgument(value > 0L, "value must be positive");
+ return (value & value - 1L) == 0L;
+ }
+
+ private static int indexBitLength(int numberOfBuckets) {
+ Preconditions.checkArgument(
+ isPowerOf2((long) numberOfBuckets),
Review Comment:
```suggestion
isPowerOf2(numberOfBuckets),
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java:
##########
@@ -630,6 +630,19 @@ && isIntegerNumber(argumentTypes.get(2)))) {
}
break;
+ case SqlConstant.APPROX_COUNT_DISTINCT:
+ if (argumentTypes.size() != 1 && argumentTypes.size() != 2) {
+ throw new SemanticException(
+ String.format(
+ "Aggregate functions [%s] should only have two arguments",
functionName));
+ }
+
+ if (argumentTypes.size() == 2 && !DOUBLE.equals(argumentTypes.get(1)))
{
Review Comment:
Does the second parameter just need to be a number? If so, you can use the
`isSupportedMathNumericType` function in this class.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/ApproxCountDistinctAccumulator.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class ApproxCountDistinctAccumulator implements TableAccumulator {
+ private static final long INSTANCE_SIZE =
+
RamUsageEstimator.shallowSizeOfInstance(ApproxCountDistinctAccumulator.class);
+ private final TSDataType seriesDataType;
+ private final HyperLogLogStateFactory.SingleHyperLogLogState state =
+ HyperLogLogStateFactory.createSingleState();
+
+ public ApproxCountDistinctAccumulator(TSDataType seriesDataType) {
+ this.seriesDataType = seriesDataType;
+ }
+
+ @Override
+ public long getEstimatedSize() {
+ return INSTANCE_SIZE;
+ }
+
+ @Override
+ public TableAccumulator copy() {
+ return new ApproxCountDistinctAccumulator(seriesDataType);
+ }
+
+ @Override
+ public void addInput(Column[] arguments, AggregationMask mask) {
+ HyperLogLog hll;
Review Comment:
`HyperLogLog hll` should be initialized in the constructor instead of calling
getOrCreate each time `addInput` is called.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/HyperLogLog.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.apache.iotdb.rpc.TSStatusCode.NUMERIC_VALUE_OUT_OF_RANGE;
+
+public class HyperLogLog {
+ private final int[] registers;
+ // Number of registers
+ private final int m;
+ // Number of bits used for register indexing
+ private final int b;
+ private final double maxStandardError;
+ // Alpha constant for bias correction
+ private final double alpha;
+
+ private static final HashFunction hashFunction = Hashing.murmur3_128();
+
+ public static final double DEFAULT_STANDARD_ERROR = 0.023;
+ private static final double LOWEST_MAX_STANDARD_ERROR = 0.0040625;
+ private static final double HIGHEST_MAX_STANDARD_ERROR = 0.26000;
+
+ /**
+ * Constructs a HyperLogLog with the given precision.
+ *
+ * @param precision The precision parameter (4 <= precision <= 16)
+ */
+ public HyperLogLog(double maxStandardError) {
+ int buckets = standardErrorToBuckets(maxStandardError);
+ int precision = indexBitLength(buckets);
+
+ this.maxStandardError = maxStandardError;
+ this.b = precision;
+ // m = 2^precision, buckets
+ this.m = buckets;
+ this.registers = new int[m];
+
+ // Set alpha based on precision
+ this.alpha = getAlpha(precision, m);
+ }
+
+ public HyperLogLog(byte[] bytes) {
+ // 反序列化
+
+ this.b = BytesUtils.bytesToInt(bytes, 0);
+ this.m = BytesUtils.bytesToInt(bytes, 4);
+ this.maxStandardError = BytesUtils.bytesToDouble(bytes, 8);
+
+ this.registers = new int[m];
+ for (int i = 0; i < m; i++) {
+ int baseIndex = 16 + i * 4;
+ registers[i] =
+ (bytes[baseIndex] & 0xFF)
+ | (bytes[baseIndex + 1] & 0xFF) << 8
+ | (bytes[baseIndex + 2] & 0xFF) << 16
+ | (bytes[baseIndex + 3] & 0xFF) << 24;
+ }
+ this.alpha = getAlpha(b, m);
+ }
+
+ public double getMaxStandardError() {
+ return this.maxStandardError;
+ }
+
+ private static double getAlpha(int precision, int m) {
+ switch (precision) {
+ case 4:
+ return 0.673;
+ case 5:
+ return 0.697;
+ case 6:
+ return 0.709;
+ default:
+ return 0.7213 / (1 + 1.079 / m);
+ }
+ }
+
+ private static boolean isPowerOf2(long value) {
+ Preconditions.checkArgument(value > 0L, "value must be positive");
+ return (value & value - 1L) == 0L;
+ }
+
+ private static int indexBitLength(int numberOfBuckets) {
+ Preconditions.checkArgument(
+ isPowerOf2((long) numberOfBuckets),
+ "numberOfBuckets must be a power of 2, actual: %s",
+ numberOfBuckets);
+ return Integer.numberOfTrailingZeros(numberOfBuckets);
+ }
+
+ private static int standardErrorToBuckets(double maxStandardError) {
+ if (maxStandardError <= LOWEST_MAX_STANDARD_ERROR
+ || maxStandardError >= HIGHEST_MAX_STANDARD_ERROR) {
+ throw new IoTDBRuntimeException(
+ String.format(
+ "Max Standard Error must be in [%s, %s]: %s",
+ LOWEST_MAX_STANDARD_ERROR, HIGHEST_MAX_STANDARD_ERROR,
maxStandardError),
+ NUMERIC_VALUE_OUT_OF_RANGE.getStatusCode(),
+ true);
+ }
+ return log2Ceiling((int) Math.ceil(1.04 / (maxStandardError *
maxStandardError)));
+ }
+
+ private static int log2Ceiling(int value) {
+ return Integer.highestOneBit(value - 1) << 1;
+ }
+
+ public void add(boolean value) {
+ offer(hashFunction.hashString(String.valueOf(value),
StandardCharsets.UTF_8).asLong());
+ }
+
+ public void add(int value) {
+ offer(hashFunction.hashInt(value).asLong());
+ }
+
+ public void add(long value) {
+ offer(hashFunction.hashLong(value).asLong());
+ }
+
+ public void add(float value) {
+ offer(hashFunction.hashString(String.valueOf(value),
StandardCharsets.UTF_8).asLong());
+ }
+
+ public void add(double value) {
+ offer(hashFunction.hashString(String.valueOf(value),
StandardCharsets.UTF_8).asLong());
+ }
+
+ public void add(Binary value) {
+ offer(
+ hashFunction
+ .hashString(value.getStringValue(TSFileConfig.STRING_CHARSET),
StandardCharsets.UTF_8)
+ .asLong());
+ }
+
+ /**
+ * Adds a value to the estimator.
+ *
+ * @param value The value to add
+ */
+ public void offer(long hash) {
+ // Compute hash of the value
+
+ // Extract the first b bits for the register index
+ int idx = (int) (hash & (m - 1));
+
+ // Count the number of leading zeros in the remaining bits
+ // Add 1 to get the position of the leftmost 1
+
+ int leadingZeros = Long.numberOfTrailingZeros(hash >>> b) + 1;
+
+ // Update the register if the new value is larger
+ registers[idx] = Math.max(registers[idx], leadingZeros);
+ }
+
+ /**
+ * Returns the estimated cardinality of the data set.
+ *
+ * @return The estimated cardinality
+ */
+ public long cardinality() {
+ double sum = 0;
+ int zeros = 0;
+
+ // Compute the harmonic mean of 2^register[i]
+ for (int i = 0; i < m; i++) {
+ sum += 1.0 / (1 << registers[i]);
+ if (registers[i] == 0) {
+ zeros++;
+ }
+ }
+
+ // Apply bias correction formula
+ double estimate = alpha * m * m / sum;
+
+ // Small range correction
+ if (estimate <= 2.5 * m) {
+ if (zeros > 0) {
+ // Linear counting for small cardinalities
+ return Math.round(m * Math.log((double) m / zeros));
+ }
+ }
+
+ // Large range correction (for values > 2^32 / 30)
+ double maxCardinality = (double) (1L << 32);
+ if (estimate > maxCardinality / 30) {
+ return Math.round(-maxCardinality * Math.log(1 - estimate /
maxCardinality));
+ }
+
+ return Math.round(estimate);
+ }
+
+ /** Resets the estimator. */
+ public void reset() {
+ Arrays.fill(registers, 0);
+ }
+
+ /**
+ * Merges another HyperLogLog instance into this one.
+ *
+ * @param other The other HyperLogLog instance to merge
+ * @throws IllegalArgumentException if the precision doesn't match
+ */
+ public void merge(HyperLogLog other) {
+ // not use currently
+ if (this.m != other.m) {
+ throw new IllegalArgumentException(
+ "Cannot merge HyperLogLog instances with different precision");
+ }
+
+ for (int i = 0; i < m; i++) {
+ registers[i] = Math.max(registers[i], other.registers[i]);
+ }
+ }
+
+ // 序列化
Review Comment:
```suggestion
// serialize
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/HyperLogLog.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.apache.iotdb.rpc.TSStatusCode.NUMERIC_VALUE_OUT_OF_RANGE;
+
+public class HyperLogLog {
+ private final int[] registers; // one slot per bucket; offer() keeps the largest rank seen
+ // Number of registers (buckets)
+ private final int m;
+ // Number of bits used for register indexing
+ private final int b;
+ private final double maxStandardError; // returned by getMaxStandardError()
+ // Alpha constant for bias correction, chosen by getAlpha(b, m)
+ private final double alpha;
+
+ private static final HashFunction hashFunction = Hashing.murmur3_128(); // used by every add()
+
+ public static final double DEFAULT_STANDARD_ERROR = 0.023;
+ private static final double LOWEST_MAX_STANDARD_ERROR = 0.0040625; // see standardErrorToBuckets
+ private static final double HIGHEST_MAX_STANDARD_ERROR = 0.26000; // see standardErrorToBuckets
+
+ /**
+  * Constructs an empty HyperLogLog sized for the given maximum standard error.
+  *
+  * @param maxStandardError maximum standard error used to derive the number of registers;
+  *     validated by standardErrorToBuckets
+  */
+ public HyperLogLog(double maxStandardError) {
+   // Validate and size first, so nothing is assigned when the argument is rejected
+   int bucketCount = standardErrorToBuckets(maxStandardError);
+   int indexBits = indexBitLength(bucketCount);
+
+   this.m = bucketCount;
+   this.b = indexBits;
+   this.maxStandardError = maxStandardError;
+   this.registers = new int[bucketCount];
+   this.alpha = getAlpha(indexBits, bucketCount);
+ }
+
+ /**
+  * Rebuilds a HyperLogLog from its serialized form (deserialization).
+  *
+  * <p>Layout read here: int b at offset 0, int m at offset 4, double maxStandardError at
+  * offset 8, then m registers of four bytes each, least significant byte first, from offset 16.
+  *
+  * @param bytes the serialized sketch
+  */
+ public HyperLogLog(byte[] bytes) {
+   this.b = BytesUtils.bytesToInt(bytes, 0);
+   this.m = BytesUtils.bytesToInt(bytes, 4);
+   this.maxStandardError = BytesUtils.bytesToDouble(bytes, 8);
+   this.alpha = getAlpha(b, m);
+
+   this.registers = new int[m];
+   int offset = 16;
+   for (int i = 0; i < m; i++) {
+     int byte0 = bytes[offset] & 0xFF;
+     int byte1 = bytes[offset + 1] & 0xFF;
+     int byte2 = bytes[offset + 2] & 0xFF;
+     int byte3 = bytes[offset + 3] & 0xFF;
+     registers[i] = byte0 | (byte1 << 8) | (byte2 << 16) | (byte3 << 24);
+     offset += 4;
+   }
+ }
+
+ public double getMaxStandardError() {
+ return this.maxStandardError;
+ }
+
+ /**
+  * Picks the bias-correction constant alpha for the given index bit length.
+  *
+  * @param precision number of index bits; 4, 5 and 6 map to fixed constants
+  * @param m number of registers, used by the general formula for every other precision
+  * @return the alpha constant
+  */
+ private static double getAlpha(int precision, int m) {
+   if (precision == 4) {
+     return 0.673;
+   }
+   if (precision == 5) {
+     return 0.697;
+   }
+   if (precision == 6) {
+     return 0.709;
+   }
+   return 0.7213 / (1 + 1.079 / m);
+ }
+
+ /** Tells whether a positive value has exactly one bit set; non-positive input is rejected. */
+ private static boolean isPowerOf2(long value) {
+   Preconditions.checkArgument(value > 0L, "value must be positive");
+   long withoutLowestBit = value & (value - 1L);
+   return withoutLowestBit == 0L;
+ }
+
+ /**
+  * Returns the base-2 logarithm of a power-of-two bucket count.
+  *
+  * @param numberOfBuckets bucket count; must be a power of two
+  * @return the number of trailing zero bits of {@code numberOfBuckets}
+  */
+ private static int indexBitLength(int numberOfBuckets) {
+   boolean powerOfTwo = isPowerOf2((long) numberOfBuckets);
+   Preconditions.checkArgument(
+       powerOfTwo, "numberOfBuckets must be a power of 2, actual: %s", numberOfBuckets);
+   return Integer.numberOfTrailingZeros(numberOfBuckets);
+ }
+
+ /**
+  * Converts a maximum standard error into the number of registers to allocate.
+  *
+  * @param maxStandardError requested maximum standard error; must lie within
+  *     [LOWEST_MAX_STANDARD_ERROR, HIGHEST_MAX_STANDARD_ERROR], bounds included
+  * @return the bucket count produced by log2Ceiling
+  * @throws IoTDBRuntimeException with NUMERIC_VALUE_OUT_OF_RANGE if the value is NaN or outside
+  *     that range
+  */
+ private static int standardErrorToBuckets(double maxStandardError) {
+   // Written as a negated "in range" test so that NaN, which fails every comparison, is
+   // rejected here. The bounds are inclusive to match the "[%s, %s]" wording of the message.
+   boolean inRange =
+       maxStandardError >= LOWEST_MAX_STANDARD_ERROR
+           && maxStandardError <= HIGHEST_MAX_STANDARD_ERROR;
+   if (!inRange) {
+     throw new IoTDBRuntimeException(
+         String.format(
+             "Max Standard Error must be in [%s, %s]: %s",
+             LOWEST_MAX_STANDARD_ERROR, HIGHEST_MAX_STANDARD_ERROR, maxStandardError),
+         NUMERIC_VALUE_OUT_OF_RANGE.getStatusCode(),
+         true);
+   }
+   // NOTE(review): HyperLogLog's standard error is usually quoted as 1.04 / sqrt(m), which
+   // would make the numerator 1.04^2 = 1.0816. Confirm that 1.04 is intended here.
+   return log2Ceiling((int) Math.ceil(1.04 / (maxStandardError * maxStandardError)));
+ }
+
+ /**
+  * Doubles the highest set bit of {@code value - 1}; for {@code value >= 2} that is the smallest
+  * power of two not below {@code value}.
+  */
+ private static int log2Ceiling(int value) {
+   int topBit = Integer.highestOneBit(value - 1);
+   return topBit << 1;
+ }
+
+ public void add(boolean value) {
+ offer(hashFunction.hashString(String.valueOf(value),
StandardCharsets.UTF_8).asLong());
+ }
+
+ public void add(int value) {
+ offer(hashFunction.hashInt(value).asLong());
+ }
+
+ public void add(long value) {
+ offer(hashFunction.hashLong(value).asLong());
+ }
+
+ public void add(float value) {
+ offer(hashFunction.hashString(String.valueOf(value),
StandardCharsets.UTF_8).asLong());
+ }
+
+ public void add(double value) {
+ offer(hashFunction.hashString(String.valueOf(value),
StandardCharsets.UTF_8).asLong());
+ }
+
+ public void add(Binary value) {
+ offer(
+ hashFunction
+ .hashString(value.getStringValue(TSFileConfig.STRING_CHARSET),
StandardCharsets.UTF_8)
+ .asLong());
+ }
+
+ /**
+  * Feeds one already-hashed value into the sketch.
+  *
+  * @param hash the hash of the value being counted
+  */
+ public void offer(long hash) {
+   // The low bits of the hash select the register; m - 1 acts as the mask, which assumes m is
+   // a power of two
+   int bucket = (int) (hash & (m - 1));
+
+   // Rank = number of trailing zero bits in the hash after dropping its low b bits, plus one
+   int rank = Long.numberOfTrailingZeros(hash >>> b) + 1;
+
+   // Each register keeps the largest rank it has seen
+   if (rank > registers[bucket]) {
+     registers[bucket] = rank;
+   }
+ }
+
+ /**
+  * Estimates the number of distinct values offered so far.
+  *
+  * <p>Computes the raw estimate {@code alpha * m * m / sum(2^-registers[i])}. When that estimate
+  * is at most {@code 2.5 * m} and at least one register is still zero, linear counting is used
+  * instead. When it exceeds {@code 2^32 / 30}, the large range correction is applied.
+  *
+  * @return the estimated cardinality, rounded to the nearest long
+  */
+ public long cardinality() {
+   double sum = 0;
+   int zeros = 0;
+
+   for (int i = 0; i < m; i++) {
+     // Math.scalb gives exactly 2^-registers[i]. The previous int shift (1 << r) only honours
+     // the low five bits of r: it turns negative at r == 31 and wraps around for r >= 32.
+     sum += Math.scalb(1.0, -registers[i]);
+     if (registers[i] == 0) {
+       zeros++;
+     }
+   }
+
+   // Raw estimate with bias correction
+   double estimate = alpha * m * m / sum;
+
+   // Small range correction: linear counting while empty registers remain
+   if (estimate <= 2.5 * m && zeros > 0) {
+     return Math.round(m * Math.log((double) m / zeros));
+   }
+
+   // Large range correction (for values > 2^32 / 30)
+   // NOTE(review): once estimate >= 2^32 the argument of Math.log is <= 0, so the result is
+   // infinite or NaN before rounding. Confirm this 32-bit correction is wanted for long hashes.
+   double maxCardinality = (double) (1L << 32);
+   if (estimate > maxCardinality / 30) {
+     return Math.round(-maxCardinality * Math.log(1 - estimate / maxCardinality));
+   }
+
+   return Math.round(estimate);
+ }
+
+ /** Clears every register back to zero. */
+ public void reset() {
+   for (int i = 0; i < registers.length; i++) {
+     registers[i] = 0;
+   }
+ }
+
+ /**
+ * Merges another HyperLogLog instance into this one.
+ *
+ * @param other The other HyperLogLog instance to merge
+ * @throws IllegalArgumentException if the precision doesn't match
+ */
+ public void merge(HyperLogLog other) {
+ // not use currently
+ if (this.m != other.m) {
+ throw new IllegalArgumentException(
+ "Cannot merge HyperLogLog instances with different precision");
+ }
+
+ for (int i = 0; i < m; i++) {
+ registers[i] = Math.max(registers[i], other.registers[i]);
+ }
+ }
+
+ // 序列化
+ public byte[] serialize() {
+ int totalBytes = Integer.BYTES * 2 + registers.length * Integer.BYTES +
Double.BYTES;
+ byte[] result = new byte[totalBytes];
+
+ BytesUtils.intToBytes(b, result, 0);
+ BytesUtils.intToBytes(m, result, 4);
+ BytesUtils.doubleToBytes(maxStandardError, result, 8);
+
+ for (int i = 0; i < m; i++) {
+ int baseIndex = 16 + i * 4;
+ int value = registers[i];
+ result[baseIndex] = (byte) value;
+ result[baseIndex + 1] = (byte) (value >> 8);
+ result[baseIndex + 2] = (byte) (value >> 16);
+ result[baseIndex + 3] = (byte) (value >> 24);
+ }
+ return result;
Review Comment:
```suggestion
int totalBytes = Integer.BYTES * 2 + registers.length * Integer.BYTES;
ByteBuffer byteBuffer = ByteBuffer.allocate(totalBytes);
ReadWriteIOUtils.write(b, byteBuffer);
ReadWriteIOUtils.write(m, byteBuffer);
for (int i = 0; i < m; i++) {
ReadWriteIOUtils.write(registers[i], byteBuffer);
}
return byteBuffer.array();
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/HyperLogLog.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.apache.iotdb.rpc.TSStatusCode.NUMERIC_VALUE_OUT_OF_RANGE;
+
+public class HyperLogLog {
+ private final int[] registers; // one slot per bucket
+ // Number of registers (buckets)
+ private final int m;
+ // Number of bits used for register indexing
+ private final int b;
+ private final double maxStandardError; // returned by getMaxStandardError()
+ // Alpha constant for bias correction, chosen by getAlpha(b, m)
+ private final double alpha;
+
+ private static final HashFunction hashFunction = Hashing.murmur3_128(); // used by every add()
+
+ public static final double DEFAULT_STANDARD_ERROR = 0.023;
+ private static final double LOWEST_MAX_STANDARD_ERROR = 0.0040625; // see standardErrorToBuckets
+ private static final double HIGHEST_MAX_STANDARD_ERROR = 0.26000; // see standardErrorToBuckets
+
+ /**
+  * Constructs an empty HyperLogLog sized for the given maximum standard error.
+  *
+  * @param maxStandardError maximum standard error used to derive the number of registers;
+  *     validated by standardErrorToBuckets
+  */
+ public HyperLogLog(double maxStandardError) {
+   // Validate and size first, so nothing is assigned when the argument is rejected
+   int bucketCount = standardErrorToBuckets(maxStandardError);
+   int indexBits = indexBitLength(bucketCount);
+
+   this.m = bucketCount;
+   this.b = indexBits;
+   this.maxStandardError = maxStandardError;
+   this.registers = new int[bucketCount];
+   this.alpha = getAlpha(indexBits, bucketCount);
+ }
+
+ /**
+  * Rebuilds a HyperLogLog from its serialized form (deserialization).
+  *
+  * <p>Layout read here: int b at offset 0, int m at offset 4, double maxStandardError at
+  * offset 8, then m registers of four bytes each, least significant byte first, from offset 16.
+  *
+  * @param bytes the serialized sketch
+  */
+ public HyperLogLog(byte[] bytes) {
+   this.b = BytesUtils.bytesToInt(bytes, 0);
+   this.m = BytesUtils.bytesToInt(bytes, 4);
+   this.maxStandardError = BytesUtils.bytesToDouble(bytes, 8);
+   this.alpha = getAlpha(b, m);
+
+   this.registers = new int[m];
+   int offset = 16;
+   for (int i = 0; i < m; i++) {
+     int byte0 = bytes[offset] & 0xFF;
+     int byte1 = bytes[offset + 1] & 0xFF;
+     int byte2 = bytes[offset + 2] & 0xFF;
+     int byte3 = bytes[offset + 3] & 0xFF;
+     registers[i] = byte0 | (byte1 << 8) | (byte2 << 16) | (byte3 << 24);
+     offset += 4;
+   }
+ }
+
+ public double getMaxStandardError() {
+ return this.maxStandardError;
+ }
+
+ /**
+  * Picks the bias-correction constant alpha for the given index bit length.
+  *
+  * @param precision number of index bits; 4, 5 and 6 map to fixed constants
+  * @param m number of registers, used by the general formula for every other precision
+  * @return the alpha constant
+  */
+ private static double getAlpha(int precision, int m) {
+   if (precision == 4) {
+     return 0.673;
+   }
+   if (precision == 5) {
+     return 0.697;
+   }
+   if (precision == 6) {
+     return 0.709;
+   }
+   return 0.7213 / (1 + 1.079 / m);
+ }
+
+ /** Tells whether a positive value has exactly one bit set; non-positive input is rejected. */
+ private static boolean isPowerOf2(long value) {
+   Preconditions.checkArgument(value > 0L, "value must be positive");
+   long withoutLowestBit = value & (value - 1L);
+   return withoutLowestBit == 0L;
+ }
+
+ /**
+  * Returns the base-2 logarithm of a power-of-two bucket count.
+  *
+  * @param numberOfBuckets bucket count; must be a power of two
+  * @return the number of trailing zero bits of {@code numberOfBuckets}
+  */
+ private static int indexBitLength(int numberOfBuckets) {
+   boolean powerOfTwo = isPowerOf2((long) numberOfBuckets);
+   Preconditions.checkArgument(
+       powerOfTwo, "numberOfBuckets must be a power of 2, actual: %s", numberOfBuckets);
+   return Integer.numberOfTrailingZeros(numberOfBuckets);
+ }
+
+ /**
+  * Converts a maximum standard error into the number of registers to allocate.
+  *
+  * @param maxStandardError requested maximum standard error; must lie within
+  *     [LOWEST_MAX_STANDARD_ERROR, HIGHEST_MAX_STANDARD_ERROR], bounds included
+  * @return the bucket count produced by log2Ceiling
+  * @throws IoTDBRuntimeException with NUMERIC_VALUE_OUT_OF_RANGE if the value is NaN or outside
+  *     that range
+  */
+ private static int standardErrorToBuckets(double maxStandardError) {
+   // Written as a negated "in range" test so that NaN, which fails every comparison, is
+   // rejected here. The bounds are inclusive to match the "[%s, %s]" wording of the message.
+   boolean inRange =
+       maxStandardError >= LOWEST_MAX_STANDARD_ERROR
+           && maxStandardError <= HIGHEST_MAX_STANDARD_ERROR;
+   if (!inRange) {
+     throw new IoTDBRuntimeException(
+         String.format(
+             "Max Standard Error must be in [%s, %s]: %s",
+             LOWEST_MAX_STANDARD_ERROR, HIGHEST_MAX_STANDARD_ERROR, maxStandardError),
+         NUMERIC_VALUE_OUT_OF_RANGE.getStatusCode(),
+         true);
+   }
+   // NOTE(review): HyperLogLog's standard error is usually quoted as 1.04 / sqrt(m), which
+   // would make the numerator 1.04^2 = 1.0816. Confirm that 1.04 is intended here.
+   return log2Ceiling((int) Math.ceil(1.04 / (maxStandardError * maxStandardError)));
+ }
+
+ /**
+  * Doubles the highest set bit of {@code value - 1}; for {@code value >= 2} that is the smallest
+  * power of two not below {@code value}.
+  */
+ private static int log2Ceiling(int value) {
+   int topBit = Integer.highestOneBit(value - 1);
+   return topBit << 1;
+ }
+
+ public void add(boolean value) {
+ offer(hashFunction.hashString(String.valueOf(value),
StandardCharsets.UTF_8).asLong());
+ }
+
+ public void add(int value) {
+ offer(hashFunction.hashInt(value).asLong());
+ }
+
+ public void add(long value) {
+ offer(hashFunction.hashLong(value).asLong());
+ }
+
+ public void add(float value) {
+ offer(hashFunction.hashString(String.valueOf(value),
StandardCharsets.UTF_8).asLong());
+ }
+
+ public void add(double value) {
+ offer(hashFunction.hashString(String.valueOf(value),
StandardCharsets.UTF_8).asLong());
+ }
+
+ public void add(Binary value) {
+ offer(
+ hashFunction
+ .hashString(value.getStringValue(TSFileConfig.STRING_CHARSET),
StandardCharsets.UTF_8)
Review Comment:
```suggestion
.hashBytes(value.getValues())
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]