a1vin-tian commented on a change in pull request #7153:
URL: https://github.com/apache/skywalking/pull/7153#discussion_r658465665



##########
File path: 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
##########
@@ -92,73 +105,165 @@ public void start(ModuleManager moduleManager, 
CoreModuleConfig moduleConfig) {
         }
     }
 
-    private void extractDataAndSave(IBatchDAO batchDAO) {
+    @VisibleForTesting
+    void extractDataAndSave(IBatchDAO batchDAO) {
+
         if (log.isDebugEnabled()) {
             log.debug("Extract data and save");
         }
 
         long startTime = System.currentTimeMillis();
+        HistogramMetrics.Timer allTimer = allLatency.createTimer();
+        // Use `stop` as a control signal to make fail-fast in the persistence 
process.
+        AtomicBoolean stop = new AtomicBoolean(false);
 
+        DefaultBlockingBatchQueue<PrepareRequest> prepareQueue = new 
DefaultBlockingBatchQueue(
+            this.maxSyncoperationNum);
         try {
-            HistogramMetrics.Timer timer = prepareLatency.createTimer();
+            List<PersistenceWorker<? extends StorageData>> persistenceWorkers 
= new ArrayList<>();
+            
persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
+            
persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
 
-            try {
-                List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
-                
persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
-                
persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
+            // CountDownLatch makes sure all prepare threads done eventually.
+            CountDownLatch prepareStageCountDownLatch = new 
CountDownLatch(persistenceWorkers.size());
 
-                persistenceWorkers.forEach(worker -> {
-                    if (log.isDebugEnabled()) {
-                        log.debug("extract {} worker data and save", 
worker.getClass().getName());
+            persistenceWorkers.forEach(worker -> {
+                prepareExecutorService.submit(() -> {
+                    if (stop.get()) {
+                        prepareStageCountDownLatch.countDown();
+                        return;
                     }
 
-                    worker.buildBatchRequests(prepareRequests);
-
-                    worker.endOfRound(System.currentTimeMillis() - lastTime);
+                    HistogramMetrics.Timer timer = 
prepareLatency.createTimer();
+                    try {
+                        if (log.isDebugEnabled()) {
+                            log.debug("extract {} worker data and save", 
worker.getClass().getName());
+                        }
+                        List<PrepareRequest> innerPrepareRequests = new 
ArrayList<>(5000);
+                        worker.buildBatchRequests(innerPrepareRequests);
+                        // Push the prepared requests into 
DefaultBlockingBatchQueue,
+                        // the executorService consumes from it when it 
reaches the size of batch.
+                        prepareQueue.offer(innerPrepareRequests);
+                        worker.endOfRound(System.currentTimeMillis() - 
lastTime);
+                    } finally {
+                        timer.finish();
+                        prepareStageCountDownLatch.countDown();
+                    }
                 });
+            });
 
-                if (debug) {
-                    log.info("build batch persistence duration: {} ms", 
System.currentTimeMillis() - startTime);
-                }
-            } finally {
-                timer.finish();
-            }
-
-            HistogramMetrics.Timer executeLatencyTimer = 
executeLatency.createTimer();
-            try {
-                List<List<PrepareRequest>> partitions = 
Lists.partition(prepareRequests, maxSyncoperationNum);
-                CountDownLatch countDownLatch = new 
CountDownLatch(partitions.size());
-                for (final List<PrepareRequest> partition : partitions) {
-                    executorService.submit(() -> {
+            List<Future<?>> batchFutures = new ArrayList<>();
+            for (int i = 0; i < syncOperationThreadsNum; i++) {
+                Future<?> batchFuture = executorService.submit(() -> {
+                    // consume the metrics
+                    while (!stop.get()) {
+                        List<PrepareRequest> partition = prepareQueue.poll();
+                        if (partition.isEmpty()) {
+                            break;
+                        }
+                        HistogramMetrics.Timer executeLatencyTimer = 
executeLatency.createTimer();
                         try {
                             if (CollectionUtils.isNotEmpty(partition)) {
                                 batchDAO.synchronous(partition);
                             }
                         } catch (Throwable e) {
                             log.error(e.getMessage(), e);
                         } finally {
-                            countDownLatch.countDown();
+                            executeLatencyTimer.finish();
                         }
-                    });
-                }
-                countDownLatch.await();
-            } finally {
-                executeLatencyTimer.finish();
+                    }
+                    return null;
+                });
+                batchFutures.add(batchFuture);
+            }
+
+            // Wait for prepare stage is done.
+            prepareStageCountDownLatch.await();
+            prepareQueue.noFurtherAppending();
+            // Wait for batch stage is done.
+            for (Future<?> result : batchFutures) {
+                result.get();
             }
+
         } catch (Throwable e) {
             errorCounter.inc();
             log.error(e.getMessage(), e);
         } finally {
+
             if (log.isDebugEnabled()) {
                 log.debug("Persistence data save finish");
             }
 
-            prepareRequests.clear();
+            stop.set(true);
+            allTimer.finish();
             lastTime = System.currentTimeMillis();
         }
 
         if (debug) {
             log.info("Batch persistence duration: {} ms", 
System.currentTimeMillis() - startTime);
         }
     }
+
+    @RequiredArgsConstructor
+    static class DefaultBlockingBatchQueue<E> implements BlockingBatchQueue<E> 
{
+
+        @Getter
+        private final int maxBatchSize;
+
+        @Getter
+        private boolean inAppendingMode = true;

Review comment:
       There is no need to add `volatile` here. This field is only accessed 
inside `synchronized` code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to