OneSizeFitsQuorum commented on code in PR #10896:
URL: https://github.com/apache/iotdb/pull/10896#discussion_r1299778576
##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java:
##########
@@ -243,117 +243,121 @@ private RaftClientReply
writeLocallyWithRetry(RaftClientRequest request) throws
return writeWithRetry(() -> server.submitClientRequest(request));
}
- private RaftClientReply writeRemotelyWithRetry(RatisClient client, Message
message)
- throws IOException {
+ private RaftClientReply writeRemotelyWithRetry(
+ org.apache.iotdb.consensus.ratis.RatisClient client, Message message)
throws IOException {
return writeWithRetry(() -> client.getRaftClient().io().send(message));
}
/**
- * write will first send request to local server use method call if local
server is not leader, it
- * will use RaftClient to send RPC to read leader
+ * write will first send request to local server using local method call. If
local server is not
+ * leader, it will use RaftClient to send RPC to read leader
*/
@Override
- public ConsensusWriteResponse write(
- ConsensusGroupId consensusGroupId, IConsensusRequest IConsensusRequest) {
+ public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request)
+ throws ConsensusException {
// pre-condition: group exists and myself server serves this group
- RaftGroupId raftGroupId =
Utils.fromConsensusGroupIdToRaftGroupId(consensusGroupId);
+ RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
RaftGroup raftGroup = getGroupInfo(raftGroupId);
if (raftGroup == null || !raftGroup.getPeers().contains(myself)) {
- return failedWrite(new
ConsensusGroupNotExistException(consensusGroupId));
+ throw new ConsensusGroupNotExistException(groupId);
}
// current Peer is group leader and in ReadOnly State
- if (isLeader(consensusGroupId) && Utils.rejectWrite()) {
+ if (isLeader(groupId) && Utils.rejectWrite()) {
try {
forceStepDownLeader(raftGroup);
} catch (Exception e) {
logger.warn("leader {} read only, force step down failed due to {}",
myself, e);
}
- return failedWrite(new NodeReadOnlyException(myself));
+ throw new NodeReadOnlyException(myself);
}
// serialize request into Message
- Message message = new RequestMessage(IConsensusRequest);
+ Message message = new RequestMessage(request);
// 1. first try the local server
RaftClientRequest clientRequest =
buildRawRequest(raftGroupId, message,
RaftClientRequest.writeRequestType());
- RaftClientReply localServerReply;
RaftPeer suggestedLeader = null;
- if (isLeader(consensusGroupId) && waitUntilLeaderReady(raftGroupId)) {
+ if (isLeader(groupId) && waitUntilLeaderReady(raftGroupId)) {
try (AutoCloseable ignored =
RatisMetricsManager.getInstance().startWriteLocallyTimer(consensusGroupType)) {
- localServerReply = writeLocallyWithRetry(clientRequest);
+ RaftClientReply localServerReply =
writeLocallyWithRetry(clientRequest);
if (localServerReply.isSuccess()) {
- ResponseMessage responseMessage = (ResponseMessage)
localServerReply.getMessage();
- TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder();
- return
ConsensusWriteResponse.newBuilder().setStatus(writeStatus).build();
+ org.apache.iotdb.consensus.ratis.ResponseMessage responseMessage =
+ (org.apache.iotdb.consensus.ratis.ResponseMessage)
localServerReply.getMessage();
+ return Optional.ofNullable(responseMessage.getContentHolder())
+ .map(TSStatus.class::cast)
+ .orElse(null);
}
NotLeaderException ex = localServerReply.getNotLeaderException();
- if (ex != null) { // local server is not leader
+ if (ex != null) {
suggestedLeader = ex.getSuggestedLeader();
}
} catch (Exception e) {
- return failedWrite(new RatisRequestFailedException(e));
+ throw new RatisRequestFailedException(e);
}
}
// 2. try raft client
TSStatus writeResult;
try (AutoCloseable ignored =
RatisMetricsManager.getInstance().startWriteRemotelyTimer(consensusGroupType);
- RatisClient client = getRaftClient(raftGroup)) {
+ org.apache.iotdb.consensus.ratis.RatisClient client =
getRaftClient(raftGroup)) {
RaftClientReply reply = writeRemotelyWithRetry(client, message);
if (!reply.isSuccess()) {
- return failedWrite(new
RatisRequestFailedException(reply.getException()));
+ throw new RatisRequestFailedException(reply.getException());
}
writeResult =
Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
} catch (Exception e) {
- return failedWrite(new RatisRequestFailedException(e));
+ throw new RatisRequestFailedException(e);
}
if (suggestedLeader != null) {
TEndPoint leaderEndPoint =
Utils.fromRaftPeerAddressToTEndPoint(suggestedLeader.getAddress());
writeResult.setRedirectNode(new TEndPoint(leaderEndPoint.getIp(),
leaderEndPoint.getPort()));
}
- return ConsensusWriteResponse.newBuilder().setStatus(writeResult).build();
+ return writeResult;
}
- /** Read directly from LOCAL COPY notice: May read stale data (not
linearizable) */
+ /** Read directly from LOCAL COPY notice. */
Review Comment:
Fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]