ibessonov commented on code in PR #1235:
URL: https://github.com/apache/ignite-3/pull/1235#discussion_r1008127881
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -765,14 +791,23 @@ private CompletableFuture<Object>
processMultiEntryAction(ReadWriteMultiRowRepli
RowId lockedRowId = deleteLockFuts[futNum++].join();
if (lockedRowId != null) {
- rowIdsToDelete.add(lockedRowId);
+ rowIdsToDelete.put(new
UUID(lockedRowId.mostSignificantBits(), lockedRowId.leastSignificantBits()),
null);
Review Comment:
I also believe that RowId has a `uuid` method for your convenience.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -205,15 +215,17 @@ private void handleUpdateCommand(UpdateCommand cmd, long
commandIndex) {
private void handleUpdateAllCommand(UpdateAllCommand cmd, long
commandIndex) {
storage.runConsistently(() -> {
UUID txId = cmd.txId();
- Map<RowId, BinaryRow> rowsToUpdate = cmd.getRowsToUpdate();
- UUID commitTblId = cmd.getReplicationGroupId().getTableId();
- int commitPartId = cmd.getReplicationGroupId().getPartId();
+ Map<UUID, ByteString> rowsToUpdate = cmd.rowsToUpdate();
+ UUID commitTblId = cmd.tablePartitionId().tableId();
+ int commitPartId = cmd.tablePartitionId().partitionId();
if (!CollectionUtils.nullOrEmpty(rowsToUpdate)) {
- for (Map.Entry<RowId, BinaryRow> entry :
rowsToUpdate.entrySet()) {
- RowId rowId = entry.getKey();
- BinaryRow row = entry.getValue();
-
+ for (Map.Entry<UUID, ByteString> entry :
rowsToUpdate.entrySet()) {
+ UUID rowIdUuid = entry.getKey();
+ RowId rowId = new RowId(partitionId,
rowIdUuid.getMostSignificantBits(), rowIdUuid.getLeastSignificantBits());
Review Comment:
I think there is a public (int, UUID) constructor at this point
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -612,12 +620,22 @@ private CompletableFuture<Object>
finishTransaction(List<ReplicationGroupId> agg
HybridTimestamp commitTimestamp = hybridClock.now();
CompletableFuture<Object> changeStateFuture = raftClient.run(
- new FinishTxCommand(
- txId,
- commit,
- commitTimestamp,
- aggregatedGroupIds
- )
+ msgFactory.finishTxCommand()
+ .txId(txId)
+ .commit(commit)
+ .commitTimestamp(
+ msgFactory.hybridTimestampMessage()
Review Comment:
I think you have a `hybridTimestamp(...)` method in this class that'll make
this code shorter
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -979,9 +1064,17 @@ private CompletableFuture<Object>
processSingleEntryAction(ReadWriteSingleRowRep
return lockFut.thenCompose(lockedRowId -> {
boolean inserted = lockedRowId == null;
- CompletableFuture raftFut =
+ CompletableFuture<Object> raftFut =
lockedRowId == null ?
applyCmdWithExceptionHandling(
- new UpdateCommand(commitPartitionId, new
RowId(partId), searchRow, txId)) :
+ msgFactory.updateCommand()
+
.tablePartitionId(msgFactory.tablePartitionIdMessage()
+
.tableId(commitPartitionId.getTableId())
+
.partitionId(commitPartitionId.getPartId())
+ .build())
+
.rowUuid(Timestamp.nextVersion().toUuid())
Review Comment:
I think we should extract `Timestamp.nextVersion().toUuid()` into a method,
like `generateNewRowId()` or something similar.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -979,9 +1064,17 @@ private CompletableFuture<Object>
processSingleEntryAction(ReadWriteSingleRowRep
return lockFut.thenCompose(lockedRowId -> {
boolean inserted = lockedRowId == null;
- CompletableFuture raftFut =
+ CompletableFuture<Object> raftFut =
lockedRowId == null ?
applyCmdWithExceptionHandling(
- new UpdateCommand(commitPartitionId, new
RowId(partId), searchRow, txId)) :
+ msgFactory.updateCommand()
Review Comment:
Speaking of method extraction, should we create private methods for all
these builds? Would that simplify the code?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1488,4 +1632,30 @@ private CompletableFuture<BinaryRow>
resolveWriteIntentAsync(
}
});
}
+
+ /**
+ * Compounds a RAFT group unique name.
+ *
+ * @param tblId Table identifier.
+ * @param partition Number of table partitions.
+ * @return A RAFT group name.
+ */
+ @NotNull
Review Comment:
We don't use NotNull
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -770,16 +795,28 @@ private CompletableFuture<Object>
processMultiEntryAction(ReadWriteMultiRowRepli
if (lockedRow != null) {
result.add(row);
} else {
- if (rowsToInsert.values().stream().noneMatch(row0
-> row0.keySlice().equals(row.keySlice()))) {
- rowsToInsert.put(new RowId(partId), row);
+ ByteBuffer keyToCheck = row.keySlice();
+ if (!uniqueKeys.contains(keyToCheck)) {
Review Comment:
OK, I think we can simplify it even further by writing
`if (uniqueKeys.add(keyToCheck)) {`
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -262,8 +274,11 @@ private void handleFinishTxCommand(FinishTxCommand cmd,
long commandIndex) throw
TxMeta txMetaToSet = new TxMeta(
stateToSet,
- cmd.replicationGroupIds(),
- cmd.commitTimestamp()
+ cmd.tablePartitionIds()
+ .stream()
+ .map(tpIdMsg -> (ReplicationGroupId)
tpIdMsg.asTablePartitionId())
Review Comment:
Can this be done with a method reference?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1014,10 +1117,22 @@ private CompletableFuture<Object>
processSingleEntryAction(ReadWriteSingleRowRep
BinaryRow result = rowId != null
?
resolveReadResult(mvDataStorage.read(rowId, HybridTimestamp.MAX_VALUE), txId) :
null;
- CompletableFuture raftFut = rowId
!= null ? applyCmdWithExceptionHandling(
- new
UpdateCommand(commitPartitionId, rowId, searchRow, txId))
- :
applyCmdWithExceptionHandling(
- new
UpdateCommand(commitPartitionId, new RowId(partId), searchRow, txId));
+ RowId locRowId = (rowId != null) ?
rowId : new RowId(partId);
+
+ CompletableFuture<Object> raftFut =
Review Comment:
This padding is legendary. I hate it. It's not your fault though
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -158,11 +167,12 @@ public void
onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
*/
private void handleUpdateCommand(UpdateCommand cmd, long commandIndex) {
storage.runConsistently(() -> {
- BinaryRow row = cmd.getRow();
- RowId rowId = cmd.getRowId();
+ BinaryRow row = cmd.rowBuffer() != null ? new
ByteBufferRow(cmd.rowBuffer().toByteArray()) : null;
+ UUID rowUuid = cmd.rowUuid();
+ RowId rowId = new RowId(partitionId,
rowUuid.getMostSignificantBits(), rowUuid.getLeastSignificantBits());
UUID txId = cmd.txId();
- UUID commitTblId = cmd.getCommitReplicationGroupId().getTableId();
- int commitPartId = cmd.getCommitReplicationGroupId().getPartId();
+ UUID commitTblId = cmd.tablePartitionId().tableId();
Review Comment:
Why did you decide to rename it? I would leave the original name
(commitReplicationGroupId).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]