details:   https://code.openbravo.com/erp/devel/pi/rev/01589a6ee69a
changeset: 35716:01589a6ee69a
user:      Asier Lostalé <asier.lostale <at> openbravo.com>
date:      Fri May 03 07:48:00 2019 +0200
summary:   fixes 40747: unnecessary wait in Import Entry Manager

  After handling import entries to be processed, the ImportEntryManager thread sleeps
  in order to give those entries time to be processed before starting a new loop.

  The time was calculated based on the amount of import entries handled in the last
  cycle (giving 300ms to each one) with a minimum of 2 secs. In cases with high
  activity this time can be very high, resulting in no new entries being processed
  because the import entry manager thread is sleeping.

  When doing this calculation, the number of threads processing import entries 
was
  not taken into account.

  Now this time is divided by this number of threads. Still, this is just an inaccurate
  approximation. If this time is very small, the entries taken in the previous loop will
  be taken again in the next one(s), as they couldn't be processed yet, so their status
  didn't change in the database, causing unnecessary roundtrips to the database. In any case,
  the consequence of this would be limited to non-required work being done, as the
  ImportEntryProcessor.ImportEntryProcessRunnable.addEntry method checks whether new
  import entries retrieved from DB are already being processed before adding them to
  the queue, preventing double processing in this way.

details:   https://code.openbravo.com/erp/devel/pi/rev/3ad31745f63e
changeset: 35717:3ad31745f63e
user:      Asier Lostalé <asier.lostale <at> openbravo.com>
date:      Fri May 03 14:06:11 2019 +0200
summary:   fixes 37853: improved ImportEntryManager logs

  - Log partial timing statistics for the last 100 entries processed, keeping also
    total statistics since last restart
  - Include process time for each processed entry
  - ImportEntryProcessor.addEntry
     * Set key to runnable before adding entry so it gets properly logged
     * In all logs include type of data and key to know if problems occur in a
       single processor
     * Increase from debug to warn the log level of the message when processor's queue is
       completely full. This should be really exceptional and worth making
       visible in the log as it might indicate severe problems

diffstat:

 src/org/openbravo/service/importprocess/ImportEntryManager.java   |  53 
++++++---
 src/org/openbravo/service/importprocess/ImportEntryProcessor.java |  18 +-
 2 files changed, 47 insertions(+), 24 deletions(-)

diffs (160 lines):

diff -r b638e581f3c3 -r 3ad31745f63e 
src/org/openbravo/service/importprocess/ImportEntryManager.java
--- a/src/org/openbravo/service/importprocess/ImportEntryManager.java   Mon May 
06 15:49:37 2019 +0200
+++ b/src/org/openbravo/service/importprocess/ImportEntryManager.java   Fri May 
03 14:06:11 2019 +0200
@@ -153,6 +153,8 @@
   // default to number of processors plus some additionals for the main threads
   private int numberOfThreads = Runtime.getRuntime().availableProcessors() + 3;
 
+  private int processingCapacityPerSecond;
+
   // defines the batch size of reading and processing import entries by the
   // main thread, for each type of data the batch size is being read
   private int importBatchSize = 5000;
@@ -177,10 +179,21 @@
         numberOfThreads, 4);
     maxTaskQueueSize = ImportProcessUtils.getCheckIntProperty(log, 
"import.max.task.queue.size",
         maxTaskQueueSize, 50);
+
     managerWaitTime = ImportProcessUtils.getCheckIntProperty(log, 
"import.wait.time", 600, 1);
 
+    processingCapacityPerSecond = ImportProcessUtils.getCheckIntProperty(log,
+        "import.processing.capacity.per.second", numberOfThreads * 30, 10);
+
     // property defined in secs, convert to ms
     managerWaitTime = managerWaitTime * 1000;
