ibessonov commented on code in PR #2772:
URL: https://github.com/apache/ignite-3/pull/2772#discussion_r1392216912
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java:
##########
@@ -82,32 +83,80 @@ public void handleRequest(RpcContext rpcCtx, ActionRequest
request) {
return;
}
- JraftServerImpl.DelegatingStateMachine fsm =
(JraftServerImpl.DelegatingStateMachine) node.getOptions().getFsm();
+ Marshaller commandsMarshaller =
node.getOptions().getCommandsMarshaller();
+
+ assert commandsMarshaller != null : "Marshaller for group " +
request.groupId() + " is not found.";
+
+ handleRequestInternal(rpcCtx, node, request, commandsMarshaller);
+ }
+
+ /**
+ * Internal part of the {@link #handleRequest(RpcContext, ActionRequest)},
that contains resolved RAFT node, as well as a commands
+ * marshaller instance. May be conveniently reused in subclasses.
+ */
+ protected void handleRequestInternal(RpcContext rpcCtx, Node node,
ActionRequest request, Marshaller commandsMarshaller) {
+ DelegatingStateMachine fsm = (DelegatingStateMachine)
node.getOptions().getFsm();
+ RaftGroupListener listener = fsm.getListener();
if (request instanceof WriteActionRequest) {
+ WriteActionRequest writeRequest = (WriteActionRequest)request;
+
+ WriteCommand command = writeRequest.deserializedCommand();
+
+ if (command == null) {
+ command =
commandsMarshaller.unmarshall(writeRequest.command());
+ }
+
if (fsm.getListener() instanceof BeforeApplyHandler) {
synchronized (groupIdSyncMonitor(request.groupId())) {
- callOnBeforeApply(request, fsm);
- applyWrite(node, (WriteActionRequest) request, rpcCtx);
+ writeRequest = patchCommandBeforeApply(writeRequest,
(BeforeApplyHandler) listener, command, commandsMarshaller);
+
+ applyWrite(node, writeRequest, command, rpcCtx);
}
} else {
- applyWrite(node, (WriteActionRequest) request, rpcCtx);
+ applyWrite(node, writeRequest, command, rpcCtx);
}
} else {
- if (fsm.getListener() instanceof BeforeApplyHandler) {
- callOnBeforeApply(request, fsm);
+ ReadActionRequest readRequest = (ReadActionRequest) request;
+
+ if (listener instanceof BeforeApplyHandler) {
+ ReadCommand command = readRequest.command();
+
+ readRequest = patchCommandBeforeApply(readRequest,
(BeforeApplyHandler) listener, command, commandsMarshaller);
}
- applyRead(node, (ReadActionRequest) request, rpcCtx);
+ applyRead(node, readRequest, rpcCtx);
}
}
- private static void callOnBeforeApply(ActionRequest request,
DelegatingStateMachine fsm) {
- Command command = request instanceof WriteActionRequest
- ? ((WriteActionRequest) request).command()
- : ((ReadActionRequest) request).command();
-
- ((BeforeApplyHandler) fsm.getListener()).onBeforeApply(command);
+ /**
+ * This method calls {@link BeforeApplyHandler#onBeforeApply(Command)} and
returns action request with a serialized version of the
+ * updated command, if it has been updated. Otherwise, the method returns
the original {@code request} instance. The reason for such
+ * behavior is the fact that we use {@code byte[]} in action requests,
thus modified command should be serialized twice.
+ */
+ private <AR extends ActionRequest> AR patchCommandBeforeApply(
+ AR request,
+ BeforeApplyHandler beforeApplyHandler,
+ Command command,
+ Marshaller commandsMarshaller
+ ) {
+ if (beforeApplyHandler.onBeforeApply(command)) {
Review Comment:
Sure
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]