alievmirza commented on code in PR #1103:
URL: https://github.com/apache/ignite-3/pull/1103#discussion_r982459408
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -33,62 +33,49 @@
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.hlc.HybridClock;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.InternalTable;
-import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
-import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
-import
org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
-import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
-import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
-import
org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
-import
org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
-import
org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
-import org.apache.ignite.internal.table.distributed.command.GetCommand;
-import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
-import org.apache.ignite.internal.table.distributed.command.InsertCommand;
-import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
-import
org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
-import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
-import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
-import
org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
-import
org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
-import
org.apache.ignite.internal.table.distributed.command.scan.ScanCloseCommand;
-import
org.apache.ignite.internal.table.distributed.command.scan.ScanInitCommand;
-import
org.apache.ignite.internal.table.distributed.command.scan.ScanRetrieveBatchCommand;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequest;
+import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequestBuilder;
+import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.lang.Function3;
+import org.apache.ignite.lang.Function4;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteStringFormatter;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.lang.IgniteUuidGenerator;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
-import org.apache.ignite.raft.client.Command;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
* Storage of table rows.
*/
public class InternalTableImpl implements InternalTable {
- /** Log. */
- private static final IgniteLogger LOG =
Loggers.forClass(InternalTableImpl.class);
+ /** Cursor id generator. */
+ private static final AtomicLong CURSOR_ID_GENERATOR = new AtomicLong();
- /** IgniteUuid generator. */
- private static final IgniteUuidGenerator UUID_GENERATOR = new
IgniteUuidGenerator(UUID.randomUUID(), 0);
+ /** Number of attempts. */
Review Comment:
Attempts of what? Please describe it in more detail.
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockKey.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import java.util.UUID;
+
+/** Lock key. */
Review Comment:
The Javadoc is not descriptive at all; please describe the purpose of this key.
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/Lock.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import java.util.UUID;
+
+/** Lock. */
Review Comment:
This is not informative at all. Please describe the purpose of this lock
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java:
##########
@@ -46,6 +46,7 @@ public interface TxManager extends IgniteComponent {
* @param txId Transaction id.
* @return The state or null if the state is unknown.
*/
+ // TODO sanpwc: remove
Review Comment:
Here and below: TODO without a ticket.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -215,31 +292,105 @@ private <R, T> CompletableFuture<T> enlistInTx(
}
/**
- * Enlists a single row into a transaction.
+ * Retrieves a batch rows from replication storage.
Review Comment:
```suggestion
* Retrieves a batch of rows from replication storage.
```
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java:
##########
@@ -17,27 +17,35 @@
package org.apache.ignite.internal.tx;
-import org.apache.ignite.lang.IgniteInternalCheckedException;
+import java.util.UUID;
/**
- * This exception is thrown when a lock cannot be acquired due to conflict.
+ * This exception is thrown when a lock cannot be acquired, released or
downgraded.
Review Comment:
Could you please explain what this lock is?
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockMode.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+/**
+ * Lock mode.
Review Comment:
The Javadoc is not descriptive at all; at least provide a link to an
explanation, to the matrix, etc.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -215,31 +292,105 @@ private <R, T> CompletableFuture<T> enlistInTx(
}
/**
- * Enlists a single row into a transaction.
+ * Retrieves a batch rows from replication storage.
*
- * @param row The row.
- * @param tx The transaction.
- * @param op Command factory.
- * @param trans Transform closure.
- * @param <R> Transform input.
- * @param <T> Transform output.
+ * @param tx Internal transaction.
+ * @param partId Partition number.
+ * @param scanId Scan id.
+ * @param batchSize Size of batch.
+ * @return Batch of retrieved rows.
+ */
+ private CompletableFuture<Collection<BinaryRow>> enlistCursorInTx(
+ @NotNull InternalTransaction tx,
+ int partId,
+ long scanId,
+ int batchSize
+ ) {
+ String partGroupId = partitionMap.get(partId).groupId();
+
+ IgniteBiTuple<ClusterNode, Long> primaryReplicaAndTerm =
tx.enlistedNodeAndTerm(partGroupId);
+
+ CompletableFuture<Collection<BinaryRow>> fut;
+
+ ReadWriteScanRetrieveBatchReplicaRequestBuilder requestBuilder =
tableMessagesFactory.readWriteScanRetrieveBatchReplicaRequest()
+ .groupId(partGroupId)
+ .transactionId(tx.id())
+ .scanId(scanId)
+ .batchSize(batchSize)
+ .timestamp(clock.now());
+
+ if (primaryReplicaAndTerm != null) {
+ ReadWriteScanRetrieveBatchReplicaRequest request =
requestBuilder.term(primaryReplicaAndTerm.get2()).build();
+
+ try {
+ fut = replicaSvc.invoke(primaryReplicaAndTerm.get1(), request);
+ } catch (PrimaryReplicaMissException e) {
+ throw new TransactionException(e);
+ } catch (Throwable e) {
+ throw new TransactionException("Failed to invoke the replica
request.");
+ }
+ } else {
+ fut = enlistWithRetry(tx, partId, term ->
requestBuilder.term(term).build(), ATTEMPTS_TO_ENLIST_PARTITION);
+ }
+
+ return postEnlist(fut, false, tx);
+ }
+
+ /**
+ * Partition enlisting with retrying.
Review Comment:
Please describe what will happen after the number of retries reaches the limit.
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java:
##########
@@ -119,10 +81,27 @@ public interface TxStateStorage extends AutoCloseable {
Cursor<IgniteBiTuple<UUID, TxMeta>> scan();
/**
- * Removes all data from the storage and frees all resources.
+ * Flushes current state of the data or <i>the state from the nearest
future</i> to the storage. It means that the future can be
+ * completed when persisted index is higher than last applied index at the
moment of the method's call.
*
- * @throws IgniteInternalException with {@link
Transactions#TX_STATE_STORAGE_DESTROY_ERR} error code in case when
- * the operation has failed.
+ * @return Future that's completed when flushing of the data is completed.
+ */
+ CompletableFuture<Void> flush();
+
+ /**
+ * Index of the highest write command applied to the storage. {@code 0} if
index is unknown.
Review Comment:
The `@return` tag is missing here and below.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]