Thanks Bozzo, that sorts those failures (so long as the issue from your other change is worked around, to let the build get that far).
Robbie

On 9 July 2015 at 11:22, Bozo Dragojevic <bo...@digiverse.si> wrote:
> Hi Robbie,
>
> Yeah, my bad. I was sitting on some local changes so I missed this.
> I'll test on a clean checkout next time. Sorry for all the mess.
> I've committed the missing methods to proton-j
>
> Bozzo
>
> On 9. 07. 15 12.14, Robbie Gemmell wrote:
>> Hi Bozzo,
>>
>> This change also seems to be causing test failures when using the
>> maven build (if you update things to get past the earlier failures,
>> caused by the commit mentioned in the other thread on proton@):
>>
>> proton_tests.reactor.ExceptionTest.test_schedule_cancel .................
>> fail
>> Error during test: Traceback (most recent call last):
>>   File "/home/gemmellr/workspace/proton/tests/python/proton-test",
>> line 360, in run
>>     phase()
>>   File
>> "/home/gemmellr/workspace/proton/tests/python/proton_tests/reactor.py",
>> line 181, in test_schedule_cancel
>>     now = self.reactor.mark()
>>   File
>> "/home/gemmellr/workspace/proton/tests/../proton-c/bindings/python/proton/reactor.py",
>> line 118, in mark
>>     return pn_reactor_mark(self._impl)
>> NameError: global name 'pn_reactor_mark' is not defined
>>
>>
>> Robbie
>>
>> On 9 July 2015 at 10:55, Bozo Dragojevic <bo...@digiverse.si> wrote:
>>> Hi Ken,
>>>
>>> I've installed python3.4 and tox and friends and tried to reproduce it here
>>> and I can confirm that some completely unrelated tests fail mysteriously
>>> with python3.4 and that reverting my change makes that failure go away :)
>>>
>>> I've added more task cancellation tests to force the error while still
>>> in the implicated code, as I suspected it could be some refcounting
>>> problem but the new tests do not show anything unusual.
>>>
>>> What is even weirder, with the new tests even the python3.4 suite passes
>>> without segfault!
>>>
>>> So, I consider this a false positive and have left the change in,
>>> including the new tests at ca47d72.
>>>
>>> Does such a solution work for you?
>>>
>>> Bozzo
>>>
>>> On 8. 07. 15 16.32, Ken Giusti wrote:
>>>> Hi Bozzo,
>>>>
>>>> Can you please revert this change?
>>>>
>>>> It is causing a segfault in the python unit tests when they are run under
>>>> python3.4.
>>>>
>>>> I haven't hit the segfault on python2.7, only on python3.4
>>>>
>>>> thanks,
>>>>
>>>> -K
>>>>
>>>> ----- Original Message -----
>>>>> From: bo...@apache.org
>>>>> To: comm...@qpid.apache.org
>>>>> Sent: Tuesday, July 7, 2015 3:50:16 PM
>>>>> Subject: [2/2] qpid-proton git commit: PROTON-928: cancellable tasks
>>>>>
>>>>> PROTON-928: cancellable tasks
>>>>>
>>>>> A scheduled task can be cancelled.
>>>>> A cancelled task does not prevent reactor from stopping running
>>>>>
>>>>>
>>>>> Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
>>>>> Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d4d22ee3
>>>>> Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d4d22ee3
>>>>> Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d4d22ee3
>>>>>
>>>>> Branch: refs/heads/master
>>>>> Commit: d4d22ee396163babcac19c48845b1f10ca3b5a48
>>>>> Parents: 09af375
>>>>> Author: Bozo Dragojevic <bo...@digiverse.si>
>>>>> Authored: Tue Jul 7 10:17:40 2015 +0200
>>>>> Committer: Bozo Dragojevic <bo...@digiverse.si>
>>>>> Committed: Tue Jul 7 21:49:44 2015 +0200
>>>>>
>>>>> ----------------------------------------------------------------------
>>>>>  proton-c/bindings/python/proton/reactor.py     |  5 +++-
>>>>>  proton-c/include/proton/reactor.h              |  1 +
>>>>>  proton-c/src/reactor/timer.c                   | 25 +++++++++++++++++++-
>>>>>  proton-c/src/tests/reactor.c                   | 15 ++++++++++++
>>>>>  .../org/apache/qpid/proton/reactor/Task.java   |  4 ++++
>>>>>  .../qpid/proton/reactor/impl/TaskImpl.java     | 10 ++++++++
>>>>>  .../apache/qpid/proton/reactor/impl/Timer.java | 19 ++++++++++++---
>>>>>  proton-j/src/main/resources/creactor.py        |  3 +++
>>>>>  tests/python/proton_tests/reactor.py           | 14 +++++++++++
>>>>>  9 files changed, 91 insertions(+), 5 deletions(-)
>>>>> ----------------------------------------------------------------------
>>>>>
>>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/bindings/python/proton/reactor.py
>>>>> ----------------------------------------------------------------------
>>>>> diff --git a/proton-c/bindings/python/proton/reactor.py
>>>>> b/proton-c/bindings/python/proton/reactor.py
>>>>> index c66334b..d019554 100644
>>>>> --- a/proton-c/bindings/python/proton/reactor.py
>>>>> +++ b/proton-c/bindings/python/proton/reactor.py
>>>>> @@ -53,6 +53,9 @@ class Task(Wrapper):
>>>>>      def _init(self):
>>>>>          pass
>>>>>
>>>>> +    def cancel(self):
>>>>> +        pn_task_cancel(self._impl)
>>>>> +
>>>>>  class Acceptor(Wrapper):
>>>>>
>>>>>      def __init__(self, impl):
>>>>> @@ -112,7 +115,7 @@ class Reactor(Wrapper):
>>>>>          pn_reactor_yield(self._impl)
>>>>>
>>>>>      def mark(self):
>>>>> -        pn_reactor_mark(self._impl)
>>>>> +        return pn_reactor_mark(self._impl)
>>>>>
>>>>>      def _get_handler(self):
>>>>>          return WrappedHandler.wrap(pn_reactor_get_handler(self._impl),
>>>>> self.on_error)
>>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/include/proton/reactor.h
>>>>> ----------------------------------------------------------------------
>>>>> diff --git a/proton-c/include/proton/reactor.h
>>>>> b/proton-c/include/proton/reactor.h
>>>>> index 59b2282..6f52d22 100644
>>>>> --- a/proton-c/include/proton/reactor.h
>>>>> +++ b/proton-c/include/proton/reactor.h
>>>>> @@ -96,6 +96,7 @@ PN_EXTERN pn_task_t *pn_timer_schedule(pn_timer_t
>>>>> *timer,
>>>>> pn_timestamp_t deadlin
>>>>>  PN_EXTERN int pn_timer_tasks(pn_timer_t *timer);
>>>>>
>>>>>  PN_EXTERN pn_record_t *pn_task_attachments(pn_task_t *task);
>>>>> +PN_EXTERN void pn_task_cancel(pn_task_t *task);
>>>>>
>>>>>  PN_EXTERN pn_reactor_t *pn_class_reactor(const pn_class_t *clazz, void
>>>>> *object);
>>>>>  PN_EXTERN pn_reactor_t *pn_object_reactor(void *object);
>>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/reactor/timer.c
>>>>> ----------------------------------------------------------------------
>>>>> diff --git a/proton-c/src/reactor/timer.c b/proton-c/src/reactor/timer.c
>>>>> index 1ad0821..61efd31 100644
>>>>> --- a/proton-c/src/reactor/timer.c
>>>>> +++ b/proton-c/src/reactor/timer.c
>>>>> @@ -27,12 +27,14 @@ struct pn_task_t {
>>>>>    pn_list_t *pool;
>>>>>    pn_record_t *attachments;
>>>>>    pn_timestamp_t deadline;
>>>>> +  bool cancelled;
>>>>>  };
>>>>>
>>>>>  void pn_task_initialize(pn_task_t *task) {
>>>>>    task->pool = NULL;
>>>>>    task->attachments = pn_record();
>>>>>    task->deadline = 0;
>>>>> +  task->cancelled = false;
>>>>>  }
>>>>>
>>>>>  void pn_task_finalize(pn_task_t *task) {
>>>>> @@ -68,6 +70,11 @@ pn_record_t *pn_task_attachments(pn_task_t *task) {
>>>>>    return task->attachments;
>>>>>  }
>>>>>
>>>>> +void pn_task_cancel(pn_task_t *task) {
>>>>> +  assert(task);
>>>>> +  task->cancelled = true;
>>>>> +}
>>>>> +
>>>>>  //
>>>>>  // timer
>>>>>  //
>>>>> @@ -113,8 +120,22 @@ pn_task_t *pn_timer_schedule(pn_timer_t *timer,
>>>>> pn_timestamp_t deadline) {
>>>>>    return task;
>>>>>  }
>>>>>
>>>>> +void pni_timer_flush_cancelled(pn_timer_t *timer) {
>>>>> +  while (pn_list_size(timer->tasks)) {
>>>>> +    pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0);
>>>>> +    if (task->cancelled) {
>>>>> +      pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks);
>>>>> +      assert(min == task);
>>>>> +      pn_decref(min);
>>>>> +    } else {
>>>>> +      break;
>>>>> +    }
>>>>> +  }
>>>>> +}
>>>>> +
>>>>>  pn_timestamp_t pn_timer_deadline(pn_timer_t *timer) {
>>>>>    assert(timer);
>>>>> +  pni_timer_flush_cancelled(timer);
>>>>>    if (pn_list_size(timer->tasks)) {
>>>>>      pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0);
>>>>>      return task->deadline;
>>>>> @@ -130,7 +151,8 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t
>>>>> now)
>>>>> {
>>>>>      if (now >= task->deadline) {
>>>>>        pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks);
>>>>>        assert(min == task);
>>>>> -      pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK);
>>>>> +      if (!min->cancelled)
>>>>> +        pn_collector_put(timer->collector, PN_OBJECT, min,
>>>>> PN_TIMER_TASK);
>>>>>        pn_decref(min);
>>>>>      } else {
>>>>>        break;
>>>>> @@ -140,5 +162,6 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t
>>>>> now)
>>>>> {
>>>>>
>>>>>  int pn_timer_tasks(pn_timer_t *timer) {
>>>>>    assert(timer);
>>>>> +  pni_timer_flush_cancelled(timer);
>>>>>    return pn_list_size(timer->tasks);
>>>>>  }
>>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/tests/reactor.c
>>>>> ----------------------------------------------------------------------
>>>>> diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c
>>>>> index fe6c769..059d099 100644
>>>>> --- a/proton-c/src/tests/reactor.c
>>>>> +++ b/proton-c/src/tests/reactor.c
>>>>> @@ -440,6 +440,20 @@ static void test_reactor_schedule_handler(void) {
>>>>>    pn_free(tevents);
>>>>>  }
>>>>>
>>>>> +static void test_reactor_schedule_cancel(void) {
>>>>> +  pn_reactor_t *reactor = pn_reactor();
>>>>> +  pn_handler_t *root = pn_reactor_get_handler(reactor);
>>>>> +  pn_list_t *events = pn_list(PN_VOID, 0);
>>>>> +  pn_handler_add(root, test_handler(reactor, events));
>>>>> +  pn_task_t *task = pn_reactor_schedule(reactor, 0, NULL);
>>>>> +  pn_task_cancel(task);
>>>>> +  pn_reactor_run(reactor);
>>>>> +  pn_reactor_free(reactor);
>>>>> +  expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT,
>>>>> PN_SELECTABLE_UPDATED,
>>>>> +         PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
>>>>> +  pn_free(events);
>>>>> +}
>>>>> +
>>>>>  int main(int argc, char **argv)
>>>>>  {
>>>>>    test_reactor();
>>>>> @@ -461,5 +475,6 @@ int main(int argc, char **argv)
>>>>>    test_reactor_transfer(4*1024, 1024);
>>>>>    test_reactor_schedule();
>>>>>    test_reactor_schedule_handler();
>>>>> +  test_reactor_schedule_cancel();
>>>>>    return 0;
>>>>>  }
>>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>>>> ----------------------------------------------------------------------
>>>>> diff --git
>>>>> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>>>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>>>> index 69701ab..7fb5964 100644
>>>>> --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>>>> +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>>>> @@ -43,4 +43,8 @@ public interface Task extends Extendable {
>>>>>      /** @return the reactor that created this task. */
>>>>>      Reactor getReactor();
>>>>>
>>>>> +    /**
>>>>> +     * Cancel the execution of this task. No-op if invoked after the task
>>>>> was already executed.
>>>>> +     */
>>>>> +    void cancel();
>>>>>  }
>>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>>>> ----------------------------------------------------------------------
>>>>> diff --git
>>>>> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>>>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>>>> index 00c9a84..11bb6b8 100644
>>>>> ---
>>>>> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>>>> +++
>>>>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>>>> @@ -31,6 +31,7 @@ import org.apache.qpid.proton.reactor.Task;
>>>>>  public class TaskImpl implements Task, Comparable<TaskImpl> {
>>>>>      private final long deadline;
>>>>>      private final int counter;
>>>>> +    private boolean cancelled = false;
>>>>>      private final AtomicInteger count = new AtomicInteger();
>>>>>      private Record attachments = new RecordImpl();
>>>>>      private Reactor reactor;
>>>>> @@ -58,6 +59,15 @@ public class TaskImpl implements Task,
>>>>> Comparable<TaskImpl> {
>>>>>          return deadline;
>>>>>      }
>>>>>
>>>>> +    public boolean isCancelled() {
>>>>> +        return cancelled;
>>>>> +    }
>>>>> +
>>>>> +    @Override
>>>>> +    public void cancel() {
>>>>> +        cancelled = true;
>>>>> +    }
>>>>> +
>>>>>      public void setReactor(Reactor reactor) {
>>>>>          this.reactor = reactor;
>>>>>      }
>>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>>>> ----------------------------------------------------------------------
>>>>> diff --git
>>>>> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>>>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>>>> index 32bb4f6..b8df19d 100644
>>>>> ---
>>>>> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>>>> +++
>>>>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>>>> @@ -31,7 +31,7 @@ import org.apache.qpid.proton.reactor.Task;
>>>>>  public class Timer {
>>>>>
>>>>>      private CollectorImpl collector;
>>>>> -    private PriorityQueue<Task> tasks = new PriorityQueue<Task>();
>>>>> +    private PriorityQueue<TaskImpl> tasks = new
>>>>> PriorityQueue<TaskImpl>();
>>>>>
>>>>>      public Timer(Collector collector) {
>>>>>          this.collector = (CollectorImpl)collector;
>>>>> @@ -44,6 +44,7 @@ public class Timer {
>>>>>      }
>>>>>
>>>>>      long deadline() {
>>>>> +        flushCancelled();
>>>>>          if (tasks.size() > 0) {
>>>>>              Task task = tasks.peek();
>>>>>              return task.deadline();
>>>>> @@ -52,12 +53,23 @@ public class Timer {
>>>>>          }
>>>>>      }
>>>>>
>>>>> +    private void flushCancelled() {
>>>>> +        while (!tasks.isEmpty()) {
>>>>> +            TaskImpl task = tasks.peek();
>>>>> +            if (task.isCancelled())
>>>>> +                tasks.poll();
>>>>> +            else
>>>>> +                break;
>>>>> +        }
>>>>> +    }
>>>>> +
>>>>>      void tick(long now) {
>>>>>          while(!tasks.isEmpty()) {
>>>>> -            Task task = tasks.peek();
>>>>> +            TaskImpl task = tasks.peek();
>>>>>              if (now >= task.deadline()) {
>>>>>                  tasks.poll();
>>>>> -                collector.put(Type.TIMER_TASK, task);
>>>>> +                if (!task.isCancelled())
>>>>> +                    collector.put(Type.TIMER_TASK, task);
>>>>>              } else {
>>>>>                  break;
>>>>>              }
>>>>> @@ -65,6 +77,7 @@ public class Timer {
>>>>>      }
>>>>>
>>>>>      int tasks() {
>>>>> +        flushCancelled();
>>>>>          return tasks.size();
>>>>>      }
>>>>>  }
>>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/resources/creactor.py
>>>>> ----------------------------------------------------------------------
>>>>> diff --git a/proton-j/src/main/resources/creactor.py
>>>>> b/proton-j/src/main/resources/creactor.py
>>>>> index e179b23..1f8514e 100644
>>>>> --- a/proton-j/src/main/resources/creactor.py
>>>>> +++ b/proton-j/src/main/resources/creactor.py
>>>>> @@ -78,6 +78,9 @@ def pn_selectable_set_fd(s, fd):
>>>>>  def pn_acceptor_close(a):
>>>>>      a.close()
>>>>>
>>>>> +def pn_task_cancel(t):
>>>>> +    t.cancel()
>>>>> +
>>>>>  def pn_object_reactor(o):
>>>>>      if hasattr(o, "impl"):
>>>>>          if hasattr(o.impl, "getSession"):
>>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/tests/python/proton_tests/reactor.py
>>>>> ----------------------------------------------------------------------
>>>>> diff --git a/tests/python/proton_tests/reactor.py
>>>>> b/tests/python/proton_tests/reactor.py
>>>>> index 6afee30..067c5c0 100644
>>>>> --- a/tests/python/proton_tests/reactor.py
>>>>> +++ b/tests/python/proton_tests/reactor.py
>>>>> @@ -171,3 +171,17 @@ class ExceptionTest(Test):
>>>>>        assert False, "expected to barf"
>>>>>      except Barf:
>>>>>        pass
>>>>> +
>>>>> +  def test_schedule_cancel(self):
>>>>> +    barf = self.reactor.schedule(10, BarfOnTask())
>>>>> +    class CancelBarf:
>>>>> +      def on_timer_task(self, event):
>>>>> +        barf.cancel()
>>>>> +    self.reactor.schedule(0, CancelBarf())
>>>>> +    now = self.reactor.mark()
>>>>> +    try:
>>>>> +      self.reactor.run()
>>>>> +      elapsed = self.reactor.mark() - now
>>>>> +      assert elapsed < 10, "expected cancelled task to not delay
>>>>> the
>>>>> reactor by " + elapsed
>>>>> +    except Barf:
>>>>> +      assert False, "expected barf to be cancelled"
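
For anyone reading the quoted commit without the sources to hand: as far as the diff shows, cancelling only sets a flag on the task, and the timer discards flagged tasks lazily once they reach the head of its queue. Below is a minimal standalone sketch of that idea in plain Python. The names `Task` and `LazyTimer` are hypothetical and are not Proton API; the sketch uses the standard `heapq` module in place of Proton's own list/heap code, so treat it as an illustration under those assumptions rather than the actual implementation.

```python
import heapq
import itertools


class Task:
    """Hypothetical stand-in for a scheduled task; not a Proton class."""

    def __init__(self, deadline, name):
        self.deadline = deadline
        self.name = name
        self.cancelled = False

    def cancel(self):
        # Only flag the task; the timer discards it later.
        self.cancelled = True


class LazyTimer:
    """Min-heap timer that drops cancelled tasks lazily (illustrative only)."""

    def __init__(self):
        self._heap = []
        # Tie-breaker so that Task objects themselves are never compared.
        self._seq = itertools.count()

    def schedule(self, deadline, name):
        task = Task(deadline, name)
        heapq.heappush(self._heap, (deadline, next(self._seq), task))
        return task

    def _flush_cancelled(self):
        # Pop cancelled tasks only while they sit at the head of the heap.
        while self._heap and self._heap[0][2].cancelled:
            heapq.heappop(self._heap)

    def deadline(self):
        self._flush_cancelled()
        return self._heap[0][0] if self._heap else None

    def tasks(self):
        self._flush_cancelled()
        return len(self._heap)

    def tick(self, now):
        # Pop every due task, but only report the ones not cancelled.
        fired = []
        while self._heap and self._heap[0][0] <= now:
            _, _, task = heapq.heappop(self._heap)
            if not task.cancelled:
                fired.append(task.name)
        return fired


timer = LazyTimer()
barf = timer.schedule(10, "barf")
timer.schedule(0, "cancel-barf")
fired = timer.tick(0)       # the deadline-0 task is due and fires
barf.cancel()               # mirrors the Python test: cancel the later task
remaining = timer.tasks()   # the cancelled head is flushed before counting
```

Note that, like the quoted `pni_timer_flush_cancelled`, the sketch only flushes cancelled tasks at the head of the heap, so a cancelled task queued behind a live one is still counted until it reaches the front.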