sashapolo commented on code in PR #2592:
URL: https://github.com/apache/ignite-3/pull/2592#discussion_r1327022038


##########
modules/core/src/main/java/org/apache/ignite/internal/event/EventListener.java:
##########
@@ -15,35 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.manager;
+package org.apache.ignite.internal.event;
 
 import java.util.concurrent.CompletableFuture;
 import org.jetbrains.annotations.Nullable;
 
-/**
- * The listener handles events from a producer.
- *
- * @see Producer#listen(Event, EventListener)
- */
+/** The listener handles events from a {@link EventProducer event producer}. */

Review Comment:
   ```suggestion
   /** The listener handles events from an {@link EventProducer event 
producer}. */
   ```



##########
modules/core/src/main/java/org/apache/ignite/internal/event/EventParameters.java:
##########
@@ -15,15 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.manager;
+package org.apache.ignite.internal.event;
 
-/**
- * Event parameters. This type passed to the event listener.
- *
- * @see Producer#fireEvent(Event, EventParameters, Throwable)
- */
+/** Event parameters. This type passed to the {@link 
EventListener#notify(EventParameters, Throwable)}. */

Review Comment:
   ```suggestion
   /** Event parameters. This type is passed to the {@link 
EventListener#notify(EventParameters, Throwable)}. */
   ```



