petrov-mg commented on code in PR #13148:
URL: https://github.com/apache/ignite/pull/13148#discussion_r3304562247


##########
modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java:
##########
@@ -2837,68 +2818,50 @@ public DiscoveryMessageNotifierThread(GridWorker worker) {
         }
     }
 
-    /**
-     *
-     */
-    private class DiscoveryMessageNotifierWorker extends GridWorker {
-        /** Queue. */
-        private final BlockingQueue<T2<GridFutureAdapter, Runnable>> queue = 
new LinkedBlockingQueue<>();
-
-        /**
-         * Default constructor.
-         */
-        protected DiscoveryMessageNotifierWorker() {
+    /** */
+    private class DiscoveryMessageNotifier extends 
IgniteAsyncObjectHandler<T2<GridFutureAdapter<?>, Runnable>> {
+        /** Default constructor. */
+        protected DiscoveryMessageNotifier() {
             super(ctx.igniteInstanceName(), "disco-notifier-worker", 
GridDiscoveryManager.this.log, ctx.workersRegistry());
         }
 
-        /**
-         *
-         */
+        /** {@inheritDoc} */
+        @Override public IgniteThread createWorkerThread(GridWorker worker) {
+            return new DiscoveryMessageNotifierThread(worker);
+        }
+
+        /** */
         private void body0() throws InterruptedException {
-            T2<GridFutureAdapter, Runnable> notification;
+            OperationContextAwareWrapper<T2<GridFutureAdapter<?>, Runnable>> 
contextualNotification = takeQueuedElement();
 
-            blockingSectionBegin();
+            try (Scope ignored = 
OperationContext.restoreSnapshot(contextualNotification.contextSnapshot())) {
+                T2<GridFutureAdapter<?>, Runnable> notification = 
contextualNotification.delegate();
 
-            try {
-                notification = queue.take();
-            }
-            finally {
-                blockingSectionEnd();
-            }
-
-            try {
-                notification.get2().run();
-            }
-            finally {
-                notification.get1().onDone();
+                try {
+                    notification.get2().run();
+                }
+                finally {
+                    notification.get1().onDone();
+                }
             }
         }
 
-        /**
-         * @param cmd Command.
-         */
-        public synchronized void submit(GridFutureAdapter notificationFut, 
Runnable cmd) {
+        /** @param cmd Command. */
+        public synchronized void submit(GridFutureAdapter<?> notificationFut, 
Runnable cmd) {
             if (isCancelled()) {
                 notificationFut.onDone();
 
                 return;
             }
 
-            queue.add(new T2<>(notificationFut, cmd));
+            addToQueue(new T2<>(notificationFut, cmd));
         }
 
-        /**
-         * Cancel thread execution and completes all notification futures.
-         */
+        /** Cancel thread execution and completes all notification futures. */

Review Comment:
   Done.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java:
##########
@@ -190,6 +164,16 @@ public DataStreamerImpl<K, V> dataStreamer(@Nullable String cacheName) {
         }
     }
 
+    /** */
+    public void scheduleAutoFlush(DataStreamerImpl<?, ?> dataStreamer) {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to