New submission from Jono DiCarlo:
In the Google spreadsheet of py3k tasks for the sprint last month, one
of the listed tasks was to convert Lib/test/test_thread to use the
unittest module; it previously used old-style testing
(i.e. comparing the output of print statements to a text file). I'm
attaching a patch that makes that change.
This is my first time submitting a patch to Python, so please let me
know if I'm doing anything wrong (or if this patch is redundant).
----------
components: Tests
files: jdicarlo_stdlibtests_patch_sep7.diff
messages: 55899
nosy: JonoDiCarlo
severity: minor
status: open
title: Patch to make py3k/Lib/test/test_thread.py use unittest
type: rfe
versions: Python 3.0
__________________________________
Tracker <[EMAIL PROTECTED]>
<http://bugs.python.org/issue1163>
__________________________________
Index: output/test_thread
===================================================================
--- output/test_thread (revision 58058)
+++ output/test_thread (working copy)
@@ -1,18 +0,0 @@
-test_thread
-waiting for all tasks to complete
-all tasks done
-
-*** Barrier Test ***
-all tasks done
-
-*** Changing thread stack size ***
-caught expected ValueError setting stack_size(4096)
-successfully set stack_size(262144)
-successfully set stack_size(1048576)
-successfully set stack_size(0)
-trying stack_size = 262144
-waiting for all tasks to complete
-all tasks done
-trying stack_size = 1048576
-waiting for all tasks to complete
-all tasks done
Index: test_thread.py
===================================================================
--- test_thread.py (revision 58058)
+++ test_thread.py (working copy)
@@ -1,55 +1,99 @@
# Very rudimentary test of thread module
-
# Create a bunch of threads, let each do some work, wait until all are done
-from test.test_support import verbose
+# Imports
+
+import unittest
import random
import thread
import time
-mutex = thread.allocate_lock()
-rmutex = thread.allocate_lock() # for calls to random
-running = 0
-done = thread.allocate_lock()
-done.acquire()
+from test import test_support
+from test.test_support import verbose
-numtasks = 10
+# Module-level variables
+_mutex = thread.allocate_lock()
+_rmutex = thread.allocate_lock() # for calls to random
+_done = thread.allocate_lock()
+_done.acquire()
+
+_running = 0 # Number of task threads currently running
+_numtasks = 10 # Number of task threads to start up in each test stage
+_next_ident = 0 # Identifier for next task thread to start
+_numtrips = 3 # Number of times that threads try to pass the barrier
+
+# Child thread functions
+
def task(ident):
- global running
- rmutex.acquire()
- delay = random.random() * numtasks
- rmutex.release()
+ # Function that will be executed by child threads.
+ # ident is a number that uniquely identifies the thread.
+ # Sleeps for a random amount of time before finishing.
+ global _running
+ _rmutex.acquire()
+ delay = random.random() * _numtasks
+ _rmutex.release()
if verbose:
print('task', ident, 'will run for', round(delay, 1), 'sec')
time.sleep(delay)
if verbose:
print('task', ident, 'done')
- mutex.acquire()
- running = running - 1
- if running == 0:
- done.release()
- mutex.release()
+ _mutex.acquire()
+ _running = _running - 1
+ if _running == 0:
+ _done.release()
+ _mutex.release()
-next_ident = 0
-def newtask():
- global next_ident, running
- mutex.acquire()
- next_ident = next_ident + 1
+
+def new_task():
+ # Creates a new child thread which will execute the
+ # task() function.
+ global _next_ident, _running
+ _mutex.acquire()
+ _next_ident = _next_ident + 1
if verbose:
- print('creating task', next_ident)
- thread.start_new_thread(task, (next_ident,))
- running = running + 1
- mutex.release()
+ print('creating task', _next_ident)
+ thread.start_new_thread(task, (_next_ident,))
+ _running = _running + 1
+ _mutex.release()
-for i in range(numtasks):
- newtask()
-print('waiting for all tasks to complete')
-done.acquire()
-print('all tasks done')
+def task_with_barrier(ident, bar):
+ # A function to be executed by child threads.
+ # ident is a number that uniquely identifies the thread.
+ # bar must be a _Barrier object.
+ # The thread tries to enter, then leave, this barrier.
+ global _running
+ for i in range(_numtrips):
+ if ident == 0:
+ # give it a good chance to enter the next
+ # barrier before the others are all out
+ # of the current one
+ delay = 0.001
+ else:
+ _rmutex.acquire()
+ delay = random.random() * _numtasks
+ _rmutex.release()
+ if verbose:
+ print('task', ident, 'will run for', round(delay, 1), 'sec')
+ time.sleep(delay)
+ if verbose:
+ print('task', ident, 'entering barrier', i)
+ bar.enter()
+ if verbose:
+ print('task', ident, 'leaving barrier', i)
+ _mutex.acquire()
+ _running -= 1
+ # Must release _mutex before releasing _done, else the main thread can
+ # exit and set _mutex to None as part of global teardown; then
+ # _mutex.release() raises AttributeError.
+ finished = _running == 0
+ _mutex.release()
+ if finished:
+ _done.release()
-class barrier:
+
+class _Barrier:
def __init__(self, n):
self.n = n
self.waiting = 0
@@ -75,86 +119,73 @@
return
checkout.release()
-numtrips = 3
-def task2(ident):
- global running
- for i in range(numtrips):
- if ident == 0:
- # give it a good chance to enter the next
- # barrier before the others are all out
- # of the current one
- delay = 0.001
- else:
- rmutex.acquire()
- delay = random.random() * numtasks
- rmutex.release()
- if verbose:
- print('task', ident, 'will run for', round(delay, 1), 'sec')
- time.sleep(delay)
- if verbose:
- print('task', ident, 'entering barrier', i)
- bar.enter()
- if verbose:
- print('task', ident, 'leaving barrier', i)
- mutex.acquire()
- running -= 1
- # Must release mutex before releasing done, else the main thread can
- # exit and set mutex to None as part of global teardown; then
- # mutex.release() raises AttributeError.
- finished = running == 0
- mutex.release()
- if finished:
- done.release()
-print('\n*** Barrier Test ***')
-if done.acquire(0):
- raise ValueError("'done' should have remained acquired")
-bar = barrier(numtasks)
-running = numtasks
-for i in range(numtasks):
- thread.start_new_thread(task2, (i,))
-done.acquire()
-print('all tasks done')
+class ThreadTests(unittest.TestCase):
+ def testBasicMultiThreading(self):
+ for i in range(_numtasks):
+ new_task()
-# not all platforms support changing thread stack size
-print('\n*** Changing thread stack size ***')
-if thread.stack_size() != 0:
- raise ValueError("initial stack_size not 0")
+ print('waiting for all tasks to complete')
+ _done.acquire()
+ print('all tasks done')
-thread.stack_size(0)
-if thread.stack_size() != 0:
- raise ValueError("stack_size not reset to default")
+ def testBarrier(self):
+ global _running
+ self.assertFalse(_done.acquire(0),
+ "'_done' should have remained acquired")
+ bar = _Barrier(_numtasks)
+ _running = _numtasks
+ for i in range(_numtasks):
+ thread.start_new_thread(task_with_barrier, (i, bar))
+ _done.acquire()
+ print('all tasks done')
-from os import name as os_name
-if os_name in ("nt", "os2", "posix"):
+ def testChangeStackSize(self):
+ self.assertEqual(0, thread.stack_size(),
+ 'initial stack_size not 0')
- tss_supported = 1
- try:
- thread.stack_size(4096)
- except ValueError:
- print('caught expected ValueError setting stack_size(4096)')
- except thread.error:
- tss_supported = 0
- print('platform does not support changing thread stack size')
+ thread.stack_size(0)
+ self.assertEqual(0, thread.stack_size(),
+ 'stack_size not reset to default')
+ # not all platforms support changing thread stack size
+ from os import name as os_name
+ if not os_name in ("nt", "os2", "posix"):
+ return
- if tss_supported:
- failed = lambda s, e: s != e
- fail_msg = "stack_size(%d) failed - should succeed"
- for tss in (262144, 0x100000, 0):
- thread.stack_size(tss)
- if failed(thread.stack_size(), tss):
- raise ValueError(fail_msg % tss)
- print('successfully set stack_size(%d)' % tss)
+ tss_supported = 1
+ try:
+ thread.stack_size(4096)
+ except ValueError:
+ print('caught expected ValueError setting stack_size(4096)')
+ except thread.error:
+ tss_supported = 0
+ print('platform does not support changing thread stack size')
- for tss in (262144, 0x100000):
- print('trying stack_size = %d' % tss)
- next_ident = 0
- for i in range(numtasks):
- newtask()
+ if tss_supported:
+ fail_msg = "stack_size(%d) failed - should succeed"
+ for tss in (262144, 0x100000, 0):
+ thread.stack_size(tss)
+ self.assertEqual( thread.stack_size(),
+ tss,
+ fail_msg % tss )
+ print('successfully set stack_size(%d)' % tss)
- print('waiting for all tasks to complete')
- done.acquire()
- print('all tasks done')
+ for tss in (262144, 0x100000):
+ print('trying stack_size = %d' % tss)
+ _next_ident = 0
+ for i in range(_numtasks):
+ new_task()
- # reset stack size to default
- thread.stack_size(0)
+ print('waiting for all tasks to complete')
+ _done.acquire()
+ print('all tasks done')
+
+ # reset stack size to default
+ thread.stack_size(0)
+
+
+def test_main():
+ test_support.run_unittest(ThreadTests)
+
+if __name__=='__main__':
+ test_main()
_______________________________________________
Python-bugs-list mailing list
Unsubscribe:
http://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com