LebronAl commented on a change in pull request #3191:
URL: https://github.com/apache/iotdb/pull/3191#discussion_r643798830
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
##########
@@ -314,97 +321,61 @@ public synchronized boolean addNode(Node node,
NodeAdditionResult result) {
if (insertIndex > 0) {
allNodes.add(insertIndex, node);
peerMap.putIfAbsent(node, new Peer(logManager.getLastLogIndex()));
- // remove the last node because the group size is fixed to replication
number
- Node removedNode = allNodes.remove(allNodes.size() - 1);
- peerMap.remove(removedNode);
// if the local node is the last node and the insertion succeeds, this
node should leave
// the group
logger.debug("{}: Node {} is inserted into the data group {}", name,
node, allNodes);
- return removedNode.equals(thisNode);
}
- return false;
+ return insertIndex > 0;
}
}
/**
- * Process the election request from another node in the group. To win the
vote from the local
- * member, a node must have both meta and data logs no older than then local
member, or it will be
- * turned down.
+ * Try to add a Node into the group to which the member belongs.
*
- * @param electionRequest
- * @return Response.RESPONSE_META_LOG_STALE if the meta logs of the elector
fall behind
- * Response.RESPONSE_LOG_MISMATCH if the data logs of the elector fall
behind Response.SUCCESS
- * if the vote is given to the elector the term of local member if the
elector's term is no
- * bigger than the local member
+ * @return true if this node should leave the group because of the addition
of the node, false
+ * otherwise
*/
- @Override
- long checkElectorLogProgress(ElectionRequest electionRequest) {
- // to be a data group leader, a node should also be qualified to be the
meta group leader
- // which guarantees the data group leader has the newest partition table.
- long thatTerm = electionRequest.getTerm();
- long thatMetaLastLogIndex = electionRequest.getLastLogIndex();
- long thatMetaLastLogTerm = electionRequest.getLastLogTerm();
- long thatDataLastLogIndex = electionRequest.getDataLogLastIndex();
- long thatDataLastLogTerm = electionRequest.getDataLogLastTerm();
- logger.info(
- "{} received an dataGroup election request, term:{},
metaLastLogIndex:{}, metaLastLogTerm:{}, dataLastLogIndex:{},
dataLastLogTerm:{}",
- name,
- thatTerm,
- thatMetaLastLogIndex,
- thatMetaLastLogTerm,
- thatDataLastLogIndex,
- thatDataLastLogTerm);
-
- // check meta logs
- // term of the electors' MetaGroupMember is not verified, so 0 and 1 are
used to make sure
- // the verification does not fail
- long metaResponse = metaGroupMember.checkLogProgress(thatMetaLastLogIndex,
thatMetaLastLogTerm);
- if (metaResponse == Response.RESPONSE_LOG_MISMATCH) {
- return Response.RESPONSE_META_LOG_STALE;
- }
-
- long resp = checkLogProgress(thatDataLastLogIndex, thatDataLastLogTerm);
- if (resp == Response.RESPONSE_AGREE) {
- logger.info(
- "{} accepted an dataGroup election request, term:{}/{},
dataLogIndex:{}/{}, dataLogTerm:{}/{}, metaLogIndex:{}/{},metaLogTerm:{}/{}",
- name,
- thatTerm,
- term.get(),
- thatDataLastLogIndex,
- logManager.getLastLogIndex(),
- thatDataLastLogTerm,
- logManager.getLastLogTerm(),
- thatMetaLastLogIndex,
- metaGroupMember.getLogManager().getLastLogIndex(),
- thatMetaLastLogTerm,
- metaGroupMember.getLogManager().getLastLogTerm());
- setCharacter(NodeCharacter.FOLLOWER);
- lastHeartbeatReceivedTime = System.currentTimeMillis();
- setVoteFor(electionRequest.getElector());
- updateHardState(thatTerm, getVoteFor());
- } else {
- logger.info(
- "{} rejected an dataGroup election request, term:{}/{},
dataLogIndex:{}/{}, dataLogTerm:{}/{}, metaLogIndex:{}/{},metaLogTerm:{}/{}",
- name,
- thatTerm,
- term.get(),
- thatDataLastLogIndex,
- logManager.getLastLogIndex(),
- thatDataLastLogTerm,
- logManager.getLastLogTerm(),
- thatMetaLastLogIndex,
- metaGroupMember.getLogManager().getLastLogIndex(),
- thatMetaLastLogTerm,
- metaGroupMember.getLogManager().getLastLogTerm());
+ public boolean addNode(Node node, NodeAdditionResult result) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: start to add node {}", name, node);
+ }
+
+ // mark slots that do not belong to this group any more
+ Set<Integer> lostSlots =
+ ((SlotNodeAdditionResult) result)
+ .getLostSlots()
+ .getOrDefault(new RaftNode(getHeader(), getRaftGroupId()),
Collections.emptySet());
+ for (Integer lostSlot : lostSlots) {
+ slotManager.setToSending(lostSlot, false);
+ }
+ slotManager.save();
+
+ synchronized (allNodes) {
+ preAddNode(node);
+ if (allNodes.contains(node) && allNodes.size() >
config.getReplicationNum()) {
+ // remove the last node because the group size is fixed to replication
number
+ Node removedNode = allNodes.remove(allNodes.size() - 1);
+ peerMap.remove(removedNode);
+
+ if (removedNode.equals(leader.get()) && !removedNode.equals(thisNode))
{
+ // if the leader is removed, also start an election immediately
+ synchronized (term) {
+ setCharacter(NodeCharacter.ELECTOR);
Review comment:
It seems that we should sync leader first to make this node catch up and
then try to start an election?
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
##########
@@ -32,25 +32,57 @@
/** AddNodeLog records the operation of adding a node into this cluster. */
public class AddNodeLog extends Log {
+ private ByteBuffer partitionTable;
+
private Node newNode;
- public Node getNewNode() {
- return newNode;
+ private long metaLogIndex;
Review comment:
Do we really need this field? It seems that it is always equal to
currLogIndex.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
##########
@@ -195,64 +194,70 @@ public String toString() {
}
@Override
- public void install(FileSnapshot snapshot, int slot) throws
SnapshotInstallationException {
+ public void install(FileSnapshot snapshot, int slot, boolean
isDataMigration)
Review comment:
It looks like the isDataMigration parameter is always false in the
callers of this function.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
##########
@@ -86,16 +109,15 @@ public void waitSlot(int slotId) {
*/
public void waitSlotForWrite(int slotId) throws StorageEngineException {
SlotDescriptor slotDescriptor = idSlotMap.get(slotId);
+ long startTime = System.currentTimeMillis();
while (true) {
synchronized (slotDescriptor) {
- if (slotDescriptor.slotStatus == SlotStatus.SENDING
- || slotDescriptor.slotStatus == SlotStatus.SENT) {
- throw new StorageEngineException(
- String.format("Slot %d no longer belongs to the node", slotId));
- }
- if (slotDescriptor.slotStatus != SlotStatus.NULL
- && slotDescriptor.slotStatus != SlotStatus.PULLING_WRITABLE) {
+ if (slotDescriptor.slotStatus == SlotStatus.PULLING) {
try {
+ if ((System.currentTimeMillis() - startTime) >= 5000) {
Review comment:
same as above
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
##########
@@ -1023,30 +1157,85 @@ public IReaderByTimestamp getReaderByTimestamp(
TSDataType dataType,
QueryContext context,
DataGroupMember dataGroupMember,
- boolean ascending)
+ boolean ascending,
+ Set<Integer> requiredSlots)
throws StorageEngineException, QueryProcessException {
try {
dataGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new StorageEngineException(e);
}
- SeriesReader seriesReader =
- getSeriesReader(
- path,
- allSensors,
- dataType,
- TimeFilter.gtEq(Long.MIN_VALUE),
- null,
- context,
- dataGroupMember.getHeader(),
- ascending);
+
+ // find the groups that should be queried due to data migration.
+ // when a slot is in the status of PULLING or PULLING_WRITABLE, the read
of it should merge
+ // result to guarantee integrity.
+ Map<PartitionGroup, Set<Integer>> holderSlotMap =
dataGroupMember.getPreviousHolderSlotMap();
try {
- if (seriesReader.isEmpty()) {
- return null;
+ // If requiredSlots is not null, it means that this data group is the
previous holder of
+ // required slots, which is no need to merge other resource,
+ if (requiredSlots == null && !holderSlotMap.isEmpty()) {
+ // merge remote reader and local reader
+ ManagedMergeReader mergeReader = new ManagedMergeReader(dataType);
+
+ // add local reader
+ IPointReader seriesPointReader =
+ getSeriesPointReader(
+ path,
+ allSensors,
+ dataType,
+ TimeFilter.gtEq(Long.MIN_VALUE),
+ null,
+ context,
+ dataGroupMember,
+ ascending,
+ requiredSlots);
Review comment:
always null?
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
##########
@@ -82,4 +84,20 @@ public Snapshot getSnapshot(long minIndex) {
snapshot.setLastLogTerm(term);
return snapshot;
}
+
+ @Override
+ void applyEntries(List<Log> entries) {
Review comment:
What is the difference between this function and the one in its parent class?
Is there a corner case? You'd better add some comments.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
##########
@@ -82,11 +120,17 @@ public boolean equals(Object o) {
return false;
}
AddNodeLog that = (AddNodeLog) o;
- return Objects.equals(newNode, that.newNode);
+ return Objects.equals(newNode, that.newNode)
Review comment:
The base class `Log` uses `currLogIndex` and `currLogTerm` for `equals`
and `hashCode`. What's the purpose of overriding these two methods here?
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
##########
@@ -677,6 +677,9 @@ public boolean matchTerm(long term, long index) {
*/
void applyEntries(List<Log> entries) {
for (Log entry : entries) {
+ if (entry.isApplied()) {
Review comment:
Please add some comments
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
##########
@@ -55,22 +56,26 @@
* @param timestamp
* @return
*/
- Node routeToHeaderByTime(String storageGroupName, long timestamp);
+ RaftNode routeToHeaderByTime(String storageGroupName, long timestamp);
/**
* Add a new node to update the partition table.
*
* @param node
* @return the new group generated by the node
*/
- NodeAdditionResult addNode(Node node);
+ void addNode(Node node);
+
+ NodeAdditionResult getNodeAdditionResult(Node node);
Review comment:
add comments
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
##########
@@ -73,6 +92,10 @@ public void waitSlot(int slotId) {
logger.error("Unexpected interruption when waiting for slot {}",
slotId, e);
}
} else {
+ long cost = System.currentTimeMillis() - startTime;
+ if (cost > 1000) {
Review comment:
Do not use a magic number; make it a final field.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
##########
@@ -55,22 +56,26 @@
* @param timestamp
* @return
*/
- Node routeToHeaderByTime(String storageGroupName, long timestamp);
+ RaftNode routeToHeaderByTime(String storageGroupName, long timestamp);
/**
* Add a new node to update the partition table.
*
* @param node
* @return the new group generated by the node
*/
- NodeAdditionResult addNode(Node node);
+ void addNode(Node node);
+
+ NodeAdditionResult getNodeAdditionResult(Node node);
/**
* Remove a node and update the partition table.
*
* @param node
*/
- NodeRemovalResult removeNode(Node node);
+ void removeNode(Node node);
+
+ NodeRemovalResult getNodeRemovalResult();
Review comment:
add comments
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
##########
@@ -77,11 +115,17 @@ public boolean equals(Object o) {
return false;
}
RemoveNodeLog that = (RemoveNodeLog) o;
- return Objects.equals(removedNode, that.removedNode);
+ return Objects.equals(removedNode, that.removedNode)
Review comment:
same as above
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
##########
@@ -31,17 +31,49 @@
public class RemoveNodeLog extends Log {
+ private ByteBuffer partitionTable;
+
private Node removedNode;
+ private long metaLogIndex;
Review comment:
same as above
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
##########
@@ -507,11 +541,16 @@ private SeriesReader getSeriesReader(
Filter valueFilter,
QueryContext context,
Node header,
- boolean ascending)
+ int raftId,
+ boolean ascending,
+ Set<Integer> requiredSlots)
throws StorageEngineException, QueryProcessException {
ClusterQueryUtils.checkPathExistence(path);
- List<Integer> nodeSlots =
- ((SlotPartitionTable)
metaGroupMember.getPartitionTable()).getNodeSlots(header);
+ if (requiredSlots == null) {
Review comment:
Please add some comments for this if statement.
BTW, why don't we check this in other functions such as `getMultSeriesReader`?
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
##########
@@ -337,7 +362,8 @@ private AbstractMultPointReader getMultSeriesReader(
deviceMeasurements,
partitionGroup,
context,
- ascending);
+ ascending,
+ requiredSlots);
Review comment:
always NULL?
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
##########
@@ -55,22 +56,26 @@
* @param timestamp
* @return
*/
- Node routeToHeaderByTime(String storageGroupName, long timestamp);
+ RaftNode routeToHeaderByTime(String storageGroupName, long timestamp);
/**
* Add a new node to update the partition table.
*
* @param node
* @return the new group generated by the node
Review comment:
update comments
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]