fanhualta commented on a change in pull request #3191:
URL: https://github.com/apache/iotdb/pull/3191#discussion_r645568590



##########
File path: 
cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1411,6 +1435,291 @@ public TSStatus 
processNonPartitionedMetaPlan(PhysicalPlan plan) {
     return result;
   }
 
+  /**
+   * Forward plans to the DataGroupMember of one node in the corresponding 
group. Only when all
+   * nodes time out, will a TIME_OUT be returned.
+   *
+   * @param planGroupMap sub-plan -> belong data group pairs
+   */
+  public TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, 
PhysicalPlan plan) {
+    // the error codes from the groups that cannot execute the plan
+    TSStatus status;
+    if (planGroupMap.size() == 1) {
+      status = forwardToSingleGroup(planGroupMap.entrySet().iterator().next());
+    } else {
+      if (plan instanceof InsertTabletPlan || plan instanceof 
CreateMultiTimeSeriesPlan) {
+        // InsertTabletPlan and CreateMultiTimeSeriesPlan contains many rows, 
each will correspond
+        // to a TSStatus as its
+        // execution result, as the plan is split and the sub-plans may have 
interleaving ranges,
+        // we must assure that each TSStatus is placed to the right position
+        // e.g., an InsertTabletPlan contains 3 rows, row1 and row3 belong to 
NodeA and row2
+        // belongs to NodeB, when NodeA returns a success while NodeB returns 
a failure, the
+        // failure and success should be placed into proper positions in 
TSStatus.subStatus
+        status = forwardMultiSubPlan(planGroupMap, plan);
+      } else {
+        status = forwardToMultipleGroup(planGroupMap);
+      }
+    }
+    if (plan instanceof InsertPlan
+        && status.getCode() == 
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
+        && config.isEnableAutoCreateSchema()) {
+      TSStatus tmpStatus = createTimeseriesForFailedInsertion(planGroupMap, 
((InsertPlan) plan));
+      if (tmpStatus != null) {
+        status = tmpStatus;
+      }
+    }
+    logger.debug("{}: executed {} with answer {}", name, plan, status);
+    return status;
+  }
+
+  private TSStatus createTimeseriesForFailedInsertion(

Review comment:
       In the master branch, these functions were moved to the `Coordinator`.
When master was merged into this branch, the now-unused copies were left behind
here. I have removed these unused functions.

##########
File path: 
cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1411,6 +1435,291 @@ public TSStatus 
processNonPartitionedMetaPlan(PhysicalPlan plan) {
     return result;
   }
 
+  /**
+   * Forward plans to the DataGroupMember of one node in the corresponding 
group. Only when all
+   * nodes time out, will a TIME_OUT be returned.
+   *
+   * @param planGroupMap sub-plan -> belong data group pairs
+   */
+  public TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, 
PhysicalPlan plan) {

Review comment:
       Same as above.

##########
File path: 
cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1411,6 +1435,291 @@ public TSStatus 
processNonPartitionedMetaPlan(PhysicalPlan plan) {
     return result;
   }
 
+  /**
+   * Forward plans to the DataGroupMember of one node in the corresponding 
group. Only when all
+   * nodes time out, will a TIME_OUT be returned.
+   *
+   * @param planGroupMap sub-plan -> belong data group pairs
+   */
+  public TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, 
PhysicalPlan plan) {
+    // the error codes from the groups that cannot execute the plan
+    TSStatus status;
+    if (planGroupMap.size() == 1) {
+      status = forwardToSingleGroup(planGroupMap.entrySet().iterator().next());
+    } else {
+      if (plan instanceof InsertTabletPlan || plan instanceof 
CreateMultiTimeSeriesPlan) {
+        // InsertTabletPlan and CreateMultiTimeSeriesPlan contains many rows, 
each will correspond
+        // to a TSStatus as its
+        // execution result, as the plan is split and the sub-plans may have 
interleaving ranges,
+        // we must assure that each TSStatus is placed to the right position
+        // e.g., an InsertTabletPlan contains 3 rows, row1 and row3 belong to 
NodeA and row2
+        // belongs to NodeB, when NodeA returns a success while NodeB returns 
a failure, the
+        // failure and success should be placed into proper positions in 
TSStatus.subStatus
+        status = forwardMultiSubPlan(planGroupMap, plan);
+      } else {
+        status = forwardToMultipleGroup(planGroupMap);
+      }
+    }
+    if (plan instanceof InsertPlan
+        && status.getCode() == 
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
+        && config.isEnableAutoCreateSchema()) {
+      TSStatus tmpStatus = createTimeseriesForFailedInsertion(planGroupMap, 
((InsertPlan) plan));
+      if (tmpStatus != null) {
+        status = tmpStatus;
+      }
+    }
+    logger.debug("{}: executed {} with answer {}", name, plan, status);
+    return status;
+  }
+
+  private TSStatus createTimeseriesForFailedInsertion(
+      Map<PhysicalPlan, PartitionGroup> planGroupMap, InsertPlan plan) {
+    // try to create timeseries
+    if (plan.getFailedMeasurements() != null) {
+      plan.getPlanFromFailed();
+    }
+    boolean hasCreate;
+    try {
+      hasCreate = ((CMManager) IoTDB.metaManager).createTimeseries(plan);
+    } catch (IllegalPathException | CheckConsistencyException e) {
+      return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, 
e.getMessage());
+    }
+    if (hasCreate) {
+      return forwardPlan(planGroupMap, plan);
+    } else {
+      logger.error("{}, Cannot auto create timeseries.", thisNode);
+    }
+    return null;
+  }
+
+  /**
+   * Forward each sub-plan to its belonging data group, and combine responses 
from the groups.
+   *
+   * @param planGroupMap sub-plan -> data group pairs
+   */
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
+  private TSStatus forwardMultiSubPlan(

Review comment:
       Same as above.

##########
File path: 
cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1411,6 +1435,291 @@ public TSStatus 
processNonPartitionedMetaPlan(PhysicalPlan plan) {
     return result;
   }
 
+  /**
+   * Forward plans to the DataGroupMember of one node in the corresponding 
group. Only when all
+   * nodes time out, will a TIME_OUT be returned.
+   *
+   * @param planGroupMap sub-plan -> belong data group pairs
+   */
+  public TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, 
PhysicalPlan plan) {
+    // the error codes from the groups that cannot execute the plan
+    TSStatus status;
+    if (planGroupMap.size() == 1) {
+      status = forwardToSingleGroup(planGroupMap.entrySet().iterator().next());
+    } else {
+      if (plan instanceof InsertTabletPlan || plan instanceof 
CreateMultiTimeSeriesPlan) {
+        // InsertTabletPlan and CreateMultiTimeSeriesPlan contains many rows, 
each will correspond
+        // to a TSStatus as its
+        // execution result, as the plan is split and the sub-plans may have 
interleaving ranges,
+        // we must assure that each TSStatus is placed to the right position
+        // e.g., an InsertTabletPlan contains 3 rows, row1 and row3 belong to 
NodeA and row2
+        // belongs to NodeB, when NodeA returns a success while NodeB returns 
a failure, the
+        // failure and success should be placed into proper positions in 
TSStatus.subStatus
+        status = forwardMultiSubPlan(planGroupMap, plan);
+      } else {
+        status = forwardToMultipleGroup(planGroupMap);
+      }
+    }
+    if (plan instanceof InsertPlan
+        && status.getCode() == 
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
+        && config.isEnableAutoCreateSchema()) {
+      TSStatus tmpStatus = createTimeseriesForFailedInsertion(planGroupMap, 
((InsertPlan) plan));
+      if (tmpStatus != null) {
+        status = tmpStatus;
+      }
+    }
+    logger.debug("{}: executed {} with answer {}", name, plan, status);
+    return status;
+  }
+
+  private TSStatus createTimeseriesForFailedInsertion(
+      Map<PhysicalPlan, PartitionGroup> planGroupMap, InsertPlan plan) {
+    // try to create timeseries
+    if (plan.getFailedMeasurements() != null) {
+      plan.getPlanFromFailed();
+    }
+    boolean hasCreate;
+    try {
+      hasCreate = ((CMManager) IoTDB.metaManager).createTimeseries(plan);
+    } catch (IllegalPathException | CheckConsistencyException e) {
+      return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, 
e.getMessage());
+    }
+    if (hasCreate) {
+      return forwardPlan(planGroupMap, plan);
+    } else {
+      logger.error("{}, Cannot auto create timeseries.", thisNode);
+    }
+    return null;
+  }
+
+  /**
+   * Forward each sub-plan to its belonging data group, and combine responses 
from the groups.
+   *
+   * @param planGroupMap sub-plan -> data group pairs
+   */
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
+  private TSStatus forwardMultiSubPlan(
+      Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan parentPlan) 
{
+    List<String> errorCodePartitionGroups = new ArrayList<>();
+    TSStatus tmpStatus;
+    TSStatus[] subStatus = null;
+    boolean noFailure = true;
+    boolean isBatchFailure = false;
+    EndPoint endPoint = null;
+    int totalRowNum = 0;
+    // send sub-plans to each belonging data group and collect results
+    for (Map.Entry<PhysicalPlan, PartitionGroup> entry : 
planGroupMap.entrySet()) {
+      tmpStatus = forwardToSingleGroup(entry);
+      logger.debug("{}: from {},{},{}", name, entry.getKey(), 
entry.getValue(), tmpStatus);
+      noFailure = (tmpStatus.getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) && noFailure;
+      isBatchFailure =
+          (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) 
|| isBatchFailure;
+      if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+        if (parentPlan instanceof InsertTabletPlan) {
+          totalRowNum = ((InsertTabletPlan) parentPlan).getRowCount();
+        } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
+          totalRowNum = ((CreateMultiTimeSeriesPlan) 
parentPlan).getIndexes().size();
+        }
+        if (subStatus == null) {
+          subStatus = new TSStatus[totalRowNum];
+          Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
+        }
+        // set the status from one group to the proper positions of the 
overall status
+        if (parentPlan instanceof InsertTabletPlan) {
+          PartitionUtils.reordering(
+              (InsertTabletPlan) entry.getKey(),
+              subStatus,
+              tmpStatus.subStatus.toArray(new TSStatus[] {}));
+        } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
+          CreateMultiTimeSeriesPlan subPlan = (CreateMultiTimeSeriesPlan) 
entry.getKey();
+          for (int i = 0; i < subPlan.getIndexes().size(); i++) {
+            subStatus[subPlan.getIndexes().get(i)] = 
tmpStatus.subStatus.get(i);
+          }
+        }
+      }
+      if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        // execution failed, record the error message
+        errorCodePartitionGroups.add(
+            String.format(
+                "[%s@%s:%s:%s]",
+                tmpStatus.getCode(),
+                entry.getValue().getHeader(),
+                tmpStatus.getMessage(),
+                tmpStatus.subStatus));
+      }
+      if (parentPlan instanceof InsertTabletPlan
+          && tmpStatus.isSetRedirectNode()
+          && ((InsertTabletPlan) entry.getKey()).getMaxTime()
+              == ((InsertTabletPlan) parentPlan).getMaxTime()) {
+        endPoint = tmpStatus.getRedirectNode();
+      }
+    }
+
+    if (parentPlan instanceof CreateMultiTimeSeriesPlan
+        && !((CreateMultiTimeSeriesPlan) parentPlan).getResults().isEmpty()) {
+      if (subStatus == null) {
+        subStatus = new TSStatus[totalRowNum];
+        Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
+      }
+      noFailure = false;
+      isBatchFailure = true;
+      for (Entry<Integer, TSStatus> integerTSStatusEntry :
+          ((CreateMultiTimeSeriesPlan) parentPlan).getResults().entrySet()) {
+        subStatus[integerTSStatusEntry.getKey()] = 
integerTSStatusEntry.getValue();
+      }
+    }
+    return concludeFinalStatus(
+        noFailure, endPoint, isBatchFailure, subStatus, 
errorCodePartitionGroups);
+  }
+
+  private TSStatus concludeFinalStatus(

Review comment:
       Same as above.

##########
File path: 
cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1411,6 +1435,291 @@ public TSStatus 
processNonPartitionedMetaPlan(PhysicalPlan plan) {
     return result;
   }
 
+  /**
+   * Forward plans to the DataGroupMember of one node in the corresponding 
group. Only when all
+   * nodes time out, will a TIME_OUT be returned.
+   *
+   * @param planGroupMap sub-plan -> belong data group pairs
+   */
+  public TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, 
PhysicalPlan plan) {
+    // the error codes from the groups that cannot execute the plan
+    TSStatus status;
+    if (planGroupMap.size() == 1) {
+      status = forwardToSingleGroup(planGroupMap.entrySet().iterator().next());
+    } else {
+      if (plan instanceof InsertTabletPlan || plan instanceof 
CreateMultiTimeSeriesPlan) {
+        // InsertTabletPlan and CreateMultiTimeSeriesPlan contains many rows, 
each will correspond
+        // to a TSStatus as its
+        // execution result, as the plan is split and the sub-plans may have 
interleaving ranges,
+        // we must assure that each TSStatus is placed to the right position
+        // e.g., an InsertTabletPlan contains 3 rows, row1 and row3 belong to 
NodeA and row2
+        // belongs to NodeB, when NodeA returns a success while NodeB returns 
a failure, the
+        // failure and success should be placed into proper positions in 
TSStatus.subStatus
+        status = forwardMultiSubPlan(planGroupMap, plan);
+      } else {
+        status = forwardToMultipleGroup(planGroupMap);
+      }
+    }
+    if (plan instanceof InsertPlan
+        && status.getCode() == 
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
+        && config.isEnableAutoCreateSchema()) {
+      TSStatus tmpStatus = createTimeseriesForFailedInsertion(planGroupMap, 
((InsertPlan) plan));
+      if (tmpStatus != null) {
+        status = tmpStatus;
+      }
+    }
+    logger.debug("{}: executed {} with answer {}", name, plan, status);
+    return status;
+  }
+
+  private TSStatus createTimeseriesForFailedInsertion(
+      Map<PhysicalPlan, PartitionGroup> planGroupMap, InsertPlan plan) {
+    // try to create timeseries
+    if (plan.getFailedMeasurements() != null) {
+      plan.getPlanFromFailed();
+    }
+    boolean hasCreate;
+    try {
+      hasCreate = ((CMManager) IoTDB.metaManager).createTimeseries(plan);
+    } catch (IllegalPathException | CheckConsistencyException e) {
+      return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, 
e.getMessage());
+    }
+    if (hasCreate) {
+      return forwardPlan(planGroupMap, plan);
+    } else {
+      logger.error("{}, Cannot auto create timeseries.", thisNode);
+    }
+    return null;
+  }
+
+  /**
+   * Forward each sub-plan to its belonging data group, and combine responses 
from the groups.
+   *
+   * @param planGroupMap sub-plan -> data group pairs
+   */
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
+  private TSStatus forwardMultiSubPlan(
+      Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan parentPlan) 
{
+    List<String> errorCodePartitionGroups = new ArrayList<>();
+    TSStatus tmpStatus;
+    TSStatus[] subStatus = null;
+    boolean noFailure = true;
+    boolean isBatchFailure = false;
+    EndPoint endPoint = null;
+    int totalRowNum = 0;
+    // send sub-plans to each belonging data group and collect results
+    for (Map.Entry<PhysicalPlan, PartitionGroup> entry : 
planGroupMap.entrySet()) {
+      tmpStatus = forwardToSingleGroup(entry);
+      logger.debug("{}: from {},{},{}", name, entry.getKey(), 
entry.getValue(), tmpStatus);
+      noFailure = (tmpStatus.getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) && noFailure;
+      isBatchFailure =
+          (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) 
|| isBatchFailure;
+      if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+        if (parentPlan instanceof InsertTabletPlan) {
+          totalRowNum = ((InsertTabletPlan) parentPlan).getRowCount();
+        } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
+          totalRowNum = ((CreateMultiTimeSeriesPlan) 
parentPlan).getIndexes().size();
+        }
+        if (subStatus == null) {
+          subStatus = new TSStatus[totalRowNum];
+          Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
+        }
+        // set the status from one group to the proper positions of the 
overall status
+        if (parentPlan instanceof InsertTabletPlan) {
+          PartitionUtils.reordering(
+              (InsertTabletPlan) entry.getKey(),
+              subStatus,
+              tmpStatus.subStatus.toArray(new TSStatus[] {}));
+        } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
+          CreateMultiTimeSeriesPlan subPlan = (CreateMultiTimeSeriesPlan) 
entry.getKey();
+          for (int i = 0; i < subPlan.getIndexes().size(); i++) {
+            subStatus[subPlan.getIndexes().get(i)] = 
tmpStatus.subStatus.get(i);
+          }
+        }
+      }
+      if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        // execution failed, record the error message
+        errorCodePartitionGroups.add(
+            String.format(
+                "[%s@%s:%s:%s]",
+                tmpStatus.getCode(),
+                entry.getValue().getHeader(),
+                tmpStatus.getMessage(),
+                tmpStatus.subStatus));
+      }
+      if (parentPlan instanceof InsertTabletPlan
+          && tmpStatus.isSetRedirectNode()
+          && ((InsertTabletPlan) entry.getKey()).getMaxTime()
+              == ((InsertTabletPlan) parentPlan).getMaxTime()) {
+        endPoint = tmpStatus.getRedirectNode();
+      }
+    }
+
+    if (parentPlan instanceof CreateMultiTimeSeriesPlan
+        && !((CreateMultiTimeSeriesPlan) parentPlan).getResults().isEmpty()) {
+      if (subStatus == null) {
+        subStatus = new TSStatus[totalRowNum];
+        Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
+      }
+      noFailure = false;
+      isBatchFailure = true;
+      for (Entry<Integer, TSStatus> integerTSStatusEntry :
+          ((CreateMultiTimeSeriesPlan) parentPlan).getResults().entrySet()) {
+        subStatus[integerTSStatusEntry.getKey()] = 
integerTSStatusEntry.getValue();
+      }
+    }
+    return concludeFinalStatus(
+        noFailure, endPoint, isBatchFailure, subStatus, 
errorCodePartitionGroups);
+  }
+
+  private TSStatus concludeFinalStatus(
+      boolean noFailure,
+      EndPoint endPoint,
+      boolean isBatchFailure,
+      TSStatus[] subStatus,
+      List<String> errorCodePartitionGroups) {
+    TSStatus status;
+    if (noFailure) {
+      status = StatusUtils.OK;
+      if (endPoint != null) {
+        status = StatusUtils.getStatus(status, endPoint);
+      }
+    } else if (isBatchFailure) {
+      status = RpcUtils.getStatus(Arrays.asList(subStatus));
+    } else {
+      status =
+          StatusUtils.getStatus(
+              StatusUtils.EXECUTE_STATEMENT_ERROR,
+              MSG_MULTIPLE_ERROR + errorCodePartitionGroups.toString());
+    }
+    return status;
+  }
+
+  private TSStatus forwardToSingleGroup(Map.Entry<PhysicalPlan, 
PartitionGroup> entry) {

Review comment:
       Same as above.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to