valepakh commented on code in PR #2843:
URL: https://github.com/apache/ignite-3/pull/2843#discussion_r1397073029
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java:
##########
@@ -268,46 +224,25 @@ private Object sendExecuteResponse(@Nullable Object
result, @Nullable Throwable
return null;
}
- private <R, J extends ComputeJob<R>> Class<J> jobClass(ClassLoader
jobClassLoader, String jobClassName) {
- try {
- return (Class<J>) Class.forName(jobClassName, true,
jobClassLoader);
- } catch (ClassNotFoundException e) {
- throw new IgniteInternalException("Cannot load job class by name
'" + jobClassName + "'", e);
- }
- }
-
- private CompletableFuture<JobContext> jobClassLoader(List<DeploymentUnit>
units) {
- return jobContextManager.acquireClassLoader(units);
- }
Review Comment:
What's the point of inlining this method?
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java:
##########
@@ -35,7 +35,20 @@ public interface ComputeComponent extends IgniteComponent {
* @param <R> result type
* @return future execution result
*/
- <R> CompletableFuture<R> executeLocally(List<DeploymentUnit> units, String
jobClassName, Object... args);
Review Comment:
This javadoc should be copied to the corresponding default method and
options parameter description should be added.
I wonder why checkStyle didn't catch both these issues.
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutorImpl.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.queue;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.internal.compute.ExecutionOptions;
+import org.apache.ignite.internal.compute.JobExecutionContextImpl;
+import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+
+/**
+ * Base implementation of {@link ComputeExecutor}.
+ */
+public class ComputeExecutorImpl implements ComputeExecutor {
+ private static final IgniteLogger LOG =
Loggers.forClass(ComputeExecutorImpl.class);
+
+ private final Ignite ignite;
+
+ private final ComputeConfiguration configuration;
+
+ private PriorityQueueExecutor executorService;
+
+ public ComputeExecutorImpl(Ignite ignite, ComputeConfiguration
configuration) {
+ this.ignite = ignite;
+ this.configuration = configuration;
+ }
+
+ @Override
+ public <R> CompletableFuture<R> executeJob(ExecutionOptions options,
ComputeJob<R> job, Object[] args) {
+ JobExecutionContext context = new JobExecutionContextImpl(ignite);
+
+ return executorService.submit(() -> job.execute(context, args),
options.priority());
+ }
+
+ @Override
+ public void start() {
+ executorService = new PriorityQueueExecutor(
+ configuration,
+ new
NamedThreadFactory(NamedThreadFactory.threadPrefix(ignite.name(), "compute"),
LOG)
+ );
+ }
+
+ @Override
+ public void stop() {
+ executorService.shutdown();
+ }
+
+ long stopTimeoutMillis() {
+ return configuration.threadPoolStopTimeoutMillis().value();
+ }
Review Comment:
This method is not used here, it should be moved to the
`PriorityQueueExecutor` and used there.
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java:
##########
@@ -75,111 +58,118 @@ public class ComputeComponentImpl implements
ComputeComponent {
private final InFlightFutures inFlightFutures = new InFlightFutures();
- private final Ignite ignite;
-
private final MessagingService messagingService;
- private final ComputeConfiguration configuration;
-
private final JobContextManager jobContextManager;
- private ExecutorService jobExecutorService;
+ private final ComputeExecutor executor;
/**
* Creates a new instance.
*/
public ComputeComponentImpl(
- Ignite ignite,
MessagingService messagingService,
- ComputeConfiguration configuration,
- JobContextManager jobContextManager) {
- this.ignite = ignite;
+ JobContextManager jobContextManager,
+ ComputeExecutor executor
+ ) {
this.messagingService = messagingService;
- this.configuration = configuration;
this.jobContextManager = jobContextManager;
+ this.executor = executor;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void start() {
+ executor.start();
+
+ messagingService.addMessageHandler(ComputeMessageTypes.class,
(message, senderConsistentId, correlationId) -> {
+ assert correlationId != null;
+
+ if (message instanceof ExecuteRequest) {
+ processExecuteRequest((ExecuteRequest) message,
senderConsistentId, correlationId);
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void stop() throws Exception {
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ executor.stop();
+
+ inFlightFutures.cancelInFlightFutures();
}
/** {@inheritDoc} */
@Override
- public <R> CompletableFuture<R> executeLocally(List<DeploymentUnit> units,
String jobClassName, Object... args) {
+ public <R> CompletableFuture<R> executeLocally(
+ ExecutionOptions options,
+ List<DeploymentUnit> units,
+ String jobClassName,
+ Object... args
+ ) {
if (!busyLock.enterBusy()) {
return failedFuture(new NodeStoppingException());
}
try {
- return mapClassLoaderExceptions(jobClassLoader(units),
jobClassName)
- .thenCompose(context -> doExecuteLocally(this.<R,
ComputeJob<R>>jobClass(context.classLoader(), jobClassName), args)
- .whenComplete((r, e) -> context.close())
+ CompletableFuture<JobContext> jobContextCompletableFuture =
mapClassLoaderExceptions(
+ jobContextManager.acquireClassLoader(units), jobClassName);
+ return jobContextCompletableFuture
+ .thenCompose(context ->
+ doExecuteLocally(options,
ComputeUtils.<R>instantiateJob(context.classLoader(), jobClassName), args)
+ .whenComplete((r, e) -> context.close())
);
} finally {
busyLock.leaveBusy();
}
}
- private <R> CompletableFuture<R> doExecuteLocally(Class<? extends
ComputeJob<R>> jobClass, Object[] args) {
- assert jobExecutorService != null : "Not started yet!";
-
- CompletableFuture<R> future = startLocalExecution(jobClass, args);
+ private <R> CompletableFuture<R> doExecuteLocally(ExecutionOptions
options, ComputeJob<R> jobInstance, Object[] args) {
+ CompletableFuture<R> future = executor.executeJob(options,
jobInstance, args);
inFlightFutures.registerFuture(future);
return future;
}
- private <R> CompletableFuture<R> startLocalExecution(Class<? extends
ComputeJob<R>> jobClass, Object[] args) {
- try {
- return CompletableFuture.supplyAsync(() -> executeJob(jobClass,
args), jobExecutorService);
- } catch (RejectedExecutionException e) {
- return failedFuture(e);
- }
- }
-
- private <R> R executeJob(Class<? extends ComputeJob<R>> jobClass, Object[]
args) {
- ComputeJob<R> job = instantiateJob(jobClass);
- JobExecutionContext context = new JobExecutionContextImpl(ignite);
- // TODO: IGNITE-16746 - translate NodeStoppingException to a public
exception
Review Comment:
What about translating this exception? I can't find this translation in the
new code.
##########
modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.queue;
+
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test suite for {@link PriorityQueueExecutor}.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class PriorityQueueExecutorTest extends BaseIgniteAbstractTest {
+ private static final IgniteLogger LOG =
Loggers.forClass(PriorityQueueExecutorTest.class);
+
+ @InjectConfiguration
+ private ComputeConfiguration configuration;
+
+ private PriorityQueueExecutor priorityQueueExecutor;
+
+
+ private void initExecutor(int threads) {
+ if (priorityQueueExecutor != null) {
+ priorityQueueExecutor.shutdown();
+ }
+
+ assertThat(configuration.change(computeChange ->
computeChange.changeThreadPoolSize(threads)), willCompleteSuccessfully());
+
+ priorityQueueExecutor = new PriorityQueueExecutor(
+ configuration,
+ new
NamedThreadFactory(NamedThreadFactory.threadPrefix("testNode", "compute"), LOG)
+ );
+ }
+
+ @Test
+ public void testQueueIsWorking() {
+ initExecutor(1);
+
+ CountDownLatch latch1 = new CountDownLatch(1);
+ CountDownLatch latch2 = new CountDownLatch(1);
+
+ CompletableFuture<Integer> task1 = priorityQueueExecutor.submit(() -> {
+ latch1.await();
+ return 0;
+ });
+
+ CompletableFuture<Integer> task2 = priorityQueueExecutor.submit(() -> {
+ latch2.await();
+ return 1;
+ });
+
+ assertThat(task1.isDone(), is(false));
+ assertThat(task2.isDone(), is(false));
+
+ latch2.countDown();
+
+ assertThat(task1, willTimeoutIn(100, TimeUnit.MILLISECONDS));
+ assertThat(task2, willTimeoutIn(100, TimeUnit.MILLISECONDS));
+
+ latch1.countDown();
+
+ assertThat(task1, willCompleteSuccessfully());
+ assertThat(task2, willCompleteSuccessfully());
+ }
+
+ @Test
+ public void testSubmitWithPriority() {
+ initExecutor(1);
+
+ CountDownLatch latch1 = new CountDownLatch(1);
+ CountDownLatch latch2 = new CountDownLatch(1);
+ CountDownLatch latch3 = new CountDownLatch(1);
+
+ CompletableFuture<Integer> task1 = priorityQueueExecutor.submit(() -> {
+ latch1.await();
+ return 0;
+ });
+
+ CompletableFuture<Integer> task2 = priorityQueueExecutor.submit(() -> {
+ latch2.await();
+ return 1;
+ }, 1);
+
+ CompletableFuture<Integer> task3 = priorityQueueExecutor.submit(() -> {
+ latch3.await();
+ return 1;
+ }, 2);
+
+
+ assertThat(task1.isDone(), is(false));
+ assertThat(task2.isDone(), is(false));
+ assertThat(task3.isDone(), is(false));
+
+ latch1.countDown();
+
+ assertThat(task1, willCompleteSuccessfully());
+ assertThat(task2.isDone(), is(false));
+ assertThat(task3.isDone(), is(false));
+
+ //Current executing task is 3 because of priority.
+ latch2.countDown();
+ assertThat(task2, willTimeoutIn(100, TimeUnit.MILLISECONDS));
+ assertThat(task3, willTimeoutIn(100, TimeUnit.MILLISECONDS));
+
+ latch3.countDown();
+ assertThat(task2, willCompleteSuccessfully());
+ assertThat(task3, willCompleteSuccessfully());
+ }
+
+ @Test
+ public void testSameOrder() {
+ initExecutor(1);
+
+ CountDownLatch latch1 = new CountDownLatch(1);
+ CountDownLatch latch2 = new CountDownLatch(1);
+ CountDownLatch latch3 = new CountDownLatch(1);
+
+ CompletableFuture<Integer> task1 = priorityQueueExecutor.submit(() -> {
+ latch1.await();
+ return 0;
+ }, 1);
+
+ CompletableFuture<Integer> task2 = priorityQueueExecutor.submit(() -> {
+ latch2.await();
+ return 1;
+ }, 1);
+
+ CompletableFuture<Integer> task3 = priorityQueueExecutor.submit(() -> {
+ latch3.await();
+ return 1;
+ }, 1);
+
+ assertThat(task1.isDone(), is(false));
+ assertThat(task2.isDone(), is(false));
+ assertThat(task3.isDone(), is(false));
+
+ latch1.countDown();
+ assertThat(task1, willCompleteSuccessfully());
+ assertThat(task2.isDone(), is(false));
+ assertThat(task3.isDone(), is(false));
+
+ //Current executing task is 3 because it was first.
Review Comment:
```suggestion
//Current executing task is 2 because it was added first.
```
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.queue;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Class for queue's entries.
+ * This class implement comparable mechanism for {@link
it.unimi.dsi.fastutil.PriorityQueue}
+ * which used as queue in {@link java.util.concurrent.ThreadPoolExecutor}.
+ * Each entry has unique seqNum which used for comparable entries with
identity priority.
Review Comment:
```suggestion
* Each entry has unique seqNum which is used for comparing entries with
identical priority.
```
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java:
##########
@@ -75,111 +58,118 @@ public class ComputeComponentImpl implements
ComputeComponent {
private final InFlightFutures inFlightFutures = new InFlightFutures();
- private final Ignite ignite;
-
private final MessagingService messagingService;
- private final ComputeConfiguration configuration;
-
private final JobContextManager jobContextManager;
- private ExecutorService jobExecutorService;
+ private final ComputeExecutor executor;
/**
* Creates a new instance.
*/
public ComputeComponentImpl(
- Ignite ignite,
MessagingService messagingService,
- ComputeConfiguration configuration,
- JobContextManager jobContextManager) {
- this.ignite = ignite;
+ JobContextManager jobContextManager,
+ ComputeExecutor executor
+ ) {
this.messagingService = messagingService;
- this.configuration = configuration;
this.jobContextManager = jobContextManager;
+ this.executor = executor;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void start() {
+ executor.start();
+
+ messagingService.addMessageHandler(ComputeMessageTypes.class,
(message, senderConsistentId, correlationId) -> {
+ assert correlationId != null;
+
+ if (message instanceof ExecuteRequest) {
+ processExecuteRequest((ExecuteRequest) message,
senderConsistentId, correlationId);
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void stop() throws Exception {
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ executor.stop();
+
+ inFlightFutures.cancelInFlightFutures();
}
/** {@inheritDoc} */
@Override
- public <R> CompletableFuture<R> executeLocally(List<DeploymentUnit> units,
String jobClassName, Object... args) {
+ public <R> CompletableFuture<R> executeLocally(
+ ExecutionOptions options,
+ List<DeploymentUnit> units,
+ String jobClassName,
+ Object... args
+ ) {
if (!busyLock.enterBusy()) {
return failedFuture(new NodeStoppingException());
}
try {
- return mapClassLoaderExceptions(jobClassLoader(units),
jobClassName)
- .thenCompose(context -> doExecuteLocally(this.<R,
ComputeJob<R>>jobClass(context.classLoader(), jobClassName), args)
- .whenComplete((r, e) -> context.close())
+ CompletableFuture<JobContext> jobContextCompletableFuture =
mapClassLoaderExceptions(
+ jobContextManager.acquireClassLoader(units), jobClassName);
+ return jobContextCompletableFuture
+ .thenCompose(context ->
+ doExecuteLocally(options,
ComputeUtils.<R>instantiateJob(context.classLoader(), jobClassName), args)
Review Comment:
There's a subtle difference - previously job was instantiated on the
executor thread, I wonder whether it could have any implications or not.
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java:
##########
@@ -75,111 +58,118 @@ public class ComputeComponentImpl implements
ComputeComponent {
private final InFlightFutures inFlightFutures = new InFlightFutures();
- private final Ignite ignite;
-
private final MessagingService messagingService;
- private final ComputeConfiguration configuration;
-
private final JobContextManager jobContextManager;
- private ExecutorService jobExecutorService;
+ private final ComputeExecutor executor;
/**
* Creates a new instance.
*/
public ComputeComponentImpl(
- Ignite ignite,
MessagingService messagingService,
- ComputeConfiguration configuration,
- JobContextManager jobContextManager) {
- this.ignite = ignite;
+ JobContextManager jobContextManager,
+ ComputeExecutor executor
+ ) {
this.messagingService = messagingService;
- this.configuration = configuration;
this.jobContextManager = jobContextManager;
+ this.executor = executor;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void start() {
+ executor.start();
+
+ messagingService.addMessageHandler(ComputeMessageTypes.class,
(message, senderConsistentId, correlationId) -> {
+ assert correlationId != null;
+
+ if (message instanceof ExecuteRequest) {
+ processExecuteRequest((ExecuteRequest) message,
senderConsistentId, correlationId);
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void stop() throws Exception {
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ executor.stop();
+
+ inFlightFutures.cancelInFlightFutures();
}
/** {@inheritDoc} */
@Override
- public <R> CompletableFuture<R> executeLocally(List<DeploymentUnit> units,
String jobClassName, Object... args) {
+ public <R> CompletableFuture<R> executeLocally(
+ ExecutionOptions options,
+ List<DeploymentUnit> units,
+ String jobClassName,
+ Object... args
+ ) {
if (!busyLock.enterBusy()) {
return failedFuture(new NodeStoppingException());
}
try {
- return mapClassLoaderExceptions(jobClassLoader(units),
jobClassName)
- .thenCompose(context -> doExecuteLocally(this.<R,
ComputeJob<R>>jobClass(context.classLoader(), jobClassName), args)
- .whenComplete((r, e) -> context.close())
+ CompletableFuture<JobContext> jobContextCompletableFuture =
mapClassLoaderExceptions(
+ jobContextManager.acquireClassLoader(units), jobClassName);
+ return jobContextCompletableFuture
Review Comment:
```suggestion
return
mapClassLoaderExceptions(jobContextManager.acquireClassLoader(units),
jobClassName)
```
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutorImpl.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.queue;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.internal.compute.ExecutionOptions;
+import org.apache.ignite.internal.compute.JobExecutionContextImpl;
+import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+
+/**
+ * Base implementation of {@link ComputeExecutor}.
+ */
+public class ComputeExecutorImpl implements ComputeExecutor {
+ private static final IgniteLogger LOG =
Loggers.forClass(ComputeExecutorImpl.class);
+
+ private final Ignite ignite;
+
+ private final ComputeConfiguration configuration;
+
+ private PriorityQueueExecutor executorService;
+
+ public ComputeExecutorImpl(Ignite ignite, ComputeConfiguration
configuration) {
+ this.ignite = ignite;
+ this.configuration = configuration;
+ }
+
+ @Override
+ public <R> CompletableFuture<R> executeJob(ExecutionOptions options,
ComputeJob<R> job, Object[] args) {
+ JobExecutionContext context = new JobExecutionContextImpl(ignite);
Review Comment:
Should we assert that the `executorService` is not null here as in previous
code?
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/configuration/ComputeConfigurationSchema.java:
##########
@@ -39,4 +39,8 @@ public class ComputeConfigurationSchema {
@Range(min = 1)
@Value(hasDefault = true)
public final long threadPoolStopTimeoutMillis = 10_000;
+
+ @Range(min = 1)
+ @Value(hasDefault = true)
+ public final int queueMaxSize = Integer.MAX_VALUE;
Review Comment:
Please add javadoc.
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/message/ExecuteRequest.java:
##########
@@ -20,16 +20,20 @@
import java.util.List;
import java.util.Set;
import org.apache.ignite.internal.compute.ComputeMessageTypes;
+import org.apache.ignite.internal.compute.ExecutionOptions;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
import org.jetbrains.annotations.Nullable;
/**
- * Used to implement remote job execution in {@link
org.apache.ignite.compute.IgniteCompute#execute(Set, Class, Object...)}.
+ * Used to implement remote job execution in {@link
org.apache.ignite.compute.IgniteCompute#execute(Set, List, String, Object...)}.
*/
@Transferable(value = ComputeMessageTypes.EXECUTE_REQUEST)
public interface ExecuteRequest extends NetworkMessage {
+ @Marshallable
Review Comment:
Please add javadoc.
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.queue;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Class for queue's entries.
+ * This class implement comparable mechanism for {@link
it.unimi.dsi.fastutil.PriorityQueue}
Review Comment:
```suggestion
* This class implement comparable mechanism for {@link
java.util.concurrent.PriorityBlockingQueue}
```
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.queue;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Class for queue's entries.
+ * This class implement comparable mechanism for {@link
it.unimi.dsi.fastutil.PriorityQueue}
+ * which used as queue in {@link java.util.concurrent.ThreadPoolExecutor}.
+ * Each entry has unique seqNum which used for comparable entries with
identity priority.
+ * It means that entries with same priority has FIFO resolving strategy in
queue.
+ *
+ * @param <R> Compute job return type.
+ */
+public class QueueEntry<R> implements Runnable, Comparable<QueueEntry<R>> {
+ private static final AtomicLong seq = new AtomicLong(Long.MIN_VALUE);
+
+ private final CompletableFuture<R> future = new CompletableFuture<>();
+
+ private final Callable<R> job;
+
+ private final int priority;
+
+ private final long seqNum;
+
+ /**
+ * Constructor.
+ *
+ * @param job Compute job callable.
+ * @param priority Job priority.
+ */
+ public QueueEntry(Callable<R> job, int priority) {
+ this.job = job;
+ this.priority = priority;
+ seqNum = seq.getAndIncrement();
+ }
+
+ @Override
+ public void run() {
+ try {
+ future.complete(job.call());
+ } catch (Exception e) {
+ future.completeExceptionally(e);
+ }
+ }
+
+ /**
+ * To {@link CompletableFuture} transformer.
+ *
+ * @return Completable future that will be finished when Compute job
finished.
Review Comment:
```suggestion
* @return Completable future that will be finished when Compute job is
finished.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]