kgusakov commented on code in PR #759:
URL: https://github.com/apache/ignite-3/pull/759#discussion_r861577728
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1317,152 +1379,153 @@ private RuntimeException convertThrowable(Throwable
th) {
}
/**
- * Sets the nodes as baseline for all tables created by the manager.
+ * Prepare the listener for handling configuration changes in raft group.
*
- * @param nodes New baseline nodes.
- * @throws NodeStoppingException If an implementation stopped before the
method was invoked.
+ * @param tblName Name of the table.
+ * @param partNum Number of partition.
+ * @param partId Partition unique id.
+ * @return prepare listener.
+ *
+ * @see RaftGroupEventsListener
*/
- public void setBaseline(Set<String> nodes) throws NodeStoppingException {
- if (!busyLock.enterBusy()) {
- throw new NodeStoppingException();
- }
- try {
- setBaselineInternal(nodes);
- } finally {
- busyLock.leaveBusy();
- }
+ private RaftGroupEventsListener raftGroupEventsListener(String tblName,
int partNum,
+ String partId) {
+ return new RaftGroupEventsListener() {
+ @Override
+ public void onLeaderElected() {
+ }
+
+ @Override
+ public void onNewPeersConfigurationApplied(List<PeerId> peers) {
+ Map<ByteArray, Entry> keys = metaStorageMgr.getAll(
+ Set.of(partAssignmentsPlannedKey(partId),
partAssignmentsPendingKey(partId))).join();
+
+ Entry plannedEntry =
keys.get(partAssignmentsPlannedKey(partId));
+ Entry pendingEntry =
keys.get(partAssignmentsPendingKey(partId));
+
+ tablesCfg.tables().get(tblName).change(ch -> {
+ List<List<ClusterNode>> assignments =
+ (List<List<ClusterNode>>)
ByteUtils.fromBytes(((ExtendedTableChange) ch).assignments());
+ assignments.set(partNum, ((List<ClusterNode>)
ByteUtils.fromBytes(pendingEntry.value())));
+ ((ExtendedTableChange)
ch).changeAssignments(ByteUtils.toBytes(assignments));
+ });
+
+ if (plannedEntry.value() != null) {
+ if (!metaStorageMgr.invoke(If.iif(
+
revision(partAssignmentsPlannedKey(partId)).eq(plannedEntry.revision()),
+ ops(
+ put(partAssignmentsStableKey(partId),
pendingEntry.value()),
+ put(partAssignmentsPendingKey(partId),
plannedEntry.value()),
+ remove(partAssignmentsPlannedKey(partId)))
+ .yield(true),
+ ops().yield(false))).join().getAsBoolean()) {
+ onNewPeersConfigurationApplied(peers);
+ }
+ } else {
+ if (!metaStorageMgr.invoke(If.iif(
+ notExists(partAssignmentsPlannedKey(partId)),
+ ops(put(partAssignmentsStableKey(partId),
pendingEntry.value()),
+
remove(partAssignmentsPendingKey(partId))).yield(true),
+ ops().yield(false))).join().getAsBoolean()) {
+ onNewPeersConfigurationApplied(peers);
+ }
+ }
+ }
+
+ @Override
+ public void onReconfigurationError(Status status) {}
+ };
}
/**
- * Internal method for setting a baseline.
- *
- * @param nodes Names of baseline nodes.
+ * Register the new meta storage listener for changes in pending
partitions.
*/
- private void setBaselineInternal(Set<String> nodes) {
- if (nodes == null || nodes.isEmpty()) {
- throw new IgniteException("New baseline can't be null or empty");
- }
+ private void registerRebalanceListeners() {
+
metaStorageMgr.registerWatchByPrefix(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
new WatchListener() {
+ @Override
+ public boolean onUpdate(@NotNull WatchEvent evt) {
+ assert evt.single();
- var currClusterMembers = new HashSet<>(baselineMgr.nodes());
+ if (evt.entryEvent().newEntry().value() == null) {
+ return true;
+ }
- var currClusterMemberNames =
-
currClusterMembers.stream().map(ClusterNode::name).collect(Collectors.toSet());
+ int part =
extractPartitionNumber(evt.entryEvent().newEntry().key());
+ UUID tblId = extractTableId(evt.entryEvent().newEntry().key());
- for (String nodeName : nodes) {
- if (!currClusterMemberNames.contains(nodeName)) {
- throw new IgniteException("Node '" + nodeName + "' not in
current network cluster membership. "
- + " Adding not alive nodes is not supported yet.");
- }
- }
+ TableImpl tbl = tablesByIdVv.latest().get(tblId);
Review Comment:
This call is located inside the watch for partition pending assignments.
As far as I understand, according to the nature of ordered metastore events:
- this event can't be triggered before table creation (rebalance can't be
triggered before it)
- this event can't be triggered after table drop (rebalance for partition
can't be triggered for removed table)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]