Hello again,

As suggested by William Bader, I added the option to use the logical CPU
count as the number of worker threads automatically.

Regards, Adam.

Am 04.12.2012 12:28, schrieb Adam Reichold:
> Hello Thomas,
> 
> Am 04.12.2012 08:41, schrieb Thomas Freitag:
>> Am 04.12.2012 07:45, schrieb Adam Reichold:
>> Hello everyone,
>>
>> I am currently trying to get acquainted with Poppler's regression
>> testing framework. Because my system has rather low single-threaded
>> performance, I tried to implement parallel testing using Python's
>> Queue class.
>>
>> Even though poppler-regtest currently uses two processes per test
>> file, rendering even and odd pages respectively, the test files
>> themselves are still handled sequentially and both processes are joined
>> for each test file. This will yield suboptimal system utilization even
>> for a small three-core system like mine.
>>
>> Using the "-t/--threads N" option in the patched poppler-regtest will
>> spawn N worker threads that handle all tests they can get from a
>> single queue of all known tests, making it possible to heavily utilize
>> large systems as well when using a large set of test cases. But even for
>> my three-core system, this brought down the time to create references
>> for the complete test suite using the Splash backend from 4.5 hours to
>> 2.75 hours.
>>> What do you mean by "patched poppler-regtest"? There was no
>>> attachment, or am I missing something?
> 
> No, this is on the process level in the test driver and independent of
> your multi-threading work. There was a patch called
> "parallel_testing.patch" attached, at least my mail client tells me so.
> 
> I'll just try again...
> 
> Best regards, Adam.
> 
>>> If you are talking about my multi-threaded testcase, please be aware that
>>> it is still experimental, even if quite close. Here is the result of my
>>> last regression test from last Sunday evening:
>>
>>> Total 1133 tests
>>> 1114 tests passed (98.32%)
>>> 17 tests failed (1.50%): /media/thomas/HD-PCTU3/PDF Suite/Algorithmics -
>>> The Spirit of Computing, 3rd Ed.pdf (splash), /media/thomas/HD-PCTU3/PDF
>>> Suite/Essentials of English Grammar - www.ielts4u.blogfa.com.pdf ...
>>> 2 tests crashed (0.18%): /media/thomas/HD-PCTU3/PDF Suite/bug157090.pdf
>>> (splash), /media/thomas/HD-PCTU3/PDF Suite/sinatr4c.f5.pdf (splash)
>>> I'll first have a look at the crashes next weekend, then I'll continue
>>> with looking at the failed tests....
>>
>>> Cheers,
>>> Thomas
>>
>>
>> IMHO, the necessary changes seem quite small especially since a lot of
>> them are connected to indentation handling. What are your thoughts on
>> the utility and implementation of this?
>>
>> Best regards, Adam.
>>>
>>>
>>> _______________________________________________
>>> poppler mailing list
>>> [email protected]
>>> http://lists.freedesktop.org/mailman/listinfo/poppler
>>
>>
>>
>> _______________________________________________
>> poppler mailing list
>> [email protected]
>> http://lists.freedesktop.org/mailman/listinfo/poppler
>>
>>
>>
>> _______________________________________________
>> poppler mailing list
>> [email protected]
>> http://lists.freedesktop.org/mailman/listinfo/poppler
>From e34c246f3c11d7abf0cea629100ca33d483c180f Mon Sep 17 00:00:00 2001
From: Adam Reichold <[email protected]>
Date: Tue, 4 Dec 2012 07:28:01 +0100
Subject: [PATCH 1/3] implemented parallel testing using a queue and worker
 threads

---
 regtest/Printer.py        |  8 ++++++++
 regtest/TestReferences.py | 23 ++++++++++++++++++++++-
 regtest/TestRun.py        | 36 +++++++++++++++++++++++++++++++++++-
 regtest/main.py           |  3 +++
 4 files changed, 68 insertions(+), 2 deletions(-)

diff --git a/regtest/Printer.py b/regtest/Printer.py
index 008f46b..453e03c 100644
--- a/regtest/Printer.py
+++ b/regtest/Printer.py
@@ -19,6 +19,8 @@
 import sys
 from Config import Config
 
+from threading import RLock
+
 class Printer:
 
     __single = None
@@ -32,6 +34,8 @@ class Printer:
         self._rewrite = self._stream.isatty() and not self._verbose
         self._current_line = None
         
+        self._lock = RLock()
+
         Printer.__single = self
 
     def _erase_current_line(self):
@@ -52,11 +56,13 @@ class Printer:
         self._stream.flush()
 
     def printout(self, msg):
