MyXOF commented on a change in pull request #142: Add Concurrent Manager in 
Cluster
URL: https://github.com/apache/incubator-iotdb/pull/142#discussion_r275261866
 
 

 ##########
 File path: 
cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
 ##########
 @@ -82,59 +76,77 @@ public DataStateMachine(String groupId, PeerId peerId) {
   @Override
   public void onApply(Iterator iterator) {
     while (iterator.hasNext()) {
-
-      Closure closure = null;
-      DataGroupNonQueryRequest request = null;
-      BasicResponse response = null;
+      final Closure closure = iterator.done();
       final ByteBuffer data = iterator.getData();
+
+//      /** It's leader to apply task **/
+//      if (closure != null) {
+//        RaftTaskManager.getInstance().execute(() -> applySingleTask(closure, 
data));
+//      } else {
+        applySingleTask(closure, data);
+//      }
+      iterator.next();
+    }
+  }
+
+  /**
+   * Apply a single raft task. If this node is the leader of state machine's 
data group, apply the
+   * task in thread.
+   *
+   * @param closure if this node is leader, closure is not null.
+   * @param data Request data
+   */
+  private void applySingleTask(Closure closure, ByteBuffer data) {
+    BasicResponse response = (closure == null) ? null : ((ResponseClosure) 
closure).getResponse();
+    DataGroupNonQueryRequest request = null;
+    try {
+      request = SerializerManager.getSerializer(SerializerManager.Hessian2)
+          .deserialize(data.array(), DataGroupNonQueryRequest.class.getName());
+    } catch (final CodecException e) {
+      LOGGER.error("Fail to decode IncrementAndGetRequest", e);
+    }
+
+    Status status = Status.OK();
+    assert request != null;
+
+    List<byte[]> planBytes = request.getPhysicalPlanBytes();
+
+    LOGGER.debug(String.format("State machine batch size(): %d", 
planBytes.size()));
+
+    /** handle batch plans(planBytes.size() > 0) or single 
plan(planBytes.size()==1) **/
+    for (byte[] planByte : planBytes) {
       try {
-        request = SerializerManager.getSerializer(SerializerManager.Hessian2)
-            .deserialize(data.array(), 
DataGroupNonQueryRequest.class.getName());
-      } catch (final CodecException e) {
-        LOGGER.error("Fail to decode IncrementAndGetRequest", e);
-      }
-      if (iterator.done() != null) {
-        /** It's leader to apply task **/
-        closure = iterator.done();
-        response = ((ResponseClosure) closure).getResponse();
-      }
-      Status status = Status.OK();
-      assert request != null;
-
-      List<byte[]> planBytes = request.getPhysicalPlanBytes();
-
-      LOGGER.debug(String.format("State machine batch size(): %d", 
planBytes.size()));
-
-      /** handle batch plans(planBytes.size() > 0) or single 
plan(planBytes.size()==1) **/
-      for (byte[] planByte : planBytes) {
-        try {
-          PhysicalPlan plan = PhysicalPlanLogTransfer.logToOperator(planByte);
-
-          LOGGER.debug(String.format("OperatorType :%s", 
plan.getOperatorType()));
-          /** If the request is to set path and sg of the path doesn't exist, 
it needs to run null-read in meta group to avoid out of data sync **/
-          if (plan.getOperatorType() == OperatorType.CREATE_TIMESERIES && 
!checkPathExistence(
-              ((MetadataPlan) plan).getPath().getFullPath())) {
-            RaftUtils.handleNullReadToMetaGroup(status);
-            if(!status.isOk()){
-              continue;
-            }
-          }
-          qpExecutor.processNonQuery(plan);
-          if (closure != null) {
-            response.addResult(true);
-          }
-        } catch (ProcessorException | IOException | PathErrorException e) {
-          LOGGER.error("Execute physical plan error", e);
-          status = new Status(-1, e.getMessage());
-          if (closure != null) {
-            response.addResult(false);
+        PhysicalPlan plan = PhysicalPlanLogTransfer.logToOperator(planByte);
+
+        LOGGER.debug(String.format("OperatorType :%s", 
plan.getOperatorType()));
 
 Review comment:
   LOGGER.debug("OperatorType :{}", plan.getOperatorType());

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to