Author: Armin Rigo <[email protected]>
Branch: stmgc-c8
Changeset: r78177:f597af41b44b
Date: 2015-06-18 14:48 +0200
http://bitbucket.org/pypy/pypy/changeset/f597af41b44b/
Log: add task_done() and join() to rstm's queues
diff --git a/rpython/memory/gctransform/stmframework.py
b/rpython/memory/gctransform/stmframework.py
--- a/rpython/memory/gctransform/stmframework.py
+++ b/rpython/memory/gctransform/stmframework.py
@@ -225,6 +225,7 @@
gct_stm_transaction_break = _gct_with_roots_pushed
gct_stm_collect = _gct_with_roots_pushed
gct_stm_queue_get = _gct_with_roots_pushed
+ gct_stm_queue_join = _gct_with_roots_pushed
class StmRootWalker(BaseRootWalker):
diff --git a/rpython/rlib/rstm.py b/rpython/rlib/rstm.py
--- a/rpython/rlib/rstm.py
+++ b/rpython/rlib/rstm.py
@@ -370,12 +370,22 @@
def _ll_queue_put(q, newitem):
llop.stm_queue_put(lltype.Void, q, q.ll_raw_queue, newitem)
+@dont_look_inside
+def _ll_queue_task_done(q):
+ llop.stm_queue_task_done(lltype.Void, q.ll_raw_queue)
+
+@dont_look_inside
+def _ll_queue_join(q):
+ return llop.stm_queue_join(lltype.Signed, q, q.ll_raw_queue)
+
_QUEUE_OBJ = lltype.GcStruct('QUEUE_OBJ',
('ll_raw_queue', _STM_QUEUE_P),
hints={'immutable': True},
rtti=True,
adtmeths={'get': _ll_queue_get,
- 'put': _ll_queue_put})
+ 'put': _ll_queue_put,
+ 'task_done': _ll_queue_task_done,
+ 'join': _ll_queue_join})
NULL_QUEUE = lltype.nullptr(_QUEUE_OBJ)
def _ll_queue_trace(gc, obj, callback, arg):
@@ -423,3 +433,10 @@
def put(self, newitem):
assert lltype.typeOf(newitem) == llmemory.GCREF
self._content.put(newitem)
+
+ def task_done(self):
+ self._content.task_done()
+
+ def join(self):
+ self._content.join()
+ return 0
diff --git a/rpython/rtyper/llinterp.py b/rpython/rtyper/llinterp.py
--- a/rpython/rtyper/llinterp.py
+++ b/rpython/rtyper/llinterp.py
@@ -1006,6 +1006,8 @@
op_stm_queue_free = _stm_not_implemented
op_stm_queue_get = _stm_not_implemented
op_stm_queue_put = _stm_not_implemented
+ op_stm_queue_task_done = _stm_not_implemented
+ op_stm_queue_join = _stm_not_implemented
op_stm_queue_tracefn = _stm_not_implemented
op_stm_register_thread_local = _stm_not_implemented
op_stm_unregister_thread_local = _stm_not_implemented
diff --git a/rpython/rtyper/lltypesystem/lloperation.py
b/rpython/rtyper/lltypesystem/lloperation.py
--- a/rpython/rtyper/lltypesystem/lloperation.py
+++ b/rpython/rtyper/lltypesystem/lloperation.py
@@ -483,6 +483,8 @@
'stm_queue_free': LLOp(),
'stm_queue_get': LLOp(canmallocgc=True), # push roots!
'stm_queue_put': LLOp(),
+ 'stm_queue_task_done': LLOp(),
+ 'stm_queue_join': LLOp(canmallocgc=True), # push roots!
'stm_queue_tracefn': LLOp(),
# __________ address operations __________
diff --git a/rpython/translator/stm/breakfinder.py
b/rpython/translator/stm/breakfinder.py
--- a/rpython/translator/stm/breakfinder.py
+++ b/rpython/translator/stm/breakfinder.py
@@ -10,6 +10,7 @@
'stm_leave_callback_call',
'stm_transaction_break',
'stm_queue_get',
+ 'stm_queue_join',
])
for tb in TRANSACTION_BREAK:
diff --git a/rpython/translator/stm/funcgen.py
b/rpython/translator/stm/funcgen.py
--- a/rpython/translator/stm/funcgen.py
+++ b/rpython/translator/stm/funcgen.py
@@ -392,6 +392,17 @@
return 'stm_queue_put((object_t *)%s, %s, (object_t *)%s);' % (
arg0, arg1, arg2)
+def stm_queue_task_done(funcgen, op):
+ arg0 = funcgen.expr(op.args[0])
+ return 'stm_queue_task_done(%s);' % (arg0,)
+
+def stm_queue_join(funcgen, op):
+ arg0 = funcgen.expr(op.args[0])
+ arg1 = funcgen.expr(op.args[1])
+ result = funcgen.expr(op.result)
+ return ('%s = stm_queue_join((object_t *)%s, %s, '
+ '&stm_thread_local);' % (result, arg0, arg1,))
+
def stm_queue_tracefn(funcgen, op):
arg0 = funcgen.expr(op.args[0])
arg1 = funcgen.expr(op.args[1])
diff --git a/rpython/translator/stm/src_stm/revision
b/rpython/translator/stm/src_stm/revision
--- a/rpython/translator/stm/src_stm/revision
+++ b/rpython/translator/stm/src_stm/revision
@@ -1,1 +1,1 @@
-7592a0f11ac2
+d083e426a17d
diff --git a/rpython/translator/stm/src_stm/stm/queue.c
b/rpython/translator/stm/src_stm/stm/queue.c
--- a/rpython/translator/stm/src_stm/stm/queue.c
+++ b/rpython/translator/stm/src_stm/stm/queue.c
@@ -25,6 +25,10 @@
notion is per segment.) this flag says that the queue is
already in the tree STM_PSEGMENT->active_queues. */
bool active;
+
+ /* counts the number of put's done in this transaction, minus
+ the number of task_done's */
+ int64_t unfinished_tasks_in_this_transaction;
};
char pad[64];
} stm_queue_segment_t;
@@ -37,6 +41,10 @@
/* a chained list of old entries in the queue */
queue_entry_t *volatile old_entries;
+
+ /* total of 'unfinished_tasks_in_this_transaction' for all
+ committed transactions */
+ volatile int64_t unfinished_tasks;
};
@@ -126,6 +134,7 @@
queue_lock_acquire();
bool added_any_old_entries = false;
+ bool finished_more_tasks = false;
wlog_t *item;
TREE_LOOP_FORWARD(STM_PSEGMENT->active_queues, item) {
stm_queue_t *queue = (stm_queue_t *)item->addr;
@@ -133,6 +142,11 @@
queue_entry_t *head, *freehead;
if (at_commit) {
+ int64_t d = seg->unfinished_tasks_in_this_transaction;
+ if (d != 0) {
+ finished_more_tasks |= (d < 0);
+ __sync_add_and_fetch(&queue->unfinished_tasks, d);
+ }
head = seg->added_in_this_transaction;
freehead = seg->old_objects_popped;
}
@@ -145,6 +159,7 @@
seg->added_in_this_transaction = NULL;
seg->added_young_limit = NULL;
seg->old_objects_popped = NULL;
+ seg->unfinished_tasks_in_this_transaction = 0;
/* free the list of entries that must disappear */
queue_free_entries(freehead);
@@ -176,10 +191,11 @@
queue_lock_release();
- if (added_any_old_entries) {
- assert(_has_mutex());
+ assert(_has_mutex());
+ if (added_any_old_entries)
cond_broadcast(C_QUEUE_OLD_ENTRIES);
- }
+ if (finished_more_tasks)
+ cond_broadcast(C_QUEUE_FINISHED_MORE_TASKS);
}
void stm_queue_put(object_t *qobj, stm_queue_t *queue, object_t *newitem)
@@ -195,6 +211,7 @@
seg->added_in_this_transaction = entry;
queue_activate(queue);
+ seg->unfinished_tasks_in_this_transaction++;
/* add qobj to 'objects_pointing_to_nursery' if it has the
WRITE_BARRIER flag */
@@ -285,6 +302,41 @@
}
}
+void stm_queue_task_done(stm_queue_t *queue)
+{
+ queue_activate(queue);
+ stm_queue_segment_t *seg = &queue->segs[STM_SEGMENT->segment_num - 1];
+ seg->unfinished_tasks_in_this_transaction--;
+}
+
+long stm_queue_join(object_t *qobj, stm_queue_t *queue, stm_thread_local_t *tl)
+{
+ int64_t result;
+
+#if STM_TESTS
+ result = queue->unfinished_tasks; /* can't wait in tests */
+ result += (queue->segs[STM_SEGMENT->segment_num - 1]
+ .unfinished_tasks_in_this_transaction);
+ return result;
+#else
+ STM_PUSH_ROOT(*tl, qobj);
+ _stm_commit_transaction();
+
+ s_mutex_lock();
+ while ((result = queue->unfinished_tasks) > 0) {
+ cond_wait(C_QUEUE_FINISHED_MORE_TASKS);
+ }
+ s_mutex_unlock();
+
+ _stm_start_transaction(tl);
+ STM_POP_ROOT(*tl, qobj); /* 'queue' should stay alive until here */
+#endif
+
+ /* returns 0 for 'ok', or negative if there was more task_done()
+ than put() so far */
+ return result;
+}
+
static void queue_trace_list(queue_entry_t *entry, void trace(object_t **),
queue_entry_t *stop_at)
{
diff --git a/rpython/translator/stm/src_stm/stm/sync.h
b/rpython/translator/stm/src_stm/stm/sync.h
--- a/rpython/translator/stm/src_stm/stm/sync.h
+++ b/rpython/translator/stm/src_stm/stm/sync.h
@@ -7,6 +7,7 @@
C_SEGMENT_FREE,
C_SEGMENT_FREE_OR_SAFE_POINT,
C_QUEUE_OLD_ENTRIES,
+ C_QUEUE_FINISHED_MORE_TASKS,
_C_TOTAL
};
diff --git a/rpython/translator/stm/src_stm/stmgc.h
b/rpython/translator/stm/src_stm/stmgc.h
--- a/rpython/translator/stm/src_stm/stmgc.h
+++ b/rpython/translator/stm/src_stm/stmgc.h
@@ -747,6 +747,11 @@
transaction (this is needed to ensure correctness). */
object_t *stm_queue_get(object_t *qobj, stm_queue_t *queue, double timeout,
stm_thread_local_t *tl);
+/* task_done() and join(): see https://docs.python.org/2/library/queue.html */
+void stm_queue_task_done(stm_queue_t *queue);
+/* join() commits and waits outside a transaction (so push roots).
+ Unsuitable if the current transaction is atomic! */
+long stm_queue_join(object_t *qobj, stm_queue_t *queue, stm_thread_local_t
*tl);
void stm_queue_tracefn(stm_queue_t *queue, void trace(object_t **));
diff --git a/rpython/translator/stm/test/test_ztranslated.py
b/rpython/translator/stm/test/test_ztranslated.py
--- a/rpython/translator/stm/test/test_ztranslated.py
+++ b/rpython/translator/stm/test/test_ztranslated.py
@@ -634,6 +634,18 @@
x2 = cast_gcref_to_instance(X, p2)
assert x2 is x1
#
+ q.task_done()
+ q.task_done()
+ res = q.join()
+ assert res == 0
+ res = q.join()
+ assert res == 0
+ if objectmodel.we_are_translated():
+ q.task_done()
+ q.task_done()
+ res = q.join()
+ assert res == -2
+ #
print "ok!"
return 0
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit