Hi Robbie,

Yeah, my bad. I was sitting on some local changes so I missed this.
I'll test on a clean checkout next time. Sorry for all the mess.

I've committed the missing methods to proton-j.

Bozzo

On 9. 07. 15 12.14, Robbie Gemmell wrote:
> Hi Bozzo,
>
> This change also seems to be causing test failures when using the
> maven build (if you update things to get past the earlier failures,
> caused by the commit mentioned in the other thread on proton@):
>
> proton_tests.reactor.ExceptionTest.test_schedule_cancel ................. fail
> Error during test:  Traceback (most recent call last):
>     File "/home/gemmellr/workspace/proton/tests/python/proton-test", line 360, in run
>       phase()
>     File "/home/gemmellr/workspace/proton/tests/python/proton_tests/reactor.py", line 181, in test_schedule_cancel
>       now = self.reactor.mark()
>     File "/home/gemmellr/workspace/proton/tests/../proton-c/bindings/python/proton/reactor.py", line 118, in mark
>       return pn_reactor_mark(self._impl)
>   NameError: global name 'pn_reactor_mark' is not defined
>
>
> Robbie
>
> On 9 July 2015 at 10:55, Bozo Dragojevic <bo...@digiverse.si> wrote:
>> Hi Ken,
>>
>> I've installed python3.4 and tox and friends and tried to reproduce it here
>> and I can confirm that some completely unrelated test fail mysteriously
>> with python3.4 and that reverting my change makes that failure go away :)
>>
>> I've added more task cancellation tests to force the error while still
>> in the implicated code, as I suspected it could be some refcounting
>> problem but the new tests do not show anything unusual.
>>
>> What is even weirder, with the new tests even the python3.4 suite passes
>> without segfault!
>>
>> So, I consider this a false positive and have left the change in,
>> including the new tests at ca47d72.
>>
>> Does such solution work for you?
>>
>> Bozzo
>>
>> On 8. 07. 15 16.32, Ken Giusti wrote:
>>> Hi Bozzo,
>>>
>>> Can you please revert this change?
>>>
>>> It is causing a segfault in the python unit tests when they are run under
>>> python3.4.
>>>
>>> I haven't hit the segfault on python2.7, only on python3.4
>>>
>>> thanks,
>>>
>>> -K
>>>
>>> ----- Original Message -----
>>>> From: bo...@apache.org
>>>> To: comm...@qpid.apache.org
>>>> Sent: Tuesday, July 7, 2015 3:50:16 PM
>>>> Subject: [2/2] qpid-proton git commit: PROTON-928: cancellable tasks
>>>>
>>>> PROTON-928: cancellable tasks
>>>>
>>>> A scheduled task can be cancelled.
>>>> A cancelled task does not prevent reactor from stopping running
>>>>
>>>>
>>>> Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
>>>> Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d4d22ee3
>>>> Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d4d22ee3
>>>> Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d4d22ee3
>>>>
>>>> Branch: refs/heads/master
>>>> Commit: d4d22ee396163babcac19c48845b1f10ca3b5a48
>>>> Parents: 09af375
>>>> Author: Bozo Dragojevic <bo...@digiverse.si>
>>>> Authored: Tue Jul 7 10:17:40 2015 +0200
>>>> Committer: Bozo Dragojevic <bo...@digiverse.si>
>>>> Committed: Tue Jul 7 21:49:44 2015 +0200
>>>>
>>>> ----------------------------------------------------------------------
>>>>  proton-c/bindings/python/proton/reactor.py      |  5 +++-
>>>>  proton-c/include/proton/reactor.h               |  1 +
>>>>  proton-c/src/reactor/timer.c                    | 25 +++++++++++++++++++-
>>>>  proton-c/src/tests/reactor.c                    | 15 ++++++++++++
>>>>  .../org/apache/qpid/proton/reactor/Task.java    |  4 ++++
>>>>  .../qpid/proton/reactor/impl/TaskImpl.java      | 10 ++++++++
>>>>  .../apache/qpid/proton/reactor/impl/Timer.java  | 19 ++++++++++++---
>>>>  proton-j/src/main/resources/creactor.py         |  3 +++
>>>>  tests/python/proton_tests/reactor.py            | 14 +++++++++++
>>>>  9 files changed, 91 insertions(+), 5 deletions(-)
>>>> ----------------------------------------------------------------------
>>>>
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/bindings/python/proton/reactor.py
>>>> ----------------------------------------------------------------------
>>>> diff --git a/proton-c/bindings/python/proton/reactor.py b/proton-c/bindings/python/proton/reactor.py
>>>> index c66334b..d019554 100644
>>>> --- a/proton-c/bindings/python/proton/reactor.py
>>>> +++ b/proton-c/bindings/python/proton/reactor.py
>>>> @@ -53,6 +53,9 @@ class Task(Wrapper):
>>>>      def _init(self):
>>>>          pass
>>>>
>>>> +    def cancel(self):
>>>> +        pn_task_cancel(self._impl)
>>>> +
>>>>  class Acceptor(Wrapper):
>>>>
>>>>      def __init__(self, impl):
>>>> @@ -112,7 +115,7 @@ class Reactor(Wrapper):
>>>>          pn_reactor_yield(self._impl)
>>>>
>>>>      def mark(self):
>>>> -        pn_reactor_mark(self._impl)
>>>> +        return pn_reactor_mark(self._impl)
>>>>
>>>>      def _get_handler(self):
>>>>          return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/include/proton/reactor.h
>>>> ----------------------------------------------------------------------
>>>> diff --git a/proton-c/include/proton/reactor.h b/proton-c/include/proton/reactor.h
>>>> index 59b2282..6f52d22 100644
>>>> --- a/proton-c/include/proton/reactor.h
>>>> +++ b/proton-c/include/proton/reactor.h
>>>> @@ -96,6 +96,7 @@ PN_EXTERN pn_task_t *pn_timer_schedule(pn_timer_t *timer, pn_timestamp_t deadlin
>>>>  PN_EXTERN int pn_timer_tasks(pn_timer_t *timer);
>>>>
>>>>  PN_EXTERN pn_record_t *pn_task_attachments(pn_task_t *task);
>>>> +PN_EXTERN void pn_task_cancel(pn_task_t *task);
>>>>
>>>>  PN_EXTERN pn_reactor_t *pn_class_reactor(const pn_class_t *clazz, void *object);
>>>>  PN_EXTERN pn_reactor_t *pn_object_reactor(void *object);
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/reactor/timer.c
>>>> ----------------------------------------------------------------------
>>>> diff --git a/proton-c/src/reactor/timer.c b/proton-c/src/reactor/timer.c
>>>> index 1ad0821..61efd31 100644
>>>> --- a/proton-c/src/reactor/timer.c
>>>> +++ b/proton-c/src/reactor/timer.c
>>>> @@ -27,12 +27,14 @@ struct pn_task_t {
>>>>    pn_list_t *pool;
>>>>    pn_record_t *attachments;
>>>>    pn_timestamp_t deadline;
>>>> +  bool cancelled;
>>>>  };
>>>>
>>>>  void pn_task_initialize(pn_task_t *task) {
>>>>    task->pool = NULL;
>>>>    task->attachments = pn_record();
>>>>    task->deadline = 0;
>>>> +  task->cancelled = false;
>>>>  }
>>>>
>>>>  void pn_task_finalize(pn_task_t *task) {
>>>> @@ -68,6 +70,11 @@ pn_record_t *pn_task_attachments(pn_task_t *task) {
>>>>    return task->attachments;
>>>>  }
>>>>
>>>> +void pn_task_cancel(pn_task_t *task) {
>>>> +  assert(task);
>>>> +  task->cancelled = true;
>>>> +}
>>>> +
>>>>  //
>>>>  // timer
>>>>  //
>>>> @@ -113,8 +120,22 @@ pn_task_t *pn_timer_schedule(pn_timer_t *timer, pn_timestamp_t deadline) {
>>>>    return task;
>>>>  }
>>>>
>>>> +void pni_timer_flush_cancelled(pn_timer_t *timer) {
>>>> +  while (pn_list_size(timer->tasks)) {
>>>> +    pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0);
>>>> +    if (task->cancelled) {
>>>> +      pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks);
>>>> +      assert(min == task);
>>>> +      pn_decref(min);
>>>> +    } else {
>>>> +      break;
>>>> +    }
>>>> +  }
>>>> +}
>>>> +
>>>>  pn_timestamp_t pn_timer_deadline(pn_timer_t *timer) {
>>>>    assert(timer);
>>>> +  pni_timer_flush_cancelled(timer);
>>>>    if (pn_list_size(timer->tasks)) {
>>>>      pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0);
>>>>      return task->deadline;
>>>> @@ -130,7 +151,8 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now) {
>>>>      if (now >= task->deadline) {
>>>>        pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks);
>>>>        assert(min == task);
>>>> -      pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK);
>>>> +      if (!min->cancelled)
>>>> +        pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK);
>>>>        pn_decref(min);
>>>>      } else {
>>>>        break;
>>>> @@ -140,5 +162,6 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now) {
>>>>
>>>>  int pn_timer_tasks(pn_timer_t *timer) {
>>>>    assert(timer);
>>>> +  pni_timer_flush_cancelled(timer);
>>>>    return pn_list_size(timer->tasks);
>>>>  }
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/tests/reactor.c
>>>> ----------------------------------------------------------------------
>>>> diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c
>>>> index fe6c769..059d099 100644
>>>> --- a/proton-c/src/tests/reactor.c
>>>> +++ b/proton-c/src/tests/reactor.c
>>>> @@ -440,6 +440,20 @@ static void test_reactor_schedule_handler(void) {
>>>>    pn_free(tevents);
>>>>  }
>>>>
>>>> +static void test_reactor_schedule_cancel(void) {
>>>> +  pn_reactor_t *reactor = pn_reactor();
>>>> +  pn_handler_t *root = pn_reactor_get_handler(reactor);
>>>> +  pn_list_t *events = pn_list(PN_VOID, 0);
>>>> +  pn_handler_add(root, test_handler(reactor, events));
>>>> +  pn_task_t *task = pn_reactor_schedule(reactor, 0, NULL);
>>>> +  pn_task_cancel(task);
>>>> +  pn_reactor_run(reactor);
>>>> +  pn_reactor_free(reactor);
>>>> +  expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED,
>>>> +         PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
>>>> +  pn_free(events);
>>>> +}
>>>> +
>>>>  int main(int argc, char **argv)
>>>>  {
>>>>    test_reactor();
>>>> @@ -461,5 +475,6 @@ int main(int argc, char **argv)
>>>>    test_reactor_transfer(4*1024, 1024);
>>>>    test_reactor_schedule();
>>>>    test_reactor_schedule_handler();
>>>> +  test_reactor_schedule_cancel();
>>>>    return 0;
>>>>  }
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>>> ----------------------------------------------------------------------
>>>> diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>>> index 69701ab..7fb5964 100644
>>>> --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>>> +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>>> @@ -43,4 +43,8 @@ public interface Task extends Extendable {
>>>>      /** @return the reactor that created this task. */
>>>>      Reactor getReactor();
>>>>
>>>> +    /**
>>>> +     * Cancel the execution of this task. No-op if invoked after the task was already executed.
>>>> +     */
>>>> +    void cancel();
>>>>  }
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>>> ----------------------------------------------------------------------
>>>> diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>>> index 00c9a84..11bb6b8 100644
>>>> --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>>> +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>>> @@ -31,6 +31,7 @@ import org.apache.qpid.proton.reactor.Task;
>>>>  public class TaskImpl implements Task, Comparable<TaskImpl> {
>>>>      private final long deadline;
>>>>      private final int counter;
>>>> +    private boolean cancelled = false;
>>>>      private final AtomicInteger count = new AtomicInteger();
>>>>      private Record attachments = new RecordImpl();
>>>>      private Reactor reactor;
>>>> @@ -58,6 +59,15 @@ public class TaskImpl implements Task, Comparable<TaskImpl> {
>>>>          return deadline;
>>>>      }
>>>>
>>>> +    public boolean isCancelled() {
>>>> +        return cancelled;
>>>> +    }
>>>> +
>>>> +    @Override
>>>> +    public void cancel() {
>>>> +        cancelled = true;
>>>> +    }
>>>> +
>>>>      public void setReactor(Reactor reactor) {
>>>>          this.reactor = reactor;
>>>>      }
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>>> ----------------------------------------------------------------------
>>>> diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>>> index 32bb4f6..b8df19d 100644
>>>> --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>>> +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>>> @@ -31,7 +31,7 @@ import org.apache.qpid.proton.reactor.Task;
>>>>  public class Timer {
>>>>
>>>>      private CollectorImpl collector;
>>>> -    private PriorityQueue<Task> tasks = new PriorityQueue<Task>();
>>>> +    private PriorityQueue<TaskImpl> tasks = new PriorityQueue<TaskImpl>();
>>>>
>>>>      public Timer(Collector collector) {
>>>>          this.collector = (CollectorImpl)collector;
>>>> @@ -44,6 +44,7 @@ public class Timer {
>>>>      }
>>>>
>>>>      long deadline() {
>>>> +        flushCancelled();
>>>>          if (tasks.size() > 0) {
>>>>              Task task = tasks.peek();
>>>>              return task.deadline();
>>>> @@ -52,12 +53,23 @@ public class Timer {
>>>>          }
>>>>      }
>>>>
>>>> +    private void flushCancelled() {
>>>> +        while (!tasks.isEmpty()) {
>>>> +            TaskImpl task = tasks.peek();
>>>> +            if (task.isCancelled())
>>>> +                tasks.poll();
>>>> +            else
>>>> +                break;
>>>> +        }
>>>> +    }
>>>> +
>>>>      void tick(long now) {
>>>>          while(!tasks.isEmpty()) {
>>>> -            Task task = tasks.peek();
>>>> +            TaskImpl task = tasks.peek();
>>>>              if (now >= task.deadline()) {
>>>>                  tasks.poll();
>>>> -                collector.put(Type.TIMER_TASK, task);
>>>> +                if (!task.isCancelled())
>>>> +                    collector.put(Type.TIMER_TASK, task);
>>>>              } else {
>>>>                  break;
>>>>              }
>>>> @@ -65,6 +77,7 @@ public class Timer {
>>>>      }
>>>>
>>>>      int tasks() {
>>>> +        flushCancelled();
>>>>          return tasks.size();
>>>>      }
>>>>  }
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/resources/creactor.py
>>>> ----------------------------------------------------------------------
>>>> diff --git a/proton-j/src/main/resources/creactor.py b/proton-j/src/main/resources/creactor.py
>>>> index e179b23..1f8514e 100644
>>>> --- a/proton-j/src/main/resources/creactor.py
>>>> +++ b/proton-j/src/main/resources/creactor.py
>>>> @@ -78,6 +78,9 @@ def pn_selectable_set_fd(s, fd):
>>>>  def pn_acceptor_close(a):
>>>>      a.close()
>>>>
>>>> +def pn_task_cancel(t):
>>>> +    t.cancel()
>>>> +
>>>>  def pn_object_reactor(o):
>>>>      if hasattr(o, "impl"):
>>>>          if hasattr(o.impl, "getSession"):
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/tests/python/proton_tests/reactor.py
>>>> ----------------------------------------------------------------------
>>>> diff --git a/tests/python/proton_tests/reactor.py b/tests/python/proton_tests/reactor.py
>>>> index 6afee30..067c5c0 100644
>>>> --- a/tests/python/proton_tests/reactor.py
>>>> +++ b/tests/python/proton_tests/reactor.py
>>>> @@ -171,3 +171,17 @@ class ExceptionTest(Test):
>>>>              assert False, "expected to barf"
>>>>          except Barf:
>>>>              pass
>>>> +
>>>> +    def test_schedule_cancel(self):
>>>> +        barf = self.reactor.schedule(10, BarfOnTask())
>>>> +        class CancelBarf:
>>>> +            def on_timer_task(self, event):
>>>> +                barf.cancel()
>>>> +        self.reactor.schedule(0, CancelBarf())
>>>> +        now = self.reactor.mark()
>>>> +        try:
>>>> +            self.reactor.run()
>>>> +            elapsed = self.reactor.mark() - now
>>>> +            assert elapsed < 10, "expected cancelled task to not delay the reactor by " + elapsed
>>>> +        except Barf:
>>>> +            assert False, "expected barf to be cancelled"
>>>>
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
>>>> For additional commands, e-mail: commits-h...@qpid.apache.org
>>>>
>>>>
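
For anyone following the thread, the cancellation scheme in the quoted commit (flag the task, then drop it lazily once it reaches the head of the queue) can be sketched in plain Python roughly as below. This is only an illustration and not the Proton code: SketchTask, SketchTimer and the fired list are hypothetical names, heapq stands in for pn_list_minpop / PriorityQueue, and deadline() returns None for an empty timer, which is an assumption of the sketch.

```python
import heapq
import itertools


class SketchTask:
    """Hypothetical stand-in for pn_task_t / TaskImpl."""

    def __init__(self, deadline, order):
        self.deadline = deadline
        self.order = order        # tie-breaker for equal deadlines
        self.cancelled = False

    def cancel(self):
        # Only set a flag; the timer discards the task later.
        self.cancelled = True

    def __lt__(self, other):
        return (self.deadline, self.order) < (other.deadline, other.order)


class SketchTimer:
    """Hypothetical timer that drops cancelled tasks lazily."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()
        self.fired = []           # stands in for the event collector

    def schedule(self, deadline):
        task = SketchTask(deadline, next(self._counter))
        heapq.heappush(self._heap, task)
        return task

    def _flush_cancelled(self):
        # Pop cancelled tasks only while they sit at the head of the heap.
        while self._heap and self._heap[0].cancelled:
            heapq.heappop(self._heap)

    def deadline(self):
        self._flush_cancelled()
        return self._heap[0].deadline if self._heap else None

    def tasks(self):
        self._flush_cancelled()
        return len(self._heap)

    def tick(self, now):
        # Pop every due task, but only fire the ones not cancelled.
        while self._heap and now >= self._heap[0].deadline:
            task = heapq.heappop(self._heap)
            if not task.cancelled:
                self.fired.append(task)


# Same shape as test_schedule_cancel: a late task is cancelled early.
timer = SketchTimer()
late = timer.schedule(10)
early = timer.schedule(0)
late.cancel()
timer.tick(0)
print(timer.tasks())     # 0: the cancelled task no longer counts as pending
print(timer.deadline())  # None: nothing left to wait for
```

One property of this approach, at least in the sketch: only the head of the queue is flushed, so a cancelled task sitting behind a live one is still counted by tasks() until the live one has been popped.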