details: https://code.openbravo.com/erp/devel/pi/rev/01589a6ee69a changeset: 35716:01589a6ee69a user: Asier Lostalé <asier.lostale <at> openbravo.com> date: Fri May 03 07:48:00 2019 +0200 summary: fixes 40747: unnecessary wait in Import Entry Manager
After handling import entries to be processed, the ImportEntryManager thread sleeps in order to give those entries time to be processed before starting a new loop. The time was calculated based on the number of import entries handled in the last cycle (giving 300ms to each one) with a minimum of 2 secs. In cases with high activity this time can be very long, resulting in no new entries being processed because the import entry manager thread is sleeping. When doing this calculation, the number of threads processing import entries was not taken into account. Now this time is divided by this number of threads. Still, this is just an inaccurate approximation. If this time is very small, the entries taken in the previous loop will be taken again in the next one(s) as they couldn't be processed yet so their status didn't change in the database, causing unnecessary round trips to the database. In any case, the consequence of this would be limited to unnecessary work being done, as the ImportEntryProcessor.ImportEntryProcessRunnable.addEntry method checks whether new import entries retrieved from DB are already being processed before adding them to the queue, thus preventing double processing. details: https://code.openbravo.com/erp/devel/pi/rev/3ad31745f63e changeset: 35717:3ad31745f63e user: Asier Lostalé <asier.lostale <at> openbravo.com> date: Fri May 03 14:06:11 2019 +0200 summary: fixes 37853: improved ImportEntryManager logs - Log partial timing statistics for the last 100 entries processed, also keeping total statistics since last restart - Include process time for each processed entry - ImportEntryProcessor.addEntry * Set key to runnable before adding entry so it gets properly logged * In all logs include type of data and key to know if problems occur in a single processor * Increase from debug to warn the log level of the message emitted when the processor's queue is completely full.
This should be really exceptional and is worth making visible in the log as it might indicate severe problems diffstat: src/org/openbravo/service/importprocess/ImportEntryManager.java | 53 ++++++--- src/org/openbravo/service/importprocess/ImportEntryProcessor.java | 18 +- 2 files changed, 47 insertions(+), 24 deletions(-) diffs (160 lines): diff -r b638e581f3c3 -r 3ad31745f63e src/org/openbravo/service/importprocess/ImportEntryManager.java --- a/src/org/openbravo/service/importprocess/ImportEntryManager.java Mon May 06 15:49:37 2019 +0200 +++ b/src/org/openbravo/service/importprocess/ImportEntryManager.java Fri May 03 14:06:11 2019 +0200 @@ -153,6 +153,8 @@ // default to number of processors plus some additionals for the main threads private int numberOfThreads = Runtime.getRuntime().availableProcessors() + 3; + private int processingCapacityPerSecond; + // defines the batch size of reading and processing import entries by the // main thread, for each type of data the batch size is being read private int importBatchSize = 5000; @@ -177,10 +179,21 @@ numberOfThreads, 4); maxTaskQueueSize = ImportProcessUtils.getCheckIntProperty(log, "import.max.task.queue.size", maxTaskQueueSize, 50); + managerWaitTime = ImportProcessUtils.getCheckIntProperty(log, "import.wait.time", 600, 1); + processingCapacityPerSecond = ImportProcessUtils.getCheckIntProperty(log, + "import.processing.capacity.per.second", numberOfThreads * 30, 10); + // property defined in secs, convert to ms managerWaitTime = managerWaitTime * 1000; + + log.info("Import entry manager settings"); + log.info(" batch size: {}", importBatchSize); + log.info(" number of threads: {}", numberOfThreads); + log.info(" task queue size: {}", maxTaskQueueSize); + log.info(" wait time: {} sec", managerWaitTime); + log.info(" processing capacity per second: {} entries", processingCapacityPerSecond); } public synchronized void start() { @@ -613,23 +626,24 @@ if (entryCount > 0) { // if there was data then just wait some time // give
the threads time to process it all before trying - // a next batch of entries + // a next batch of entries to prevent retrieving from DB the same records we have just + // handled in this cycle try { - // wait one second per 30 records, somewhat arbitrary - // but high enough for most cases, also always wait 300 millis additional to - // start up threads etc. + // wait processingCapacityPerSecond which is the expected entries number of entries + // that can be processed per second, it defaults to one second per 30 records per + // thread, somewhat arbitrary but high enough for most cases, also always wait 300 + // milliseconds additional to start up threads etc. // note computation of timing ensures that int rounding is done on 1000* entrycount - if (isTest) { - // in case of test don't wait minimal 2 seconds - Thread.sleep(300 + ((1000 * entryCount) / 30)); - } else { - // wait minimal 2 seconds or based on entry count - long t = Math.max(2000, 300 + ((1000 * entryCount) / 30)); - log.debug( - "{} entries have been processed. Wait {} ms, and try again to capture new entries which have been added", - entryCount, t); - Thread.sleep(t); - } + + // wait minimal 2 seconds or based on entry count, no minimal wait in case of test + int minWait = isTest ? 0 : 2_000; + long t = Math.max(minWait, + 300 + ((1_000 * entryCount) / manager.processingCapacityPerSecond)); + + log.debug( + "{} entries have been handled. 
Wait {} ms, and try again to capture new entries which have been added", + entryCount, t); + Thread.sleep(t); } catch (Exception ignored) { } } else { @@ -702,7 +716,9 @@ private static class ImportStatistics { private String typeOfData; private long cnt; + private long cntPartial; private long totalTime; + private long totalTimePartial; public void setTypeOfData(String typeOfData) { this.typeOfData = typeOfData; @@ -714,11 +730,16 @@ public synchronized void addTiming(long timeForEntry) { cnt++; + cntPartial++; totalTime += timeForEntry; + totalTimePartial += timeForEntry; } public synchronized void log() { - log.info("Timings for " + typeOfData + " cnt: " + cnt + " avg millis: " + (totalTime / cnt)); + log.info("Timings for {}. Partial [cnt: {}, avg: {} ms] - Total [cnt: {}, avg: {} ms]", + typeOfData, cntPartial, totalTimePartial / cntPartial, cnt, totalTime / cnt); + cntPartial = 0; + totalTimePartial = 0; } } diff -r b638e581f3c3 -r 3ad31745f63e src/org/openbravo/service/importprocess/ImportEntryProcessor.java --- a/src/org/openbravo/service/importprocess/ImportEntryProcessor.java Mon May 06 15:49:37 2019 +0200 +++ b/src/org/openbravo/service/importprocess/ImportEntryProcessor.java Fri May 03 14:06:11 2019 +0200 @@ -176,8 +176,8 @@ // give it the entry runnable.setImportEntryManager(importEntryManager); runnable.setImportEntryProcessor(this); + runnable.setKey(key); runnable.addEntry(importEntry); - runnable.setKey(key); // and make sure it can get next entries by caching it runnables.put(key, runnable); @@ -366,14 +366,14 @@ final String typeOfData = localImportEntry.getTypeofdata(); if (logger.isDebugEnabled()) { - logger.debug("Processing entry " + localImportEntry.getIdentifier() + " " + typeOfData); + logger.debug("Processing entry {} {}", localImportEntry.getIdentifier(), typeOfData); } processEntry(localImportEntry); if (logger.isDebugEnabled()) { - logger.debug( - "Finished Processing entry " + localImportEntry.getIdentifier() + " " + typeOfData); + 
logger.debug("Finished Processing entry {} {} in {} ms", + localImportEntry.getIdentifier(), typeOfData, System.currentTimeMillis() - t0); } // don't use the import entry anymore, touching methods on it @@ -540,20 +540,22 @@ // prevents memory problems if (importEntries.size() > MAX_QUEUE_SIZE) { // set to level debug until other changes have been made in subclassing code - logger.debug("Ignoring import entry, will be reprocessed later, too many queue entries " - + importEntries.size()); + logger.warn( + "Ignoring import entry {} - {}, will be reprocessed later, too many queue entries {}", + importEntry.getTypeofdata(), key, importEntries.size()); return; } if (!importEntryIds.contains(importEntry.getId())) { - logger.debug("Adding entry to runnable with key " + key); + logger.debug("Adding entry to runnable with key {} - {}", importEntry.getTypeofdata(), key); importEntryIds.add(importEntry.getId()); // cache a queued entry as it has a much lower mem foot print than the import // entry itself importEntries.add(new QueuedEntry(importEntry)); } else { - logger.debug("Not adding entry, it is already in the list of ids " + importEntry.getId()); + logger.debug("Not adding entry, it is already in the list of ids {} - {} - {} ", + importEntry.getTypeofdata(), key, importEntry.getId()); } } _______________________________________________ Openbravo-commits mailing list Openbravo-commits@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/openbravo-commits