New submission from Jono DiCarlo:

In the Google spreadsheet for py3k tasks for the sprint last month, one
of the listed tasks was to convert Lib/test/test_thread to use the
unittest module, where it previously was using the old-style testing
(i.e. comparing output of print statements to a text file).  I'm
attaching a patch that makes that change.

This is my first time submitting a patch to Python, so please let me
know if I'm doing anything wrong (or if this patch is redundant).

----------
components: Tests
files: jdicarlo_stdlibtests_patch_sep7.diff
messages: 55899
nosy: JonoDiCarlo
severity: minor
status: open
title: Patch to make py3k/Lib/test/test_thread.py use unittest
type: rfe
versions: Python 3.0

__________________________________
Tracker <[EMAIL PROTECTED]>
<http://bugs.python.org/issue1163>
__________________________________
Index: output/test_thread
===================================================================
--- output/test_thread	(revision 58058)
+++ output/test_thread	(working copy)
@@ -1,18 +0,0 @@
-test_thread
-waiting for all tasks to complete
-all tasks done
-
-*** Barrier Test ***
-all tasks done
-
-*** Changing thread stack size ***
-caught expected ValueError setting stack_size(4096)
-successfully set stack_size(262144)
-successfully set stack_size(1048576)
-successfully set stack_size(0)
-trying stack_size = 262144
-waiting for all tasks to complete
-all tasks done
-trying stack_size = 1048576
-waiting for all tasks to complete
-all tasks done
Index: test_thread.py
===================================================================
--- test_thread.py	(revision 58058)
+++ test_thread.py	(working copy)
@@ -1,55 +1,99 @@
 # Very rudimentary test of thread module
-
 # Create a bunch of threads, let each do some work, wait until all are done
 
-from test.test_support import verbose
+# Imports
+
+import unittest
 import random
 import thread
 import time
 
-mutex = thread.allocate_lock()
-rmutex = thread.allocate_lock() # for calls to random
-running = 0
-done = thread.allocate_lock()
-done.acquire()
+from test import test_support
+from test.test_support import verbose
 
-numtasks = 10
+# Module-level variables
 
+_mutex = thread.allocate_lock()
+_rmutex = thread.allocate_lock() # for calls to random
+_done = thread.allocate_lock()
+_done.acquire()
+
+_running = 0 # Number of task threads currently running
+_numtasks = 10 # Number of task threads to start up in each test stage
+_next_ident = 0 # Identifier for next task thread to start
+_numtrips = 3 # Number of times that threads try to pass the barrier
+
+# Child thread functions
+
 def task(ident):
-    global running
-    rmutex.acquire()
-    delay = random.random() * numtasks
-    rmutex.release()
+    # Function that will be executed by child threads.
+    # ident is a number that uniquely identifies the thread.
+    # Sleeps for a random amount of time before finishing.
+    global _running
+    _rmutex.acquire()
+    delay = random.random() * _numtasks
+    _rmutex.release()
     if verbose:
         print('task', ident, 'will run for', round(delay, 1), 'sec')
     time.sleep(delay)
     if verbose:
         print('task', ident, 'done')
-    mutex.acquire()
-    running = running - 1
-    if running == 0:
-        done.release()
-    mutex.release()
+    _mutex.acquire()
+    _running = _running - 1
+    if _running == 0:
+        _done.release()
+    _mutex.release()
 
-next_ident = 0
-def newtask():
-    global next_ident, running
-    mutex.acquire()
-    next_ident = next_ident + 1
+
+def new_task():
+    # Creates a new child thread which will execute the
+    # task() function.
+    global _next_ident, _running
+    _mutex.acquire()
+    _next_ident = _next_ident + 1
     if verbose:
-        print('creating task', next_ident)
-    thread.start_new_thread(task, (next_ident,))
-    running = running + 1
-    mutex.release()
+        print('creating task', _next_ident)
+    thread.start_new_thread(task, (_next_ident,))
+    _running = _running + 1
+    _mutex.release()
 
-for i in range(numtasks):
-    newtask()
 
-print('waiting for all tasks to complete')
-done.acquire()
-print('all tasks done')
+def task_with_barrier(ident, bar):
+    # A function to be executed by child threads.
+    # ident is a number that uniquely identifies the thread.
+    # bar must be a _Barrier object.
+    # The thread tries to enter, then leave, this barrier.
+    global _running
+    for i in range(_numtrips):
+        if ident == 0:
+            # give it a good chance to enter the next
+            # barrier before the others are all out
+            # of the current one
+            delay = 0.001
+        else:
+            _rmutex.acquire()
+            delay = random.random() * _numtasks
+            _rmutex.release()
+        if verbose:
+            print('task', ident, 'will run for', round(delay, 1), 'sec')
+        time.sleep(delay)
+        if verbose:
+            print('task', ident, 'entering barrier', i)
+        bar.enter()
+        if verbose:
+            print('task', ident, 'leaving barrier', i)
+    _mutex.acquire()
+    _running -= 1
+    # Must release _mutex before releasing _done, else the main thread can
+    # exit and set _mutex to None as part of global teardown; then
+    # _mutex.release() raises AttributeError.
+    finished = _running == 0
+    _mutex.release()
+    if finished:
+        _done.release()
 
