a1vin-tian commented on a change in pull request #7153:
URL: https://github.com/apache/skywalking/pull/7153#discussion_r657688386



##########
File path: 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
##########
@@ -92,73 +105,165 @@ public void start(ModuleManager moduleManager, 
CoreModuleConfig moduleConfig) {
         }
     }
 
-    private void extractDataAndSave(IBatchDAO batchDAO) {
+    @VisibleForTesting
+    void extractDataAndSave(IBatchDAO batchDAO) {
+
         if (log.isDebugEnabled()) {
             log.debug("Extract data and save");
         }
 
         long startTime = System.currentTimeMillis();
+        HistogramMetrics.Timer allTimer = allLatency.createTimer();
+        // Use `stop` as a control signal to make fail-fast in the persistence 
process.
+        AtomicBoolean stop = new AtomicBoolean(false);
 
+        BlockingBatchQueue<PrepareRequest> prepareQueue = new 
BlockingBatchQueue(this.maxSyncoperationNum);
         try {
-            HistogramMetrics.Timer timer = prepareLatency.createTimer();
+            List<PersistenceWorker<? extends StorageData>> persistenceWorkers 
= new ArrayList<>();
+            
persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
+            
persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
 
-            try {
-                List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
-                
persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
-                
persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
+            // CountDownLatch makes sure all prepare threads done eventually.
+            CountDownLatch prepareStageCountDownLatch = new 
CountDownLatch(persistenceWorkers.size());
 
-                persistenceWorkers.forEach(worker -> {
-                    if (log.isDebugEnabled()) {
-                        log.debug("extract {} worker data and save", 
worker.getClass().getName());
-                    }
+            /*
+                Here we use `this.prepareRequests` as a FIFO queue, for a 
producer-consumer model.
+                The prepareExecutorService is for making the executable 
requests ready, and batchExecutorService consumes from the queue to flush.
+                When the number of metrics produced reaches 
maxSyncoperationNum or the prepare stage is done,
+                the data would flush into the storage.
 
-                    worker.buildBatchRequests(prepareRequests);
+                When the consumer ends or an exception occurs in the middle, 
the entire process is completed.
+             */
 
-                    worker.endOfRound(System.currentTimeMillis() - lastTime);
-                });
+            persistenceWorkers.forEach(worker -> {
+                prepareExecutorService.submit(() -> {
+                    if (stop.get()) {
+                        prepareStageCountDownLatch.countDown();
+                        return;
+                    }
 
-                if (debug) {
-                    log.info("build batch persistence duration: {} ms", 
System.currentTimeMillis() - startTime);
-                }
-            } finally {
-                timer.finish();
-            }
+                    HistogramMetrics.Timer timer = 
prepareLatency.createTimer();
+                    try {
+                        if (log.isDebugEnabled()) {
+                            log.debug("extract {} worker data and save", 
worker.getClass().getName());
+                        }
+                        List<PrepareRequest> innerPrepareRequests = new 
ArrayList<>(5000);
+                        worker.buildBatchRequests(innerPrepareRequests);
+                        prepareQueue.putMany(innerPrepareRequests);
+                        worker.endOfRound(System.currentTimeMillis() - 
lastTime);
+                    } finally {
+                        timer.finish();
+                        prepareStageCountDownLatch.countDown();
+                    }
+                });
+            });
 
-            HistogramMetrics.Timer executeLatencyTimer = 
executeLatency.createTimer();
-            try {
-                List<List<PrepareRequest>> partitions = 
Lists.partition(prepareRequests, maxSyncoperationNum);
-                CountDownLatch countDownLatch = new 
CountDownLatch(partitions.size());
-                for (final List<PrepareRequest> partition : partitions) {
-                    executorService.submit(() -> {
+            Future<?> batchFuture = batchExecutorService.submit(() -> {
+                List<Future<?>> results = new ArrayList<>();
+                // consume the metrics
+                while (!stop.get()) {
+                    List<PrepareRequest> partition = prepareQueue.popMany();
+                    if (partition.isEmpty()) {
+                        break;
+                    }
+                    Future<?> submit = executorService.submit(() -> {
+                        HistogramMetrics.Timer executeLatencyTimer = 
executeLatency.createTimer();
                         try {
                             if (CollectionUtils.isNotEmpty(partition)) {
                                 batchDAO.synchronous(partition);
                             }
                         } catch (Throwable e) {
                             log.error(e.getMessage(), e);
                         } finally {
-                            countDownLatch.countDown();
+                            executeLatencyTimer.finish();
                         }
+
                     });
+                    results.add(submit);
                 }
-                countDownLatch.await();
-            } finally {
-                executeLatencyTimer.finish();
-            }
+
+                if (!stop.get()) {
+                    for (Future<?> result : results) {
+                        result.get();
+                    }
+                }
+                return null;
+            });
+
+            // Wait for prepare stage is done.
+            prepareStageCountDownLatch.await();
+            prepareQueue.disableNeedFillFully();
+            // Wait for batch stage is done.
+            batchFuture.get();
+
         } catch (Throwable e) {
             errorCounter.inc();
             log.error(e.getMessage(), e);
         } finally {
+
             if (log.isDebugEnabled()) {
                 log.debug("Persistence data save finish");
             }
 
-            prepareRequests.clear();
+            stop.set(true);
+            allTimer.finish();
             lastTime = System.currentTimeMillis();
         }
 
         if (debug) {
             log.info("Batch persistence duration: {} ms", 
System.currentTimeMillis() - startTime);
         }
     }
+
+    private static class BlockingBatchQueue<E> {
+
+        private int maxBatchSize;
+        private boolean inAppendingMode = true;
+
+        public BlockingBatchQueue(int maxBatchSize) {
+            this.maxBatchSize = maxBatchSize;
+        }
+
+        private final List<E> elementData = new ArrayList<>(50000);
+
+        public void putMany(List<E> elements) {
+            synchronized (elementData) {
+                elementData.addAll(elements);
+                if (elementData.size() >= maxBatchSize) {
+                    elementData.notify();
+                }
+            }
+        }

Review comment:
       The previous `private final List<PrepareRequest> prepareRequests = new ArrayList<>(50000);` did not check whether the queue was full.
   I think there shouldn't be a queue-full check here either.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to