This is an automated email from the ASF dual-hosted git repository. qiaojialin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 22a82b304875772f4b77796ba30f06af9fc8035f Author: LebronAl <txypot...@gmail.com> AuthorDate: Mon Jun 29 13:00:58 2020 +0800 Abstract insertPlan --- .../SystemDesign/StorageEngine/DataManipulation.md | 4 +- .../SystemDesign/StorageEngine/DataManipulation.md | 4 +- .../org/apache/iotdb/db/engine/StorageEngine.java | 16 +- .../iotdb/db/engine/memtable/AbstractMemTable.java | 18 +- .../apache/iotdb/db/engine/memtable/IMemTable.java | 4 +- .../engine/storagegroup/StorageGroupProcessor.java | 32 +- .../db/engine/storagegroup/TsFileProcessor.java | 20 +- .../org/apache/iotdb/db/metadata/MManager.java | 31 +- .../org/apache/iotdb/db/monitor/StatMonitor.java | 4 +- .../org/apache/iotdb/db/mqtt/PublishHandler.java | 9 +- .../apache/iotdb/db/qp/executor/IPlanExecutor.java | 9 +- .../apache/iotdb/db/qp/executor/PlanExecutor.java | 24 +- .../apache/iotdb/db/qp/physical/PhysicalPlan.java | 14 +- .../iotdb/db/qp/physical/crud/InsertPlan.java | 429 ++------------------- .../crud/{InsertPlan.java => InsertRowPlan.java} | 168 +++----- .../db/qp/physical/crud/InsertTabletPlan.java | 102 +---- .../iotdb/db/qp/strategy/PhysicalGenerator.java | 4 +- .../org/apache/iotdb/db/service/TSServiceImpl.java | 22 +- .../iotdb/db/writelog/recover/LogReplayer.java | 76 ++-- .../db/engine/cache/ChunkMetadataCacheTest.java | 6 +- .../engine/modification/DeletionFileNodeTest.java | 16 +- .../db/engine/modification/DeletionQueryTest.java | 21 +- .../storagegroup/FileNodeManagerBenchmark.java | 6 +- .../storagegroup/StorageGroupProcessorTest.java | 22 +- .../iotdb/db/engine/storagegroup/TTLTest.java | 48 +-- .../engine/storagegroup/TsFileProcessorTest.java | 10 +- .../apache/iotdb/db/mqtt/PublishHandlerTest.java | 4 +- .../java/org/apache/iotdb/db/qp/PlannerTest.java | 15 +- .../iotdb/db/query/reader/ReaderTestHelper.java | 4 - .../org/apache/iotdb/db/tools/WalCheckerTest.java | 6 +- .../apache/iotdb/db/writelog/PerformanceTest.java | 8 +- .../iotdb/db/writelog/WriteLogNodeManagerTest.java | 4 +- .../apache/iotdb/db/writelog/WriteLogNodeTest.java | 13 +- .../iotdb/db/writelog/io/LogWriterReaderTest.java | 10 +- .../iotdb/db/writelog/recover/LogReplayerTest.java | 8 +- .../recover/RecoverResourceFromReaderTest.java | 15 +- .../db/writelog/recover/SeqTsFileRecoverTest.java | 8 +- .../writelog/recover/UnseqTsFileRecoverTest.java | 14 +- 38 files changed, 349 insertions(+), 879 deletions(-) diff --git a/docs/SystemDesign/StorageEngine/DataManipulation.md b/docs/SystemDesign/StorageEngine/DataManipulation.md index 3931425..88ed345 100644 --- a/docs/SystemDesign/StorageEngine/DataManipulation.md +++ b/docs/SystemDesign/StorageEngine/DataManipulation.md @@ -30,7 +30,7 @@ The following describes four common data manipulation operations, which are inse * Corresponding interface * JDBC's execute and executeBatch interfaces * Session's insertRecord and insertRecords -* Main entrance: ```public void insert(InsertPlan insertPlan)``` StorageEngine.java +* Main entrance: ```public void insert(InsertRowPlan insertRowPlan)``` StorageEngine.java * Find the corresponding StorageGroupProcessor * Find the corresponding TsFileProcessor according to the time of writing the data and the last time stamp of the current device order * Pre-write log @@ -46,7 +46,7 @@ The following describes four common data manipulation operations, which are inse * Corresponding interface * Session‘s insertTablet -* Main entrance: ```public Integer[] insertTablet(InsertTabletPlan insertTabletPlan)``` StorageEngine.java +* Main entrance: ```public void insertTablet(InsertTabletPlan insertTabletPlan)``` StorageEngine.java * Find the corresponding StorageGroupProcessor * According to the time of this batch of data and the last timestamp of the current device order, this batch of data is divided into small batches, which correspond to a TsFileProcessor * Pre-write log diff --git a/docs/zh/SystemDesign/StorageEngine/DataManipulation.md b/docs/zh/SystemDesign/StorageEngine/DataManipulation.md index 2a87d06..188b8b3 100644 --- a/docs/zh/SystemDesign/StorageEngine/DataManipulation.md +++ b/docs/zh/SystemDesign/StorageEngine/DataManipulation.md @@ -31,7 +31,7 @@ * JDBC 的 execute 和 executeBatch 接口 * Session 的 insertRecord 和 insertRecords -* 总入口: public void insert(InsertPlan insertPlan) StorageEngine.java +* 总入口: public void insert(InsertRowPlan insertRowPlan) StorageEngine.java * 找到对应的 StorageGroupProcessor * 根据写入数据的时间以及当前设备落盘的最后时间戳,找到对应的 TsFileProcessor * 记录写前日志 @@ -47,7 +47,7 @@ * 对应的接口 * Session 的 insertTablet -* 总入口: public Integer[] insertTablet(InsertTabletPlan insertTabletPlan) StorageEngine.java +* 总入口: public void insertTablet(InsertTabletPlan insertTabletPlan) StorageEngine.java * 找到对应的 StorageGroupProcessor * 根据这批数据的时间以及当前设备落盘的最后时间戳,将这批数据分成小批,分别对应到一个 TsFileProcessor 中 * 记录写前日志 diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index 95a854f..3f13ca9 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@ -51,17 +51,17 @@ import org.apache.iotdb.db.exception.BatchInsertionException; import org.apache.iotdb.db.exception.LoadFileException; import org.apache.iotdb.db.exception.ShutdownException; import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.StorageGroupProcessorException; import org.apache.iotdb.db.exception.TsFileProcessorException; +import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException; -import org.apache.iotdb.db.exception.StorageGroupProcessorException; -import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryFileManager; import org.apache.iotdb.db.service.IService; @@ -292,17 +292,17 @@ public class StorageEngine implements IService { /** - * insert an InsertPlan to a storage group. + * insert an InsertRowPlan to a storage group. * - * @param insertPlan physical plan of insertion + * @param insertRowPlan physical plan of insertion */ - public void insert(InsertPlan insertPlan) throws StorageEngineException { + public void insert(InsertRowPlan insertRowPlan) throws StorageEngineException { - StorageGroupProcessor storageGroupProcessor = getProcessor(insertPlan.getDeviceId()); + StorageGroupProcessor storageGroupProcessor = getProcessor(insertRowPlan.getDeviceId()); // TODO monitor: update statistics try { - storageGroupProcessor.insert(insertPlan); + storageGroupProcessor.insert(insertRowPlan); } catch (WriteProcessException e) { throw new StorageEngineException(e); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java index 48c4a38..a150c86 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java @@ -30,8 +30,8 @@ import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.rescon.TVListAllocator; import org.apache.iotdb.db.utils.MemUtils; import org.apache.iotdb.db.utils.datastructure.TVList; @@ -97,21 +97,21 @@ public abstract class AbstractMemTable implements IMemTable { protected abstract IWritableMemChunk genMemSeries(MeasurementSchema schema); @Override - public void insert(InsertPlan insertPlan) { - for (int i = 0; i < insertPlan.getValues().length; i++) { + public void insert(InsertRowPlan insertRowPlan) { + for (int i = 0; i < insertRowPlan.getValues().length; i++) { - if (insertPlan.getValues()[i] == null) { + if (insertRowPlan.getValues()[i] == null) { continue; } - Object value = insertPlan.getValues()[i]; - memSize += MemUtils.getRecordSize(insertPlan.getSchemas()[i].getType(), value); + Object value = insertRowPlan.getValues()[i]; + memSize += MemUtils.getRecordSize(insertRowPlan.getSchemas()[i].getType(), value); - write(insertPlan.getDeviceId(), insertPlan.getMeasurements()[i], - insertPlan.getSchemas()[i], insertPlan.getTime(), value); + write(insertRowPlan.getDeviceId(), insertRowPlan.getMeasurements()[i], + insertRowPlan.getSchemas()[i], insertRowPlan.getTime(), value); } - totalPointsNum += insertPlan.getMeasurements().length - insertPlan.getFailedMeasurementNumber(); + totalPointsNum += insertRowPlan.getMeasurements().length - insertRowPlan.getFailedMeasurementNumber(); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java index 203b1b0..5843179 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java @@ -24,8 +24,8 @@ import org.apache.iotdb.db.engine.modification.Deletion; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; @@ -66,7 +66,7 @@ public interface IMemTable { long getTotalPointsNum(); - void insert(InsertPlan insertPlan) throws WriteProcessException; + void insert(InsertRowPlan insertRowPlan) throws WriteProcessException; /** * [start, end) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 85c14d0..a1e9e97 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -77,7 +77,7 @@ import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.metadata.mnode.MNode; import org.apache.iotdb.db.metadata.mnode.MeasurementMNode; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryFileManager; @@ -600,23 +600,23 @@ public class StorageGroupProcessor { } - public void insert(InsertPlan insertPlan) throws WriteProcessException { + public void insert(InsertRowPlan insertRowPlan) throws WriteProcessException { // reject insertions that are out of ttl - if (!isAlive(insertPlan.getTime())) { - throw new OutOfTTLException(insertPlan.getTime(), (System.currentTimeMillis() - dataTTL)); + if (!isAlive(insertRowPlan.getTime())) { + throw new OutOfTTLException(insertRowPlan.getTime(), (System.currentTimeMillis() - dataTTL)); } writeLock(); try { // init map - long timePartitionId = StorageEngine.getTimePartition(insertPlan.getTime()); + long timePartitionId = StorageEngine.getTimePartition(insertRowPlan.getTime()); latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>()); partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>()); // insert to sequence or unSequence file - insertToTsFileProcessor(insertPlan, - insertPlan.getTime() > partitionLatestFlushedTimeForEachDevice.get(timePartitionId) - .getOrDefault(insertPlan.getDeviceId(), Long.MIN_VALUE)); + insertToTsFileProcessor(insertRowPlan, + insertRowPlan.getTime() > partitionLatestFlushedTimeForEachDevice.get(timePartitionId) + .getOrDefault(insertRowPlan.getDeviceId(), Long.MIN_VALUE)); } finally { writeUnlock(); @@ -797,9 +797,9 @@ public class StorageGroupProcessor { } } - private void insertToTsFileProcessor(InsertPlan insertPlan, boolean sequence) + private void insertToTsFileProcessor(InsertRowPlan insertRowPlan, boolean sequence) throws WriteProcessException { - long timePartitionId = StorageEngine.getTimePartition(insertPlan.getTime()); + long timePartitionId = StorageEngine.getTimePartition(insertRowPlan.getTime()); TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence); @@ -808,19 +808,19 @@ public class StorageGroupProcessor { } // insert TsFileProcessor - tsFileProcessor.insert(insertPlan); + tsFileProcessor.insert(insertRowPlan); // try to update the latest time of the device of this tsRecord if (latestTimeForEachDevice.get(timePartitionId) - .getOrDefault(insertPlan.getDeviceId(), Long.MIN_VALUE) < insertPlan.getTime()) { + .getOrDefault(insertRowPlan.getDeviceId(), Long.MIN_VALUE) < insertRowPlan.getTime()) { latestTimeForEachDevice.get(timePartitionId) - .put(insertPlan.getDeviceId(), insertPlan.getTime()); + .put(insertRowPlan.getDeviceId(), insertRowPlan.getTime()); } long globalLatestFlushTime = globalLatestFlushedTimeForEachDevice.getOrDefault( - insertPlan.getDeviceId(), Long.MIN_VALUE); + insertRowPlan.getDeviceId(), Long.MIN_VALUE); - tryToUpdateInsertLastCache(insertPlan, globalLatestFlushTime); + tryToUpdateInsertLastCache(insertRowPlan, globalLatestFlushTime); // check memtable size and may asyncTryToFlush the work memtable if (tsFileProcessor.shouldFlush()) { @@ -828,7 +828,7 @@ public class StorageGroupProcessor { } } - private void tryToUpdateInsertLastCache(InsertPlan plan, Long latestFlushedTime) + private void tryToUpdateInsertLastCache(InsertRowPlan plan, Long latestFlushedTime) throws WriteProcessException { MNode node = null; try { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index 1e82581..cd200cf 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -48,8 +48,8 @@ import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.UpdateEndTi import org.apache.iotdb.db.engine.version.VersionController; import org.apache.iotdb.db.exception.TsFileProcessorException; import org.apache.iotdb.db.exception.WriteProcessException; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.rescon.MemTablePool; import org.apache.iotdb.db.utils.QueryUtils; @@ -150,22 +150,22 @@ public class TsFileProcessor { } /** - * insert data in an InsertPlan into the workingMemtable. + * insert data in an InsertRowPlan into the workingMemtable. * - * @param insertPlan physical plan of insertion + * @param insertRowPlan physical plan of insertion */ - public void insert(InsertPlan insertPlan) throws WriteProcessException { + public void insert(InsertRowPlan insertRowPlan) throws WriteProcessException { if (workMemTable == null) { workMemTable = MemTablePool.getInstance().getAvailableMemTable(this); } - // insert insertPlan to the work memtable - workMemTable.insert(insertPlan); + // insert insertRowPlan to the work memtable + workMemTable.insert(insertRowPlan); if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) { try { - getLogNode().write(insertPlan); + getLogNode().write(insertRowPlan); } catch (Exception e) { throw new WriteProcessException(String.format("%s: %s write WAL failed", storageGroupName, tsFileResource.getFile().getAbsolutePath()), e); @@ -173,11 +173,11 @@ public class TsFileProcessor { } // update start time of this memtable - tsFileResource.updateStartTime(insertPlan.getDeviceId(), insertPlan.getTime()); + tsFileResource.updateStartTime(insertRowPlan.getDeviceId(), insertRowPlan.getTime()); //for sequence tsfile, we update the endTime only when the file is prepared to be closed. //for unsequence tsfile, we have to update the endTime for each insertion. if (!sequence) { - tsFileResource.updateEndTime(insertPlan.getDeviceId(), insertPlan.getTime()); + tsFileResource.updateEndTime(insertRowPlan.getDeviceId(), insertRowPlan.getTime()); } } @@ -197,7 +197,7 @@ public class TsFileProcessor { workMemTable = MemTablePool.getInstance().getAvailableMemTable(this); } - // insert insertPlan to the work memtable + // insert insertRowPlan to the work memtable try { workMemTable.insertTablet(insertTabletPlan, start, end); if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) { diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java index e39d0ac..48acc44 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java @@ -34,6 +34,7 @@ import org.apache.iotdb.db.monitor.MonitorConstants; import org.apache.iotdb.db.qp.constant.SQLConstant; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan; @@ -1829,7 +1830,7 @@ public class MManager { /** * get schema for device. - * Attention!!! Only support insertPlan and insertTabletsPlan + * Attention!!! Only support insertPlan * @param deviceId * @param measurementList * @param plan @@ -1839,7 +1840,7 @@ public class MManager { public MeasurementSchema[] getSeriesSchemas(String deviceId, String[] measurementList, PhysicalPlan plan) throws MetadataException { MeasurementSchema[] schemas = new MeasurementSchema[measurementList.length]; - MNode deviceNode = null; + MNode deviceNode; // 1. get device node deviceNode = getDeviceNode(deviceId); @@ -1870,9 +1871,9 @@ public class MManager { // check type is match TSDataType insertDataType = null; - if (plan instanceof InsertPlan) { - if (!((InsertPlan)plan).isNeedInferType()) { - // only when InsertPlan's values is object[], we should check type + if (plan instanceof InsertRowPlan) { + if (!((InsertRowPlan)plan).isNeedInferType()) { + // only when InsertRowPlan's values is object[], we should check type insertDataType = getTypeInLoc(plan, i); } else { insertDataType = measurementNode.getSchema().getType(); @@ -1890,9 +1891,7 @@ public class MManager { measurementList[i], insertDataType, measurementNode.getSchema().getType())); } else { // mark failed measurement - if (plan instanceof InsertTabletPlan) { - ((InsertTabletPlan) plan).markMeasurementInsertionFailed(i); - } else if (plan instanceof InsertPlan) { + if( plan instanceof InsertPlan){ ((InsertPlan) plan).markMeasurementInsertionFailed(i); } continue; @@ -1900,8 +1899,8 @@ public class MManager { } // maybe need to convert value type to the true type - if ((plan instanceof InsertPlan) && ((InsertPlan) plan).isNeedInferType()) { - changeStringValueToRealType((InsertPlan) plan, i, measurementNode.getSchema().getType()); + if ((plan instanceof InsertRowPlan) && ((InsertRowPlan) plan).isNeedInferType()) { + changeStringValueToRealType((InsertRowPlan) plan, i, measurementNode.getSchema().getType()); } schemas[i] = measurementNode.getSchema(); @@ -1915,8 +1914,6 @@ public class MManager { // mark failed measurement if (plan instanceof InsertPlan) { ((InsertPlan) plan).markMeasurementInsertionFailed(i); - } else if (plan instanceof InsertTabletPlan) { - ((InsertTabletPlan) plan).markMeasurementInsertionFailed(i); } } else { throw e; @@ -1926,8 +1923,8 @@ public class MManager { return schemas; } - private void changeStringValueToRealType(InsertPlan plan, int loc, TSDataType type) throws MetadataException { - plan.getTypes()[loc] = type; + private void changeStringValueToRealType(InsertRowPlan plan, int loc, TSDataType type) throws MetadataException { + plan.getDataTypes()[loc] = type; try { switch (type) { case INT32: @@ -1990,7 +1987,7 @@ public class MManager { /** * get dataType of plan, in loc measurements - * only support InsertPlan and InsertTabletPlan + * only support InsertRowPlan and InsertTabletPlan * @param plan * @param loc * @return @@ -1998,8 +1995,8 @@ public class MManager { */ private TSDataType getTypeInLoc(PhysicalPlan plan, int loc) throws MetadataException { TSDataType dataType; - if (plan instanceof InsertPlan) { - InsertPlan tPlan = (InsertPlan) plan; + if (plan instanceof InsertRowPlan) { + InsertRowPlan tPlan = (InsertRowPlan) plan; dataType = TypeInferenceUtils.getPredictedDataType(tPlan.getValues()[loc], tPlan.isNeedInferType()); } else if (plan instanceof InsertTabletPlan) { dataType = ((InsertTabletPlan) plan).getDataTypes()[loc]; diff --git a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java index 3d59a89..fabc0e8 100644 --- a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java +++ b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java @@ -39,7 +39,7 @@ import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.monitor.MonitorConstants.FileNodeManagerStatConstants; import org.apache.iotdb.db.monitor.MonitorConstants.FileNodeProcessorStatConstants; import org.apache.iotdb.db.monitor.collector.FileSize; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.service.IService; import org.apache.iotdb.db.service.ServiceType; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; @@ -380,7 +380,7 @@ public class StatMonitor implements IService { int pointNum; for (Map.Entry<String, TSRecord> entry : tsRecordHashMap.entrySet()) { try { - fManager.insert(new InsertPlan(entry.getValue())); + fManager.insert(new InsertRowPlan(entry.getValue())); numInsert.incrementAndGet(); pointNum = entry.getValue().dataPointList.size(); numPointsInsert.addAndGet(pointNum); diff --git a/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java b/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java index bd5e036..87b70e5 100644 --- a/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java +++ b/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java @@ -21,6 +21,7 @@ import io.moquette.interception.AbstractInterceptHandler; import io.moquette.interception.messages.InterceptPublishMessage; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.MqttQoS; +import java.util.List; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StorageEngineException; @@ -29,13 +30,11 @@ import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.qp.executor.IPlanExecutor; import org.apache.iotdb.db.qp.executor.PlanExecutor; import org.apache.iotdb.db.qp.physical.PhysicalPlan; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; - /** * PublishHandler handle the messages from MQTT clients. */ @@ -86,12 +85,12 @@ public class PublishHandler extends AbstractInterceptHandler { continue; } - InsertPlan plan = new InsertPlan(); + InsertRowPlan plan = new InsertRowPlan(); plan.setDeviceId(event.getDevice()); plan.setTime(event.getTimestamp()); plan.setMeasurements(event.getMeasurements().toArray(new String[event.getMeasurements().size()])); plan.setValues(event.getValues().toArray(new Object[event.getValues().size()])); - plan.setTypes(new TSDataType[event.getValues().size()]); + plan.setDataTypes(new TSDataType[event.getValues().size()]); plan.setNeedInferType(true); boolean status = false; diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java index 6fb5d98..7bd6319 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java @@ -26,11 +26,10 @@ import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.qp.physical.PhysicalPlan; -import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.query.context.QueryContext; -import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; @@ -86,9 +85,9 @@ public interface IPlanExecutor { /** * execute insert command and return whether the operator is successful. * - * @param insertPlan physical insert plan + * @param insertRowPlan physical insert plan */ - void insert(InsertPlan insertPlan) throws QueryProcessException; + void insert(InsertRowPlan insertRowPlan) throws QueryProcessException; /** * execute batch insert plan diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java index 761d27b..e15d229 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java @@ -139,7 +139,7 @@ public class PlanExecutor implements IPlanExecutor { } return true; case INSERT: - insert((InsertPlan) plan); + insert((InsertRowPlan) plan); return true; case BATCHINSERT: insertTablet((InsertTabletPlan) plan); @@ -869,27 +869,21 @@ public class PlanExecutor implements IPlanExecutor { return mManager.getSeriesSchemas(insertPlan.getDeviceId(), insertPlan.getMeasurements(), insertPlan); } - protected MeasurementSchema[] getSeriesSchemas(InsertTabletPlan insertTabletPlan) - throws MetadataException { - return mManager.getSeriesSchemas(insertTabletPlan.getDeviceId(), - insertTabletPlan.getMeasurements(), insertTabletPlan); - } - @Override - public void insert(InsertPlan insertPlan) throws QueryProcessException { + public void insert(InsertRowPlan insertRowPlan) throws QueryProcessException { try { - mManager.lockInsert(insertPlan.getDeviceId()); - MeasurementSchema[] schemas = getSeriesSchemas(insertPlan); - insertPlan.setSchemasAndTransferType(schemas); - StorageEngine.getInstance().insert(insertPlan); - if (insertPlan.getFailedMeasurements() != null) { + mManager.lockInsert(insertRowPlan.getDeviceId()); + MeasurementSchema[] schemas = getSeriesSchemas(insertRowPlan); + insertRowPlan.setSchemasAndTransferType(schemas); + StorageEngine.getInstance().insert(insertRowPlan); + if (insertRowPlan.getFailedMeasurements() != null) { throw new StorageEngineException( - "failed to insert measurements " + insertPlan.getFailedMeasurements()); + "failed to insert measurements " + insertRowPlan.getFailedMeasurements()); } } catch (StorageEngineException | MetadataException e) { throw new QueryProcessException(e); } finally { - mManager.unlockInsert(insertPlan.getDeviceId()); + mManager.unlockInsert(insertRowPlan.getDeviceId()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java index 5d30f66..9746b7b 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java @@ -25,9 +25,9 @@ import java.util.Collections; import java.util.List; import org.apache.iotdb.db.qp.logical.Operator; import org.apache.iotdb.db.qp.logical.Operator.OperatorType; -import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan; @@ -166,17 +166,17 @@ public abstract class PhysicalPlan { // TODO-Cluster: support more plans switch (type) { case INSERT: - plan = new InsertPlan(); - plan.deserialize(buffer); - break; - case DELETE: - plan = new DeletePlan(); + plan = new InsertRowPlan(); plan.deserialize(buffer); break; case BATCHINSERT: plan = new InsertTabletPlan(); plan.deserialize(buffer); break; + case DELETE: + plan = new DeletePlan(); + plan.deserialize(buffer); + break; case SET_STORAGE_GROUP: plan = new SetStorageGroupPlan(); plan.deserialize(buffer); diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java index 0190699..503175c 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java @@ -16,219 +16,33 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.qp.physical.crud; -import org.apache.iotdb.db.conf.IoTDBConstant; -import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.metadata.PathNotExistException; -import org.apache.iotdb.db.exception.query.QueryProcessException; +import java.util.ArrayList; +import java.util.List; import org.apache.iotdb.db.qp.logical.Operator; -import org.apache.iotdb.db.qp.logical.Operator.OperatorType; import org.apache.iotdb.db.qp.physical.PhysicalPlan; -import org.apache.iotdb.db.utils.CommonUtils; -import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; -import org.apache.iotdb.tsfile.read.TimeValuePair; -import org.apache.iotdb.tsfile.read.common.Path; -import org.apache.iotdb.tsfile.utils.Binary; -import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import org.apache.iotdb.tsfile.utils.TsPrimitiveType; -import org.apache.iotdb.tsfile.write.record.TSRecord; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; - -public class InsertPlan extends PhysicalPlan { - - private static final Logger logger = LoggerFactory.getLogger(InsertPlan.class); - private long time; - private String deviceId; - private String[] measurements; - private Object[] values; - private TSDataType[] types; - private MeasurementSchema[] schemas; +abstract public class InsertPlan extends PhysicalPlan { - // if isNeedInferType is true, the values must be String[], so we could infer types from them - // if values is object[], we could use the raw type of them, and we should set this to false - private boolean isNeedInferType = false; + protected String deviceId; + protected String[] measurements; + protected TSDataType[] dataTypes; + protected MeasurementSchema[] schemas; // record the failed measurements - private List<String> failedMeasurements; - - public InsertPlan() { - super(false, OperatorType.INSERT); - canBeSplit = false; - } - - @TestOnly - public InsertPlan(String deviceId, long insertTime, String[] measurements, TSDataType[] types, - String[] insertValues) { - super(false, OperatorType.INSERT); - this.time = insertTime; - this.deviceId = deviceId; - this.measurements = measurements; - - this.types = types; - this.values = new Object[measurements.length]; - for (int i = 0; i < measurements.length; i++) { - try { - values[i] = CommonUtils.parseValueForTest(types[i], insertValues[i]); - } catch (QueryProcessException e) { - e.printStackTrace(); - } - } - canBeSplit = false; - } - - @TestOnly - public InsertPlan(String deviceId, long insertTime, String measurement, TSDataType type, String insertValue) { - super(false, OperatorType.INSERT); - this.time = insertTime; - this.deviceId = deviceId; - this.measurements = new String[]{measurement}; - this.types = new TSDataType[]{type}; - this.values = new Object[1]; - try { - values[0] = CommonUtils.parseValueForTest(types[0], insertValue); - } catch (QueryProcessException e) { - e.printStackTrace(); - } - canBeSplit = false; - } - - public InsertPlan(TSRecord tsRecord) { - super(false, OperatorType.INSERT); - this.deviceId = tsRecord.deviceId; - this.time = tsRecord.time; - this.measurements = new String[tsRecord.dataPointList.size()]; - this.schemas = new MeasurementSchema[tsRecord.dataPointList.size()]; - this.types = new TSDataType[tsRecord.dataPointList.size()]; - this.values = new Object[tsRecord.dataPointList.size()]; - for (int i = 0; i < tsRecord.dataPointList.size(); i++) { - measurements[i] = tsRecord.dataPointList.get(i).getMeasurementId(); - schemas[i] = new MeasurementSchema(measurements[i], tsRecord.dataPointList.get(i).getType(), - TSEncoding.PLAIN); - types[i] = tsRecord.dataPointList.get(i).getType(); - values[i] = tsRecord.dataPointList.get(i).getValue(); - } - canBeSplit = false; - } - - public InsertPlan(String deviceId, long insertTime, String[] measurementList, TSDataType[] types, - Object[] insertValues) { - super(false, Operator.OperatorType.INSERT); - this.time = insertTime; - this.deviceId = deviceId; - this.measurements = measurementList; - this.types = types; - this.values = insertValues; - canBeSplit = false; - } - - public InsertPlan(String deviceId, long insertTime, String[] measurementList, - String[] insertValues) { - super(false, Operator.OperatorType.INSERT); - this.time = insertTime; - this.deviceId = deviceId; - this.measurements = measurementList; - // build types and values - this.types = new TSDataType[measurements.length]; - this.values = new Object[measurements.length]; - System.arraycopy(insertValues, 0, values, 0, measurements.length); - isNeedInferType = true; - canBeSplit = false; - } + protected List<String> failedMeasurements; - - public long getTime() { - return time; - } - - public void setTime(long time) { - this.time = time; - } - - public boolean isNeedInferType() { - return isNeedInferType; - } - - public void setNeedInferType(boolean inferType) { - this.isNeedInferType = inferType; - } - - public MeasurementSchema[] getSchemas() { - return schemas; - } - - /** - * if inferType is true, - * transfer String[] values to specific data types (Integer, Long, Float, Double, Binary) - */ - public void setSchemasAndTransferType(MeasurementSchema[] schemas) throws QueryProcessException { - this.schemas = schemas; - if (isNeedInferType) { - for (int i = 0; i < schemas.length; i++) { - if (schemas[i] == null) { - if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) { - markMeasurementInsertionFailed(i); - } else { - throw new QueryProcessException(new PathNotExistException( - deviceId + IoTDBConstant.PATH_SEPARATOR + measurements[i])); - } - continue; - } - types[i] = schemas[i].getType(); - try { - values[i] = CommonUtils.parseValue(types[i], values[i].toString()); - } catch (Exception e) { - logger.warn("{}.{} data type is not consistent, input {}, registered {}", deviceId, - measurements[i], values[i], types[i]); - if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) { - markMeasurementInsertionFailed(i); - schemas[i] = null; - } else { - throw e; - } - } - } - } - } - - /** - * @param index failed measurement index - */ - public void markMeasurementInsertionFailed(int index) { - if (failedMeasurements == null) { - failedMeasurements = new ArrayList<>(); - } - failedMeasurements.add(measurements[index]); - measurements[index] = null; - types[index] = null; - values[index] = null; - } - - @Override - public List<Path> getPaths() { - List<Path> ret = new ArrayList<>(); - - for (String m : measurements) { - ret.add(new Path(deviceId, m)); - } - return ret; + public InsertPlan(Operator.OperatorType operatorType) { + super(false, operatorType); + super.canBeSplit = false; } public String getDeviceId() { - return this.deviceId; + return deviceId; } public void setDeviceId(String deviceId) { @@ -243,122 +57,20 @@ public class InsertPlan extends PhysicalPlan { this.measurements = measurements; } - public Object[] getValues() { - return this.values; - } - - public void setValues(Object[] values) { - this.values = values; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - InsertPlan that = (InsertPlan) o; - return time == that.time && Objects.equals(deviceId, that.deviceId) - && Arrays.equals(measurements, that.measurements) - && Arrays.equals(values, that.values); + public TSDataType[] getDataTypes() { + return dataTypes; } - @Override - public int hashCode() { - return Objects.hash(deviceId, time); - } - - @Override - public void serialize(DataOutputStream stream) throws IOException { - int type = PhysicalPlanType.INSERT.ordinal(); - stream.writeByte((byte) type); - stream.writeLong(time); - - putString(stream, deviceId); - - stream.writeInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size())); - - for (String m : measurements) { - if (m != null) { - putString(stream, m); - } - } - - for (MeasurementSchema schema: schemas) { - if (schema != null) { - schema.serializeTo(stream); - } - } - - try { - putValues(stream); - } catch (QueryProcessException e) { - throw new IOException(e); - } + public void setDataTypes(TSDataType[] dataTypes) { + this.dataTypes = dataTypes; } - private void putValues(DataOutputStream outputStream) throws QueryProcessException, IOException { - for (int i = 0; i < values.length; i++) { - if (types[i] == null) { - continue; - } - ReadWriteIOUtils.write(types[i], outputStream); - switch (types[i]) { - case BOOLEAN: - ReadWriteIOUtils.write((Boolean) values[i], outputStream); - break; - case INT32: - ReadWriteIOUtils.write((Integer) values[i], outputStream); - break; - case INT64: - ReadWriteIOUtils.write((Long) values[i], outputStream); - break; - case FLOAT: - ReadWriteIOUtils.write((Float) values[i], outputStream); - break; - case DOUBLE: - ReadWriteIOUtils.write((Double) values[i], outputStream); - break; - case TEXT: - ReadWriteIOUtils.write((Binary) values[i], outputStream); - break; - default: - throw new QueryProcessException("Unsupported data type:" + types[i]); - } - } + public MeasurementSchema[] getSchemas() { + return schemas; } - private void putValues(ByteBuffer buffer) throws QueryProcessException { - for (int i = 0; i < values.length; i++) { - if (types[i] == null) { - continue; - } - ReadWriteIOUtils.write(types[i], buffer); - switch (types[i]) { - case BOOLEAN: - ReadWriteIOUtils.write((Boolean) values[i], buffer); - break; - case INT32: - ReadWriteIOUtils.write((Integer) values[i], buffer); - break; - case INT64: - ReadWriteIOUtils.write((Long) values[i], buffer); - break; - case FLOAT: - ReadWriteIOUtils.write((Float) values[i], buffer); - break; - case DOUBLE: - ReadWriteIOUtils.write((Double) values[i], buffer); - break; - case TEXT: - ReadWriteIOUtils.write((Binary) values[i], buffer); - break; - default: - throw new QueryProcessException("Unsupported data type:" + types[i]); - } - } + public void setSchemas(MeasurementSchema[] schemas) { + this.schemas = schemas; } public List<String> getFailedMeasurements() { @@ -369,97 +81,16 @@ public class InsertPlan extends PhysicalPlan { return failedMeasurements == null ? 0 : failedMeasurements.size(); } - public TSDataType[] getTypes() { - return types; - } - - public void setTypes(TSDataType[] types) { - this.types = types; - } - - public void setValues(ByteBuffer buffer) throws QueryProcessException { - for (int i = 0; i < measurements.length; i++) { - types[i] = ReadWriteIOUtils.readDataType(buffer); - switch (types[i]) { - case BOOLEAN: - values[i] = ReadWriteIOUtils.readBool(buffer); - break; - case INT32: - values[i] = ReadWriteIOUtils.readInt(buffer); - break; - case INT64: - values[i] = ReadWriteIOUtils.readLong(buffer); - break; - case FLOAT: - values[i] = ReadWriteIOUtils.readFloat(buffer); - break; - case DOUBLE: - values[i] = ReadWriteIOUtils.readDouble(buffer); - break; - case TEXT: - values[i] = ReadWriteIOUtils.readBinary(buffer); - break; - default: - throw new QueryProcessException("Unsupported data type:" + types[i]); - } - } - } - - @Override - public void serialize(ByteBuffer buffer) { - int type = PhysicalPlanType.INSERT.ordinal(); - buffer.put((byte) type); - buffer.putLong(time); - - putString(buffer, deviceId); - - buffer.putInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size())); - - for (String measurement : measurements) { - if (measurement != null) { - putString(buffer, measurement); - } - } - - try { - putValues(buffer); - } catch (QueryProcessException e) { - e.printStackTrace(); - } - } - - @Override - public void deserialize(ByteBuffer buffer) { - this.time = buffer.getLong(); - this.deviceId = readString(buffer); - - int measurementSize = buffer.getInt(); - - this.measurements = new String[measurementSize]; - for (int i = 0; i < measurementSize; i++) { - measurements[i] = readString(buffer); - } - - this.types = new TSDataType[measurementSize]; - this.values = new Object[measurementSize]; - try { - setValues(buffer); - } catch (QueryProcessException e) { - e.printStackTrace(); + /** + * @param index failed measurement index + */ + public void markMeasurementInsertionFailed(int index) { + if (failedMeasurements == null) { + failedMeasurements = new ArrayList<>(); } + failedMeasurements.add(measurements[index]); + measurements[index] = null; + dataTypes[index] = null; } - @Override - public String toString() { - return "deviceId: " + deviceId + ", time: " + time; - } - - public TimeValuePair composeTimeValuePair(int measurementIndex) { - if (measurementIndex >= values.length) { - return null; - } - Object value = values[measurementIndex]; - return new TimeValuePair(time, - TsPrimitiveType.getByType(schemas[measurementIndex].getType(), value)); - } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java similarity index 74% copy from server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java copy to server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java index 0190699..3b63efc 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java @@ -18,13 +18,19 @@ */ package org.apache.iotdb.db.qp.physical.crud; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.metadata.PathNotExistException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.qp.logical.Operator; import org.apache.iotdb.db.qp.logical.Operator.OperatorType; -import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.utils.CommonUtils; import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -39,114 +45,95 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; +public class InsertRowPlan extends InsertPlan { -public class InsertPlan extends PhysicalPlan { - - private static final Logger logger = LoggerFactory.getLogger(InsertPlan.class); + private static final Logger logger = LoggerFactory.getLogger(InsertRowPlan.class); private long time; - private String deviceId; - private String[] measurements; private Object[] values; - private TSDataType[] types; - private MeasurementSchema[] schemas; // if isNeedInferType is true, the values must be String[], so we could infer types from them // if values is object[], we could use the raw type of them, and we should set this to false private boolean isNeedInferType = false; - // record the failed measurements - private List<String> failedMeasurements; - - public InsertPlan() { - super(false, OperatorType.INSERT); - canBeSplit = false; + public InsertRowPlan() { + super(OperatorType.INSERT); } @TestOnly - public InsertPlan(String deviceId, long insertTime, String[] measurements, TSDataType[] types, + public InsertRowPlan(String deviceId, long insertTime, String[] measurements, + TSDataType[] dataTypes, String[] insertValues) { - super(false, OperatorType.INSERT); + super(OperatorType.INSERT); this.time = insertTime; this.deviceId = deviceId; this.measurements = measurements; - - this.types = types; + this.dataTypes = dataTypes; this.values = new Object[measurements.length]; for (int i = 0; i < measurements.length; i++) { try { - values[i] = CommonUtils.parseValueForTest(types[i], insertValues[i]); + values[i] = CommonUtils.parseValueForTest(dataTypes[i], insertValues[i]); } catch (QueryProcessException e) { e.printStackTrace(); } } - canBeSplit = false; } @TestOnly - public InsertPlan(String deviceId, long insertTime, String measurement, TSDataType type, String insertValue) { - super(false, OperatorType.INSERT); + public InsertRowPlan(String deviceId, long insertTime, String measurement, TSDataType type, + String insertValue) { + super(OperatorType.INSERT); this.time = insertTime; this.deviceId = deviceId; this.measurements = new String[]{measurement}; - this.types = new TSDataType[]{type}; + this.dataTypes = new TSDataType[]{type}; this.values = new Object[1]; try { - values[0] = CommonUtils.parseValueForTest(types[0], insertValue); + values[0] = CommonUtils.parseValueForTest(dataTypes[0], insertValue); } catch (QueryProcessException e) { e.printStackTrace(); } - canBeSplit = false; } - public InsertPlan(TSRecord tsRecord) { - super(false, OperatorType.INSERT); + public InsertRowPlan(TSRecord tsRecord) { + super(OperatorType.INSERT); this.deviceId = tsRecord.deviceId; this.time = tsRecord.time; this.measurements = new String[tsRecord.dataPointList.size()]; this.schemas = new MeasurementSchema[tsRecord.dataPointList.size()]; - this.types = new TSDataType[tsRecord.dataPointList.size()]; + this.dataTypes = new TSDataType[tsRecord.dataPointList.size()]; this.values = new Object[tsRecord.dataPointList.size()]; for (int i = 0; i < tsRecord.dataPointList.size(); i++) { measurements[i] = tsRecord.dataPointList.get(i).getMeasurementId(); schemas[i] = new MeasurementSchema(measurements[i], tsRecord.dataPointList.get(i).getType(), TSEncoding.PLAIN); - types[i] = tsRecord.dataPointList.get(i).getType(); + dataTypes[i] = tsRecord.dataPointList.get(i).getType(); values[i] = tsRecord.dataPointList.get(i).getValue(); } - canBeSplit = false; } - public InsertPlan(String deviceId, long insertTime, String[] measurementList, TSDataType[] types, + public InsertRowPlan(String deviceId, long insertTime, String[] measurementList, + TSDataType[] dataTypes, Object[] insertValues) { - super(false, Operator.OperatorType.INSERT); + super(Operator.OperatorType.INSERT); this.time = insertTime; this.deviceId = deviceId; this.measurements = measurementList; - this.types = types; + this.dataTypes = dataTypes; this.values = insertValues; - canBeSplit = false; } - public InsertPlan(String deviceId, long insertTime, String[] measurementList, + public InsertRowPlan(String deviceId, long insertTime, String[] measurementList, String[] insertValues) { - super(false, Operator.OperatorType.INSERT); + super(Operator.OperatorType.INSERT); this.time = insertTime; this.deviceId = deviceId; this.measurements = measurementList; // build types and values - this.types = new TSDataType[measurements.length]; + this.dataTypes = new TSDataType[measurements.length]; this.values = new Object[measurements.length]; System.arraycopy(insertValues, 0, values, 0, measurements.length); isNeedInferType = true; - canBeSplit = false; } @@ -166,13 +153,9 @@ public class InsertPlan extends PhysicalPlan { this.isNeedInferType = inferType; } - public MeasurementSchema[] getSchemas() { - return schemas; - } - /** - * if inferType is true, - * transfer String[] values to specific data types (Integer, Long, Float, Double, Binary) + * if inferType is true, transfer String[] values to specific data types (Integer, Long, Float, + * Double, Binary) */ public void setSchemasAndTransferType(MeasurementSchema[] schemas) throws QueryProcessException { this.schemas = schemas; @@ -187,12 +170,12 @@ public class InsertPlan extends PhysicalPlan { } continue; } - types[i] = schemas[i].getType(); + dataTypes[i] = schemas[i].getType(); try { - values[i] = CommonUtils.parseValue(types[i], values[i].toString()); + values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString()); } catch (Exception e) { logger.warn("{}.{} data type is not consistent, input {}, registered {}", deviceId, - measurements[i], values[i], types[i]); + measurements[i], values[i], dataTypes[i]); if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) { markMeasurementInsertionFailed(i); schemas[i] = null; @@ -204,16 +187,9 @@ public class InsertPlan extends PhysicalPlan { } } - /** - * @param index failed measurement index - */ + @Override public void markMeasurementInsertionFailed(int index) { - if (failedMeasurements == null) { - failedMeasurements = new ArrayList<>(); - } - failedMeasurements.add(measurements[index]); - measurements[index] = null; - types[index] = null; + super.markMeasurementInsertionFailed(index); values[index] = null; } @@ -227,22 +203,6 @@ public class InsertPlan extends PhysicalPlan { return ret; } - public String getDeviceId() { - return this.deviceId; - } - - public void setDeviceId(String deviceId) { - this.deviceId = deviceId; - } - - public String[] getMeasurements() { - return this.measurements; - } - - public void setMeasurements(String[] measurements) { - this.measurements = measurements; - } - public Object[] getValues() { return this.values; } @@ -259,7 +219,7 @@ public class InsertPlan extends PhysicalPlan { if (o == null || getClass() != o.getClass()) { return false; } - InsertPlan that = (InsertPlan) o; + InsertRowPlan that = (InsertRowPlan) o; return time == that.time && Objects.equals(deviceId, that.deviceId) && Arrays.equals(measurements, that.measurements) && Arrays.equals(values, that.values); @@ -278,7 +238,8 @@ public class InsertPlan extends PhysicalPlan { putString(stream, deviceId); - stream.writeInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size())); + stream.writeInt( + measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size())); for (String m : measurements) { if (m != null) { @@ -286,7 +247,7 @@ public class InsertPlan extends PhysicalPlan { } } - for (MeasurementSchema schema: schemas) { + for (MeasurementSchema schema : schemas) { if (schema != null) { schema.serializeTo(stream); } @@ -301,11 +262,11 @@ public class InsertPlan extends PhysicalPlan { private void putValues(DataOutputStream outputStream) throws QueryProcessException, IOException { for (int i = 0; i < values.length; i++) { - if (types[i] == null) { + if (dataTypes[i] == null) { continue; } - ReadWriteIOUtils.write(types[i], outputStream); - switch (types[i]) { + ReadWriteIOUtils.write(dataTypes[i], outputStream); + switch (dataTypes[i]) { case BOOLEAN: ReadWriteIOUtils.write((Boolean) values[i], outputStream); break; @@ -325,18 +286,18 @@ public class InsertPlan extends PhysicalPlan { ReadWriteIOUtils.write((Binary) values[i], outputStream); break; default: - throw new QueryProcessException("Unsupported data type:" + types[i]); + throw new QueryProcessException("Unsupported data type:" + dataTypes[i]); } } } private void putValues(ByteBuffer buffer) throws QueryProcessException { for (int i = 0; i < values.length; i++) { - if (types[i] == null) { + if (dataTypes[i] == null) { continue; } - ReadWriteIOUtils.write(types[i], buffer); - switch (types[i]) { + ReadWriteIOUtils.write(dataTypes[i], buffer); + switch (dataTypes[i]) { case BOOLEAN: ReadWriteIOUtils.write((Boolean) values[i], buffer); break; @@ -356,31 +317,15 @@ public class InsertPlan extends PhysicalPlan { ReadWriteIOUtils.write((Binary) values[i], buffer); break; default: - throw new QueryProcessException("Unsupported data type:" + types[i]); + throw new QueryProcessException("Unsupported data type:" + dataTypes[i]); } } } - public List<String> getFailedMeasurements() { - return failedMeasurements; - } - - public int getFailedMeasurementNumber() { - return failedMeasurements == null ? 0 : failedMeasurements.size(); - } - - public TSDataType[] getTypes() { - return types; - } - - public void setTypes(TSDataType[] types) { - this.types = types; - } - public void setValues(ByteBuffer buffer) throws QueryProcessException { for (int i = 0; i < measurements.length; i++) { - types[i] = ReadWriteIOUtils.readDataType(buffer); - switch (types[i]) { + dataTypes[i] = ReadWriteIOUtils.readDataType(buffer); + switch (dataTypes[i]) { case BOOLEAN: values[i] = ReadWriteIOUtils.readBool(buffer); break; @@ -400,7 +345,7 @@ public class InsertPlan extends PhysicalPlan { values[i] = ReadWriteIOUtils.readBinary(buffer); break; default: - throw new QueryProcessException("Unsupported data type:" + types[i]); + throw new QueryProcessException("Unsupported data type:" + dataTypes[i]); } } } @@ -413,7 +358,8 @@ public class InsertPlan extends PhysicalPlan { putString(buffer, deviceId); - buffer.putInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size())); + buffer + .putInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size())); for (String measurement : measurements) { if (measurement != null) { @@ -440,7 +386,7 @@ public class InsertPlan extends PhysicalPlan { measurements[i] = readString(buffer); } - this.types = new TSDataType[measurementSize]; + this.dataTypes = new TSDataType[measurementSize]; this.values = new Object[measurementSize]; try { setValues(buffer); diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java index 7a8745c..687a3d9 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; import org.apache.iotdb.db.qp.logical.Operator.OperatorType; -import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.utils.QueryDataSetUtils; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -40,18 +39,11 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsDouble; import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsFloat; import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsInt; import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsLong; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; -public class InsertTabletPlan extends PhysicalPlan { +public class InsertTabletPlan extends InsertPlan { private static final String DATATYPE_UNSUPPORTED = "Data type %s is not supported."; - private String deviceId; - private String[] measurements; - private TSDataType[] dataTypes; - // only be set in insert - private MeasurementSchema[] schemas; - private long[] times; // times should be sorted. It is done in the session API. private ByteBuffer timeBuffer; @@ -66,26 +58,25 @@ public class InsertTabletPlan extends PhysicalPlan { private int start; private int end; - // record the failed measurements - private List<String> failedMeasurements; public InsertTabletPlan() { - super(false, OperatorType.BATCHINSERT); + super(OperatorType.BATCHINSERT); } public InsertTabletPlan(String deviceId, List<String> measurements) { - super(false, OperatorType.BATCHINSERT); + super(OperatorType.BATCHINSERT); this.deviceId = deviceId; - setMeasurements(measurements); + this.measurements = measurements.toArray(new String[0]); } + public InsertTabletPlan(String deviceId, String[] measurements) { - super(false, OperatorType.BATCHINSERT); + super(OperatorType.BATCHINSERT); this.deviceId = deviceId; - setMeasurements(measurements); + this.measurements = measurements; } public InsertTabletPlan(String deviceId, String[] measurements, List<Integer> dataTypes) { - super(false, OperatorType.BATCHINSERT); + super(OperatorType.BATCHINSERT); this.deviceId = deviceId; this.measurements = measurements; setDataTypes(dataTypes); @@ -135,7 +126,8 @@ public class InsertTabletPlan extends PhysicalPlan { putString(stream, deviceId); - stream.writeInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size())); + stream.writeInt( + measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size())); for (String m : measurements) { if (m == null) { continue; @@ -153,7 +145,7 @@ public class InsertTabletPlan extends PhysicalPlan { stream.writeInt(index.size()); if (timeBuffer == null) { - for(int loc : index){ + for (int loc : index) { stream.writeLong(times[loc]); } } else { @@ -184,37 +176,37 @@ public class InsertTabletPlan extends PhysicalPlan { switch (dataType) { case INT32: int[] intValues = (int[]) column; - for(int loc : index){ + for (int loc : index) { stream.writeInt(intValues[loc]); } break; case INT64: long[] longValues = (long[]) column; - for(int loc : index){ + for (int loc : index) { stream.writeLong(longValues[loc]); } break; case FLOAT: float[] floatValues = (float[]) column; - for(int loc : index){ + for (int loc : index) { stream.writeFloat(floatValues[loc]); } break; case DOUBLE: double[] doubleValues = (double[]) column; - for(int loc : index){ + for (int loc : index) { stream.writeDouble(doubleValues[loc]); } break; case BOOLEAN: boolean[] boolValues = (boolean[]) column; - for(int loc : index){ + for (int loc : index) { stream.write(BytesUtils.boolToByte(boolValues[loc])); } break; case TEXT: Binary[] binaryValues = (Binary[]) column; - for(int loc : index){ + for (int loc : index) { stream.writeInt(binaryValues[loc].getLength()); stream.write(binaryValues[loc].getValues()); } @@ -232,7 +224,8 @@ public class InsertTabletPlan extends PhysicalPlan { putString(buffer, deviceId); - buffer.putInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size())); + buffer + .putInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size())); for (String m : measurements) { if (m != null) { putString(buffer, m); @@ -352,40 +345,6 @@ public class InsertTabletPlan extends PhysicalPlan { columns = QueryDataSetUtils.readValuesFromBuffer(buffer, dataTypes, measurementSize, rows); } - - public String getDeviceId() { - return deviceId; - } - - public void setDeviceId(String deviceId) { - this.deviceId = deviceId; - } - - public String[] getMeasurements() { - return measurements; - } - - public void setMeasurements(List<String> measurements) { - this.measurements = new String[measurements.size()]; - measurements.toArray(this.measurements); - } - - public void setMeasurements(String[] measurements) { - this.measurements = measurements; - } - - public TSDataType[] getDataTypes() { - return dataTypes; - } - - public MeasurementSchema[] getSchemas() { - return schemas; - } - - public void setSchemas(MeasurementSchema[] schemas) { - this.schemas = schemas; - } - public void setDataTypes(List<Integer> dataTypes) { this.dataTypes = new TSDataType[dataTypes.size()]; for (int i = 0; i < dataTypes.size(); i++) { @@ -393,10 +352,6 @@ public class InsertTabletPlan extends PhysicalPlan { } } - public void setDataTypes(TSDataType[] dataTypes) { - this.dataTypes = dataTypes; - } - public Object[] getColumns() { return columns; } @@ -488,25 +443,10 @@ public class InsertTabletPlan extends PhysicalPlan { this.rowCount = size; } - /** - * @param index failed measurement index - */ + @Override public void markMeasurementInsertionFailed(int index) { - if (failedMeasurements == null) { - failedMeasurements = new ArrayList<>(); - } - failedMeasurements.add(measurements[index]); - measurements[index] = null; - dataTypes[index] = null; + super.markMeasurementInsertionFailed(index); columns[index] = null; } - public List<String> getFailedMeasurements() { - return failedMeasurements; - } - - public int getFailedMeasurementNumber() { - return failedMeasurements == null ? 0 : failedMeasurements.size(); - } - } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java index 6b5dc43..a1e52aa 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java @@ -71,7 +71,7 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan; import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan; import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan; import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; @@ -175,7 +175,7 @@ public class PhysicalGenerator { "For Insert command, cannot specified more than one seriesPath: " + paths); } - return new InsertPlan(paths.get(0).getFullPath(), insert.getTime(), + return new InsertRowPlan(paths.get(0).getFullPath(), insert.getTime(), insert.getMeasurementList(), insert.getValueList()); case MERGE: if (operator.getTokenIntType() == SQLConstant.TOK_FULL_MERGE) { diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 0adc2c6..cd60197 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -65,7 +65,7 @@ import org.apache.iotdb.db.qp.physical.crud.AggregationPlan; import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan; import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; @@ -101,18 +101,6 @@ import org.apache.thrift.server.ServerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.sql.SQLException; -import java.time.ZoneId; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; - -import static org.apache.iotdb.db.conf.IoTDBConfig.PATH_PATTERN; -import static org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType.TIMESERIES; - /** * Thrift RPC implementation at server side. @@ -1092,13 +1080,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } List<TSStatus> statusList = new ArrayList<>(); - InsertPlan plan = new InsertPlan(); + InsertRowPlan plan = new InsertRowPlan(); for (int i = 0; i < req.deviceIds.size(); i++) { try { plan.setDeviceId(req.getDeviceIds().get(i)); plan.setTime(req.getTimestamps().get(i)); plan.setMeasurements(req.getMeasurementsList().get(i).toArray(new String[0])); - plan.setTypes(new TSDataType[plan.getMeasurements().length]); + plan.setDataTypes(new TSDataType[plan.getMeasurements().length]); plan.setValues(new Object[plan.getMeasurements().length]); plan.setValues(req.valuesList.get(i)); plan.setNeedInferType(req.isInferType()); @@ -1152,11 +1140,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR); } - InsertPlan plan = new InsertPlan(); + InsertRowPlan plan = new InsertRowPlan(); plan.setDeviceId(req.getDeviceId()); plan.setTime(req.getTimestamp()); plan.setMeasurements(req.getMeasurements().toArray(new String[0])); - plan.setTypes(new TSDataType[plan.getMeasurements().length]); + plan.setDataTypes(new TSDataType[plan.getMeasurements().length]); plan.setValues(new Object[plan.getMeasurements().length]); plan.setValues(req.values); plan.setNeedInferType(req.isInferType()); diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java index d9dfcd3..99514a6 100644 --- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java +++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java @@ -28,15 +28,15 @@ import org.apache.iotdb.db.engine.modification.Deletion; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.engine.version.VersionController; +import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.MManager; -import org.apache.iotdb.db.exception.WriteProcessException; -import org.apache.iotdb.db.exception.StorageGroupProcessorException; import org.apache.iotdb.db.qp.physical.PhysicalPlan; -import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.qp.physical.crud.UpdatePlan; import org.apache.iotdb.db.writelog.io.ILogReader; import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager; @@ -52,6 +52,7 @@ import org.slf4j.LoggerFactory; * WALs from the logNode and redoes them into a given MemTable and ModificationFile. */ public class LogReplayer { + private Logger logger = LoggerFactory.getLogger(LogReplayer.class); private String logNodePrefix; private String insertFilePath; @@ -97,8 +98,6 @@ public class LogReplayer { replayDelete((DeletePlan) plan); } else if (plan instanceof UpdatePlan) { replayUpdate((UpdatePlan) plan); - } else if (plan instanceof InsertTabletPlan) { - replayBatchInsert((InsertTabletPlan) plan); } } catch (Exception e) { logger.error("recover wal of {} failed", insertFilePath, e); @@ -127,61 +126,46 @@ public class LogReplayer { } } - private void replayBatchInsert(InsertTabletPlan insertTabletPlan) - throws WriteProcessException, QueryProcessException { + private void replayInsert(InsertPlan plan) throws WriteProcessException, QueryProcessException { if (currentTsFileResource != null) { + long minTime, maxTime; + if (plan instanceof InsertRowPlan) { + minTime = ((InsertRowPlan) plan).getTime(); + maxTime = ((InsertRowPlan) plan).getTime(); + } else { + minTime = ((InsertTabletPlan) plan).getMinTime(); + maxTime = ((InsertTabletPlan) plan).getMaxTime(); + } // the last chunk group may contain the same data with the logs, ignore such logs in seq file - long lastEndTime = currentTsFileResource.getEndTime(insertTabletPlan.getDeviceId()); - if (lastEndTime != Long.MIN_VALUE && lastEndTime >= insertTabletPlan.getMinTime() && + long lastEndTime = currentTsFileResource.getEndTime(plan.getDeviceId()); + if (lastEndTime != Long.MIN_VALUE && lastEndTime >= minTime && !acceptDuplication) { return; } - Long startTime = tempStartTimeMap.get(insertTabletPlan.getDeviceId()); - if (startTime == null || startTime > insertTabletPlan.getMinTime()) { - tempStartTimeMap.put(insertTabletPlan.getDeviceId(), insertTabletPlan.getMinTime()); + Long startTime = tempStartTimeMap.get(plan.getDeviceId()); + if (startTime == null || startTime > minTime) { + tempStartTimeMap.put(plan.getDeviceId(), minTime); } - Long endTime = tempEndTimeMap.get(insertTabletPlan.getDeviceId()); - if (endTime == null || endTime < insertTabletPlan.getMaxTime()) { - tempEndTimeMap.put(insertTabletPlan.getDeviceId(), insertTabletPlan.getMaxTime()); + Long endTime = tempEndTimeMap.get(plan.getDeviceId()); + if (endTime == null || endTime < maxTime) { + tempEndTimeMap.put(plan.getDeviceId(), maxTime); } } MeasurementSchema[] schemas; try { - schemas = MManager.getInstance().getSchemas(insertTabletPlan.getDeviceId(), insertTabletPlan + schemas = MManager.getInstance().getSchemas(plan.getDeviceId(), plan .getMeasurements()); } catch (MetadataException e) { throw new QueryProcessException(e); } - insertTabletPlan.setSchemas(schemas); - recoverMemTable.insertTablet(insertTabletPlan, 0, insertTabletPlan.getRowCount()); - } - - private void replayInsert(InsertPlan insertPlan) { - if (currentTsFileResource != null) { - // the last chunk group may contain the same data with the logs, ignore such logs in seq file - long lastEndTime = currentTsFileResource.getEndTime(insertPlan.getDeviceId()); - if (lastEndTime != Long.MIN_VALUE && lastEndTime >= insertPlan.getTime() && - !acceptDuplication) { - return; - } - Long startTime = tempStartTimeMap.get(insertPlan.getDeviceId()); - if (startTime == null || startTime > insertPlan.getTime()) { - tempStartTimeMap.put(insertPlan.getDeviceId(), insertPlan.getTime()); - } - Long endTime = tempEndTimeMap.get(insertPlan.getDeviceId()); - if (endTime == null || endTime < insertPlan.getTime()) { - tempEndTimeMap.put(insertPlan.getDeviceId(), insertPlan.getTime()); - } - } - try { - MeasurementSchema[] schemas = - MManager.getInstance().getSchemas(insertPlan.getDeviceId(), insertPlan.getMeasurements()); - insertPlan.setSchemasAndTransferType(schemas); - recoverMemTable.insert(insertPlan); - } catch (Exception e) { - logger.error( - "occurs exception when replaying the record {} at timestamp {}: {}.(Will ignore the record)", - insertPlan.getPaths(), insertPlan.getTime(), e.getMessage()); + if (plan instanceof InsertRowPlan) { + InsertRowPlan tPlan = (InsertRowPlan) plan; + tPlan.setSchemasAndTransferType(schemas); + recoverMemTable.insert(tPlan); + } else { + InsertTabletPlan tPlan = (InsertTabletPlan) plan; + tPlan.setSchemas(schemas); + recoverMemTable.insertTablet(tPlan, 0, tPlan.getRowCount()); } } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/cache/ChunkMetadataCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/cache/ChunkMetadataCacheTest.java index 2926973..08c139e 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/cache/ChunkMetadataCacheTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/cache/ChunkMetadataCacheTest.java @@ -30,9 +30,9 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor; import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; -import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.exception.WriteProcessException; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.FileReaderManager; import org.apache.iotdb.db.utils.EnvironmentUtils; @@ -85,7 +85,7 @@ public class ChunkMetadataCacheTest { record.addTuple(DataPoint.getDataPoint(TSDataType.FLOAT, measurementId2, String.valueOf(num))); record.addTuple(DataPoint.getDataPoint(TSDataType.DOUBLE, measurementId3, String.valueOf(num))); record.addTuple(DataPoint.getDataPoint(TSDataType.BOOLEAN, measurementId4, "True")); - storageGroupProcessor.insert(new InsertPlan(record)); + storageGroupProcessor.insert(new InsertRowPlan(record)); } protected void insertData() throws IOException, WriteProcessException { diff --git a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java index 3a5aabe..1e0bcfe 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java @@ -27,12 +27,12 @@ import org.apache.iotdb.db.conf.directories.DirectoryManager; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.engine.modification.io.LocalTextModificationAccessor; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; -import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.MManager; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; @@ -93,7 +93,7 @@ public class DeletionFileNodeTest { for (int j = 0; j < 10; j++) { record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0)); } - StorageEngine.getInstance().insert(new InsertPlan(record)); + StorageEngine.getInstance().insert(new InsertRowPlan(record)); } StorageEngine.getInstance().delete(processorName, measurements[3], 50); @@ -127,7 +127,7 @@ public class DeletionFileNodeTest { for (int j = 0; j < 10; j++) { record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0)); } - StorageEngine.getInstance().insert(new InsertPlan(record)); + StorageEngine.getInstance().insert(new InsertRowPlan(record)); } StorageEngine.getInstance().syncCloseAllProcessor(); @@ -182,7 +182,7 @@ public class DeletionFileNodeTest { for (int j = 0; j < 10; j++) { record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0)); } - StorageEngine.getInstance().insert(new InsertPlan(record)); + StorageEngine.getInstance().insert(new InsertRowPlan(record)); } StorageEngine.getInstance().syncCloseAllProcessor(); @@ -192,7 +192,7 @@ public class DeletionFileNodeTest { for (int j = 0; j < 10; j++) { record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0)); } - StorageEngine.getInstance().insert(new InsertPlan(record)); + StorageEngine.getInstance().insert(new InsertRowPlan(record)); } StorageEngine.getInstance().delete(processorName, measurements[3], 50); @@ -230,7 +230,7 @@ public class DeletionFileNodeTest { for (int j = 0; j < 10; j++) { record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0)); } - StorageEngine.getInstance().insert(new InsertPlan(record)); + StorageEngine.getInstance().insert(new InsertRowPlan(record)); } StorageEngine.getInstance().syncCloseAllProcessor(); @@ -240,7 +240,7 @@ public class DeletionFileNodeTest { for (int j = 0; j < 10; j++) { record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0)); } - StorageEngine.getInstance().insert(new InsertPlan(record)); + StorageEngine.getInstance().insert(new InsertRowPlan(record)); } StorageEngine.getInstance().syncCloseAllProcessor(); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java index 5899108..92cd429 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java @@ -27,12 +27,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import org.apache.iotdb.db.engine.StorageEngine; -import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.MManager; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; import org.apache.iotdb.db.query.executor.QueryRouter; import org.apache.iotdb.db.utils.EnvironmentUtils; @@ -87,7 +86,7 @@ public class DeletionQueryTest { for (int j = 0; j < 10; j++) { record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0)); } - StorageEngine.getInstance().insert(new InsertPlan(record)); + StorageEngine.getInstance().insert(new InsertRowPlan(record)); } StorageEngine.getInstance().delete(processorName, measurements[3], 50); @@ -125,7 +124,7 @@ public class DeletionQueryTest { for (int j = 0; j < 10; j++) { record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0)); } - StorageEngine.getInstance().insert(new InsertPlan(record)); + StorageEngine.getInstance().insert(new InsertRowPlan(record)); } StorageEngine.getInstance().syncCloseAllProcessor(); @@ -165,7 +164,7 @@ public class DeletionQueryTest { for (int j = 0; j < 10; j++) { record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0)); } - StorageEngine.getInstance().insert(new InsertPlan(record)); + StorageEngine.getInstance().insert(new InsertRowPlan(record)); } StorageEngine.getInstance().syncCloseAllProcessor(); @@ -175,7 +174,7 @@ public class DeletionQueryTest { for (int j = 0; j < 10; j++) { record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0)); } - StorageEngine.getInstance().insert(new InsertPlan(record)); + StorageEngine.getInstance().insert(new InsertRowPlan(record)); } StorageEngine.getInstance().delete(processorName, measurements[3], 50); @@ -214,7 +213,7 @@ public class DeletionQueryTest { for (int j = 0; j < 10; j++) { record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0)); } - StorageEngine.getInstance().insert(new InsertPlan(record)); + StorageEngine.getInstance().insert(new InsertRowPlan(record)); } StorageEngine.getInstance().syncCloseAllProcessor(); @@ -224,7 +223,7 @@ public class DeletionQueryTest { for (int j = 0; j < 10; j++) { record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0)); } - StorageEngine.getInstance().insert(new InsertPlan(record)); + StorageEngine.getInstance().insert(new InsertRowPlan(record)); } StorageEngine.getInstance().syncCloseAllProcessor(); @@ -263,7 +262,7 @@ public class DeletionQueryTest { for (int j = 0; j < 10; j++) { record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0)); } - StorageEngine.getInstance().insert(new InsertPlan(record)); + StorageEngine.getInstance().insert(new InsertRowPlan(record)); } StorageEngine.getInstance().delete(processorName, measurements[3], 50); @@ -278,7 +277,7 @@ public class DeletionQueryTest { for (int j = 0; j < 10; j++) { record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0)); } - StorageEngine.getInstance().insert(new InsertPlan(record)); + StorageEngine.getInstance().insert(new InsertRowPlan(record)); } StorageEngine.getInstance().delete(processorName, measurements[3], 250); @@ -293,7 +292,7 @@ public class DeletionQueryTest { for (int j = 0; j < 10; j++) { record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0)); } - StorageEngine.getInstance().insert(new InsertPlan(record)); + StorageEngine.getInstance().insert(new InsertRowPlan(record)); } StorageEngine.getInstance().delete(processorName, measurements[3], 50); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java index b2d091e..ac630d4 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java @@ -23,10 +23,10 @@ import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; import org.apache.iotdb.db.engine.StorageEngine; -import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.metadata.MManager; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.db.utils.RandomNum; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; @@ -117,7 +117,7 @@ public class FileNodeManagerBenchmark { long time = RandomNum.getRandomLong(1, seed); String deltaObject = devices[(int) (time % numOfDevice)]; TSRecord tsRecord = getRecord(deltaObject, time); - StorageEngine.getInstance().insert(new InsertPlan(tsRecord)); + StorageEngine.getInstance().insert(new InsertRowPlan(tsRecord)); } } catch (StorageEngineException e) { e.printStackTrace(); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java index 6e11479..9082637 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java @@ -28,15 +28,15 @@ import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.exception.StorageGroupProcessorException; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.utils.EnvironmentUtils; -import org.apache.iotdb.tsfile.read.reader.IPointReader; -import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.read.TimeValuePair; +import org.apache.iotdb.tsfile.read.reader.IPointReader; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.record.TSRecord; import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; @@ -88,13 +88,13 @@ public class StorageGroupProcessorTest { public void testUnseqUnsealedDelete() throws WriteProcessException, IOException { TSRecord record = new TSRecord(10000, deviceId); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000))); - processor.insert(new InsertPlan(record)); + processor.insert(new InsertRowPlan(record)); processor.syncCloseAllWorkingTsFileProcessors(); for (int j = 1; j <= 10; j++) { record = new TSRecord(j, deviceId); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); - processor.insert(new InsertPlan(record)); + processor.insert(new InsertRowPlan(record)); } for (TsFileProcessor tsfileProcessor : processor.getWorkUnsequenceTsFileProcessor()) { @@ -104,7 +104,7 @@ public class StorageGroupProcessorTest { for (int j = 11; j <= 20; j++) { record = new TSRecord(j, deviceId); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); - processor.insert(new InsertPlan(record)); + processor.insert(new InsertRowPlan(record)); } processor.delete(deviceId, measurementId, 15L); @@ -136,7 +136,7 @@ public class StorageGroupProcessorTest { for (int j = 1; j <= 10; j++) { TSRecord record = new TSRecord(j, deviceId); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); - processor.insert(new InsertPlan(record)); + processor.insert(new InsertRowPlan(record)); processor.asyncCloseAllWorkingTsFileProcessors(); } @@ -220,7 +220,7 @@ public class StorageGroupProcessorTest { for (int j = 21; j <= 30; j++) { TSRecord record = new TSRecord(j, deviceId); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); - processor.insert(new InsertPlan(record)); + processor.insert(new InsertRowPlan(record)); processor.asyncCloseAllWorkingTsFileProcessors(); } processor.syncCloseAllWorkingTsFileProcessors(); @@ -228,7 +228,7 @@ public class StorageGroupProcessorTest { for (int j = 10; j >= 1; j--) { TSRecord record = new TSRecord(j, deviceId); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); - processor.insert(new InsertPlan(record)); + processor.insert(new InsertRowPlan(record)); processor.asyncCloseAllWorkingTsFileProcessors(); } @@ -253,7 +253,7 @@ public class StorageGroupProcessorTest { for (int j = 21; j <= 30; j++) { TSRecord record = new TSRecord(j, deviceId); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); - processor.insert(new InsertPlan(record)); + processor.insert(new InsertRowPlan(record)); processor.asyncCloseAllWorkingTsFileProcessors(); } processor.syncCloseAllWorkingTsFileProcessors(); @@ -261,7 +261,7 @@ public class StorageGroupProcessorTest { for (int j = 10; j >= 1; j--) { TSRecord record = new TSRecord(j, deviceId); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); - processor.insert(new InsertPlan(record)); + processor.insert(new InsertRowPlan(record)); processor.asyncCloseAllWorkingTsFileProcessors(); } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java index b98118d..5a436fe 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java @@ -47,7 +47,7 @@ import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode; import org.apache.iotdb.db.qp.Planner; import org.apache.iotdb.db.qp.executor.PlanExecutor; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan; import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan; import org.apache.iotdb.db.query.control.QueryResourceManager; @@ -129,55 +129,55 @@ public class TTLTest { @Test public void testTTLWrite() throws WriteProcessException, QueryProcessException { - InsertPlan insertPlan = new InsertPlan(); - insertPlan.setDeviceId(sg1); - insertPlan.setTime(System.currentTimeMillis()); - insertPlan.setMeasurements(new String[]{"s1"}); - insertPlan.setTypes(new TSDataType[]{TSDataType.INT64}); - insertPlan.setValues(new Object[]{1L}); - insertPlan.setSchemasAndTransferType( + InsertRowPlan plan = new InsertRowPlan(); + plan.setDeviceId(sg1); + plan.setTime(System.currentTimeMillis()); + plan.setMeasurements(new String[]{"s1"}); + plan.setDataTypes(new TSDataType[]{TSDataType.INT64}); + plan.setValues(new Object[]{1L}); + plan.setSchemasAndTransferType( new MeasurementSchema[]{new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN)}); // ok without ttl - storageGroupProcessor.insert(insertPlan); + storageGroupProcessor.insert(plan); storageGroupProcessor.setDataTTL(1000); // with ttl - insertPlan.setTime(System.currentTimeMillis() - 1001); + plan.setTime(System.currentTimeMillis() - 1001); boolean caught = false; try { - storageGroupProcessor.insert(insertPlan); + storageGroupProcessor.insert(plan); } catch (OutOfTTLException e) { caught = true; } assertTrue(caught); - insertPlan.setTime(System.currentTimeMillis() - 900); - storageGroupProcessor.insert(insertPlan); + plan.setTime(System.currentTimeMillis() - 900); + storageGroupProcessor.insert(plan); } private void prepareData() throws WriteProcessException, QueryProcessException { - InsertPlan insertPlan = new InsertPlan(); - insertPlan.setDeviceId(sg1); - insertPlan.setTime(System.currentTimeMillis()); - insertPlan.setMeasurements(new String[]{"s1"}); - insertPlan.setTypes(new TSDataType[]{TSDataType.INT64}); - insertPlan.setValues(new Object[]{1L}); - insertPlan.setSchemasAndTransferType( + InsertRowPlan plan = new InsertRowPlan(); + plan.setDeviceId(sg1); + plan.setTime(System.currentTimeMillis()); + plan.setMeasurements(new String[]{"s1"}); + plan.setDataTypes(new TSDataType[]{TSDataType.INT64}); + plan.setValues(new Object[]{1L}); + plan.setSchemasAndTransferType( new MeasurementSchema[]{new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN)}); long initTime = System.currentTimeMillis(); // sequence data for (int i = 1000; i < 2000; i++) { - insertPlan.setTime(initTime - 2000 + i); - storageGroupProcessor.insert(insertPlan); + plan.setTime(initTime - 2000 + i); + storageGroupProcessor.insert(plan); if ((i + 1) % 300 == 0) { storageGroupProcessor.syncCloseAllWorkingTsFileProcessors(); } } // unsequence data for (int i = 0; i < 1000; i++) { - insertPlan.setTime(initTime - 2000 + i); - storageGroupProcessor.insert(insertPlan); + plan.setTime(initTime - 2000 + i); + storageGroupProcessor.insert(plan); if ((i + 1) % 300 == 0) { storageGroupProcessor.syncCloseAllWorkingTsFileProcessors(); } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java index 9b356bd..7c473e1 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java @@ -35,7 +35,7 @@ import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.engine.version.SysTimeVersionController; import org.apache.iotdb.db.exception.TsFileProcessorException; import org.apache.iotdb.db.exception.WriteProcessException; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; @@ -97,7 +97,7 @@ public class TsFileProcessorTest { for (int i = 1; i <= 100; i++) { TSRecord record = new TSRecord(i, deviceId); record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); - processor.insert(new InsertPlan(record)); + processor.insert(new InsertRowPlan(record)); } // query data in memory @@ -145,7 +145,7 @@ public class TsFileProcessorTest { for (int i = 1; i <= 100; i++) { TSRecord record = new TSRecord(i, deviceId); record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); - processor.insert(new InsertPlan(record)); + processor.insert(new InsertRowPlan(record)); } // query data in memory @@ -220,7 +220,7 @@ public class TsFileProcessorTest { for (int i = 1; i <= 10; i++) { TSRecord record = new TSRecord(i, deviceId); record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); - processor.insert(new InsertPlan(record)); + processor.insert(new InsertRowPlan(record)); } processor.asyncFlush(); } @@ -254,7 +254,7 @@ public class TsFileProcessorTest { for (int i = 1; i <= 100; i++) { TSRecord record = new TSRecord(i, deviceId); record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); - processor.insert(new InsertPlan(record)); + processor.insert(new InsertRowPlan(record)); } // query data in memory diff --git a/server/src/test/java/org/apache/iotdb/db/mqtt/PublishHandlerTest.java b/server/src/test/java/org/apache/iotdb/db/mqtt/PublishHandlerTest.java index 44c07ec..e95046e 100644 --- a/server/src/test/java/org/apache/iotdb/db/mqtt/PublishHandlerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mqtt/PublishHandlerTest.java @@ -22,7 +22,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.handler.codec.mqtt.*; import org.apache.iotdb.db.qp.executor.IPlanExecutor; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.junit.Test; import java.nio.charset.StandardCharsets; @@ -54,6 +54,6 @@ public class PublishHandlerTest { MqttPublishMessage publishMessage = new MqttPublishMessage(fixedHeader, variableHeader, buf); InterceptPublishMessage message = new InterceptPublishMessage(publishMessage, null, null); handler.onPublish(message); - verify(executor).processNonQuery(any(InsertPlan.class)); + verify(executor).processNonQuery(any(InsertRowPlan.class)); } } \ No newline at end of file diff --git a/server/src/test/java/org/apache/iotdb/db/qp/PlannerTest.java b/server/src/test/java/org/apache/iotdb/db/qp/PlannerTest.java index ec05a13..0a093b8 100644 --- a/server/src/test/java/org/apache/iotdb/db/qp/PlannerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/qp/PlannerTest.java @@ -18,12 +18,16 @@ */ package org.apache.iotdb.db.qp; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; import org.antlr.v4.runtime.misc.ParseCancellationException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.qp.logical.Operator.OperatorType; import org.apache.iotdb.db.qp.physical.PhysicalPlan; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; @@ -33,11 +37,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.util.Collections; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - public class PlannerTest { private CompressionType compressionType = @@ -159,8 +158,8 @@ public class PlannerTest { String createTSStatement = "insert into root.vehicle.d0(time,s0) values(10,NaN)"; PhysicalPlan physicalPlan = processor.parseSQLToPhysicalPlan(createTSStatement); - assertTrue(physicalPlan instanceof InsertPlan); - assertEquals("NaN", ((InsertPlan) physicalPlan).getValues()[0]); + assertTrue(physicalPlan instanceof InsertRowPlan); + assertEquals("NaN", ((InsertRowPlan) physicalPlan).getValues()[0]); // Later we will use Double.parseDouble so we have to ensure that it is parsed right assertEquals(Double.NaN, Double.parseDouble("NaN"), 1e-15); } diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java b/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java index 453e27a..6d7357a 100644 --- a/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java +++ b/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java @@ -25,15 +25,11 @@ import org.apache.iotdb.db.constant.TestConstant; import org.apache.iotdb.db.engine.MetadataManagerHelper; import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy; import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor; -import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.MManager; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.query.control.FileReaderManager; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.write.record.TSRecord; -import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; import org.junit.After; import org.junit.Before; diff --git a/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java b/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java index 49efed9..a327a8b 100644 --- a/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java @@ -30,7 +30,7 @@ import java.nio.ByteBuffer; import org.apache.commons.io.FileUtils; import org.apache.iotdb.db.constant.TestConstant; import org.apache.iotdb.db.exception.SystemCheckException; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.writelog.io.LogWriter; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.junit.Test; @@ -80,7 +80,7 @@ public class WalCheckerTest { TSDataType[] types = new TSDataType[]{TSDataType.INT64, TSDataType.INT64, TSDataType.INT64}; String[] values = new String[]{"5", "6", "7"}; for (int j = 0; j < 10; j++) { - new InsertPlan(deviceId, j, measurements, values).serialize(binaryPlans); + new InsertRowPlan(deviceId, j, measurements, values).serialize(binaryPlans); } binaryPlans.flip(); logWriter.write(binaryPlans); @@ -114,7 +114,7 @@ public class WalCheckerTest { TSDataType[] types = new TSDataType[]{TSDataType.INT64, TSDataType.INT64, TSDataType.INT64}; String[] values = new String[]{"5", "6", "7"}; for (int j = 0; j < 10; j++) { - new InsertPlan(deviceId, j, measurements, types, values).serialize(binaryPlans); + new InsertRowPlan(deviceId, j, measurements, types, values).serialize(binaryPlans); } if (i > 2) { binaryPlans.put("not a wal".getBytes()); diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java index 269f93c..dd40e42 100644 --- a/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java @@ -23,11 +23,11 @@ import java.io.IOException; import java.util.Collections; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.WriteProcessException; +import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.UpdatePlan; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode; @@ -86,7 +86,7 @@ public class PerformanceTest { long time = System.currentTimeMillis(); for (int i = 0; i < 1000000; i++) { - InsertPlan bwInsertPlan = new InsertPlan("logTestDevice", 100, + InsertRowPlan bwInsertPlan = new InsertRowPlan("logTestDevice", 100, new String[]{"s1", "s2", "s3", "s4"}, new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN}, new String[]{"1.0", "15", "str", "false"}); @@ -147,7 +147,7 @@ public class PerformanceTest { WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice"); for (int i = 0; i < 1000000; i++) { - InsertPlan bwInsertPlan = new InsertPlan("root.logTestDevice", 100, + InsertRowPlan bwInsertPlan = new InsertRowPlan("root.logTestDevice", 100, new String[]{"s1", "s2", "s3", "s4"}, new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN}, new String[]{"1.0", "15", "str", "false"}); diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java index 7125d88..68cabd3 100644 --- a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java @@ -28,7 +28,7 @@ import java.io.IOException; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager; import org.apache.iotdb.db.writelog.manager.WriteLogNodeManager; @@ -85,7 +85,7 @@ public class WriteLogNodeManagerTest { WriteLogNode logNode = manager .getNode("root.managerTest"); - InsertPlan bwInsertPlan = new InsertPlan("logTestDevice", 100, + InsertRowPlan bwInsertPlan = new InsertRowPlan("logTestDevice", 100, new String[]{"s1", "s2", "s3", "s4"}, new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN}, new String[]{"1.0", "15", "str", "false"}); diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java index ba321bc..95f9ec8 100644 --- a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java @@ -26,11 +26,10 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; - import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.db.writelog.io.ILogReader; @@ -70,7 +69,7 @@ public class WriteLogNodeTest { WriteLogNode logNode = new ExclusiveWriteLogNode(identifier); - InsertPlan bwInsertPlan = new InsertPlan(identifier, 100, + InsertRowPlan bwInsertPlan = new InsertRowPlan(identifier, 100, new String[]{"s1", "s2", "s3", "s4"}, new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN}, new String[]{"1.0", "15", "str", "false"}); @@ -133,7 +132,7 @@ public class WriteLogNodeTest { WriteLogNode logNode = new ExclusiveWriteLogNode(identifier); - InsertPlan bwInsertPlan = new InsertPlan(identifier, 100, + InsertRowPlan bwInsertPlan = new InsertRowPlan(identifier, 100, new String[]{"s1", "s2", "s3", "s4"}, new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN}, new String[]{"1.0", "15", "str", "false"}); @@ -170,7 +169,7 @@ public class WriteLogNodeTest { WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice"); - InsertPlan bwInsertPlan = new InsertPlan("root.logTestDevice", 100, + InsertRowPlan bwInsertPlan = new InsertRowPlan("root.logTestDevice", 100, new String[]{"s1", "s2", "s3", "s4"}, new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN}, new String[]{"1.0", "15", "str", "false"}); @@ -196,7 +195,7 @@ public class WriteLogNodeTest { WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice"); - InsertPlan bwInsertPlan = new InsertPlan("logTestDevice", 100, + InsertRowPlan bwInsertPlan = new InsertRowPlan("logTestDevice", 100, new String[]{"s1", "s2", "s3", "s4"}, new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN}, new String[]{"1.0", "15", "str", "false"}); @@ -221,7 +220,7 @@ public class WriteLogNodeTest { // this test uses a dummy insert log node to insert an over-sized log and assert exception caught WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice.oversize"); - InsertPlan bwInsertPlan = new InsertPlan("root.logTestDevice.oversize", 100, + InsertRowPlan bwInsertPlan = new InsertRowPlan("root.logTestDevice.oversize", 100, new String[]{"s1", "s2", "s3", "s4"}, new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN}, new String[]{"1.0", "15", new String(new char[65 * 1024 * 1024]), "false"}); diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java index 552f644..d995af6 100644 --- a/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java @@ -28,7 +28,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Path; import org.junit.Before; @@ -45,15 +45,15 @@ public class LogWriterReaderTest { if (new File(filePath).exists()) { new File(filePath).delete(); } - InsertPlan insertPlan1 = new InsertPlan("d1", 10L, new String[]{"s1", "s2"}, + InsertRowPlan insertRowPlan1 = new InsertRowPlan("d1", 10L, new String[]{"s1", "s2"}, new TSDataType[]{TSDataType.INT64, TSDataType.INT64}, new String[]{"1", "2"}); - InsertPlan insertPlan2 = new InsertPlan("d1", 10L, new String[]{"s1", "s2"}, + InsertRowPlan insertRowPlan2 = new InsertRowPlan("d1", 10L, new String[]{"s1", "s2"}, new TSDataType[]{TSDataType.INT64, TSDataType.INT64}, new String[]{"1", "2"}); DeletePlan deletePlan = new DeletePlan(10L, new Path("root.d1.s1")); - plans.add(insertPlan1); - plans.add(insertPlan2); + plans.add(insertRowPlan1); + plans.add(insertRowPlan2); plans.add(deletePlan); for (PhysicalPlan plan : plans) { plan.serialize(logsBuffer); diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java index ab52edb..745e06b 100644 --- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java @@ -41,7 +41,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager; import org.apache.iotdb.db.writelog.node.WriteLogNode; @@ -105,11 +105,11 @@ public class LogReplayerTest { WriteLogNode node = MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsFile.getName()); node.write( - new InsertPlan("root.sg.device0", 100, "sensor0", TSDataType.INT64, String.valueOf(0))); + new InsertRowPlan("root.sg.device0", 100, "sensor0", TSDataType.INT64, String.valueOf(0))); node.write( - new InsertPlan("root.sg.device0", 2, "sensor1", TSDataType.INT64, String.valueOf(0))); + new InsertRowPlan("root.sg.device0", 2, "sensor1", TSDataType.INT64, String.valueOf(0))); for (int i = 1; i < 5; i++) { - node.write(new InsertPlan("root.sg.device" + i, i, "sensor" + i, TSDataType.INT64, + node.write(new InsertRowPlan("root.sg.device" + i, i, "sensor" + i, TSDataType.INT64, String.valueOf(i))); } DeletePlan deletePlan = new DeletePlan(200, new Path("root.sg.device0", "sensor0")); diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java index 00e3e4c..a7bc9f0 100644 --- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java @@ -25,7 +25,6 @@ import java.io.File; import java.io.IOException; import java.io.OutputStream; import java.util.Collections; - import org.apache.commons.io.FileUtils; import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter; import org.apache.iotdb.db.constant.TestConstant; @@ -36,7 +35,7 @@ import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.StorageGroupProcessorException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.metadata.MManager; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager; import org.apache.iotdb.db.writelog.node.WriteLogNode; @@ -145,18 +144,18 @@ public class RecoverResourceFromReaderTest { types[k] = TSDataType.INT64; values[k] = String.valueOf(k + 10); } - InsertPlan insertPlan = new InsertPlan("root.sg.device" + j, i, measurements, + InsertRowPlan insertRowPlan = new InsertRowPlan("root.sg.device" + j, i, measurements, types, values); - node.write(insertPlan); + node.write(insertRowPlan); } node.notifyStartFlush(); } - InsertPlan insertPlan = new InsertPlan("root.sg.device99", 1, new String[]{"sensor4"}, + InsertRowPlan insertRowPlan = new InsertRowPlan("root.sg.device99", 1, new String[]{"sensor4"}, new TSDataType[]{TSDataType.INT64}, new String[]{"4"}); - node.write(insertPlan); - insertPlan = new InsertPlan("root.sg.device99", 300, new String[]{"sensor2"}, + node.write(insertRowPlan); + insertRowPlan = new InsertRowPlan("root.sg.device99", 300, new String[]{"sensor2"}, new TSDataType[]{TSDataType.INT64}, new String[]{"2"}); - node.write(insertPlan); + node.write(insertRowPlan); node.close(); resource = new TsFileResource(tsF); diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java index c3b3b06..ecd49a8 100644 --- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java @@ -37,10 +37,10 @@ import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.engine.version.VersionController; import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.StorageGroupProcessorException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.metadata.MManager; -import org.apache.iotdb.db.exception.StorageGroupProcessorException; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager; import org.apache.iotdb.db.writelog.node.WriteLogNode; @@ -58,8 +58,8 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.write.TsFileWriter; import org.apache.iotdb.tsfile.write.record.TSRecord; import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; -import org.apache.iotdb.tsfile.write.schema.Schema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.Schema; import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; import org.junit.After; import org.junit.Assert; @@ -149,7 +149,7 @@ public class SeqTsFileRecoverTest { types[k] = TSDataType.INT64; values[k] = String.valueOf(k); } - InsertPlan insertPlan = new InsertPlan("root.sg.device" + j, i, measurements, types, + InsertRowPlan insertPlan = new InsertRowPlan("root.sg.device" + j, i, measurements, types, values); node.write(insertPlan); } diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java index 8b9c697..cd63f94 100644 --- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java @@ -34,7 +34,7 @@ import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.StorageGroupProcessorException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.metadata.MManager; -import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.query.reader.chunk.ChunkDataIterator; import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader; import org.apache.iotdb.db.utils.EnvironmentUtils; @@ -152,16 +152,16 @@ public class UnseqTsFileRecoverTest { types[k] = TSDataType.INT64; values[k] = String.valueOf(k + 10); } - InsertPlan insertPlan = new InsertPlan("root.sg.device" + j, i, measurements, types, + InsertRowPlan insertRowPlan = new InsertRowPlan("root.sg.device" + j, i, measurements, types, values); - node.write(insertPlan); + node.write(insertRowPlan); } node.notifyStartFlush(); } - InsertPlan insertPlan = new InsertPlan("root.sg.device99", 1, "sensor4", TSDataType.INT64, "4"); - node.write(insertPlan); - insertPlan = new InsertPlan("root.sg.device99", 300, "sensor2", TSDataType.INT64, "2"); - node.write(insertPlan); + InsertRowPlan insertRowPlan = new InsertRowPlan("root.sg.device99", 1, "sensor4", TSDataType.INT64, "4"); + node.write(insertRowPlan); + insertRowPlan = new InsertRowPlan("root.sg.device99", 300, "sensor2", TSDataType.INT64, "2"); + node.write(insertRowPlan); node.close(); resource = new TsFileResource(tsF);