+
+    log.info("Import entry manager settings");
+    log.info("  batch size: {}", importBatchSize);
+    log.info("  number of threads: {}", numberOfThreads);
+    log.info("  task queue size: {}", maxTaskQueueSize);
+    log.info("  wait time: {} sec", managerWaitTime);
+    log.info("  processing capacity per second: {} entries", 
processingCapacityPerSecond);
   }
 
   public synchronized void start() {
@@ -613,23 +626,24 @@
             if (entryCount > 0) {
               // if there was data then just wait some time
               // give the threads time to process it all before trying
-              // a next batch of entries
+              // a next batch of entries to prevent retrieving from DB the 
same records we have just
+              // handled in this cycle
               try {
-                // wait one second per 30 records, somewhat arbitrary
-                // but high enough for most cases, also always wait 300 millis 
additional to
-                // start up threads etc.
+                // wait processingCapacityPerSecond which is the expected 
entries number of entries
+                // that can be processed per second, it defaults to one second 
per 30 records per
+                // thread, somewhat arbitrary but high enough for most cases, 
also always wait 300
+                // milliseconds additional to start up threads etc.
                 // note computation of timing ensures that int rounding is 
done on 1000* entrycount
-                if (isTest) {
-                  // in case of test don't wait minimal 2 seconds
-                  Thread.sleep(300 + ((1000 * entryCount) / 30));
-                } else {
-                  // wait minimal 2 seconds or based on entry count
-                  long t = Math.max(2000, 300 + ((1000 * entryCount) / 30));
-                  log.debug(
-                      "{} entries have been processed. Wait {} ms, and try 
again to capture new entries which have been added",
-                      entryCount, t);
-                  Thread.sleep(t);
-                }
+
+                // wait minimal 2 seconds or based on entry count, no minimal 
wait in case of test
+                int minWait = isTest ? 0 : 2_000;
+                long t = Math.max(minWait,
+                    300 + ((1_000 * entryCount) / 
manager.processingCapacityPerSecond));
+
+                log.debug(
+                    "{} entries have been handled. Wait {} ms, and try again 
to capture new entries which have been added",
+                    entryCount, t);
+                Thread.sleep(t);
               } catch (Exception ignored) {
               }
             } else {
@@ -702,7 +716,9 @@
   private static class ImportStatistics {
     private String typeOfData;
     private long cnt;
+    private long cntPartial;
     private long totalTime;
+    private long totalTimePartial;
 
     public void setTypeOfData(String typeOfData) {
       this.typeOfData = typeOfData;
@@ -714,11 +730,16 @@
 
     public synchronized void addTiming(long timeForEntry) {
       cnt++;
+      cntPartial++;
       totalTime += timeForEntry;
+      totalTimePartial += timeForEntry;
     }
 
     public synchronized void log() {
-      log.info("Timings for " + typeOfData + " cnt: " + cnt + " avg millis: " 
+ (totalTime / cnt));
+      log.info("Timings for {}. Partial [cnt: {}, avg: {} ms] - Total [cnt: 
{}, avg: {} ms]",
+          typeOfData, cntPartial, totalTimePartial / cntPartial, cnt, 
totalTime / cnt);
+      cntPartial = 0;
+      totalTimePartial = 0;
     }
   }
 
diff -r b638e581f3c3 -r 3ad31745f63e 
src/org/openbravo/service/importprocess/ImportEntryProcessor.java
--- a/src/org/openbravo/service/importprocess/ImportEntryProcessor.java Mon May 
06 15:49:37 2019 +0200
+++ b/src/org/openbravo/service/importprocess/ImportEntryProcessor.java Fri May 
03 14:06:11 2019 +0200
@@ -176,8 +176,8 @@
     // give it the entry
     runnable.setImportEntryManager(importEntryManager);
     runnable.setImportEntryProcessor(this);
+    runnable.setKey(key);
     runnable.addEntry(importEntry);
-    runnable.setKey(key);
 
     // and make sure it can get next entries by caching it
     runnables.put(key, runnable);
@@ -366,14 +366,14 @@
           final String typeOfData = localImportEntry.getTypeofdata();
 
           if (logger.isDebugEnabled()) {
-            logger.debug("Processing entry " + 
localImportEntry.getIdentifier() + " " + typeOfData);
+            logger.debug("Processing entry {} {}", 
localImportEntry.getIdentifier(), typeOfData);
           }
 
           processEntry(localImportEntry);
 
           if (logger.isDebugEnabled()) {
-            logger.debug(
-                "Finished Processing entry " + 
localImportEntry.getIdentifier() + " " + typeOfData);
+            logger.debug("Finished Processing entry {} {} in {} ms",
+                localImportEntry.getIdentifier(), typeOfData, 
System.currentTimeMillis() - t0);
           }
 
           // don't use the import entry anymore, touching methods on it
@@ -540,20 +540,22 @@
       // prevents memory problems
       if (importEntries.size() > MAX_QUEUE_SIZE) {
         // set to level debug until other changes have been made in 
subclassing code
-        logger.debug("Ignoring import entry, will be reprocessed later, too 
many queue entries "
-            + importEntries.size());
+        logger.warn(
+            "Ignoring import entry {} - {}, will be reprocessed later, too 
many queue entries {}",
+            importEntry.getTypeofdata(), key, importEntries.size());
         return;
       }
 
       if (!importEntryIds.contains(importEntry.getId())) {
-        logger.debug("Adding entry to runnable with key " + key);
+        logger.debug("Adding entry to runnable with key {} - {}", 
importEntry.getTypeofdata(), key);
 
         importEntryIds.add(importEntry.getId());
         // cache a queued entry as it has a much lower mem foot print than the 
import
         // entry itself
         importEntries.add(new QueuedEntry(importEntry));
       } else {
-        logger.debug("Not adding entry, it is already in the list of ids " + 
importEntry.getId());
+        logger.debug("Not adding entry, it is already in the list of ids {} - 
{} - {} ",
+            importEntry.getTypeofdata(), key, importEntry.getId());
       }
     }
 


_______________________________________________
Openbravo-commits mailing list
Openbravo-commits@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/openbravo-commits

Reply via email to