ascherbakoff commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r641592583
##########
File path:
modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
##########
@@ -287,67 +324,124 @@ public RaftGroupServiceImpl(
if (leader == null)
return refreshLeader().thenCompose(res -> run(cmd));
- ActionRequest<R> req =
factory.actionRequest().command(cmd).groupId(groupId).build();
+ ActionRequest req =
factory.actionRequest().command(cmd).groupId(groupId).readOnlySafe(true).build();
+
+ CompletableFuture<ActionResponse<R>> fut = new CompletableFuture<>();
- CompletableFuture<ActionResponse<R>> fut =
sendWithRetry(leader.getNode(), req, currentTimeMillis() + timeout);
+ sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
return fut.thenApply(resp -> resp.result());
}
- /** {@inheritDoc} */
+ /**
+ * {@inheritDoc}
+ */
@Override public <R> CompletableFuture<R> run(Peer peer, ReadCommand cmd) {
- ActionRequest req =
factory.actionRequest().command(cmd).groupId(groupId).build();
+ ActionRequest req =
factory.actionRequest().command(cmd).groupId(groupId).readOnlySafe(false).build();
+
+ CompletableFuture fut =
cluster.messagingService().invoke(peer.address(), req, timeout);
- CompletableFuture<?> fut =
cluster.messagingService().invoke(peer.getNode(), req, timeout);
+ return fut.thenApply(resp -> ((ActionResponse) resp).result());
+ }
- return fut.thenApply(resp -> ((ActionResponse<R>) resp).result());
+ /** {@inheritDoc} */
+ @Override public void shutdown() {
+ if (!reuse)
+ cluster.shutdown();
}
- private <R> CompletableFuture<R> sendWithRetry(ClusterNode node,
NetworkMessage req, long stopTime) {
- if (currentTimeMillis() >= stopTime)
- return CompletableFuture.failedFuture(new TimeoutException());
- return cluster.messagingService().invoke(node, req, timeout)
- .thenCompose(resp -> {
- if (resp instanceof RaftErrorResponse) {
- RaftErrorResponse resp0 = (RaftErrorResponse)resp;
- switch (resp0.errorCode()) {
- case NO_LEADER:
- return composeWithDelay(() ->
sendWithRetry(randomNode(), req, stopTime));
- case LEADER_CHANGED:
- leader = resp0.newLeader();
- return composeWithDelay(() ->
sendWithRetry(resp0.newLeader().getNode(), req, stopTime));
- case SUCCESS:
- return CompletableFuture.completedFuture(null);
- default:
- return CompletableFuture.failedFuture(new
RaftException(resp0.errorCode()));
+ /**
+ * Retries request until success or timeout.
+ *
+ * @param addr Target address.
+ * @param req The request.
+ * @param stopTime Stop time.
+ * @param fut The future.
+ * @param <R> Response type.
+ */
+ private <R> void sendWithRetry(Peer peer, Object req, long stopTime,
CompletableFuture<R> fut) {
Review comment:
Because it doesn't work as intended. I've added a test that reproduces the
problem: testUserRequestLeaderElectedAfterDelayWithFailedNode.
Actually, I'd prefer not to change the current approach, because: 1) it works
for me; 2) it looks quite readable to me; 3) it's a bit more efficient (it
creates fewer futures); 4) I think your solution is not perfect either and can
be improved.
I'm OK with addressing this later in a separate ticket, after merging jraft.
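
To illustrate point 3, here is a minimal sketch of the pattern in which the caller allocates one result future and the retry loop completes it, rather than returning a new composed future from every retry. This is not the code from the PR: `RetrySketch`, `RetriableException`, and the `attempt` supplier are hypothetical stand-ins (the supplier plays the role of the messaging call, the exception plays the role of a retriable error such as NO_LEADER), and the delay between retries is omitted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class RetrySketch {
    /** Hypothetical marker for a failure that is worth retrying. */
    public static class RetriableException extends RuntimeException { }

    /**
     * Retries the attempt until it succeeds, fails with a non-retriable error,
     * or the deadline passes. The outcome is reported through the single
     * caller-provided future instead of a chain of composed futures.
     */
    public static <R> void sendWithRetry(Supplier<CompletableFuture<R>> attempt,
        long stopTime, CompletableFuture<R> fut) {
        if (System.currentTimeMillis() >= stopTime) {
            fut.completeExceptionally(new TimeoutException());

            return;
        }

        attempt.get().whenComplete((resp, err) -> {
            if (err == null) {
                fut.complete(resp);

                return;
            }

            // Dependent stages wrap failures in CompletionException; unwrap if so.
            Throwable cause = err instanceof CompletionException && err.getCause() != null
                ? err.getCause() : err;

            if (cause instanceof RetriableException)
                sendWithRetry(attempt, stopTime, fut); // Immediate retry; a real client would back off.
            else
                fut.completeExceptionally(cause);
        });
    }
}
```

In this shape each retry only adds the future returned by the attempt itself, while the future handed back to the user stays the same object for the whole operation. Note that the sketch recurses on the completing thread, so attempts that complete synchronously deepen the stack with every retry.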