Revision: 68b73101a87e
Author: Mikko Korpela <[email protected]>
Date: Wed Dec 14 01:32:15 2011
Log: move timeoutthread related modules to timeouts package
http://code.google.com/p/robotframework/source/detail?r=68b73101a87e
Added:
/src/robot/running/timeouts/robotthread.py
/src/robot/running/timeouts/stoppablethread.py
/utest/running/test_robotthread.py
/utest/running/thread_resources.py
Deleted:
/src/robot/utils/robotthread.py
/src/robot/utils/stoppablethread.py
/utest/utils/test_robotthread.py
/utest/utils/thread_resources.py
Modified:
/src/robot/running/timeouts/timeoutthread.py
=======================================
--- /dev/null
+++ /src/robot/running/timeouts/robotthread.py Wed Dec 14 01:32:15 2011
@@ -0,0 +1,57 @@
+# Copyright 2008-2011 Nokia Siemens Networks Oyj
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+from threading import Event
+
+if sys.platform.startswith('java'):
+ from java.lang import Thread, Runnable
+else:
+ from stoppablethread import Thread
+ Runnable = object
+
+
+class ThreadedRunner(Runnable):
+
+ def __init__(self, runnable, args=None, kwargs=None, notifier=None):
+ self._runnable = lambda: runnable(*(args or ()), **(kwargs or {}))
+ self._notifier = Event()
+ self._result = None
+ self._error = None
+ self._traceback = None
+ self._thread = None
+
+ def run(self):
+ try:
+ self._result = self._runnable()
+ except:
+ self._error, self._traceback = sys.exc_info()[1:]
+ self._notifier.set()
+
+ __call__ = run
+
+ def run_in_thread(self, timeout):
+ self._thread = Thread(self)
+ self._thread.setDaemon(True)
+ self._thread.start()
+ self._notifier.wait(timeout)
+ return self._notifier.isSet()
+
+ def get_result(self):
+ if self._error:
+ raise self._error, None, self._traceback
+ return self._result
+
+ def stop_thread(self):
+ self._thread.stop()
=======================================
--- /dev/null
+++ /src/robot/running/timeouts/stoppablethread.py Wed Dec 14 01:32:15 2011
@@ -0,0 +1,59 @@
+# Copyright 2008-2011 Nokia Siemens Networks Oyj
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import threading
+
+
+class Thread(threading.Thread):
+ """A subclass of threading.Thread, with a stop() method.
+
+ Original version posted by Connelly Barnes to python-list and
available at
+ http://mail.python.org/pipermail/python-list/2004-May/219465.html
+
+ This version mainly has kill() changed to stop() to match
java.lang.Thread.
+
+ This is a hack but seems to be the best way the get this done. Only
used
+ in Python because in Jython we can use java.lang.Thread.
+ """
+
+ def __init__(self, runner):
+ threading.Thread.__init__(self, target=runner)
+ self._stopped = False
+
+ def start(self):
+ self.__run_backup = self.run
+ self.run = self.__run
+ threading.Thread.start(self)
+
+ def stop(self):
+ self._stopped = True
+
+ def __run(self):
+ """Hacked run function, which installs the trace."""
+ sys.settrace(self._globaltrace)
+ self.__run_backup()
+ self.run = self.__run_backup
+
+ def _globaltrace(self, frame, why, arg):
+ if why == 'call':
+ return self._localtrace
+ else:
+ return None
+
+ def _localtrace(self, frame, why, arg):
+ if self._stopped:
+ if why == 'line':
+ raise SystemExit()
+ return self._localtrace
=======================================
--- /dev/null
+++ /utest/running/test_robotthread.py Wed Dec 14 01:32:15 2011
@@ -0,0 +1,39 @@
+import unittest
+import sys
+from robot.running.timeouts.robotthread import ThreadedRunner
+
+from robot.utils.asserts import *
+from thread_resources import *
+
+
+
+class TestRunner(unittest.TestCase):
+
+ def test_passing(self):
+ runner = ThreadedRunner(passing)
+ runner.run()
+ assert_none(runner.get_result())
+
+ def test_returning(self):
+ for arg in [ 10, 'hello', ['l','i','s','t'], unittest]:
+ runner = ThreadedRunner(returning, args=(arg,))
+ runner.run()
+ assert_equals(runner.get_result(), arg)
+
+ def test_failing(self):
+ runner = ThreadedRunner(failing, args=('hello world',))
+ runner.run()
+ assert_raises_with_msg(Exception, 'hello world', runner.get_result)
+
+ if sys.platform.startswith('java'):
+ from java.lang import Error
+
+ def test_java_failing(self):
+ runner = ThreadedRunner(java_failing, args=('hi tellus',))
+ runner.run()
+ assert_raises_with_msg(Error, 'java.lang.Error: hi tellus',
+ runner.get_result)
+
+
+if __name__ == '__main__':
+ unittest.main()
=======================================
--- /dev/null
+++ /utest/running/thread_resources.py Wed Dec 14 01:32:15 2011
@@ -0,0 +1,27 @@
+import os, time
+
+
+class MyException(Exception):
+ pass
+
+def passing(*args):
+ pass
+
+def sleeping(s):
+ seconds = s
+ while seconds > 0:
+ time.sleep(min(seconds, 0.1))
+ seconds -= 0.1
+ os.environ['ROBOT_THREAD_TESTING'] = str(s)
+ return s
+
+def returning(arg):
+ return arg
+
+def failing(msg='xxx'):
+ raise MyException, msg
+
+if os.name == 'java':
+ from java.lang import Error
+ def java_failing(msg='zzz'):
+ raise Error(msg)
=======================================
--- /src/robot/utils/robotthread.py Wed Jun 15 23:28:18 2011
+++ /dev/null
@@ -1,57 +0,0 @@
-# Copyright 2008-2011 Nokia Siemens Networks Oyj
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import sys
-from threading import Event
-
-if sys.platform.startswith('java'):
- from java.lang import Thread, Runnable
-else:
- from stoppablethread import Thread
- Runnable = object
-
-
-class ThreadedRunner(Runnable):
-
- def __init__(self, runnable, args=None, kwargs=None, notifier=None):
- self._runnable = lambda: runnable(*(args or ()), **(kwargs or {}))
- self._notifier = Event()
- self._result = None
- self._error = None
- self._traceback = None
- self._thread = None
-
- def run(self):
- try:
- self._result = self._runnable()
- except:
- self._error, self._traceback = sys.exc_info()[1:]
- self._notifier.set()
-
- __call__ = run
-
- def run_in_thread(self, timeout):
- self._thread = Thread(self)
- self._thread.setDaemon(True)
- self._thread.start()
- self._notifier.wait(timeout)
- return self._notifier.isSet()
-
- def get_result(self):
- if self._error:
- raise self._error, None, self._traceback
- return self._result
-
- def stop_thread(self):
- self._thread.stop()
=======================================
--- /src/robot/utils/stoppablethread.py Sun Feb 6 01:24:10 2011
+++ /dev/null
@@ -1,59 +0,0 @@
-# Copyright 2008-2011 Nokia Siemens Networks Oyj
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import sys
-import threading
-
-
-class Thread(threading.Thread):
- """A subclass of threading.Thread, with a stop() method.
-
- Original version posted by Connelly Barnes to python-list and
available at
- http://mail.python.org/pipermail/python-list/2004-May/219465.html
-
- This version mainly has kill() changed to stop() to match
java.lang.Thread.
-
- This is a hack but seems to be the best way the get this done. Only
used
- in Python because in Jython we can use java.lang.Thread.
- """
-
- def __init__(self, runner):
- threading.Thread.__init__(self, target=runner)
- self._stopped = False
-
- def start(self):
- self.__run_backup = self.run
- self.run = self.__run
- threading.Thread.start(self)
-
- def stop(self):
- self._stopped = True
-
- def __run(self):
- """Hacked run function, which installs the trace."""
- sys.settrace(self._globaltrace)
- self.__run_backup()
- self.run = self.__run_backup
-
- def _globaltrace(self, frame, why, arg):
- if why == 'call':
- return self._localtrace
- else:
- return None
-
- def _localtrace(self, frame, why, arg):
- if self._stopped:
- if why == 'line':
- raise SystemExit()
- return self._localtrace
=======================================
--- /utest/utils/test_robotthread.py Sat May 29 07:03:02 2010
+++ /dev/null
@@ -1,39 +0,0 @@
-import unittest
-import sys
-
-from robot.utils.asserts import *
-
-from robot.utils.robotthread import ThreadedRunner
-from thread_resources import *
-
-
-class TestRunner(unittest.TestCase):
-
- def test_passing(self):
- runner = ThreadedRunner(passing)
- runner.run()
- assert_none(runner.get_result())
-
- def test_returning(self):
- for arg in [ 10, 'hello', ['l','i','s','t'], unittest]:
- runner = ThreadedRunner(returning, args=(arg,))
- runner.run()
- assert_equals(runner.get_result(), arg)
-
- def test_failing(self):
- runner = ThreadedRunner(failing, args=('hello world',))
- runner.run()
- assert_raises_with_msg(Exception, 'hello world', runner.get_result)
-
- if sys.platform.startswith('java'):
- from java.lang import Error
-
- def test_java_failing(self):
- runner = ThreadedRunner(java_failing, args=('hi tellus',))
- runner.run()
- assert_raises_with_msg(Error, 'java.lang.Error: hi tellus',
- runner.get_result)
-
-
-if __name__ == '__main__':
- unittest.main()
=======================================
--- /utest/utils/thread_resources.py Sat Dec 10 12:53:04 2011
+++ /dev/null
@@ -1,27 +0,0 @@
-import os, time
-
-
-class MyException(Exception):
- pass
-
-def passing(*args):
- pass
-
-def sleeping(s):
- seconds = s
- while seconds > 0:
- time.sleep(min(seconds, 0.1))
- seconds -= 0.1
- os.environ['ROBOT_THREAD_TESTING'] = str(s)
- return s
-
-def returning(arg):
- return arg
-
-def failing(msg='xxx'):
- raise MyException, msg
-
-if os.name == 'java':
- from java.lang import Error
- def java_failing(msg='zzz'):
- raise Error(msg)
=======================================
--- /src/robot/running/timeouts/timeoutthread.py Mon Dec 12 06:03:03 2011
+++ /src/robot/running/timeouts/timeoutthread.py Wed Dec 14 01:32:15 2011
@@ -13,7 +13,7 @@
# limitations under the License.
from robot import utils
from robot.errors import TimeoutError
-from robot.utils.robotthread import ThreadedRunner
+from .robotthread import ThreadedRunner
class Timeout(object):