rpuch commented on code in PR #1240:
URL: https://github.com/apache/ignite-3/pull/1240#discussion_r1005276859


##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java:
##########
@@ -0,0 +1,711 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft;
+
+import static java.lang.System.currentTimeMillis;
+import static java.util.concurrent.ThreadLocalRandom.current;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerResponse;
+import static 
org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAsyncRequest;
+import static 
org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAsyncResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.LearnersOpResponse;
+import static 
org.apache.ignite.raft.jraft.rpc.CliRequests.RemoveLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerResponse;
+import static 
org.apache.ignite.raft.jraft.rpc.CliRequests.ResetLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.SnapshotRequest;
+import static 
org.apache.ignite.raft.jraft.rpc.CliRequests.TransferLeaderRequest;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.ActionRequest;
+import org.apache.ignite.raft.jraft.rpc.ActionResponse;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.SMErrorResponse;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
+import org.apache.ignite.raft.jraft.rpc.impl.SMCompactedThrowable;
+import org.apache.ignite.raft.jraft.rpc.impl.SMFullThrowable;
+import org.apache.ignite.raft.jraft.rpc.impl.SMThrowable;
+import org.apache.ignite.raft.jraft.util.Endpoint;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * The implementation of {@link RaftGroupService}.
+ */
+public class RaftGroupServiceImpl implements RaftGroupService {
+    /** The logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(RaftGroupServiceImpl.class);
+
+    private volatile long timeout;
+
+    /** Timeout for network calls. */
+    private final long rpcTimeout;
+
+    private final String groupId;
+
+    private final ReplicationGroupId realGroupId;
+
+    private final RaftMessagesFactory factory;
+
+    @Nullable
+    private volatile Peer leader;
+
+    private volatile List<Peer> peers;
+
+    private volatile List<Peer> learners;
+
+    private final ClusterService cluster;
+
+    private final long retryDelay;
+
+    /** Executor for scheduling retries of {@link 
RaftGroupServiceImpl#sendWithRetry} invocations. */
+    private final ScheduledExecutorService executor;
+
+    /**
+     * Constructor.
+     *
+     * @param groupId Group id.
+     * @param cluster A cluster.
+     * @param factory A message factory.
+     * @param timeout Request timeout.
+     * @param peers Peers list.

Review Comment:
   I think 'Initial' was important here. Can we restore it? The same applies to the learners list.



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java:
##########
@@ -0,0 +1,711 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft;
+
+import static java.lang.System.currentTimeMillis;
+import static java.util.concurrent.ThreadLocalRandom.current;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerResponse;
+import static 
org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAsyncRequest;
+import static 
org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAsyncResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.LearnersOpResponse;
+import static 
org.apache.ignite.raft.jraft.rpc.CliRequests.RemoveLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerResponse;
+import static 
org.apache.ignite.raft.jraft.rpc.CliRequests.ResetLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.SnapshotRequest;
+import static 
org.apache.ignite.raft.jraft.rpc.CliRequests.TransferLeaderRequest;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.ActionRequest;
+import org.apache.ignite.raft.jraft.rpc.ActionResponse;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.SMErrorResponse;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
+import org.apache.ignite.raft.jraft.rpc.impl.SMCompactedThrowable;
+import org.apache.ignite.raft.jraft.rpc.impl.SMFullThrowable;
+import org.apache.ignite.raft.jraft.rpc.impl.SMThrowable;
+import org.apache.ignite.raft.jraft.util.Endpoint;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * The implementation of {@link RaftGroupService}.
+ */
+public class RaftGroupServiceImpl implements RaftGroupService {
+    /** The logger. */
+    private static final IgniteLogger LOG = Loggers.forClass(RaftGroupServiceImpl.class);
+
+    /** Cluster service used to send network messages. */
+    private final ClusterService cluster;
+
+    /** Factory for raft messages. */
+    private final RaftMessagesFactory factory;
+
+    /** String form of {@link #realGroupId}, put into every raft request. */
+    private final String groupId;
+
+    /** Replication group id this service was created for. */
+    private final ReplicationGroupId realGroupId;
+
+    /** Timeout for network calls. */
+    private final long rpcTimeout;
+
+    /** Delay between retries, in milliseconds. */
+    private final long retryDelay;
+
+    /** Executor for scheduling retries of {@link RaftGroupServiceImpl#sendWithRetry} invocations. */
+    private final ScheduledExecutorService executor;
+
+    /** Overall request timeout in milliseconds after which retrying stops; can be changed at runtime. */
+    private volatile long timeout;
+
+    /** Last known leader, {@code null} while unknown. */
+    @Nullable
+    private volatile Peer leader;
+
+    /** Last known peers of the group. */
+    private volatile List<Peer> peers;
+
+    /** Last known learners of the group. */
+    private volatile List<Peer> learners;
+
+    /**
+     * Constructor.
+     *
+     * @param groupId Group id.
+     * @param cluster A cluster.
+     * @param factory A message factory.
+     * @param timeout Request timeout.
+     * @param rpcTimeout Network call timeout.
+     * @param peers Initial peers list.
+     * @param learners Initial learners list.
+     * @param leader Initial group leader, {@code null} if unknown.
+     * @param retryDelay Retry delay.
+     * @param executor Executor for retrying requests.
+     */
+    private RaftGroupServiceImpl(
+            ReplicationGroupId groupId,
+            ClusterService cluster,
+            RaftMessagesFactory factory,
+            int timeout,
+            int rpcTimeout,
+            List<Peer> peers,
+            List<Peer> learners,
+            @Nullable Peer leader,
+            long retryDelay,
+            ScheduledExecutorService executor
+    ) {
+        this.cluster = cluster;
+        // Immutable snapshots of the initial membership; these fields are replaced later from raft responses.
+        this.peers = List.copyOf(peers);
+        this.learners = List.copyOf(learners);
+        this.factory = factory;
+        this.timeout = timeout;
+        this.rpcTimeout = rpcTimeout;
+        this.groupId = groupId.toString();
+        this.realGroupId = groupId;
+        this.retryDelay = retryDelay;
+        this.leader = leader;
+        this.executor = executor;
+    }
+
+    /**
+     * Starts raft group service.
+     *
+     * <p>When {@code getLeader} is {@code true}, the returned future completes after one attempt to discover the group
+     * leader. A failure of that attempt is only logged; the service is returned either way.
+     *
+     * @param groupId Raft group id.
+     * @param cluster Cluster service.
+     * @param factory Message factory.
+     * @param timeout Timeout.
+     * @param rpcTimeout Network call timeout.
+     * @param peers Initial peers list.
+     * @param learners Initial learners list.
+     * @param getLeader {@code True} to get the group's leader upon service creation.
+     * @param retryDelay Retry delay.
+     * @param executor Executor for retrying requests.
+     * @return Future representing pending completion of the operation.
+     */
+    public static CompletableFuture<RaftGroupService> start(
+            ReplicationGroupId groupId,
+            ClusterService cluster,
+            RaftMessagesFactory factory,
+            int timeout,
+            int rpcTimeout,
+            List<Peer> peers,
+            List<Peer> learners,
+            boolean getLeader,
+            long retryDelay,
+            ScheduledExecutorService executor
+    ) {
+        var service = new RaftGroupServiceImpl(groupId, cluster, factory, timeout, rpcTimeout, peers, learners, null, retryDelay, executor);
+
+        if (!getLeader) {
+            return CompletableFuture.completedFuture(service);
+        }
+
+        return service.refreshLeader().handle((unused, throwable) -> {
+            if (throwable != null) {
+                // Dependent stages wrap the original failure into a CompletionException: unwrap it before inspecting,
+                // but also cope with an unwrapped failure.
+                Throwable cause = throwable instanceof CompletionException ? throwable.getCause() : throwable;
+
+                if (cause instanceof TimeoutException) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Failed to refresh a leader [groupId={}]", groupId);
+                    }
+                } else {
+                    if (LOG.isWarnEnabled()) {
+                        LOG.warn("Failed to refresh a leader [groupId={}]", throwable, groupId);
+                    }
+                }
+            }
+
+            return service;
+        });
+    }
+
+    /**
+     * Starts raft group service without learners.
+     *
+     * <p>The same {@code timeout} value is used both as the request timeout and as the network call timeout.
+     *
+     * @param groupId Raft group id.
+     * @param cluster Cluster service.
+     * @param factory Message factory.
+     * @param timeout Timeout, also used as the network call timeout.
+     * @param peers Initial peers list.
+     * @param getLeader {@code True} to get the group's leader upon service creation.
+     * @param retryDelay Retry delay.
+     * @param executor Executor for retrying requests.
+     * @return Future representing pending completion of the operation.
+     */
+    public static CompletableFuture<RaftGroupService> start(
+            ReplicationGroupId groupId,
+            ClusterService cluster,
+            RaftMessagesFactory factory,
+            int timeout,
+            List<Peer> peers,
+            boolean getLeader,
+            long retryDelay,
+            ScheduledExecutorService executor
+    ) {
+        return start(groupId, cluster, factory, timeout, timeout, peers, List.of(), getLeader, retryDelay, executor);
+    }
+
+    @Override
+    public ReplicationGroupId groupId() {
+        return this.realGroupId;
+    }
+
+    @Override
+    public long timeout() {
+        return this.timeout;
+    }
+
+    @Override
+    public void timeout(long newTimeout) {
+        // Affects requests started after this call; in-flight retries keep their already computed deadline.
+        this.timeout = newTimeout;
+    }
+
+    @Override
+    public Peer leader() {
+        // Last known leader; null until it has been discovered.
+        return this.leader;
+    }
+
+    @Override
+    public List<Peer> peers() {
+        return this.peers;
+    }
+
+    @Override
+    public List<Peer> learners() {
+        return this.learners;
+    }
+
+    @Override
+    public CompletableFuture<Void> refreshLeader() {
+        return sendGetLeaderRequest().thenAccept(response -> leader = parsePeer(response.leaderId()));
+    }
+
+    @Override
+    public CompletableFuture<IgniteBiTuple<Peer, Long>> refreshAndGetLeaderWithTerm() {
+        return sendGetLeaderRequest().thenApply(response -> {
+            Peer discoveredLeader = parsePeer(response.leaderId());
+
+            leader = discoveredLeader;
+
+            return new IgniteBiTuple<>(discoveredLeader, response.currentTerm());
+        });
+    }
+
+    /**
+     * Sends a "get leader" request to a randomly chosen peer (with retries).
+     *
+     * @return Future with the peer's answer.
+     */
+    private CompletableFuture<GetLeaderResponse> sendGetLeaderRequest() {
+        GetLeaderRequest request = factory.getLeaderRequest().groupId(groupId).build();
+
+        return sendWithRetry(randomNode(), request);
+    }
+
+    @Override
+    public CompletableFuture<Void> refreshMembers(boolean onlyAlive) {
+        Peer currentLeader = leader;
+
+        if (currentLeader != null) {
+            GetPeersRequest request = factory.getPeersRequest()
+                    .onlyAlive(onlyAlive)
+                    .groupId(groupId)
+                    .build();
+
+            return this.<GetPeersResponse>sendWithRetry(currentLeader, request)
+                    .thenAccept(response -> {
+                        peers = parsePeerList(response.peersList());
+                        learners = parsePeerList(response.learnersList());
+                    });
+        }
+
+        // The leader is unknown: discover it first, then repeat the call.
+        return refreshLeader().thenCompose(ignored -> refreshMembers(onlyAlive));
+    }
+
+    @Override
+    public CompletableFuture<Void> addPeer(Peer peer) {
+        Peer currentLeader = leader;
+
+        if (currentLeader != null) {
+            AddPeerRequest request = factory.addPeerRequest()
+                    .groupId(groupId)
+                    .peerId(peerId(peer))
+                    .build();
+
+            return this.<AddPeerResponse>sendWithRetry(currentLeader, request)
+                    .thenAccept(response -> this.peers = parsePeerList(response.newPeersList()));
+        }
+
+        // The leader is unknown: discover it first, then repeat the call.
+        return refreshLeader().thenCompose(ignored -> addPeer(peer));
+    }
+
+    @Override
+    public CompletableFuture<Void> removePeer(Peer peer) {
+        Peer currentLeader = leader;
+
+        if (currentLeader != null) {
+            RemovePeerRequest request = factory.removePeerRequest()
+                    .groupId(groupId)
+                    .peerId(peerId(peer))
+                    .build();
+
+            return this.<RemovePeerResponse>sendWithRetry(currentLeader, request)
+                    .thenAccept(response -> this.peers = parsePeerList(response.newPeersList()));
+        }
+
+        // The leader is unknown: discover it first, then repeat the call.
+        return refreshLeader().thenCompose(ignored -> removePeer(peer));
+    }
+
+    @Override
+    public CompletableFuture<Void> changePeers(List<Peer> peers) {
+        Peer currentLeader = leader;
+
+        if (currentLeader != null) {
+            ChangePeersRequest request = factory.changePeersRequest()
+                    .groupId(groupId)
+                    .newPeersList(peerIds(peers))
+                    .build();
+
+            // "this." is required: the parameter shadows the field.
+            return this.<ChangePeersResponse>sendWithRetry(currentLeader, request)
+                    .thenAccept(response -> this.peers = parsePeerList(response.newPeersList()));
+        }
+
+        // The leader is unknown: discover it first, then repeat the call.
+        return refreshLeader().thenCompose(ignored -> changePeers(peers));
+    }
+
+    @Override
+    public CompletableFuture<Void> changePeersAsync(List<Peer> peers, long 
term) {
+        Peer leader = this.leader;
+
+        if (leader == null) {
+            return refreshLeader().thenCompose(res -> changePeersAsync(peers, 
term));
+        }
+
+        ChangePeersAsyncRequest req = factory.changePeersAsyncRequest()
+                .groupId(groupId)
+                .term(term)
+                .newPeersList(peerIds(peers))
+                .build();
+
+        LOG.info("Sending changePeersAsync request for group={} to peers={} 
with leader term={}",
+                groupId, peers, term);
+
+        return this.<ChangePeersAsyncResponse>sendWithRetry(leader, req)
+                .thenAccept(resp -> {
+                    // We expect that all raft related errors will be handled 
by sendWithRetry, means that
+                    // such responses will initiate a retrying of the original 
request.
+                    assert !(resp instanceof RpcRequests.ErrorResponse);
+                });
+    }
+
+    @Override
+    public CompletableFuture<Void> addLearners(List<Peer> learners) {
+        Peer currentLeader = leader;
+
+        if (currentLeader != null) {
+            AddLearnersRequest request = factory.addLearnersRequest()
+                    .groupId(groupId)
+                    .learnersList(peerIds(learners))
+                    .build();
+
+            // "this." is required: the parameter shadows the field.
+            return this.<LearnersOpResponse>sendWithRetry(currentLeader, request)
+                    .thenAccept(response -> this.learners = parsePeerList(response.newLearnersList()));
+        }
+
+        // The leader is unknown: discover it first, then repeat the call.
+        return refreshLeader().thenCompose(ignored -> addLearners(learners));
+    }
+
+    @Override
+    public CompletableFuture<Void> removeLearners(List<Peer> learners) {
+        Peer currentLeader = leader;
+
+        if (currentLeader != null) {
+            RemoveLearnersRequest request = factory.removeLearnersRequest()
+                    .groupId(groupId)
+                    .learnersList(peerIds(learners))
+                    .build();
+
+            // "this." is required: the parameter shadows the field.
+            return this.<LearnersOpResponse>sendWithRetry(currentLeader, request)
+                    .thenAccept(response -> this.learners = parsePeerList(response.newLearnersList()));
+        }
+
+        // The leader is unknown: discover it first, then repeat the call.
+        return refreshLeader().thenCompose(ignored -> removeLearners(learners));
+    }
+
+    @Override
+    public CompletableFuture<Void> resetLearners(List<Peer> learners) {
+        Peer currentLeader = leader;
+
+        if (currentLeader != null) {
+            ResetLearnersRequest request = factory.resetLearnersRequest()
+                    .groupId(groupId)
+                    .learnersList(peerIds(learners))
+                    .build();
+
+            // "this." is required: the parameter shadows the field.
+            return this.<LearnersOpResponse>sendWithRetry(currentLeader, request)
+                    .thenAccept(response -> this.learners = parsePeerList(response.newLearnersList()));
+        }
+
+        // The leader is unknown: discover it first, then repeat the call.
+        return refreshLeader().thenCompose(ignored -> resetLearners(learners));
+    }
+
+    @Override
+    public CompletableFuture<Void> snapshot(Peer peer) {
+        SnapshotRequest req = 
factory.snapshotRequest().groupId(groupId).build();
+
+        // Disable the timeout for a snapshot request.
+        return cluster.messagingService().invoke(peer.address(), req, 
Integer.MAX_VALUE)
+                .thenAccept(resp -> {
+                    if (resp != null) {
+                        RpcRequests.ErrorResponse resp0 = 
(RpcRequests.ErrorResponse) resp;
+
+                        if (resp0.errorCode() != 
RaftError.SUCCESS.getNumber()) {
+                            var ex = new 
RaftException(RaftError.forNumber(resp0.errorCode()), resp0.errorMsg());
+
+                            throw new CompletionException(ex);
+                        }
+                    }
+                });
+    }
+
+    @Override
+    public CompletableFuture<Void> transferLeadership(Peer newLeader) {
+        Peer currentLeader = leader;
+
+        if (currentLeader != null) {
+            TransferLeaderRequest request = factory.transferLeaderRequest()
+                    .groupId(groupId)
+                    .leaderId(peerId(currentLeader))
+                    .peerId(peerId(newLeader))
+                    .build();
+
+            // Once the current leader acknowledges the transfer, remember the new leader.
+            return sendWithRetry(currentLeader, request).thenRun(() -> leader = newLeader);
+        }
+
+        // The leader is unknown: discover it first, then repeat the call.
+        return refreshLeader().thenCompose(ignored -> transferLeadership(newLeader));
+    }
+
+    @Override
+    public <R> CompletableFuture<R> run(Command cmd) {
+        Peer currentLeader = leader;
+
+        if (currentLeader != null) {
+            ActionRequest request = factory.actionRequest()
+                    .command(cmd)
+                    .groupId(groupId)
+                    .readOnlySafe(true)
+                    .build();
+
+            return this.<ActionResponse>sendWithRetry(currentLeader, request)
+                    .thenApply(response -> (R) response.result());
+        }
+
+        // The leader is unknown: discover it first, then repeat the call.
+        return refreshLeader().thenCompose(ignored -> run(cmd));
+    }
+
+    @Override
+    public void shutdown() {
+        // Nothing to release: the executor and the cluster service are supplied from outside.
+    }
+
+    @Override
+    public ClusterService clusterService() {
+        return this.cluster;
+    }
+
+    /**
+     * Sends a request to the given peer, retrying via the 4-argument overload until a deadline of {@link #timeout}
+     * milliseconds from now.
+     *
+     * @param peer Target peer.
+     * @param req The request.
+     * @param <R> Response type.
+     * @return Future completed with the response or with the final failure.
+     */
+    private <R extends NetworkMessage> CompletableFuture<R> sendWithRetry(Peer peer, NetworkMessage req) {
+        CompletableFuture<R> result = new CompletableFuture<>();
+        long deadline = currentTimeMillis() + timeout;
+
+        sendWithRetry(peer, req, deadline, result);
+
+        return result;
+    }
+
+    /**
+     * Sends {@code req} to {@code peer} once and dispatches the outcome; the handlers may reschedule another attempt
+     * until {@code stopTime} is reached.
+     *
+     * @param peer Target peer.
+     * @param req The request.
+     * @param stopTime Deadline, compared with {@link System#currentTimeMillis()}; once reached, {@code fut} fails with
+     *      a {@link TimeoutException}.
+     * @param fut The future to complete.
+     * @param <R> Response type.
+     */
+    private <R extends NetworkMessage> void sendWithRetry(Peer peer, NetworkMessage req, long stopTime, CompletableFuture<R> fut) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("sendWithRetry peers={} req={} from={} to={}",
+                    peers, S.toString(req), cluster.topologyService().localMember().address(), peer.address());
+        }
+
+        // Give up once the overall deadline has passed.
+        if (stopTime <= currentTimeMillis()) {
+            fut.completeExceptionally(new TimeoutException());
+
+            return;
+        }
+
+        //TODO: IGNITE-15389 org.apache.ignite.internal.metastorage.client.CursorImpl has potential deadlock inside
+        cluster.messagingService().invoke(peer.address(), req, rpcTimeout)
+                .whenCompleteAsync((resp, err) -> {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("sendWithRetry resp={} from={} to={} err={}",
+                                S.toString(resp),
+                                cluster.topologyService().localMember().address(),
+                                peer.address(),
+                                err == null ? null : err.getMessage());
+                    }
+
+                    if (err != null) {
+                        handleThrowable(err, peer, req, stopTime, fut);
+
+                        return;
+                    }
+
+                    if (resp instanceof ErrorResponse) {
+                        handleErrorResponse((ErrorResponse) resp, peer, req, stopTime, fut);
+
+                        return;
+                    }
+
+                    if (resp instanceof SMErrorResponse) {
+                        handleSmErrorResponse((SMErrorResponse) resp, fut);
+
+                        return;
+                    }
+
+                    leader = peer; // The OK response was received from a leader.
+
+                    fut.complete((R) resp);
+                });
+    }
+
+    /**
+     * Handles a transport-level failure: recoverable errors are retried on a randomly selected node, any other error
+     * fails the future.
+     */
+    private void handleThrowable(
+            Throwable err, Peer peer, NetworkMessage req, long stopTime, CompletableFuture<? extends NetworkMessage> fut
+    ) {
+        if (!recoverable(err)) {
+            fut.completeExceptionally(err);
+
+            return;
+        }
+
+        LOG.warn(
+                "Recoverable error during the request type={} occurred (will be retried on the randomly selected node): ",
+                err, req.getClass().getSimpleName()
+        );
+
+        scheduleRetry(() -> sendWithRetry(randomNode(peer), req, stopTime, fut));
+    }
+
+    /**
+     * Handles a raft {@link ErrorResponse}: completes the future on success, schedules a retry for transient errors,
+     * and fails the future for any other error code.
+     *
+     * @param resp Error response.
+     * @param peer Peer that sent the response.
+     * @param req Original request.
+     * @param stopTime Retry deadline.
+     * @param fut Future to complete.
+     */
+    private void handleErrorResponse(
+            ErrorResponse resp, Peer peer, NetworkMessage req, long stopTime, CompletableFuture<? extends NetworkMessage> fut
+    ) {
+        RaftError error = RaftError.forNumber(resp.errorCode());
+
+        switch (error) {
+            case SUCCESS:
+                leader = peer; // The OK response was received from a leader.
+
+                fut.complete(null); // Void response.
+
+                break;
+
+            case EBUSY:
+            case EAGAIN:
+                scheduleRetry(() -> sendWithRetry(peer, req, stopTime, fut));
+
+                break;
+
+            case ENOENT:
+                scheduleRetry(() -> {
+                    // If changing peers or requesting a leader and something is not found
+                    // probably target peer is doing rebalancing, try another peer.
+                    if (req instanceof GetLeaderRequest || req instanceof ChangePeersAsyncRequest) {
+                        sendWithRetry(randomNode(peer), req, stopTime, fut);
+                    } else {
+                        sendWithRetry(peer, req, stopTime, fut);
+                    }
+                });
+
+                break;
+
+            case EPERM:
+                // TODO: IGNITE-15706
+            case UNKNOWN:
+            case EINTERNAL: {
+                // Capture the leader hint in a local. The retry runs later on the executor and must not re-read the
+                // shared field, which concurrent requests may have changed (or set to null) in the meantime. A null
+                // target would throw inside the scheduled task, where the exception is swallowed by the discarded
+                // ScheduledFuture and 'fut' would never complete.
+                Peer newLeader = resp.leaderId() == null ? null : parsePeer(resp.leaderId());
+
+                if (newLeader == null) {
+                    // No usable leader hint (absent or unparsable): try another node.
+                    scheduleRetry(() -> sendWithRetry(randomNode(peer), req, stopTime, fut));
+                } else {
+                    leader = newLeader; // Update a leader.
+
+                    scheduleRetry(() -> sendWithRetry(newLeader, req, stopTime, fut));
+                }
+
+                break;
+            }
+
+            default:
+                fut.completeExceptionally(new RaftException(error, resp.errorMsg()));
+
+                break;
+        }
+    }
+
+    /**
+     * Completes the future exceptionally with the error reported by the remote state machine.
+     *
+     * <p>A compacted error is restored by reflectively instantiating its class with a message-only constructor; if that
+     * fails, an {@link IgniteException} carrying the original message is used instead. Any other (or missing) error
+     * payload still fails the future, so the caller never waits forever.
+     *
+     * @param resp State machine error response.
+     * @param fut Future to complete.
+     */
+    private static void handleSmErrorResponse(SMErrorResponse resp, CompletableFuture<? extends NetworkMessage> fut) {
+        SMThrowable th = resp.error();
+
+        if (th instanceof SMCompactedThrowable) {
+            SMCompactedThrowable compactedThrowable = (SMCompactedThrowable) th;
+
+            try {
+                // NOTE(review): the class name arrives in a network response; asSubclass() at least guarantees that
+                // only Throwable subclasses are instantiated.
+                Throwable restoredTh = Class.forName(compactedThrowable.throwableClassName())
+                        .asSubclass(Throwable.class)
+                        .getConstructor(String.class)
+                        .newInstance(compactedThrowable.throwableMessage());
+
+                fut.completeExceptionally(restoredTh);
+            } catch (Exception e) {
+                LOG.warn("Cannot restore throwable from user's state machine. "
+                        + "Check if throwable {} is present in the classpath.", e, compactedThrowable.throwableClassName());
+
+                fut.completeExceptionally(new IgniteException(compactedThrowable.throwableMessage()));
+            }
+        } else if (th instanceof SMFullThrowable) {
+            fut.completeExceptionally(((SMFullThrowable) th).throwable());
+        } else {
+            // Unknown or absent error payload: fail explicitly instead of leaving the future incomplete.
+            fut.completeExceptionally(new IgniteException("Unexpected state machine error: " + th));
+        }
+    }
+
+    /**
+     * Runs {@code task} on the retry executor after {@link #retryDelay} milliseconds.
+     */
+    private void scheduleRetry(Runnable task) {
+        long delayMs = retryDelay;
+
+        executor.schedule(task, delayMs, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Tells whether a request failure is worth retrying, for example, {@link java.net.ConnectException}.
+     *
+     * <p>One level of {@link ExecutionException} / {@link CompletionException} wrapping is peeled off first; timeouts
+     * and I/O errors count as recoverable.
+     *
+     * @param t The throwable.
+     * @return {@code True} if this is a recoverable exception.
+     */
+    private static boolean recoverable(Throwable t) {
+        boolean wrapped = t instanceof ExecutionException || t instanceof CompletionException;
+        Throwable cause = wrapped ? t.getCause() : t;
+
+        return cause instanceof TimeoutException || cause instanceof IOException;
+    }
+
+    /**
+     * Returns a uniformly random peer from the current peers list.
+     */
+    private Peer randomNode() {
+        return randomNode(null);
+    }
+
+    /**
+     * Returns a random peer, drawing at most 5 times in an attempt to avoid {@code excludedPeer}. When
+     * {@code excludedPeer} is {@code null} (or not in the list) the first draw is returned.
+     *
+     * @param excludedPeer Peer to avoid if possible.
+     * @return Random peer; may still be {@code excludedPeer} if every draw hit it.
+     */
+    private Peer randomNode(@Nullable Peer excludedPeer) {
+        List<Peer> candidates = peers;
+
+        assert candidates != null && !candidates.isEmpty();
+
+        int excludedIdx = (excludedPeer != null) ? candidates.indexOf(excludedPeer) : -1;
+
+        ThreadLocalRandom rnd = current();
+
+        int idx = rnd.nextInt(candidates.size());
+
+        for (int attempt = 1; attempt < 5 && idx == excludedIdx; attempt++) {
+            idx = rnd.nextInt(candidates.size());
+        }
+
+        return candidates.get(idx);
+    }
+
+    /**
+     * Converts the string form of a {@link PeerId} into a {@link Peer} addressed by the id's endpoint.
+     *
+     * @param peerId String representation of {@link PeerId}.
+     * @return Peer, or {@code null} when {@link PeerId#parsePeer} yields no id.
+     */
+    // TODO: Remove after IGNITE-15506
+    private static @Nullable Peer parsePeer(@Nullable String peerId) {
+        PeerId id = PeerId.parsePeer(peerId);
+
+        if (id == null) {
+            return null;
+        }
+
+        Endpoint endpoint = id.getEndpoint();
+        NetworkAddress address = new NetworkAddress(endpoint.getIp(), endpoint.getPort());
+
+        return new Peer(address);
+    }
+
+    /**
+     * Converts string representations of {@link PeerId}s into {@link Peer}s, element by element.
+     *
+     * @param peers List of {@link PeerId} string representations.
+     * @return Mutable list of {@link Peer}s in the same order (entries may be {@code null} for unparsable ids), or
+     *      {@code null} if {@code peers} is {@code null}.
+     */
+    private static @Nullable List<Peer> parsePeerList(@Nullable Collection<String> peers) {
+        if (peers == null) {
+            return null;
+        }
+
+        return peers.stream()
+                .map(RaftGroupServiceImpl::parsePeer)
+                .collect(Collectors.toCollection(ArrayList::new));
+    }
+
+    /**
+     * Returns the string form of the {@link PeerId} corresponding to the given peer.
+     */
+    private static String peerId(Peer peer) {
+        PeerId id = PeerId.fromPeer(peer);
+
+        return id.toString();
+    }
+
+    private static List<String> peerIds(Collection<Peer> peers) {
+        return 
peers.stream().map(RaftGroupServiceImpl::peerId).collect(Collectors.toList());

Review Comment:
   Let's import `toList()` statically.



##########
modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java:
##########
@@ -213,36 +188,29 @@ public void testRefreshLeaderElectedAfterDelay() throws Exception {
         assertEquals(NODES.get(0), service.leader());
     }
 
-    /**
-     * @throws Exception
-     */
     @Test
     public void testRefreshLeaderWithTimeout() throws Exception {
         mockLeaderRequest(true);
 
         RaftGroupService service =
-            RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT, 
NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
+                RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, 
TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
 
         try {
             service.refreshLeader().get(500, TimeUnit.MILLISECONDS);

Review Comment:
   This could be changed to `assertThrows(TimeoutException.class, ...)`



##########
modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java:
##########
@@ -88,25 +85,22 @@ public class RaftGroupServiceTest {
     /** The logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(RaftGroupServiceTest.class);
 
-    /** */
     private static final List<Peer> NODES = Stream.of(20000, 20001, 20002)
-        .map(port -> new NetworkAddress("localhost", port))
-        .map(Peer::new)
-        .collect(Collectors.toUnmodifiableList());
+            .map(port -> new NetworkAddress("localhost", port))
+            .map(Peer::new)
+            .collect(Collectors.toUnmodifiableList());

Review Comment:
   How about importing `Collectors.*` methods statically?



##########
modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java:
##########
@@ -315,22 +273,18 @@ public void testUserRequestLeaderNotElected() throws Exception {
             service.run(new TestCommand()).get();
 
             fail("Expecting timeout");
-        }
-        catch (ExecutionException e) {
+        } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof TimeoutException);

Review Comment:
   How about `assertThat(e.getCause(), is(instanceOf(TimeoutException.class)))`?



##########
modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java:
##########
@@ -271,37 +236,30 @@ public void testUserRequestLazyInitLeader() throws Exception {
         assertEquals(leader, service.leader());
     }
 
-    /**
-     * @throws Exception
-     */
     @Test
     public void testUserRequestWithTimeout() throws Exception {
         mockLeaderRequest(false);
         mockUserInput(true, null);
 
         RaftGroupService service =
-            RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT, 
NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
+                RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, 
TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
 
         try {
             service.run(new TestCommand()).get(500, TimeUnit.MILLISECONDS);

Review Comment:
   How about `assertThrows(TimeoutException.class, ...)`?



##########
modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java:
##########
@@ -171,40 +156,30 @@ public void testRefreshLeaderNotElected() throws Exception {
         leader = null;
 
         RaftGroupService service =
-            RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT, 
NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
+                RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, 
TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
 
         assertNull(service.leader());
 
         try {
             service.refreshLeader().get();
 
             fail("Should fail");
-        }
-        catch (ExecutionException e) {
+        } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof TimeoutException);

Review Comment:
   How about changing this to `assertThat(e.getCause(), is(instanceOf(TimeoutException.class)))`?
   It will show us the actual exception type if it does not match our expectation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to