fanhualta commented on a change in pull request #3191:
URL: https://github.com/apache/iotdb/pull/3191#discussion_r645568987
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1411,6 +1435,291 @@ public TSStatus
processNonPartitionedMetaPlan(PhysicalPlan plan) {
return result;
}
+ /**
+ * Forward plans to the DataGroupMember of one node in the corresponding
group. Only when all
+ * nodes time out, will a TIME_OUT be returned.
+ *
+ * @param planGroupMap sub-plan -> belong data group pairs
+ */
+ public TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap,
PhysicalPlan plan) {
+ // the error codes from the groups that cannot execute the plan
+ TSStatus status;
+ if (planGroupMap.size() == 1) {
+ status = forwardToSingleGroup(planGroupMap.entrySet().iterator().next());
+ } else {
+ if (plan instanceof InsertTabletPlan || plan instanceof
CreateMultiTimeSeriesPlan) {
+ // InsertTabletPlan and CreateMultiTimeSeriesPlan contain many rows,
each will correspond
+ // to a TSStatus as its
+ // execution result, as the plan is split and the sub-plans may have
interleaving ranges,
+ // we must assure that each TSStatus is placed to the right position
+ // e.g., an InsertTabletPlan contains 3 rows, row1 and row3 belong to
NodeA and row2
+ // belongs to NodeB, when NodeA returns a success while NodeB returns
a failure, the
+ // failure and success should be placed into proper positions in
TSStatus.subStatus
+ status = forwardMultiSubPlan(planGroupMap, plan);
+ } else {
+ status = forwardToMultipleGroup(planGroupMap);
+ }
+ }
+ if (plan instanceof InsertPlan
+ && status.getCode() ==
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
+ && config.isEnableAutoCreateSchema()) {
+ TSStatus tmpStatus = createTimeseriesForFailedInsertion(planGroupMap,
((InsertPlan) plan));
+ if (tmpStatus != null) {
+ status = tmpStatus;
+ }
+ }
+ logger.debug("{}: executed {} with answer {}", name, plan, status);
+ return status;
+ }
+
+ private TSStatus createTimeseriesForFailedInsertion(
+ Map<PhysicalPlan, PartitionGroup> planGroupMap, InsertPlan plan) {
+ // try to create timeseries
+ if (plan.getFailedMeasurements() != null) {
+ plan.getPlanFromFailed();
+ }
+ boolean hasCreate;
+ try {
+ hasCreate = ((CMManager) IoTDB.metaManager).createTimeseries(plan);
+ } catch (IllegalPathException | CheckConsistencyException e) {
+ return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR,
e.getMessage());
+ }
+ if (hasCreate) {
+ return forwardPlan(planGroupMap, plan);
+ } else {
+ logger.error("{}, Cannot auto create timeseries.", thisNode);
+ }
+ return null;
+ }
+
+ /**
+ * Forward each sub-plan to its belonging data group, and combine responses
from the groups.
+ *
+ * @param planGroupMap sub-plan -> data group pairs
+ */
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
+ private TSStatus forwardMultiSubPlan(
+ Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan parentPlan)
{
+ List<String> errorCodePartitionGroups = new ArrayList<>();
+ TSStatus tmpStatus;
+ TSStatus[] subStatus = null;
+ boolean noFailure = true;
+ boolean isBatchFailure = false;
+ EndPoint endPoint = null;
+ int totalRowNum = 0;
+ // send sub-plans to each belonging data group and collect results
+ for (Map.Entry<PhysicalPlan, PartitionGroup> entry :
planGroupMap.entrySet()) {
+ tmpStatus = forwardToSingleGroup(entry);
+ logger.debug("{}: from {},{},{}", name, entry.getKey(),
entry.getValue(), tmpStatus);
+ noFailure = (tmpStatus.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) && noFailure;
+ isBatchFailure =
+ (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode())
|| isBatchFailure;
+ if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ if (parentPlan instanceof InsertTabletPlan) {
+ totalRowNum = ((InsertTabletPlan) parentPlan).getRowCount();
+ } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
+ totalRowNum = ((CreateMultiTimeSeriesPlan)
parentPlan).getIndexes().size();
+ }
+ if (subStatus == null) {
+ subStatus = new TSStatus[totalRowNum];
+ Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
+ }
+ // set the status from one group to the proper positions of the
overall status
+ if (parentPlan instanceof InsertTabletPlan) {
+ PartitionUtils.reordering(
+ (InsertTabletPlan) entry.getKey(),
+ subStatus,
+ tmpStatus.subStatus.toArray(new TSStatus[] {}));
+ } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
+ CreateMultiTimeSeriesPlan subPlan = (CreateMultiTimeSeriesPlan)
entry.getKey();
+ for (int i = 0; i < subPlan.getIndexes().size(); i++) {
+ subStatus[subPlan.getIndexes().get(i)] =
tmpStatus.subStatus.get(i);
+ }
+ }
+ }
+ if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ // execution failed, record the error message
+ errorCodePartitionGroups.add(
+ String.format(
+ "[%s@%s:%s:%s]",
+ tmpStatus.getCode(),
+ entry.getValue().getHeader(),
+ tmpStatus.getMessage(),
+ tmpStatus.subStatus));
+ }
+ if (parentPlan instanceof InsertTabletPlan
+ && tmpStatus.isSetRedirectNode()
+ && ((InsertTabletPlan) entry.getKey()).getMaxTime()
+ == ((InsertTabletPlan) parentPlan).getMaxTime()) {
+ endPoint = tmpStatus.getRedirectNode();
+ }
+ }
+
+ if (parentPlan instanceof CreateMultiTimeSeriesPlan
+ && !((CreateMultiTimeSeriesPlan) parentPlan).getResults().isEmpty()) {
+ if (subStatus == null) {
+ subStatus = new TSStatus[totalRowNum];
+ Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
+ }
+ noFailure = false;
+ isBatchFailure = true;
+ for (Entry<Integer, TSStatus> integerTSStatusEntry :
+ ((CreateMultiTimeSeriesPlan) parentPlan).getResults().entrySet()) {
+ subStatus[integerTSStatusEntry.getKey()] =
integerTSStatusEntry.getValue();
+ }
+ }
+ return concludeFinalStatus(
+ noFailure, endPoint, isBatchFailure, subStatus,
errorCodePartitionGroups);
+ }
+
+ private TSStatus concludeFinalStatus(
+ boolean noFailure,
+ EndPoint endPoint,
+ boolean isBatchFailure,
+ TSStatus[] subStatus,
+ List<String> errorCodePartitionGroups) {
+ TSStatus status;
+ if (noFailure) {
+ status = StatusUtils.OK;
+ if (endPoint != null) {
+ status = StatusUtils.getStatus(status, endPoint);
+ }
+ } else if (isBatchFailure) {
+ status = RpcUtils.getStatus(Arrays.asList(subStatus));
+ } else {
+ status =
+ StatusUtils.getStatus(
+ StatusUtils.EXECUTE_STATEMENT_ERROR,
+ MSG_MULTIPLE_ERROR + errorCodePartitionGroups.toString());
+ }
+ return status;
+ }
+
+ private TSStatus forwardToSingleGroup(Map.Entry<PhysicalPlan,
PartitionGroup> entry) {
+ TSStatus result;
+ if (entry.getValue().contains(thisNode)) {
+ // the query should be handled by a group the local node is in, handle
it within the group
+ long startTime =
+ Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP
+ .getOperationStartTime();
+ logger.debug(
+ "Execute {} in a local group of {}", entry.getKey(),
entry.getValue().getHeader());
+ result =
+ getLocalDataMember(entry.getValue().getHeader(),
entry.getValue().getId())
+ .executeNonQueryPlan(entry.getKey());
+ Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP
+ .calOperationCostTimeFromStart(startTime);
+ } else {
+ // forward the query to the group that should handle it
+ long startTime =
+ Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_REMOTE_GROUP
+ .getOperationStartTime();
+ logger.debug(
+ "Forward {} to a remote group of {}", entry.getKey(),
entry.getValue().getHeader());
+ result = forwardPlan(entry.getKey(), entry.getValue());
+ Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_REMOTE_GROUP
+ .calOperationCostTimeFromStart(startTime);
+ }
+ return result;
+ }
+
+ /**
+ * Forward each sub-plan to its corresponding data group; if some groups
go wrong, the error
+ * messages from each group will be compacted into one string.
+ *
+ * @param planGroupMap sub-plan -> data group pairs
+ */
+ private TSStatus forwardToMultipleGroup(Map<PhysicalPlan, PartitionGroup>
planGroupMap) {
Review comment:
Same as above.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1411,6 +1435,291 @@ public TSStatus
processNonPartitionedMetaPlan(PhysicalPlan plan) {
return result;
}
+ /**
+ * Forward plans to the DataGroupMember of one node in the corresponding
group. Only when all
+ * nodes time out, will a TIME_OUT be returned.
+ *
+ * @param planGroupMap sub-plan -> belong data group pairs
+ */
+ public TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap,
PhysicalPlan plan) {
+ // the error codes from the groups that cannot execute the plan
+ TSStatus status;
+ if (planGroupMap.size() == 1) {
+ status = forwardToSingleGroup(planGroupMap.entrySet().iterator().next());
+ } else {
+ if (plan instanceof InsertTabletPlan || plan instanceof
CreateMultiTimeSeriesPlan) {
+ // InsertTabletPlan and CreateMultiTimeSeriesPlan contain many rows,
each will correspond
+ // to a TSStatus as its
+ // execution result, as the plan is split and the sub-plans may have
interleaving ranges,
+ // we must assure that each TSStatus is placed to the right position
+ // e.g., an InsertTabletPlan contains 3 rows, row1 and row3 belong to
NodeA and row2
+ // belongs to NodeB, when NodeA returns a success while NodeB returns
a failure, the
+ // failure and success should be placed into proper positions in
TSStatus.subStatus
+ status = forwardMultiSubPlan(planGroupMap, plan);
+ } else {
+ status = forwardToMultipleGroup(planGroupMap);
+ }
+ }
+ if (plan instanceof InsertPlan
+ && status.getCode() ==
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
+ && config.isEnableAutoCreateSchema()) {
+ TSStatus tmpStatus = createTimeseriesForFailedInsertion(planGroupMap,
((InsertPlan) plan));
+ if (tmpStatus != null) {
+ status = tmpStatus;
+ }
+ }
+ logger.debug("{}: executed {} with answer {}", name, plan, status);
+ return status;
+ }
+
+ private TSStatus createTimeseriesForFailedInsertion(
+ Map<PhysicalPlan, PartitionGroup> planGroupMap, InsertPlan plan) {
+ // try to create timeseries
+ if (plan.getFailedMeasurements() != null) {
+ plan.getPlanFromFailed();
+ }
+ boolean hasCreate;
+ try {
+ hasCreate = ((CMManager) IoTDB.metaManager).createTimeseries(plan);
+ } catch (IllegalPathException | CheckConsistencyException e) {
+ return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR,
e.getMessage());
+ }
+ if (hasCreate) {
+ return forwardPlan(planGroupMap, plan);
+ } else {
+ logger.error("{}, Cannot auto create timeseries.", thisNode);
+ }
+ return null;
+ }
+
+ /**
+ * Forward each sub-plan to its belonging data group, and combine responses
from the groups.
+ *
+ * @param planGroupMap sub-plan -> data group pairs
+ */
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
+ private TSStatus forwardMultiSubPlan(
+ Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan parentPlan)
{
+ List<String> errorCodePartitionGroups = new ArrayList<>();
+ TSStatus tmpStatus;
+ TSStatus[] subStatus = null;
+ boolean noFailure = true;
+ boolean isBatchFailure = false;
+ EndPoint endPoint = null;
+ int totalRowNum = 0;
+ // send sub-plans to each belonging data group and collect results
+ for (Map.Entry<PhysicalPlan, PartitionGroup> entry :
planGroupMap.entrySet()) {
+ tmpStatus = forwardToSingleGroup(entry);
+ logger.debug("{}: from {},{},{}", name, entry.getKey(),
entry.getValue(), tmpStatus);
+ noFailure = (tmpStatus.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) && noFailure;
+ isBatchFailure =
+ (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode())
|| isBatchFailure;
+ if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ if (parentPlan instanceof InsertTabletPlan) {
+ totalRowNum = ((InsertTabletPlan) parentPlan).getRowCount();
+ } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
+ totalRowNum = ((CreateMultiTimeSeriesPlan)
parentPlan).getIndexes().size();
+ }
+ if (subStatus == null) {
+ subStatus = new TSStatus[totalRowNum];
+ Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
+ }
+ // set the status from one group to the proper positions of the
overall status
+ if (parentPlan instanceof InsertTabletPlan) {
+ PartitionUtils.reordering(
+ (InsertTabletPlan) entry.getKey(),
+ subStatus,
+ tmpStatus.subStatus.toArray(new TSStatus[] {}));
+ } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
+ CreateMultiTimeSeriesPlan subPlan = (CreateMultiTimeSeriesPlan)
entry.getKey();
+ for (int i = 0; i < subPlan.getIndexes().size(); i++) {
+ subStatus[subPlan.getIndexes().get(i)] =
tmpStatus.subStatus.get(i);
+ }
+ }
+ }
+ if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ // execution failed, record the error message
+ errorCodePartitionGroups.add(
+ String.format(
+ "[%s@%s:%s:%s]",
+ tmpStatus.getCode(),
+ entry.getValue().getHeader(),
+ tmpStatus.getMessage(),
+ tmpStatus.subStatus));
+ }
+ if (parentPlan instanceof InsertTabletPlan
+ && tmpStatus.isSetRedirectNode()
+ && ((InsertTabletPlan) entry.getKey()).getMaxTime()
+ == ((InsertTabletPlan) parentPlan).getMaxTime()) {
+ endPoint = tmpStatus.getRedirectNode();
+ }
+ }
+
+ if (parentPlan instanceof CreateMultiTimeSeriesPlan
+ && !((CreateMultiTimeSeriesPlan) parentPlan).getResults().isEmpty()) {
+ if (subStatus == null) {
+ subStatus = new TSStatus[totalRowNum];
+ Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
+ }
+ noFailure = false;
+ isBatchFailure = true;
+ for (Entry<Integer, TSStatus> integerTSStatusEntry :
+ ((CreateMultiTimeSeriesPlan) parentPlan).getResults().entrySet()) {
+ subStatus[integerTSStatusEntry.getKey()] =
integerTSStatusEntry.getValue();
+ }
+ }
+ return concludeFinalStatus(
+ noFailure, endPoint, isBatchFailure, subStatus,
errorCodePartitionGroups);
+ }
+
+ private TSStatus concludeFinalStatus(
+ boolean noFailure,
+ EndPoint endPoint,
+ boolean isBatchFailure,
+ TSStatus[] subStatus,
+ List<String> errorCodePartitionGroups) {
+ TSStatus status;
+ if (noFailure) {
+ status = StatusUtils.OK;
+ if (endPoint != null) {
+ status = StatusUtils.getStatus(status, endPoint);
+ }
+ } else if (isBatchFailure) {
+ status = RpcUtils.getStatus(Arrays.asList(subStatus));
+ } else {
+ status =
+ StatusUtils.getStatus(
+ StatusUtils.EXECUTE_STATEMENT_ERROR,
+ MSG_MULTIPLE_ERROR + errorCodePartitionGroups.toString());
+ }
+ return status;
+ }
+
+ private TSStatus forwardToSingleGroup(Map.Entry<PhysicalPlan,
PartitionGroup> entry) {
+ TSStatus result;
+ if (entry.getValue().contains(thisNode)) {
+ // the query should be handled by a group the local node is in, handle
it within the group
+ long startTime =
+ Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP
+ .getOperationStartTime();
+ logger.debug(
+ "Execute {} in a local group of {}", entry.getKey(),
entry.getValue().getHeader());
+ result =
+ getLocalDataMember(entry.getValue().getHeader(),
entry.getValue().getId())
+ .executeNonQueryPlan(entry.getKey());
+ Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP
+ .calOperationCostTimeFromStart(startTime);
+ } else {
+ // forward the query to the group that should handle it
+ long startTime =
+ Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_REMOTE_GROUP
+ .getOperationStartTime();
+ logger.debug(
+ "Forward {} to a remote group of {}", entry.getKey(),
entry.getValue().getHeader());
+ result = forwardPlan(entry.getKey(), entry.getValue());
+ Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_REMOTE_GROUP
+ .calOperationCostTimeFromStart(startTime);
+ }
+ return result;
+ }
+
+ /**
+ * Forward each sub-plan to its corresponding data group; if some groups
go wrong, the error
+ * messages from each group will be compacted into one string.
+ *
+ * @param planGroupMap sub-plan -> data group pairs
+ */
+ private TSStatus forwardToMultipleGroup(Map<PhysicalPlan, PartitionGroup>
planGroupMap) {
+ List<String> errorCodePartitionGroups = new ArrayList<>();
+ TSStatus tmpStatus;
+ boolean allRedirect = true;
+ for (Map.Entry<PhysicalPlan, PartitionGroup> entry :
planGroupMap.entrySet()) {
+ tmpStatus = forwardToSingleGroup(entry);
+ if (!tmpStatus.isSetRedirectNode()) {
+ allRedirect = false;
+ }
+ if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ logger.error("Fail to send log {} to data group {}", entry.getKey(),
entry.getValue());
+ // execution failed, record the error message
+ errorCodePartitionGroups.add(
+ String.format(
+ "[%s@%s:%s]",
+ tmpStatus.getCode(), entry.getValue().getHeader(),
tmpStatus.getMessage()));
+ }
+ }
+ TSStatus status;
+ if (errorCodePartitionGroups.isEmpty()) {
+ if (allRedirect) {
+ status = new TSStatus();
+ status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+ } else {
+ status = StatusUtils.OK;
+ }
+ } else {
+ status =
+ StatusUtils.getStatus(
+ StatusUtils.EXECUTE_STATEMENT_ERROR, MSG_MULTIPLE_ERROR +
errorCodePartitionGroups);
+ }
+ return status;
+ }
+
+ /**
+ * Forward a plan to the DataGroupMember of one node in the group. Only when
all nodes time out,
+ * will a TIME_OUT be returned.
+ */
+ private TSStatus forwardPlan(PhysicalPlan plan, PartitionGroup group) {
Review comment:
Same as above.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]