Ring-k commented on a change in pull request #1387:
URL: https://github.com/apache/incubator-iotdb/pull/1387#discussion_r444929476
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1684,6 +1743,77 @@ TSStatus forwardPlan(List<PartitionGroup>
partitionGroups, PhysicalPlan plan) {
return status;
}
+ /**
+ * Create timeseries automatically
+ *
+ * @param insertPlan an insert plan in which some of the timeseries are not created yet
+ * @return true if all uncreated timeseries are created
+ */
+ boolean autoCreateTimeseries(InsertPlan insertPlan) {
+ List<String> seriesList = new ArrayList<>();
+ String deviceId = insertPlan.getDeviceId();
+ String storageGroupName;
+ try {
+ storageGroupName = MetaUtils
+ .getStorageGroupNameByLevel(deviceId, IoTDBDescriptor.getInstance()
+ .getConfig().getDefaultStorageGroupLevel());
+ } catch (MetadataException e) {
+ logger.error("Failed to infer storage group from deviceId {}", deviceId);
+ return false;
+ }
+ for (String measurementId : insertPlan.getMeasurements()) {
+ seriesList.add(
+ new StringContainer(new String[]{deviceId, measurementId},
TsFileConstant.PATH_SEPARATOR)
+ .toString());
+ }
+ PartitionGroup partitionGroup = partitionTable.route(storageGroupName, 0);
+ List<String> unregisteredSeriesList =
getUnregisteredSeriesList(seriesList, partitionGroup);
+ for (String seriesPath : unregisteredSeriesList) {
+ int index = seriesList.indexOf(seriesPath);
+ TSDataType dataType = TypeInferenceUtils
+ .getPredictedDataType(insertPlan.getValues()[index], true);
+ TSEncoding encoding = getDefaultEncoding(dataType);
+ CompressionType compressionType =
TSFileDescriptor.getInstance().getConfig().getCompressor();
+ CreateTimeSeriesPlan createTimeSeriesPlan = new CreateTimeSeriesPlan(new
Path(seriesPath),
+ dataType, encoding, compressionType, null, null, null, null);
+ // TODO-Cluster: add executeNonQueryBatch()
+ TSStatus result = executeNonQuery(createTimeSeriesPlan);
+ if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ logger.error("{} failed to execute create timeseries {}", thisNode,
seriesPath);
+ return false;
+ }
+ }
+ return true;
+ }
+
+
+ /**
+ * To check which timeseries in the input list is unregistered
+ *
+ * @param seriesList
+ * @param partitionGroup
+ * @return
+ */
+ List<String> getUnregisteredSeriesList(List<String> seriesList,
PartitionGroup partitionGroup) {
+ Set<String> unregistered = new HashSet<>();
+ for (Node node : partitionGroup) {
+ try {
+ DataClient client = getDataClient(node);
+ List<String> result = SyncClientAdaptor
+ .getUnregisteredMeasurements(client, partitionGroup.getHeader(),
seriesList);
+ unregistered.addAll(result);
+ } catch (TException | IOException e) {
+ logger.error("{}: cannot getting unregistered {} and other {} paths
from {}", name,
+ seriesList.get(0), seriesList.get(seriesList.size() - 1), node, e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("{}: getting unregistered series list {} is interrupted
from {}", name,
+ Arrays.toString(seriesList.toArray(new String[0])), node, e);
Review comment:
Thanks for the reminder.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]