tkalkirill commented on code in PR #2772:
URL: https://github.com/apache/ignite-3/pull/2772#discussion_r1390952999
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/JDKMarshaller.java:
##########
@@ -21,11 +21,14 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
+import org.apache.ignite.internal.raft.Marshaller;
/**
*
*/
public class JDKMarshaller implements Marshaller {
+ public static final Marshaller INSTANCE = new JDKMarshaller();
Review Comment:
Missing javadoc.
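For illustration only, a documented singleton could look like the sketch below. `SketchMarshaller` and `SketchJdkMarshaller` are hypothetical local stand-ins, not the real `Marshaller` and `JDKMarshaller` from the PR, and the javadoc wording is just one possible phrasing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

/** Simplified stand-in for the marshaller interface (assumption, not the real API). */
interface SketchMarshaller {
    byte[] marshall(Object o);

    <T> T unmarshall(byte[] bytes);
}

/** Marshaller based on standard JDK serialization. */
class SketchJdkMarshaller implements SketchMarshaller {
    /** Shared instance; reusable because the class keeps no state. */
    static final SketchMarshaller INSTANCE = new SketchJdkMarshaller();

    @Override
    public byte[] marshall(Object o) {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(o);
            // Flush so that all buffered object data reaches the byte array.
            oos.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T> T unmarshall(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```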
##########
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java:
##########
@@ -403,7 +410,8 @@ private List<Pair<RaftServer, RaftGroupService>>
prepareJraftMetaStorages(Atomic
raftConfiguration,
membersConfiguration,
true,
- executor
+ executor,
+ commandsMarshaller
).get();
Review Comment:
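Same as for the `.get()` call in the hunk at line 393 of this file: use `assertThat(future, willCompleteSuccessfully())`.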
##########
modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java:
##########
@@ -118,7 +120,10 @@ private MethodSpec readMessageMethod(MessageClass message,
FieldSpec msgField) {
.addParameter(MessageReader.class, "reader")
.addException(MessageMappingException.class);
- List<ExecutableElement> getters = message.getters();
+ List<ExecutableElement> getters = message.getters()
+ .stream()
+ .filter(e -> e.getAnnotation(Transient.class) == null)
+ .collect(Collectors.toList());
Review Comment:
Suggestion: `.collect(toList());`
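A minimal sketch of the static-import form, using a hypothetical `StaticImportSketch` class and a string filter in place of the annotation check from the diff:

```java
import static java.util.stream.Collectors.toList;

import java.util.List;

class StaticImportSketch {
    /** Keeps only non-empty names, mirroring the filter-then-collect shape of the diff. */
    static List<String> nonEmpty(List<String> names) {
        return names.stream()
                .filter(name -> !name.isEmpty())
                .collect(toList());
    }
}
```

With the static import in place, the call site reads `collect(toList())` instead of `collect(Collectors.toList())`.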
##########
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java:
##########
@@ -393,7 +399,8 @@ private List<Pair<RaftServer, RaftGroupService>>
prepareJraftMetaStorages(Atomic
raftConfiguration,
membersConfiguration,
true,
- executor
+ executor,
+ commandsMarshaller
).get();
Review Comment:
Let's use `assertThat(future, willCompleteSuccessfully());` followed by
`future.join();`
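The idea, presumably, is that a future that never completes should fail the test rather than block it forever on a bare `get()`. The sketch below shows that pattern with JDK classes only; `awaitSuccess` is a hypothetical stand-in, not the project's `willCompleteSuccessfully()` matcher.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureAssertSketch {
    /** Hypothetical helper: waits a bounded time, fails on error or timeout, then joins. */
    static <T> T awaitSuccess(CompletableFuture<T> future, long timeoutSeconds) {
        try {
            future.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            throw new AssertionError("Future did not complete in " + timeoutSeconds + " s", e);
        } catch (ExecutionException e) {
            throw new AssertionError("Future completed exceptionally", e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt flag before failing the assertion.
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting", e);
        }

        // The future is already complete here, so join() returns immediately.
        return future.join();
    }
}
```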
##########
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java:
##########
@@ -148,6 +150,7 @@ <T extends RaftGroupService> CompletableFuture<T>
startRaftGroupNodeAndWaitNodeR
* @return Future that will be completed with an instance of a Raft group
service.
* @throws NodeStoppingException If node stopping intention was detected.
*/
+ @TestOnly
Review Comment:
I see that it is used not only in tests but also, for example, in
`PlacementDriverManager`.
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java:
##########
@@ -150,7 +150,8 @@ public void start() {
return raftManager.startRaftGroupService(
replicationGroupId,
PeersAndLearners.fromConsistentIds(placementDriverNodes),
- topologyAwareRaftGroupServiceFactory
+ topologyAwareRaftGroupServiceFactory,
+ null
Review Comment:
Why is `null` passed here?
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java:
##########
@@ -452,9 +459,12 @@ public <R> CompletableFuture<R> run(Command cmd) {
Function<Peer, ActionRequest> requestFactory;
if (cmd instanceof WriteCommand) {
+ byte[] commandBytes = commandsMarshaller.marshall(cmd);
+
requestFactory = targetPeer -> factory.writeActionRequest()
.groupId(groupId)
- .command((WriteCommand) cmd)
+ .command(commandBytes)
+ .deserializedCommand((WriteCommand) cmd)
Review Comment:
Please add a comment here explaining why the command is passed in raw (serialized) form.
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java:
##########
@@ -123,6 +125,7 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
* @param membersConfiguration Raft members configuration.
* @param leader Group leader.
* @param executor Executor for retrying requests.
+ * @param commandsMarshaller Marshaller that should be used to
[de]serialize commands.
Review Comment:
```suggestion
* @param commandsMarshaller Marshaller that should be used to
de]serialize commands.
```
##########
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/Marshaller.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Marshaller interface, for instances that convert objects to {@code byte[]}
and back.
+ */
Review Comment:
```suggestion
/** Marshaller interface, for instances that convert objects to {@code
byte[]} and back. */
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java:
##########
@@ -38,5 +41,15 @@ public interface PartitionCommand extends
SafeTimePropagatingCommand, CatalogVer
* Returns version that the Catalog must have locally for the node to be
allowed to accept this command via replication.
*/
@Override
+ @Transient
+ @WithSetter
int requiredCatalogVersion();
+
+ /**
+ * Setter for {@link #requiredCatalogVersion()}. Called by the creator or
the {@link PartitionCommandsMarshaller} while deserializing.
+ */
+ @Override
+ default void requiredCatalogVersion(int version) {
Review Comment:
This looks a little strange.
##########
modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java:
##########
@@ -87,7 +89,10 @@ private MethodSpec writeMessageMethod(MessageClass message) {
method.addStatement("$T message = ($T) msg", message.implClassName(),
message.implClassName()).addCode("\n");
- List<ExecutableElement> getters = message.getters();
+ List<ExecutableElement> getters = message.getters()
+ .stream()
Review Comment:
```suggestion
List<ExecutableElement> getters = message.getters().stream()
```
##########
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftServiceFactory.java:
##########
@@ -40,6 +40,7 @@ CompletableFuture<T> startRaftGroupService(
ReplicationGroupId groupId,
PeersAndLearners peersAndLearners,
RaftConfiguration raftConfiguration,
- ScheduledExecutorService raftClientExecutor
+ ScheduledExecutorService raftClientExecutor,
+ Marshaller commandsMarshaller
Review Comment:
Missing parameter description in the javadoc.
##########
modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java:
##########
@@ -118,7 +120,10 @@ private MethodSpec readMessageMethod(MessageClass message,
FieldSpec msgField) {
.addParameter(MessageReader.class, "reader")
.addException(MessageMappingException.class);
- List<ExecutableElement> getters = message.getters();
+ List<ExecutableElement> getters = message.getters()
+ .stream()
Review Comment:
```suggestion
List<ExecutableElement> getters = message.getters().stream()
```
##########
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/BeforeApplyHandler.java:
##########
@@ -38,6 +38,7 @@ public interface BeforeApplyHandler {
* this is a place to do it.
*
* @param command The command.
+ * @return {@code true} if command has been changed, {@code false}
otherwise.
Review Comment:
What does "command has been changed" mean? Changed from a write command to a
read command, or vice versa?
##########
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java:
##########
@@ -413,7 +421,8 @@ private List<Pair<RaftServer, RaftGroupService>>
prepareJraftMetaStorages(Atomic
raftConfiguration,
membersConfiguration,
true,
- executor
+ executor,
+ commandsMarshaller
).get();
Review Comment:
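Same as for the `.get()` call in the hunk at line 393 of this file: use `assertThat(future, willCompleteSuccessfully())`.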
##########
modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java:
##########
@@ -87,7 +89,10 @@ private MethodSpec writeMessageMethod(MessageClass message) {
method.addStatement("$T message = ($T) msg", message.implClassName(),
message.implClassName()).addCode("\n");
- List<ExecutableElement> getters = message.getters();
+ List<ExecutableElement> getters = message.getters()
+ .stream()
+ .filter(e -> e.getAnnotation(Transient.class) == null)
+ .collect(Collectors.toList());
Review Comment:
Suggestion: `.collect(toList());`
##########
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java:
##########
@@ -165,6 +168,7 @@ CompletableFuture<RaftGroupService> startRaftGroupService(
<T extends RaftGroupService> CompletableFuture<T> startRaftGroupService(
ReplicationGroupId groupId,
PeersAndLearners configuration,
- RaftServiceFactory<T> factory
+ RaftServiceFactory<T> factory,
+ @Nullable Marshaller commandsMarshaller
Review Comment:
Missing parameter description in the javadoc.
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/WriteActionRequest.java:
##########
@@ -19,15 +19,23 @@
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.network.annotations.Transferable;
+import org.apache.ignite.network.annotations.Transient;
import org.apache.ignite.raft.jraft.RaftMessageGroup.RpcActionMessageGroup;
+import org.jetbrains.annotations.Nullable;
/**
* Submit a write action to a replication group.
*/
@Transferable(RpcActionMessageGroup.WRITE_ACTION_REQUEST)
public interface WriteActionRequest extends ActionRequest {
/**
- * Returns an action's command.
+ * @return Serialized action's command. Specific serialization format may
differ from group to group.
*/
- WriteCommand command();
+ byte[] command();
+
+ /**
+ * @return Original non-serialized command, if available. {@code null} if
not.
Review Comment:
Please expand the description: explain why this field exists, when it
should be used, and when it will be `null`.
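As a rough sketch of the behaviour the javadoc could describe: judging by the diff, the object form is marked `@Transient`, so it would only be available on the node that built the request, and the receiver falls back to unmarshalling the bytes. The class below is hypothetical, uses `String` in place of `WriteCommand`, and only models that fallback.

```java
import java.util.function.Function;

/** Hypothetical request shape: serialized bytes are always present, the object form is optional. */
final class WriteRequestSketch {
    private final byte[] command;

    /** Object form of the command; assumed to be null when the request arrived over the network. */
    private final String deserializedCommand;

    WriteRequestSketch(byte[] command, String deserializedCommand) {
        this.command = command;
        this.deserializedCommand = deserializedCommand;
    }

    byte[] command() {
        return command;
    }

    String deserializedCommand() {
        return deserializedCommand;
    }

    /** Returns the cached object form if present, otherwise unmarshals the bytes. */
    String resolveCommand(Function<byte[], String> unmarshaller) {
        return deserializedCommand != null ? deserializedCommand : unmarshaller.apply(command);
    }
}
```

In this model a locally created request skips deserialization entirely, which is one plausible reason for keeping both forms.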
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java:
##########
@@ -82,32 +83,80 @@ public void handleRequest(RpcContext rpcCtx, ActionRequest
request) {
return;
}
- JraftServerImpl.DelegatingStateMachine fsm =
(JraftServerImpl.DelegatingStateMachine) node.getOptions().getFsm();
+ Marshaller commandsMarshaller =
node.getOptions().getCommandsMarshaller();
+
+ assert commandsMarshaller != null : "Marshaller for group " +
request.groupId() + " is not found.";
+
+ handleRequestInternal(rpcCtx, node, request, commandsMarshaller);
+ }
+
+ /**
+ * Internal part of the {@link #handleRequest(RpcContext, ActionRequest)},
that contains resolved RAFT node, as well as a commands
+ * marshaller instance. May be conveniently reused in subclasses.
+ */
+ protected void handleRequestInternal(RpcContext rpcCtx, Node node,
ActionRequest request, Marshaller commandsMarshaller) {
+ DelegatingStateMachine fsm = (DelegatingStateMachine)
node.getOptions().getFsm();
+ RaftGroupListener listener = fsm.getListener();
if (request instanceof WriteActionRequest) {
+ WriteActionRequest writeRequest = (WriteActionRequest)request;
+
+ WriteCommand command = writeRequest.deserializedCommand();
+
+ if (command == null) {
+ command =
commandsMarshaller.unmarshall(writeRequest.command());
+ }
+
if (fsm.getListener() instanceof BeforeApplyHandler) {
synchronized (groupIdSyncMonitor(request.groupId())) {
- callOnBeforeApply(request, fsm);
- applyWrite(node, (WriteActionRequest) request, rpcCtx);
+ writeRequest = patchCommandBeforeApply(writeRequest,
(BeforeApplyHandler) listener, command, commandsMarshaller);
+
+ applyWrite(node, writeRequest, command, rpcCtx);
}
} else {
- applyWrite(node, (WriteActionRequest) request, rpcCtx);
+ applyWrite(node, writeRequest, command, rpcCtx);
}
} else {
- if (fsm.getListener() instanceof BeforeApplyHandler) {
- callOnBeforeApply(request, fsm);
+ ReadActionRequest readRequest = (ReadActionRequest) request;
+
+ if (listener instanceof BeforeApplyHandler) {
+ ReadCommand command = readRequest.command();
+
+ readRequest = patchCommandBeforeApply(readRequest,
(BeforeApplyHandler) listener, command, commandsMarshaller);
}
- applyRead(node, (ReadActionRequest) request, rpcCtx);
+ applyRead(node, readRequest, rpcCtx);
}
}
- private static void callOnBeforeApply(ActionRequest request,
DelegatingStateMachine fsm) {
- Command command = request instanceof WriteActionRequest
- ? ((WriteActionRequest) request).command()
- : ((ReadActionRequest) request).command();
-
- ((BeforeApplyHandler) fsm.getListener()).onBeforeApply(command);
+ /**
+ * This method calls {@link BeforeApplyHandler#onBeforeApply(Command)} and
returns action request with a serialized version of the
+ * updated command, if it has been updated. Otherwise, the method returns
the original {@code request} instance. The reason for such
+ * behavior is the fact that we use {@code byte[]} in action requests,
thus modified command should be serialized twice.
+ */
+ private <AR extends ActionRequest> AR patchCommandBeforeApply(
+ AR request,
+ BeforeApplyHandler beforeApplyHandler,
+ Command command,
+ Marshaller commandsMarshaller
+ ) {
+ if (beforeApplyHandler.onBeforeApply(command)) {
+ if (request instanceof WriteActionRequest) {
+ return (AR) factory.writeActionRequest()
+ .groupId(request.groupId())
+ .command(commandsMarshaller.marshall(command))
+ .deserializedCommand((WriteCommand)command)
Review Comment:
Don't you think the same should be done for read commands?
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java:
##########
@@ -82,32 +83,80 @@ public void handleRequest(RpcContext rpcCtx, ActionRequest
request) {
return;
}
- JraftServerImpl.DelegatingStateMachine fsm =
(JraftServerImpl.DelegatingStateMachine) node.getOptions().getFsm();
+ Marshaller commandsMarshaller =
node.getOptions().getCommandsMarshaller();
+
+ assert commandsMarshaller != null : "Marshaller for group " +
request.groupId() + " is not found.";
+
+ handleRequestInternal(rpcCtx, node, request, commandsMarshaller);
+ }
+
+ /**
+ * Internal part of the {@link #handleRequest(RpcContext, ActionRequest)},
that contains resolved RAFT node, as well as a commands
+ * marshaller instance. May be conveniently reused in subclasses.
+ */
+ protected void handleRequestInternal(RpcContext rpcCtx, Node node,
ActionRequest request, Marshaller commandsMarshaller) {
+ DelegatingStateMachine fsm = (DelegatingStateMachine)
node.getOptions().getFsm();
+ RaftGroupListener listener = fsm.getListener();
if (request instanceof WriteActionRequest) {
+ WriteActionRequest writeRequest = (WriteActionRequest)request;
+
+ WriteCommand command = writeRequest.deserializedCommand();
+
+ if (command == null) {
+ command =
commandsMarshaller.unmarshall(writeRequest.command());
+ }
+
if (fsm.getListener() instanceof BeforeApplyHandler) {
synchronized (groupIdSyncMonitor(request.groupId())) {
- callOnBeforeApply(request, fsm);
- applyWrite(node, (WriteActionRequest) request, rpcCtx);
+ writeRequest = patchCommandBeforeApply(writeRequest,
(BeforeApplyHandler) listener, command, commandsMarshaller);
+
+ applyWrite(node, writeRequest, command, rpcCtx);
}
} else {
- applyWrite(node, (WriteActionRequest) request, rpcCtx);
+ applyWrite(node, writeRequest, command, rpcCtx);
}
} else {
- if (fsm.getListener() instanceof BeforeApplyHandler) {
- callOnBeforeApply(request, fsm);
+ ReadActionRequest readRequest = (ReadActionRequest) request;
+
+ if (listener instanceof BeforeApplyHandler) {
+ ReadCommand command = readRequest.command();
+
+ readRequest = patchCommandBeforeApply(readRequest,
(BeforeApplyHandler) listener, command, commandsMarshaller);
}
- applyRead(node, (ReadActionRequest) request, rpcCtx);
+ applyRead(node, readRequest, rpcCtx);
}
}
- private static void callOnBeforeApply(ActionRequest request,
DelegatingStateMachine fsm) {
- Command command = request instanceof WriteActionRequest
- ? ((WriteActionRequest) request).command()
- : ((ReadActionRequest) request).command();
-
- ((BeforeApplyHandler) fsm.getListener()).onBeforeApply(command);
+ /**
+ * This method calls {@link BeforeApplyHandler#onBeforeApply(Command)} and
returns action request with a serialized version of the
+ * updated command, if it has been updated. Otherwise, the method returns
the original {@code request} instance. The reason for such
+ * behavior is the fact that we use {@code byte[]} in action requests,
thus modified command should be serialized twice.
+ */
+ private <AR extends ActionRequest> AR patchCommandBeforeApply(
+ AR request,
+ BeforeApplyHandler beforeApplyHandler,
+ Command command,
+ Marshaller commandsMarshaller
+ ) {
+ if (beforeApplyHandler.onBeforeApply(command)) {
Review Comment:
Maybe invert the condition and return early:
`if (!beforeApplyHandler.onBeforeApply(command)) {
return request;
}
... other code`
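A self-contained sketch of that guard-clause shape, with hypothetical generic parameters standing in for the request, the command, and the handler from the diff:

```java
import java.util.function.Function;
import java.util.function.Predicate;

class EarlyReturnSketch {
    /**
     * Returns the original request when the handler reports no change,
     * otherwise rebuilds the request from the updated command.
     */
    static <R, C> R patchBeforeApply(R request, C command, Predicate<C> onBeforeApply, Function<C, R> rebuild) {
        if (!onBeforeApply.test(command)) {
            return request;
        }

        // Only the "command was changed" path continues past the guard.
        return rebuild.apply(command);
    }
}
```

The early return keeps the rebuild logic at one indentation level instead of nesting it inside the `if`.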
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CatalogVersionAware.java:
##########
@@ -25,4 +25,9 @@ public interface CatalogVersionAware {
* Returns version that the Catalog must have locally for the node to be
allowed to accept this command via replication.
*/
int requiredCatalogVersion();
+
+ /**
+ * Setter for the required catalog version. Should be used with caution.
Only object's creator should call it.
+ */
+ void requiredCatalogVersion(int version);
Review Comment:
Shouldn't
`org.apache.ignite.internal.table.distributed.command.PartitionCommand#requiredCatalogVersion()`
hide this setter?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshaller.java:
##########
@@ -29,7 +29,7 @@ public interface PartitionCommandsMarshaller extends
Marshaller {
* Reads required catalog version from the provided buffer.
*
* @param raw Buffer to read from.
- * @return Catalog version.
+ * @return Catalog version. {@code 0} if version is not required for the
given command.
Review Comment:
`-1` is more obvious.