shuwenwei commented on code in PR #15639:
URL: https://github.com/apache/iotdb/pull/15639#discussion_r2128242957
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java:
##########
@@ -118,14 +122,99 @@ public FragmentInstanceDispatcherImpl(
}
@Override
- public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance>
instances) {
+ public Future<FragInstanceDispatchResult> dispatch(
+ SubPlan root, List<FragmentInstance> instances) {
if (type == QueryType.READ) {
- return dispatchRead(instances);
+ return instances.size() == 1 || root == null
+ ? dispatchRead(instances)
+ : topologicalParallelDispatchRead(root, instances);
} else {
return dispatchWrite(instances);
}
}
+ private Future<FragInstanceDispatchResult> topologicalParallelDispatchRead(
+ SubPlan root, List<FragmentInstance> instances) {
+ long startTime = System.nanoTime();
+ LinkedBlockingQueue<Pair<SubPlan, Boolean>> queue = new
LinkedBlockingQueue<>(instances.size());
+ List<Future<FragInstanceDispatchResult>> futures = new
ArrayList<>(instances.size());
+ queue.add(new Pair<>(root, true));
+ try {
+ while (futures.size() < instances.size()) {
+ Pair<SubPlan, Boolean> pair = queue.take();
+ SubPlan next = pair.getLeft();
+ boolean previousSuccess = pair.getRight();
+ if (!previousSuccess) {
+ break;
+ }
+ FragmentInstance fragmentInstance =
+
instances.get(next.getPlanFragment().getIndexInFragmentInstanceList());
+ futures.add(asyncDispatchOneInstance(next, fragmentInstance, queue));
+ }
+ for (Future<FragInstanceDispatchResult> future : futures) {
+ FragInstanceDispatchResult result = future.get();
+ if (!result.isSuccessful()) {
+ return immediateFuture(result);
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.error("Interrupted when dispatching read async", e);
+ return immediateFuture(
+ new FragInstanceDispatchResult(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, "Interrupted errors: " +
e.getMessage())));
+ } catch (Throwable t) {
+ LOGGER.warn(DISPATCH_FAILED, t);
+ return immediateFuture(
+ new FragInstanceDispatchResult(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS +
t.getMessage())));
+ } finally {
+ long queryDispatchReadTime = System.nanoTime() - startTime;
+ QUERY_EXECUTION_METRIC_SET.recordExecutionCost(DISPATCH_READ,
queryDispatchReadTime);
+ queryContext.recordDispatchCost(queryDispatchReadTime);
+ }
+ return immediateFuture(new FragInstanceDispatchResult(true));
+ }
+
+ private Future<FragInstanceDispatchResult> asyncDispatchOneInstance(
+ SubPlan plan, FragmentInstance instance,
LinkedBlockingQueue<Pair<SubPlan, Boolean>> queue) {
+ return FragmentInstanceManager.getInstance()
+ .getDispatchExecutor()
+ .submit(
+ () -> {
+ boolean success = false;
+ try (SetThreadName threadName = new
SetThreadName(instance.getId().getFullId())) {
+ dispatchOneInstance(instance);
+ success = true;
+ } catch (FragmentInstanceDispatchException e) {
+ return new FragInstanceDispatchResult(e.getFailureStatus());
+ } catch (Throwable t) {
+ LOGGER.warn(DISPATCH_FAILED, t);
+ return new FragInstanceDispatchResult(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS
+ t.getMessage()));
+ } finally {
+ for (SubPlan child : plan.getChildren()) {
+ queue.add(new Pair<>(child, success));
Review Comment:
When a failure occurs, at least one needs to be sent to notify the outer
loop to exit
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]