Author: Matti Picus <[email protected]>
Branch: vmprof-win32
Changeset: r92021:91789212fc18
Date: 2017-08-01 23:24 +0300
http://bitbucket.org/pypy/pypy/changeset/91789212fc18/
Log: close files
diff --git a/pypy/module/_vmprof/test/test__vmprof.py
b/pypy/module/_vmprof/test/test__vmprof.py
--- a/pypy/module/_vmprof/test/test__vmprof.py
+++ b/pypy/module/_vmprof/test/test__vmprof.py
@@ -1,6 +1,5 @@
import sys
from rpython.tool.udir import udir
-from pypy.tool.pytest.objspace import gettestobjspace
class AppTestVMProf(object):
spaceconfig = {'usemodules': ['_vmprof', 'struct']}
@@ -12,82 +11,90 @@
'__pypy__' not in sys.builtin_module_names)
def test_import_vmprof(self):
+ import _vmprof
tmpfile = open(self.tmpfilename, 'wb')
- tmpfileno = tmpfile.fileno()
tmpfile2 = open(self.tmpfilename2, 'wb')
- tmpfileno2 = tmpfile2.fileno()
+ try:
+ tmpfileno = tmpfile.fileno()
+ tmpfileno2 = tmpfile2.fileno()
- import struct, sys, gc
+ import struct, gc
- WORD = struct.calcsize('l')
+ WORD = struct.calcsize('l')
- def count(s):
- i = 0
- count = 0
- i += 5 * WORD # header
- assert s[i ] == '\x05' # MARKER_HEADER
- assert s[i + 1] == '\x00' # 0
- assert s[i + 2] == '\x06' # VERSION_TIMESTAMP
- assert s[i + 3] == '\x08' # PROFILE_RPYTHON
- assert s[i + 4] == chr(4) # len('pypy')
- assert s[i + 5: i + 9] == 'pypy'
- i += 9
- while i < len(s):
- if s[i] == '\x03':
- break
- elif s[i] == '\x01':
- i += 1
- _, size = struct.unpack("ll", s[i:i + 2 * WORD])
- i += 2 * WORD + size * struct.calcsize("P")
- i += WORD # thread id
- elif s[i] == '\x02':
- i += 1
- _, size = struct.unpack("ll", s[i:i + 2 * WORD])
- count += 1
- i += 2 * WORD + size
- elif s[i] == '\x06':
- print(s[i:i+24])
- i += 1+8+8+8
- elif s[i] == '\x07':
- i += 1
- # skip string
- size, = struct.unpack("l", s[i:i + WORD])
- i += WORD+size
- # skip string
- size, = struct.unpack("l", s[i:i + WORD])
- i += WORD+size
- else:
- raise AssertionError(ord(s[i]))
- return count
+ def count(s):
+ i = 0
+ count = 0
+ i += 5 * WORD # header
+ assert s[i ] == '\x05' # MARKER_HEADER
+ assert s[i + 1] == '\x00' # 0
+ assert s[i + 2] == '\x06' # VERSION_TIMESTAMP
+ assert s[i + 3] == '\x08' # PROFILE_RPYTHON
+ assert s[i + 4] == chr(4) # len('pypy')
+ assert s[i + 5: i + 9] == 'pypy'
+ i += 9
+ while i < len(s):
+ if s[i] == '\x03':
+ break
+ elif s[i] == '\x01':
+ i += 1
+ _, size = struct.unpack("ll", s[i:i + 2 * WORD])
+ i += 2 * WORD + size * struct.calcsize("P")
+ i += WORD # thread id
+ elif s[i] == '\x02':
+ i += 1
+ _, size = struct.unpack("ll", s[i:i + 2 * WORD])
+ count += 1
+ i += 2 * WORD + size
+ elif s[i] == '\x06':
+ print(s[i:i+24])
+ i += 1+8+8+8
+ elif s[i] == '\x07':
+ i += 1
+ # skip string
+ size, = struct.unpack("l", s[i:i + WORD])
+ i += WORD+size
+ # skip string
+ size, = struct.unpack("l", s[i:i + WORD])
+ i += WORD+size
+ else:
+ raise AssertionError(ord(s[i]))
+ return count
- import _vmprof
- gc.collect() # try to make the weakref list deterministic
- gc.collect() # by freeing all dead code objects
- _vmprof.enable(tmpfileno, 0.01, 0, 0, 0, 0)
- _vmprof.disable()
- s = open(self.tmpfilename, 'rb').read()
- no_of_codes = count(s)
- assert no_of_codes > 10
- d = {}
+ gc.collect() # try to make the weakref list deterministic
+ gc.collect() # by freeing all dead code objects
+ _vmprof.enable(tmpfileno, 0.01, 0, 0, 0, 0)
+ _vmprof.disable()
+ with open(self.tmpfilename, 'rb') as fid:
+ s = fid.read()
+ no_of_codes = count(s)
+ assert no_of_codes > 10
+ d = {}
- exec """def foo():
- pass
- """ in d
+ exec """def foo():
+ pass
+ """ in d
- gc.collect()
- gc.collect()
- _vmprof.enable(tmpfileno2, 0.01, 0, 0, 0, 0)
+ gc.collect()
+ gc.collect()
+ _vmprof.enable(tmpfileno2, 0.01, 0, 0, 0, 0)
- exec """def foo2():
- pass
- """ in d
+ exec """def foo2():
+ pass
+ """ in d
- _vmprof.disable()
- s = open(self.tmpfilename2, 'rb').read()
- no_of_codes2 = count(s)
- assert "py:foo:" in s
- assert "py:foo2:" in s
- assert no_of_codes2 >= no_of_codes + 2 # some extra codes from tests
+ _vmprof.disable()
+ with open(self.tmpfilename2, 'rb') as fid:
+ s = fid.read()
+ no_of_codes2 = count(s)
+ assert "py:foo:" in s
+ assert "py:foo2:" in s
+ assert no_of_codes2 >= no_of_codes + 2 # some extra codes from
tests
+ finally:
+ if _vmprof.is_enabled():
+ _vmprof.disable()
+ tmpfile.close()
+ tmpfile2.close()
def test_enable_ovf(self):
import _vmprof
@@ -100,31 +107,34 @@
def test_is_enabled(self):
import _vmprof
- tmpfile = open(self.tmpfilename, 'wb')
- assert _vmprof.is_enabled() is False
- _vmprof.enable(tmpfile.fileno(), 0.01, 0, 0, 0, 0)
- assert _vmprof.is_enabled() is True
- _vmprof.disable()
- assert _vmprof.is_enabled() is False
+ with open(self.tmpfilename, 'wb') as tmpfile:
+ assert _vmprof.is_enabled() is False
+ _vmprof.enable(tmpfile.fileno(), 0.01, 0, 0, 0, 0)
+ assert _vmprof.is_enabled() is True
+ _vmprof.disable()
+ assert _vmprof.is_enabled() is False
def test_get_profile_path(self):
import _vmprof
- tmpfile = open(self.tmpfilename, 'wb')
- assert _vmprof.get_profile_path() is None
- _vmprof.enable(tmpfile.fileno(), 0.01, 0, 0, 0, 0)
- path = _vmprof.get_profile_path()
- if path != tmpfile.name:
- with open(path, "rb") as fd1:
- assert fd1.read() == tmpfile.read()
- _vmprof.disable()
- assert _vmprof.get_profile_path() is None
+ try:
+ with open(self.tmpfilename, 'wb') as tmpfile:
+ assert _vmprof.get_profile_path() is None
+ _vmprof.enable(tmpfile.fileno(), 0.01, 0, 0, 0, 0)
+ path = _vmprof.get_profile_path()
+ if path != tmpfile.name:
+ with open(path, "rb") as fd1:
+ assert fd1.read() == tmpfile.read()
+ _vmprof.disable()
+ assert _vmprof.get_profile_path() is None
+ finally:
+ if _vmprof.is_enabled():
+ _vmprof.disable()
def test_stop_sampling(self):
if not self.plain:
skip("unreliable test except on CPython without -A")
import os
import _vmprof
- tmpfile = open(self.tmpfilename, 'wb')
native = 1
def f():
import sys
@@ -132,19 +142,20 @@
j = sys.maxsize
for i in range(500):
j = math.sqrt(j)
- _vmprof.enable(tmpfile.fileno(), 0.01, 0, native, 0, 0)
- # get_vmprof_stack() always returns 0 here!
- # see vmprof_common.c and assume RPYTHON_LL2CTYPES is defined!
- f()
- fileno = _vmprof.stop_sampling()
- pos = os.lseek(fileno, 0, os.SEEK_CUR)
- f()
- pos2 = os.lseek(fileno, 0, os.SEEK_CUR)
- assert pos == pos2
- _vmprof.start_sampling()
- f()
- fileno = _vmprof.stop_sampling()
- pos3 = os.lseek(fileno, 0, os.SEEK_CUR)
- assert pos3 > pos
- _vmprof.disable()
+ with open(self.tmpfilename, 'wb') as tmpfile:
+ _vmprof.enable(tmpfile.fileno(), 0.01, 0, native, 0, 0)
+ # get_vmprof_stack() always returns 0 here!
+ # see vmprof_common.c and assume RPYTHON_LL2CTYPES is defined!
+ f()
+ fileno = _vmprof.stop_sampling()
+ pos = os.lseek(fileno, 0, os.SEEK_CUR)
+ f()
+ pos2 = os.lseek(fileno, 0, os.SEEK_CUR)
+ assert pos == pos2
+ _vmprof.start_sampling()
+ f()
+ fileno = _vmprof.stop_sampling()
+ pos3 = os.lseek(fileno, 0, os.SEEK_CUR)
+ assert pos3 > pos
+ _vmprof.disable()
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit