wu-sheng commented on a change in pull request #7153:
URL: https://github.com/apache/skywalking/pull/7153#discussion_r656299453



##########
File path: 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
##########
@@ -92,69 +105,111 @@ public void start(ModuleManager moduleManager, 
CoreModuleConfig moduleConfig) {
         }
     }
 
-    private void extractDataAndSave(IBatchDAO batchDAO) {
+    @VisibleForTesting
+    void extractDataAndSave(IBatchDAO batchDAO) {
         if (log.isDebugEnabled()) {
             log.debug("Extract data and save");
         }
 
         long startTime = System.currentTimeMillis();
+        HistogramMetrics.Timer allTimer = allLatency.createTimer();
 
         try {
-            HistogramMetrics.Timer timer = prepareLatency.createTimer();
-
-            try {
-                List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
-                
persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
-                
persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
-
-                persistenceWorkers.forEach(worker -> {
-                    if (log.isDebugEnabled()) {
-                        log.debug("extract {} worker data and save", 
worker.getClass().getName());
+            List<PersistenceWorker<? extends StorageData>> persistenceWorkers 
= new ArrayList<>();
+            
persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
+            
persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
+            CountDownLatch countDownLatch = new 
CountDownLatch(persistenceWorkers.size());
+
+            persistenceWorkers.forEach(worker -> {
+                prepareExecutorService.submit(() -> {
+                    HistogramMetrics.Timer timer = 
prepareLatency.createTimer();
+                    try {
+                        if (log.isDebugEnabled()) {
+                            log.debug("extract {} worker data and save", 
worker.getClass().getName());
+                        }
+                        List<PrepareRequest> innerPrepareRequests = new 
ArrayList<>(5000);
+                        worker.buildBatchRequests(innerPrepareRequests);
+                        synchronized (prepareRequests) {
+                            prepareRequests.addAll(innerPrepareRequests);
+                            if (prepareRequests.size() >= maxSyncoperationNum) 
{
+                                prepareRequests.notify();
+                            }
+                        }
+                        worker.endOfRound(System.currentTimeMillis() - 
lastTime);
+                    } finally {
+                        timer.finish();
+                        countDownLatch.countDown();
                     }
+                });
+            });
+
+            Future<?> batchFuture = batchExecutorService.submit(() -> {
+                List<Future<?>> results = new ArrayList<>();
+                while (true) {
+                    List<PrepareRequest> partition = null;
+                    synchronized (prepareRequests) {
+                        if (prepareDone && 
CollectionUtils.isEmpty(prepareRequests)) {
+                            break;
+                        }
 
-                    worker.buildBatchRequests(prepareRequests);
+                        while (this.prepareRequests.size() < 
maxSyncoperationNum && !prepareDone) {
+                            try {
+                                this.prepareRequests.wait(1000);
+                            } catch (InterruptedException e) {
+                            }
+                        }
 
-                    worker.endOfRound(System.currentTimeMillis() - lastTime);
-                });
+                        if (CollectionUtils.isEmpty(prepareRequests)) {
+                            continue;
+                        }
 
-                if (debug) {
-                    log.info("build batch persistence duration: {} ms", 
System.currentTimeMillis() - startTime);
-                }
-            } finally {
-                timer.finish();
-            }
+                        List<PrepareRequest> prepareRequestList = 
this.prepareRequests.subList(
+                            0, Math.min(maxSyncoperationNum, 
this.prepareRequests.size()));
+                        partition = new ArrayList<>(prepareRequestList);
+                        prepareRequestList.clear();
+                    }
 
-            HistogramMetrics.Timer executeLatencyTimer = 
executeLatency.createTimer();
-            try {
-                List<List<PrepareRequest>> partitions = 
Lists.partition(prepareRequests, maxSyncoperationNum);
-                CountDownLatch countDownLatch = new 
CountDownLatch(partitions.size());
-                for (final List<PrepareRequest> partition : partitions) {
-                    executorService.submit(() -> {
+                    final List<PrepareRequest> finalPartition = partition;
+                    Future<?> submit = executorService.submit(() -> {
+                        HistogramMetrics.Timer executeLatencyTimer = 
executeLatency.createTimer();
                         try {
-                            if (CollectionUtils.isNotEmpty(partition)) {
-                                batchDAO.synchronous(partition);
+                            if (CollectionUtils.isNotEmpty(finalPartition)) {
+                                batchDAO.synchronous(finalPartition);
                             }
                         } catch (Throwable e) {
                             log.error(e.getMessage(), e);
                         } finally {
-                            countDownLatch.countDown();
+                            executeLatencyTimer.finish();
                         }
+
                     });
+                    results.add(submit);
                 }
-                countDownLatch.await();
-            } finally {
-                executeLatencyTimer.finish();
+
+                for (Future<?> result : results) {
+                    result.get();
+                }
+                return null;
+            });
+            countDownLatch.await();
+            prepareDone = true;
+            synchronized (prepareRequests) {
+                prepareRequests.notify();
             }
+            batchFuture.get();
+
         } catch (Throwable e) {
             errorCounter.inc();
             log.error(e.getMessage(), e);
         } finally {
             if (log.isDebugEnabled()) {
                 log.debug("Persistence data save finish");
             }
-
+            allTimer.finish();
             prepareRequests.clear();
+
             lastTime = System.currentTimeMillis();
+            prepareDone = false;

Review comment:
       Why is this needed?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to