PakhomovAlexander commented on code in PR #2359: URL: https://github.com/apache/ignite-3/pull/2359#discussion_r1284461787
########## modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cli.commands; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import io.micronaut.context.annotation.Property; +import io.micronaut.context.annotation.Value; +import jakarta.inject.Inject; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.cli.core.repl.Session; +import org.apache.ignite.internal.cli.event.EventFactory; +import org.apache.ignite.internal.cli.event.EventListener; +import org.apache.ignite.internal.cli.event.EventType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +@Property(name="cli.check.connection.period.second", value="1") +class ItConnectionHeartbeatTest extends CliCommandTestInitializedIntegrationBase { + + @Inject + Session session; 
+ + @Inject + EventFactory eventFactory; + + @Value("${cli.check.connection.period.second}") + private long CLI_CHECK_CONNECTION_PERIOD_SECONDS; + + private final AtomicInteger connectionLost = new AtomicInteger(0); + private final AtomicInteger connectionRestored = new AtomicInteger(0); + + @BeforeEach + void setUp() { + //ToDo: Set connection check timeout to 1 sec to make test fast Review Comment: We tend to link every TODO to a ticket. Could you create a ticket or fix the TODO here? ########## modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.cli.commands; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import io.micronaut.context.annotation.Property; +import io.micronaut.context.annotation.Value; +import jakarta.inject.Inject; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.cli.core.repl.Session; +import org.apache.ignite.internal.cli.event.EventFactory; +import org.apache.ignite.internal.cli.event.EventListener; +import org.apache.ignite.internal.cli.event.EventType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +@Property(name="cli.check.connection.period.second", value="1") +class ItConnectionHeartbeatTest extends CliCommandTestInitializedIntegrationBase { + + @Inject + Session session; + + @Inject + EventFactory eventFactory; + + @Value("${cli.check.connection.period.second}") + private long CLI_CHECK_CONNECTION_PERIOD_SECONDS; Review Comment: ```suggestion private final long CLI_CHECK_CONNECTION_PERIOD_SECONDS = 1; ``` ########## modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ConnectionHeartBeat.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cli.core.repl; + +import io.micronaut.context.annotation.Value; +import jakarta.inject.Singleton; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; +import org.apache.ignite.internal.cli.core.rest.ApiClientFactory; +import org.apache.ignite.internal.cli.event.Event; +import org.apache.ignite.internal.cli.event.EventFactory; +import org.apache.ignite.internal.cli.event.EventListener; +import org.apache.ignite.internal.cli.event.EventType; +import org.apache.ignite.internal.cli.logger.CliLoggers; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.thread.NamedThreadFactory; +import org.apache.ignite.rest.client.api.NodeManagementApi; +import org.apache.ignite.rest.client.invoker.ApiException; + +/** + * Connection to node heart beat. + */ +@Singleton +public class ConnectionHeartBeat implements EventListener { + + private static final IgniteLogger log = CliLoggers.forClass(ConnectionHeartBeat.class); + + /** CLI check connection period period. */ + private final long cliCheckConnectionPeriodSecond; + + /** Scheduled executor for connection heartbeat. 
*/ + @Nullable + private ScheduledExecutorService scheduledConnectionHeartbeatExecutor; + + private final ApiClientFactory clientFactory; + + private final EventFactory eventFactory; + + private final AtomicBoolean connected = new AtomicBoolean(false); + + /** + * Created instance of connection heartbeat. Review Comment: ```suggestion * Creates the instance of connection heartbeat. ``` ########## modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ConnectionHeartBeat.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.cli.core.repl; + +import io.micronaut.context.annotation.Value; +import jakarta.inject.Singleton; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; +import org.apache.ignite.internal.cli.core.rest.ApiClientFactory; +import org.apache.ignite.internal.cli.event.Event; +import org.apache.ignite.internal.cli.event.EventFactory; +import org.apache.ignite.internal.cli.event.EventListener; +import org.apache.ignite.internal.cli.event.EventType; +import org.apache.ignite.internal.cli.logger.CliLoggers; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.thread.NamedThreadFactory; +import org.apache.ignite.rest.client.api.NodeManagementApi; +import org.apache.ignite.rest.client.invoker.ApiException; + +/** + * Connection to node heart beat. + */ +@Singleton +public class ConnectionHeartBeat implements EventListener { + + private static final IgniteLogger log = CliLoggers.forClass(ConnectionHeartBeat.class); + + /** CLI check connection period period. */ + private final long cliCheckConnectionPeriodSecond; + + /** Scheduled executor for connection heartbeat. */ + @Nullable + private ScheduledExecutorService scheduledConnectionHeartbeatExecutor; + + private final ApiClientFactory clientFactory; + + private final EventFactory eventFactory; + + private final AtomicBoolean connected = new AtomicBoolean(false); + + /** + * Created instance of connection heartbeat. + * + * @param clientFactory api client factory. + * @param eventFactory event factory. 
+ */ + public ConnectionHeartBeat(@Value("${cli.check.connection.period.second:5}") long cliCheckConnectionPeriodSecond, + ApiClientFactory clientFactory, + EventFactory eventFactory) { + this.clientFactory = clientFactory; + this.eventFactory = eventFactory; + this.cliCheckConnectionPeriodSecond = cliCheckConnectionPeriodSecond; + } + + /** + * Starts connection heartbeat. By default connection will be checked every 5 sec. + * + * @param sessionInfo session info with node url + */ + private void onConnect(SessionInfo sessionInfo) { + //eventFactory.fireEvent(EventType.CONNECTION_RESTORED, new ConnectionStatusEvent()); + + if (scheduledConnectionHeartbeatExecutor == null) { + scheduledConnectionHeartbeatExecutor = + Executors.newScheduledThreadPool(1, new NamedThreadFactory("cli-check-connection-thread", log)); + + //Start connection heart beat + scheduledConnectionHeartbeatExecutor.scheduleAtFixedRate( + () -> pingConnection(sessionInfo.nodeUrl()), + 0, + cliCheckConnectionPeriodSecond, + TimeUnit.SECONDS + ); + } + } + + /** + * Stops connection heartbeat. + */ + private void onDisconnect() { + if (scheduledConnectionHeartbeatExecutor != null) { + scheduledConnectionHeartbeatExecutor.shutdownNow(); + scheduledConnectionHeartbeatExecutor = null; + } + } + + private void pingConnection(String nodeUrl) { + try { + new NodeManagementApi(clientFactory.getClient(nodeUrl)).nodeState(); + if (!connected.get()) { + connected.compareAndSet(false, true); Review Comment: That is not thread-safe. I would suggest calling `connected.compareAndSet(false, true)` and checking the result of the operation. ########## modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cli.commands; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import io.micronaut.context.annotation.Property; +import io.micronaut.context.annotation.Value; +import jakarta.inject.Inject; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.cli.core.repl.Session; +import org.apache.ignite.internal.cli.event.EventFactory; +import org.apache.ignite.internal.cli.event.EventListener; +import org.apache.ignite.internal.cli.event.EventType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +@Property(name="cli.check.connection.period.second", value="1") +class ItConnectionHeartbeatTest extends CliCommandTestInitializedIntegrationBase { + + @Inject + Session session; + + @Inject + EventFactory eventFactory; + + @Value("${cli.check.connection.period.second}") + private long CLI_CHECK_CONNECTION_PERIOD_SECONDS; + + private final AtomicInteger connectionLost = new AtomicInteger(0); + private final AtomicInteger connectionRestored = new AtomicInteger(0); + + @BeforeEach + void setUp() { + //ToDo: Set connection check timeout to 1 
sec to make test fast + connectionLost.set(0); + connectionRestored.set(0); + EventListener eventListener = (eventType, event) -> { + if (EventType.CONNECTION_LOST == eventType) { + connectionLost.incrementAndGet(); + } else if (EventType.CONNECTION_RESTORED == eventType) { + connectionRestored.incrementAndGet(); + } + }; + + //Register listeners + eventFactory.listen(EventType.CONNECTION_LOST, eventListener); + eventFactory.listen(EventType.CONNECTION_RESTORED, eventListener); + } + + @Override + protected Class<?> getCommandClass() { + return TopLevelCliReplCommand.class; + } + + @Test + @DisplayName("Should send event CONNECTION_RESTORED on connection start") + void connectionEstablished() { + // Given null session info before connect + assertNull(session.info()); + + // When connect without parameters + execute("connect"); + + // Then + assertAll( + this::assertErrOutputIsEmpty, + () -> assertOutputContains("Connected to http://localhost:10300") + ); + + //Listener was invoked + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1); + assertEquals(0, connectionLost.get()); + } + + @Test + @DisplayName("Should send event CONNECTION_LOST on cluster stop") + void onConnectionLost() { + // Given connected cli + execute("connect"); + + // Then + assertAll( + this::assertErrOutputIsEmpty, + () -> assertOutputContains("Connected to http://localhost:10300") + ); + + // When stop node + String nodeName = session.info().nodeName(); + this.stopNode(nodeName); + + //Listener was invoked + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1); + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, TimeUnit.SECONDS).until(() -> connectionLost.get() == 1); + + //Tear down + this.startNode(nodeName); Review Comment: As far as I see it is already stopped. 
########## modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cli.commands; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import io.micronaut.context.annotation.Property; +import io.micronaut.context.annotation.Value; +import jakarta.inject.Inject; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.cli.core.repl.Session; +import org.apache.ignite.internal.cli.event.EventFactory; +import org.apache.ignite.internal.cli.event.EventListener; +import org.apache.ignite.internal.cli.event.EventType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +@Property(name="cli.check.connection.period.second", value="1") +class ItConnectionHeartbeatTest extends CliCommandTestInitializedIntegrationBase { + + @Inject + Session session; 
+ + @Inject + EventFactory eventFactory; + + @Value("${cli.check.connection.period.second}") + private long CLI_CHECK_CONNECTION_PERIOD_SECONDS; + + private final AtomicInteger connectionLost = new AtomicInteger(0); + private final AtomicInteger connectionRestored = new AtomicInteger(0); + + @BeforeEach + void setUp() { + //ToDo: Set connection check timeout to 1 sec to make test fast + connectionLost.set(0); + connectionRestored.set(0); + EventListener eventListener = (eventType, event) -> { + if (EventType.CONNECTION_LOST == eventType) { + connectionLost.incrementAndGet(); + } else if (EventType.CONNECTION_RESTORED == eventType) { + connectionRestored.incrementAndGet(); + } + }; + + //Register listeners + eventFactory.listen(EventType.CONNECTION_LOST, eventListener); + eventFactory.listen(EventType.CONNECTION_RESTORED, eventListener); + } + + @Override + protected Class<?> getCommandClass() { + return TopLevelCliReplCommand.class; + } + + @Test + @DisplayName("Should send event CONNECTION_RESTORED on connection start") + void connectionEstablished() { + // Given null session info before connect + assertNull(session.info()); + + // When connect without parameters + execute("connect"); + + // Then + assertAll( + this::assertErrOutputIsEmpty, + () -> assertOutputContains("Connected to http://localhost:10300") + ); + + //Listener was invoked + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1); + assertEquals(0, connectionLost.get()); + } + + @Test + @DisplayName("Should send event CONNECTION_LOST on cluster stop") + void onConnectionLost() { + // Given connected cli + execute("connect"); + + // Then + assertAll( + this::assertErrOutputIsEmpty, + () -> assertOutputContains("Connected to http://localhost:10300") + ); + + // When stop node + String nodeName = session.info().nodeName(); + this.stopNode(nodeName); + + //Listener was invoked + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1); + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, TimeUnit.SECONDS).until(() -> connectionLost.get() == 1); + + //Tear down + this.startNode(nodeName); + } + + @Test + @DisplayName("Should send event CONNECTION_LOST on cluster stop") + void restoreConnectionAfterConnectionLost() { + // Given connected cli + execute("connect"); + + // Then + assertAll( + this::assertErrOutputIsEmpty, + () -> assertOutputContains("Connected to http://localhost:10300") + ); + + // When stop node + String nodeName = session.info().nodeName(); + this.stopNode(nodeName); + + //Then + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1); + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, TimeUnit.SECONDS).until(() -> connectionLost.get() == 1); + + // When + this.startNode(nodeName); + + //Then Review Comment: ```suggestion // Then ``` ########## modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.cli.commands; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import io.micronaut.context.annotation.Property; +import io.micronaut.context.annotation.Value; +import jakarta.inject.Inject; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.cli.core.repl.Session; +import org.apache.ignite.internal.cli.event.EventFactory; +import org.apache.ignite.internal.cli.event.EventListener; +import org.apache.ignite.internal.cli.event.EventType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +@Property(name="cli.check.connection.period.second", value="1") +class ItConnectionHeartbeatTest extends CliCommandTestInitializedIntegrationBase { + + @Inject + Session session; + + @Inject + EventFactory eventFactory; + + @Value("${cli.check.connection.period.second}") + private long CLI_CHECK_CONNECTION_PERIOD_SECONDS; + + private final AtomicInteger connectionLost = new AtomicInteger(0); + private final AtomicInteger connectionRestored = new AtomicInteger(0); + + @BeforeEach + void setUp() { + //ToDo: Set connection check timeout to 1 sec to make test fast + connectionLost.set(0); + connectionRestored.set(0); + EventListener eventListener = (eventType, event) -> { + if (EventType.CONNECTION_LOST == eventType) { Review Comment: It is better not to use an if statement here but to register two separate event listeners. ########## modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cli.commands; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import io.micronaut.context.annotation.Property; +import io.micronaut.context.annotation.Value; +import jakarta.inject.Inject; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.cli.core.repl.Session; +import org.apache.ignite.internal.cli.event.EventFactory; +import org.apache.ignite.internal.cli.event.EventListener; +import org.apache.ignite.internal.cli.event.EventType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +@Property(name="cli.check.connection.period.second", value="1") +class ItConnectionHeartbeatTest extends CliCommandTestInitializedIntegrationBase { + + @Inject + Session session; + + @Inject + EventFactory eventFactory; + + @Value("${cli.check.connection.period.second}") + private long CLI_CHECK_CONNECTION_PERIOD_SECONDS; + + private final AtomicInteger connectionLost = new AtomicInteger(0); + private final AtomicInteger 
connectionRestored = new AtomicInteger(0); + + @BeforeEach + void setUp() { + //ToDo: Set connection check timeout to 1 sec to make test fast + connectionLost.set(0); + connectionRestored.set(0); + EventListener eventListener = (eventType, event) -> { + if (EventType.CONNECTION_LOST == eventType) { + connectionLost.incrementAndGet(); + } else if (EventType.CONNECTION_RESTORED == eventType) { + connectionRestored.incrementAndGet(); + } + }; + + //Register listeners + eventFactory.listen(EventType.CONNECTION_LOST, eventListener); + eventFactory.listen(EventType.CONNECTION_RESTORED, eventListener); + } + + @Override + protected Class<?> getCommandClass() { + return TopLevelCliReplCommand.class; + } + + @Test + @DisplayName("Should send event CONNECTION_RESTORED on connection start") + void connectionEstablished() { + // Given null session info before connect + assertNull(session.info()); + + // When connect without parameters + execute("connect"); + + // Then + assertAll( + this::assertErrOutputIsEmpty, + () -> assertOutputContains("Connected to http://localhost:10300") + ); + + //Listener was invoked + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1); + assertEquals(0, connectionLost.get()); + } + + @Test + @DisplayName("Should send event CONNECTION_LOST on cluster stop") + void onConnectionLost() { + // Given connected cli + execute("connect"); + + // Then + assertAll( + this::assertErrOutputIsEmpty, + () -> assertOutputContains("Connected to http://localhost:10300") + ); + + // When stop node + String nodeName = session.info().nodeName(); + this.stopNode(nodeName); + + //Listener was invoked + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1); + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, TimeUnit.SECONDS).until(() -> connectionLost.get() == 1); + + //Tear down Review Comment: ```suggestion // Tear down ``` ########## 
modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cli.commands; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import io.micronaut.context.annotation.Property; +import io.micronaut.context.annotation.Value; +import jakarta.inject.Inject; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.cli.core.repl.Session; +import org.apache.ignite.internal.cli.event.EventFactory; +import org.apache.ignite.internal.cli.event.EventListener; +import org.apache.ignite.internal.cli.event.EventType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +@Property(name="cli.check.connection.period.second", value="1") +class ItConnectionHeartbeatTest extends CliCommandTestInitializedIntegrationBase { + + @Inject + Session session; + + 
@Inject + EventFactory eventFactory; + + @Value("${cli.check.connection.period.second}") + private long CLI_CHECK_CONNECTION_PERIOD_SECONDS; + + private final AtomicInteger connectionLost = new AtomicInteger(0); + private final AtomicInteger connectionRestored = new AtomicInteger(0); + + @BeforeEach + void setUp() { + //ToDo: Set connection check timeout to 1 sec to make test fast + connectionLost.set(0); + connectionRestored.set(0); + EventListener eventListener = (eventType, event) -> { + if (EventType.CONNECTION_LOST == eventType) { + connectionLost.incrementAndGet(); + } else if (EventType.CONNECTION_RESTORED == eventType) { + connectionRestored.incrementAndGet(); + } + }; + + //Register listeners + eventFactory.listen(EventType.CONNECTION_LOST, eventListener); + eventFactory.listen(EventType.CONNECTION_RESTORED, eventListener); + } + + @Override + protected Class<?> getCommandClass() { + return TopLevelCliReplCommand.class; + } + + @Test + @DisplayName("Should send event CONNECTION_RESTORED on connection start") + void connectionEstablished() { + // Given null session info before connect + assertNull(session.info()); + + // When connect without parameters + execute("connect"); + + // Then + assertAll( + this::assertErrOutputIsEmpty, + () -> assertOutputContains("Connected to http://localhost:10300") + ); + + //Listener was invoked + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1); + assertEquals(0, connectionLost.get()); + } + + @Test + @DisplayName("Should send event CONNECTION_LOST on cluster stop") + void onConnectionLost() { + // Given connected cli + execute("connect"); + + // Then + assertAll( + this::assertErrOutputIsEmpty, + () -> assertOutputContains("Connected to http://localhost:10300") + ); + + // When stop node + String nodeName = session.info().nodeName(); + this.stopNode(nodeName); + + //Listener was invoked + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, 
TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1); + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, TimeUnit.SECONDS).until(() -> connectionLost.get() == 1); + + //Tear down + this.startNode(nodeName); + } + + @Test + @DisplayName("Should send event CONNECTION_LOST on cluster stop") + void restoreConnectionAfterConnectionLost() { + // Given connected cli + execute("connect"); + + // Then + assertAll( + this::assertErrOutputIsEmpty, + () -> assertOutputContains("Connected to http://localhost:10300") + ); + + // When stop node + String nodeName = session.info().nodeName(); + this.stopNode(nodeName); + + //Then Review Comment: ```suggestion // Then ``` ########## modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.cli.commands; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import io.micronaut.context.annotation.Property; +import io.micronaut.context.annotation.Value; +import jakarta.inject.Inject; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.cli.core.repl.Session; +import org.apache.ignite.internal.cli.event.EventFactory; +import org.apache.ignite.internal.cli.event.EventListener; +import org.apache.ignite.internal.cli.event.EventType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +@Property(name="cli.check.connection.period.second", value="1") +class ItConnectionHeartbeatTest extends CliCommandTestInitializedIntegrationBase { + + @Inject + Session session; + + @Inject + EventFactory eventFactory; + + @Value("${cli.check.connection.period.second}") + private long CLI_CHECK_CONNECTION_PERIOD_SECONDS; + + private final AtomicInteger connectionLost = new AtomicInteger(0); + private final AtomicInteger connectionRestored = new AtomicInteger(0); + + @BeforeEach + void setUp() { + //ToDo: Set connection check timeout to 1 sec to make test fast + connectionLost.set(0); + connectionRestored.set(0); + EventListener eventListener = (eventType, event) -> { + if (EventType.CONNECTION_LOST == eventType) { + connectionLost.incrementAndGet(); + } else if (EventType.CONNECTION_RESTORED == eventType) { + connectionRestored.incrementAndGet(); + } + }; + + //Register listeners + eventFactory.listen(EventType.CONNECTION_LOST, eventListener); + eventFactory.listen(EventType.CONNECTION_RESTORED, eventListener); + } + + @Override + protected Class<?> getCommandClass() { + return TopLevelCliReplCommand.class; + } + + 
@Test + @DisplayName("Should send event CONNECTION_RESTORED on connection start") + void connectionEstablished() { + // Given null session info before connect + assertNull(session.info()); + + // When connect without parameters + execute("connect"); + + // Then + assertAll( + this::assertErrOutputIsEmpty, + () -> assertOutputContains("Connected to http://localhost:10300") + ); + + //Listener was invoked + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1); + assertEquals(0, connectionLost.get()); + } + + @Test + @DisplayName("Should send event CONNECTION_LOST on cluster stop") + void onConnectionLost() { + // Given connected cli + execute("connect"); + + // Then + assertAll( + this::assertErrOutputIsEmpty, + () -> assertOutputContains("Connected to http://localhost:10300") + ); + + // When stop node + String nodeName = session.info().nodeName(); + this.stopNode(nodeName); + + //Listener was invoked + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1); + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, TimeUnit.SECONDS).until(() -> connectionLost.get() == 1); + + //Tear down + this.startNode(nodeName); Review Comment: I think stop should be invoked instead. ########## modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cli.commands; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import io.micronaut.context.annotation.Property; +import io.micronaut.context.annotation.Value; +import jakarta.inject.Inject; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.cli.core.repl.Session; +import org.apache.ignite.internal.cli.event.EventFactory; +import org.apache.ignite.internal.cli.event.EventListener; +import org.apache.ignite.internal.cli.event.EventType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +@Property(name="cli.check.connection.period.second", value="1") +class ItConnectionHeartbeatTest extends CliCommandTestInitializedIntegrationBase { + + @Inject + Session session; + + @Inject + EventFactory eventFactory; + + @Value("${cli.check.connection.period.second}") + private long CLI_CHECK_CONNECTION_PERIOD_SECONDS; + + private final AtomicInteger connectionLost = new AtomicInteger(0); Review Comment: ```suggestion private final AtomicInteger connectionLostCount = new AtomicInteger(0); ``` ########## modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cli.commands; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import io.micronaut.context.annotation.Property; +import io.micronaut.context.annotation.Value; +import jakarta.inject.Inject; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.cli.core.repl.Session; +import org.apache.ignite.internal.cli.event.EventFactory; +import org.apache.ignite.internal.cli.event.EventListener; +import org.apache.ignite.internal.cli.event.EventType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +@Property(name="cli.check.connection.period.second", value="1") +class ItConnectionHeartbeatTest extends CliCommandTestInitializedIntegrationBase { + + @Inject + Session session; + + @Inject + EventFactory eventFactory; + + @Value("${cli.check.connection.period.second}") + private long CLI_CHECK_CONNECTION_PERIOD_SECONDS; + + private final AtomicInteger connectionLost = new 
AtomicInteger(0); + private final AtomicInteger connectionRestored = new AtomicInteger(0); Review Comment: ```suggestion private final AtomicInteger connectionRestoredCount = new AtomicInteger(0); ``` ########## modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.cli.commands; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import io.micronaut.context.annotation.Property; +import io.micronaut.context.annotation.Value; +import jakarta.inject.Inject; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.cli.core.repl.Session; +import org.apache.ignite.internal.cli.event.EventFactory; +import org.apache.ignite.internal.cli.event.EventListener; +import org.apache.ignite.internal.cli.event.EventType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +@Property(name="cli.check.connection.period.second", value="1") +class ItConnectionHeartbeatTest extends CliCommandTestInitializedIntegrationBase { + + @Inject + Session session; + + @Inject + EventFactory eventFactory; + + @Value("${cli.check.connection.period.second}") + private long CLI_CHECK_CONNECTION_PERIOD_SECONDS; + + private final AtomicInteger connectionLost = new AtomicInteger(0); + private final AtomicInteger connectionRestored = new AtomicInteger(0); + + @BeforeEach + void setUp() { + //ToDo: Set connection check timeout to 1 sec to make test fast + connectionLost.set(0); + connectionRestored.set(0); + EventListener eventListener = (eventType, event) -> { + if (EventType.CONNECTION_LOST == eventType) { + connectionLost.incrementAndGet(); + } else if (EventType.CONNECTION_RESTORED == eventType) { + connectionRestored.incrementAndGet(); + } + }; + + //Register listeners + eventFactory.listen(EventType.CONNECTION_LOST, eventListener); + eventFactory.listen(EventType.CONNECTION_RESTORED, eventListener); + } + + @Override + protected Class<?> getCommandClass() { + return TopLevelCliReplCommand.class; + } + + 
@Test + @DisplayName("Should send event CONNECTION_RESTORED on connection start") + void connectionEstablished() { + // Given null session info before connect + assertNull(session.info()); + + // When connect without parameters + execute("connect"); + + // Then + assertAll( + this::assertErrOutputIsEmpty, + () -> assertOutputContains("Connected to http://localhost:10300") + ); + + //Listener was invoked + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1); + assertEquals(0, connectionLost.get()); + } + + @Test + @DisplayName("Should send event CONNECTION_LOST on cluster stop") + void onConnectionLost() { + // Given connected cli + execute("connect"); + + // Then + assertAll( + this::assertErrOutputIsEmpty, + () -> assertOutputContains("Connected to http://localhost:10300") + ); + + // When stop node + String nodeName = session.info().nodeName(); + this.stopNode(nodeName); + + //Listener was invoked Review Comment: ```suggestion // Listener was invoked ``` ########## modules/cli/src/main/java/org/apache/ignite/internal/cli/call/connect/ConnectCall.java: ########## @@ -56,15 +61,18 @@ public class ConnectCall implements Call<UrlCallInput, String> { private final JdbcUrlFactory jdbcUrlFactory; + private final EventFactory eventFactory; + /** * Constructor. */ public ConnectCall(Session session, StateConfigProvider stateConfigProvider, ApiClientFactory clientFactory, - JdbcUrlFactory jdbcUrlFactory) { + JdbcUrlFactory jdbcUrlFactory, ConnectionHeartBeat connectionHeartBeat, EventFactory eventFactory) { Review Comment: ```suggestion JdbcUrlFactory jdbcUrlFactory, EventFactory eventFactory) { ``` ########## modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cli.commands; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import io.micronaut.context.annotation.Property; +import io.micronaut.context.annotation.Value; +import jakarta.inject.Inject; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.cli.core.repl.Session; +import org.apache.ignite.internal.cli.event.EventFactory; +import org.apache.ignite.internal.cli.event.EventListener; +import org.apache.ignite.internal.cli.event.EventType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +@Property(name="cli.check.connection.period.second", value="1") +class ItConnectionHeartbeatTest extends CliCommandTestInitializedIntegrationBase { + + @Inject + Session session; + + @Inject + EventFactory eventFactory; + + @Value("${cli.check.connection.period.second}") + private long CLI_CHECK_CONNECTION_PERIOD_SECONDS; + + private final AtomicInteger connectionLost = new AtomicInteger(0); + private final AtomicInteger 
connectionRestored = new AtomicInteger(0); + + @BeforeEach + void setUp() { + //ToDo: Set connection check timeout to 1 sec to make test fast + connectionLost.set(0); + connectionRestored.set(0); + EventListener eventListener = (eventType, event) -> { + if (EventType.CONNECTION_LOST == eventType) { + connectionLost.incrementAndGet(); + } else if (EventType.CONNECTION_RESTORED == eventType) { + connectionRestored.incrementAndGet(); + } + }; + + //Register listeners + eventFactory.listen(EventType.CONNECTION_LOST, eventListener); + eventFactory.listen(EventType.CONNECTION_RESTORED, eventListener); + } + + @Override + protected Class<?> getCommandClass() { + return TopLevelCliReplCommand.class; + } + + @Test + @DisplayName("Should send event CONNECTION_RESTORED on connection start") + void connectionEstablished() { + // Given null session info before connect + assertNull(session.info()); + + // When connect without parameters + execute("connect"); + + // Then + assertAll( + this::assertErrOutputIsEmpty, + () -> assertOutputContains("Connected to http://localhost:10300") + ); + + //Listener was invoked + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1); + assertEquals(0, connectionLost.get()); + } + + @Test + @DisplayName("Should send event CONNECTION_LOST on cluster stop") + void onConnectionLost() { + // Given connected cli + execute("connect"); + + // Then + assertAll( + this::assertErrOutputIsEmpty, + () -> assertOutputContains("Connected to http://localhost:10300") + ); + + // When stop node + String nodeName = session.info().nodeName(); + this.stopNode(nodeName); + + //Listener was invoked + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1); + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, TimeUnit.SECONDS).until(() -> connectionLost.get() == 1); + + //Tear down + this.startNode(nodeName); + } + + @Test + 
@DisplayName("Should send event CONNECTION_LOST on cluster stop") + void restoreConnectionAfterConnectionLost() { + // Given connected cli + execute("connect"); + + // Then + assertAll( + this::assertErrOutputIsEmpty, + () -> assertOutputContains("Connected to http://localhost:10300") + ); + + // When stop node + String nodeName = session.info().nodeName(); + this.stopNode(nodeName); + + //Then + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, TimeUnit.SECONDS).until(() -> connectionRestored.get() == 1); + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, TimeUnit.SECONDS).until(() -> connectionLost.get() == 1); + + // When + this.startNode(nodeName); + + //Then + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, TimeUnit.SECONDS).until(() -> connectionRestored.get() == 2); + await().timeout(CLI_CHECK_CONNECTION_PERIOD_SECONDS * 2, TimeUnit.SECONDS).until(() -> connectionLost.get() == 1); Review Comment: We've already waited for this condition. ########## modules/cli/src/main/java/org/apache/ignite/internal/cli/call/connect/DisconnectCall.java: ########## @@ -17,35 +17,40 @@ package org.apache.ignite.internal.cli.call.connect; -import jakarta.inject.Inject; import jakarta.inject.Singleton; import org.apache.ignite.internal.cli.core.call.Call; import org.apache.ignite.internal.cli.core.call.CallOutput; import org.apache.ignite.internal.cli.core.call.DefaultCallOutput; import org.apache.ignite.internal.cli.core.call.EmptyCallInput; +import org.apache.ignite.internal.cli.core.repl.ConnectionHeartBeat; import org.apache.ignite.internal.cli.core.repl.Session; +import org.apache.ignite.internal.cli.core.repl.SessionDisconnectEvent; import org.apache.ignite.internal.cli.core.repl.SessionInfo; import org.apache.ignite.internal.cli.core.style.component.MessageUiComponent; import org.apache.ignite.internal.cli.core.style.element.UiElements; +import org.apache.ignite.internal.cli.event.EventFactory; +import 
org.apache.ignite.internal.cli.event.EventType; /** * Call for disconnect. */ @Singleton public class DisconnectCall implements Call<EmptyCallInput, String> { - @Inject private final Session session; - public DisconnectCall(Session session) { + private final EventFactory eventFactory; + + public DisconnectCall(Session session, ConnectionHeartBeat connectionHeartBeat, EventFactory eventFactory) { Review Comment: ```suggestion public DisconnectCall(Session session, EventFactory eventFactory) { ``` ########## modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ConnectionHeartBeat.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.cli.core.repl; + +import io.micronaut.context.annotation.Value; +import jakarta.inject.Singleton; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; +import org.apache.ignite.internal.cli.core.rest.ApiClientFactory; +import org.apache.ignite.internal.cli.event.Event; +import org.apache.ignite.internal.cli.event.EventFactory; +import org.apache.ignite.internal.cli.event.EventListener; +import org.apache.ignite.internal.cli.event.EventType; +import org.apache.ignite.internal.cli.logger.CliLoggers; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.thread.NamedThreadFactory; +import org.apache.ignite.rest.client.api.NodeManagementApi; +import org.apache.ignite.rest.client.invoker.ApiException; + +/** + * Connection to node heart beat. + */ +@Singleton +public class ConnectionHeartBeat implements EventListener { + + private static final IgniteLogger log = CliLoggers.forClass(ConnectionHeartBeat.class); + + /** CLI check connection period period. */ + private final long cliCheckConnectionPeriodSecond; + + /** Scheduled executor for connection heartbeat. */ + @Nullable + private ScheduledExecutorService scheduledConnectionHeartbeatExecutor; + + private final ApiClientFactory clientFactory; + + private final EventFactory eventFactory; + + private final AtomicBoolean connected = new AtomicBoolean(false); + + /** + * Created instance of connection heartbeat. + * + * @param clientFactory api client factory. + * @param eventFactory event factory. 
+ */ + public ConnectionHeartBeat(@Value("${cli.check.connection.period.second:5}") long cliCheckConnectionPeriodSecond, + ApiClientFactory clientFactory, + EventFactory eventFactory) { + this.clientFactory = clientFactory; + this.eventFactory = eventFactory; + this.cliCheckConnectionPeriodSecond = cliCheckConnectionPeriodSecond; + } + + /** + * Starts connection heartbeat. By default connection will be checked every 5 sec. + * + * @param sessionInfo session info with node url + */ + private void onConnect(SessionInfo sessionInfo) { + //eventFactory.fireEvent(EventType.CONNECTION_RESTORED, new ConnectionStatusEvent()); Review Comment: commented code ########## modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ConnectionHeartBeat.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.cli.core.repl; + +import io.micronaut.context.annotation.Value; +import jakarta.inject.Singleton; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; +import org.apache.ignite.internal.cli.core.rest.ApiClientFactory; +import org.apache.ignite.internal.cli.event.Event; +import org.apache.ignite.internal.cli.event.EventFactory; +import org.apache.ignite.internal.cli.event.EventListener; +import org.apache.ignite.internal.cli.event.EventType; +import org.apache.ignite.internal.cli.logger.CliLoggers; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.thread.NamedThreadFactory; +import org.apache.ignite.rest.client.api.NodeManagementApi; +import org.apache.ignite.rest.client.invoker.ApiException; + +/** + * Connection to node heart beat. + */ +@Singleton +public class ConnectionHeartBeat implements EventListener { + + private static final IgniteLogger log = CliLoggers.forClass(ConnectionHeartBeat.class); + + /** CLI check connection period period. */ + private final long cliCheckConnectionPeriodSecond; + + /** Scheduled executor for connection heartbeat. */ + @Nullable + private ScheduledExecutorService scheduledConnectionHeartbeatExecutor; + + private final ApiClientFactory clientFactory; + + private final EventFactory eventFactory; + + private final AtomicBoolean connected = new AtomicBoolean(false); + + /** + * Created instance of connection heartbeat. + * + * @param clientFactory api client factory. + * @param eventFactory event factory. 
+ */ + public ConnectionHeartBeat(@Value("${cli.check.connection.period.second:5}") long cliCheckConnectionPeriodSecond, + ApiClientFactory clientFactory, + EventFactory eventFactory) { + this.clientFactory = clientFactory; + this.eventFactory = eventFactory; + this.cliCheckConnectionPeriodSecond = cliCheckConnectionPeriodSecond; + } + + /** + * Starts connection heartbeat. By default connection will be checked every 5 sec. + * + * @param sessionInfo session info with node url + */ + private void onConnect(SessionInfo sessionInfo) { + //eventFactory.fireEvent(EventType.CONNECTION_RESTORED, new ConnectionStatusEvent()); + + if (scheduledConnectionHeartbeatExecutor == null) { + scheduledConnectionHeartbeatExecutor = + Executors.newScheduledThreadPool(1, new NamedThreadFactory("cli-check-connection-thread", log)); + + //Start connection heart beat Review Comment: ```suggestion // Start connection heart beat. ``` ########## modules/cli/src/main/java/org/apache/ignite/internal/cli/event/Event.java: ########## @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cli.event; + +/** + * The event cas which is produced by event producer component. 
+ */ +public interface Event { +} Review Comment: missed eof ########## modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ReplBuilder.java: ########## @@ -152,4 +155,15 @@ public ReplBuilder withAutosuggestionsWidgets() { this.autosuggestionsWidgetsEnabled = true; return this; } + + /** + * Builder setter of {@code aliases} field. + * + * @param eventSubscriber ???? map of aliases for commands. Review Comment: ```suggestion * @param eventSubscriber map of aliases for commands. ``` ########## modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/PeriodicSessionTaskExecutor.java: ########## @@ -24,35 +24,48 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.cli.event.Event; +import org.apache.ignite.internal.cli.event.EventListener; +import org.apache.ignite.internal.cli.event.EventType; import org.apache.ignite.internal.cli.logger.CliLoggers; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.thread.NamedThreadFactory; import org.jetbrains.annotations.Nullable; /** Executes tasks periodically while the session is connected. */ @Singleton -public class PeriodicSessionTaskExecutor implements AsyncSessionEventListener { +public class PeriodicSessionTaskExecutor implements EventListener { private static final IgniteLogger LOG = CliLoggers.forClass(PeriodicSessionTaskExecutor.class); @Nullable private ScheduledExecutorService executor; private final List<? extends PeriodicSessionTask> tasks; + //private final EventFactory eventFactory; Review Comment: commented ########## modules/cli/src/main/java/org/apache/ignite/internal/cli/event/EventFactory.java: ########## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cli.event; + +import jakarta.inject.Singleton; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.ignite.internal.cli.logger.CliLoggers; +import org.apache.ignite.internal.logger.IgniteLogger; + +/** + * Register listeners and produces events. + */ +@Singleton +public class EventFactory { + + private static final IgniteLogger log = CliLoggers.forClass(EventFactory.class); + + /** All listeners. */ + private final ConcurrentHashMap<EventType, List<EventListener>> listeners = new ConcurrentHashMap<>(); + + public EventFactory() { + } + + /** + * Registers an event listener. + * + * @param eventType type of event to listen. + * @param eventListener event listener. + */ + public void listen(EventType eventType, EventListener eventListener) { + listeners.computeIfAbsent(eventType, evtKey -> new CopyOnWriteArrayList<>()).add(eventListener); + } + + /** + * Removes a listener associated with the event type. + * + * @param eventType type of event to listen. + * @param eventListener event listener. 
+ */ + public void removeListener(EventType eventType, EventListener eventListener) { + listeners.computeIfAbsent(eventType, eventT -> new CopyOnWriteArrayList<>()).remove(eventListener); Review Comment: Use `computeIfPresent` here. ########## modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ReplBuilder.java: ########## @@ -152,4 +155,15 @@ public ReplBuilder withAutosuggestionsWidgets() { this.autosuggestionsWidgetsEnabled = true; return this; } + + /** + * Builder setter of {@code aliases} field. + * + * @param eventSubscriber ???? map of aliases for commands. Review Comment: The whole Javadoc here looks like it came from another file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
