fanhualta commented on a change in pull request #3191:
URL: https://github.com/apache/iotdb/pull/3191#discussion_r644860167
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
##########
@@ -314,97 +321,61 @@ public synchronized boolean addNode(Node node,
NodeAdditionResult result) {
if (insertIndex > 0) {
allNodes.add(insertIndex, node);
peerMap.putIfAbsent(node, new Peer(logManager.getLastLogIndex()));
- // remove the last node because the group size is fixed to replication
number
- Node removedNode = allNodes.remove(allNodes.size() - 1);
- peerMap.remove(removedNode);
// if the local node is the last node and the insertion succeeds, this
node should leave
// the group
logger.debug("{}: Node {} is inserted into the data group {}", name,
node, allNodes);
- return removedNode.equals(thisNode);
}
- return false;
+ return insertIndex > 0;
}
}
/**
- * Process the election request from another node in the group. To win the
vote from the local
- * member, a node must have both meta and data logs no older than then local
member, or it will be
- * turned down.
+ * Try to add a Node into the group to which the member belongs.
*
- * @param electionRequest
- * @return Response.RESPONSE_META_LOG_STALE if the meta logs of the elector
fall behind
- * Response.RESPONSE_LOG_MISMATCH if the data logs of the elector fall
behind Response.SUCCESS
- * if the vote is given to the elector the term of local member if the
elector's term is no
- * bigger than the local member
+ * @return true if this node should leave the group because of the addition
of the node, false
+ * otherwise
*/
- @Override
- long checkElectorLogProgress(ElectionRequest electionRequest) {
- // to be a data group leader, a node should also be qualified to be the
meta group leader
- // which guarantees the data group leader has the newest partition table.
- long thatTerm = electionRequest.getTerm();
- long thatMetaLastLogIndex = electionRequest.getLastLogIndex();
- long thatMetaLastLogTerm = electionRequest.getLastLogTerm();
- long thatDataLastLogIndex = electionRequest.getDataLogLastIndex();
- long thatDataLastLogTerm = electionRequest.getDataLogLastTerm();
- logger.info(
- "{} received an dataGroup election request, term:{},
metaLastLogIndex:{}, metaLastLogTerm:{}, dataLastLogIndex:{},
dataLastLogTerm:{}",
- name,
- thatTerm,
- thatMetaLastLogIndex,
- thatMetaLastLogTerm,
- thatDataLastLogIndex,
- thatDataLastLogTerm);
-
- // check meta logs
- // term of the electors' MetaGroupMember is not verified, so 0 and 1 are
used to make sure
- // the verification does not fail
- long metaResponse = metaGroupMember.checkLogProgress(thatMetaLastLogIndex,
thatMetaLastLogTerm);
- if (metaResponse == Response.RESPONSE_LOG_MISMATCH) {
- return Response.RESPONSE_META_LOG_STALE;
- }
-
- long resp = checkLogProgress(thatDataLastLogIndex, thatDataLastLogTerm);
- if (resp == Response.RESPONSE_AGREE) {
- logger.info(
- "{} accepted an dataGroup election request, term:{}/{},
dataLogIndex:{}/{}, dataLogTerm:{}/{}, metaLogIndex:{}/{},metaLogTerm:{}/{}",
- name,
- thatTerm,
- term.get(),
- thatDataLastLogIndex,
- logManager.getLastLogIndex(),
- thatDataLastLogTerm,
- logManager.getLastLogTerm(),
- thatMetaLastLogIndex,
- metaGroupMember.getLogManager().getLastLogIndex(),
- thatMetaLastLogTerm,
- metaGroupMember.getLogManager().getLastLogTerm());
- setCharacter(NodeCharacter.FOLLOWER);
- lastHeartbeatReceivedTime = System.currentTimeMillis();
- setVoteFor(electionRequest.getElector());
- updateHardState(thatTerm, getVoteFor());
- } else {
- logger.info(
- "{} rejected an dataGroup election request, term:{}/{},
dataLogIndex:{}/{}, dataLogTerm:{}/{}, metaLogIndex:{}/{},metaLogTerm:{}/{}",
- name,
- thatTerm,
- term.get(),
- thatDataLastLogIndex,
- logManager.getLastLogIndex(),
- thatDataLastLogTerm,
- logManager.getLastLogTerm(),
- thatMetaLastLogIndex,
- metaGroupMember.getLogManager().getLastLogIndex(),
- thatMetaLastLogTerm,
- metaGroupMember.getLogManager().getLastLogTerm());
+ public boolean addNode(Node node, NodeAdditionResult result) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: start to add node {}", name, node);
+ }
+
+ // mark slots that do not belong to this group any more
+ Set<Integer> lostSlots =
+ ((SlotNodeAdditionResult) result)
+ .getLostSlots()
+ .getOrDefault(new RaftNode(getHeader(), getRaftGroupId()),
Collections.emptySet());
+ for (Integer lostSlot : lostSlots) {
+ slotManager.setToSending(lostSlot, false);
+ }
+ slotManager.save();
+
+ synchronized (allNodes) {
+ preAddNode(node);
+ if (allNodes.contains(node) && allNodes.size() >
config.getReplicationNum()) {
+ // remove the last node because the group size is fixed to replication
number
+ Node removedNode = allNodes.remove(allNodes.size() - 1);
+ peerMap.remove(removedNode);
+
+ if (removedNode.equals(leader.get()) && !removedNode.equals(thisNode))
{
+ // if the leader is removed, also start an election immediately
+ synchronized (term) {
+ setCharacter(NodeCharacter.ELECTOR);
Review comment:
When the leader of a data group is removed from the cluster, the data
group should elect a new leader. Each node in the data group becomes an
elector and tries to be the leader. If a node's log is not new enough to
win an election, it simply will not win, and the node with the newest
log will win instead. So why would the data group still be unavailable for an electionTimeout period?
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
##########
@@ -32,25 +32,57 @@
/** AddNodeLog records the operation of adding a node into this cluster. */
public class AddNodeLog extends Log {
+ private ByteBuffer partitionTable;
+
private Node newNode;
- public Node getNewNode() {
- return newNode;
+ private long metaLogIndex;
Review comment:
In the raft log of the data group, currLogIndex means the index of the
raft log in the data group, but metaLogIndex means the index of the raft log in
the meta group.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
##########
@@ -31,17 +31,49 @@
public class RemoveNodeLog extends Log {
+ private ByteBuffer partitionTable;
+
private Node removedNode;
+ private long metaLogIndex;
Review comment:
The answer is the same as above.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
##########
@@ -82,11 +120,17 @@ public boolean equals(Object o) {
return false;
}
AddNodeLog that = (AddNodeLog) o;
- return Objects.equals(newNode, that.newNode);
+ return Objects.equals(newNode, that.newNode)
Review comment:
Because the subclass has its own fields, it is necessary to override these
two methods for completeness. All subclasses of Log override these
two methods.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
##########
@@ -77,11 +115,17 @@ public boolean equals(Object o) {
return false;
}
RemoveNodeLog that = (RemoveNodeLog) o;
- return Objects.equals(removedNode, that.removedNode);
+ return Objects.equals(removedNode, that.removedNode)
Review comment:
The answer is the same as above.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
##########
@@ -82,4 +84,20 @@ public Snapshot getSnapshot(long minIndex) {
snapshot.setLastLogTerm(term);
return snapshot;
}
+
+ @Override
+ void applyEntries(List<Log> entries) {
Review comment:
This is transitional code. Originally, the intent was to pass in
whether the node is the leader when a meta group log is applied.
Later, during optimization, that logic was moved into the method itself, so this
code is no longer needed. I have removed it.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
##########
@@ -195,64 +194,70 @@ public String toString() {
}
@Override
- public void install(FileSnapshot snapshot, int slot) throws
SnapshotInstallationException {
+ public void install(FileSnapshot snapshot, int slot, boolean
isDataMigration)
Review comment:
Yes, but the parameter is necessary. With this parameter, this method is
more complete.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
##########
@@ -677,6 +677,9 @@ public boolean matchTerm(long term, long index) {
*/
void applyEntries(List<Log> entries) {
for (Log entry : entries) {
+ if (entry.isApplied()) {
Review comment:
I have added these comments:
For add/remove logs in data groups, the log is applied immediately
when it is appended to the raft log. In that case, this loop would
otherwise apply a log that has already been applied.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
##########
@@ -55,22 +56,26 @@
* @param timestamp
* @return
*/
- Node routeToHeaderByTime(String storageGroupName, long timestamp);
+ RaftNode routeToHeaderByTime(String storageGroupName, long timestamp);
/**
* Add a new node to update the partition table.
*
* @param node
* @return the new group generated by the node
Review comment:
fixed
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
##########
@@ -55,22 +56,26 @@
* @param timestamp
* @return
*/
- Node routeToHeaderByTime(String storageGroupName, long timestamp);
+ RaftNode routeToHeaderByTime(String storageGroupName, long timestamp);
/**
* Add a new node to update the partition table.
*
* @param node
* @return the new group generated by the node
*/
- NodeAdditionResult addNode(Node node);
+ void addNode(Node node);
+
+ NodeAdditionResult getNodeAdditionResult(Node node);
/**
* Remove a node and update the partition table.
*
* @param node
*/
- NodeRemovalResult removeNode(Node node);
+ void removeNode(Node node);
+
+ NodeRemovalResult getNodeRemovalResult();
Review comment:
fixed
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
##########
@@ -73,6 +92,10 @@ public void waitSlot(int slotId) {
logger.error("Unexpected interruption when waiting for slot {}",
slotId, e);
}
} else {
+ long cost = System.currentTimeMillis() - startTime;
+ if (cost > 1000) {
Review comment:
fixed
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
##########
@@ -86,16 +109,15 @@ public void waitSlot(int slotId) {
*/
public void waitSlotForWrite(int slotId) throws StorageEngineException {
SlotDescriptor slotDescriptor = idSlotMap.get(slotId);
+ long startTime = System.currentTimeMillis();
while (true) {
synchronized (slotDescriptor) {
- if (slotDescriptor.slotStatus == SlotStatus.SENDING
- || slotDescriptor.slotStatus == SlotStatus.SENT) {
- throw new StorageEngineException(
- String.format("Slot %d no longer belongs to the node", slotId));
- }
- if (slotDescriptor.slotStatus != SlotStatus.NULL
- && slotDescriptor.slotStatus != SlotStatus.PULLING_WRITABLE) {
+ if (slotDescriptor.slotStatus == SlotStatus.PULLING) {
try {
+ if ((System.currentTimeMillis() - startTime) >= 5000) {
Review comment:
fixed
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
##########
@@ -55,22 +56,26 @@
* @param timestamp
* @return
*/
- Node routeToHeaderByTime(String storageGroupName, long timestamp);
+ RaftNode routeToHeaderByTime(String storageGroupName, long timestamp);
/**
* Add a new node to update the partition table.
*
* @param node
* @return the new group generated by the node
*/
- NodeAdditionResult addNode(Node node);
+ void addNode(Node node);
+
+ NodeAdditionResult getNodeAdditionResult(Node node);
Review comment:
fixed
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
##########
@@ -73,6 +92,10 @@ public void waitSlot(int slotId) {
logger.error("Unexpected interruption when waiting for slot {}",
slotId, e);
}
} else {
+ long cost = System.currentTimeMillis() - startTime;
+ if (cost > 1000) {
Review comment:
fixed
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
##########
@@ -86,16 +109,15 @@ public void waitSlot(int slotId) {
*/
public void waitSlotForWrite(int slotId) throws StorageEngineException {
SlotDescriptor slotDescriptor = idSlotMap.get(slotId);
+ long startTime = System.currentTimeMillis();
while (true) {
synchronized (slotDescriptor) {
- if (slotDescriptor.slotStatus == SlotStatus.SENDING
- || slotDescriptor.slotStatus == SlotStatus.SENT) {
- throw new StorageEngineException(
- String.format("Slot %d no longer belongs to the node", slotId));
- }
- if (slotDescriptor.slotStatus != SlotStatus.NULL
- && slotDescriptor.slotStatus != SlotStatus.PULLING_WRITABLE) {
+ if (slotDescriptor.slotStatus == SlotStatus.PULLING) {
try {
+ if ((System.currentTimeMillis() - startTime) >= 5000) {
Review comment:
fixed
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
##########
@@ -1023,30 +1157,85 @@ public IReaderByTimestamp getReaderByTimestamp(
TSDataType dataType,
QueryContext context,
DataGroupMember dataGroupMember,
- boolean ascending)
+ boolean ascending,
+ Set<Integer> requiredSlots)
throws StorageEngineException, QueryProcessException {
try {
dataGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new StorageEngineException(e);
}
- SeriesReader seriesReader =
- getSeriesReader(
- path,
- allSensors,
- dataType,
- TimeFilter.gtEq(Long.MIN_VALUE),
- null,
- context,
- dataGroupMember.getHeader(),
- ascending);
+
+ // find the groups that should be queried due to data migration.
+ // when a slot is in the status of PULLING or PULLING_WRITABLE, the read
of it should merge
+ // result to guarantee integrity.
+ Map<PartitionGroup, Set<Integer>> holderSlotMap =
dataGroupMember.getPreviousHolderSlotMap();
try {
- if (seriesReader.isEmpty()) {
- return null;
+ // If requiredSlots is not null, it means that this data group is the
previous holder of
+ // required slots, which is no need to merge other resource,
+ if (requiredSlots == null && !holderSlotMap.isEmpty()) {
+ // merge remote reader and local reader
+ ManagedMergeReader mergeReader = new ManagedMergeReader(dataType);
+
+ // add local reader
+ IPointReader seriesPointReader =
+ getSeriesPointReader(
+ path,
+ allSensors,
+ dataType,
+ TimeFilter.gtEq(Long.MIN_VALUE),
+ null,
+ context,
+ dataGroupMember,
+ ascending,
+ requiredSlots);
Review comment:
Yes. I have specified it as null.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
##########
@@ -1023,30 +1157,85 @@ public IReaderByTimestamp getReaderByTimestamp(
TSDataType dataType,
QueryContext context,
DataGroupMember dataGroupMember,
- boolean ascending)
+ boolean ascending,
+ Set<Integer> requiredSlots)
throws StorageEngineException, QueryProcessException {
try {
dataGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new StorageEngineException(e);
}
- SeriesReader seriesReader =
- getSeriesReader(
- path,
- allSensors,
- dataType,
- TimeFilter.gtEq(Long.MIN_VALUE),
- null,
- context,
- dataGroupMember.getHeader(),
- ascending);
+
+ // find the groups that should be queried due to data migration.
+ // when a slot is in the status of PULLING or PULLING_WRITABLE, the read
of it should merge
+ // result to guarantee integrity.
+ Map<PartitionGroup, Set<Integer>> holderSlotMap =
dataGroupMember.getPreviousHolderSlotMap();
try {
- if (seriesReader.isEmpty()) {
- return null;
+ // If requiredSlots is not null, it means that this data group is the
previous holder of
+ // required slots, which is no need to merge other resource,
+ if (requiredSlots == null && !holderSlotMap.isEmpty()) {
+ // merge remote reader and local reader
+ ManagedMergeReader mergeReader = new ManagedMergeReader(dataType);
+
+ // add local reader
+ IPointReader seriesPointReader =
+ getSeriesPointReader(
+ path,
+ allSensors,
+ dataType,
+ TimeFilter.gtEq(Long.MIN_VALUE),
+ null,
+ context,
+ dataGroupMember,
+ ascending,
+ requiredSlots);
Review comment:
Yes, I have specified it as null.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
##########
@@ -507,11 +541,16 @@ private SeriesReader getSeriesReader(
Filter valueFilter,
QueryContext context,
Node header,
- boolean ascending)
+ int raftId,
+ boolean ascending,
+ Set<Integer> requiredSlots)
throws StorageEngineException, QueryProcessException {
ClusterQueryUtils.checkPathExistence(path);
- List<Integer> nodeSlots =
- ((SlotPartitionTable)
metaGroupMember.getPartitionTable()).getNodeSlots(header);
+ if (requiredSlots == null) {
Review comment:
I have added comments.
`why don't judge this in other functions such as getMultSeriesReader?`
Because all other readers will call this function eventually.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
##########
@@ -337,7 +362,8 @@ private AbstractMultPointReader getMultSeriesReader(
deviceMeasurements,
partitionGroup,
context,
- ascending);
+ ascending,
+ requiredSlots);
Review comment:
I have checked the logic and removed the useless parameters.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
##########
@@ -507,11 +541,16 @@ private SeriesReader getSeriesReader(
Filter valueFilter,
QueryContext context,
Node header,
- boolean ascending)
+ int raftId,
+ boolean ascending,
+ Set<Integer> requiredSlots)
throws StorageEngineException, QueryProcessException {
ClusterQueryUtils.checkPathExistence(path);
- List<Integer> nodeSlots =
- ((SlotPartitionTable)
metaGroupMember.getPartitionTable()).getNodeSlots(header);
+ if (requiredSlots == null) {
Review comment:
I have added comments.
> why don't judge this in other functions such as getMultSeriesReader?
Because all other readers will call this function eventually.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
##########
@@ -442,9 +445,11 @@ private TSStatus forwardToMultipleGroup(Map<PhysicalPlan,
PartitionGroup> planGr
}
TSStatus status;
if (errorCodePartitionGroups.isEmpty()) {
- status = StatusUtils.OK;
if (allRedirect) {
- status = StatusUtils.getStatus(status, endPoint);
+ status = new TSStatus(TSStatusCode.NEED_REDIRECTION.getStatusCode());
Review comment:
StatusUtils doesn't have a suitable static method. Although I could use
`StatusUtils.getStatus(new
TSStatus(TSStatusCode.NEED_REDIRECTION.getStatusCode()), endPoint)`, it would
call deepClone() one extra time. I prefer not to do that just to save one line.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
##########
@@ -165,14 +165,14 @@ public static Long querySingleSeries(
}
public static List<String> getNodeList(
Review comment:
Because these two methods use `SingleSeriesQueryRequest`, I have added
`raftId` to that request, so there is no need to add another parameter to
the methods.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
##########
@@ -890,4 +988,57 @@ public boolean isUnchanged() {
public void setUnchanged(boolean unchanged) {
this.unchanged = unchanged;
}
+
+ public void setAndSaveLastAppliedPartitionTableVersion(long version) {
+ lastAppliedPartitionTableVersion.setVersion(version);
+ lastAppliedPartitionTableVersion.save();
+ }
+
+ private class LastAppliedPatitionTableVersion {
+
+ private static final String VERSION_FILE_NAME =
"LAST_PARTITION_TABLE_VERSION";
+
+ private long version = -1;
+
+ private String filePath;
+
+ public LastAppliedPatitionTableVersion(String memberDir) {
+ this.filePath = memberDir + File.separator + VERSION_FILE_NAME;
+ load();
+ }
+
+ private void load() {
+ File versionFile = new File(filePath);
+ if (!versionFile.exists()) {
+ return;
+ }
+ try (FileInputStream fileInputStream = new FileInputStream(filePath);
+ DataInputStream dataInputStream = new
DataInputStream(fileInputStream)) {
+ version = dataInputStream.readLong();
+ } catch (Exception e) {
+ logger.warn("Cannot deserialize last partition table version from {}",
filePath, e);
+ }
+ }
+
+ public synchronized void save() {
Review comment:
Maybe it is unnecessary; I implemented it following other places, such as
`SlotManager`.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
##########
@@ -435,32 +440,42 @@ public static TSStatus executeNonQuery(
}
public static ByteBuffer readFile(
- AsyncDataClient client, String remotePath, long offset, int fetchSize)
+ AsyncDataClient client, String remotePath, long offset, int fetchSize,
int raftId)
Review comment:
I have removed the `raftId` parameter from the following methods:
- readFile
- removeHardLink
because these two methods are not associated with any raft group.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
##########
@@ -106,13 +106,13 @@ public static Long removeNode(AsyncMetaClient
asyncMetaClient, Node nodeToRemove
}
public static Boolean matchTerm(
- AsyncClient client, Node target, long prevLogIndex, long prevLogTerm,
Node header)
Review comment:
I have replaced every `Node header, int raftId` pair with `RaftNode raftNode`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]