##########
modules/core/src/main/java/org/apache/ignite/internal/event/AbstractEventProducer.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.event;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Event producer.
+ *
+ * <p>Allows to {@link #listen add} and {@link #removeListener remove} event 
listeners for events, as well as
+ * {@link #fireEvent fire events}.</p>
+ */
+public abstract class AbstractEventProducer<T extends Event, P extends 
EventParameters> implements EventProducer<T, P> {
+    private final ConcurrentHashMap<T, List<EventListener<P>>> 
listenersByEvent = new ConcurrentHashMap<>();
+
+    @Override
+    public void listen(T evt, EventListener<? extends P> listener) {
+        listenersByEvent.computeIfAbsent(evt, evtKey -> new 
CopyOnWriteArrayList<>()).add((EventListener<P>) listener);

Review Comment:
   Can we use a manual copy approach here instead of using 
`CopyOnWriteArrayList`? We are always accessing it inside of `compute` anyway



##########
modules/core/src/main/java/org/apache/ignite/internal/event/EventListener.java:
##########
@@ -15,35 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.manager;
+package org.apache.ignite.internal.event;
 
 import java.util.concurrent.CompletableFuture;
 import org.jetbrains.annotations.Nullable;
 
-/**
- * The listener handles events from a producer.
- *
- * @see Producer#listen(Event, EventListener)
- */
+/** The listener handles events from a event producer. */
 @FunctionalInterface
 public interface EventListener<P extends EventParameters> {
     /**
      * Notifies the listener about an event.
      *
-     * @param parameters Parameters provide a properties of the event. This 
attribute cannot be {@code null}.
+     * @param parameters Parameters provide a properties of the event.

Review Comment:
   ```suggestion
        * @param parameters Properties of the event.
   ```



##########
modules/core/src/main/java/org/apache/ignite/internal/event/EventListener.java:
##########
@@ -15,35 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.manager;
+package org.apache.ignite.internal.event;
 
 import java.util.concurrent.CompletableFuture;
 import org.jetbrains.annotations.Nullable;
 
-/**
- * The listener handles events from a producer.
- *
- * @see Producer#listen(Event, EventListener)
- */
+/** The listener handles events from a event producer. */
 @FunctionalInterface
 public interface EventListener<P extends EventParameters> {
     /**
      * Notifies the listener about an event.
      *
-     * @param parameters Parameters provide a properties of the event. This 
attribute cannot be {@code null}.
+     * @param parameters Parameters provide a properties of the event.
      * @param exception  Exception which is happened during the event produced 
or {@code null}.
      * @return Completable future, which is completed when event handling is 
finished. The {@code True} value of the future
      *     means that the event is handled and a listener will be removed, 
{@code false} is the listener will stay listen.
      *     This future must not be completed with {@code null} value.

Review Comment:
   ```suggestion
        *     This future will never be completed with {@code null} value.
   ```



##########
modules/core/src/main/java/org/apache/ignite/internal/event/EventProducer.java:
##########
@@ -15,26 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.manager;
+package org.apache.ignite.internal.event;
 
-import org.apache.ignite.lang.IgniteInternalCheckedException;
-
-/**
- * The exception notifies a listener when the listener was removed from queue 
and never receive a notification again.
- */
-public class ListenerRemovedException extends IgniteInternalCheckedException {
+/** Allows to {@link #listen add} and {@link #removeListener remove} event 
listeners that the component will fire. */

Review Comment:
   ```suggestion
   /** Allows to {@link #listen add} and {@link #removeListener remove} event 
listeners that the component will produce. */
   ```



##########
modules/core/src/test/java/org/apache/ignite/internal/event/EventProducerTest.java:
##########
@@ -74,31 +68,28 @@ public void stopListenTest() {
         }
 
         assertEquals(stopListenAfterCount, listenConter.get());
-        assertEquals(0, removeCounter.get());
     }
 
     @Test
     public void parallelTest() {
-        Producer<TestEvent, TestEventParameters> producer = new Producer<>() {
-        };
+        AbstractEventProducer<TestEvent, TestEventParameters> producer = new 
AbstractEventProducer<>() {};
 
         final int listenersCount = 10_000;
         final int listenerIndexToRemove = 10;
         EventListener<TestEventParameters> listenerToRemove = null;
 
-        CompletableFuture toRemoveFuture = new CompletableFuture();
+        CompletableFuture<Void> toRemoveFuture = new CompletableFuture<>();
 
         for (int i = 0; i < listenersCount; i++) {
             EventListener<TestEventParameters> listener = i == 
listenerIndexToRemove
                     ? createEventListener(
-                        (p, e) -> {
-                            toRemoveFuture.complete(null);
+                    (p, e) -> {
+                        toRemoveFuture.complete(null);
 
-                            return completedFuture(false);
-                        },
-                        t -> {}
-                    )
-                    : createEventListener((p, e) -> completedFuture(false), t 
-> {});
+                        return completedFuture(false);
+                    }
+            )

Review Comment:
   This formatting looks wrong



##########
modules/core/src/main/java/org/apache/ignite/internal/event/Event.java:
##########
@@ -15,12 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.manager;
+package org.apache.ignite.internal.event;
 
-/**
- * The event cas which is produced by event producer component.
- *
- * @see Producer#fireEvent(Event, EventParameters, Throwable)
- */
+/** The event which is produced by a component. */

Review Comment:
   ```suggestion
   /** An event which is produced by a component. */
   ```



##########
modules/core/src/main/java/org/apache/ignite/internal/event/EventListener.java:
##########
@@ -15,35 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.manager;
+package org.apache.ignite.internal.event;
 
 import java.util.concurrent.CompletableFuture;
 import org.jetbrains.annotations.Nullable;
 
-/**
- * The listener handles events from a producer.
- *
- * @see Producer#listen(Event, EventListener)
- */
+/** The listener handles events from a event producer. */
 @FunctionalInterface
 public interface EventListener<P extends EventParameters> {
     /**
      * Notifies the listener about an event.
      *
-     * @param parameters Parameters provide a properties of the event. This 
attribute cannot be {@code null}.
+     * @param parameters Parameters provide a properties of the event.
      * @param exception  Exception which is happened during the event produced 
or {@code null}.

Review Comment:
   ```suggestion
        * @param exception  Exception which has happened when producing the 
event or {@code null}.
   ```



##########
modules/core/src/main/java/org/apache/ignite/internal/event/EventListener.java:
##########
@@ -15,35 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.manager;
+package org.apache.ignite.internal.event;
 
 import java.util.concurrent.CompletableFuture;
 import org.jetbrains.annotations.Nullable;
 
-/**
- * The listener handles events from a producer.
- *
- * @see Producer#listen(Event, EventListener)
- */
+/** The listener handles events from a event producer. */

Review Comment:
   ```suggestion
   /** A listener that handles events from an event producer. */
   ```



##########
modules/core/src/main/java/org/apache/ignite/internal/event/EventListener.java:
##########
@@ -15,35 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.manager;
+package org.apache.ignite.internal.event;
 
 import java.util.concurrent.CompletableFuture;
 import org.jetbrains.annotations.Nullable;
 
-/**
- * The listener handles events from a producer.
- *
- * @see Producer#listen(Event, EventListener)
- */
+/** The listener handles events from a event producer. */
 @FunctionalInterface
 public interface EventListener<P extends EventParameters> {
     /**
      * Notifies the listener about an event.
      *
-     * @param parameters Parameters provide a properties of the event. This 
attribute cannot be {@code null}.
+     * @param parameters Parameters provide a properties of the event.
      * @param exception  Exception which is happened during the event produced 
or {@code null}.
      * @return Completable future, which is completed when event handling is 
finished. The {@code True} value of the future

Review Comment:
   ```suggestion
        * @return Completable future, which is completed when event handling is 
finished. The {@code true} value of the future
   ```



##########
modules/core/src/main/java/org/apache/ignite/internal/event/AbstractEventProducer.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.event;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Event producer.
+ *
+ * <p>Allows to {@link #listen add} and {@link #removeListener remove} event 
listeners for events, as well as
+ * {@link #fireEvent fire events}.</p>
+ */
+public abstract class AbstractEventProducer<T extends Event, P extends 
EventParameters> implements EventProducer<T, P> {
+    private final ConcurrentHashMap<T, List<EventListener<P>>> 
listenersByEvent = new ConcurrentHashMap<>();
+
+    @Override
+    public void listen(T evt, EventListener<? extends P> listener) {
+        listenersByEvent.computeIfAbsent(evt, evtKey -> new 
CopyOnWriteArrayList<>()).add((EventListener<P>) listener);
+    }
+
+    @Override
+    public void removeListener(T evt, EventListener<? extends P> listener) {
+        listenersByEvent.computeIfPresent(evt, (evt0, listeners) -> {
+            listeners.remove(listener);
+
+            return listeners.isEmpty() ? null : listeners;
+        });
+    }
+
+    /**
+     * Notifies every listener that subscribed before.
+     *
+     * @param evt Event.
+     * @param params Event parameters.
+     * @param err Exception when it was happened, or {@code null} otherwise.
+     * @return Completable future which is completed when event handling is 
complete.
+     */
+    protected CompletableFuture<Void> fireEvent(T evt, P params, @Nullable 
Throwable err) {
+        List<EventListener<P>> listeners = listenersByEvent.get(evt);
+
+        if (listeners == null) {
+            return completedFuture(null);
+        }
+
+        List<CompletableFuture<?>> futures = new ArrayList<>();
+
+        for (EventListener<P> listener : listeners) {

Review Comment:
   Since `listeners` is a `List`, we can declare `futures` as an array and 
remove the copy below.



##########
modules/core/src/main/java/org/apache/ignite/internal/event/EventListener.java:
##########
@@ -15,35 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.manager;
+package org.apache.ignite.internal.event;
 
 import java.util.concurrent.CompletableFuture;
 import org.jetbrains.annotations.Nullable;
 
-/**
- * The listener handles events from a producer.
- *
- * @see Producer#listen(Event, EventListener)
- */
+/** The listener handles events from a event producer. */
 @FunctionalInterface
 public interface EventListener<P extends EventParameters> {
     /**
      * Notifies the listener about an event.
      *
-     * @param parameters Parameters provide a properties of the event. This 
attribute cannot be {@code null}.
+     * @param parameters Parameters provide a properties of the event.
      * @param exception  Exception which is happened during the event produced 
or {@code null}.
      * @return Completable future, which is completed when event handling is 
finished. The {@code True} value of the future
      *     means that the event is handled and a listener will be removed, 
{@code false} is the listener will stay listen.

Review Comment:
   ```suggestion
        *     means that the event has been handled and a listener will be 
removed, {@code false} is that the listener will continue listening.
   ```



##########
modules/core/src/test/java/org/apache/ignite/internal/event/EventProducerTest.java:
##########
@@ -45,26 +42,23 @@ public void simpleAsyncTest() {
 
         CompletableFuture<?> eventHandleFuture = 
producer.fireEvent(TestEvent.TEST, new TestEventParameters(0L));
 
-        assertFalse(eventHandleFuture.isDone());
+        assertThat(eventHandleFuture, willTimeoutFast());
 
         future.complete(true);
 
-        assertTrue(eventHandleFuture.isDone());
+        assertThat(eventHandleFuture, willCompleteSuccessfully());

Review Comment:
   And this?



##########
modules/core/src/test/java/org/apache/ignite/internal/event/EventProducerTest.java:
##########
@@ -45,26 +42,23 @@ public void simpleAsyncTest() {
 
         CompletableFuture<?> eventHandleFuture = 
producer.fireEvent(TestEvent.TEST, new TestEventParameters(0L));
 
-        assertFalse(eventHandleFuture.isDone());
+        assertThat(eventHandleFuture, willTimeoutFast());

Review Comment:
   Why did you change this check?



##########
modules/core/src/main/java/org/apache/ignite/internal/event/EventProducer.java:
##########
@@ -15,26 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.manager;
+package org.apache.ignite.internal.event;
 
-import org.apache.ignite.lang.IgniteInternalCheckedException;
-
-/**
- * The exception notifies a listener when the listener was removed from queue 
and never receive a notification again.
- */
-public class ListenerRemovedException extends IgniteInternalCheckedException {
+/** Allows to {@link #listen add} and {@link #removeListener remove} event 
listeners that the component will fire. */
+public interface EventProducer<T extends Event, P extends EventParameters> {
     /**
-     * Default constructor.
+     * Registers an event listener. When the event predicate returns true it 
would never invoke after, otherwise this predicate would
+     * receive an event again.

Review Comment:
   ```suggestion
        * Registers an event listener. If {@link EventListener#notify} returns 
{@code true}, it would never be invoked again.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to