valepakh commented on code in PR #2843:
URL: https://github.com/apache/ignite-3/pull/2843#discussion_r1398886117
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java:
##########
@@ -30,21 +30,72 @@ public interface ComputeComponent extends IgniteComponent {
/**
* Executes a job of the given class on the current node.
*
- * @param jobClassName name of the job class
- * @param args job args
- * @param <R> result type
- * @return future execution result
+ * @param options Job execution options.
+ * @param jobClassName Name of the job class.
+ * @param units Deployment units which will be loaded for execution.
Review Comment:
```suggestion
* @param units Deployment units which will be loaded for execution.
* @param jobClassName Name of the job class.
```
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java:
##########
@@ -30,21 +30,72 @@ public interface ComputeComponent extends IgniteComponent {
/**
* Executes a job of the given class on the current node.
*
- * @param jobClassName name of the job class
- * @param args job args
- * @param <R> result type
- * @return future execution result
+ * @param options Job execution options.
+ * @param jobClassName Name of the job class.
+ * @param units Deployment units which will be loaded for execution.
+ * @param args Job args.
+ * @param <R> Job result type.
+ * @return Future execution result.
+ */
+ <R> CompletableFuture<R> executeLocally(
+ ExecutionOptions options,
+ List<DeploymentUnit> units,
+ String jobClassName,
+ Object... args
+ );
+
+ /**
+ * Executes a job of the given class on the current node with default
execution options {@link ExecutionOptions.DEFAULT}.
+ *
+ * @param jobClassName Name of the job class.
+ * @param units Deployment units which will be loaded for execution.
+ * @param args Job args.
+ * @param <R> Job result type.
+ * @return Future execution result.
*/
- <R> CompletableFuture<R> executeLocally(List<DeploymentUnit> units, String
jobClassName, Object... args);
+ default <R> CompletableFuture<R> executeLocally(
+ List<DeploymentUnit> units,
+ String jobClassName,
+ Object... args
+ ) {
+ return executeLocally(ExecutionOptions.DEFAULT, units, jobClassName,
args);
+ }
/**
* Executes a job of the given class on a remote node.
*
+ * @param options Job execution options.
+ * @param remoteNode name of the job class
Review Comment:
```suggestion
* @param remoteNode Remote node to execute the job on.
```
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java:
##########
@@ -30,21 +30,72 @@ public interface ComputeComponent extends IgniteComponent {
/**
* Executes a job of the given class on the current node.
*
- * @param jobClassName name of the job class
- * @param args job args
- * @param <R> result type
- * @return future execution result
+ * @param options Job execution options.
+ * @param jobClassName Name of the job class.
+ * @param units Deployment units which will be loaded for execution.
+ * @param args Job args.
+ * @param <R> Job result type.
+ * @return Future execution result.
+ */
+ <R> CompletableFuture<R> executeLocally(
+ ExecutionOptions options,
+ List<DeploymentUnit> units,
+ String jobClassName,
+ Object... args
+ );
+
+ /**
+ * Executes a job of the given class on the current node with default
execution options {@link ExecutionOptions.DEFAULT}.
+ *
+ * @param jobClassName Name of the job class.
+ * @param units Deployment units which will be loaded for execution.
+ * @param args Job args.
+ * @param <R> Job result type.
+ * @return Future execution result.
*/
- <R> CompletableFuture<R> executeLocally(List<DeploymentUnit> units, String
jobClassName, Object... args);
+ default <R> CompletableFuture<R> executeLocally(
+ List<DeploymentUnit> units,
+ String jobClassName,
+ Object... args
+ ) {
+ return executeLocally(ExecutionOptions.DEFAULT, units, jobClassName,
args);
+ }
/**
* Executes a job of the given class on a remote node.
*
+ * @param options Job execution options.
+ * @param remoteNode name of the job class
+ * @param units Deployment units which will be loaded for execution.
+ * @param jobClassName name of the job class
+ * @param args Job args.
+ * @param <R> Job result type.
+ * @return Future execution result.
+ */
+ <R> CompletableFuture<R> executeRemotely(
+ ExecutionOptions options,
+ ClusterNode remoteNode,
+ List<DeploymentUnit> units,
+ String jobClassName,
+ Object... args
+ );
+
+ /**
+ * Executes a job of the given class on a remote node with default
execution options {@link ExecutionOptions.DEFAULT}.
+ *
* @param remoteNode name of the job class
+ * @param units Deployment units which will be loaded for execution.
* @param jobClassName name of the job class
Review Comment:
```suggestion
* @param jobClassName name of the job class.
```
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/LimitedPriorityBlockingQueue.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.queue;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * Implementation of {@link PriorityBlockingQueue} with max size limitation.
+ *
+ * @param <E> The type of elements held in this queue.
+ */
+public class LimitedPriorityBlockingQueue<E> extends PriorityBlockingQueue<E> {
+ private final Supplier<Integer> maxSize;
+
+ /**
+ * Constructor.
+ *
+ * @param maxSize Max queue size supplier.
+ */
+ public LimitedPriorityBlockingQueue(Supplier<Integer> maxSize) {
+ this.maxSize = maxSize;
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param maxSize Max queue size supplier.
+ * @param initialCapacity Initial queue capacity.
+ */
+ public LimitedPriorityBlockingQueue(Supplier<Integer> maxSize, int
initialCapacity) {
+ super(initialCapacity);
+ this.maxSize = maxSize;
+ checkInsert(initialCapacity);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param maxSize Max queue size supplier.
+ * @param initialCapacity Initial queue capacity.
+ * @param comparator the comparator that will be used to order this
priority queue.
+ * If {@code null}, the {@linkplain Comparable natural ordering} of
the elements will be used.
+ */
+ public LimitedPriorityBlockingQueue(Supplier<Integer> maxSize, int
initialCapacity, Comparator<? super E> comparator) {
+ super(initialCapacity, comparator);
+ this.maxSize = maxSize;
+ checkInsert(initialCapacity);
+ }
+
+ @Override
+ public boolean add(E o) {
+ checkInsert(1);
+ return super.add(o);
+ }
+
+ @Override
+ public boolean offer(E o) {
+ checkInsert(1);
+ return super.offer(o);
+ }
+
+ @Override
+ public boolean offer(E e, long timeout, TimeUnit unit) {
+ checkInsert(1);
+ return super.offer(e, timeout, unit);
+ }
+
+ @Override
+ public void put(E o) {
+ checkInsert(1);
+ super.put(o);
+ }
+
+ @Override
+ public int remainingCapacity() {
+ return maxSize.get() - size();
+ }
+
+ @Override
+ public boolean addAll(Collection<? extends E> c) {
+ checkInsert(c.size());
+ return super.addAll(c);
+ }
+
+ private void checkInsert(int size) {
+ Integer maxSize = this.maxSize.get();
+ int currentSize = size();
+ if (currentSize + size > maxSize) {
Review Comment:
Better safe than sorry: comparing against `maxSize - size` avoids a possible integer overflow in `currentSize + size`.
```suggestion
if (currentSize > maxSize - size) {
```
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java:
##########
@@ -30,21 +30,72 @@ public interface ComputeComponent extends IgniteComponent {
/**
* Executes a job of the given class on the current node.
*
- * @param jobClassName name of the job class
- * @param args job args
- * @param <R> result type
- * @return future execution result
+ * @param options Job execution options.
+ * @param jobClassName Name of the job class.
+ * @param units Deployment units which will be loaded for execution.
+ * @param args Job args.
+ * @param <R> Job result type.
+ * @return Future execution result.
+ */
+ <R> CompletableFuture<R> executeLocally(
+ ExecutionOptions options,
+ List<DeploymentUnit> units,
+ String jobClassName,
+ Object... args
+ );
+
+ /**
+ * Executes a job of the given class on the current node with default
execution options {@link ExecutionOptions.DEFAULT}.
+ *
+ * @param jobClassName Name of the job class.
+ * @param units Deployment units which will be loaded for execution.
Review Comment:
```suggestion
* @param units Deployment units which will be loaded for execution.
* @param jobClassName Name of the job class.
```
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java:
##########
@@ -30,21 +30,72 @@ public interface ComputeComponent extends IgniteComponent {
/**
* Executes a job of the given class on the current node.
*
- * @param jobClassName name of the job class
- * @param args job args
- * @param <R> result type
- * @return future execution result
+ * @param options Job execution options.
+ * @param jobClassName Name of the job class.
+ * @param units Deployment units which will be loaded for execution.
+ * @param args Job args.
+ * @param <R> Job result type.
+ * @return Future execution result.
+ */
+ <R> CompletableFuture<R> executeLocally(
+ ExecutionOptions options,
+ List<DeploymentUnit> units,
+ String jobClassName,
+ Object... args
+ );
+
+ /**
+ * Executes a job of the given class on the current node with default
execution options {@link ExecutionOptions.DEFAULT}.
+ *
+ * @param jobClassName Name of the job class.
+ * @param units Deployment units which will be loaded for execution.
+ * @param args Job args.
+ * @param <R> Job result type.
+ * @return Future execution result.
*/
- <R> CompletableFuture<R> executeLocally(List<DeploymentUnit> units, String
jobClassName, Object... args);
+ default <R> CompletableFuture<R> executeLocally(
+ List<DeploymentUnit> units,
+ String jobClassName,
+ Object... args
+ ) {
+ return executeLocally(ExecutionOptions.DEFAULT, units, jobClassName,
args);
+ }
/**
* Executes a job of the given class on a remote node.
*
+ * @param options Job execution options.
+ * @param remoteNode name of the job class
+ * @param units Deployment units which will be loaded for execution.
+ * @param jobClassName name of the job class
Review Comment:
```suggestion
* @param jobClassName name of the job class.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]