-class barrier:
+
+class _Barrier:
     def __init__(self, n):
         self.n = n
         self.waiting = 0
@@ -75,86 +119,73 @@
             return
         checkout.release()
 
-numtrips = 3
-def task2(ident):
-    global running
-    for i in range(numtrips):
-        if ident == 0:
-            # give it a good chance to enter the next
-            # barrier before the others are all out
-            # of the current one
-            delay = 0.001
-        else:
-            rmutex.acquire()
-            delay = random.random() * numtasks
-            rmutex.release()
-        if verbose:
-            print('task', ident, 'will run for', round(delay, 1), 'sec')
-        time.sleep(delay)
-        if verbose:
-            print('task', ident, 'entering barrier', i)
-        bar.enter()
-        if verbose:
-            print('task', ident, 'leaving barrier', i)
-    mutex.acquire()
-    running -= 1
-    # Must release mutex before releasing done, else the main thread can
-    # exit and set mutex to None as part of global teardown; then
-    # mutex.release() raises AttributeError.
-    finished = running == 0
-    mutex.release()
-    if finished:
-        done.release()
 
-print('\n*** Barrier Test ***')
-if done.acquire(0):
-    raise ValueError("'done' should have remained acquired")
-bar = barrier(numtasks)
-running = numtasks
-for i in range(numtasks):
-    thread.start_new_thread(task2, (i,))
-done.acquire()
-print('all tasks done')
+class ThreadTests(unittest.TestCase):
+    def testBasicMultiThreading(self):
+        for i in range(_numtasks):
+            new_task()
 
-# not all platforms support changing thread stack size
-print('\n*** Changing thread stack size ***')
-if thread.stack_size() != 0:
-    raise ValueError("initial stack_size not 0")
+        print('waiting for all tasks to complete')
+        _done.acquire()
+        print('all tasks done')
 
-thread.stack_size(0)
-if thread.stack_size() != 0:
-    raise ValueError("stack_size not reset to default")
+    def testBarrier(self):
+        global _running
+        self.assertFalse(_done.acquire(0), 
+                         "'_done' should have remained acquired")
+        bar = _Barrier(_numtasks)
+        _running = _numtasks
+        for i in range(_numtasks):
+            thread.start_new_thread(task_with_barrier, (i, bar))
+        _done.acquire()
+        print('all tasks done')
 
-from os import name as os_name
-if os_name in ("nt", "os2", "posix"):
+    def testChangeStackSize(self):
+        self.assertEqual(0, thread.stack_size(),
+                         'initial stack_size not 0')
 
-    tss_supported = 1
-    try:
-        thread.stack_size(4096)
-    except ValueError:
-        print('caught expected ValueError setting stack_size(4096)')
-    except thread.error:
-        tss_supported = 0
-        print('platform does not support changing thread stack size')
+        thread.stack_size(0)
+        self.assertEqual(0, thread.stack_size(),
+                   	 'stack_size not reset to default')
+        # not all platforms support changing thread stack size
+        from os import name as os_name
+        if not os_name in ("nt", "os2", "posix"):
+            return
 
-    if tss_supported:
-        failed = lambda s, e: s != e
-        fail_msg = "stack_size(%d) failed - should succeed"
-        for tss in (262144, 0x100000, 0):
-            thread.stack_size(tss)
-            if failed(thread.stack_size(), tss):
-                raise ValueError(fail_msg % tss)
-            print('successfully set stack_size(%d)' % tss)
+        tss_supported = 1
+        try:
+            thread.stack_size(4096)
+        except ValueError:
+            print('caught expected ValueError setting stack_size(4096)')
+        except thread.error:
+            tss_supported = 0
+            print('platform does not support changing thread stack size')
 
-        for tss in (262144, 0x100000):
-            print('trying stack_size = %d' % tss)
-            next_ident = 0
-            for i in range(numtasks):
-                newtask()
+        if tss_supported:
+            fail_msg = "stack_size(%d) failed - should succeed"
+            for tss in (262144, 0x100000, 0):
+                thread.stack_size(tss)
+                self.assertEqual( thread.stack_size(),
+                                  tss,
+                                  fail_msg % tss )
+                print('successfully set stack_size(%d)' % tss)
 
-            print('waiting for all tasks to complete')
-            done.acquire()
-            print('all tasks done')
+            for tss in (262144, 0x100000):
+                print('trying stack_size = %d' % tss)
+                _next_ident = 0
+                for i in range(_numtasks):
+                    new_task()
 
-        # reset stack size to default
-        thread.stack_size(0)
+                print('waiting for all tasks to complete')
+                _done.acquire()
+                print('all tasks done')
+
+            # reset stack size to default
+            thread.stack_size(0)
+
+
+def test_main():
+    test_support.run_unittest(ThreadTests)
+
+if __name__=='__main__':
+    test_main()
_______________________________________________
Python-bugs-list mailing list 
Unsubscribe: 
http://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com

Reply via email to