PakhomovAlexander commented on code in PR #2843: URL: https://github.com/apache/ignite-3/pull/2843#discussion_r1399197562
########## modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/LimitedPriorityBlockingQueue.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.compute.queue; + +import java.util.Collection; +import java.util.Comparator; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +/** + * Implementation of {@link PriorityBlockingQueue} with max size limitation. + * + * @param <E> The type of elements held in this queue. + */ +public class LimitedPriorityBlockingQueue<E> extends PriorityBlockingQueue<E> { + private final Supplier<Integer> maxSize; + + /** + * Constructor. + * + * @param maxSize Max queue size supplier. + */ + public LimitedPriorityBlockingQueue(Supplier<Integer> maxSize) { + this.maxSize = maxSize; + } + + /** + * Constructor. + * + * @param maxSize Max queue size supplier. + * @param initialCapacity Initial queue capacity. + */ + public LimitedPriorityBlockingQueue(Supplier<Integer> maxSize, int initialCapacity) { + super(initialCapacity); + this.maxSize = maxSize; + checkInsert(initialCapacity); + } + + /** + * Constructor. 
+ * + * @param maxSize Max queue size supplier. + * @param initialCapacity Initial queue capacity. + * @param comparator the comparator that will be used to order this priority queue. + * If {@code null}, the {@linkplain Comparable natural ordering} of the elements will be used. + */ + public LimitedPriorityBlockingQueue(Supplier<Integer> maxSize, int initialCapacity, Comparator<? super E> comparator) { + super(initialCapacity, comparator); + this.maxSize = maxSize; + checkInsert(initialCapacity); + } + + @Override + public boolean add(E o) { + checkInsert(1); + return super.add(o); + } + + @Override + public boolean offer(E o) { + checkInsert(1); + return super.offer(o); + } + + @Override + public boolean offer(E e, long timeout, TimeUnit unit) { + checkInsert(1); + return super.offer(e, timeout, unit); + } + + @Override + public void put(E o) { + checkInsert(1); Review Comment: The `checkInsert` call and the insertion that follows it are not synchronized with each other, so this is a check-then-act sequence that is not atomic. Shall we perform the size check and the insertion together in a thread-safe manner? ########## modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java: ########## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.compute.queue; + +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.compute.configuration.ComputeConfiguration; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.apache.ignite.internal.thread.NamedThreadFactory; +import org.apache.ignite.lang.IgniteException; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * Test suite for {@link PriorityQueueExecutor}. 
+ */ +@ExtendWith(ConfigurationExtension.class) +public class PriorityQueueExecutorTest extends BaseIgniteAbstractTest { + private static final IgniteLogger LOG = Loggers.forClass(PriorityQueueExecutorTest.class); + + @InjectConfiguration + private ComputeConfiguration configuration; + + private PriorityQueueExecutor priorityQueueExecutor; + + private void initExecutor(int threads) { + initExecutor(threads, Integer.MAX_VALUE); + } + + private void initExecutor(int threads, int maxQueueSize) { + if (priorityQueueExecutor != null) { + priorityQueueExecutor.shutdown(); + } + + assertThat( + configuration.change(computeChange -> computeChange.changeThreadPoolSize(threads).changeQueueMaxSize(maxQueueSize)), + willCompleteSuccessfully() + ); + + priorityQueueExecutor = new PriorityQueueExecutor( + configuration, + new NamedThreadFactory(NamedThreadFactory.threadPrefix("testNode", "compute"), LOG) + ); + } + + @Test + public void testQueueIsWorking() { + initExecutor(1); + + CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(1); + + CompletableFuture<Integer> task1 = priorityQueueExecutor.submit(() -> { + latch1.await(); + return 0; + }); + + CompletableFuture<Integer> task2 = priorityQueueExecutor.submit(() -> { + latch2.await(); + return 1; + }); + + assertThat(task1.isDone(), is(false)); + assertThat(task2.isDone(), is(false)); + + latch2.countDown(); + + assertThat(task1, willTimeoutIn(100, TimeUnit.MILLISECONDS)); + assertThat(task2, willTimeoutIn(100, TimeUnit.MILLISECONDS)); Review Comment: Could you add a couple of words about priority and why `task2` is not executed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
