HTHou commented on a change in pull request #2358:
URL: https://github.com/apache/iotdb/pull/2358#discussion_r559968403



##########
File path: 
server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
##########
@@ -43,18 +46,23 @@
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MemTableFlushTask.class);
   private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER = 
FlushSubTaskPoolManager
       .getInstance();
+  private static IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
   private final Future<?> encodingTaskFuture;
   private final Future<?> ioTaskFuture;
   private RestorableTsFileIOWriter writer;
 
-  private final ConcurrentLinkedQueue<Object> ioTaskQueue = new 
ConcurrentLinkedQueue<>();
-  private final ConcurrentLinkedQueue<Object> encodingTaskQueue = new 
ConcurrentLinkedQueue<>();
+  private final LinkedBlockingQueue<Object> encodingTaskQueue = new 
LinkedBlockingQueue<>();
+  private final LinkedBlockingQueue<Object> ioTaskQueue = 
(config.isEnableMemControl()
+      && SystemInfo.getInstance().isEncodingFasterThanIo())
+          ? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing())
+          : new LinkedBlockingQueue<>();
+
   private String storageGroup;
 
   private IMemTable memTable;
 
-  private volatile boolean noMoreEncodingTask = false;
-  private volatile boolean noMoreIOTask = false;
+  private long memSerializeTime = 0L;
+  private long ioTime = 0L;

Review comment:
       Fixed~

##########
File path: 
server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
##########
@@ -170,94 +189,104 @@ private void writeOneSeries(TVList tvPairs, IChunkWriter 
seriesWriterImpl,
     @Override
     public void run() {
       long memSerializeTime = 0;
-      boolean noMoreMessages = false;
       LOGGER.debug("Storage group {} memtable flushing to file {} starts to 
encoding data.",
-              storageGroup, writer.getFile().getName());
+          storageGroup, writer.getFile().getName());
       while (true) {
-        if (noMoreEncodingTask) {
-          noMoreMessages = true;
+
+        Object task = null;
+        try {
+          task = encodingTaskQueue.take();
+        } catch (InterruptedException e1) {
+          LOGGER.error("Take task into ioTaskQueue Interrupted");
+          Thread.currentThread().interrupt();
+          break;
         }
-        Object task = encodingTaskQueue.poll();
-        if (task == null) {
-          if (noMoreMessages) {
-            break;
-          }
+        if (task instanceof StartFlushGroupIOTask || task instanceof 
EndChunkGroupIoTask) {
           try {
-            TimeUnit.MILLISECONDS.sleep(10);
+            ioTaskQueue.put(task);
           } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
             LOGGER.error("Storage group {} memtable flushing to file {}, 
encoding task is interrupted.",
                 storageGroup, writer.getFile().getName(), e);
             // generally it is because the thread pool is shutdown so the task 
should be aborted
             break;
           }
+        } else if (task instanceof TaskEnd) {
+          break;
         } else {
-          if (task instanceof StartFlushGroupIOTask || task instanceof 
EndChunkGroupIoTask) {
-            ioTaskQueue.add(task);
-          } else {
-            long starTime = System.currentTimeMillis();
-            Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, 
MeasurementSchema>) task;
-            IChunkWriter seriesWriter = new 
ChunkWriterImpl(encodingMessage.right);
-            writeOneSeries(encodingMessage.left, seriesWriter, 
encodingMessage.right.getType());
-            ioTaskQueue.add(seriesWriter);
-            memSerializeTime += System.currentTimeMillis() - starTime;
+          long starTime = System.currentTimeMillis();
+          Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, 
MeasurementSchema>) task;
+          IChunkWriter seriesWriter = new 
ChunkWriterImpl(encodingMessage.right);
+          writeOneSeries(encodingMessage.left, seriesWriter, 
encodingMessage.right.getType());
+          seriesWriter.sealCurrentPage();
+          seriesWriter.clearPageWriter();
+          try {
+            ioTaskQueue.put(seriesWriter);
+          } catch (InterruptedException e) {
+            LOGGER.error("Put task into ioTaskQueue Interrupted");
+            Thread.currentThread().interrupt();
           }
+          memSerializeTime += System.currentTimeMillis() - starTime;
         }
       }
-      noMoreIOTask = true;
-      LOGGER.debug("Storage group {}, flushing memtable into file {}: Encoding 
data cost "
-              + "{} ms.",
+      try {
+        ioTaskQueue.put(new TaskEnd());
+      } catch (InterruptedException e) {
+        LOGGER.error("Put task into ioTaskQueue Interrupted");
+        Thread.currentThread().interrupt();
+      }
+      
+      LOGGER.debug("Storage group {}, flushing memtable {} into disk: Encoding 
data cost "
+          + "{} ms.",
           storageGroup, writer.getFile().getName(), memSerializeTime);
     }
   };
 
   @SuppressWarnings("squid:S135")
   private Runnable ioTask = () -> {
     long ioTime = 0;

Review comment:
       Fixed~




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to