DanielWang2035 commented on code in PR #14751:
URL: https://github.com/apache/iotdb/pull/14751#discussion_r1952318152


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java:
##########
@@ -175,10 +252,154 @@ protected boolean doAnalyzeFileByFile(IAnalysis 
analysis) {
     return true;
   }
 
-  protected abstract void analyzeSingleTsFile(final File tsFile)
-      throws IOException, AuthException, LoadAnalyzeException;
+  private void analyzeSingleTsFile(final File tsFile, int i)
+      throws IOException, AuthException, LoadAnalyzeException {
+    try (final TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFile.getAbsolutePath())) {
+      // can be reused when constructing tsfile resource
+      final TsFileSequenceReaderTimeseriesMetadataIterator 
timeseriesMetadataIterator =
+          new TsFileSequenceReaderTimeseriesMetadataIterator(reader, true, 1);
+
+      // check if the tsfile is empty
+      if (!timeseriesMetadataIterator.hasNext()) {
+        throw new LoadEmptyFileException(tsFile.getAbsolutePath());
+      }
+
+      // check whether the encrypt type of the tsfile is supported
+      final EncryptParameter param = reader.getEncryptParam();
+      if (!Objects.equals(param.getType(), EncryptUtils.encryptParam.getType())
+          || !Arrays.equals(param.getKey(), 
EncryptUtils.encryptParam.getKey())) {
+        throw new SemanticException("The encryption way of the TsFile is not 
supported.");
+      }
+
+      // check whether the tsfile is tree-model or not
+      final Map<String, TableSchema> tableSchemaMap = 
reader.getTableSchemaMap();
+      final boolean isTableModelFile = Objects.nonNull(tableSchemaMap) && 
!tableSchemaMap.isEmpty();
+      LOGGER.info(
+          "TsFile {} is a {}-model file.", tsFile.getPath(), isTableModelFile 
? "table" : "tree");
+      this.isTableModel.set(i, isTableModelFile);
+
+      // construct tsfile resource
+      final TsFileResource tsFileResource = constructTsFileResource(reader, 
tsFile);
+
+      long writePointCount = 0;
+
+      if (!isTableModelFile) {
+        // Tree-model file
+        
getOrCreateTreeSchemaVerifier().setCurrentModificationsAndTimeIndex(tsFileResource,
 reader);
+
+        final boolean isAutoCreateSchemaOrVerifySchemaEnabled =
+            
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()
+                || isVerifySchema();
+        while (timeseriesMetadataIterator.hasNext()) {
+          final Map<IDeviceID, List<TimeseriesMetadata>> 
device2TimeseriesMetadata =
+              timeseriesMetadataIterator.next();
+
+          if (isAutoCreateSchemaOrVerifySchemaEnabled) {
+            getOrCreateTreeSchemaVerifier().autoCreateAndVerify(reader, 
device2TimeseriesMetadata);
+          }
+
+          if (!tsFileResource.resourceFileExists()) {
+            
TsFileResourceUtils.updateTsFileResource(device2TimeseriesMetadata, 
tsFileResource);
+          }
+
+          // TODO: how to get the correct write point count when
+          //  !isAutoCreateSchemaOrVerifySchemaEnabled
+          writePointCount += getWritePointCount(device2TimeseriesMetadata);
+        }
+        if (isAutoCreateSchemaOrVerifySchemaEnabled) {
+          
getOrCreateTreeSchemaVerifier().flushAndClearDeviceIsAlignedCacheIfNecessary();
+        }
+      } else {
+        // Table-model file
+
+        if (Objects.isNull(databaseForTableData)) {
+          // If database is not specified, use the database from current 
session.
+          // If still not specified, throw an exception.
+          final Optional<String> dbName = context.getDatabaseName();
+          if (dbName.isPresent()) {
+            databaseForTableData = dbName.get();
+            if (isTableModelStatement) {
+              loadTsFileTableStatement.setDatabase(dbName.get());
+            } else {
+              loadTsFileTreeStatement.setDatabase(dbName.get());
+            }
+          } else {
+            throw new SemanticException(DATABASE_NOT_SPECIFIED);
+          }
+        }
+
+        autoCreateTableDatabaseIfAbsent(databaseForTableData);
+
+        getOrCreateTableSchemaCache().setDatabase(databaseForTableData);
+        
getOrCreateTableSchemaCache().setCurrentModificationsAndTimeIndex(tsFileResource,
 reader);
+
+        for (Map.Entry<String, org.apache.tsfile.file.metadata.TableSchema> 
name2Schema :
+            tableSchemaMap.entrySet()) {
+          final 
org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema fileSchema 
=
+              
org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema
+                  .fromTsFileTableSchema(name2Schema.getKey(), 
name2Schema.getValue());
+          getOrCreateTableSchemaCache().createTable(fileSchema, context, 
metadata);
+          accessControl.checkCanInsertIntoTable(
+              context.getSession().getUserName(),
+              new QualifiedObjectName(databaseForTableData, 
name2Schema.getKey()));
+        }
+
+        while (timeseriesMetadataIterator.hasNext()) {
+          final Map<IDeviceID, List<TimeseriesMetadata>> 
device2TimeseriesMetadata =
+              timeseriesMetadataIterator.next();
+
+          for (IDeviceID deviceId : device2TimeseriesMetadata.keySet()) {
+            getOrCreateTableSchemaCache().autoCreateAndVerify(deviceId);
+          }
+
+          if (!tsFileResource.resourceFileExists()) {
+            
TsFileResourceUtils.updateTsFileResource(device2TimeseriesMetadata, 
tsFileResource);
+          }
+
+          writePointCount += getWritePointCount(device2TimeseriesMetadata);
+        }
+
+        getOrCreateTableSchemaCache().flush();
+        getOrCreateTableSchemaCache().clearIdColumnMapper();
+      }
+
+      
TimestampPrecisionUtils.checkTimestampPrecision(tsFileResource.getFileEndTime());
+      tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+
+      addTsFileResource(tsFileResource);
+      addWritePointCount(writePointCount);
+    } catch (final LoadEmptyFileException loadEmptyFileException) {
+      LOGGER.warn("Failed to load empty file: {}", tsFile.getAbsolutePath());
+      if (isDeleteAfterLoad) {
+        FileUtils.deleteQuietly(tsFile);
+      }
+    }
+  }
+
+  private void autoCreateTableDatabaseIfAbsent(final String database) throws 
LoadAnalyzeException {
+    validateDatabaseName(database);
+    if (DataNodeTableCache.getInstance().isDatabaseExist(database)) {
+      return;
+    }
 
-  protected void executeTabletConversion(final IAnalysis analysis, final 
LoadAnalyzeException e) {
+    final CreateDBTask task =
+        new CreateDBTask(new TDatabaseSchema(database).setIsTableModel(true), 
true);
+    try {
+      final ListenableFuture<ConfigTaskResult> future =
+          task.execute(ClusterConfigTaskExecutor.getInstance());
+      final ConfigTaskResult result = future.get();
+      if (result.getStatusCode().getStatusCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new LoadAnalyzeException(
+            String.format(
+                "Auto create database failed: %s, status code: %s",
+                database, result.getStatusCode()));
+      }
+    } catch (final ExecutionException | InterruptedException e) {

Review Comment:
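   Suggest catching `final Exception` here (i.e. `catch (final Exception e)`) instead of the `ExecutionException | InterruptedException` multi-catch.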



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to