dlmarion commented on code in PR #4239:
URL: https://github.com/apache/accumulo/pull/4239#discussion_r1484502659


##########
server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java:
##########
@@ -390,6 +391,22 @@ public void run() {
 
     ServiceLock lock = announceExistence();
 
+    int threadPoolSize = getConfiguration().getCount(Property.SSERV_WAL_SORT_MAX_CONCURRENT);
+    if (threadPoolSize > 0) {
+      final LogSorter logSorter = new LogSorter(context, getConfiguration());
+      try {
+        // Attempt to process all existing log sorting work and start a background
+        // thread to look for log sorting work in the future
+        logSorter.startWatchingForRecoveryLogs(threadPoolSize);
+      } catch (Exception ex) {
+        log.error("Error starting LogSorter");
+        throw new RuntimeException(ex);
+      }
+    } else {
+      log.warn(

Review Comment:
   Resolved in the e937629 merge commit from main to elasticity



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java:
##########
@@ -614,18 +614,28 @@ public void run() {
         new DistributedWorkQueue(getContext().getZooKeeperRoot() + Constants.ZBULK_FAILED_COPYQ,
             getConfiguration(), getContext());
     try {
-      bulkFailedCopyQ.startProcessing(new BulkFailedCopyProcessor(getContext()),
+      bulkFailedCopyQ.processExistingAndFuture(new BulkFailedCopyProcessor(getContext()),
           distWorkQThreadPool);
     } catch (Exception e1) {
       throw new RuntimeException("Failed to start distributed work queue for copying ", e1);
     }
 
-    try {
-      logSorter.startWatchingForRecoveryLogs(distWorkQThreadPool);
-    } catch (Exception ex) {
-      log.error("Error setting watches for recoveries");
-      throw new RuntimeException(ex);
+    int threadPoolSize =
+        getContext().getConfiguration().getCount(Property.TSERV_WAL_SORT_MAX_CONCURRENT);
+    if (threadPoolSize > 0) {
+      try {
+        // Attempt to process all existing log sorting work and start a background
+        // thread to look for log sorting work in the future
+        logSorter.startWatchingForRecoveryLogs(threadPoolSize);
+      } catch (Exception ex) {
+        log.error("Error starting LogSorter");
+        throw new RuntimeException(ex);
+      }
+    } else {
+      log.warn(

Review Comment:
   Resolved in the e937629 merge commit from main to elasticity


