Author: Matti Picus <[email protected]>
Branch: vmprof-win32
Changeset: r92021:91789212fc18
Date: 2017-08-01 23:24 +0300
http://bitbucket.org/pypy/pypy/changeset/91789212fc18/

Log:    close files

diff --git a/pypy/module/_vmprof/test/test__vmprof.py 
b/pypy/module/_vmprof/test/test__vmprof.py
--- a/pypy/module/_vmprof/test/test__vmprof.py
+++ b/pypy/module/_vmprof/test/test__vmprof.py
@@ -1,6 +1,5 @@
 import sys
 from rpython.tool.udir import udir
-from pypy.tool.pytest.objspace import gettestobjspace
 
 class AppTestVMProf(object):
     spaceconfig = {'usemodules': ['_vmprof', 'struct']}
@@ -12,82 +11,90 @@
             '__pypy__' not in sys.builtin_module_names)
 
     def test_import_vmprof(self):
+        import _vmprof
         tmpfile = open(self.tmpfilename, 'wb')
-        tmpfileno = tmpfile.fileno()
         tmpfile2 = open(self.tmpfilename2, 'wb')
-        tmpfileno2 = tmpfile2.fileno()
+        try:
+            tmpfileno = tmpfile.fileno()
+            tmpfileno2 = tmpfile2.fileno()
 
-        import struct, sys, gc
+            import struct, gc
 
-        WORD = struct.calcsize('l')
+            WORD = struct.calcsize('l')
 
-        def count(s):
-            i = 0
-            count = 0
-            i += 5 * WORD # header
-            assert s[i    ] == '\x05'    # MARKER_HEADER
-            assert s[i + 1] == '\x00'    # 0
-            assert s[i + 2] == '\x06'    # VERSION_TIMESTAMP
-            assert s[i + 3] == '\x08'    # PROFILE_RPYTHON
-            assert s[i + 4] == chr(4)    # len('pypy')
-            assert s[i + 5: i + 9] == 'pypy'
-            i += 9
-            while i < len(s):
-                if s[i] == '\x03':
-                    break
-                elif s[i] == '\x01':
-                    i += 1
-                    _, size = struct.unpack("ll", s[i:i + 2 * WORD])
-                    i += 2 * WORD + size * struct.calcsize("P")
-                    i += WORD    # thread id
-                elif s[i] == '\x02':
-                    i += 1
-                    _, size = struct.unpack("ll", s[i:i + 2 * WORD])
-                    count += 1
-                    i += 2 * WORD + size
-                elif s[i] == '\x06':
-                    print(s[i:i+24])
-                    i += 1+8+8+8
-                elif s[i] == '\x07':
-                    i += 1
-                    # skip string
-                    size, = struct.unpack("l", s[i:i + WORD])
-                    i += WORD+size
-                    # skip string
-                    size, = struct.unpack("l", s[i:i + WORD])
-                    i += WORD+size
-                else:
-                    raise AssertionError(ord(s[i]))
-            return count
+            def count(s):
+                i = 0
+                count = 0
+                i += 5 * WORD # header
+                assert s[i    ] == '\x05'    # MARKER_HEADER
+                assert s[i + 1] == '\x00'    # 0
+                assert s[i + 2] == '\x06'    # VERSION_TIMESTAMP
+                assert s[i + 3] == '\x08'    # PROFILE_RPYTHON
+                assert s[i + 4] == chr(4)    # len('pypy')
+                assert s[i + 5: i + 9] == 'pypy'
+                i += 9
+                while i < len(s):
+                    if s[i] == '\x03':
+                        break
+                    elif s[i] == '\x01':
+                        i += 1
+                        _, size = struct.unpack("ll", s[i:i + 2 * WORD])
+                        i += 2 * WORD + size * struct.calcsize("P")
+                        i += WORD    # thread id
+                    elif s[i] == '\x02':
+                        i += 1
+                        _, size = struct.unpack("ll", s[i:i + 2 * WORD])
+                        count += 1
+                        i += 2 * WORD + size
+                    elif s[i] == '\x06':
+                        print(s[i:i+24])
+                        i += 1+8+8+8
+                    elif s[i] == '\x07':
+                        i += 1
+                        # skip string
+                        size, = struct.unpack("l", s[i:i + WORD])
+                        i += WORD+size
+                        # skip string
+                        size, = struct.unpack("l", s[i:i + WORD])
+                        i += WORD+size
+                    else:
+                        raise AssertionError(ord(s[i]))
+                return count
 
-        import _vmprof
-        gc.collect()  # try to make the weakref list deterministic
-        gc.collect()  # by freeing all dead code objects
-        _vmprof.enable(tmpfileno, 0.01, 0, 0, 0, 0)
-        _vmprof.disable()
-        s = open(self.tmpfilename, 'rb').read()
-        no_of_codes = count(s)
-        assert no_of_codes > 10
-        d = {}
+            gc.collect()  # try to make the weakref list deterministic
+            gc.collect()  # by freeing all dead code objects
+            _vmprof.enable(tmpfileno, 0.01, 0, 0, 0, 0)
+            _vmprof.disable()
+            with open(self.tmpfilename, 'rb') as fid:
+                s = fid.read()
+            no_of_codes = count(s)
+            assert no_of_codes > 10
+            d = {}
 