+        with self._lock:
             self._erase_current_line()
             self._print(msg)
             self._current_line = msg[msg.rfind('\n') + 1:]
     
     def printout_update(self, msg):
+        with self._lock:
             if self._rewrite and self._current_line is not None:
                 msg = self._current_line + msg
             elif not self._rewrite:
@@ -64,6 +70,7 @@ class Printer:
             self.printout(msg)
 
     def printout_ln(self, msg):
+        with self._lock:
             if self._current_line is not None:
                 self._current_line = None
                 msg = '\n' + msg
@@ -71,6 +78,7 @@ class Printer:
             self._print(self._ensure_new_line(msg))
 
     def printerr(self, msg):
+        with self._lock:
             self.stderr.write(self._ensure_new_line(msg))
             self.stderr.flush()
 
diff --git a/regtest/TestReferences.py b/regtest/TestReferences.py
index 9a9e923..cccd8af 100644
--- a/regtest/TestReferences.py
+++ b/regtest/TestReferences.py
@@ -23,6 +23,9 @@ from Config import Config
 from Printer import get_printer
 from Utils import get_document_paths_from_dir, get_skipped_tests
 
+from Queue import Queue
+from threading import Thread
+
 class TestReferences:
 
     def __init__(self, docsdir, refsdir):
@@ -32,6 +35,8 @@ class TestReferences:
         self.config = Config()
         self.printer = get_printer()
         
+        self._queue = Queue()
+
         try:
             os.makedirs(self._refsdir)
         except OSError as e:
@@ -68,9 +73,25 @@ class TestReferences:
             if backend.create_refs(doc_path, refs_path):
                 backend.create_checksums(refs_path, self.config.checksums_only)
     
+    def worker_thread(self):
+        while True:
+            doc, n_doc, total_docs = self._queue.get()
+	    self.create_refs_for_file(doc, n_doc, total_docs)
+	    self._queue.task_done()
+    
     def create_refs(self):
         docs, total_docs = get_document_paths_from_dir(self._docsdir)
+        
+        self.printer.printout_ln('Process %d is spawning %d worker threads...' % (os.getpid(), self.config.threads))
+        
+        for n_thread in range(self.config.threads):
+            thread = Thread(target=self.worker_thread)
+	    thread.daemon = True
+            thread.start()
+        
         n_doc = 0
         for doc in docs:
             n_doc += 1
-            self.create_refs_for_file(doc, n_doc, total_docs)
+            self._queue.put( (doc, n_doc, total_docs) )
+        
+        self._queue.join()
diff --git a/regtest/TestRun.py b/regtest/TestRun.py
index f4e5051..3894e93 100644
--- a/regtest/TestRun.py
+++ b/regtest/TestRun.py
@@ -24,6 +24,9 @@ import sys
 import os
 import errno
 
+from Queue import Queue
+from threading import Thread, RLock
+
 class TestRun:
 
     def __init__(self, docsdir, refsdir, outdir):
@@ -43,6 +46,9 @@ class TestRun:
         self._stderr = []
         self._skipped = []
         
+        self._queue = Queue()
+        self._lock = RLock()
+
         try:
             os.makedirs(self._outdir);
         except OSError as e:
@@ -57,24 +63,30 @@ class TestRun:
         ref_is_crashed = backend.is_crashed(refs_path)
         ref_is_failed = backend.is_failed(refs_path)
         if not ref_has_md5 and not ref_is_crashed and not ref_is_failed:
+            with self._lock:
                 self._skipped.append("%s (%s)" % (doc_path, backend.get_name()))
             self.printer.print_default("Reference files not found, skipping '%s' for %s backend" % (doc_path, backend.get_name()))
             return
 
+        with self._lock:
             self._n_tests += 1
+
         self.printer.print_test_start("Testing '%s' using %s backend (%d/%d): " % (doc_path, backend.get_name(), n_doc, total_docs))
         test_has_md5 = backend.create_refs(doc_path, test_path)
 
         if backend.has_stderr(test_path):
+            with self._lock:
                 self._stderr.append("%s (%s)" % (doc_path, backend.get_name()))
 
         if ref_has_md5 and test_has_md5:
             if backend.compare_checksums(refs_path, test_path, not self.config.keep_results, self.config.create_diffs, self.config.update_refs):
                 # FIXME: remove dir if it's empty?
                 self.printer.print_test_result("PASS")
+                with self._lock:
                     self._n_passed += 1
             else:
                 self.printer.print_test_result_ln("FAIL")
+                with self._lock:
                     self._failed.append("%s (%s)" % (doc_path, backend.get_name()))
             return
         elif test_has_md5:
