rpuch commented on code in PR #3562:
URL: https://github.com/apache/ignite-3/pull/3562#discussion_r1553798055
##########
modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java:
##########
@@ -258,11 +254,7 @@ private void initFlushTimer() {
return;
}
- String threadPrefix = "client-data-streamer-flush-" + hashCode();
-
- flushTimer = Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory(threadPrefix, log));
-
- flushTask = flushTimer.scheduleAtFixedRate(this::flushBuffers,
interval, interval, TimeUnit.MILLISECONDS);
+ flushTask = flushExecutor.scheduleAtFixedRate(this::flushBuffers,
interval, interval, TimeUnit.MILLISECONDS);
Review Comment:
Is there any guarantee that `flushTask` will be assigned in the same thread
in which it is then read? The field is not volatile, so this looks suspiciously.
##########
modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java:
##########
@@ -23,16 +23,35 @@
import static org.hamcrest.Matchers.is;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongFunction;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.table.DataStreamerItem;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
class StreamerSubscriberTest extends BaseIgniteAbstractTest {
+ private static ScheduledExecutorService flushExecutor;
+
+ @BeforeAll
+ public static void flushExecutorInit() {
+ flushExecutor = Executors.newSingleThreadScheduledExecutor(
+ new NamedThreadFactory("flushExecutor",
Loggers.forClass(StreamerSubscriberTest.class)));
+ }
+
+ @AfterAll
+ public static void flushExecutorShutdown() {
+ flushExecutor.shutdown();
Review Comment:
Would it make sense to enclose this in a `null`-check of `flushExecutor`,
for a case when an exception gets thrown by `flushExecutorInit()` for some
reason?
##########
modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java:
##########
@@ -157,6 +176,7 @@ public CompletableFuture<Void> refreshAsync() {
(part, batch, deleted) -> sendFuture,
partitionProvider,
options,
+ Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory("testBackpressureWithDelay", log)), // todo stop
Review Comment:
Should `flushExecutor` be used here?
##########
modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java:
##########
@@ -492,4 +493,10 @@ private void onDataAccess(String operation, Object arg) {
public @Nullable PendingComparableValuesTracker<Long, Void>
getPartitionStorageIndexTracker(int partitionId) {
return null;
}
+
+ @Override
+ public ScheduledExecutorService streamerFlushExecutor() {
+ //noinspection DataFlowIssue
+ return null;
Review Comment:
Would it make sense to throw an `UnsupportedOperationException` instead of
returning a `null` (that might make something else explode way later)?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2542,4 +2553,13 @@ private void
destroyTableStorageOnRecoveryBusy(CatalogTableDescriptor tableDescr
engine.dropMvTable(tableDescriptor.id());
}
+
+ private synchronized ScheduledExecutorService streamerFlushExecutor() {
+ if (streamerFlushExecutor == null) {
+ streamerFlushExecutor = Executors.newSingleThreadScheduledExecutor(
Review Comment:
Are we protected from first call to this method arriving after `close()` is
closed (leaving the executor never shut down)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]