-        exec """def foo():
-            pass
-        """ in d
+            exec """def foo():
+                pass
+            """ in d
 
-        gc.collect()
-        gc.collect()
-        _vmprof.enable(tmpfileno2, 0.01, 0, 0, 0, 0)
+            gc.collect()
+            gc.collect()
+            _vmprof.enable(tmpfileno2, 0.01, 0, 0, 0, 0)
 
-        exec """def foo2():
-            pass
-        """ in d
+            exec """def foo2():
+                pass
+            """ in d
 
-        _vmprof.disable()
-        s = open(self.tmpfilename2, 'rb').read()
-        no_of_codes2 = count(s)
-        assert "py:foo:" in s
-        assert "py:foo2:" in s
-        assert no_of_codes2 >= no_of_codes + 2 # some extra codes from tests
+            _vmprof.disable()
+            with open(self.tmpfilename2, 'rb') as fid:
+                s = fid.read()
+            no_of_codes2 = count(s)
+            assert "py:foo:" in s
+            assert "py:foo2:" in s
+            assert no_of_codes2 >= no_of_codes + 2 # some extra codes from 
tests
+        finally:
+            if _vmprof.is_enabled():
+                _vmprof.disable()
+            tmpfile.close()
+            tmpfile2.close()
 
     def test_enable_ovf(self):
         import _vmprof
@@ -100,31 +107,34 @@
 
     def test_is_enabled(self):
         import _vmprof
-        tmpfile = open(self.tmpfilename, 'wb')
-        assert _vmprof.is_enabled() is False
-        _vmprof.enable(tmpfile.fileno(), 0.01, 0, 0, 0, 0)
-        assert _vmprof.is_enabled() is True
-        _vmprof.disable()
-        assert _vmprof.is_enabled() is False
+        with open(self.tmpfilename, 'wb') as tmpfile:
+            assert _vmprof.is_enabled() is False
+            _vmprof.enable(tmpfile.fileno(), 0.01, 0, 0, 0, 0)
+            assert _vmprof.is_enabled() is True
+            _vmprof.disable()
+            assert _vmprof.is_enabled() is False
 
     def test_get_profile_path(self):
         import _vmprof
-        tmpfile = open(self.tmpfilename, 'wb')
-        assert _vmprof.get_profile_path() is None
-        _vmprof.enable(tmpfile.fileno(), 0.01, 0, 0, 0, 0)
-        path = _vmprof.get_profile_path()
-        if path != tmpfile.name:
-            with open(path, "rb") as fd1:
-                assert fd1.read() == tmpfile.read()
-        _vmprof.disable()
-        assert _vmprof.get_profile_path() is None
+        try:
+            with open(self.tmpfilename, 'wb') as tmpfile:
+                assert _vmprof.get_profile_path() is None
+                _vmprof.enable(tmpfile.fileno(), 0.01, 0, 0, 0, 0)
+                path = _vmprof.get_profile_path()
+                if path != tmpfile.name:
+                    with open(path, "rb") as fd1:
+                        assert fd1.read() == tmpfile.read()
+                _vmprof.disable()
+                assert _vmprof.get_profile_path() is None
+        finally:
+            if _vmprof.is_enabled():
+                _vmprof.disable()
 
     def test_stop_sampling(self):
         if not self.plain:
             skip("unreliable test except on CPython without -A")
         import os
         import _vmprof
-        tmpfile = open(self.tmpfilename, 'wb')
         native = 1
         def f():
             import sys
@@ -132,19 +142,20 @@
             j = sys.maxsize
             for i in range(500):
                 j = math.sqrt(j)
-        _vmprof.enable(tmpfile.fileno(), 0.01, 0, native, 0, 0)
-        # get_vmprof_stack() always returns 0 here!
-        # see vmprof_common.c and assume RPYTHON_LL2CTYPES is defined!
-        f()
-        fileno = _vmprof.stop_sampling()
-        pos = os.lseek(fileno, 0, os.SEEK_CUR)
-        f()
-        pos2 = os.lseek(fileno, 0, os.SEEK_CUR)
-        assert pos == pos2
-        _vmprof.start_sampling()
-        f()
-        fileno = _vmprof.stop_sampling()
-        pos3 = os.lseek(fileno, 0, os.SEEK_CUR)
-        assert pos3 > pos
-        _vmprof.disable()
+        with open(self.tmpfilename, 'wb') as tmpfile:
+            _vmprof.enable(tmpfile.fileno(), 0.01, 0, native, 0, 0)
+            # get_vmprof_stack() always returns 0 here!
+            # see vmprof_common.c and assume RPYTHON_LL2CTYPES is defined!
+            f()
+            fileno = _vmprof.stop_sampling()
+            pos = os.lseek(fileno, 0, os.SEEK_CUR)
+            f()
+            pos2 = os.lseek(fileno, 0, os.SEEK_CUR)
+            assert pos == pos2
+            _vmprof.start_sampling()
+            f()
+            fileno = _vmprof.stop_sampling()
+            pos3 = os.lseek(fileno, 0, os.SEEK_CUR)
+            assert pos3 > pos
+            _vmprof.disable()
 
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to