jt2594838 commented on a change in pull request #1387:
URL: https://github.com/apache/incubator-iotdb/pull/1387#discussion_r444792728
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1684,6 +1743,77 @@ TSStatus forwardPlan(List<PartitionGroup>
partitionGroups, PhysicalPlan plan) {
return status;
}
+ /**
+ * Create timeseries automatically
+ *
+ * @param insertPlan, some of the timeseries in it are not created yet
+ * @return true of all uncreated timeseries are created
+ */
+ boolean autoCreateTimeseries(InsertPlan insertPlan) {
+ List<String> seriesList = new ArrayList<>();
+ String deviceId = insertPlan.getDeviceId();
+ String storageGroupName;
+ try {
+ storageGroupName = MetaUtils
+ .getStorageGroupNameByLevel(deviceId, IoTDBDescriptor.getInstance()
+ .getConfig().getDefaultStorageGroupLevel());
+ } catch (MetadataException e) {
+ logger.error("Failed to infer storage group from deviceId {}", deviceId);
+ return false;
+ }
+ for (String measurementId : insertPlan.getMeasurements()) {
+ seriesList.add(
+ new StringContainer(new String[]{deviceId, measurementId},
TsFileConstant.PATH_SEPARATOR)
+ .toString());
+ }
+ PartitionGroup partitionGroup = partitionTable.route(storageGroupName, 0);
+ List<String> unregisteredSeriesList =
getUnregisteredSeriesList(seriesList, partitionGroup);
+ for (String seriesPath : unregisteredSeriesList) {
+ int index = seriesList.indexOf(seriesPath);
+ TSDataType dataType = TypeInferenceUtils
+ .getPredictedDataType(insertPlan.getValues()[index], true);
+ TSEncoding encoding = getDefaultEncoding(dataType);
+ CompressionType compressionType =
TSFileDescriptor.getInstance().getConfig().getCompressor();
+ CreateTimeSeriesPlan createTimeSeriesPlan = new CreateTimeSeriesPlan(new
Path(seriesPath),
+ dataType, encoding, compressionType, null, null, null, null);
+ // TODO-Cluster: add executeNonQueryBatch()
+ TSStatus result = executeNonQuery(createTimeSeriesPlan);
+ if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ logger.error("{} failed to execute create timeseries {}", thisNode,
seriesPath);
+ return false;
+ }
+ }
+ return true;
+ }
+
+
+ /**
+ * To check which timeseries in the input list is unregistered
+ *
+ * @param seriesList
+ * @param partitionGroup
+ * @return
+ */
+ List<String> getUnregisteredSeriesList(List<String> seriesList,
PartitionGroup partitionGroup) {
+ Set<String> unregistered = new HashSet<>();
+ for (Node node : partitionGroup) {
+ try {
+ DataClient client = getDataClient(node);
+ List<String> result = SyncClientAdaptor
+ .getUnregisteredMeasurements(client, partitionGroup.getHeader(),
seriesList);
+ unregistered.addAll(result);
+ } catch (TException | IOException e) {
+ logger.error("{}: cannot getting unregistered {} and other {} paths
from {}", name,
+ seriesList.get(0), seriesList.get(seriesList.size() - 1), node, e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("{}: getting unregistered series list {} is interrupted
from {}", name,
+ Arrays.toString(seriesList.toArray(new String[0])), node, e);
Review comment:
Please refactor this message like the former one to avoid printing too
many seriesPaths.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1530,9 +1544,39 @@ private TSStatus processPartitionedPlan(PhysicalPlan
plan) throws UnsupportedPla
} catch (MetadataException e) {
logger.error("Cannot route plan {}", plan, e);
}
- // the storage group is not found locally, forward it to the leader
+ // the storage group is not found locally
if (planGroupMap == null || planGroupMap.isEmpty()) {
- logger.debug("{}: Cannot found storage groups for {}", name, plan);
+ if (plan instanceof InsertPlan &&
ClusterDescriptor.getInstance().getConfig()
+ .isEnableAutoCreateSchema()) {
+ // try to set storage group
+ String deviceId = ((InsertPlan) plan).getDeviceId();
+ try {
+ String storageGroupName = MetaUtils
+ .getStorageGroupNameByLevel(deviceId,
IoTDBDescriptor.getInstance()
+ .getConfig().getDefaultStorageGroupLevel());
+ SetStorageGroupPlan setStorageGroupPlan = new SetStorageGroupPlan(
+ new Path(storageGroupName));
+ TSStatus setStorageGroupResult =
processNonPartitionedMetaPlan(setStorageGroupPlan);
+ if (setStorageGroupResult.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode() &&
+ setStorageGroupResult.getCode() !=
TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
+ throw new MetadataException(
+ String.format("Status Code: %d, failed to set storage group ",
+ setStorageGroupResult.getCode(), storageGroupName)
+ );
+ }
+ // try to create timeseries
+ boolean isAutoCreateTimeseriesSuccess =
autoCreateTimeseries((InsertPlan)plan);
+ if(!isAutoCreateTimeseriesSuccess){
+ throw new MetadataException(
+ String.format("Failed to create timeseries from InsertPlan
automatically.")
+ );
+ }
+ return executeNonQuery(plan);
+ } catch (MetadataException e) {
+ logger.error("Failed to set storage group or create timeseries,
because {}", e.getMessage());
Review comment:
It would be better to use `e` (preserve stack traces).
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1684,6 +1743,77 @@ TSStatus forwardPlan(List<PartitionGroup>
partitionGroups, PhysicalPlan plan) {
return status;
}
+ /**
+ * Create timeseries automatically
+ *
+ * @param insertPlan, some of the timeseries in it are not created yet
+ * @return true of all uncreated timeseries are created
+ */
+ boolean autoCreateTimeseries(InsertPlan insertPlan) {
+ List<String> seriesList = new ArrayList<>();
+ String deviceId = insertPlan.getDeviceId();
+ String storageGroupName;
+ try {
+ storageGroupName = MetaUtils
+ .getStorageGroupNameByLevel(deviceId, IoTDBDescriptor.getInstance()
+ .getConfig().getDefaultStorageGroupLevel());
+ } catch (MetadataException e) {
+ logger.error("Failed to infer storage group from deviceId {}", deviceId);
+ return false;
+ }
+ for (String measurementId : insertPlan.getMeasurements()) {
+ seriesList.add(
+ new StringContainer(new String[]{deviceId, measurementId},
TsFileConstant.PATH_SEPARATOR)
+ .toString());
+ }
+ PartitionGroup partitionGroup = partitionTable.route(storageGroupName, 0);
+ List<String> unregisteredSeriesList =
getUnregisteredSeriesList(seriesList, partitionGroup);
+ for (String seriesPath : unregisteredSeriesList) {
+ int index = seriesList.indexOf(seriesPath);
+ TSDataType dataType = TypeInferenceUtils
+ .getPredictedDataType(insertPlan.getValues()[index], true);
+ TSEncoding encoding = getDefaultEncoding(dataType);
+ CompressionType compressionType =
TSFileDescriptor.getInstance().getConfig().getCompressor();
+ CreateTimeSeriesPlan createTimeSeriesPlan = new CreateTimeSeriesPlan(new
Path(seriesPath),
+ dataType, encoding, compressionType, null, null, null, null);
+ // TODO-Cluster: add executeNonQueryBatch()
+ TSStatus result = executeNonQuery(createTimeSeriesPlan);
+ if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ logger.error("{} failed to execute create timeseries {}", thisNode,
seriesPath);
+ return false;
+ }
+ }
+ return true;
+ }
+
+
+ /**
+ * To check which timeseries in the input list is unregistered
+ *
+ * @param seriesList
+ * @param partitionGroup
+ * @return
+ */
+ List<String> getUnregisteredSeriesList(List<String> seriesList,
PartitionGroup partitionGroup) {
+ Set<String> unregistered = new HashSet<>();
+ for (Node node : partitionGroup) {
+ try {
+ DataClient client = getDataClient(node);
+ List<String> result = SyncClientAdaptor
+ .getUnregisteredMeasurements(client, partitionGroup.getHeader(),
seriesList);
+ unregistered.addAll(result);
Review comment:
I think you should break once a non-null result is returned, and I do
not think using a Set is necessary.
##########
File path:
server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
##########
@@ -914,8 +914,26 @@ public void insert(InsertPlan insertPlan) throws
QueryProcessException {
insertPlan.setSchemasAndTransferType(schemas);
StorageEngine.getInstance().insert(insertPlan);
if (insertPlan.getFailedMeasurements() != null) {
- throw new StorageEngineException(
- "failed to insert points " + insertPlan.getFailedMeasurements());
+ // check if all path not exist exceptions
+ List<String> failedPaths = new
ArrayList<>(insertPlan.getFailedMeasurements().keySet());
+ List<Exception> exceptions = new
ArrayList<>(insertPlan.getFailedMeasurements().values());
+ boolean isPathNotExistException = true;
+ for(Exception e : exceptions){
+ Exception curException = e;
+ while(curException.getCause() != null){
+ curException = (Exception) curException.getCause();
Review comment:
You may just define this as a Throwable so you may avoid the typecast.
##########
File path:
server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
##########
@@ -914,8 +914,26 @@ public void insert(InsertPlan insertPlan) throws
QueryProcessException {
insertPlan.setSchemasAndTransferType(schemas);
StorageEngine.getInstance().insert(insertPlan);
if (insertPlan.getFailedMeasurements() != null) {
- throw new StorageEngineException(
- "failed to insert points " + insertPlan.getFailedMeasurements());
+ // check if all path not exist exceptions
+ List<String> failedPaths = new
ArrayList<>(insertPlan.getFailedMeasurements().keySet());
+ List<Exception> exceptions = new
ArrayList<>(insertPlan.getFailedMeasurements().values());
+ boolean isPathNotExistException = true;
+ for(Exception e : exceptions){
+ Exception curException = e;
+ while(curException.getCause() != null){
+ curException = (Exception) curException.getCause();
+ }
+ if(!(curException instanceof PathNotExistException)){
+ isPathNotExistException = false;
+ break;
+ }
+ }
+ if(isPathNotExistException){
+ throw new PathNotExistException(failedPaths.toString());
Review comment:
I suggest you just add a field of List<String> in PathNotExistException,
since concatenating a long string list is time-consuming.
Besides, please perform a reformat.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]