OneSizeFitsQuorum commented on code in PR #10896:
URL: https://github.com/apache/iotdb/pull/10896#discussion_r1299786040


##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java:
##########
@@ -243,117 +243,121 @@ private RaftClientReply 
writeLocallyWithRetry(RaftClientRequest request) throws
     return writeWithRetry(() -> server.submitClientRequest(request));
   }
 
-  private RaftClientReply writeRemotelyWithRetry(RatisClient client, Message 
message)
-      throws IOException {
+  private RaftClientReply writeRemotelyWithRetry(
+      org.apache.iotdb.consensus.ratis.RatisClient client, Message message) 
throws IOException {
     return writeWithRetry(() -> client.getRaftClient().io().send(message));
   }
 
   /**
-   * write will first send request to local server use method call if local 
server is not leader, it
-   * will use RaftClient to send RPC to read leader
+   * write will first send request to local server using local method call. If 
local server is not
+   * leader, it will use RaftClient to send RPC to read leader
    */
   @Override
-  public ConsensusWriteResponse write(
-      ConsensusGroupId consensusGroupId, IConsensusRequest IConsensusRequest) {
+  public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request)
+      throws ConsensusException {
     // pre-condition: group exists and myself server serves this group
-    RaftGroupId raftGroupId = 
Utils.fromConsensusGroupIdToRaftGroupId(consensusGroupId);
+    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
     RaftGroup raftGroup = getGroupInfo(raftGroupId);
     if (raftGroup == null || !raftGroup.getPeers().contains(myself)) {
-      return failedWrite(new 
ConsensusGroupNotExistException(consensusGroupId));
+      throw new ConsensusGroupNotExistException(groupId);
     }
 
     // current Peer is group leader and in ReadOnly State
-    if (isLeader(consensusGroupId) && Utils.rejectWrite()) {
+    if (isLeader(groupId) && Utils.rejectWrite()) {
       try {
         forceStepDownLeader(raftGroup);
       } catch (Exception e) {
         logger.warn("leader {} read only, force step down failed due to {}", 
myself, e);
       }
-      return failedWrite(new NodeReadOnlyException(myself));
+      throw new NodeReadOnlyException(myself);
     }
 
     // serialize request into Message
-    Message message = new RequestMessage(IConsensusRequest);
+    Message message = new RequestMessage(request);
 
     // 1. first try the local server
     RaftClientRequest clientRequest =
         buildRawRequest(raftGroupId, message, 
RaftClientRequest.writeRequestType());
 
-    RaftClientReply localServerReply;
     RaftPeer suggestedLeader = null;
-    if (isLeader(consensusGroupId) && waitUntilLeaderReady(raftGroupId)) {
+    if (isLeader(groupId) && waitUntilLeaderReady(raftGroupId)) {
       try (AutoCloseable ignored =
           
RatisMetricsManager.getInstance().startWriteLocallyTimer(consensusGroupType)) {
-        localServerReply = writeLocallyWithRetry(clientRequest);
+        RaftClientReply localServerReply = 
writeLocallyWithRetry(clientRequest);
         if (localServerReply.isSuccess()) {
-          ResponseMessage responseMessage = (ResponseMessage) 
localServerReply.getMessage();
-          TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder();
-          return 
ConsensusWriteResponse.newBuilder().setStatus(writeStatus).build();
+          org.apache.iotdb.consensus.ratis.ResponseMessage responseMessage =
+              (org.apache.iotdb.consensus.ratis.ResponseMessage) 
localServerReply.getMessage();
+          return Optional.ofNullable(responseMessage.getContentHolder())
+              .map(TSStatus.class::cast)
+              .orElse(null);
         }
         NotLeaderException ex = localServerReply.getNotLeaderException();
-        if (ex != null) { // local server is not leader
+        if (ex != null) {
           suggestedLeader = ex.getSuggestedLeader();
         }
       } catch (Exception e) {
-        return failedWrite(new RatisRequestFailedException(e));
+        throw new RatisRequestFailedException(e);
       }
     }
 
     // 2. try raft client
     TSStatus writeResult;
     try (AutoCloseable ignored =
             
RatisMetricsManager.getInstance().startWriteRemotelyTimer(consensusGroupType);
-        RatisClient client = getRaftClient(raftGroup)) {
+        org.apache.iotdb.consensus.ratis.RatisClient client = 
getRaftClient(raftGroup)) {
       RaftClientReply reply = writeRemotelyWithRetry(client, message);
       if (!reply.isSuccess()) {
-        return failedWrite(new 
RatisRequestFailedException(reply.getException()));
+        throw new RatisRequestFailedException(reply.getException());
       }
       writeResult = 
Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
     } catch (Exception e) {
-      return failedWrite(new RatisRequestFailedException(e));
+      throw new RatisRequestFailedException(e);
     }
 
     if (suggestedLeader != null) {
       TEndPoint leaderEndPoint = 
Utils.fromRaftPeerAddressToTEndPoint(suggestedLeader.getAddress());
       writeResult.setRedirectNode(new TEndPoint(leaderEndPoint.getIp(), 
leaderEndPoint.getPort()));
     }
-    return ConsensusWriteResponse.newBuilder().setStatus(writeResult).build();
+    return writeResult;
   }
 
-  /** Read directly from LOCAL COPY notice: May read stale data (not 
linearizable) */
+  /** Read directly from LOCAL COPY notice. */
   @Override
-  public ConsensusReadResponse read(
-      ConsensusGroupId consensusGroupId, IConsensusRequest IConsensusRequest) {
-    RaftGroupId groupId = 
Utils.fromConsensusGroupIdToRaftGroupId(consensusGroupId);
-    RaftGroup group = getGroupInfo(groupId);
+  public DataSet read(ConsensusGroupId groupId, IConsensusRequest request)
+      throws ConsensusException {
+    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
+    RaftGroup group = getGroupInfo(raftGroupId);
     if (group == null || !group.getPeers().contains(myself)) {
-      return failedRead(new ConsensusGroupNotExistException(consensusGroupId));
+      throw new ConsensusGroupNotExistException(groupId);
     }
 
     final boolean isLinearizableRead =
-        !canServeStaleRead.computeIfAbsent(consensusGroupId, id -> new 
AtomicBoolean(false)).get();
+        !canServeStaleRead.computeIfAbsent(groupId, id -> new 
AtomicBoolean(false)).get();
 
     RaftClientReply reply;
     try {
-      reply = doRead(groupId, IConsensusRequest, isLinearizableRead);
+      reply = doRead(raftGroupId, request, isLinearizableRead);
       // allow stale read if current linearizable read returns successfully
       if (isLinearizableRead) {
-        canServeStaleRead.get(consensusGroupId).set(true);
+        canServeStaleRead.get(groupId).set(true);
       }
-    } catch (Exception e) {
+    } catch (ReadException | ReadIndexException e) {
       if (isLinearizableRead) {
         // linearizable read failed. the RaftServer is recovering from Raft 
Log and cannot serve
         // read requests.
-        return failedRead(new RatisUnderRecoveryException(e));
+        throw new RatisUnderRecoveryException(e);
       } else {
-        return failedRead(new RatisRequestFailedException(e));
+        throw new RatisRequestFailedException(e);
       }
+    } catch (Exception e) {
+      throw new RatisRequestFailedException(e);
     }
-
     Message ret = reply.getMessage();
-    ResponseMessage readResponseMessage = (ResponseMessage) ret;
-    DataSet dataSet = (DataSet) readResponseMessage.getContentHolder();
-    return ConsensusReadResponse.newBuilder().setDataSet(dataSet).build();
+    org.apache.iotdb.consensus.ratis.ResponseMessage readResponseMessage =
+        (org.apache.iotdb.consensus.ratis.ResponseMessage) ret;
+    return Optional.ofNullable(readResponseMessage.getContentHolder())
+        .map(DataSet.class::cast)
+        .orElse(null);

Review Comment:
   A null value here indicates a bug in the code, and avoiding an NPE isn't 
going to make much sense at this point, so I'll reverse the change



##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java:
##########
@@ -243,117 +243,121 @@ private RaftClientReply 
writeLocallyWithRetry(RaftClientRequest request) throws
     return writeWithRetry(() -> server.submitClientRequest(request));
   }
 
-  private RaftClientReply writeRemotelyWithRetry(RatisClient client, Message 
message)
-      throws IOException {
+  private RaftClientReply writeRemotelyWithRetry(
+      org.apache.iotdb.consensus.ratis.RatisClient client, Message message) 
throws IOException {
     return writeWithRetry(() -> client.getRaftClient().io().send(message));
   }
 
   /**
-   * write will first send request to local server use method call if local 
server is not leader, it
-   * will use RaftClient to send RPC to read leader
+   * write will first send request to local server using local method call. If 
local server is not
+   * leader, it will use RaftClient to send RPC to read leader
    */
   @Override
-  public ConsensusWriteResponse write(
-      ConsensusGroupId consensusGroupId, IConsensusRequest IConsensusRequest) {
+  public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request)
+      throws ConsensusException {
     // pre-condition: group exists and myself server serves this group
-    RaftGroupId raftGroupId = 
Utils.fromConsensusGroupIdToRaftGroupId(consensusGroupId);
+    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
     RaftGroup raftGroup = getGroupInfo(raftGroupId);
     if (raftGroup == null || !raftGroup.getPeers().contains(myself)) {
-      return failedWrite(new 
ConsensusGroupNotExistException(consensusGroupId));
+      throw new ConsensusGroupNotExistException(groupId);
     }
 
     // current Peer is group leader and in ReadOnly State
-    if (isLeader(consensusGroupId) && Utils.rejectWrite()) {
+    if (isLeader(groupId) && Utils.rejectWrite()) {
       try {
         forceStepDownLeader(raftGroup);
       } catch (Exception e) {
         logger.warn("leader {} read only, force step down failed due to {}", 
myself, e);
       }
-      return failedWrite(new NodeReadOnlyException(myself));
+      throw new NodeReadOnlyException(myself);
     }
 
     // serialize request into Message
-    Message message = new RequestMessage(IConsensusRequest);
+    Message message = new RequestMessage(request);
 
     // 1. first try the local server
     RaftClientRequest clientRequest =
         buildRawRequest(raftGroupId, message, 
RaftClientRequest.writeRequestType());
 
-    RaftClientReply localServerReply;
     RaftPeer suggestedLeader = null;
-    if (isLeader(consensusGroupId) && waitUntilLeaderReady(raftGroupId)) {
+    if (isLeader(groupId) && waitUntilLeaderReady(raftGroupId)) {
       try (AutoCloseable ignored =
           
RatisMetricsManager.getInstance().startWriteLocallyTimer(consensusGroupType)) {
-        localServerReply = writeLocallyWithRetry(clientRequest);
+        RaftClientReply localServerReply = 
writeLocallyWithRetry(clientRequest);
         if (localServerReply.isSuccess()) {
-          ResponseMessage responseMessage = (ResponseMessage) 
localServerReply.getMessage();
-          TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder();
-          return 
ConsensusWriteResponse.newBuilder().setStatus(writeStatus).build();
+          org.apache.iotdb.consensus.ratis.ResponseMessage responseMessage =
+              (org.apache.iotdb.consensus.ratis.ResponseMessage) 
localServerReply.getMessage();
+          return Optional.ofNullable(responseMessage.getContentHolder())
+              .map(TSStatus.class::cast)
+              .orElse(null);

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to