ibessonov commented on code in PR #2450:
URL: https://github.com/apache/ignite-3/pull/2450#discussion_r1299791137
##########
modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/TestIgnitionManager.java:
##########
@@ -100,20 +104,35 @@ private static InitParameters
applyTestDefaultsToClusterConfig(InitParameters pa
.metaStorageNodeNames(params.metaStorageNodeNames())
.cmgNodeNames(params.cmgNodeNames());
+ ConfigDocument configDocument;
+
if (params.clusterConfiguration() == null) {
- builder.clusterConfiguration("{ schemaSync.delayDuration: 0 }");
+ configDocument = ConfigDocumentFactory.parseString("{}");
} else {
- ConfigDocument configDocument =
ConfigDocumentFactory.parseString(params.clusterConfiguration());
+ configDocument =
ConfigDocumentFactory.parseString(params.clusterConfiguration());
+ }
- String delayDurationPath = "schemaSync.delayDuration";
+ configDocument = applyTestDefault(
Review Comment:
There's no way to override this value, right?
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java:
##########
@@ -91,7 +87,7 @@ public void handleRequest(RpcContext rpcCtx, ActionRequest
request) {
* @param rpcCtx The context.
*/
private void applyWrite(Node node, ActionRequest request, RpcContext
rpcCtx) {
- node.apply(new
Task(ByteBuffer.wrap(commandsMarshaller.marshall(request.command())),
+ node.apply(new
Task(ByteBuffer.wrap(node.getOptions().requiredCommandsMarshaller().marshall(request.command())),
Review Comment:
Please extract a variable, the line is pretty long already :)
##########
modules/core/src/main/java/org/apache/ignite/internal/util/VarIntUtils.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Utilities to work with general-purpose varints.
+ */
+public class VarIntUtils {
Review Comment:
I completely forgot that such code is also duplicated in V1Decoder /
V1Encoder. Please simplify that code as well.
The only real difference is that we use "long" there, but the data format is
the same.
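
To illustrate the point about "long" versus "int" sharing one data format, here is a minimal sketch. It assumes an unsigned, 7-bits-per-byte varint with a continuation bit; the class `VarIntSketch` and its methods are hypothetical, and the actual format used by VarIntUtils and V1Decoder / V1Encoder is not shown in this hunk, so it may differ.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch; not the VarIntUtils class from this PR. */
final class VarIntSketch {
    private VarIntSketch() {
    }

    /** Writes the value using 7 data bits per byte; the high bit marks "more bytes follow". */
    static void writeVarLong(long val, ByteBuffer buf) {
        while ((val & ~0x7FL) != 0) {
            buf.put((byte) ((val & 0x7F) | 0x80));
            val >>>= 7;
        }

        buf.put((byte) val);
    }

    /** Reads a value written by {@link #writeVarLong(long, ByteBuffer)}. */
    static long readVarLong(ByteBuffer buf) {
        long res = 0;

        for (int shift = 0; shift < 64; shift += 7) {
            byte b = buf.get();

            res |= (long) (b & 0x7F) << shift;

            if ((b & 0x80) == 0) {
                return res;
            }
        }

        throw new IllegalStateException("Varint is too long");
    }

    /** The int variant only widens the value (as unsigned) and delegates, so the bytes are identical. */
    static void writeVarInt(int val, ByteBuffer buf) {
        writeVarLong(val & 0xFFFFFFFFL, buf);
    }

    /** The int variant narrows the result of the long reader. */
    static int readVarInt(ByteBuffer buf) {
        return (int) readVarLong(buf);
    }
}
```

Under that assumption, the int methods can be thin wrappers over the long ones, which is roughly the kind of simplification meant above.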
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java:
##########
@@ -146,6 +147,16 @@ public Loza(
);
}
+ /**
+ * Sets {@link AppendEntriesRequestInterceptor} to use.
+ *
+ * @param appendEntriesRequestInterceptor Interceptor to use.
+ */
+ public void
appendEntriesRequestInterceptor(AppendEntriesRequestInterceptor
appendEntriesRequestInterceptor) {
Review Comment:
Can we pass it into a constructor?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.raft.util.OptimizedMarshaller;
+import org.apache.ignite.internal.table.distributed.command.CatalogLevelAware;
+import org.apache.ignite.internal.util.VarIntUtils;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+
+/**
+ * Default {@link PartitionCommandsMarshaller} implementation.
+ */
+public class PartitionCommandsMarshallerImpl extends OptimizedMarshaller
implements PartitionCommandsMarshaller {
+ public PartitionCommandsMarshallerImpl(MessageSerializationRegistry
serializationRegistry) {
+ super(serializationRegistry);
+ }
+
+ @Override
+ public byte[] marshall(Object o) {
+ int requiredCatalogVersion = o instanceof CatalogLevelAware ?
((CatalogLevelAware) o).requiredCatalogVersion() : 0;
Review Comment:
Should it be -1 otherwise, or does it not matter?
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java:
##########
@@ -102,7 +100,11 @@ public byte[] marshall(Object o) {
buffer = expandBuffer(buffer);
}
- return Arrays.copyOf(buffer.array(), buffer.position());
+ byte[] result = Arrays.copyOf(buffer.array(), buffer.position());
+
+ buffer.position(0);
Review Comment:
Please assert that the position is 0 in the constructor.
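
Something along these lines is what I have in mind. This is only a sketch: `ReusableBufferSketch` is a hypothetical stand-in, and I am assuming the buffer is a heap buffer handed to (or created in) the constructor, which may not match how OptimizedMarshaller actually obtains it.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical stand-in for a marshaller that reuses a single heap buffer. */
final class ReusableBufferSketch {
    private final ByteBuffer buffer;

    ReusableBufferSketch(ByteBuffer buffer) {
        // The invariant that marshal() restores on exit is checked once up front.
        // Note that Java assertions only fire when the JVM runs with -ea.
        assert buffer.position() == 0 : "Buffer position must be 0, but was " + buffer.position();

        this.buffer = buffer;
    }

    /** Copies the payload through the shared buffer and rewinds it for the next call. */
    byte[] marshal(byte[] payload) {
        // Throws BufferOverflowException if the payload does not fit; the sketch does not expand the buffer.
        buffer.put(payload);

        byte[] result = Arrays.copyOf(buffer.array(), buffer.position());

        buffer.position(0);

        return result;
    }
}
```

With the assertion in place, every call can rely on the buffer starting at position 0, not just the second and later ones.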
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CatalogLevelAware.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+/**
+ * A command that requires certain level of catalog version to be locally
available just to be accepted on the node.
+ */
+public interface CatalogLevelAware {
Review Comment:
Maybe it should be "catalog version", as everywhere else?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1158,25 +1169,31 @@ private CompletableFuture<Object>
finishTransaction(List<TablePartitionId> aggre
HybridTimestamp currentTimestamp = hybridClock.now();
HybridTimestamp commitTimestamp = commit ? currentTimestamp : null;
- FinishTxCommandBuilder finishTxCmdBldr = MSG_FACTORY.finishTxCommand()
- .txId(txId)
- .commit(commit)
- .safeTimeLong(currentTimestamp.longValue())
- .tablePartitionIds(
- aggregatedGroupIds.stream()
-
.map(PartitionReplicaListener::tablePartitionId)
- .collect(toList())
- );
-
- if (commit) {
- finishTxCmdBldr.commitTimestampLong(commitTimestamp.longValue());
- }
+ return catalogVersionFor(currentTimestamp)
+ .thenApply(catalogVersion -> {
Review Comment:
How can we guarantee that these asynchronous callbacks won't be reordered?
Safe time must be monotonic. I feel like this code is incorrect. The same
applies to the rest of the new async code in this class.
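
To make the concern concrete, here is a small self-contained sketch. It is not the code from this PR: `CallbackOrderSketch` and both methods are hypothetical, the futures stand in for two catalogVersionFor(...) lookups, and the values 1 and 2 stand in for two safe-time timestamps taken in that order. The chained variant is just one possible way to preserve order, not necessarily the right fix here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical illustration of callback reordering; not the PartitionReplicaListener code. */
final class CallbackOrderSketch {
    private CallbackOrderSketch() {
    }

    /** Each callback hangs off its own future, so the completion order decides the apply order. */
    static List<Long> independentCallbacks() {
        List<Long> applied = new ArrayList<>();

        CompletableFuture<Integer> versionForTs1 = new CompletableFuture<>();
        CompletableFuture<Integer> versionForTs2 = new CompletableFuture<>();

        versionForTs1.thenAccept(version -> applied.add(1L));
        versionForTs2.thenAccept(version -> applied.add(2L));

        // The lookup for the later timestamp happens to finish first.
        versionForTs2.complete(7);
        versionForTs1.complete(7);

        return applied;
    }

    /** Each step is chained after the previous one, so the submission order is preserved. */
    static List<Long> chainedCallbacks() {
        List<Long> applied = new ArrayList<>();

        CompletableFuture<Integer> versionForTs1 = new CompletableFuture<>();
        CompletableFuture<Integer> versionForTs2 = new CompletableFuture<>();

        CompletableFuture<Void> start = CompletableFuture.completedFuture(null);

        CompletableFuture<Void> afterFirst = start
                .thenCompose(unused -> versionForTs1)
                .thenAccept(version -> applied.add(1L));

        afterFirst
                .thenCompose(unused -> versionForTs2)
                .thenAccept(version -> applied.add(2L));

        // Same completion order as above.
        versionForTs2.complete(7);
        versionForTs1.complete(7);

        return applied;
    }
}
```

In the first method the timestamps are applied as 2 then 1, which is exactly the non-monotonic safe time I am worried about; in the second they are applied as 1 then 2 regardless of which lookup finishes first.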
##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImplTest.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.raft.util.OptimizedMarshaller;
+import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
+import
org.apache.ignite.internal.replicator.command.SafeTimeSyncCommandSerializationFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
+import
org.apache.ignite.internal.table.distributed.command.FinishTxCommandSerializationFactory;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.junit.jupiter.api.Test;
+
+class PartitionCommandsMarshallerImplTest {
+ private final TableMessagesFactory tableMessagesFactory = new
TableMessagesFactory();
+ private final ReplicaMessagesFactory replicaMessagesFactory = new
ReplicaMessagesFactory();
+
+ private final MessageSerializationRegistry registry = new
MessageSerializationRegistryImpl();
+
+ {
+ // For a command that has required catalog version property.
+ registry.registerFactory(
+ TableMessageGroup.GROUP_TYPE,
+ TableMessageGroup.Commands.FINISH_TX,
+ new FinishTxCommandSerializationFactory(tableMessagesFactory)
+ );
+
+ // For a command that does not have required catalog version property.
+ registry.registerFactory(
+ (short) 8,
Review Comment:
Is there a constant for this that you could use?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.raft.jraft.Node;
+import org.apache.ignite.raft.jraft.entity.EnumOutter.EntryType;
+import org.apache.ignite.raft.jraft.entity.RaftOutter;
+import org.apache.ignite.raft.jraft.entity.RaftOutter.EntryMeta;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
+import org.apache.ignite.raft.jraft.rpc.RaftServerService;
+import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
+import org.apache.ignite.raft.jraft.rpc.impl.AppendEntriesRequestInterceptor;
+import org.apache.ignite.raft.jraft.util.Marshaller;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * An {@link AppendEntriesRequestInterceptor} that rejects requests (by
returning EBUSY error code) if any of the
+ * incoming commands requires catalog version that is not available locally
yet.
+ */
+public class CheckCatalogVersionOnAppendEntries implements
AppendEntriesRequestInterceptor {
+ private static final IgniteLogger LOG =
Loggers.forClass(CheckCatalogVersionOnAppendEntries.class);
+
+ private static final int NO_LEVEL_REQUIREMENT = Integer.MIN_VALUE;
Review Comment:
Maybe we should call it "version"
##########
modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/TestIgnitionManager.java:
##########
@@ -100,20 +104,35 @@ private static InitParameters
applyTestDefaultsToClusterConfig(InitParameters pa
.metaStorageNodeNames(params.metaStorageNodeNames())
.cmgNodeNames(params.cmgNodeNames());
+ ConfigDocument configDocument;
+
if (params.clusterConfiguration() == null) {
- builder.clusterConfiguration("{ schemaSync.delayDuration: 0 }");
+ configDocument = ConfigDocumentFactory.parseString("{}");
} else {
- ConfigDocument configDocument =
ConfigDocumentFactory.parseString(params.clusterConfiguration());
+ configDocument =
ConfigDocumentFactory.parseString(params.clusterConfiguration());
+ }
- String delayDurationPath = "schemaSync.delayDuration";
+ configDocument = applyTestDefault(
Review Comment:
I wish we could get the configuration from annotations, but that's for the
future.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestType.java:
##########
@@ -79,6 +79,13 @@ public boolean isSingleRow() {
}
}
+ /**
+ * Returns {@code true} if the operation works with a single row and it's
a write.
+ */
+ public boolean isSingleRowWrite() {
Review Comment:
It's only used in a single test, right? Are you sure that we need to pollute
this class even more?
I feel like some other methods are also test-only here.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1158,25 +1169,31 @@ private CompletableFuture<Object>
finishTransaction(List<TablePartitionId> aggre
HybridTimestamp currentTimestamp = hybridClock.now();
HybridTimestamp commitTimestamp = commit ? currentTimestamp : null;
- FinishTxCommandBuilder finishTxCmdBldr = MSG_FACTORY.finishTxCommand()
- .txId(txId)
- .commit(commit)
- .safeTimeLong(currentTimestamp.longValue())
- .tablePartitionIds(
- aggregatedGroupIds.stream()
-
.map(PartitionReplicaListener::tablePartitionId)
- .collect(toList())
- );
-
- if (commit) {
- finishTxCmdBldr.commitTimestampLong(commitTimestamp.longValue());
- }
+ return catalogVersionFor(currentTimestamp)
+ .thenApply(catalogVersion -> {
+ FinishTxCommandBuilder finishTxCmdBldr =
MSG_FACTORY.finishTxCommand()
+ .txId(txId)
+ .commit(commit)
+ .safeTimeLong(currentTimestamp.longValue())
+ .requiredCatalogVersion(catalogVersion)
+ .tablePartitionIds(
+ aggregatedGroupIds.stream()
+
.map(PartitionReplicaListener::tablePartitionId)
+ .collect(toList())
+ );
+
+ if (commit) {
+
finishTxCmdBldr.commitTimestampLong(commitTimestamp.longValue());
Review Comment:
Off-topic: any idea why we even need a commit timestamp if it always
matches the safe time?