denis-chudov commented on code in PR #2867:
URL: https://github.com/apache/ignite-3/pull/2867#discussion_r1407810685
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -629,201 +630,222 @@ private CompletableFuture<?>
createTablePartitionsLocally(
CompletableFuture<?>[] futures = new
CompletableFuture<?>[partitions];
- // TODO: https://issues.apache.org/jira/browse/IGNITE-19713
Process assignments and set partitions only for assigned partitions.
- PartitionSet parts = new BitSetPartitionSet();
+ PartitionSet partitionSet = new BitSetPartitionSet();
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19713
Process assignments and set partitions only for assigned partitions.
for (int i = 0; i < futures.length; i++) {
futures[i] = new CompletableFuture<>();
- parts.set(i);
+ partitionSet.set(i);
}
- String localMemberName = localNode().name();
-
for (int i = 0; i < partitions; i++) {
int partId = i;
Set<Assignment> newPartAssignment = newAssignments.get(partId);
- InternalTable internalTbl = table.internalTable();
+ // TODO https://issues.apache.org/jira/browse/IGNITE-19170
#handleChangePendingAssignmentEvent should be called on
+ // TODO actual event, the method #createTablePartitionsLocally
should be removed.
+ handleChangePendingAssignmentEvent(
+ new TablePartitionId(tableId, partId),
+ table,
+ assignmentEntry(new TablePartitionId(tableId,
partId), newPartAssignment, causalityToken),
+ assignmentEntry(new TablePartitionId(tableId,
partId), Set.of(), causalityToken),
+ completedFuture(Map.of(tableId, partitionSet)),
+ false
+ )
+ .whenComplete((res, ex) -> {
+ if (ex != null) {
+ LOG.warn("Unable to update raft groups on the
node [tableId={}, partitionId={}]", ex, tableId, partId);
- Assignment localMemberAssignment = newPartAssignment.stream()
- .filter(a -> a.consistentId().equals(localMemberName))
- .findAny()
- .orElse(null);
+ futures[partId].completeExceptionally(ex);
+ } else {
+ futures[partId].complete(null);
+ }
+ });
+ }
- PeersAndLearners newConfiguration =
configurationFromAssignments(newPartAssignment);
+ return allOf(futures);
+ });
+ }
- TablePartitionId replicaGrpId = new TablePartitionId(tableId,
partId);
+ private Entry assignmentEntry(TablePartitionId tblPartId, Set<Assignment>
assignments, long revision) {
+ return new Entry() {
+ @Override
+ public byte[] key() {
+ return pendingPartAssignmentsKey(tblPartId).bytes();
+ }
- transactionStateResolver.updateAssignment(replicaGrpId,
newConfiguration.peers().stream().map(Peer::consistentId)
- .collect(toList()));
+ @Override
+ public byte @Nullable [] value() {
+ return ByteUtils.toBytes(assignments);
+ }
- var safeTimeTracker = new
PendingComparableValuesTracker<HybridTimestamp, Void>(
- new HybridTimestamp(1, 0)
- );
- var storageIndexTracker = new
PendingComparableValuesTracker<Long, Void>(0L);
+ @Override
+ public long revision() {
+ return 1;
Review Comment:
Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]