denis-chudov commented on code in PR #2380:
URL: https://github.com/apache/ignite-3/pull/2380#discussion_r1281100194
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java:
##########
@@ -571,17 +570,17 @@ public static Set<String> filterDataNodes(
/**
* Create an executor for the zone manager.
- * Used a single thread executor to avoid concurrent executing several
tasks for the same zone.
+ * Used a striped thread executor to avoid concurrent executing several
tasks for the same zone.
* ScheduledThreadPoolExecutor guarantee that tasks scheduled for exactly
the same
* execution time are enabled in first-in-first-out (FIFO) order of
submission.
- * // TODO: IGNITE-19783 Need to use a striped executor.
*
+ * @param concurrentLvl Number of threads.
* @param namedThreadFactory Named thread factory.
* @return Executor.
*/
- static ScheduledExecutorService
createZoneManagerExecutor(NamedThreadFactory namedThreadFactory) {
- return new ScheduledThreadPoolExecutor(
- 1,
+ static StripedScheduledThreadPoolExecutor createZoneManagerExecutor(int
concurrentLvl, NamedThreadFactory namedThreadFactory) {
Review Comment:
```suggestion
static StripedScheduledThreadPoolExecutor createZoneManagerExecutor(int
concurrencyLvl, NamedThreadFactory namedThreadFactory) {
```
##########
modules/core/src/main/java/org/apache/ignite/internal/thread/StripedScheduledThreadPoolExecutor.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An {@link ScheduledExecutorService} that executes submitted tasks using
pooled grid threads.
+ */
+public class StripedScheduledThreadPoolExecutor extends
AbstractStripedThreadPoolExecutor<ScheduledExecutorService>
+ implements ScheduledExecutorService {
+ /**
+ * Create striped scheduled thread pool.
+ *
+ * @param concurrentLvl Concurrency level.
+ * @param threadFactory The factory to use when the executor creates a new
thread.
+ * @param executionHandler The handler to use when execution is blocked
+ * because the thread bounds and queue capacities are reached.
+ */
+ public StripedScheduledThreadPoolExecutor(
+ int concurrentLvl,
+ ThreadFactory threadFactory,
+ RejectedExecutionHandler executionHandler
+ ) {
+ super(createExecutors(concurrentLvl, threadFactory, executionHandler));
+ }
+
+ private static ScheduledExecutorService[] createExecutors(
+ int concurrentLvl,
Review Comment:
```suggestion
int concurrencyLvl,
```
##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java:
##########
@@ -258,21 +270,24 @@ void testCancelScaleDownTasksOnStopTimers() {
* if it is not started and has a delay greater than zero.
*/
private static void testCancelTask(
- BiConsumer<Long, Runnable> fn,
+ IgniteTriConsumer<Long, Runnable, Integer> fn,
Runnable stopTask,
Supplier<Boolean> isTaskCancelled
) {
CountDownLatch latch = new CountDownLatch(1);
- fn.accept(0L, () -> {
- try {
- assertTrue(latch.await(3, TimeUnit.SECONDS));
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- });
+ fn.accept(0L,
+ () -> {
+ try {
+ assertTrue(latch.await(3, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ },
+ STRIPE_0);
- fn.accept(1L, () -> {});
+ fn.accept(1L, () -> {
+ }, STRIPE_1);
Review Comment:
This also needs correct formatting, as do the same cases below.
##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java:
##########
@@ -183,7 +194,8 @@ private static void testOrdering(BiConsumer<Long, Runnable>
fn) throws Interrupt
if (counter.get() != j) {
sequentialOrder.set(false);
}
- });
+ },
+ STRIPE_0);
Review Comment:
This needs correct formatting.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]