rpuch commented on code in PR #4209:
URL: https://github.com/apache/ignite-3/pull/4209#discussion_r1711227345
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java:
##########
@@ -175,13 +179,37 @@ else if (prc.executor() != null) {
RpcProcessor<NetworkMessage> finalPrc = prc;
try {
- executor.execute(() -> finalPrc.handleRequest(new
NetworkRpcContext(executor, sender, correlationId), message));
+ if (shouldSwitchToRequestsExecutor()) {
+ executor.execute(() -> finalPrc.handleRequest(new
NetworkRpcContext(executor, sender, correlationId), message));
+ } else {
+ finalPrc.handleRequest(new NetworkRpcContext(executor,
sender, correlationId), message);
+ }
} catch (RejectedExecutionException e) {
// The rejection is ok if an executor has been stopped,
otherwise it shouldn't happen.
LOG.warn("A request execution was rejected [sender={} req={}
reason={}]", sender, S.toString(message), e.getMessage());
}
}
+ private boolean shouldSwitchToRequestsExecutor() {
Review Comment:
This method differs from the corresponding method in ReplicaManager in a
tiny way (this method does not check for 3rd permission). Would it make sense
to extract a common method and parameterize it with the set of permissions to
check, to avoid duplication?
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java:
##########
@@ -264,7 +268,13 @@ public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
opts.setSnapshotTimer(JRaftUtils.createTimer(opts,
"JRaft-SnapshotTimer"));
}
- requestExecutor = JRaftUtils.createRequestExecutor(opts);
+ requestExecutor = new ThreadPoolExecutor(
+ opts.getRaftRpcThreadPoolSize(),
+ opts.getRaftRpcThreadPoolSize(),
+ 0, SECONDS,
+ new LinkedBlockingQueue<>(),
+ IgniteThreadFactory.create(opts.getServerName(),
"JRaft-Request-Processor", LOG, STORAGE_READ, STORAGE_WRITE)
Review Comment:
Why do we need to allow it STORAGE_READ and STORAGE_WRITE?
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java:
##########
@@ -264,7 +268,13 @@ public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
opts.setSnapshotTimer(JRaftUtils.createTimer(opts,
"JRaft-SnapshotTimer"));
}
- requestExecutor = JRaftUtils.createRequestExecutor(opts);
+ requestExecutor = new ThreadPoolExecutor(
Review Comment:
It seems to be the same as
`Executors.newFixedThreadPool(opts.getRaftRpcThreadPoolSize(), ... factory ...)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]