@@ -87,6 +99,7 @@ class TestRun:
         test_is_crashed = backend.is_crashed(test_path)
         if ref_is_crashed and test_is_crashed:
             self.printer.print_test_result("PASS (Expected crash)")
+            with self._lock:
                 self._n_passed += 1
             return
 
@@ -94,22 +107,26 @@ class TestRun:
         if ref_is_failed and test_is_failed:
             # FIXME: compare status errors
             self.printer.print_test_result("PASS (Expected fail with status error %d)" % (test_is_failed))
+            with self._lock:
                 self._n_passed += 1
             return
 
         if test_is_crashed:
             self.printer.print_test_result_ln("CRASH")
+            with self._lock:
                 self._crashed.append("%s (%s)" % (doc_path, backend.get_name()))
             return
 
         if test_is_failed:
             self.printer.print_test_result_ln("FAIL (status error %d)" % (test_is_failed))
+            with self._lock:
                 self._failed_status_error("%s (%s)" % (doc_path, backend.get_name()))
             return
 
     def run_test(self, filename, n_doc = 1, total_docs = 1):
         if filename in self._skip:
             doc_path = os.path.join(self._docsdir, filename)
+            with self._lock:
                 self._skipped.append("%s" % (doc_path))
             self.printer.print_default("Skipping test '%s' (%d/%d)" % (doc_path, n_doc, total_docs))
             return
@@ -126,6 +143,7 @@ class TestRun:
         refs_path = os.path.join(self._refsdir, filename)
 
         if not os.path.isdir(refs_path):
+            with self._lock:
                 self._skipped.append("%s" % (doc_path))
             self.printer.print_default("Reference dir not found for %s, skipping (%d/%d)" % (doc_path, n_doc, total_docs))
             return
@@ -138,12 +156,28 @@ class TestRun:
         for backend in backends:
             self.test(refs_path, doc_path, out_path, backend, n_doc, total_docs)
 
+    def worker_thread(self):
+        while True:
+            doc, n_doc, total_docs = self._queue.get()
+	    self.run_test(doc, n_doc, total_docs)
+	    self._queue.task_done()
+    
     def run_tests(self):
         docs, total_docs = get_document_paths_from_dir(self._docsdir)
+        
+        self.printer.printout_ln('Process %d is spawning %d worker threads...' % (os.getpid(), self.config.threads))
+        
+        for n_thread in range(self.config.threads):
+            thread = Thread(target=self.worker_thread)
+	    thread.daemon = True
+            thread.start()
+        
         n_doc = 0
         for doc in docs:
             n_doc += 1
-            self.run_test(doc, n_doc, total_docs)
+            self._queue.put( (doc, n_doc, total_docs) )
+        
+        self._queue.join()
 
     def summary(self):
         if not self._n_tests:
diff --git a/regtest/main.py b/regtest/main.py
index 290c8bc..f7c3ac7 100644
--- a/regtest/main.py
+++ b/regtest/main.py
@@ -61,6 +61,9 @@ def main(args):
     parser.add_argument('--skip', metavar = 'FILE',
                         action = 'store', dest = 'skipped_file',
                         help = 'File containing tests to skip')
+    parser.add_argument('-t', '--threads',
+                        action = 'store', dest = 'threads', type = int, default = 1,
+                        help = 'Number of worker threads')
 
     ns, args = parser.parse_known_args(args)
     if not args:
-- 
1.8.0.1


>From 939a57e14726cfb9859497249b20e64409def9d2 Mon Sep 17 00:00:00 2001
From: Adam Reichold <[email protected]>
Date: Tue, 4 Dec 2012 07:32:44 +0100
Subject: [PATCH 2/3] removed two process optimization because it reduces
 parallel test performance

---
 regtest/backends/cairo.py  | 5 ++---
 regtest/backends/splash.py | 5 ++---
 2 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/regtest/backends/cairo.py b/regtest/backends/cairo.py
index 304783e..3593342 100644
--- a/regtest/backends/cairo.py
+++ b/regtest/backends/cairo.py
@@ -28,9 +28,8 @@ class Cairo(Backend):
 
     def create_refs(self, doc_path, refs_path):
         out_path = os.path.join(refs_path, 'cairo')
-        p1 = subprocess.Popen([self._pdftocairo, '-cropbox', '-r', '72', '-e', '-png', doc_path, out_path], stderr = subprocess.PIPE)
-        p2 = subprocess.Popen([self._pdftocairo, '-cropbox', '-r', '72', '-o', '-png', doc_path, out_path], stderr = subprocess.PIPE)
-        return self._check_exit_status2(p1, p2, out_path)
+        p = subprocess.Popen([self._pdftocairo, '-cropbox', '-r', '72', '-png', doc_path, out_path], stderr = subprocess.PIPE)
+        return self._check_exit_status(p, out_path)
 
     def _create_diff(self, ref_path, result_path):
         self._diff_png(ref_path, result_path)
