vldpyatkov commented on code in PR #4209:
URL: https://github.com/apache/ignite-3/pull/4209#discussion_r1711511166
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java:
##########
@@ -175,13 +179,37 @@ else if (prc.executor() != null) {
RpcProcessor<NetworkMessage> finalPrc = prc;
try {
- executor.execute(() -> finalPrc.handleRequest(new
NetworkRpcContext(executor, sender, correlationId), message));
+ if (shouldSwitchToRequestsExecutor()) {
+ executor.execute(() -> finalPrc.handleRequest(new
NetworkRpcContext(executor, sender, correlationId), message));
+ } else {
+ finalPrc.handleRequest(new NetworkRpcContext(executor,
sender, correlationId), message);
+ }
} catch (RejectedExecutionException e) {
// The rejection is ok if an executor has been stopped,
otherwise it shouldn't happen.
LOG.warn("A request execution was rejected [sender={} req={}
reason={}]", sender, S.toString(message), e.getMessage());
}
}
+ private boolean shouldSwitchToRequestsExecutor() {
Review Comment:
I renamed this method in order to avoid confusion when reading the code. I
am not sure that we can extract the logic to the specific method right now.
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java:
##########
@@ -264,7 +268,13 @@ public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
opts.setSnapshotTimer(JRaftUtils.createTimer(opts,
"JRaft-SnapshotTimer"));
}
- requestExecutor = JRaftUtils.createRequestExecutor(opts);
+ requestExecutor = new ThreadPoolExecutor(
Review Comment:
Done.
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java:
##########
@@ -264,7 +268,13 @@ public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
opts.setSnapshotTimer(JRaftUtils.createTimer(opts,
"JRaft-SnapshotTimer"));
}
- requestExecutor = JRaftUtils.createRequestExecutor(opts);
+ requestExecutor = new ThreadPoolExecutor(
+ opts.getRaftRpcThreadPoolSize(),
+ opts.getRaftRpcThreadPoolSize(),
+ 0, SECONDS,
+ new LinkedBlockingQueue<>(),
+ IgniteThreadFactory.create(opts.getServerName(),
"JRaft-Request-Processor", LOG, STORAGE_READ, STORAGE_WRITE)
Review Comment:
I replaced all the permits to dedicate one for processing ARFT requests. It
helps to avoid thread switching in both client-load and embedded-load cases.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]