fanhualta commented on a change in pull request #3191:
URL: https://github.com/apache/iotdb/pull/3191#discussion_r645582343
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
##########
@@ -767,57 +781,109 @@ public void removeLocalData(List<Integer> slots) {
SlotPartitionTable.getSlotStrategy()
.calculateSlotByPartitionNum(
storageGroupName, timePartitionId,
ClusterConstant.SLOT_NUM);
+ /**
+ * If this slot is just held by different raft groups in the same
node, it should keep the
+ * data of slot.
+ */
+ if (((SlotPartitionTable) metaGroupMember.getPartitionTable())
+ .judgeHoldSlot(thisNode, slot)) {
+ return false;
+ }
return slotSet.contains(slot);
};
for (PartialPath sg : allStorageGroupNames) {
StorageEngine.getInstance().removePartitions(sg, filter);
}
for (Integer slot : slots) {
- slotManager.setToNull(slot);
+ slotManager.setToNull(slot, false);
}
+ slotManager.save();
if (logger.isInfoEnabled()) {
logger.info(
"{}: data of {} and other {} slots are removed", name, slots.get(0),
slots.size() - 1);
}
}
+ public void preRemoveNode(Node removedNode) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: start to pre remove node {}", name, removedNode);
+ }
+ synchronized (allNodes) {
+ if (allNodes.contains(removedNode) && allNodes.size() ==
config.getReplicationNum()) {
+ // update the group if the deleted node was in it
+ PartitionGroup newGroup =
metaGroupMember.getPartitionTable().getHeaderGroup(getHeader());
+ if (newGroup == null) {
+ return;
+ }
+ Node newNodeToGroup = newGroup.get(newGroup.size() - 1);
+ allNodes.add(newNodeToGroup);
+ peerMap.putIfAbsent(newNodeToGroup, new
Peer(logManager.getLastLogIndex()));
+ }
+ }
+ }
+
/**
* When a node is removed and IT IS NOT THE HEADER of the group, the member
should take over some
* slots from the removed group, and add a new node to the group the removed
node was in the
* group.
*/
@SuppressWarnings("java:S2445") // the reference of allNodes is unchanged
- public void removeNode(Node removedNode, NodeRemovalResult removalResult) {
+ public void removeNode(Node removedNode) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: start to remove node {}", name, removedNode);
+ }
+
synchronized (allNodes) {
+ preRemoveNode(removedNode);
if (allNodes.contains(removedNode)) {
// update the group if the deleted node was in it
- allNodes =
metaGroupMember.getPartitionTable().getHeaderGroup(getHeader());
- initPeerMap();
+ allNodes.remove(removedNode);
+ peerMap.remove(removedNode);
if (removedNode.equals(leader.get())) {
// if the leader is removed, also start an election immediately
synchronized (term) {
setCharacter(NodeCharacter.ELECTOR);
- setLastHeartbeatReceivedTime(Long.MIN_VALUE);
+ setLeader(null);
+ }
+ synchronized (getHeartBeatWaitObject()) {
+ getHeartBeatWaitObject().notifyAll();
}
}
}
- List<Integer> slotsToPull =
- ((SlotNodeRemovalResult)
removalResult).getNewSlotOwners().get(getHeader());
- if (slotsToPull != null) {
- // pull the slots that should be taken over
- PullSnapshotTaskDescriptor taskDescriptor =
- new PullSnapshotTaskDescriptor(removalResult.getRemovedGroup(),
slotsToPull, true);
- pullFileSnapshot(taskDescriptor, null);
+ }
+ }
+
+ public void pullSlots(NodeRemovalResult removalResult) {
+ List<Integer> slotsToPull =
+ ((SlotNodeRemovalResult)
removalResult).getNewSlotOwners().get(getHeader());
+ if (slotsToPull != null) {
+ // pull the slots that should be taken over
+ PullSnapshotTaskDescriptor taskDescriptor =
+ new PullSnapshotTaskDescriptor(
+ removalResult.getRemovedGroup(getRaftGroupId()), new
ArrayList<>(slotsToPull), true);
+ pullFileSnapshot(taskDescriptor, null);
+ }
+ }
+
+ /** For data group, it's necessary to apply remove/add log immediately after
append. */
+ @Override
+ protected long appendEntry(long prevLogIndex, long prevLogTerm, long
leaderCommit, Log log) {
+ long resp = super.appendEntry(prevLogIndex, prevLogTerm, leaderCommit,
log);
+ if (resp == Response.RESPONSE_AGREE
+ && (log instanceof AddNodeLog || log instanceof RemoveNodeLog)) {
+ try {
+ commitLog(log);
Review comment:
> Is it possible to apply only this log?
Yes, I have fixed it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]