jt2594838 commented on code in PR #9667:
URL: https://github.com/apache/iotdb/pull/9667#discussion_r1201401750


##########
server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java:
##########
@@ -218,159 +265,59 @@ public void syncFlushMemTable() throws 
ExecutionException, InterruptedException
         System.currentTimeMillis() - start);
   }
 
-  /** encoding task (second task of pipeline) */
-  private Runnable encodingTask =
-      new Runnable() {
-
-        @SuppressWarnings("squid:S135")
-        @Override
-        public void run() {
-          LOGGER.debug(
-              "Database {} memtable flushing to file {} starts to encoding 
data.",
-              storageGroup,
-              writer.getFile().getName());
-          while (true) {
-
-            Object task;
-            try {
-              task = encodingTaskQueue.take();
-            } catch (InterruptedException e1) {
-              LOGGER.error("Take task into ioTaskQueue Interrupted");
-              Thread.currentThread().interrupt();
-              break;
-            }
-            if (task instanceof StartFlushGroupIOTask || task instanceof 
EndChunkGroupIoTask) {
-              try {
-                ioTaskQueue.put(task);
-              } catch (
-                  @SuppressWarnings("squid:S2142")
-                  InterruptedException e) {
-                LOGGER.error(
-                    "Database {} memtable flushing to file {}, encoding task 
is interrupted.",
-                    storageGroup,
-                    writer.getFile().getName(),
-                    e);
-                // generally it is because the thread pool is shutdown so the 
task should be aborted
-                break;
-              }
-            } else if (task instanceof TaskEnd) {
-              break;
-            } else {
-              long starTime = System.currentTimeMillis();
-              IWritableMemChunk writableMemChunk = (IWritableMemChunk) task;
-              IChunkWriter seriesWriter = 
writableMemChunk.createIChunkWriter();
-              writableMemChunk.encode(seriesWriter);
-              seriesWriter.sealCurrentPage();
-              seriesWriter.clearPageWriter();
-              try {
-                ioTaskQueue.put(seriesWriter);
-              } catch (InterruptedException e) {
-                LOGGER.error("Put task into ioTaskQueue Interrupted");
-                Thread.currentThread().interrupt();
-              }
-              long subTaskTime = System.currentTimeMillis() - starTime;
-              
WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, 
subTaskTime);
-              memSerializeTime += subTaskTime;
-            }
-          }
-          try {
-            ioTaskQueue.put(new TaskEnd());
-          } catch (InterruptedException e) {
-            LOGGER.error("Put task into ioTaskQueue Interrupted");
-            Thread.currentThread().interrupt();
-          }
-
-          if (!storageGroup.startsWith(IoTDBMetricsUtils.DATABASE)) {
-            int lastIndex = storageGroup.lastIndexOf("-");
-            if (lastIndex == -1) {
-              lastIndex = storageGroup.length();
-            }
-            MetricService.getInstance()
-                .gaugeWithInternalReportAsync(
-                    memTable.getTotalPointsNum(),
-                    Metric.POINTS.toString(),
-                    MetricLevel.CORE,
-                    Tag.DATABASE.toString(),
-                    storageGroup.substring(0, lastIndex),
-                    Tag.TYPE.toString(),
-                    "flush",
-                    Tag.REGION.toString(),
-                    dataRegionId);
-          }
-
-          LOGGER.info(
-              "Database {}, flushing memtable {} into disk: Encoding data cost 
" + "{} ms.",
-              storageGroup,
-              writer.getFile().getName(),
-              memSerializeTime);
-          WRITING_METRICS.recordFlushCost(WritingMetrics.FLUSH_STAGE_ENCODING, 
memSerializeTime);
-        }
-      };
-
-  /** io task (third task of pipeline) */
-  @SuppressWarnings("squid:S135")
-  private Runnable ioTask =
-      () -> {
-        LOGGER.debug(
-            "Database {} memtable flushing to file {} start io.",
-            storageGroup,
-            writer.getFile().getName());
-        while (true) {
-          Object ioMessage = null;
-          try {
-            ioMessage = ioTaskQueue.take();
-          } catch (InterruptedException e1) {
-            LOGGER.error("take task from ioTaskQueue Interrupted");
-            Thread.currentThread().interrupt();
-            break;
-          }
-          long starTime = System.currentTimeMillis();
-          try {
-            if (ioMessage instanceof StartFlushGroupIOTask) {
-              this.writer.startChunkGroup(((StartFlushGroupIOTask) 
ioMessage).deviceId);
-            } else if (ioMessage instanceof TaskEnd) {
-              break;
-            } else if (ioMessage instanceof EndChunkGroupIoTask) {
-              this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
-              this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
-              this.writer.endChunkGroup();
-            } else {
-              ((IChunkWriter) ioMessage).writeToFileWriter(this.writer);
-            }
-          } catch (IOException e) {
-            LOGGER.error(
-                "Database {} memtable {}, io task meets error.", storageGroup, 
memTable, e);
-            throw new FlushRunTimeException(e);
-          }
-          long subTaskTime = System.currentTimeMillis() - starTime;
-          ioTime += subTaskTime;
-          WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.IO_TASK, 
subTaskTime);
-        }
-        LOGGER.debug(
-            "flushing a memtable to file {} in database {}, io cost {}ms",
-            writer.getFile().getName(),
-            storageGroup,
-            ioTime);
-        WRITING_METRICS.recordFlushTsFileSize(storageGroup, 
writer.getFile().length());
-        WRITING_METRICS.recordFlushCost(WritingMetrics.FLUSH_STAGE_IO, ioTime);
-      };
-
-  static class TaskEnd {
+  protected void metricFlush() {
+    if (!storageGroup.startsWith(IoTDBMetricsUtils.DATABASE)) {
+      int lastIndex = storageGroup.lastIndexOf("-");
+      if (lastIndex == -1) {
+        lastIndex = storageGroup.length();
+      }
+      MetricService.getInstance()
+          .gaugeWithInternalReportAsync(
+              memTable.getTotalPointsNum(),
+              Metric.POINTS.toString(),
+              MetricLevel.CORE,
+              Tag.DATABASE.toString(),
+              storageGroup.substring(0, lastIndex),
+              Tag.TYPE.toString(),
+              "flush",
+              Tag.REGION.toString(),
+              dataRegionId);
+    }
+  }
 
-    TaskEnd() {}
+  private DynamicThread newSortThread() {
+    return new TaskRunner(
+        sortTasks, this::cleanSortThread, sortTaskQueue, encodingTaskQueue, 
taskName + "-sort");
   }
 
-  static class EndChunkGroupIoTask {
+  private DynamicThread newEncodingThread() {
+    return new TaskRunner(
+        encodingTasks,
+        this::cleanEncodingThread,
+        encodingTaskQueue,
+        ioTaskQueue,
+        taskName + "-encode");
+  }
 
-    EndChunkGroupIoTask() {}
+  private DynamicThread newIOThread() {
+    return new TaskRunner(null, this::cleanIOThread, ioTaskQueue, ioTaskQueue, 
taskName + "-io");
   }
 
-  static class StartFlushGroupIOTask {
+  private void cleanSortThread() {
+    metricFlush();
+    WRITING_METRICS.recordFlushCost(
+        WritingMetrics.FLUSH_STAGE_SORT, allContext.getSortTime().get());
+  }
 
-    private final String deviceId;
+  private void cleanEncodingThread() {
+    metricFlush();
+    WRITING_METRICS.recordFlushCost(
+        WritingMetrics.FLUSH_STAGE_ENCODING, 
allContext.getEncodingTime().get());
+  }
 
-    StartFlushGroupIOTask(String deviceId) {
-      this.deviceId = deviceId;
-    }
+  private void cleanIOThread() {
+    metricFlush();
+    WRITING_METRICS.recordFlushCost(WritingMetrics.FLUSH_STAGE_IO, 
allContext.getIoTime().get());
+    WRITING_METRICS.recordFlushTsFileSize(storageGroup, 
allContext.getWriter().getFile().length());
   }

Review Comment:
   Now I only retain the one in the IOTask.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to