Hello again, As suggested by William Bader, I added the option to use the logical CPU count as the number of worker threads automatically.
Regards, Adam. Am 04.12.2012 12:28, schrieb Adam Reichold: > Hello Thomas, > > Am 04.12.2012 08:41, schrieb Thomas Freitag: >> Am 04.12.2012 07:45, schrieb Adam Reichold: >> Hello everyone, >> >> I currently try to get myself acquainted with Poppler's regression >> testing framework. Because my system has a rather low single-threaded >> performance, I tried to implement parallel testing using Python's >> Queue class. >> >> Even though poppler-regtest currently uses two processes per test >> file, rendering even and odd pages respectively, the test files >> themselves are still handled sequentially and both process are joined >> for each test file. This will yield suboptimal system utilization even >> for a small three-core system like mine. >> >> Using the "-t/--threads N" option in the patched poppler-regtest will >> spawn N worker threads that handle all tests they can get from a >> single queue for all known tests, allowing to heavily utilize also >> large system if using a large set of test cases. But even for my >> three-core system, this brought down the time to create references for >> the complete test suite using the Splash backend from 4,5 hours to >> 2,75 hours. >>> What do You mean with "patched poppler-regtest"? There was no >>> attachment, or do I miss something? > > No, this on the process level in the test driver and independent of the > your multi-threading work. There was a patch called > "parallel_testing.patch" attached, as least my mail client tells me so. > > I'll just try again... > > Best regards, Adam. > >>> If You talk about my multi-threaded testcase, please be aware, that it >>> is still experimental, even if quite close. Here the result of my last >>> regression test last sunday evening: >> >>> Total 1133 tests >>> 1114 tests passed (98.32%) >>> 17 tests failed (1.50%): /media/thomas/HD-PCTU3/PDF Suite/Algorithmics - >>> The Spirit of Computing, 3rd Ed.pdf (splash), /media/thomas/HD-PCTU3/PDF >>> Suite/Essentials of English Grammar - www.ielts4u.blogfa.com.pdf ... >>> 2 tests crashed (0.18%): /media/thomas/HD-PCTU3/PDF Suite/bug157090.pdf >>> (splash), /media/thomas/HD-PCTU3/PDF Suite/sinatr4c.f5.pdf (splash) >>> I'll first have a look at the crashes next weekend, then I'll continue >>> with looking at the failed tests.... >> >>> Cheers, >>> Thomas >> >> >> IMHO, the necessary changes seem quite small especially since a lot of >> them are connected to indentation handling. What are your thoughts on >> the utility and implementation of this? >> >> Best regards, Adam. >>> >>> >>> _______________________________________________ >>> poppler mailing list >>> [email protected] >>> http://lists.freedesktop.org/mailman/listinfo/poppler >> >> >> >> _______________________________________________ >> poppler mailing list >> [email protected] >> http://lists.freedesktop.org/mailman/listinfo/poppler >> >> >> >> _______________________________________________ >> poppler mailing list >> [email protected] >> http://lists.freedesktop.org/mailman/listinfo/poppler
>From e34c246f3c11d7abf0cea629100ca33d483c180f Mon Sep 17 00:00:00 2001 From: Adam Reichold <[email protected]> Date: Tue, 4 Dec 2012 07:28:01 +0100 Subject: [PATCH 1/3] implemented parallel testing using a queue and worker threads --- regtest/Printer.py | 8 ++++++++ regtest/TestReferences.py | 23 ++++++++++++++++++++++- regtest/TestRun.py | 36 +++++++++++++++++++++++++++++++++++- regtest/main.py | 3 +++ 4 files changed, 68 insertions(+), 2 deletions(-) diff --git a/regtest/Printer.py b/regtest/Printer.py index 008f46b..453e03c 100644 --- a/regtest/Printer.py +++ b/regtest/Printer.py @@ -19,6 +19,8 @@ import sys from Config import Config +from threading import RLock + class Printer: __single = None @@ -32,6 +34,8 @@ class Printer: self._rewrite = self._stream.isatty() and not self._verbose self._current_line = None + self._lock = RLock() + Printer.__single = self def _erase_current_line(self): @@ -52,11 +56,13 @@ class Printer: self._stream.flush() def printout(self, msg): + with self._lock: self._erase_current_line() self._print(msg) self._current_line = msg[msg.rfind('\n') + 1:] def printout_update(self, msg): + with self._lock: if self._rewrite and self._current_line is not None: msg = self._current_line + msg elif not self._rewrite: @@ -64,6 +70,7 @@ class Printer: self.printout(msg) def printout_ln(self, msg): + with self._lock: if self._current_line is not None: self._current_line = None msg = '\n' + msg @@ -71,6 +78,7 @@ class Printer: self._print(self._ensure_new_line(msg)) def printerr(self, msg): + with self._lock: self.stderr.write(self._ensure_new_line(msg)) self.stderr.flush() diff --git a/regtest/TestReferences.py b/regtest/TestReferences.py index 9a9e923..cccd8af 100644 --- a/regtest/TestReferences.py +++ b/regtest/TestReferences.py @@ -23,6 +23,9 @@ from Config import Config from Printer import get_printer from Utils import get_document_paths_from_dir, get_skipped_tests +from Queue import Queue +from threading import Thread + class TestReferences: def __init__(self, docsdir, refsdir): @@ -32,6 +35,8 @@ class TestReferences: self.config = Config() self.printer = get_printer() + self._queue = Queue() + try: os.makedirs(self._refsdir) except OSError as e: @@ -68,9 +73,25 @@ class TestReferences: if backend.create_refs(doc_path, refs_path): backend.create_checksums(refs_path, self.config.checksums_only) + def worker_thread(self): + while True: + doc, n_doc, total_docs = self._queue.get() + self.create_refs_for_file(doc, n_doc, total_docs) + self._queue.task_done() + def create_refs(self): docs, total_docs = get_document_paths_from_dir(self._docsdir) + + self.printer.printout_ln('Process %d is spawning %d worker threads...' % (os.getpid(), self.config.threads)) + + for n_thread in range(self.config.threads): + thread = Thread(target=self.worker_thread) + thread.daemon = True + thread.start() + n_doc = 0 for doc in docs: n_doc += 1 - self.create_refs_for_file(doc, n_doc, total_docs) + self._queue.put( (doc, n_doc, total_docs) ) + + self._queue.join() diff --git a/regtest/TestRun.py b/regtest/TestRun.py index f4e5051..3894e93 100644 --- a/regtest/TestRun.py +++ b/regtest/TestRun.py @@ -24,6 +24,9 @@ import sys import os import errno +from Queue import Queue +from threading import Thread, RLock + class TestRun: def __init__(self, docsdir, refsdir, outdir): @@ -43,6 +46,9 @@ class TestRun: self._stderr = [] self._skipped = [] + self._queue = Queue() + self._lock = RLock() + try: os.makedirs(self._outdir); except OSError as e: @@ -57,24 +63,30 @@ class TestRun: ref_is_crashed = backend.is_crashed(refs_path) ref_is_failed = backend.is_failed(refs_path) if not ref_has_md5 and not ref_is_crashed and not ref_is_failed: + with self._lock: self._skipped.append("%s (%s)" % (doc_path, backend.get_name())) self.printer.print_default("Reference files not found, skipping '%s' for %s backend" % (doc_path, backend.get_name())) return + with self._lock: self._n_tests += 1 + self.printer.print_test_start("Testing '%s' using %s backend (%d/%d): " % (doc_path, backend.get_name(), n_doc, total_docs)) test_has_md5 = backend.create_refs(doc_path, test_path) if backend.has_stderr(test_path): + with self._lock: self._stderr.append("%s (%s)" % (doc_path, backend.get_name())) if ref_has_md5 and test_has_md5: if backend.compare_checksums(refs_path, test_path, not self.config.keep_results, self.config.create_diffs, self.config.update_refs): # FIXME: remove dir if it's empty? self.printer.print_test_result("PASS") + with self._lock: self._n_passed += 1 else: self.printer.print_test_result_ln("FAIL") + with self._lock: self._failed.append("%s (%s)" % (doc_path, backend.get_name())) return elif test_has_md5: @@ -87,6 +99,7 @@ class TestRun: test_is_crashed = backend.is_crashed(test_path) if ref_is_crashed and test_is_crashed: self.printer.print_test_result("PASS (Expected crash)") + with self._lock: self._n_passed += 1 return @@ -94,22 +107,26 @@ class TestRun: if ref_is_failed and test_is_failed: # FIXME: compare status errors self.printer.print_test_result("PASS (Expected fail with status error %d)" % (test_is_failed)) + with self._lock: self._n_passed += 1 return if test_is_crashed: self.printer.print_test_result_ln("CRASH") + with self._lock: self._crashed.append("%s (%s)" % (doc_path, backend.get_name())) return if test_is_failed: self.printer.print_test_result_ln("FAIL (status error %d)" % (test_is_failed)) + with self._lock: self._failed_status_error("%s (%s)" % (doc_path, backend.get_name())) return def run_test(self, filename, n_doc = 1, total_docs = 1): if filename in self._skip: doc_path = os.path.join(self._docsdir, filename) + with self._lock: self._skipped.append("%s" % (doc_path)) self.printer.print_default("Skipping test '%s' (%d/%d)" % (doc_path, n_doc, total_docs)) return @@ -126,6 +143,7 @@ class TestRun: refs_path = os.path.join(self._refsdir, filename) if not os.path.isdir(refs_path): + with self._lock: self._skipped.append("%s" % (doc_path)) self.printer.print_default("Reference dir not found for %s, skipping (%d/%d)" % (doc_path, n_doc, total_docs)) return @@ -138,12 +156,28 @@ class TestRun: for backend in backends: self.test(refs_path, doc_path, out_path, backend, n_doc, total_docs) + def worker_thread(self): + while True: + doc, n_doc, total_docs = self._queue.get() + self.run_test(doc, n_doc, total_docs) + self._queue.task_done() + def run_tests(self): docs, total_docs = get_document_paths_from_dir(self._docsdir) + + self.printer.printout_ln('Process %d is spawning %d worker threads...' % (os.getpid(), self.config.threads)) + + for n_thread in range(self.config.threads): + thread = Thread(target=self.worker_thread) + thread.daemon = True + thread.start() + n_doc = 0 for doc in docs: n_doc += 1 - self.run_test(doc, n_doc, total_docs) + self._queue.put( (doc, n_doc, total_docs) ) + + self._queue.join() def summary(self): if not self._n_tests: diff --git a/regtest/main.py b/regtest/main.py index 290c8bc..f7c3ac7 100644 --- a/regtest/main.py +++ b/regtest/main.py @@ -61,6 +61,9 @@ def main(args): parser.add_argument('--skip', metavar = 'FILE', action = 'store', dest = 'skipped_file', help = 'File containing tests to skip') + parser.add_argument('-t', '--threads', + action = 'store', dest = 'threads', type = int, default = 1, + help = 'Number of worker threads') ns, args = parser.parse_known_args(args) if not args: -- 1.8.0.1 >From 939a57e14726cfb9859497249b20e64409def9d2 Mon Sep 17 00:00:00 2001 From: Adam Reichold <[email protected]> Date: Tue, 4 Dec 2012 07:32:44 +0100 Subject: [PATCH 2/3] removed two process optimization because it reduces parallel test performance --- regtest/backends/cairo.py | 5 ++--- regtest/backends/splash.py | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/regtest/backends/cairo.py b/regtest/backends/cairo.py index 304783e..3593342 100644 --- a/regtest/backends/cairo.py +++ b/regtest/backends/cairo.py @@ -28,9 +28,8 @@ class Cairo(Backend): def create_refs(self, doc_path, refs_path): out_path = os.path.join(refs_path, 'cairo') - p1 = subprocess.Popen([self._pdftocairo, '-cropbox', '-r', '72', '-e', '-png', doc_path, out_path], stderr = subprocess.PIPE) - p2 = subprocess.Popen([self._pdftocairo, '-cropbox', '-r', '72', '-o', '-png', doc_path, out_path], stderr = subprocess.PIPE) - return self._check_exit_status2(p1, p2, out_path) + p = subprocess.Popen([self._pdftocairo, '-cropbox', '-r', '72', '-png', doc_path, out_path], stderr = subprocess.PIPE) + return self._check_exit_status(p, out_path) def _create_diff(self, ref_path, result_path): self._diff_png(ref_path, result_path) diff --git a/regtest/backends/splash.py b/regtest/backends/splash.py index 3144bc7..aadbdec 100644 --- a/regtest/backends/splash.py +++ b/regtest/backends/splash.py @@ -28,9 +28,8 @@ class Splash(Backend): def create_refs(self, doc_path, refs_path): out_path = os.path.join(refs_path, 'splash') - p1 = subprocess.Popen([self._pdftoppm, '-cropbox', '-r', '72', '-e', '-png', doc_path, out_path], stderr = subprocess.PIPE) - p2 = subprocess.Popen([self._pdftoppm, '-cropbox', '-r', '72', '-o', '-png', doc_path, out_path], stderr = subprocess.PIPE) - return self._check_exit_status2(p1, p2, out_path) + p = subprocess.Popen([self._pdftoppm, '-cropbox', '-r', '72', '-png', doc_path, out_path], stderr = subprocess.PIPE) + return self._check_exit_status(p, out_path) def _create_diff(self, ref_path, result_path): self._diff_png(ref_path, result_path) -- 1.8.0.1 >From ec0ae2470b044fedf9484bf369b3e1e5b302d771 Mon Sep 17 00:00:00 2001 From: Adam Reichold <[email protected]> Date: Tue, 4 Dec 2012 20:55:03 +0100 Subject: [PATCH 3/3] interpret a thread count of zero or lower as the number of logical CPU as suggested by William Bader --- regtest/TestReferences.py | 4 ++-- regtest/TestRun.py | 4 ++-- regtest/main.py | 8 +++++++- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/regtest/TestReferences.py b/regtest/TestReferences.py index cccd8af..4dccfe1 100644 --- a/regtest/TestReferences.py +++ b/regtest/TestReferences.py @@ -73,7 +73,7 @@ class TestReferences: if backend.create_refs(doc_path, refs_path): backend.create_checksums(refs_path, self.config.checksums_only) - def worker_thread(self): + def _worker_thread(self): while True: doc, n_doc, total_docs = self._queue.get() self.create_refs_for_file(doc, n_doc, total_docs) @@ -85,7 +85,7 @@ class TestReferences: self.printer.printout_ln('Process %d is spawning %d worker threads...' % (os.getpid(), self.config.threads)) for n_thread in range(self.config.threads): - thread = Thread(target=self.worker_thread) + thread = Thread(target=self._worker_thread) thread.daemon = True thread.start() diff --git a/regtest/TestRun.py b/regtest/TestRun.py index 3894e93..490fab1 100644 --- a/regtest/TestRun.py +++ b/regtest/TestRun.py @@ -156,7 +156,7 @@ class TestRun: for backend in backends: self.test(refs_path, doc_path, out_path, backend, n_doc, total_docs) - def worker_thread(self): + def _worker_thread(self): while True: doc, n_doc, total_docs = self._queue.get() self.run_test(doc, n_doc, total_docs) @@ -168,7 +168,7 @@ class TestRun: self.printer.printout_ln('Process %d is spawning %d worker threads...' % (os.getpid(), self.config.threads)) for n_thread in range(self.config.threads): - thread = Thread(target=self.worker_thread) + thread = Thread(target=self._worker_thread) thread.daemon = True thread.start() diff --git a/regtest/main.py b/regtest/main.py index f7c3ac7..9b4d340 100644 --- a/regtest/main.py +++ b/regtest/main.py @@ -23,6 +23,8 @@ import backends import os from Config import Config +from multiprocessing import cpu_count + class ListAction(argparse.Action): def __call__(self, parser, namespace, values, option_string = None): setattr(namespace, self.dest, values.split(',')) @@ -70,7 +72,11 @@ def main(args): parser.print_help() sys.exit(0) - Config(vars(ns)) + c = Config(vars(ns)) + + if c.threads <= 0: + c.threads = cpu_count() + try: commands.run(args) except commands.UnknownCommandError: -- 1.8.0.1
_______________________________________________ poppler mailing list [email protected] http://lists.freedesktop.org/mailman/listinfo/poppler
