sanpwc commented on code in PR #3931:
URL: https://github.com/apache/ignite-3/pull/3931#discussion_r1686389675
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java:
##########
@@ -17,31 +17,103 @@
package org.apache.ignite.internal.partition.replicator;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.service.RaftCommandRunner;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicaResult;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.internal.schema.BinaryRowImpl;
+import org.apache.ignite.internal.replicator.message.TableAware;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
/**
* Zone partition replica listener.
*/
public class ZonePartitionReplicaListener implements ReplicaListener {
+ private static final TxMessagesFactory TX_MESSAGES_FACTORY = new
TxMessagesFactory();
+
+ private static final IgniteLogger LOG =
Loggers.forClass(ZonePartitionReplicaListener.class);
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-22624 await for the
table replica listener if needed.
+ private final Map<TablePartitionId, ReplicaListener> replicas = new
ConcurrentHashMap<>();
+
+ private final RaftGroupService raftClient;
Review Comment:
Please use `RaftCommandRunner`.
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionRaftListener.java:
##########
@@ -37,7 +50,35 @@ public void onRead(Iterator<CommandClosure<ReadCommand>>
iterator) {
Review Comment:
I'd rather add the following implementation for onRead()
```
iterator.forEachRemaining((CommandClosure<? extends ReadCommand>
clo) -> {
Command command = clo.command();
assert false : "No read commands expected, [cmd=" + command +
']';
});
```
just like the one we have in `PartitionListener`
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionRaftListener.java:
##########
@@ -17,18 +17,31 @@
package org.apache.ignite.internal.partition.replicator;
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static org.apache.ignite.internal.tx.TxState.COMMITTED;
+
+import java.io.Serializable;
import java.nio.file.Path;
import java.util.Iterator;
+import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand;
+import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.ReadCommand;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
+import org.apache.ignite.internal.replicator.ZonePartitionReplicaImpl;
+import org.apache.ignite.internal.tx.TransactionResult;
/**
* RAFT listener for the zone partition.
*/
public class ZonePartitionRaftListener implements RaftGroupListener {
+ private static final IgniteLogger LOG =
Loggers.forClass(ZonePartitionReplicaImpl.class);
Review Comment:
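Wrong class: the logger should be created for `ZonePartitionRaftListener.class`, not `ZonePartitionReplicaImpl.class`.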
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java:
##########
@@ -36,6 +37,14 @@ public interface Replica {
@Deprecated(forRemoval = true)
TopologyAwareRaftGroupService raftClient();
+ /**
+ * Returns replica's listener.
+ *
+ * @return Replica's listener.
+ */
+ @Deprecated(forRemoval = true)
+ ReplicaListener listener();
Review Comment:
Assuming that it's a temporary solution, how are you going to add a table processor
to the ZoneReplica?
I mean that currently it's
```
replicaMgr.replica(new ZonePartitionId(zoneId, partId))
.thenAcceptAsync(zoneReplica ->
((ZonePartitionReplicaListener)
zoneReplica.listener()).addTableReplicaListener(
new TablePartitionId(tableId, partId),
createListener
), ioExecutor
);
```
What is the expected solution to replace the aforementioned one?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -635,6 +645,133 @@ public CompletableFuture<Void>
startAsync(ComponentContext componentContext) {
});
}
+ private CompletableFuture<Boolean>
initTableFsmOnTableCreate(CreateTableEventParameters parameters) {
+ if (!PartitionReplicaLifecycleManager.ENABLED) {
+ return completedFuture(false);
+ }
+
+ long causalityToken = parameters.causalityToken();
+ CatalogTableDescriptor tableDescriptor = parameters.tableDescriptor();
+ CatalogZoneDescriptor zoneDescriptor =
getZoneDescriptor(tableDescriptor, parameters.catalogVersion());
+
+ TableImpl table = createTableImpl(causalityToken, tableDescriptor,
zoneDescriptor);
+
+ tablesVv.update(causalityToken, (ignore, e) -> inBusyLock(busyLock, ()
-> {
+ if (e != null) {
+ return failedFuture(e);
+ }
+
+ return schemaManager.schemaRegistry(causalityToken,
parameters.tableId()).thenAccept(table::schemaView);
+ }));
+
+ // NB: all vv.update() calls must be made from the synchronous part of
the method (not in thenCompose()/etc!).
+ CompletableFuture<?> localPartsUpdateFuture =
localPartitionsVv.update(causalityToken,
+ (ignore, throwable) -> inBusyLock(busyLock, () ->
nullCompletedFuture().thenComposeAsync((ignored) -> {
+ PartitionSet parts = new BitSetPartitionSet();
+
+ for (int i = 0; i < zoneDescriptor.partitions(); i++) {
+ if
(partitionReplicaLifecycleManager.hasLocalPartition(new
ZonePartitionId(zoneDescriptor.id(), i))) {
+ parts.set(i);
+ }
+ }
+
+ return getOrCreatePartitionStorages(table,
parts).thenAccept(u -> localPartsByTableId.put(parameters.tableId(), parts));
+ }, ioExecutor)));
+
+ CompletableFuture<?> tablesByIdFuture = tablesVv.get(causalityToken);
+
+ CompletableFuture<?> createPartsFut =
assignmentsUpdatedVv.update(causalityToken, (token, e) -> {
+ if (e != null) {
+ return failedFuture(e);
+ }
+
+ return allOf(localPartsUpdateFuture,
tablesByIdFuture).thenComposeAsync(ignore -> inBusyLock(busyLock, () -> {
+ var startPartsFut = new
ArrayList<CompletableFuture<?>>();
+
+ for (int i = 0; i < zoneDescriptor.partitions(); i++) {
+ if
(partitionReplicaLifecycleManager.hasLocalPartition(new
ZonePartitionId(zoneDescriptor.id(), i))) {
Review Comment:
AFAIK we've discussed possible races here and several options to solve the
problem, e.g. by using a closure that will atomically check whether we have the
partition locally and populate its tableProcessors. Am I right?
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java:
##########
@@ -17,31 +17,103 @@
package org.apache.ignite.internal.partition.replicator;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.service.RaftCommandRunner;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicaResult;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.internal.schema.BinaryRowImpl;
+import org.apache.ignite.internal.replicator.message.TableAware;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
/**
* Zone partition replica listener.
*/
public class ZonePartitionReplicaListener implements ReplicaListener {
+ private static final TxMessagesFactory TX_MESSAGES_FACTORY = new
TxMessagesFactory();
+
+ private static final IgniteLogger LOG =
Loggers.forClass(ZonePartitionReplicaListener.class);
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-22624 await for the
table replica listener if needed.
+ private final Map<TablePartitionId, ReplicaListener> replicas = new
ConcurrentHashMap<>();
+
+ private final RaftGroupService raftClient;
+
+ public ZonePartitionReplicaListener(RaftGroupService raftClient) {
+ this.raftClient = raftClient;
+ }
@Override
public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request,
String senderId) {
- var res = new BinaryRowImpl(
- 1,
- new BinaryTupleBuilder(2).appendLong(1).appendInt(-1).build());
- return CompletableFuture.completedFuture(new ReplicaResult(
- res,
- CompletableFuture.completedFuture(res)));
+ if (!(request instanceof TableAware)) {
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-22620
implement ReplicaSafeTimeSyncRequest processing.
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-22621
implement zone-based transaction storage
+ // and txn messages processing
+ if (request instanceof TxFinishReplicaRequest) {
+ TxFinishReplicaRequest txFinishReplicaRquest =
(TxFinishReplicaRequest) request;
+
+ TxFinishReplicaRequest requestForTableListener =
TX_MESSAGES_FACTORY.txFinishReplicaRequest()
+ .txId(txFinishReplicaRquest.txId())
+
.commitPartitionId(txFinishReplicaRquest.commitPartitionId())
+ .timestamp(txFinishReplicaRquest.timestamp())
+ .groupId(txFinishReplicaRquest.commitPartitionId())
+ .groups(txFinishReplicaRquest.groups())
+ .commit(txFinishReplicaRquest.commit())
+
.commitTimestamp(txFinishReplicaRquest.commitTimestamp())
+
.enlistmentConsistencyToken(txFinishReplicaRquest.enlistmentConsistencyToken())
+ .build();
+
+ return replicas
+
.get(txFinishReplicaRquest.commitPartitionId().asTablePartitionId())
+ .invoke(requestForTableListener, senderId);
+ } else {
+ LOG.debug("Non table request is not supported by the zone
partition yet " + request);
Review Comment:
So we will see such messages for SafeTimePropagationRequest, right? Any
others? I believe I've already asked this before.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -635,6 +645,133 @@ public CompletableFuture<Void>
startAsync(ComponentContext componentContext) {
});
}
+ private CompletableFuture<Boolean>
initTableFsmOnTableCreate(CreateTableEventParameters parameters) {
+ if (!PartitionReplicaLifecycleManager.ENABLED) {
+ return completedFuture(false);
+ }
+
+ long causalityToken = parameters.causalityToken();
+ CatalogTableDescriptor tableDescriptor = parameters.tableDescriptor();
+ CatalogZoneDescriptor zoneDescriptor =
getZoneDescriptor(tableDescriptor, parameters.catalogVersion());
+
+ TableImpl table = createTableImpl(causalityToken, tableDescriptor,
zoneDescriptor);
+
+ tablesVv.update(causalityToken, (ignore, e) -> inBusyLock(busyLock, ()
-> {
Review Comment:
I didn't check it carefully. Do you believe that the order of updating the
corresponding VVs and the addition of tableProcessors to the zone replica is
correctly linearised?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -635,6 +645,133 @@ public CompletableFuture<Void>
startAsync(ComponentContext componentContext) {
});
}
+ private CompletableFuture<Boolean>
initTableFsmOnTableCreate(CreateTableEventParameters parameters) {
Review Comment:
The most confusing method in the PR, in my opinion.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -635,6 +645,133 @@ public CompletableFuture<Void>
startAsync(ComponentContext componentContext) {
});
}
+ private CompletableFuture<Boolean>
initTableFsmOnTableCreate(CreateTableEventParameters parameters) {
Review Comment:
The naming seems confusing to me. I'd rather use
populateZonePartitionListenerWithTableProcessor or something similar.
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java:
##########
@@ -17,31 +17,103 @@
package org.apache.ignite.internal.partition.replicator;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.service.RaftCommandRunner;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicaResult;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.internal.schema.BinaryRowImpl;
+import org.apache.ignite.internal.replicator.message.TableAware;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
/**
* Zone partition replica listener.
*/
public class ZonePartitionReplicaListener implements ReplicaListener {
+ private static final TxMessagesFactory TX_MESSAGES_FACTORY = new
TxMessagesFactory();
+
+ private static final IgniteLogger LOG =
Loggers.forClass(ZonePartitionReplicaListener.class);
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-22624 await for the
table replica listener if needed.
+ private final Map<TablePartitionId, ReplicaListener> replicas = new
ConcurrentHashMap<>();
+
+ private final RaftGroupService raftClient;
+
+ public ZonePartitionReplicaListener(RaftGroupService raftClient) {
+ this.raftClient = raftClient;
+ }
@Override
public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request,
String senderId) {
- var res = new BinaryRowImpl(
- 1,
- new BinaryTupleBuilder(2).appendLong(1).appendInt(-1).build());
- return CompletableFuture.completedFuture(new ReplicaResult(
- res,
- CompletableFuture.completedFuture(res)));
+ if (!(request instanceof TableAware)) {
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-22620
implement ReplicaSafeTimeSyncRequest processing.
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-22621
implement zone-based transaction storage
+ // and txn messages processing
+ if (request instanceof TxFinishReplicaRequest) {
+ TxFinishReplicaRequest txFinishReplicaRquest =
(TxFinishReplicaRequest) request;
Review Comment:
Just in case: typo in `Rquest` (should be `Request`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]