Pochatkin commented on code in PR #2843:
URL: https://github.com/apache/ignite-3/pull/2843#discussion_r1398875054
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java:
##########
@@ -75,111 +58,118 @@ public class ComputeComponentImpl implements
ComputeComponent {
private final InFlightFutures inFlightFutures = new InFlightFutures();
- private final Ignite ignite;
-
private final MessagingService messagingService;
- private final ComputeConfiguration configuration;
-
private final JobContextManager jobContextManager;
- private ExecutorService jobExecutorService;
+ private final ComputeExecutor executor;
/**
* Creates a new instance.
*/
public ComputeComponentImpl(
- Ignite ignite,
MessagingService messagingService,
- ComputeConfiguration configuration,
- JobContextManager jobContextManager) {
- this.ignite = ignite;
+ JobContextManager jobContextManager,
+ ComputeExecutor executor
+ ) {
this.messagingService = messagingService;
- this.configuration = configuration;
this.jobContextManager = jobContextManager;
+ this.executor = executor;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void start() {
+ executor.start();
+
+ messagingService.addMessageHandler(ComputeMessageTypes.class,
(message, senderConsistentId, correlationId) -> {
+ assert correlationId != null;
+
+ if (message instanceof ExecuteRequest) {
+ processExecuteRequest((ExecuteRequest) message,
senderConsistentId, correlationId);
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void stop() throws Exception {
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ executor.stop();
+
+ inFlightFutures.cancelInFlightFutures();
}
/** {@inheritDoc} */
@Override
- public <R> CompletableFuture<R> executeLocally(List<DeploymentUnit> units,
String jobClassName, Object... args) {
+ public <R> CompletableFuture<R> executeLocally(
+ ExecutionOptions options,
+ List<DeploymentUnit> units,
+ String jobClassName,
+ Object... args
+ ) {
if (!busyLock.enterBusy()) {
return failedFuture(new NodeStoppingException());
}
try {
- return mapClassLoaderExceptions(jobClassLoader(units),
jobClassName)
- .thenCompose(context -> doExecuteLocally(this.<R,
ComputeJob<R>>jobClass(context.classLoader(), jobClassName), args)
- .whenComplete((r, e) -> context.close())
+ CompletableFuture<JobContext> jobContextCompletableFuture =
mapClassLoaderExceptions(
+ jobContextManager.acquireClassLoader(units), jobClassName);
+ return jobContextCompletableFuture
+ .thenCompose(context ->
+ doExecuteLocally(options,
ComputeUtils.<R>instantiateJob(context.classLoader(), jobClassName), args)
+ .whenComplete((r, e) -> context.close())
);
} finally {
busyLock.leaveBusy();
}
}
- private <R> CompletableFuture<R> doExecuteLocally(Class<? extends
ComputeJob<R>> jobClass, Object[] args) {
- assert jobExecutorService != null : "Not started yet!";
-
- CompletableFuture<R> future = startLocalExecution(jobClass, args);
+ private <R> CompletableFuture<R> doExecuteLocally(ExecutionOptions
options, ComputeJob<R> jobInstance, Object[] args) {
+ CompletableFuture<R> future = executor.executeJob(options,
jobInstance, args);
inFlightFutures.registerFuture(future);
return future;
}
- private <R> CompletableFuture<R> startLocalExecution(Class<? extends
ComputeJob<R>> jobClass, Object[] args) {
- try {
- return CompletableFuture.supplyAsync(() -> executeJob(jobClass,
args), jobExecutorService);
- } catch (RejectedExecutionException e) {
- return failedFuture(e);
- }
- }
-
- private <R> R executeJob(Class<? extends ComputeJob<R>> jobClass, Object[]
args) {
- ComputeJob<R> job = instantiateJob(jobClass);
- JobExecutionContext context = new JobExecutionContextImpl(ignite);
- // TODO: IGNITE-16746 - translate NodeStoppingException to a public
exception
Review Comment:
This exception should be translatted by top level exception mapper, because
this is IgniteInternalCheckedException
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]