Revision: 68b73101a87e
Author:   Mikko Korpela <[email protected]>
Date:     Wed Dec 14 01:32:15 2011
Log:      move timeoutthread related modules to timeouts package
http://code.google.com/p/robotframework/source/detail?r=68b73101a87e

Added:
 /src/robot/running/timeouts/robotthread.py
 /src/robot/running/timeouts/stoppablethread.py
 /utest/running/test_robotthread.py
 /utest/running/thread_resources.py
Deleted:
 /src/robot/utils/robotthread.py
 /src/robot/utils/stoppablethread.py
 /utest/utils/test_robotthread.py
 /utest/utils/thread_resources.py
Modified:
 /src/robot/running/timeouts/timeoutthread.py

=======================================
--- /dev/null
+++ /src/robot/running/timeouts/robotthread.py  Wed Dec 14 01:32:15 2011
@@ -0,0 +1,57 @@
+#  Copyright 2008-2011 Nokia Siemens Networks Oyj
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import sys
+from threading import Event
+
+if sys.platform.startswith('java'):
+    from java.lang import Thread, Runnable
+else:
+    from stoppablethread import Thread
+    Runnable = object
+
+
+class ThreadedRunner(Runnable):
+
+    def __init__(self, runnable, args=None, kwargs=None, notifier=None):
+        self._runnable = lambda: runnable(*(args or ()), **(kwargs or {}))
+        self._notifier = Event()
+        self._result = None
+        self._error = None
+        self._traceback = None
+        self._thread = None
+
+    def run(self):
+        try:
+            self._result = self._runnable()
+        except:
+            self._error, self._traceback = sys.exc_info()[1:]
+        self._notifier.set()
+
+    __call__ = run
+
+    def run_in_thread(self, timeout):
+        self._thread = Thread(self)
+        self._thread.setDaemon(True)
+        self._thread.start()
+        self._notifier.wait(timeout)
+        return self._notifier.isSet()
+
+    def get_result(self):
+        if self._error:
+            raise self._error, None, self._traceback
+        return self._result
+
+    def stop_thread(self):
+        self._thread.stop()
=======================================
--- /dev/null
+++ /src/robot/running/timeouts/stoppablethread.py      Wed Dec 14 01:32:15 2011
@@ -0,0 +1,59 @@
+#  Copyright 2008-2011 Nokia Siemens Networks Oyj
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import sys
+import threading
+
+
+class Thread(threading.Thread):
+    """A subclass of threading.Thread, with a stop() method.
+
+    Original version posted by Connelly Barnes to python-list and available at
+    http://mail.python.org/pipermail/python-list/2004-May/219465.html
+
+    This version mainly has kill() changed to stop() to match java.lang.Thread.
+
+    This is a hack but seems to be the best way the get this done. Only used
+    in Python because in Jython we can use java.lang.Thread.
+    """
+
+    def __init__(self, runner):
+        threading.Thread.__init__(self, target=runner)
+        self._stopped = False
+
+    def start(self):
+        self.__run_backup = self.run
+        self.run = self.__run
+        threading.Thread.start(self)
+
+    def stop(self):
+        self._stopped = True
+
+    def __run(self):
+        """Hacked run function, which installs the trace."""
+        sys.settrace(self._globaltrace)
+        self.__run_backup()
+        self.run = self.__run_backup
+
+    def _globaltrace(self, frame, why, arg):
+        if why == 'call':
+            return self._localtrace
+        else:
+            return None
+
+    def _localtrace(self, frame, why, arg):
+        if self._stopped:
+            if why == 'line':
+                raise SystemExit()
+        return self._localtrace
=======================================
--- /dev/null
+++ /utest/running/test_robotthread.py  Wed Dec 14 01:32:15 2011
@@ -0,0 +1,39 @@
+import unittest
+import sys
+from robot.running.timeouts.robotthread import ThreadedRunner
+
+from robot.utils.asserts import *
+from thread_resources import *
+
+
+
+class TestRunner(unittest.TestCase):
+
+    def test_passing(self):
+        runner = ThreadedRunner(passing)
+        runner.run()
+        assert_none(runner.get_result())
+
+    def test_returning(self):
+        for arg in [ 10, 'hello', ['l','i','s','t'], unittest]:
+            runner = ThreadedRunner(returning, args=(arg,))
+            runner.run()
+            assert_equals(runner.get_result(), arg)
+
+    def test_failing(self):
+        runner = ThreadedRunner(failing, args=('hello world',))
+        runner.run()
+        assert_raises_with_msg(Exception, 'hello world', runner.get_result)
+
+    if sys.platform.startswith('java'):
+        from java.lang import Error
+
+        def test_java_failing(self):
+            runner = ThreadedRunner(java_failing, args=('hi tellus',))
+            runner.run()
+            assert_raises_with_msg(Error, 'java.lang.Error: hi tellus',
+                                   runner.get_result)
+
+
+if __name__ == '__main__':
+    unittest.main()
=======================================
--- /dev/null
+++ /utest/running/thread_resources.py  Wed Dec 14 01:32:15 2011
@@ -0,0 +1,27 @@
+import os, time
+
+
+class MyException(Exception):
+    pass
+
+def passing(*args):
+    pass
+
+def sleeping(s):
+    seconds = s
+    while seconds > 0:
+        time.sleep(min(seconds, 0.1))
+        seconds -= 0.1
+    os.environ['ROBOT_THREAD_TESTING'] = str(s)
+    return s
+
+def returning(arg):
+    return arg
+
+def failing(msg='xxx'):
+    raise MyException, msg
+
+if os.name == 'java':
+    from java.lang import Error
+    def java_failing(msg='zzz'):
+        raise Error(msg)
=======================================
--- /src/robot/utils/robotthread.py     Wed Jun 15 23:28:18 2011
+++ /dev/null
@@ -1,57 +0,0 @@
-#  Copyright 2008-2011 Nokia Siemens Networks Oyj
-#
-#  Licensed under the Apache License, Version 2.0 (the "License");
-#  you may not use this file except in compliance with the License.
-#  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-
-import sys
-from threading import Event
-
-if sys.platform.startswith('java'):
-    from java.lang import Thread, Runnable
-else:
-    from stoppablethread import Thread
-    Runnable = object
-
-
-class ThreadedRunner(Runnable):
-
-    def __init__(self, runnable, args=None, kwargs=None, notifier=None):
-        self._runnable = lambda: runnable(*(args or ()), **(kwargs or {}))
-        self._notifier = Event()
-        self._result = None
-        self._error = None
-        self._traceback = None
-        self._thread = None
-
-    def run(self):
-        try:
-            self._result = self._runnable()
-        except:
-            self._error, self._traceback = sys.exc_info()[1:]
-        self._notifier.set()
-
-    __call__ = run
-
-    def run_in_thread(self, timeout):
-        self._thread = Thread(self)
-        self._thread.setDaemon(True)
-        self._thread.start()
-        self._notifier.wait(timeout)
-        return self._notifier.isSet()
-
-    def get_result(self):
-        if self._error:
-            raise self._error, None, self._traceback
-        return self._result
-
-    def stop_thread(self):
-        self._thread.stop()
=======================================
--- /src/robot/utils/stoppablethread.py Sun Feb  6 01:24:10 2011
+++ /dev/null
@@ -1,59 +0,0 @@
-#  Copyright 2008-2011 Nokia Siemens Networks Oyj
-#
-#  Licensed under the Apache License, Version 2.0 (the "License");
-#  you may not use this file except in compliance with the License.
-#  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-
-import sys
-import threading
-
-
-class Thread(threading.Thread):
-    """A subclass of threading.Thread, with a stop() method.
-
-    Original version posted by Connelly Barnes to python-list and available at
-    http://mail.python.org/pipermail/python-list/2004-May/219465.html
-
-    This version mainly has kill() changed to stop() to match java.lang.Thread.
-
-    This is a hack but seems to be the best way the get this done. Only used
-    in Python because in Jython we can use java.lang.Thread.
-    """
-
-    def __init__(self, runner):
-        threading.Thread.__init__(self, target=runner)
-        self._stopped = False
-
-    def start(self):
-        self.__run_backup = self.run
-        self.run = self.__run
-        threading.Thread.start(self)
-
-    def stop(self):
-        self._stopped = True
-
-    def __run(self):
-        """Hacked run function, which installs the trace."""
-        sys.settrace(self._globaltrace)
-        self.__run_backup()
-        self.run = self.__run_backup
-
-    def _globaltrace(self, frame, why, arg):
-        if why == 'call':
-            return self._localtrace
-        else:
-            return None
-
-    def _localtrace(self, frame, why, arg):
-        if self._stopped:
-            if why == 'line':
-                raise SystemExit()
-        return self._localtrace
=======================================
--- /utest/utils/test_robotthread.py    Sat May 29 07:03:02 2010
+++ /dev/null
@@ -1,39 +0,0 @@
-import unittest
-import sys
-
-from robot.utils.asserts import *
-
-from robot.utils.robotthread import ThreadedRunner
-from thread_resources import *
-
-
-class TestRunner(unittest.TestCase):
-
-    def test_passing(self):
-        runner = ThreadedRunner(passing)
-        runner.run()
-        assert_none(runner.get_result())
-
-    def test_returning(self):
-        for arg in [ 10, 'hello', ['l','i','s','t'], unittest]:
-            runner = ThreadedRunner(returning, args=(arg,))
-            runner.run()
-            assert_equals(runner.get_result(), arg)
-
-    def test_failing(self):
-        runner = ThreadedRunner(failing, args=('hello world',))
-        runner.run()
-        assert_raises_with_msg(Exception, 'hello world', runner.get_result)
-
-    if sys.platform.startswith('java'):
-        from java.lang import Error
-
-        def test_java_failing(self):
-            runner = ThreadedRunner(java_failing, args=('hi tellus',))
-            runner.run()
-            assert_raises_with_msg(Error, 'java.lang.Error: hi tellus',
-                                   runner.get_result)
-
-
-if __name__ == '__main__':
-    unittest.main()
=======================================
--- /utest/utils/thread_resources.py    Sat Dec 10 12:53:04 2011
+++ /dev/null
@@ -1,27 +0,0 @@
-import os, time
-
-
-class MyException(Exception):
-    pass
-
-def passing(*args):
-    pass
-
-def sleeping(s):
-    seconds = s
-    while seconds > 0:
-        time.sleep(min(seconds, 0.1))
-        seconds -= 0.1
-    os.environ['ROBOT_THREAD_TESTING'] = str(s)
-    return s
-
-def returning(arg):
-    return arg
-
-def failing(msg='xxx'):
-    raise MyException, msg
-
-if os.name == 'java':
-    from java.lang import Error
-    def java_failing(msg='zzz'):
-        raise Error(msg)
=======================================
--- /src/robot/running/timeouts/timeoutthread.py        Mon Dec 12 06:03:03 2011
+++ /src/robot/running/timeouts/timeoutthread.py        Wed Dec 14 01:32:15 2011
@@ -13,7 +13,7 @@
 #  limitations under the License.
 from robot import utils
 from robot.errors import TimeoutError
-from robot.utils.robotthread import ThreadedRunner
+from .robotthread import ThreadedRunner


 class Timeout(object):

Reply via email to