sashapolo commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r640817603



##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
##########
@@ -287,67 +324,124 @@ public RaftGroupServiceImpl(
         if (leader == null)
             return refreshLeader().thenCompose(res -> run(cmd));
 
-        ActionRequest<R> req = factory.actionRequest().command(cmd).groupId(groupId).build();
+        ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).readOnlySafe(true).build();
+
+        CompletableFuture<ActionResponse<R>> fut = new CompletableFuture<>();
 
-        CompletableFuture<ActionResponse<R>> fut = sendWithRetry(leader.getNode(), req, currentTimeMillis() + timeout);
+        sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
 
         return fut.thenApply(resp -> resp.result());
     }
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     */
     @Override public <R> CompletableFuture<R> run(Peer peer, ReadCommand cmd) {
-        ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).build();
+        ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).readOnlySafe(false).build();
+
+        CompletableFuture fut = cluster.messagingService().invoke(peer.address(), req, timeout);
 
-        CompletableFuture<?> fut = cluster.messagingService().invoke(peer.getNode(), req, timeout);
+        return fut.thenApply(resp -> ((ActionResponse) resp).result());
+    }
 
-        return fut.thenApply(resp -> ((ActionResponse<R>) resp).result());
+    /** {@inheritDoc} */
+    @Override public void shutdown() {
+        if (!reuse)
+            cluster.shutdown();
     }
 
-    private <R> CompletableFuture<R> sendWithRetry(ClusterNode node, NetworkMessage req, long stopTime) {
-        if (currentTimeMillis() >= stopTime)
-            return CompletableFuture.failedFuture(new TimeoutException());
-        return cluster.messagingService().invoke(node, req, timeout)
-            .thenCompose(resp -> {
-                if (resp instanceof RaftErrorResponse) {
-                    RaftErrorResponse resp0 = (RaftErrorResponse)resp;
-                    switch (resp0.errorCode()) {
-                        case NO_LEADER:
-                            return composeWithDelay(() -> sendWithRetry(randomNode(), req, stopTime));
-                        case LEADER_CHANGED:
-                            leader = resp0.newLeader();
-                            return composeWithDelay(() -> sendWithRetry(resp0.newLeader().getNode(), req, stopTime));
-                        case SUCCESS:
-                            return CompletableFuture.completedFuture(null);
-                        default:
-                            return CompletableFuture.failedFuture(new RaftException(resp0.errorCode()));
+    /**
+     * Retries request until success or timeout.
+     *
+     * @param addr Target address.
+     * @param req The request.
+     * @param stopTime Stop time.
+     * @param fut The future.
+     * @param <R> Response type.
+     */
+    private <R> void sendWithRetry(Peer peer, Object req, long stopTime, CompletableFuture<R> fut) {

Review comment:
       Why did you change the previous approach to implementing this method? I
think it was more elegant and readable. The current logic can be implemented
in a similar way. Here's what I was able to come up with:
   
   ```
   private <R> CompletableFuture<R> sendWithRetry(Peer peer, Object req, long stopTime) {
       if (currentTimeMillis() >= stopTime) {
           return CompletableFuture.failedFuture(new TimeoutException());
       }
   
       return cluster.messagingService().invoke(peer.address(), (NetworkMessage)req, timeout)
           .handle((resp, err) -> {
               if (err != null) {
                   if (recoverable(err)) {
                       return composeWithDelay(() -> this.<R>sendWithRetry(randomNode(), req, stopTime));
                   }
                   else {
                       return CompletableFuture.<R>failedFuture(err);
                   }
               }
               else if (resp instanceof RaftErrorResponse) {
                   RaftErrorResponse resp0 = (RaftErrorResponse)resp;
                   RaftErrorCode errorCode = resp0.errorCode();
   
                   if (errorCode == null) {
                       leader = peer;
   
                       return CompletableFuture.<R>completedFuture(null);
                   }
   
                   switch (errorCode) {
                       case NO_LEADER:
                           return composeWithDelay(() -> this.<R>sendWithRetry(randomNode(), req, stopTime));
                       case LEADER_CHANGED:
                           leader = resp0.newLeader(); // Update a leader.
   
                           return composeWithDelay(() -> this.<R>sendWithRetry(resp0.newLeader(), req, stopTime));
                       default:
                           return CompletableFuture.<R>failedFuture(new RaftException(errorCode));
                   }
               }
               else {
                   leader = peer;
   
                   return CompletableFuture.completedFuture((R)resp);
               }
           })
           .thenCompose(Function.identity());
   }
   ```
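
To make the `handle(...).thenCompose(Function.identity())` idiom easier to try in isolation, here is a minimal standalone sketch. Everything in it is an assumption for illustration: `RetrySketch`, the fake `invoke()` that fails twice before succeeding, and this particular `composeWithDelay` built on `CompletableFuture.delayedExecutor` are not taken from the PR, and the real helper may well be implemented differently.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

public class RetrySketch {
    /** Counts calls to the fake transport (illustration only, not part of the PR). */
    static final AtomicInteger attempts = new AtomicInteger();

    /** Hypothetical stand-in for messagingService().invoke(): fails twice, then succeeds. */
    static CompletableFuture<String> invoke() {
        if (attempts.incrementAndGet() < 3)
            return CompletableFuture.failedFuture(new IllegalStateException("no leader"));

        return CompletableFuture.completedFuture("ok");
    }

    /** Hypothetical composeWithDelay: calls the supplier after a short delay and flattens its result. */
    static <T> CompletableFuture<T> composeWithDelay(Supplier<CompletableFuture<T>> next) {
        return CompletableFuture
            .supplyAsync(next, CompletableFuture.delayedExecutor(10, TimeUnit.MILLISECONDS))
            .thenCompose(Function.identity());
    }

    /** Retries invoke() until it succeeds or the deadline passes. */
    static CompletableFuture<String> sendWithRetry(long stopTime) {
        if (System.currentTimeMillis() >= stopTime)
            return CompletableFuture.failedFuture(new TimeoutException());

        return invoke()
            // handle() sees both outcomes and returns the next stage to wait on.
            .<CompletableFuture<String>>handle((resp, err) -> {
                if (err != null)
                    return composeWithDelay(() -> sendWithRetry(stopTime));

                return CompletableFuture.completedFuture(resp);
            })
            // Flatten the nested future into a plain CompletableFuture<String>.
            .thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        System.out.println(sendWithRetry(System.currentTimeMillis() + 1_000).join());
    }
}
```

The method keeps returning a future, so callers such as `run(cmd)` would not need to create and pass in their own `CompletableFuture`.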
   
   What do you think?





