PakhomovAlexander commented on code in PR #2359:
URL: https://github.com/apache/ignite-3/pull/2359#discussion_r1284461787


##########
modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.commands;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import io.micronaut.context.annotation.Property;
+import io.micronaut.context.annotation.Value;
+import jakarta.inject.Inject;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.cli.core.repl.Session;
+import org.apache.ignite.internal.cli.event.EventFactory;
+import org.apache.ignite.internal.cli.event.EventListener;
+import org.apache.ignite.internal.cli.event.EventType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+@Property(name="cli.check.connection.period.second", value="1")
+class ItConnectionHeartbeatTest extends 
CliCommandTestInitializedIntegrationBase {
+
+    @Inject
+    Session session;
+
+    @Inject
+    EventFactory eventFactory;
+
+    @Value("${cli.check.connection.period.second}")
+    private long CLI_CHECK_CONNECTION_PERIOD_SECONDS;
+
+    private final AtomicInteger connectionLost = new AtomicInteger(0);
+    private final AtomicInteger connectionRestored = new AtomicInteger(0);
+
+    @BeforeEach
+    void setUp() {
+        //ToDo: Set connection check timeout to 1 sec to make test fast

Review Comment:
   We tend to link any todo with the ticket. Could you create one or fix todo 
here?



##########
modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.commands;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import io.micronaut.context.annotation.Property;
+import io.micronaut.context.annotation.Value;
+import jakarta.inject.Inject;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.cli.core.repl.Session;
+import org.apache.ignite.internal.cli.event.EventFactory;
+import org.apache.ignite.internal.cli.event.EventListener;
+import org.apache.ignite.internal.cli.event.EventType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+@Property(name="cli.check.connection.period.second", value="1")
+class ItConnectionHeartbeatTest extends 
CliCommandTestInitializedIntegrationBase {
+
+    @Inject
+    Session session;
+
+    @Inject
+    EventFactory eventFactory;
+
+    @Value("${cli.check.connection.period.second}")
+    private long CLI_CHECK_CONNECTION_PERIOD_SECONDS;

Review Comment:
   ```suggestion
       private final long CLI_CHECK_CONNECTION_PERIOD_SECONDS = 1;
   ```



##########
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ConnectionHeartBeat.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.core.repl;
+
+import io.micronaut.context.annotation.Value;
+import jakarta.inject.Singleton;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+import org.apache.ignite.internal.cli.core.rest.ApiClientFactory;
+import org.apache.ignite.internal.cli.event.Event;
+import org.apache.ignite.internal.cli.event.EventFactory;
+import org.apache.ignite.internal.cli.event.EventListener;
+import org.apache.ignite.internal.cli.event.EventType;
+import org.apache.ignite.internal.cli.logger.CliLoggers;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.rest.client.api.NodeManagementApi;
+import org.apache.ignite.rest.client.invoker.ApiException;
+
+/**
+ * Connection to node heart beat.
+ */
+@Singleton
+public class ConnectionHeartBeat implements EventListener {
+
+    private static final IgniteLogger log = 
CliLoggers.forClass(ConnectionHeartBeat.class);
+
+    /** CLI check connection period period. */
+    private final long cliCheckConnectionPeriodSecond;
+
+    /** Scheduled executor for connection heartbeat. */
+    @Nullable
+    private ScheduledExecutorService scheduledConnectionHeartbeatExecutor;
+
+    private final ApiClientFactory clientFactory;
+
+    private final EventFactory eventFactory;
+
+    private final AtomicBoolean connected = new AtomicBoolean(false);
+
+    /**
+     * Created instance of connection heartbeat.

Review Comment:
   ```suggestion
        * Creates the instance of connection heartbeat.
   ```



##########
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ConnectionHeartBeat.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.core.repl;
+
+import io.micronaut.context.annotation.Value;
+import jakarta.inject.Singleton;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+import org.apache.ignite.internal.cli.core.rest.ApiClientFactory;
+import org.apache.ignite.internal.cli.event.Event;
+import org.apache.ignite.internal.cli.event.EventFactory;
+import org.apache.ignite.internal.cli.event.EventListener;
+import org.apache.ignite.internal.cli.event.EventType;
+import org.apache.ignite.internal.cli.logger.CliLoggers;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.rest.client.api.NodeManagementApi;
+import org.apache.ignite.rest.client.invoker.ApiException;
+
+/**
+ * Connection to node heart beat.
+ */
+@Singleton
+public class ConnectionHeartBeat implements EventListener {
+
+    private static final IgniteLogger log = 
CliLoggers.forClass(ConnectionHeartBeat.class);
+
+    /** CLI check connection period period. */
+    private final long cliCheckConnectionPeriodSecond;
+
+    /** Scheduled executor for connection heartbeat. */
+    @Nullable
+    private ScheduledExecutorService scheduledConnectionHeartbeatExecutor;
+
+    private final ApiClientFactory clientFactory;
+
+    private final EventFactory eventFactory;
+
+    private final AtomicBoolean connected = new AtomicBoolean(false);
+
+    /**
+     * Created instance of connection heartbeat.
+     *
+     * @param clientFactory api client factory.
+     * @param eventFactory event factory.
+     */
+    public 
ConnectionHeartBeat(@Value("${cli.check.connection.period.second:5}") long 
cliCheckConnectionPeriodSecond,
+            ApiClientFactory clientFactory,
+            EventFactory eventFactory) {
+        this.clientFactory = clientFactory;
+        this.eventFactory = eventFactory;
+        this.cliCheckConnectionPeriodSecond = cliCheckConnectionPeriodSecond;
+    }
+
+    /**
+     * Starts connection heartbeat. By default connection will be checked 
every 5 sec.
+     *
+     * @param sessionInfo session info with node url
+     */
+    private void onConnect(SessionInfo sessionInfo) {
+        //eventFactory.fireEvent(EventType.CONNECTION_RESTORED, new 
ConnectionStatusEvent());
+
+        if (scheduledConnectionHeartbeatExecutor == null) {
+            scheduledConnectionHeartbeatExecutor =
+                    Executors.newScheduledThreadPool(1, new 
NamedThreadFactory("cli-check-connection-thread", log));
+
+            //Start connection heart beat
+            scheduledConnectionHeartbeatExecutor.scheduleAtFixedRate(
+                    () -> pingConnection(sessionInfo.nodeUrl()),
+                    0,
+                    cliCheckConnectionPeriodSecond,
+                    TimeUnit.SECONDS
+            );
+        }
+    }
+
+    /**
+     * Stops connection heartbeat.
+     */
+    private void onDisconnect() {
+        if (scheduledConnectionHeartbeatExecutor != null) {
+            scheduledConnectionHeartbeatExecutor.shutdownNow();
+            scheduledConnectionHeartbeatExecutor = null;
+        }
+    }
+
+    private void pingConnection(String nodeUrl) {
+        try {
+            new 
NodeManagementApi(clientFactory.getClient(nodeUrl)).nodeState();
+            if (!connected.get()) {
+                connected.compareAndSet(false, true);

Review Comment:
   That is not thread-safe. I would suggest calling 
`connected.compareAndSet(false, true)` and check the result of the operation.



##########
modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.commands;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import io.micronaut.context.annotation.Property;
+import io.micronaut.context.annotation.Value;
+import jakarta.inject.Inject;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.cli.core.repl.Session;
+import org.apache.ignite.internal.cli.event.EventFactory;
+import org.apache.ignite.internal.cli.event.EventListener;
+import org.apache.ignite.internal.cli.event.EventType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+@Property(name="cli.check.connection.period.second", value="1")
+class ItConnectionHeartbeatTest extends 
CliCommandTestInitializedIntegrationBase {
+
+    @Inject
+    Session session;
+
+    @Inject
+    EventFactory eventFactory;
+
+    @Value("${cli.check.connection.period.second}")
+    private long CLI_CHECK_CONNECTION_PERIOD_SECONDS;
+
+    private final AtomicInteger connectionLost = new AtomicInteger(0);
+    private final AtomicInteger connectionRestored = new AtomicInteger(0);
+
+    @BeforeEach
+    void setUp() {
+        //ToDo: Set connection check timeout to 1 sec to make test fast
+        connectionLost.set(0);
+        connectionRestored.set(0);
+        EventListener eventListener = (eventType, event) -> {
+            if (EventType.CONNECTION_LOST == eventType) {
+                connectionLost.incrementAndGet();
+            } else if (EventType.CONNECTION_RESTORED == eventType) {
+                connectionRestored.incrementAndGet();
+            }
+        };
+
+        //Register listeners
+        eventFactory.listen(EventType.CONNECTION_LOST, eventListener);
+        eventFactory.listen(EventType.CONNECTION_RESTORED, eventListener);
+    }
+
+    @Override
+    protected Class<?> getCommandClass() {
+        return TopLevelCliReplCommand.class;
+    }
+
+    @Test
+    @DisplayName("Should send event CONNECTION_RESTORED on connection start")
+    void connectionEstablished() {
+        // Given null session info before connect
+        assertNull(session.info());
+
+        // When connect without parameters
+        execute("connect");
+
+        // Then
+        assertAll(
+                this::assertErrOutputIsEmpty,
+                () -> assertOutputContains("Connected to 
http://localhost:10300";)
+        );
+
+        //Listener was invoked
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1);
+        assertEquals(0, connectionLost.get());
+    }
+
+    @Test
+    @DisplayName("Should send event CONNECTION_LOST on cluster stop")
+    void onConnectionLost() {
+        // Given connected cli
+        execute("connect");
+
+        // Then
+        assertAll(
+                this::assertErrOutputIsEmpty,
+                () -> assertOutputContains("Connected to 
http://localhost:10300";)
+        );
+
+        // When stop node
+        String nodeName = session.info().nodeName();
+        this.stopNode(nodeName);
+
+        //Listener was invoked
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1);
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionLost.get() == 1);
+
+        //Tear down
+        this.startNode(nodeName);

Review Comment:
   As far as I see it is already stopped.



##########
modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.commands;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import io.micronaut.context.annotation.Property;
+import io.micronaut.context.annotation.Value;
+import jakarta.inject.Inject;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.cli.core.repl.Session;
+import org.apache.ignite.internal.cli.event.EventFactory;
+import org.apache.ignite.internal.cli.event.EventListener;
+import org.apache.ignite.internal.cli.event.EventType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+@Property(name="cli.check.connection.period.second", value="1")
+class ItConnectionHeartbeatTest extends 
CliCommandTestInitializedIntegrationBase {
+
+    @Inject
+    Session session;
+
+    @Inject
+    EventFactory eventFactory;
+
+    @Value("${cli.check.connection.period.second}")
+    private long CLI_CHECK_CONNECTION_PERIOD_SECONDS;
+
+    private final AtomicInteger connectionLost = new AtomicInteger(0);
+    private final AtomicInteger connectionRestored = new AtomicInteger(0);
+
+    @BeforeEach
+    void setUp() {
+        //ToDo: Set connection check timeout to 1 sec to make test fast
+        connectionLost.set(0);
+        connectionRestored.set(0);
+        EventListener eventListener = (eventType, event) -> {
+            if (EventType.CONNECTION_LOST == eventType) {
+                connectionLost.incrementAndGet();
+            } else if (EventType.CONNECTION_RESTORED == eventType) {
+                connectionRestored.incrementAndGet();
+            }
+        };
+
+        //Register listeners
+        eventFactory.listen(EventType.CONNECTION_LOST, eventListener);
+        eventFactory.listen(EventType.CONNECTION_RESTORED, eventListener);
+    }
+
+    @Override
+    protected Class<?> getCommandClass() {
+        return TopLevelCliReplCommand.class;
+    }
+
+    @Test
+    @DisplayName("Should send event CONNECTION_RESTORED on connection start")
+    void connectionEstablished() {
+        // Given null session info before connect
+        assertNull(session.info());
+
+        // When connect without parameters
+        execute("connect");
+
+        // Then
+        assertAll(
+                this::assertErrOutputIsEmpty,
+                () -> assertOutputContains("Connected to 
http://localhost:10300";)
+        );
+
+        //Listener was invoked
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1);
+        assertEquals(0, connectionLost.get());
+    }
+
+    @Test
+    @DisplayName("Should send event CONNECTION_LOST on cluster stop")
+    void onConnectionLost() {
+        // Given connected cli
+        execute("connect");
+
+        // Then
+        assertAll(
+                this::assertErrOutputIsEmpty,
+                () -> assertOutputContains("Connected to 
http://localhost:10300";)
+        );
+
+        // When stop node
+        String nodeName = session.info().nodeName();
+        this.stopNode(nodeName);
+
+        //Listener was invoked
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1);
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionLost.get() == 1);
+
+        //Tear down
+        this.startNode(nodeName);
+    }
+
+    @Test
+    @DisplayName("Should send event CONNECTION_LOST on cluster stop")
+    void restoreConnectionAfterConnectionLost() {
+        // Given connected cli
+        execute("connect");
+
+        // Then
+        assertAll(
+                this::assertErrOutputIsEmpty,
+                () -> assertOutputContains("Connected to 
http://localhost:10300";)
+        );
+
+        // When stop node
+        String nodeName = session.info().nodeName();
+        this.stopNode(nodeName);
+
+        //Then
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1);
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionLost.get() == 1);
+
+        // When
+        this.startNode(nodeName);
+
+        //Then

Review Comment:
   ```suggestion
           // Then
   ```



##########
modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.commands;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import io.micronaut.context.annotation.Property;
+import io.micronaut.context.annotation.Value;
+import jakarta.inject.Inject;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.cli.core.repl.Session;
+import org.apache.ignite.internal.cli.event.EventFactory;
+import org.apache.ignite.internal.cli.event.EventListener;
+import org.apache.ignite.internal.cli.event.EventType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+@Property(name="cli.check.connection.period.second", value="1")
+class ItConnectionHeartbeatTest extends 
CliCommandTestInitializedIntegrationBase {
+
+    @Inject
+    Session session;
+
+    @Inject
+    EventFactory eventFactory;
+
+    @Value("${cli.check.connection.period.second}")
+    private long CLI_CHECK_CONNECTION_PERIOD_SECONDS;
+
+    private final AtomicInteger connectionLost = new AtomicInteger(0);
+    private final AtomicInteger connectionRestored = new AtomicInteger(0);
+
+    @BeforeEach
+    void setUp() {
+        //ToDo: Set connection check timeout to 1 sec to make test fast
+        connectionLost.set(0);
+        connectionRestored.set(0);
+        EventListener eventListener = (eventType, event) -> {
+            if (EventType.CONNECTION_LOST == eventType) {

Review Comment:
   It is better not to use if statement but register two separate event 
listeners



##########
modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.commands;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import io.micronaut.context.annotation.Property;
+import io.micronaut.context.annotation.Value;
+import jakarta.inject.Inject;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.cli.core.repl.Session;
+import org.apache.ignite.internal.cli.event.EventFactory;
+import org.apache.ignite.internal.cli.event.EventListener;
+import org.apache.ignite.internal.cli.event.EventType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+@Property(name="cli.check.connection.period.second", value="1")
+class ItConnectionHeartbeatTest extends 
CliCommandTestInitializedIntegrationBase {
+
+    @Inject
+    Session session;
+
+    @Inject
+    EventFactory eventFactory;
+
+    @Value("${cli.check.connection.period.second}")
+    private long CLI_CHECK_CONNECTION_PERIOD_SECONDS;
+
+    private final AtomicInteger connectionLost = new AtomicInteger(0);
+    private final AtomicInteger connectionRestored = new AtomicInteger(0);
+
+    @BeforeEach
+    void setUp() {
+        //ToDo: Set connection check timeout to 1 sec to make test fast
+        connectionLost.set(0);
+        connectionRestored.set(0);
+        EventListener eventListener = (eventType, event) -> {
+            if (EventType.CONNECTION_LOST == eventType) {
+                connectionLost.incrementAndGet();
+            } else if (EventType.CONNECTION_RESTORED == eventType) {
+                connectionRestored.incrementAndGet();
+            }
+        };
+
+        //Register listeners
+        eventFactory.listen(EventType.CONNECTION_LOST, eventListener);
+        eventFactory.listen(EventType.CONNECTION_RESTORED, eventListener);
+    }
+
+    @Override
+    protected Class<?> getCommandClass() {
+        return TopLevelCliReplCommand.class;
+    }
+
+    @Test
+    @DisplayName("Should send event CONNECTION_RESTORED on connection start")
+    void connectionEstablished() {
+        // Given null session info before connect
+        assertNull(session.info());
+
+        // When connect without parameters
+        execute("connect");
+
+        // Then
+        assertAll(
+                this::assertErrOutputIsEmpty,
+                () -> assertOutputContains("Connected to 
http://localhost:10300";)
+        );
+
+        //Listener was invoked
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1);
+        assertEquals(0, connectionLost.get());
+    }
+
+    @Test
+    @DisplayName("Should send event CONNECTION_LOST on cluster stop")
+    void onConnectionLost() {
+        // Given connected cli
+        execute("connect");
+
+        // Then
+        assertAll(
+                this::assertErrOutputIsEmpty,
+                () -> assertOutputContains("Connected to 
http://localhost:10300";)
+        );
+
+        // When stop node
+        String nodeName = session.info().nodeName();
+        this.stopNode(nodeName);
+
+        //Listener was invoked
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1);
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionLost.get() == 1);
+
+        //Tear down

Review Comment:
   ```suggestion
           // Tear down
   ```



##########
modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.commands;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import io.micronaut.context.annotation.Property;
+import io.micronaut.context.annotation.Value;
+import jakarta.inject.Inject;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.cli.core.repl.Session;
+import org.apache.ignite.internal.cli.event.EventFactory;
+import org.apache.ignite.internal.cli.event.EventListener;
+import org.apache.ignite.internal.cli.event.EventType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+@Property(name="cli.check.connection.period.second", value="1")
+class ItConnectionHeartbeatTest extends 
CliCommandTestInitializedIntegrationBase {
+
+    @Inject
+    Session session;
+
+    @Inject
+    EventFactory eventFactory;
+
+    @Value("${cli.check.connection.period.second}")
+    private long CLI_CHECK_CONNECTION_PERIOD_SECONDS;
+
+    private final AtomicInteger connectionLost = new AtomicInteger(0);
+    private final AtomicInteger connectionRestored = new AtomicInteger(0);
+
+    @BeforeEach
+    void setUp() {
+        //ToDo: Set connection check timeout to 1 sec to make test fast
+        connectionLost.set(0);
+        connectionRestored.set(0);
+        EventListener eventListener = (eventType, event) -> {
+            if (EventType.CONNECTION_LOST == eventType) {
+                connectionLost.incrementAndGet();
+            } else if (EventType.CONNECTION_RESTORED == eventType) {
+                connectionRestored.incrementAndGet();
+            }
+        };
+
+        //Register listeners
+        eventFactory.listen(EventType.CONNECTION_LOST, eventListener);
+        eventFactory.listen(EventType.CONNECTION_RESTORED, eventListener);
+    }
+
+    @Override
+    protected Class<?> getCommandClass() {
+        return TopLevelCliReplCommand.class;
+    }
+
+    @Test
+    @DisplayName("Should send event CONNECTION_RESTORED on connection start")
+    void connectionEstablished() {
+        // Given null session info before connect
+        assertNull(session.info());
+
+        // When connect without parameters
+        execute("connect");
+
+        // Then
+        assertAll(
+                this::assertErrOutputIsEmpty,
+                () -> assertOutputContains("Connected to 
http://localhost:10300";)
+        );
+
+        //Listener was invoked
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1);
+        assertEquals(0, connectionLost.get());
+    }
+
+    @Test
+    @DisplayName("Should send event CONNECTION_LOST on cluster stop")
+    void onConnectionLost() {
+        // Given connected cli
+        execute("connect");
+
+        // Then
+        assertAll(
+                this::assertErrOutputIsEmpty,
+                () -> assertOutputContains("Connected to 
http://localhost:10300";)
+        );
+
+        // When stop node
+        String nodeName = session.info().nodeName();
+        this.stopNode(nodeName);
+
+        //Listener was invoked
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1);
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionLost.get() == 1);
+
+        //Tear down
+        this.startNode(nodeName);
+    }
+
+    @Test
+    @DisplayName("Should send event CONNECTION_LOST on cluster stop")
+    void restoreConnectionAfterConnectionLost() {
+        // Given connected cli
+        execute("connect");
+
+        // Then
+        assertAll(
+                this::assertErrOutputIsEmpty,
+                () -> assertOutputContains("Connected to 
http://localhost:10300";)
+        );
+
+        // When stop node
+        String nodeName = session.info().nodeName();
+        this.stopNode(nodeName);
+
+        //Then

Review Comment:
   ```suggestion
           // Then
   ```



##########
modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.commands;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import io.micronaut.context.annotation.Property;
+import io.micronaut.context.annotation.Value;
+import jakarta.inject.Inject;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.cli.core.repl.Session;
+import org.apache.ignite.internal.cli.event.EventFactory;
+import org.apache.ignite.internal.cli.event.EventListener;
+import org.apache.ignite.internal.cli.event.EventType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+@Property(name="cli.check.connection.period.second", value="1")
+class ItConnectionHeartbeatTest extends 
CliCommandTestInitializedIntegrationBase {
+
+    @Inject
+    Session session;
+
+    @Inject
+    EventFactory eventFactory;
+
+    @Value("${cli.check.connection.period.second}")
+    private long CLI_CHECK_CONNECTION_PERIOD_SECONDS;
+
+    private final AtomicInteger connectionLost = new AtomicInteger(0);
+    private final AtomicInteger connectionRestored = new AtomicInteger(0);
+
+    @BeforeEach
+    void setUp() {
+        //ToDo: Set connection check timeout to 1 sec to make test fast
+        connectionLost.set(0);
+        connectionRestored.set(0);
+        EventListener eventListener = (eventType, event) -> {
+            if (EventType.CONNECTION_LOST == eventType) {
+                connectionLost.incrementAndGet();
+            } else if (EventType.CONNECTION_RESTORED == eventType) {
+                connectionRestored.incrementAndGet();
+            }
+        };
+
+        //Register listeners
+        eventFactory.listen(EventType.CONNECTION_LOST, eventListener);
+        eventFactory.listen(EventType.CONNECTION_RESTORED, eventListener);
+    }
+
+    @Override
+    protected Class<?> getCommandClass() {
+        return TopLevelCliReplCommand.class;
+    }
+
+    @Test
+    @DisplayName("Should send event CONNECTION_RESTORED on connection start")
+    void connectionEstablished() {
+        // Given null session info before connect
+        assertNull(session.info());
+
+        // When connect without parameters
+        execute("connect");
+
+        // Then
+        assertAll(
+                this::assertErrOutputIsEmpty,
+                () -> assertOutputContains("Connected to 
http://localhost:10300";)
+        );
+
+        //Listener was invoked
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1);
+        assertEquals(0, connectionLost.get());
+    }
+
+    @Test
+    @DisplayName("Should send event CONNECTION_LOST on cluster stop")
+    void onConnectionLost() {
+        // Given connected cli
+        execute("connect");
+
+        // Then
+        assertAll(
+                this::assertErrOutputIsEmpty,
+                () -> assertOutputContains("Connected to 
http://localhost:10300";)
+        );
+
+        // When stop node
+        String nodeName = session.info().nodeName();
+        this.stopNode(nodeName);
+
+        //Listener was invoked
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1);
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionLost.get() == 1);
+
+        //Tear down
+        this.startNode(nodeName);

Review Comment:
   I think stop should be invoked instead.



##########
modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.commands;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import io.micronaut.context.annotation.Property;
+import io.micronaut.context.annotation.Value;
+import jakarta.inject.Inject;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.cli.core.repl.Session;
+import org.apache.ignite.internal.cli.event.EventFactory;
+import org.apache.ignite.internal.cli.event.EventListener;
+import org.apache.ignite.internal.cli.event.EventType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+@Property(name="cli.check.connection.period.second", value="1")
+class ItConnectionHeartbeatTest extends 
CliCommandTestInitializedIntegrationBase {
+
+    @Inject
+    Session session;
+
+    @Inject
+    EventFactory eventFactory;
+
+    @Value("${cli.check.connection.period.second}")
+    private long CLI_CHECK_CONNECTION_PERIOD_SECONDS;
+
+    private final AtomicInteger connectionLost = new AtomicInteger(0);

Review Comment:
   ```suggestion
       private final AtomicInteger connectionLostCount = new AtomicInteger(0);
   ```



##########
modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.commands;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import io.micronaut.context.annotation.Property;
+import io.micronaut.context.annotation.Value;
+import jakarta.inject.Inject;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.cli.core.repl.Session;
+import org.apache.ignite.internal.cli.event.EventFactory;
+import org.apache.ignite.internal.cli.event.EventListener;
+import org.apache.ignite.internal.cli.event.EventType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+@Property(name="cli.check.connection.period.second", value="1")
+class ItConnectionHeartbeatTest extends 
CliCommandTestInitializedIntegrationBase {
+
+    @Inject
+    Session session;
+
+    @Inject
+    EventFactory eventFactory;
+
+    @Value("${cli.check.connection.period.second}")
+    private long CLI_CHECK_CONNECTION_PERIOD_SECONDS;
+
+    private final AtomicInteger connectionLost = new AtomicInteger(0);
+    private final AtomicInteger connectionRestored = new AtomicInteger(0);

Review Comment:
   ```suggestion
       private final AtomicInteger connectionRestoredCount = new 
AtomicInteger(0);
   ```



##########
modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.commands;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import io.micronaut.context.annotation.Property;
+import io.micronaut.context.annotation.Value;
+import jakarta.inject.Inject;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.cli.core.repl.Session;
+import org.apache.ignite.internal.cli.event.EventFactory;
+import org.apache.ignite.internal.cli.event.EventListener;
+import org.apache.ignite.internal.cli.event.EventType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+@Property(name="cli.check.connection.period.second", value="1")
+class ItConnectionHeartbeatTest extends 
CliCommandTestInitializedIntegrationBase {
+
+    @Inject
+    Session session;
+
+    @Inject
+    EventFactory eventFactory;
+
+    @Value("${cli.check.connection.period.second}")
+    private long CLI_CHECK_CONNECTION_PERIOD_SECONDS;
+
+    private final AtomicInteger connectionLost = new AtomicInteger(0);
+    private final AtomicInteger connectionRestored = new AtomicInteger(0);
+
+    @BeforeEach
+    void setUp() {
+        //ToDo: Set connection check timeout to 1 sec to make test fast
+        connectionLost.set(0);
+        connectionRestored.set(0);
+        EventListener eventListener = (eventType, event) -> {
+            if (EventType.CONNECTION_LOST == eventType) {
+                connectionLost.incrementAndGet();
+            } else if (EventType.CONNECTION_RESTORED == eventType) {
+                connectionRestored.incrementAndGet();
+            }
+        };
+
+        //Register listeners
+        eventFactory.listen(EventType.CONNECTION_LOST, eventListener);
+        eventFactory.listen(EventType.CONNECTION_RESTORED, eventListener);
+    }
+
+    @Override
+    protected Class<?> getCommandClass() {
+        return TopLevelCliReplCommand.class;
+    }
+
+    @Test
+    @DisplayName("Should send event CONNECTION_RESTORED on connection start")
+    void connectionEstablished() {
+        // Given null session info before connect
+        assertNull(session.info());
+
+        // When connect without parameters
+        execute("connect");
+
+        // Then
+        assertAll(
+                this::assertErrOutputIsEmpty,
+                () -> assertOutputContains("Connected to 
http://localhost:10300";)
+        );
+
+        //Listener was invoked
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1);
+        assertEquals(0, connectionLost.get());
+    }
+
+    @Test
+    @DisplayName("Should send event CONNECTION_LOST on cluster stop")
+    void onConnectionLost() {
+        // Given connected cli
+        execute("connect");
+
+        // Then
+        assertAll(
+                this::assertErrOutputIsEmpty,
+                () -> assertOutputContains("Connected to 
http://localhost:10300";)
+        );
+
+        // When stop node
+        String nodeName = session.info().nodeName();
+        this.stopNode(nodeName);
+
+        //Listener was invoked

Review Comment:
   ```suggestion
           // Listener was invoked
   ```



##########
modules/cli/src/main/java/org/apache/ignite/internal/cli/call/connect/ConnectCall.java:
##########
@@ -56,15 +61,18 @@ public class ConnectCall implements Call<UrlCallInput, 
String> {
 
     private final JdbcUrlFactory jdbcUrlFactory;
 
+    private final EventFactory eventFactory;
+
     /**
      * Constructor.
      */
     public ConnectCall(Session session, StateConfigProvider 
stateConfigProvider, ApiClientFactory clientFactory,
-            JdbcUrlFactory jdbcUrlFactory) {
+            JdbcUrlFactory jdbcUrlFactory, ConnectionHeartBeat 
connectionHeartBeat, EventFactory eventFactory) {

Review Comment:
   ```suggestion
               JdbcUrlFactory jdbcUrlFactory, EventFactory eventFactory) {
   ```



##########
modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.commands;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import io.micronaut.context.annotation.Property;
+import io.micronaut.context.annotation.Value;
+import jakarta.inject.Inject;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.cli.core.repl.Session;
+import org.apache.ignite.internal.cli.event.EventFactory;
+import org.apache.ignite.internal.cli.event.EventListener;
+import org.apache.ignite.internal.cli.event.EventType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+@Property(name="cli.check.connection.period.second", value="1")
+class ItConnectionHeartbeatTest extends 
CliCommandTestInitializedIntegrationBase {
+
+    @Inject
+    Session session;
+
+    @Inject
+    EventFactory eventFactory;
+
+    @Value("${cli.check.connection.period.second}")
+    private long CLI_CHECK_CONNECTION_PERIOD_SECONDS;
+
+    private final AtomicInteger connectionLost = new AtomicInteger(0);
+    private final AtomicInteger connectionRestored = new AtomicInteger(0);
+
+    @BeforeEach
+    void setUp() {
+        //ToDo: Set connection check timeout to 1 sec to make test fast
+        connectionLost.set(0);
+        connectionRestored.set(0);
+        EventListener eventListener = (eventType, event) -> {
+            if (EventType.CONNECTION_LOST == eventType) {
+                connectionLost.incrementAndGet();
+            } else if (EventType.CONNECTION_RESTORED == eventType) {
+                connectionRestored.incrementAndGet();
+            }
+        };
+
+        //Register listeners
+        eventFactory.listen(EventType.CONNECTION_LOST, eventListener);
+        eventFactory.listen(EventType.CONNECTION_RESTORED, eventListener);
+    }
+
+    @Override
+    protected Class<?> getCommandClass() {
+        return TopLevelCliReplCommand.class;
+    }
+
+    @Test
+    @DisplayName("Should send event CONNECTION_RESTORED on connection start")
+    void connectionEstablished() {
+        // Given null session info before connect
+        assertNull(session.info());
+
+        // When connect without parameters
+        execute("connect");
+
+        // Then
+        assertAll(
+                this::assertErrOutputIsEmpty,
+                () -> assertOutputContains("Connected to 
http://localhost:10300";)
+        );
+
+        //Listener was invoked
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1);
+        assertEquals(0, connectionLost.get());
+    }
+
+    @Test
+    @DisplayName("Should send event CONNECTION_LOST on cluster stop")
+    void onConnectionLost() {
+        // Given connected cli
+        execute("connect");
+
+        // Then
+        assertAll(
+                this::assertErrOutputIsEmpty,
+                () -> assertOutputContains("Connected to 
http://localhost:10300";)
+        );
+
+        // When stop node
+        String nodeName = session.info().nodeName();
+        this.stopNode(nodeName);
+
+        //Listener was invoked
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1);
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionLost.get() == 1);
+
+        //Tear down
+        this.startNode(nodeName);
+    }
+
+    @Test
+    @DisplayName("Should send event CONNECTION_LOST on cluster stop")
+    void restoreConnectionAfterConnectionLost() {
+        // Given connected cli
+        execute("connect");
+
+        // Then
+        assertAll(
+                this::assertErrOutputIsEmpty,
+                () -> assertOutputContains("Connected to 
http://localhost:10300";)
+        );
+
+        // When stop node
+        String nodeName = session.info().nodeName();
+        this.stopNode(nodeName);
+
+        //Then
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1);
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionLost.get() == 1);
+
+        // When
+        this.startNode(nodeName);
+
+        //Then
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionRestored.get() == 2);
+        await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionLost.get() == 1);

Review Comment:
   We've already waited for this condition.



##########
modules/cli/src/main/java/org/apache/ignite/internal/cli/call/connect/DisconnectCall.java:
##########
@@ -17,35 +17,40 @@
 
 package org.apache.ignite.internal.cli.call.connect;
 
-import jakarta.inject.Inject;
 import jakarta.inject.Singleton;
 import org.apache.ignite.internal.cli.core.call.Call;
 import org.apache.ignite.internal.cli.core.call.CallOutput;
 import org.apache.ignite.internal.cli.core.call.DefaultCallOutput;
 import org.apache.ignite.internal.cli.core.call.EmptyCallInput;
+import org.apache.ignite.internal.cli.core.repl.ConnectionHeartBeat;
 import org.apache.ignite.internal.cli.core.repl.Session;
+import org.apache.ignite.internal.cli.core.repl.SessionDisconnectEvent;
 import org.apache.ignite.internal.cli.core.repl.SessionInfo;
 import org.apache.ignite.internal.cli.core.style.component.MessageUiComponent;
 import org.apache.ignite.internal.cli.core.style.element.UiElements;
+import org.apache.ignite.internal.cli.event.EventFactory;
+import org.apache.ignite.internal.cli.event.EventType;
 
 /**
  * Call for disconnect.
  */
 @Singleton
 public class DisconnectCall implements Call<EmptyCallInput, String> {
-    @Inject
     private final Session session;
 
-    public DisconnectCall(Session session) {
+    private final EventFactory eventFactory;
+
+    public DisconnectCall(Session session, ConnectionHeartBeat 
connectionHeartBeat, EventFactory eventFactory) {

Review Comment:
   ```suggestion
       public DisconnectCall(Session session, EventFactory eventFactory) {
   ```



##########
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ConnectionHeartBeat.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.core.repl;
+
+import io.micronaut.context.annotation.Value;
+import jakarta.inject.Singleton;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+import org.apache.ignite.internal.cli.core.rest.ApiClientFactory;
+import org.apache.ignite.internal.cli.event.Event;
+import org.apache.ignite.internal.cli.event.EventFactory;
+import org.apache.ignite.internal.cli.event.EventListener;
+import org.apache.ignite.internal.cli.event.EventType;
+import org.apache.ignite.internal.cli.logger.CliLoggers;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.rest.client.api.NodeManagementApi;
+import org.apache.ignite.rest.client.invoker.ApiException;
+
+/**
+ * Connection to node heart beat.
+ */
+@Singleton
+public class ConnectionHeartBeat implements EventListener {
+
+    private static final IgniteLogger log = 
CliLoggers.forClass(ConnectionHeartBeat.class);
+
+    /** CLI check connection period period. */
+    private final long cliCheckConnectionPeriodSecond;
+
+    /** Scheduled executor for connection heartbeat. */
+    @Nullable
+    private ScheduledExecutorService scheduledConnectionHeartbeatExecutor;
+
+    private final ApiClientFactory clientFactory;
+
+    private final EventFactory eventFactory;
+
+    private final AtomicBoolean connected = new AtomicBoolean(false);
+
+    /**
+     * Created instance of connection heartbeat.
+     *
+     * @param clientFactory api client factory.
+     * @param eventFactory event factory.
+     */
+    public 
ConnectionHeartBeat(@Value("${cli.check.connection.period.second:5}") long 
cliCheckConnectionPeriodSecond,
+            ApiClientFactory clientFactory,
+            EventFactory eventFactory) {
+        this.clientFactory = clientFactory;
+        this.eventFactory = eventFactory;
+        this.cliCheckConnectionPeriodSecond = cliCheckConnectionPeriodSecond;
+    }
+
+    /**
+     * Starts connection heartbeat. By default connection will be checked 
every 5 sec.
+     *
+     * @param sessionInfo session info with node url
+     */
+    private void onConnect(SessionInfo sessionInfo) {
+        //eventFactory.fireEvent(EventType.CONNECTION_RESTORED, new 
ConnectionStatusEvent());

Review Comment:
   commented code



##########
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ConnectionHeartBeat.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.core.repl;
+
+import io.micronaut.context.annotation.Value;
+import jakarta.inject.Singleton;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+import org.apache.ignite.internal.cli.core.rest.ApiClientFactory;
+import org.apache.ignite.internal.cli.event.Event;
+import org.apache.ignite.internal.cli.event.EventFactory;
+import org.apache.ignite.internal.cli.event.EventListener;
+import org.apache.ignite.internal.cli.event.EventType;
+import org.apache.ignite.internal.cli.logger.CliLoggers;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.rest.client.api.NodeManagementApi;
+import org.apache.ignite.rest.client.invoker.ApiException;
+
+/**
+ * Connection to node heart beat.
+ */
+@Singleton
+public class ConnectionHeartBeat implements EventListener {
+
+    private static final IgniteLogger log = 
CliLoggers.forClass(ConnectionHeartBeat.class);
+
+    /** CLI check connection period period. */
+    private final long cliCheckConnectionPeriodSecond;
+
+    /** Scheduled executor for connection heartbeat. */
+    @Nullable
+    private ScheduledExecutorService scheduledConnectionHeartbeatExecutor;
+
+    private final ApiClientFactory clientFactory;
+
+    private final EventFactory eventFactory;
+
+    private final AtomicBoolean connected = new AtomicBoolean(false);
+
+    /**
+     * Created instance of connection heartbeat.
+     *
+     * @param clientFactory api client factory.
+     * @param eventFactory event factory.
+     */
+    public 
ConnectionHeartBeat(@Value("${cli.check.connection.period.second:5}") long 
cliCheckConnectionPeriodSecond,
+            ApiClientFactory clientFactory,
+            EventFactory eventFactory) {
+        this.clientFactory = clientFactory;
+        this.eventFactory = eventFactory;
+        this.cliCheckConnectionPeriodSecond = cliCheckConnectionPeriodSecond;
+    }
+
+    /**
+     * Starts connection heartbeat. By default connection will be checked 
every 5 sec.
+     *
+     * @param sessionInfo session info with node url
+     */
+    private void onConnect(SessionInfo sessionInfo) {
+        //eventFactory.fireEvent(EventType.CONNECTION_RESTORED, new 
ConnectionStatusEvent());
+
+        if (scheduledConnectionHeartbeatExecutor == null) {
+            scheduledConnectionHeartbeatExecutor =
+                    Executors.newScheduledThreadPool(1, new 
NamedThreadFactory("cli-check-connection-thread", log));
+
+            //Start connection heart beat

Review Comment:
   ```suggestion
               // Start connection heart beat.
   ```



##########
modules/cli/src/main/java/org/apache/ignite/internal/cli/event/Event.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.event;
+
+/**
+ * The event cas which is produced by event producer component.
+ */
+public interface Event {
+}

Review Comment:
   missed eof



##########
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ReplBuilder.java:
##########
@@ -152,4 +155,15 @@ public ReplBuilder withAutosuggestionsWidgets() {
         this.autosuggestionsWidgetsEnabled = true;
         return this;
     }
+
+    /**
+     * Builder setter of {@code aliases} field.
+     *
+     * @param eventSubscriber ???? map of aliases for commands.

Review Comment:
   ```suggestion
        * @param eventSubscriber map of aliases for commands.
   ```



##########
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/PeriodicSessionTaskExecutor.java:
##########
@@ -24,35 +24,48 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.cli.event.Event;
+import org.apache.ignite.internal.cli.event.EventListener;
+import org.apache.ignite.internal.cli.event.EventType;
 import org.apache.ignite.internal.cli.logger.CliLoggers;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.jetbrains.annotations.Nullable;
 
 /** Executes tasks periodically while the session is connected. */
 @Singleton
-public class PeriodicSessionTaskExecutor implements AsyncSessionEventListener {
+public class PeriodicSessionTaskExecutor implements EventListener {
     private static final IgniteLogger LOG = 
CliLoggers.forClass(PeriodicSessionTaskExecutor.class);
 
     @Nullable
     private ScheduledExecutorService executor;
 
     private final List<? extends PeriodicSessionTask> tasks;
 
+    //private final EventFactory eventFactory;

Review Comment:
   commented



##########
modules/cli/src/main/java/org/apache/ignite/internal/cli/event/EventFactory.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.event;
+
+import jakarta.inject.Singleton;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.internal.cli.logger.CliLoggers;
+import org.apache.ignite.internal.logger.IgniteLogger;
+
+/**
+ * Register listeners and produces events.
+ */
+@Singleton
+public class EventFactory {
+
+    private static final IgniteLogger log = 
CliLoggers.forClass(EventFactory.class);
+
+    /** All listeners. */
+    private final ConcurrentHashMap<EventType, List<EventListener>> listeners 
= new ConcurrentHashMap<>();
+
+    public EventFactory() {
+    }
+
+    /**
+     * Registers an event listener.
+     *
+     * @param eventType     type of event to listen.
+     * @param eventListener event listener.
+     */
+    public void listen(EventType eventType, EventListener eventListener) {
+        listeners.computeIfAbsent(eventType, evtKey -> new 
CopyOnWriteArrayList<>()).add(eventListener);
+    }
+
+    /**
+     * Removes a listener associated with the event type.
+     *
+     * @param eventType     type of event to listen.
+     * @param eventListener event listener.
+     */
+    public void removeListener(EventType eventType, EventListener 
eventListener) {
+        listeners.computeIfAbsent(eventType, eventT -> new 
CopyOnWriteArrayList<>()).remove(eventListener);

Review Comment:
   Use`computeIfPresent` here



##########
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ReplBuilder.java:
##########
@@ -152,4 +155,15 @@ public ReplBuilder withAutosuggestionsWidgets() {
         this.autosuggestionsWidgetsEnabled = true;
         return this;
     }
+
+    /**
+     * Builder setter of {@code aliases} field.
+     *
+     * @param eventSubscriber ???? map of aliases for commands.

Review Comment:
   the whole java doc here looks like it came from another file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to