Author: Armin Rigo <[email protected]>
Branch: stm-thread
Changeset: r54881:117ef34b3249
Date: 2012-05-02 19:07 +0200
http://bitbucket.org/pypy/pypy/changeset/117ef34b3249/
Log: First, kill the 'transaction' module.
diff --git a/lib_pypy/transaction.py b/lib_pypy/transaction.py
deleted file mode 100644
--- a/lib_pypy/transaction.py
+++ /dev/null
@@ -1,128 +0,0 @@
-"""
-Minimal example of usage:
-
- for i in range(10):
- transaction.add(do_stuff, i)
- transaction.run()
-
-This schedules and runs all ten do_stuff(i), each in its own transaction.
-Each one can also add more transactions to run afterwards, and so on.
-The call to run() returns when all transactions have completed.
-
-From the API point of view it is as if the do_stuff(i) were run serially
-in some random order. If you use a real implementation instead of this
-one (which is here for trying things out), then the transactions can
-actually run in parallel on multiple cores.
-"""
-
-import sys
-import random
-
-
-print >> sys.stderr, "warning: using lib_pypy/transaction.py, the emulator"
-
-_pending = {}
-_in_transaction = False
-
-
-class TransactionError(Exception):
- pass
-
-
-def set_num_threads(num):
- """Set the number of threads to use. In a real implementation,
- the transactions will attempt to use 'num' threads in parallel.
- """
-
-
-def add(f, *args, **kwds):
- """Register the call 'f(*args, **kwds)' as running a new
- transaction. If we are currently running in a transaction too, the
- new transaction will only start after the end of the current
- transaction. Note that if the same or another transaction raises an
- exception in the meantime, all pending transactions are cancelled.
- """
- r = random.random()
- assert r not in _pending # very bad luck if it is
- _pending[r] = (f, args, kwds)
-
-
-def add_epoll(ep, callback):
- """Register the epoll object (from the 'select' module). For any
- event (fd, events) detected by 'ep', a new transaction will be
- started invoking 'callback(fd, events)'. Note that all fds should
- be registered with the flag select.EPOLLONESHOT, and re-registered
- from the callback if needed.
- """
- for key, (f, args, kwds) in _pending.items():
- if getattr(f, '_reads_from_epoll_', None) is ep:
- raise TransactionError("add_epoll(ep): ep is already registered")
- def poll_reader():
- # assume only one epoll is added. If the _pending list is
- # now empty, wait. If not, then just poll non-blockingly.
- if len(_pending) == 0:
- timeout = -1
- else:
- timeout = 0
- got = ep.poll(timeout=timeout)
- for fd, events in got:
- add(callback, fd, events)
- add(poll_reader)
- poll_reader._reads_from_epoll_ = ep
- add(poll_reader)
-
-def remove_epoll(ep):
- """Explicitly unregister the epoll object. Note that raising an
- exception in a transaction to abort run() also unregisters all epolls.
- However, an epoll that becomes empty (doesn't wait on any fd) is not
- automatically removed; if there is only an empty epoll left and no
- further transactions, and no-one raised an exception, then it will
- basically deadlock.
- """
- for key, (f, args, kwds) in _pending.items():
- if getattr(f, '_reads_from_epoll_', None) is ep:
- del _pending[key]
- break
- else:
- raise TransactionError("remove_epoll(ep): ep is not registered")
-
-def run():
- """Run the pending transactions, as well as all transactions started
- by them, and so on. The order is random and undeterministic. Must
- be called from the main program, i.e. not from within another
- transaction. If at some point all transactions are done, returns.
- If a transaction raises an exception, it propagates here; in this
- case all pending transactions are cancelled.
- """
- global _pending, _in_transaction
- if _in_transaction:
- raise TransactionError("recursive invocation of transaction.run()")
- pending = _pending
- try:
- _in_transaction = True
- while pending:
- _, (f, args, kwds) = pending.popitem()
- f(*args, **kwds)
- finally:
- _in_transaction = False
- pending.clear() # this is the behavior we get with interp_transaction
-
-
-class local(object):
- """Thread-local data. Behaves like a regular object, but its content
- is not shared between multiple concurrently-running transactions.
- It can be accessed without conflicts.
-
- It can be used for purely transaction-local data that needs to be
- stored in a single global object. As long as the data is not needed
- after the transaction, e.g. because it is removed before the
- transaction ends, using a "local" instance avoids conflicts which
- would otherwise systematically trigger.
-
- Values that remain in a "local" instance after the end of a
- transaction are visible in the next transaction that happens to be
- executed on the same thread. This can be used for long-living
- caches that store values that are (1) not too costly to compute,
- because they will end up being computed once per thread; and (2) not
- too memory-hungry, because of the replicated storage.
- """
diff --git a/pypy/module/transaction/__init__.py
b/pypy/module/transaction/__init__.py
deleted file mode 100644
--- a/pypy/module/transaction/__init__.py
+++ /dev/null
@@ -1,30 +0,0 @@
-
-from pypy.interpreter.mixedmodule import MixedModule
-
-class Module(MixedModule):
- """Transaction module. XXX document me
- """
-
- interpleveldefs = {
- 'set_num_threads': 'interp_transaction.set_num_threads',
- 'add': 'interp_transaction.add',
- 'run': 'interp_transaction.run',
- #'add_epoll': 'interp_epoll.add_epoll', # xxx linux only
- #'remove_epoll': 'interp_epoll.remove_epoll', # xxx linux only
- 'local': 'interp_local.W_Local',
- }
-
- appleveldefs = {
- 'TransactionError': 'app_transaction.TransactionError',
- }
-
- def __init__(self, space, *args):
- "NOT_RPYTHON: patches space.threadlocals to use real threadlocals"
- from pypy.module.transaction import interp_transaction
- MixedModule.__init__(self, space, *args)
- space.threadlocals = interp_transaction.getstate(space)
-
- def startup(self, space):
- from pypy.module.transaction import interp_transaction
- state = interp_transaction.getstate(space)
- state.startup(space.wrap(self))
diff --git a/pypy/module/transaction/app_transaction.py
b/pypy/module/transaction/app_transaction.py
deleted file mode 100644
--- a/pypy/module/transaction/app_transaction.py
+++ /dev/null
@@ -1,3 +0,0 @@
-
-class TransactionError(Exception):
- pass
diff --git a/pypy/module/transaction/interp_epoll.py
b/pypy/module/transaction/interp_epoll.py
deleted file mode 100644
--- a/pypy/module/transaction/interp_epoll.py
+++ /dev/null
@@ -1,126 +0,0 @@
-
-# Linux-only
-
-from __future__ import with_statement
-import os
-from errno import EINTR
-from pypy.rpython.lltypesystem import lltype, rffi
-from pypy.interpreter.gateway import unwrap_spec
-from pypy.interpreter.error import OperationError
-from pypy.module.select import interp_epoll
-from pypy.module.select.interp_epoll import W_Epoll, FD_SETSIZE
-from pypy.module.select.interp_epoll import epoll_event
-from pypy.module.transaction import interp_transaction
-from pypy.rlib import rstm, rposix
-
-
-# a _nowrapper version, to be sure that it does not allocate anything
-_epoll_wait = rffi.llexternal(
- "epoll_wait",
- [rffi.INT, lltype.Ptr(rffi.CArray(epoll_event)), rffi.INT, rffi.INT],
- rffi.INT,
- compilation_info = interp_epoll.eci,
- _nowrapper = True
-)
-
-
-class EPollPending(interp_transaction.AbstractPending):
- maxevents = FD_SETSIZE - 1 # for now
- evs = lltype.nullptr(rffi.CArray(epoll_event))
-
- def __init__(self, space, epoller, w_callback):
- self.space = space
- self.epoller = epoller
- self.w_callback = w_callback
- self.evs = lltype.malloc(rffi.CArray(epoll_event), self.maxevents,
- flavor='raw', add_memory_pressure=True,
- track_allocation=False)
- self.force_quit = False
-
- def __del__(self):
- evs = self.evs
- if evs:
- self.evs = lltype.nullptr(rffi.CArray(epoll_event))
- lltype.free(evs, flavor='raw', track_allocation=False)
-
- def run(self):
- # This code is run non-transactionally. Careful, no GC available.
- ts = interp_transaction.state.transactionalstate
- if ts.has_exception() or self.force_quit:
- return
- fd = rffi.cast(rffi.INT, self.epoller.epfd)
- maxevents = rffi.cast(rffi.INT, self.maxevents)
- timeout = rffi.cast(rffi.INT, 500) # for now: half a second
- nfds = _epoll_wait(fd, self.evs, maxevents, timeout)
- nfds = rffi.cast(lltype.Signed, nfds)
- #
- if nfds < 0:
- errno = rposix.get_errno()
- if errno == EINTR:
- nfds = 0 # ignore, just wait for more later
- else:
- # unsure how to trigger this case
- ts = interp_transaction.state.transactionalstate
- ts.got_exception_errno = errno
- ts.must_reraise_exception(_reraise_from_errno)
- return
- # We have to allocate new PendingCallback objects, but we can't
- # allocate anything here because we are not running transactionally.
- # Workaround for now: run a new tiny transaction just to create
- # and register these PendingCallback's.
- self.nfds = nfds
- rstm.perform_transaction(EPollPending._add_real_transactions,
- EPollPending, self)
- # XXX could be avoided in the common case with some pool of
- # PendingCallback instances
-
- @staticmethod
- def _add_real_transactions(self, retry_counter):
- evs = self.evs
- for i in range(self.nfds):
- event = evs[i]
- fd = rffi.cast(lltype.Signed, event.c_data.c_fd)
- PendingCallback(self.w_callback, fd, event.c_events).register()
- # re-register myself to epoll_wait() for more
- self.register()
-
-
-class PendingCallback(interp_transaction.AbstractPending):
- def __init__(self, w_callback, fd, events):
- self.w_callback = w_callback
- self.fd = fd
- self.events = events
-
- def run_in_transaction(self, space):
- space.call_function(self.w_callback, space.wrap(self.fd),
- space.wrap(self.events))
-
-
-def _reraise_from_errno(transactionalstate):
- space = interp_transaction.state.space
- errno = transactionalstate.got_exception_errno
- msg = os.strerror(errno)
- w_type = space.w_IOError
- w_error = space.call_function(w_type, space.wrap(errno), space.wrap(msg))
- raise OperationError(w_type, w_error)
-
-
-@unwrap_spec(epoller=W_Epoll)
-def add_epoll(space, epoller, w_callback):
- state = interp_transaction.state
- if epoller in state.epolls:
- raise OperationError(state.w_error,
- space.wrap("add_epoll(ep): ep is already registered"))
- pending = EPollPending(space, epoller, w_callback)
- state.epolls[epoller] = pending
- pending.register()
-
-@unwrap_spec(epoller=W_Epoll)
-def remove_epoll(space, epoller):
- state = interp_transaction.state
- pending = state.epolls.get(epoller, None)
- if pending is None:
- raise OperationError(state.w_error,
- space.wrap("remove_epoll(ep): ep is not registered"))
- pending.force_quit = True
- del state.epolls[epoller]
diff --git a/pypy/module/transaction/interp_local.py
b/pypy/module/transaction/interp_local.py
deleted file mode 100644
--- a/pypy/module/transaction/interp_local.py
+++ /dev/null
@@ -1,55 +0,0 @@
-from pypy.interpreter.baseobjspace import Wrappable
-from pypy.interpreter.typedef import (TypeDef, interp2app, GetSetProperty,
- descr_get_dict)
-from pypy.module.transaction.interp_transaction import getstate
-
-
-class W_Local(Wrappable):
- """Thread-local data. Behaves like a regular object, but its content
- is not shared between multiple concurrently-running transactions.
- It can be accessed without conflicts.
-
- It can be used for purely transaction-local data.
-
- It can also be used for long-living caches that store values that
- are (1) not too costly to compute and (2) not too memory-hungry,
- because they will end up being computed and stored once per actual
- thread.
- """
-
- def __init__(self, space):
- self.state = getstate(space)
- self.dicts = []
- self._update_dicts()
- # unless we call transaction.set_num_threads() afterwards, this
- # 'local' object is now initialized with the correct number of
- # dictionaries, to avoid conflicts later if _update_dicts() is
- # called in a transaction.
-
- def _update_dicts(self):
- state = self.state
- new = state.get_total_number_of_threads() - len(self.dicts)
- if new <= 0:
- return
- # update the list without appending to it (to keep it non-resizable)
- self.dicts = self.dicts + [state.space.newdict(instance=True)
- for i in range(new)]
-
- def getdict(self, space):
- n = self.state.get_thread_number()
- try:
- return self.dicts[n]
- except IndexError:
- self._update_dicts()
- assert n < len(self.dicts)
- return self.dicts[n]
-
-def descr_local__new__(space, w_subtype):
- local = W_Local(space)
- return space.wrap(local)
-
-W_Local.typedef = TypeDef("transaction.local",
- __new__ = interp2app(descr_local__new__),
- __dict__ = GetSetProperty(descr_get_dict, cls=W_Local),
- )
-W_Local.typedef.acceptable_as_base_class = False
diff --git a/pypy/module/transaction/interp_transaction.py
b/pypy/module/transaction/interp_transaction.py
deleted file mode 100644
--- a/pypy/module/transaction/interp_transaction.py
+++ /dev/null
@@ -1,176 +0,0 @@
-from pypy.interpreter.error import OperationError
-from pypy.interpreter.gateway import unwrap_spec
-from pypy.interpreter.executioncontext import ExecutionContext
-from pypy.rlib import rstm
-
-
-class State(object):
- """The shared, global state. Warning, writes to it cause conflicts.
- XXX fix me to somehow avoid conflicts at the beginning due to setvalue()
- """
-
- def __init__(self, space):
- self.space = space
- self.num_threads = rstm.NUM_THREADS_DEFAULT
- self.running = False
- self.w_error = None
- self.threadobjs = {} # empty during translation
- self.threadnums = {} # empty during translation
- self.epolls = {}
- self.pending_before_run = []
-
- def startup(self, w_module):
- if w_module is not None: # for tests
- space = self.space
- self.w_error = space.getattr(w_module,
- space.wrap('TransactionError'))
- main_ec = self.space.getexecutioncontext() # create it if needed
- main_ec._transaction_pending = self.pending_before_run
-
- def add_thread(self, id, ec):
- # register a new transaction thread
- assert id not in self.threadobjs
- ec._transaction_pending = []
- self.threadobjs[id] = ec
- self.threadnums[id] = len(self.threadnums)
-
- # ---------- interface for ThreadLocals ----------
- # This works really like a thread-local, which may have slightly
- # strange consequences in multiple transactions, because you don't
- # know on which thread a transaction will run. The point of this is
- # to let every thread get its own ExecutionContext; otherwise, they
- # conflict with each other e.g. when setting the 'topframeref'
- # attribute.
-
- def getvalue(self):
- id = rstm.thread_id()
- return self.threadobjs.get(id, None)
-
- def setvalue(self, value):
- id = rstm.thread_id()
- if id == rstm.MAIN_THREAD_ID:
- assert len(self.threadobjs) == 0
- assert len(self.threadnums) == 0
- self.threadobjs[id] = value
- self.threadnums[id] = 0
- else:
- self.add_thread(id, value)
-
- def getmainthreadvalue(self):
- return self.threadobjs.get(MAIN_THREAD_ID, None)
-
- def getallvalues(self):
- return self.threadobjs
-
- def clear_all_values_apart_from_main(self):
- for id in self.threadobjs.keys():
- if id != MAIN_THREAD_ID:
- del self.threadobjs[id]
- for id in self.threadnums.keys():
- if id != MAIN_THREAD_ID:
- del self.threadnums[id]
- self.epolls.clear()
-
- def get_thread_number(self):
- id = rstm.thread_id()
- return self.threadnums[id]
-
- def get_total_number_of_threads(self):
- return 1 + self.num_threads
-
- def set_num_threads(self, num):
- if self.running:
- space = self.space
- raise OperationError(self.w_error,
- space.wrap("cannot change the number of "
- "threads when transaction.run() "
- "is active"))
- self.num_threads = num
-
-
-def getstate(space):
- return space.fromcache(State)
-
-
-@unwrap_spec(num=int)
-def set_num_threads(space, num):
- if num < 1:
- num = 1
- getstate(space).set_num_threads(num)
-
-
-class SpaceTransaction(rstm.Transaction):
-
- def __init__(self, space, w_callback, args):
- self.space = space
- self.state = getstate(space)
- self.w_callback = w_callback
- self.args = args
-
- def register(self):
- """Register this SpaceTransaction instance in the pending list
- belonging to the current thread. If called from the main
- thread, it is the global list. If called from a transaction,
- it is a thread-local list that will be merged with the global
- list when the transaction is done.
- NOTE: never register() the same instance multiple times.
- """
- ec = self.state.getvalue()
- assert ec is not None # must have been created first
- ec._transaction_pending.append(self)
-
- def run(self):
- #if self.retry_counter > 0:
- # # retrying: will be done later, try others first
- # return [self] # XXX does not work, will be retried immediately
- #
- ec = self.space.getexecutioncontext() # create it if needed
- assert len(ec._transaction_pending) == 0
- #
- if self.space.config.objspace.std.withmethodcache:
- from pypy.objspace.std.typeobject import MethodCache
- ec._methodcache = MethodCache(self.space)
- #
- self.space.call_args(self.w_callback, self.args)
- #
- if self.space.config.objspace.std.withmethodcache:
- # remove the method cache again now, to prevent it from being
- # promoted to a GLOBAL
- ec._methodcache = None
- #
- result = ec._transaction_pending
- ec._transaction_pending = []
- return result
-
-
-class InitialTransaction(rstm.Transaction):
-
- def __init__(self, state):
- self.state = state
-
- def run(self):
- # initially: return the list of already-added transactions as
- # the list of transactions to run next, and clear it
- result = self.state.pending_before_run[:]
- del self.state.pending_before_run[:]
- return result
-
-
-def add(space, w_callback, __args__):
- transaction = SpaceTransaction(space, w_callback, __args__)
- transaction.register()
-
-
-def run(space):
- state = getstate(space)
- if state.running:
- raise OperationError(
- state.w_error,
- space.wrap("recursive invocation of transaction.run()"))
- state.running = True
- try:
- rstm.run_all_transactions(InitialTransaction(state),
- num_threads = state.num_threads)
- finally:
- state.running = False
- assert len(state.pending_before_run) == 0
diff --git a/pypy/module/transaction/test/__init__.py
b/pypy/module/transaction/test/__init__.py
deleted file mode 100644
diff --git a/pypy/module/transaction/test/test_epoll.py
b/pypy/module/transaction/test/test_epoll.py
deleted file mode 100644
--- a/pypy/module/transaction/test/test_epoll.py
+++ /dev/null
@@ -1,89 +0,0 @@
-import py
-from pypy.conftest import gettestobjspace
-py.test.skip("epoll support disabled for now")
-
-
-class AppTestEpoll:
- def setup_class(cls):
- cls.space = gettestobjspace(usemodules=['transaction', 'select'])
-
- def test_non_transactional(self):
- import select, posix as os
- fd_read, fd_write = os.pipe()
- epoller = select.epoll()
- epoller.register(fd_read)
- os.write(fd_write, 'x')
- [(fd, events)] = epoller.poll()
- assert fd == fd_read
- assert events & select.EPOLLIN
- got = os.read(fd_read, 1)
- assert got == 'x'
-
- def test_simple(self):
- import transaction, select, posix as os
-
- steps = []
-
- fd_read, fd_write = os.pipe()
-
- epoller = select.epoll()
- epoller.register(fd_read)
-
- def write_stuff():
- os.write(fd_write, 'x')
- steps.append('write_stuff')
-
- class Done(Exception):
- pass
-
- def callback(fd, events):
- assert fd == fd_read
- assert events & select.EPOLLIN
- got = os.read(fd_read, 1)
- assert got == 'x'
- steps.append('callback')
- raise Done
-
- transaction.add_epoll(epoller, callback)
- transaction.add(write_stuff)
-
- assert steps == []
- raises(Done, transaction.run)
- assert steps == ['write_stuff', 'callback']
-
- def test_remove_closed_epoll(self):
- import transaction, select, posix as os
-
- fd_read, fd_write = os.pipe()
-
- epoller = select.epoll()
- epoller.register(fd_read)
-
- # we run it 10 times in order to get both possible orders in
- # the emulator
- for i in range(10):
- transaction.add_epoll(epoller, lambda *args: not_actually_callable)
- transaction.add(transaction.remove_epoll, epoller)
- transaction.run()
- # assert didn't deadlock
- transaction.add(transaction.remove_epoll, epoller)
- transaction.add_epoll(epoller, lambda *args: not_actually_callable)
- transaction.run()
- # assert didn't deadlock
-
- def test_errors(self):
- import transaction, select
- epoller = select.epoll()
- callback = lambda *args: not_actually_callable
- transaction.add_epoll(epoller, callback)
- raises(transaction.TransactionError,
- transaction.add_epoll, epoller, callback)
- transaction.remove_epoll(epoller)
- raises(transaction.TransactionError,
- transaction.remove_epoll, epoller)
-
-
-class AppTestEpollEmulator(AppTestEpoll):
- def setup_class(cls):
- # test for lib_pypy/transaction.py
- cls.space = gettestobjspace(usemodules=['select'])
diff --git a/pypy/module/transaction/test/test_interp_transaction.py
b/pypy/module/transaction/test/test_interp_transaction.py
deleted file mode 100644
--- a/pypy/module/transaction/test/test_interp_transaction.py
+++ /dev/null
@@ -1,91 +0,0 @@
-import time
-from pypy.module.transaction import interp_transaction
-
-
-class FakeSpace:
- class config:
- class objspace:
- class std:
- withmethodcache = False
- def __init__(self):
- self._spacecache = {}
- def getexecutioncontext(self):
- state = interp_transaction.getstate(self)
- ec = state.getvalue()
- if ec is None:
- ec = FakeEC()
- state.setvalue(ec)
- return ec
- def call_args(self, w_callback, args):
- w_callback(*args)
- def fromcache(self, Cls):
- if Cls not in self._spacecache:
- self._spacecache[Cls] = Cls(self)
- return self._spacecache[Cls]
-
-class FakeEC:
- pass
-
-def make_fake_space():
- space = FakeSpace()
- interp_transaction.getstate(space).startup(None)
- return space
-
-
-def test_linear_list():
- space = make_fake_space()
- seen = []
- #
- def do(n):
- seen.append(n)
- if n < 200:
- interp_transaction.add(space, do, (n+1,))
- #
- interp_transaction.add(space, do, (0,))
- assert seen == []
- interp_transaction.run(space)
- assert seen == range(201)
-
-
-def test_tree_of_transactions():
- space = make_fake_space()
- seen = []
- #
- def do(level):
- seen.append(level)
- if level < 11:
- interp_transaction.add(space, do, (level+1,))
- interp_transaction.add(space, do, (level+1,))
- #
- interp_transaction.add(space, do, (0,))
- assert seen == []
- interp_transaction.run(space)
- for i in range(12):
- assert seen.count(i) == 2 ** i
- assert len(seen) == 2 ** 12 - 1
-
-
-def test_transactional_simple():
- space = make_fake_space()
- lst = []
- def f(n):
- lst.append(n+0)
- lst.append(n+1)
- time.sleep(0.05)
- lst.append(n+2)
- lst.append(n+3)
- lst.append(n+4)
- time.sleep(0.25)
- lst.append(n+5)
- lst.append(n+6)
- interp_transaction.add(space, f, (10,))
- interp_transaction.add(space, f, (20,))
- interp_transaction.add(space, f, (30,))
- interp_transaction.run(space)
- assert len(lst) == 7 * 3
- seen = set()
- for start in range(0, 21, 7):
- seen.add(lst[start])
- for index in range(7):
- assert lst[start + index] == lst[start] + index
- assert seen == set([10, 20, 30])
diff --git a/pypy/module/transaction/test/test_local.py
b/pypy/module/transaction/test/test_local.py
deleted file mode 100644
--- a/pypy/module/transaction/test/test_local.py
+++ /dev/null
@@ -1,75 +0,0 @@
-import py
-from pypy.conftest import gettestobjspace
-
-
-class AppTestLocal:
- def setup_class(cls):
- cls.space = gettestobjspace(usemodules=['transaction'])
-
- def test_simple(self):
- import transaction
- x = transaction.local()
- x.foo = 42
- assert x.foo == 42
- assert hasattr(x, 'foo')
- assert not hasattr(x, 'bar')
- assert getattr(x, 'foo', 84) == 42
- assert getattr(x, 'bar', 84) == 84
-
- def test_transaction_local(self):
- import transaction
- transaction.set_num_threads(2)
- x = transaction.local()
- all_lists = []
-
- def f(n):
- if not hasattr(x, 'lst'):
- x.lst = []
- all_lists.append(x.lst)
- x.lst.append(n)
- if n > 0:
- transaction.add(f, n - 1)
- transaction.add(f, n - 1)
- transaction.add(f, 5)
- transaction.run()
-
- assert not hasattr(x, 'lst')
- assert len(all_lists) == 2
- total = all_lists[0] + all_lists[1]
- assert total.count(5) == 1
- assert total.count(4) == 2
- assert total.count(3) == 4
- assert total.count(2) == 8
- assert total.count(1) == 16
- assert total.count(0) == 32
- assert len(total) == 63
-
- def test_transaction_local_growing(self):
- import transaction
- transaction.set_num_threads(1)
- x = transaction.local()
- all_lists = []
-
- def f(n):
- if not hasattr(x, 'lst'):
- x.lst = []
- all_lists.append(x.lst)
- x.lst.append(n)
- if n > 0:
- transaction.add(f, n - 1)
- transaction.add(f, n - 1)
- transaction.add(f, 5)
-
- transaction.set_num_threads(2) # more than 1 specified above
- transaction.run()
-
- assert not hasattr(x, 'lst')
- assert len(all_lists) == 2
- total = all_lists[0] + all_lists[1]
- assert total.count(5) == 1
- assert total.count(4) == 2
- assert total.count(3) == 4
- assert total.count(2) == 8
- assert total.count(1) == 16
- assert total.count(0) == 32
- assert len(total) == 63
diff --git a/pypy/module/transaction/test/test_transaction.py
b/pypy/module/transaction/test/test_transaction.py
deleted file mode 100644
--- a/pypy/module/transaction/test/test_transaction.py
+++ /dev/null
@@ -1,82 +0,0 @@
-import py
-from pypy.conftest import gettestobjspace
-
-
-class AppTestTransaction:
- def setup_class(cls):
- cls.space = gettestobjspace(usemodules=['transaction'])
-
- def test_set_num_threads(self):
- import transaction
- transaction.set_num_threads(4)
-
- def test_simple(self):
- import transaction
- lst = []
- transaction.add(lst.append, 5)
- transaction.add(lst.append, 6)
- transaction.add(lst.append, 7)
- transaction.run()
- assert sorted(lst) == [5, 6, 7]
-
- def test_almost_as_simple(self):
- import transaction
- lst = []
- def f(n):
- lst.append(n+0)
- lst.append(n+1)
- lst.append(n+2)
- lst.append(n+3)
- lst.append(n+4)
- lst.append(n+5)
- lst.append(n+6)
- transaction.add(f, 10)
- transaction.add(f, 20)
- transaction.add(f, 30)
- transaction.run()
- assert len(lst) == 7 * 3
- seen = set()
- for start in range(0, 21, 7):
- seen.add(lst[start])
- for index in range(7):
- assert lst[start + index] == lst[start] + index
- assert seen == set([10, 20, 30])
-
- def test_propagate_exception(self):
- import transaction, time
- lst = []
- def f(n):
- lst.append(n)
- time.sleep(0.5)
- raise ValueError(n)
- transaction.add(f, 10)
- transaction.add(f, 20)
- transaction.add(f, 30)
- try:
- transaction.run()
- assert 0, "should have raised ValueError"
- except ValueError, e:
- pass
- assert len(lst) == 1
- assert lst[0] == e.args[0]
-
- def test_clear_pending_transactions(self):
- import transaction
- class Foo(Exception):
- pass
- def raiseme():
- raise Foo
- for i in range(20):
- transaction.add(raiseme)
- try:
- transaction.run()
- assert 0, "should have raised Foo"
- except Foo:
- pass
- transaction.run() # all the other 'raiseme's should have been cleared
-
-
-class AppTestTransactionEmulator(AppTestTransaction):
- def setup_class(cls):
- # test for lib_pypy/transaction.py
- cls.space = gettestobjspace(usemodules=[])
diff --git a/pypy/module/transaction/test/test_ztranslation.py
b/pypy/module/transaction/test/test_ztranslation.py
deleted file mode 100644
--- a/pypy/module/transaction/test/test_ztranslation.py
+++ /dev/null
@@ -1,4 +0,0 @@
-from pypy.objspace.fake.checkmodule import checkmodule
-
-def test_checkmodule():
- checkmodule('transaction')
_______________________________________________
pypy-commit mailing list
[email protected]
http://mail.python.org/mailman/listinfo/pypy-commit