jt2594838 commented on code in PR #307:
URL: https://github.com/apache/tsfile/pull/307#discussion_r1851602309
##########
java/common/src/main/java/org/apache/tsfile/utils/BitMap.java:
##########
@@ -147,6 +165,35 @@ public boolean equals(Object obj) {
return this.size == other.size && Arrays.equals(this.bits, other.bits);
}
+ public boolean equalsInRange(Object obj, int rangeSize) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof BitMap)) {
+ return false;
+ }
+ BitMap other = (BitMap) obj;
+ // implement
+
+ int byteSize = rangeSize / Byte.SIZE;
+ for (int i = 0; i < byteSize; i++) {
+ if (this.bits[i] != other.bits[i]) {
+ return false;
+ }
+ }
+ int remainingBits = rangeSize % Byte.SIZE;
+ if (remainingBits > 0) {
+ byte mask = (byte) (0xFF >> (Byte.SIZE - remainingBits));
+ if ((this.bits[byteSize] & mask) != (other.bits[byteSize] & mask)) {
+ return false;
+ }
+ }
+ return true;
+ }
Review Comment:
If other.bits.length < this.bits.length, an ArrayIndexOutOfBoundsException may be thrown.
How about using Arrays.compare to replace the first loop?
##########
java/common/src/main/java/org/apache/tsfile/utils/BitMap.java:
##########
@@ -147,6 +165,35 @@ public boolean equals(Object obj) {
return this.size == other.size && Arrays.equals(this.bits, other.bits);
}
+ public boolean equalsInRange(Object obj, int rangeSize) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof BitMap)) {
+ return false;
+ }
+ BitMap other = (BitMap) obj;
+ // implement
Review Comment:
implement what?
##########
java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.read.v4;
+
+import org.apache.tsfile.common.TsFileApi;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.read.ReadProcessException;
+import org.apache.tsfile.exception.write.NoMeasurementException;
+import org.apache.tsfile.exception.write.NoTableException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.file.metadata.TsFileMetadata;
+import org.apache.tsfile.read.expression.ExpressionTree;
+import org.apache.tsfile.read.query.dataset.ResultSet;
+import org.apache.tsfile.read.query.dataset.TableResultSet;
+import org.apache.tsfile.read.query.executor.TableQueryExecutor;
+import org.apache.tsfile.read.reader.block.TsBlockReader;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class DeviceTableModelReader extends CommonModelReader {
+ protected TableQueryExecutor queryExecutor;
+
+ public DeviceTableModelReader(File file) throws IOException {
+ super(file);
+ this.queryExecutor =
+ new TableQueryExecutor(
+ metadataQuerier, chunkLoader,
TableQueryExecutor.TableQueryOrdering.DEVICE);
+ }
+
+ @TsFileApi
+ public List<String> getAllTables() throws IOException {
+ Map<String, TableSchema> tableSchemaMap =
fileReader.readFileMetadata().getTableSchemaMap();
+ return new ArrayList<>(tableSchemaMap.keySet());
+ }
+
+ @TsFileApi
+ public List<IDeviceID> getAllTableDevices(String tableName) throws
IOException {
+ MetadataIndexNode tableMetadataIndexNode =
+ fileReader.readFileMetadata().getTableMetadataIndexNode(tableName);
+ if (tableMetadataIndexNode == null) {
+ return Collections.emptyList();
+ }
+ return fileReader.getAllDevices(tableMetadataIndexNode);
+ }
+
+ @TsFileApi
+ public List<TableSchema> getTableSchema(List<String> tableNames) throws
IOException {
+ TsFileMetadata tsFileMetadata = fileReader.readFileMetadata();
+ Map<String, TableSchema> tableSchemaMap =
tsFileMetadata.getTableSchemaMap();
+ List<TableSchema> result = new ArrayList<>(tableNames.size());
+ for (String tableName : tableNames) {
+ result.add(tableSchemaMap.get(tableName));
+ }
+ return result;
+ }
+
+ @TsFileApi
+ public ResultSet queryTable(
Review Comment:
`query` is enough here.
##########
java/tsfile/src/main/java/org/apache/tsfile/read/v4/PointTreeModelReader.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.read.v4;
+
+import org.apache.tsfile.common.TsFileApi;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.read.common.TimeSeries;
+import org.apache.tsfile.read.expression.QueryExpression;
+import org.apache.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.tsfile.read.filter.operator.TimeFilterOperators;
+import org.apache.tsfile.read.query.dataset.ResultSet;
+import org.apache.tsfile.read.query.dataset.TreeResultSet;
+import org.apache.tsfile.read.query.executor.TsFileExecutor;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class PointTreeModelReader extends CommonModelReader {
+
+ protected TsFileExecutor queryExecutor;
+
+ public PointTreeModelReader(File file) throws IOException {
+ super(file);
+ this.queryExecutor = new TsFileExecutor(metadataQuerier, chunkLoader);
+ }
+
+ @TsFileApi
+ public List<IMeasurementSchema> getTimeseriesSchema(IDeviceID deviceId)
throws IOException {
+ List<TimeseriesMetadata> deviceTimeseriesMetadata =
+ fileReader.getDeviceTimeseriesMetadataWithoutChunkMetadata(deviceId);
+ List<IMeasurementSchema> measurementSchemaList = new ArrayList<>();
+ for (TimeseriesMetadata timeseriesMetadata : deviceTimeseriesMetadata) {
+ measurementSchemaList.add(
+ new MeasurementSchema(
+ timeseriesMetadata.getMeasurementId(),
timeseriesMetadata.getTsDataType()));
+ }
+ return measurementSchemaList;
+ }
+
+ @TsFileApi
+ public ResultSet query(List<TimeSeries> pathList, long startTime, long
endTime)
Review Comment:
pathList -> timeSeriesList
##########
java/tsfile/src/main/java/org/apache/tsfile/write/v4/AbstractTableModelTsFileWriter.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.write.v4;
+
+import org.apache.tsfile.common.TsFileApi;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.encrypt.EncryptParameter;
+import org.apache.tsfile.encrypt.IEncryptor;
+import org.apache.tsfile.exception.encrypt.EncryptException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.write.chunk.AlignedChunkGroupWriterImpl;
+import org.apache.tsfile.write.chunk.IChunkGroupWriter;
+import org.apache.tsfile.write.chunk.NonAlignedChunkGroupWriterImpl;
+import org.apache.tsfile.write.schema.Schema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+abstract class AbstractTableModelTsFileWriter implements ITsFileWriter {
+
+ protected static final TSFileConfig config =
TSFileDescriptor.getInstance().getConfig();
+ protected static final Logger LOG =
LoggerFactory.getLogger(AbstractTableModelTsFileWriter.class);
+
+ /** IO writer of this TsFile. */
+ protected final TsFileIOWriter fileWriter;
+
+ protected EncryptParameter encryptParam;
+
+ protected final int pageSize;
+ protected long recordCount = 0;
+
+ // deviceId -> measurementIdList
+ protected Map<IDeviceID, List<String>> flushedMeasurementsInDeviceMap = new
HashMap<>();
+
+ // DeviceId -> LastTime
+ protected Map<IDeviceID, Long> alignedDeviceLastTimeMap = new HashMap<>();
+
+ // TimeseriesId -> LastTime
+ protected Map<IDeviceID, Map<String, Long>> nonAlignedTimeseriesLastTimeMap
= new HashMap<>();
+
+ protected Map<IDeviceID, IChunkGroupWriter> groupWriters = new TreeMap<>();
+
+ /** min value of threshold of data points num check. */
+ protected long recordCountForNextMemCheck = 100;
+
+ protected long chunkGroupSizeThreshold;
+
+ /**
+ * init this Writer.
+ *
+ * @param file the File to be written by this TsFileWriter
+ */
+ @TsFileApi
+ protected AbstractTableModelTsFileWriter(File file, long
chunkGroupSizeThreshold)
+ throws IOException {
+ Schema schema = new Schema();
+ TSFileConfig conf = TSFileDescriptor.getInstance().getConfig();
+ this.fileWriter = new TsFileIOWriter(file);
+ fileWriter.setSchema(schema);
+
+ this.pageSize = conf.getPageSizeInByte();
+ this.chunkGroupSizeThreshold = chunkGroupSizeThreshold;
+ config.setTSFileStorageFs(conf.getTSFileStorageFs());
+ if (this.pageSize >= chunkGroupSizeThreshold) {
+ LOG.warn(
+ "TsFile's page size {} is greater than chunk group size {}, please
enlarge the chunk group"
+ + " size or decrease page size. ",
+ pageSize,
+ chunkGroupSizeThreshold);
+ }
+
+ String encryptLevel;
+ byte[] encryptKey;
+ byte[] dataEncryptKey;
+ String encryptType;
+ if (config.getEncryptFlag()) {
+ encryptLevel = "2";
+ encryptType = config.getEncryptType();
+ try {
+ MessageDigest md = MessageDigest.getInstance("SHA-256");
+ md.update("IoTDB is the best".getBytes());
+ md.update(config.getEncryptKey().getBytes());
+ dataEncryptKey = Arrays.copyOfRange(md.digest(), 0, 16);
+ encryptKey =
+ IEncryptor.getEncryptor(config.getEncryptType(),
config.getEncryptKey().getBytes())
+ .encrypt(dataEncryptKey);
+ } catch (Exception e) {
+ throw new EncryptException(
+ "SHA-256 function not found while using SHA-256 to generate data
key");
+ }
+ } else {
+ encryptLevel = "0";
+ encryptType = "org.apache.tsfile.encrypt.UNENCRYPTED";
+ encryptKey = null;
+ dataEncryptKey = null;
+ }
+ this.encryptParam = new EncryptParameter(encryptType, dataEncryptKey);
+ if (encryptKey != null) {
+ StringBuilder valueStr = new StringBuilder();
+
+ for (byte b : encryptKey) {
+ valueStr.append(b).append(",");
+ }
+
+ valueStr.deleteCharAt(valueStr.length() - 1);
+ String str = valueStr.toString();
+
+ fileWriter.setEncryptParam(encryptLevel, encryptType, str);
+ } else {
+ fileWriter.setEncryptParam(encryptLevel, encryptType, "");
+ }
+ }
+
+ protected IChunkGroupWriter tryToInitialGroupWriter(IDeviceID deviceId,
boolean isAligned) {
+ IChunkGroupWriter groupWriter = groupWriters.get(deviceId);
+ if (groupWriter == null) {
+ if (isAligned) {
+ groupWriter = new AlignedChunkGroupWriterImpl(deviceId, encryptParam);
+ ((AlignedChunkGroupWriterImpl) groupWriter)
+ .setLastTime(alignedDeviceLastTimeMap.get(deviceId));
+ } else {
+ groupWriter = new NonAlignedChunkGroupWriterImpl(deviceId,
encryptParam);
+ ((NonAlignedChunkGroupWriterImpl) groupWriter)
+ .setLastTimeMap(
+ nonAlignedTimeseriesLastTimeMap.getOrDefault(deviceId, new
HashMap<>()));
+ }
+ groupWriters.put(deviceId, groupWriter);
+ }
+ return groupWriter;
+ }
+
+ /**
+ * calculate total memory size occupied by allT ChunkGroupWriter instances
currently.
+ *
+ * @return total memory size used
+ */
+ protected long calculateMemSizeForAllGroup() {
+ long memTotalSize = 0;
+ for (IChunkGroupWriter group : groupWriters.values()) {
+ memTotalSize += group.updateMaxGroupMemSize();
+ }
+ return memTotalSize;
+ }
+
+ /**
+ * check occupied memory size, if it exceeds the chunkGroupSize threshold,
flush them to given
+ * OutputStream.
+ *
+ * @return true - size of tsfile or metadata reaches the threshold. false -
otherwise
+ * @throws IOException exception in IO
+ */
+ protected boolean checkMemorySizeAndMayFlushChunks() throws IOException {
+ if (recordCount >= recordCountForNextMemCheck) {
+ long memSize = calculateMemSizeForAllGroup();
+ assert memSize > 0;
+ if (memSize > chunkGroupSizeThreshold) {
+ LOG.debug("start to flush chunk groups, memory space occupy:{}",
memSize);
+ recordCountForNextMemCheck = recordCount * chunkGroupSizeThreshold /
memSize;
+ return flush();
+ } else {
+ recordCountForNextMemCheck = recordCount * chunkGroupSizeThreshold /
memSize;
+ return false;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * flush the data in all series writers of all chunk group writers and their
page writers to
+ * outputStream.
+ *
+ * @return true - size of tsfile or metadata reaches the threshold. false -
otherwise. But this
+ * function just return false, the Override of IoTDB may return true.
Review Comment:
Why cannot this method return true?
##########
java/tsfile/src/main/java/org/apache/tsfile/write/v4/AbstractTableModelTsFileWriter.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.write.v4;
+
+import org.apache.tsfile.common.TsFileApi;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.encrypt.EncryptParameter;
+import org.apache.tsfile.encrypt.IEncryptor;
+import org.apache.tsfile.exception.encrypt.EncryptException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.write.chunk.AlignedChunkGroupWriterImpl;
+import org.apache.tsfile.write.chunk.IChunkGroupWriter;
+import org.apache.tsfile.write.chunk.NonAlignedChunkGroupWriterImpl;
+import org.apache.tsfile.write.schema.Schema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+abstract class AbstractTableModelTsFileWriter implements ITsFileWriter {
+
+ protected static final TSFileConfig config =
TSFileDescriptor.getInstance().getConfig();
+ protected static final Logger LOG =
LoggerFactory.getLogger(AbstractTableModelTsFileWriter.class);
+
+ /** IO writer of this TsFile. */
+ protected final TsFileIOWriter fileWriter;
+
+ protected EncryptParameter encryptParam;
+
+ protected final int pageSize;
+ protected long recordCount = 0;
+
+ // deviceId -> measurementIdList
+ protected Map<IDeviceID, List<String>> flushedMeasurementsInDeviceMap = new
HashMap<>();
+
+ // DeviceId -> LastTime
+ protected Map<IDeviceID, Long> alignedDeviceLastTimeMap = new HashMap<>();
+
+ // TimeseriesId -> LastTime
+ protected Map<IDeviceID, Map<String, Long>> nonAlignedTimeseriesLastTimeMap
= new HashMap<>();
+
+ protected Map<IDeviceID, IChunkGroupWriter> groupWriters = new TreeMap<>();
+
+ /** min value of threshold of data points num check. */
+ protected long recordCountForNextMemCheck = 100;
+
+ protected long chunkGroupSizeThreshold;
+
+ /**
+ * init this Writer.
+ *
+ * @param file the File to be written by this TsFileWriter
+ */
+ @TsFileApi
+ protected AbstractTableModelTsFileWriter(File file, long
chunkGroupSizeThreshold)
+ throws IOException {
+ Schema schema = new Schema();
+ TSFileConfig conf = TSFileDescriptor.getInstance().getConfig();
+ this.fileWriter = new TsFileIOWriter(file);
+ fileWriter.setSchema(schema);
+
+ this.pageSize = conf.getPageSizeInByte();
+ this.chunkGroupSizeThreshold = chunkGroupSizeThreshold;
+ config.setTSFileStorageFs(conf.getTSFileStorageFs());
+ if (this.pageSize >= chunkGroupSizeThreshold) {
+ LOG.warn(
+ "TsFile's page size {} is greater than chunk group size {}, please
enlarge the chunk group"
+ + " size or decrease page size. ",
+ pageSize,
+ chunkGroupSizeThreshold);
+ }
+
+ String encryptLevel;
+ byte[] encryptKey;
+ byte[] dataEncryptKey;
+ String encryptType;
+ if (config.getEncryptFlag()) {
+ encryptLevel = "2";
+ encryptType = config.getEncryptType();
+ try {
+ MessageDigest md = MessageDigest.getInstance("SHA-256");
+ md.update("IoTDB is the best".getBytes());
+ md.update(config.getEncryptKey().getBytes());
+ dataEncryptKey = Arrays.copyOfRange(md.digest(), 0, 16);
+ encryptKey =
+ IEncryptor.getEncryptor(config.getEncryptType(),
config.getEncryptKey().getBytes())
+ .encrypt(dataEncryptKey);
+ } catch (Exception e) {
+ throw new EncryptException(
+ "SHA-256 function not found while using SHA-256 to generate data
key");
+ }
+ } else {
+ encryptLevel = "0";
+ encryptType = "org.apache.tsfile.encrypt.UNENCRYPTED";
+ encryptKey = null;
+ dataEncryptKey = null;
+ }
+ this.encryptParam = new EncryptParameter(encryptType, dataEncryptKey);
+ if (encryptKey != null) {
+ StringBuilder valueStr = new StringBuilder();
+
+ for (byte b : encryptKey) {
+ valueStr.append(b).append(",");
+ }
+
+ valueStr.deleteCharAt(valueStr.length() - 1);
+ String str = valueStr.toString();
+
+ fileWriter.setEncryptParam(encryptLevel, encryptType, str);
+ } else {
+ fileWriter.setEncryptParam(encryptLevel, encryptType, "");
+ }
+ }
+
+ protected IChunkGroupWriter tryToInitialGroupWriter(IDeviceID deviceId,
boolean isAligned) {
+ IChunkGroupWriter groupWriter = groupWriters.get(deviceId);
+ if (groupWriter == null) {
+ if (isAligned) {
+ groupWriter = new AlignedChunkGroupWriterImpl(deviceId, encryptParam);
+ ((AlignedChunkGroupWriterImpl) groupWriter)
+ .setLastTime(alignedDeviceLastTimeMap.get(deviceId));
+ } else {
+ groupWriter = new NonAlignedChunkGroupWriterImpl(deviceId,
encryptParam);
+ ((NonAlignedChunkGroupWriterImpl) groupWriter)
+ .setLastTimeMap(
+ nonAlignedTimeseriesLastTimeMap.getOrDefault(deviceId, new
HashMap<>()));
+ }
+ groupWriters.put(deviceId, groupWriter);
+ }
+ return groupWriter;
+ }
+
+ /**
+ * calculate total memory size occupied by allT ChunkGroupWriter instances
currently.
Review Comment:
allT -> all
##########
java/tsfile/src/test/java/org/apache/tsfile/read/TsFileV4ReadWriteInterfacesTest.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.read;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.read.v4.DeviceTableModelReader;
+import org.apache.tsfile.utils.TsFileGeneratorForTest;
+import org.apache.tsfile.utils.TsFileGeneratorUtils;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.v4.ITsFileWriter;
+import org.apache.tsfile.write.v4.TsFileWriterBuilder;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class TsFileV4ReadWriteInterfacesTest {
+
+ @Test
+ public void testGetTableDeviceMethods() throws Exception {
+ String filePath = TsFileGeneratorForTest.getTestTsFilePath("root.testsg",
0, 0, 0);
+ try {
+ File file = TsFileGeneratorUtils.generateAlignedTsFile(filePath, 5, 1,
10, 1, 1, 10, 100);
Review Comment:
It would be better to give these numbers named variables or add a comment;
otherwise, it is really difficult to figure out the test scenario.
##########
java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ColumnSchemaBuilder.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.file.metadata;
+
+import org.apache.tsfile.common.TsFileApi;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.record.Tablet.ColumnCategory;
+
+public class ColumnSchemaBuilder {
+
+ private String columnName;
+ private TSDataType columnDataType;
+ private ColumnCategory columnCategory = ColumnCategory.MEASUREMENT;
+
+ @TsFileApi
+ public ColumnSchema build() {
+ validateNameParameters();
+ return new ColumnSchema(columnName, columnDataType, columnCategory);
+ }
+
+ @TsFileApi
+ public ColumnSchemaBuilder name(String columnName) {
+ this.columnName = columnName;
+ return this;
+ }
+
+ @TsFileApi
+ public ColumnSchemaBuilder dataType(TSDataType columnType) {
+ this.columnDataType = columnType;
+ return this;
+ }
+
+ @TsFileApi
+ public ColumnSchemaBuilder category(ColumnCategory columnCategory) {
+ this.columnCategory = columnCategory;
+ return this;
+ }
+
+ private void validateNameParameters() {
+ if (columnName == null || columnName.trim().isEmpty()) {
+ throw new IllegalStateException("Column name must be set before
building");
+ }
Review Comment:
Maybe we can trim in name().
##########
java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ColumnSchemaBuilder.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.file.metadata;
+
+import org.apache.tsfile.common.TsFileApi;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.record.Tablet.ColumnCategory;
+
+public class ColumnSchemaBuilder {
+
+ private String columnName;
+ private TSDataType columnDataType;
+ private ColumnCategory columnCategory = ColumnCategory.MEASUREMENT;
+
+ @TsFileApi
+ public ColumnSchema build() {
+ validateNameParameters();
Review Comment:
"Name" is confusing, "validateParameters" or "validateRequiredParameters"
may be better.
##########
java/tsfile/src/main/java/org/apache/tsfile/write/v4/AbstractTableModelTsFileWriter.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.write.v4;
+
+import org.apache.tsfile.common.TsFileApi;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.encrypt.EncryptParameter;
+import org.apache.tsfile.encrypt.IEncryptor;
+import org.apache.tsfile.exception.encrypt.EncryptException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.write.chunk.AlignedChunkGroupWriterImpl;
+import org.apache.tsfile.write.chunk.IChunkGroupWriter;
+import org.apache.tsfile.write.chunk.NonAlignedChunkGroupWriterImpl;
+import org.apache.tsfile.write.schema.Schema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+abstract class AbstractTableModelTsFileWriter implements ITsFileWriter {
+
+ protected static final TSFileConfig config =
TSFileDescriptor.getInstance().getConfig();
+ protected static final Logger LOG =
LoggerFactory.getLogger(AbstractTableModelTsFileWriter.class);
+
+ /** IO writer of this TsFile. */
+ protected final TsFileIOWriter fileWriter;
+
+ protected EncryptParameter encryptParam;
+
+ protected final int pageSize;
+ protected long recordCount = 0;
+
+ // deviceId -> measurementIdList
+ protected Map<IDeviceID, List<String>> flushedMeasurementsInDeviceMap = new
HashMap<>();
+
+ // DeviceId -> LastTime
+ protected Map<IDeviceID, Long> alignedDeviceLastTimeMap = new HashMap<>();
+
+ // TimeseriesId -> LastTime
+ protected Map<IDeviceID, Map<String, Long>> nonAlignedTimeseriesLastTimeMap
= new HashMap<>();
+
+ protected Map<IDeviceID, IChunkGroupWriter> groupWriters = new TreeMap<>();
+
+ /** min value of threshold of data points num check. */
+ protected long recordCountForNextMemCheck = 100;
+
+ protected long chunkGroupSizeThreshold;
+
+ /**
+ * init this Writer.
+ *
+ * @param file the File to be written by this TsFileWriter
+ */
+ @TsFileApi
+ protected AbstractTableModelTsFileWriter(File file, long
chunkGroupSizeThreshold)
+ throws IOException {
+ Schema schema = new Schema();
+ TSFileConfig conf = TSFileDescriptor.getInstance().getConfig();
+ this.fileWriter = new TsFileIOWriter(file);
+ fileWriter.setSchema(schema);
+
+ this.pageSize = conf.getPageSizeInByte();
+ this.chunkGroupSizeThreshold = chunkGroupSizeThreshold;
+ config.setTSFileStorageFs(conf.getTSFileStorageFs());
Review Comment:
What is this for?
##########
java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.read.v4;
+
+import org.apache.tsfile.common.TsFileApi;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.read.ReadProcessException;
+import org.apache.tsfile.exception.write.NoMeasurementException;
+import org.apache.tsfile.exception.write.NoTableException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.file.metadata.TsFileMetadata;
+import org.apache.tsfile.read.expression.ExpressionTree;
+import org.apache.tsfile.read.query.dataset.ResultSet;
+import org.apache.tsfile.read.query.dataset.TableResultSet;
+import org.apache.tsfile.read.query.executor.TableQueryExecutor;
+import org.apache.tsfile.read.reader.block.TsBlockReader;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class DeviceTableModelReader extends CommonModelReader {
+ protected TableQueryExecutor queryExecutor;
+
+ public DeviceTableModelReader(File file) throws IOException {
+ super(file);
+ this.queryExecutor =
+ new TableQueryExecutor(
+ metadataQuerier, chunkLoader,
TableQueryExecutor.TableQueryOrdering.DEVICE);
+ }
+
+ @TsFileApi
+ public List<String> getAllTables() throws IOException {
+ Map<String, TableSchema> tableSchemaMap =
fileReader.readFileMetadata().getTableSchemaMap();
+ return new ArrayList<>(tableSchemaMap.keySet());
+ }
+
+ @TsFileApi
+ public List<IDeviceID> getAllTableDevices(String tableName) throws
IOException {
+ MetadataIndexNode tableMetadataIndexNode =
+ fileReader.readFileMetadata().getTableMetadataIndexNode(tableName);
+ if (tableMetadataIndexNode == null) {
+ return Collections.emptyList();
+ }
+ return fileReader.getAllDevices(tableMetadataIndexNode);
+ }
+
+ @TsFileApi
+ public List<TableSchema> getTableSchema(List<String> tableNames) throws
IOException {
+ TsFileMetadata tsFileMetadata = fileReader.readFileMetadata();
+ Map<String, TableSchema> tableSchemaMap =
tsFileMetadata.getTableSchemaMap();
+ List<TableSchema> result = new ArrayList<>(tableNames.size());
+ for (String tableName : tableNames) {
+ result.add(tableSchemaMap.get(tableName));
+ }
Review Comment:
How about `result.addAll(tableSchemaMap.values())`?
##########
java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java:
##########
@@ -42,7 +43,7 @@ public class TableSchema {
// the tableName is not serialized since the TableSchema is always stored in
a Map, from whose
// key the tableName can be known
protected String tableName;
- protected List<IMeasurementSchema> columnSchemas;
+ protected List<IMeasurementSchema> measurementSchemas;
protected List<ColumnCategory> columnCategories;
Review Comment:
Why change it back?
##########
java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.write.v4;
+
+import org.apache.tsfile.common.TsFileApi;
+import org.apache.tsfile.exception.write.ConflictDataTypeException;
+import org.apache.tsfile.exception.write.NoMeasurementException;
+import org.apache.tsfile.exception.write.NoTableException;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.WriteUtils;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DeviceTableModelWriter extends AbstractTableModelTsFileWriter {
+
+ private String tableName;
+ private boolean isTableWriteAligned = true;
+
+ public DeviceTableModelWriter(File file, TableSchema tableSchema, long
memoryThreshold)
+ throws IOException {
+ super(file, memoryThreshold);
+ registerTableSchema(tableSchema);
+ }
+
+ /**
+ * Write the tablet in to the TsFile with the table-view. The method will
try to split the tablet
+ * by device. If you know the device association within the tablet, please
use writeTable(Tablet
+ * tablet, List<Pair<IDeviceID, Integer>> deviceIdEndIndexPairs). One
typical case where the other
+ * method should be used is that all rows in the tablet belong to the same
device.
Review Comment:
The comment no longer applies, since this is now the only method.
##########
java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.write.v4;
+
+import org.apache.tsfile.common.TsFileApi;
+import org.apache.tsfile.exception.write.ConflictDataTypeException;
+import org.apache.tsfile.exception.write.NoMeasurementException;
+import org.apache.tsfile.exception.write.NoTableException;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.WriteUtils;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DeviceTableModelWriter extends AbstractTableModelTsFileWriter {
+
+ private String tableName;
+ private boolean isTableWriteAligned = true;
+
+ public DeviceTableModelWriter(File file, TableSchema tableSchema, long
memoryThreshold)
+ throws IOException {
+ super(file, memoryThreshold);
+ registerTableSchema(tableSchema);
+ }
+
+ /**
+ * Write the tablet in to the TsFile with the table-view. The method will
try to split the tablet
+ * by device. If you know the device association within the tablet, please
use writeTable(Tablet
+ * tablet, List<Pair<IDeviceID, Integer>> deviceIdEndIndexPairs). One
typical case where the other
+ * method should be used is that all rows in the tablet belong to the same
device.
+ *
+ * @param table data to write
+ * @return true if a flush is triggered after write, false otherwise
Review Comment:
remove
##########
java/tsfile/src/main/java/org/apache/tsfile/write/v4/AbstractTableModelTsFileWriter.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.write.v4;
+
+import org.apache.tsfile.common.TsFileApi;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.encrypt.EncryptParameter;
+import org.apache.tsfile.encrypt.IEncryptor;
+import org.apache.tsfile.exception.encrypt.EncryptException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.write.chunk.AlignedChunkGroupWriterImpl;
+import org.apache.tsfile.write.chunk.IChunkGroupWriter;
+import org.apache.tsfile.write.chunk.NonAlignedChunkGroupWriterImpl;
+import org.apache.tsfile.write.schema.Schema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+abstract class AbstractTableModelTsFileWriter implements ITsFileWriter {
+
+ protected static final TSFileConfig config =
TSFileDescriptor.getInstance().getConfig();
+ protected static final Logger LOG =
LoggerFactory.getLogger(AbstractTableModelTsFileWriter.class);
+
+ /** IO writer of this TsFile. */
+ protected final TsFileIOWriter fileWriter;
+
+ protected EncryptParameter encryptParam;
+
+ protected final int pageSize;
+ protected long recordCount = 0;
+
+ // deviceId -> measurementIdList
+ protected Map<IDeviceID, List<String>> flushedMeasurementsInDeviceMap = new
HashMap<>();
+
+ // DeviceId -> LastTime
+ protected Map<IDeviceID, Long> alignedDeviceLastTimeMap = new HashMap<>();
+
+ // TimeseriesId -> LastTime
+ protected Map<IDeviceID, Map<String, Long>> nonAlignedTimeseriesLastTimeMap
= new HashMap<>();
+
+ protected Map<IDeviceID, IChunkGroupWriter> groupWriters = new TreeMap<>();
+
+ /** min value of threshold of data points num check. */
+ protected long recordCountForNextMemCheck = 100;
+
+ protected long chunkGroupSizeThreshold;
+
+ /**
+ * init this Writer.
+ *
+ * @param file the File to be written by this TsFileWriter
+ */
+ @TsFileApi
+ protected AbstractTableModelTsFileWriter(File file, long
chunkGroupSizeThreshold)
+ throws IOException {
+ Schema schema = new Schema();
+ TSFileConfig conf = TSFileDescriptor.getInstance().getConfig();
+ this.fileWriter = new TsFileIOWriter(file);
+ fileWriter.setSchema(schema);
+
+ this.pageSize = conf.getPageSizeInByte();
+ this.chunkGroupSizeThreshold = chunkGroupSizeThreshold;
+ config.setTSFileStorageFs(conf.getTSFileStorageFs());
+ if (this.pageSize >= chunkGroupSizeThreshold) {
+ LOG.warn(
+ "TsFile's page size {} is greater than chunk group size {}, please
enlarge the chunk group"
+ + " size or decrease page size. ",
+ pageSize,
+ chunkGroupSizeThreshold);
+ }
+
+ String encryptLevel;
+ byte[] encryptKey;
+ byte[] dataEncryptKey;
+ String encryptType;
+ if (config.getEncryptFlag()) {
+ encryptLevel = "2";
+ encryptType = config.getEncryptType();
+ try {
+ MessageDigest md = MessageDigest.getInstance("SHA-256");
+ md.update("IoTDB is the best".getBytes());
+ md.update(config.getEncryptKey().getBytes());
+ dataEncryptKey = Arrays.copyOfRange(md.digest(), 0, 16);
+ encryptKey =
+ IEncryptor.getEncryptor(config.getEncryptType(),
config.getEncryptKey().getBytes())
+ .encrypt(dataEncryptKey);
+ } catch (Exception e) {
+ throw new EncryptException(
+ "SHA-256 function not found while using SHA-256 to generate data
key");
+ }
+ } else {
+ encryptLevel = "0";
+ encryptType = "org.apache.tsfile.encrypt.UNENCRYPTED";
+ encryptKey = null;
+ dataEncryptKey = null;
+ }
+ this.encryptParam = new EncryptParameter(encryptType, dataEncryptKey);
+ if (encryptKey != null) {
+ StringBuilder valueStr = new StringBuilder();
+
+ for (byte b : encryptKey) {
+ valueStr.append(b).append(",");
+ }
+
+ valueStr.deleteCharAt(valueStr.length() - 1);
+ String str = valueStr.toString();
+
+ fileWriter.setEncryptParam(encryptLevel, encryptType, str);
+ } else {
+ fileWriter.setEncryptParam(encryptLevel, encryptType, "");
+ }
+ }
+
+ protected IChunkGroupWriter tryToInitialGroupWriter(IDeviceID deviceId,
boolean isAligned) {
+ IChunkGroupWriter groupWriter = groupWriters.get(deviceId);
+ if (groupWriter == null) {
+ if (isAligned) {
+ groupWriter = new AlignedChunkGroupWriterImpl(deviceId, encryptParam);
+ ((AlignedChunkGroupWriterImpl) groupWriter)
+ .setLastTime(alignedDeviceLastTimeMap.get(deviceId));
+ } else {
+ groupWriter = new NonAlignedChunkGroupWriterImpl(deviceId,
encryptParam);
+ ((NonAlignedChunkGroupWriterImpl) groupWriter)
+ .setLastTimeMap(
+ nonAlignedTimeseriesLastTimeMap.getOrDefault(deviceId, new
HashMap<>()));
+ }
+ groupWriters.put(deviceId, groupWriter);
+ }
+ return groupWriter;
+ }
+
+ /**
+ * calculate total memory size occupied by allT ChunkGroupWriter instances
currently.
+ *
+ * @return total memory size used
+ */
+ protected long calculateMemSizeForAllGroup() {
+ long memTotalSize = 0;
+ for (IChunkGroupWriter group : groupWriters.values()) {
+ memTotalSize += group.updateMaxGroupMemSize();
+ }
+ return memTotalSize;
+ }
+
+ /**
+ * check occupied memory size, if it exceeds the chunkGroupSize threshold,
flush them to given
+ * OutputStream.
+ *
+ * @return true - size of tsfile or metadata reaches the threshold. false -
otherwise
+ * @throws IOException exception in IO
+ */
+ protected boolean checkMemorySizeAndMayFlushChunks() throws IOException {
+ if (recordCount >= recordCountForNextMemCheck) {
+ long memSize = calculateMemSizeForAllGroup();
+ assert memSize > 0;
+ if (memSize > chunkGroupSizeThreshold) {
+ LOG.debug("start to flush chunk groups, memory space occupy:{}",
memSize);
+ recordCountForNextMemCheck = recordCount * chunkGroupSizeThreshold /
memSize;
+ return flush();
+ } else {
+ recordCountForNextMemCheck = recordCount * chunkGroupSizeThreshold /
memSize;
+ return false;
+ }
+ }
+ return false;
+ }
Review Comment:
flush() seems to always return false, and thus, so does this method.
##########
java/tsfile/src/main/java/org/apache/tsfile/write/v4/AbstractTableModelTsFileWriter.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.write.v4;
+
+import org.apache.tsfile.common.TsFileApi;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.encrypt.EncryptParameter;
+import org.apache.tsfile.encrypt.IEncryptor;
+import org.apache.tsfile.exception.encrypt.EncryptException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.write.chunk.AlignedChunkGroupWriterImpl;
+import org.apache.tsfile.write.chunk.IChunkGroupWriter;
+import org.apache.tsfile.write.chunk.NonAlignedChunkGroupWriterImpl;
+import org.apache.tsfile.write.schema.Schema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+abstract class AbstractTableModelTsFileWriter implements ITsFileWriter {
+
+ protected static final TSFileConfig config =
TSFileDescriptor.getInstance().getConfig();
+ protected static final Logger LOG =
LoggerFactory.getLogger(AbstractTableModelTsFileWriter.class);
+
+ /** IO writer of this TsFile. */
+ protected final TsFileIOWriter fileWriter;
+
+ protected EncryptParameter encryptParam;
+
+ protected final int pageSize;
+ protected long recordCount = 0;
+
+ // deviceId -> measurementIdList
+ protected Map<IDeviceID, List<String>> flushedMeasurementsInDeviceMap = new
HashMap<>();
+
+ // DeviceId -> LastTime
+ protected Map<IDeviceID, Long> alignedDeviceLastTimeMap = new HashMap<>();
+
+ // TimeseriesId -> LastTime
+ protected Map<IDeviceID, Map<String, Long>> nonAlignedTimeseriesLastTimeMap
= new HashMap<>();
+
+ protected Map<IDeviceID, IChunkGroupWriter> groupWriters = new TreeMap<>();
+
+ /** min value of threshold of data points num check. */
+ protected long recordCountForNextMemCheck = 100;
+
+ protected long chunkGroupSizeThreshold;
+
+ /**
+ * init this Writer.
+ *
+ * @param file the File to be written by this TsFileWriter
+ */
+ @TsFileApi
+ protected AbstractTableModelTsFileWriter(File file, long
chunkGroupSizeThreshold)
+ throws IOException {
+ Schema schema = new Schema();
+ TSFileConfig conf = TSFileDescriptor.getInstance().getConfig();
+ this.fileWriter = new TsFileIOWriter(file);
+ fileWriter.setSchema(schema);
+
+ this.pageSize = conf.getPageSizeInByte();
+ this.chunkGroupSizeThreshold = chunkGroupSizeThreshold;
+ config.setTSFileStorageFs(conf.getTSFileStorageFs());
+ if (this.pageSize >= chunkGroupSizeThreshold) {
+ LOG.warn(
+ "TsFile's page size {} is greater than chunk group size {}, please
enlarge the chunk group"
+ + " size or decrease page size. ",
+ pageSize,
+ chunkGroupSizeThreshold);
+ }
+
+ String encryptLevel;
+ byte[] encryptKey;
+ byte[] dataEncryptKey;
+ String encryptType;
+ if (config.getEncryptFlag()) {
+ encryptLevel = "2";
+ encryptType = config.getEncryptType();
+ try {
+ MessageDigest md = MessageDigest.getInstance("SHA-256");
+ md.update("IoTDB is the best".getBytes());
+ md.update(config.getEncryptKey().getBytes());
+ dataEncryptKey = Arrays.copyOfRange(md.digest(), 0, 16);
+ encryptKey =
+ IEncryptor.getEncryptor(config.getEncryptType(),
config.getEncryptKey().getBytes())
+ .encrypt(dataEncryptKey);
+ } catch (Exception e) {
+ throw new EncryptException(
+ "SHA-256 function not found while using SHA-256 to generate data
key");
+ }
+ } else {
+ encryptLevel = "0";
+ encryptType = "org.apache.tsfile.encrypt.UNENCRYPTED";
+ encryptKey = null;
+ dataEncryptKey = null;
+ }
+ this.encryptParam = new EncryptParameter(encryptType, dataEncryptKey);
+ if (encryptKey != null) {
+ StringBuilder valueStr = new StringBuilder();
+
+ for (byte b : encryptKey) {
+ valueStr.append(b).append(",");
+ }
+
+ valueStr.deleteCharAt(valueStr.length() - 1);
+ String str = valueStr.toString();
+
+ fileWriter.setEncryptParam(encryptLevel, encryptType, str);
+ } else {
+ fileWriter.setEncryptParam(encryptLevel, encryptType, "");
+ }
+ }
+
+ protected IChunkGroupWriter tryToInitialGroupWriter(IDeviceID deviceId,
boolean isAligned) {
+ IChunkGroupWriter groupWriter = groupWriters.get(deviceId);
+ if (groupWriter == null) {
+ if (isAligned) {
+ groupWriter = new AlignedChunkGroupWriterImpl(deviceId, encryptParam);
+ ((AlignedChunkGroupWriterImpl) groupWriter)
+ .setLastTime(alignedDeviceLastTimeMap.get(deviceId));
+ } else {
+ groupWriter = new NonAlignedChunkGroupWriterImpl(deviceId,
encryptParam);
+ ((NonAlignedChunkGroupWriterImpl) groupWriter)
+ .setLastTimeMap(
+ nonAlignedTimeseriesLastTimeMap.getOrDefault(deviceId, new
HashMap<>()));
+ }
+ groupWriters.put(deviceId, groupWriter);
+ }
+ return groupWriter;
+ }
+
+ /**
+ * calculate total memory size occupied by allT ChunkGroupWriter instances
currently.
+ *
+ * @return total memory size used
+ */
+ protected long calculateMemSizeForAllGroup() {
+ long memTotalSize = 0;
+ for (IChunkGroupWriter group : groupWriters.values()) {
+ memTotalSize += group.updateMaxGroupMemSize();
+ }
+ return memTotalSize;
+ }
+
+ /**
+ * check occupied memory size, if it exceeds the chunkGroupSize threshold,
flush them to given
+ * OutputStream.
+ *
+ * @return true - size of tsfile or metadata reaches the threshold. false -
otherwise
+ * @throws IOException exception in IO
+ */
+ protected boolean checkMemorySizeAndMayFlushChunks() throws IOException {
+ if (recordCount >= recordCountForNextMemCheck) {
+ long memSize = calculateMemSizeForAllGroup();
+ assert memSize > 0;
Review Comment:
replace with log or exception
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]