JackieTien97 commented on code in PR #15639:
URL: https://github.com/apache/iotdb/pull/15639#discussion_r2127780210
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java:
##########
@@ -118,14 +122,99 @@ public FragmentInstanceDispatcherImpl(
}
@Override
- public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance>
instances) {
+ public Future<FragInstanceDispatchResult> dispatch(
+ SubPlan root, List<FragmentInstance> instances) {
if (type == QueryType.READ) {
- return dispatchRead(instances);
+ return instances.size() == 1 || root == null
+ ? dispatchRead(instances)
+ : topologicalParallelDispatchRead(root, instances);
} else {
return dispatchWrite(instances);
}
}
+ private Future<FragInstanceDispatchResult> topologicalParallelDispatchRead(
+ SubPlan root, List<FragmentInstance> instances) {
+ long startTime = System.nanoTime();
+ LinkedBlockingQueue<Pair<SubPlan, Boolean>> queue = new
LinkedBlockingQueue<>(instances.size());
+ List<Future<FragInstanceDispatchResult>> futures = new
ArrayList<>(instances.size());
+ queue.add(new Pair<>(root, true));
+ try {
+ while (futures.size() < instances.size()) {
+ Pair<SubPlan, Boolean> pair = queue.take();
+ SubPlan next = pair.getLeft();
+ boolean previousSuccess = pair.getRight();
+ if (!previousSuccess) {
+ break;
+ }
+ FragmentInstance fragmentInstance =
+
instances.get(next.getPlanFragment().getIndexInFragmentInstanceList());
+ futures.add(asyncDispatchOneInstance(next, fragmentInstance, queue));
+ }
+ for (Future<FragInstanceDispatchResult> future : futures) {
+ FragInstanceDispatchResult result = future.get();
+ if (!result.isSuccessful()) {
+ return immediateFuture(result);
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.error("Interrupted when dispatching read async", e);
+ return immediateFuture(
+ new FragInstanceDispatchResult(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, "Interrupted errors: " +
e.getMessage())));
+ } catch (Throwable t) {
+ LOGGER.warn(DISPATCH_FAILED, t);
+ return immediateFuture(
+ new FragInstanceDispatchResult(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS +
t.getMessage())));
+ } finally {
+ long queryDispatchReadTime = System.nanoTime() - startTime;
+ QUERY_EXECUTION_METRIC_SET.recordExecutionCost(DISPATCH_READ,
queryDispatchReadTime);
+ queryContext.recordDispatchCost(queryDispatchReadTime);
+ }
+ return immediateFuture(new FragInstanceDispatchResult(true));
+ }
+
+ private Future<FragInstanceDispatchResult> asyncDispatchOneInstance(
+ SubPlan plan, FragmentInstance instance,
LinkedBlockingQueue<Pair<SubPlan, Boolean>> queue) {
+ return FragmentInstanceManager.getInstance()
+ .getDispatchExecutor()
+ .submit(
+ () -> {
+ boolean success = false;
+ try (SetThreadName threadName = new
SetThreadName(instance.getId().getFullId())) {
+ dispatchOneInstance(instance);
+ success = true;
+ } catch (FragmentInstanceDispatchException e) {
+ return new FragInstanceDispatchResult(e.getFailureStatus());
+ } catch (Throwable t) {
+ LOGGER.warn(DISPATCH_FAILED, t);
+ return new FragInstanceDispatchResult(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS
+ t.getMessage()));
+ } finally {
+ for (SubPlan child : plan.getChildren()) {
+ queue.add(new Pair<>(child, success));
+ }
+ // friendly for gc, clear the plan node tree, for some queries
select all devices,
+ // it
+ // will
+ // release lots of memory
+ if (!queryContext.isExplainAnalyze()) {
+ // EXPLAIN ANALYZE will use these instances, so we can't
clear them
+ instance.getFragment().clearUselessField();
+ } else {
+ // TypeProvider is not used in EXPLAIN ANALYZE, so we can
clear it
+ instance.getFragment().clearTypeProvider();
+ }
+ }
+ return new FragInstanceDispatchResult(true);
+ });
+ }
+
// TODO: (xingtanzjr) currently we use a sequential dispatch policy for
READ, which is
// unsafe for current FragmentInstance scheduler framework. We need to
implement the
// topological dispatch according to dependency relations between
FragmentInstances
Review Comment:
You can delete this TODO.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java:
##########
@@ -84,6 +84,8 @@ public class FragmentInstanceManager {
private final ExecutorService intoOperationExecutor;
private final ExecutorService modelInferenceExecutor;
+ private final ExecutorService dispatchExecutor;
Review Comment:
This shouldn't go here; it's better to put it in Coordinator.
FragmentInstanceManager actually takes effect in the backend, whereas parallel
dispatch happens in the front end.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java:
##########
@@ -118,14 +122,99 @@ public FragmentInstanceDispatcherImpl(
}
@Override
- public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance>
instances) {
+ public Future<FragInstanceDispatchResult> dispatch(
+ SubPlan root, List<FragmentInstance> instances) {
if (type == QueryType.READ) {
- return dispatchRead(instances);
+ return instances.size() == 1 || root == null
+ ? dispatchRead(instances)
+ : topologicalParallelDispatchRead(root, instances);
} else {
return dispatchWrite(instances);
}
}
+ private Future<FragInstanceDispatchResult> topologicalParallelDispatchRead(
+ SubPlan root, List<FragmentInstance> instances) {
+ long startTime = System.nanoTime();
+ LinkedBlockingQueue<Pair<SubPlan, Boolean>> queue = new
LinkedBlockingQueue<>(instances.size());
+ List<Future<FragInstanceDispatchResult>> futures = new
ArrayList<>(instances.size());
+ queue.add(new Pair<>(root, true));
+ try {
+ while (futures.size() < instances.size()) {
+ Pair<SubPlan, Boolean> pair = queue.take();
+ SubPlan next = pair.getLeft();
+ boolean previousSuccess = pair.getRight();
+ if (!previousSuccess) {
+ break;
+ }
+ FragmentInstance fragmentInstance =
+
instances.get(next.getPlanFragment().getIndexInFragmentInstanceList());
+ futures.add(asyncDispatchOneInstance(next, fragmentInstance, queue));
+ }
+ for (Future<FragInstanceDispatchResult> future : futures) {
+ FragInstanceDispatchResult result = future.get();
+ if (!result.isSuccessful()) {
+ return immediateFuture(result);
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.error("Interrupted when dispatching read async", e);
+ return immediateFuture(
+ new FragInstanceDispatchResult(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, "Interrupted errors: " +
e.getMessage())));
+ } catch (Throwable t) {
+ LOGGER.warn(DISPATCH_FAILED, t);
+ return immediateFuture(
+ new FragInstanceDispatchResult(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS +
t.getMessage())));
+ } finally {
+ long queryDispatchReadTime = System.nanoTime() - startTime;
+ QUERY_EXECUTION_METRIC_SET.recordExecutionCost(DISPATCH_READ,
queryDispatchReadTime);
+ queryContext.recordDispatchCost(queryDispatchReadTime);
+ }
+ return immediateFuture(new FragInstanceDispatchResult(true));
+ }
+
+ private Future<FragInstanceDispatchResult> asyncDispatchOneInstance(
+ SubPlan plan, FragmentInstance instance,
LinkedBlockingQueue<Pair<SubPlan, Boolean>> queue) {
+ return FragmentInstanceManager.getInstance()
+ .getDispatchExecutor()
+ .submit(
+ () -> {
+ boolean success = false;
+ try (SetThreadName threadName = new
SetThreadName(instance.getId().getFullId())) {
+ dispatchOneInstance(instance);
+ success = true;
+ } catch (FragmentInstanceDispatchException e) {
+ return new FragInstanceDispatchResult(e.getFailureStatus());
+ } catch (Throwable t) {
+ LOGGER.warn(DISPATCH_FAILED, t);
+ return new FragInstanceDispatchResult(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS
+ t.getMessage()));
+ } finally {
+ for (SubPlan child : plan.getChildren()) {
+ queue.add(new Pair<>(child, success));
Review Comment:
If the dispatch is not successful, there is no need to keep putting the children into the queue, right?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java:
##########
@@ -118,14 +122,99 @@ public FragmentInstanceDispatcherImpl(
}
@Override
- public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance>
instances) {
+ public Future<FragInstanceDispatchResult> dispatch(
+ SubPlan root, List<FragmentInstance> instances) {
if (type == QueryType.READ) {
- return dispatchRead(instances);
+ return instances.size() == 1 || root == null
+ ? dispatchRead(instances)
+ : topologicalParallelDispatchRead(root, instances);
} else {
return dispatchWrite(instances);
}
}
+ private Future<FragInstanceDispatchResult> topologicalParallelDispatchRead(
+ SubPlan root, List<FragmentInstance> instances) {
+ long startTime = System.nanoTime();
+ LinkedBlockingQueue<Pair<SubPlan, Boolean>> queue = new
LinkedBlockingQueue<>(instances.size());
+ List<Future<FragInstanceDispatchResult>> futures = new
ArrayList<>(instances.size());
+ queue.add(new Pair<>(root, true));
+ try {
+ while (futures.size() < instances.size()) {
+ Pair<SubPlan, Boolean> pair = queue.take();
+ SubPlan next = pair.getLeft();
+ boolean previousSuccess = pair.getRight();
+ if (!previousSuccess) {
+ break;
+ }
+ FragmentInstance fragmentInstance =
+
instances.get(next.getPlanFragment().getIndexInFragmentInstanceList());
+ futures.add(asyncDispatchOneInstance(next, fragmentInstance, queue));
+ }
+ for (Future<FragInstanceDispatchResult> future : futures) {
+ FragInstanceDispatchResult result = future.get();
+ if (!result.isSuccessful()) {
+ return immediateFuture(result);
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.error("Interrupted when dispatching read async", e);
Review Comment:
WARN level is fine here.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java:
##########
@@ -118,14 +122,99 @@ public FragmentInstanceDispatcherImpl(
}
@Override
- public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance>
instances) {
+ public Future<FragInstanceDispatchResult> dispatch(
+ SubPlan root, List<FragmentInstance> instances) {
if (type == QueryType.READ) {
- return dispatchRead(instances);
+ return instances.size() == 1 || root == null
+ ? dispatchRead(instances)
+ : topologicalParallelDispatchRead(root, instances);
} else {
return dispatchWrite(instances);
}
}
+ private Future<FragInstanceDispatchResult> topologicalParallelDispatchRead(
+ SubPlan root, List<FragmentInstance> instances) {
+ long startTime = System.nanoTime();
+ LinkedBlockingQueue<Pair<SubPlan, Boolean>> queue = new
LinkedBlockingQueue<>(instances.size());
+ List<Future<FragInstanceDispatchResult>> futures = new
ArrayList<>(instances.size());
+ queue.add(new Pair<>(root, true));
+ try {
+ while (futures.size() < instances.size()) {
+ Pair<SubPlan, Boolean> pair = queue.take();
+ SubPlan next = pair.getLeft();
+ boolean previousSuccess = pair.getRight();
+ if (!previousSuccess) {
+ break;
+ }
+ FragmentInstance fragmentInstance =
+
instances.get(next.getPlanFragment().getIndexInFragmentInstanceList());
+ futures.add(asyncDispatchOneInstance(next, fragmentInstance, queue));
+ }
+ for (Future<FragInstanceDispatchResult> future : futures) {
+ FragInstanceDispatchResult result = future.get();
+ if (!result.isSuccessful()) {
+ return immediateFuture(result);
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.error("Interrupted when dispatching read async", e);
+ return immediateFuture(
+ new FragInstanceDispatchResult(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, "Interrupted errors: " +
e.getMessage())));
+ } catch (Throwable t) {
+ LOGGER.warn(DISPATCH_FAILED, t);
+ return immediateFuture(
+ new FragInstanceDispatchResult(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS +
t.getMessage())));
+ } finally {
+ long queryDispatchReadTime = System.nanoTime() - startTime;
+ QUERY_EXECUTION_METRIC_SET.recordExecutionCost(DISPATCH_READ,
queryDispatchReadTime);
+ queryContext.recordDispatchCost(queryDispatchReadTime);
+ }
+ return immediateFuture(new FragInstanceDispatchResult(true));
+ }
+
+ private Future<FragInstanceDispatchResult> asyncDispatchOneInstance(
+ SubPlan plan, FragmentInstance instance,
LinkedBlockingQueue<Pair<SubPlan, Boolean>> queue) {
+ return FragmentInstanceManager.getInstance()
+ .getDispatchExecutor()
+ .submit(
+ () -> {
+ boolean success = false;
+ try (SetThreadName threadName = new
SetThreadName(instance.getId().getFullId())) {
+ dispatchOneInstance(instance);
+ success = true;
+ } catch (FragmentInstanceDispatchException e) {
+ return new FragInstanceDispatchResult(e.getFailureStatus());
+ } catch (Throwable t) {
+ LOGGER.warn(DISPATCH_FAILED, t);
+ return new FragInstanceDispatchResult(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS
+ t.getMessage()));
+ } finally {
+ for (SubPlan child : plan.getChildren()) {
+ queue.add(new Pair<>(child, success));
+ }
+ // friendly for gc, clear the plan node tree, for some queries
select all devices,
+ // it
+ // will
+ // release lots of memory
Review Comment:
Why not put this comment on one line?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]