diff --git a/regtest/backends/splash.py b/regtest/backends/splash.py
index 3144bc7..aadbdec 100644
--- a/regtest/backends/splash.py
+++ b/regtest/backends/splash.py
@@ -28,9 +28,8 @@ class Splash(Backend):
 
     def create_refs(self, doc_path, refs_path):
         out_path = os.path.join(refs_path, 'splash')
-        p1 = subprocess.Popen([self._pdftoppm, '-cropbox', '-r', '72', '-e', '-png', doc_path, out_path], stderr = subprocess.PIPE)
-        p2 = subprocess.Popen([self._pdftoppm, '-cropbox', '-r', '72', '-o', '-png', doc_path, out_path], stderr = subprocess.PIPE)
-        return self._check_exit_status2(p1, p2, out_path)
+        p = subprocess.Popen([self._pdftoppm, '-cropbox', '-r', '72', '-png', doc_path, out_path], stderr = subprocess.PIPE)
+        return self._check_exit_status(p, out_path)
 
     def _create_diff(self, ref_path, result_path):
         self._diff_png(ref_path, result_path)
-- 
1.8.0.1


>From ec0ae2470b044fedf9484bf369b3e1e5b302d771 Mon Sep 17 00:00:00 2001
From: Adam Reichold <[email protected]>
Date: Tue, 4 Dec 2012 20:55:03 +0100
Subject: [PATCH 3/3] interpret a thread count of zero or lower as the number
 of logical CPU as suggested by William Bader

---
 regtest/TestReferences.py | 4 ++--
 regtest/TestRun.py        | 4 ++--
 regtest/main.py           | 8 +++++++-
 3 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/regtest/TestReferences.py b/regtest/TestReferences.py
index cccd8af..4dccfe1 100644
--- a/regtest/TestReferences.py
+++ b/regtest/TestReferences.py
@@ -73,7 +73,7 @@ class TestReferences:
             if backend.create_refs(doc_path, refs_path):
                 backend.create_checksums(refs_path, self.config.checksums_only)
     
-    def worker_thread(self):
+    def _worker_thread(self):
         while True:
             doc, n_doc, total_docs = self._queue.get()
 	    self.create_refs_for_file(doc, n_doc, total_docs)
@@ -85,7 +85,7 @@ class TestReferences:
         self.printer.printout_ln('Process %d is spawning %d worker threads...' % (os.getpid(), self.config.threads))
         
         for n_thread in range(self.config.threads):
-            thread = Thread(target=self.worker_thread)
+            thread = Thread(target=self._worker_thread)
 	    thread.daemon = True
             thread.start()
         
diff --git a/regtest/TestRun.py b/regtest/TestRun.py
index 3894e93..490fab1 100644
--- a/regtest/TestRun.py
+++ b/regtest/TestRun.py
@@ -156,7 +156,7 @@ class TestRun:
         for backend in backends:
             self.test(refs_path, doc_path, out_path, backend, n_doc, total_docs)
 
-    def worker_thread(self):
+    def _worker_thread(self):
         while True:
             doc, n_doc, total_docs = self._queue.get()
 	    self.run_test(doc, n_doc, total_docs)
@@ -168,7 +168,7 @@ class TestRun:
         self.printer.printout_ln('Process %d is spawning %d worker threads...' % (os.getpid(), self.config.threads))
         
         for n_thread in range(self.config.threads):
-            thread = Thread(target=self.worker_thread)
+            thread = Thread(target=self._worker_thread)
 	    thread.daemon = True
             thread.start()
         
diff --git a/regtest/main.py b/regtest/main.py
index f7c3ac7..9b4d340 100644
--- a/regtest/main.py
+++ b/regtest/main.py
@@ -23,6 +23,8 @@ import backends
 import os
 from Config import Config
 
+from multiprocessing import cpu_count
+
 class ListAction(argparse.Action):
     def __call__(self, parser, namespace, values, option_string = None):
         setattr(namespace, self.dest, values.split(','))
@@ -70,7 +72,11 @@ def main(args):
         parser.print_help()
         sys.exit(0)
 
-    Config(vars(ns))
+    c = Config(vars(ns))
+    
+    if c.threads <= 0:
+        c.threads = cpu_count()           
+    
     try:
         commands.run(args)
     except commands.UnknownCommandError:
-- 
1.8.0.1

_______________________________________________
poppler mailing list
[email protected]
http://lists.freedesktop.org/mailman/listinfo/poppler

Reply via email to