MyXOF commented on a change in pull request #142: Add Concurrent Manager in Cluster URL: https://github.com/apache/incubator-iotdb/pull/142#discussion_r275261866
########## File path: cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java ########## @@ -82,59 +76,77 @@ public DataStateMachine(String groupId, PeerId peerId) { @Override public void onApply(Iterator iterator) { while (iterator.hasNext()) { - - Closure closure = null; - DataGroupNonQueryRequest request = null; - BasicResponse response = null; + final Closure closure = iterator.done(); final ByteBuffer data = iterator.getData(); + +// /** It's leader to apply task **/ +// if (closure != null) { +// RaftTaskManager.getInstance().execute(() -> applySingleTask(closure, data)); +// } else { + applySingleTask(closure, data); +// } + iterator.next(); + } + } + + /** + * Apply a single raft task. If this node is the leader of state machine's data group, apply the + * task in thread. + * + * @param closure if this node is leader, closure is not null. + * @param data Request data + */ + private void applySingleTask(Closure closure, ByteBuffer data) { + BasicResponse response = (closure == null) ? null : ((ResponseClosure) closure).getResponse(); + DataGroupNonQueryRequest request = null; + try { + request = SerializerManager.getSerializer(SerializerManager.Hessian2) + .deserialize(data.array(), DataGroupNonQueryRequest.class.getName()); + } catch (final CodecException e) { + LOGGER.error("Fail to decode IncrementAndGetRequest", e); + } + + Status status = Status.OK(); + assert request != null; + + List<byte[]> planBytes = request.getPhysicalPlanBytes(); + + LOGGER.debug(String.format("State machine batch size(): %d", planBytes.size())); + + /** handle batch plans(planBytes.size() > 0) or single plan(planBytes.size()==1) **/ + for (byte[] planByte : planBytes) { try { - request = SerializerManager.getSerializer(SerializerManager.Hessian2) - .deserialize(data.array(), DataGroupNonQueryRequest.class.getName()); - } catch (final CodecException e) { - LOGGER.error("Fail to decode IncrementAndGetRequest", e); - } - if (iterator.done() != null) { - /** It's leader to apply task **/ - closure = iterator.done(); - response = ((ResponseClosure) closure).getResponse(); - } - Status status = Status.OK(); - assert request != null; - - List<byte[]> planBytes = request.getPhysicalPlanBytes(); - - LOGGER.debug(String.format("State machine batch size(): %d", planBytes.size())); - - /** handle batch plans(planBytes.size() > 0) or single plan(planBytes.size()==1) **/ - for (byte[] planByte : planBytes) { - try { - PhysicalPlan plan = PhysicalPlanLogTransfer.logToOperator(planByte); - - LOGGER.debug(String.format("OperatorType :%s", plan.getOperatorType())); - /** If the request is to set path and sg of the path doesn't exist, it needs to run null-read in meta group to avoid out of data sync **/ - if (plan.getOperatorType() == OperatorType.CREATE_TIMESERIES && !checkPathExistence( - ((MetadataPlan) plan).getPath().getFullPath())) { - RaftUtils.handleNullReadToMetaGroup(status); - if(!status.isOk()){ - continue; - } - } - qpExecutor.processNonQuery(plan); - if (closure != null) { - response.addResult(true); - } - } catch (ProcessorException | IOException | PathErrorException e) { - LOGGER.error("Execute physical plan error", e); - status = new Status(-1, e.getMessage()); - if (closure != null) { - response.addResult(false); + PhysicalPlan plan = PhysicalPlanLogTransfer.logToOperator(planByte); + + LOGGER.debug(String.format("OperatorType :%s", plan.getOperatorType())); Review comment: LOGGER.debug("OperatorType :%s", plan.getOperatorType()); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services