OK. I pushed to master in my repo. You can do the merge now. The relevant API methods look like this:
https://github.com/leoneu/s4-piper/blob/master/subprojects/s4-core/src/main/java/io/s4/core/ProcessingElement.java

    /* what used to be processInputEvent(). */
    abstract protected void onEvent(Event event); // gets called on every event.

    /* what used to be processOutputEvent(). */
    abstract public void onTrigger(Event event); // it is called based on the triggers for the event type

    /**
     * This method is called by the PE timer. By default it is synchronized with
     * the {@link #onEvent()} and {@link #onTrigger()} methods. To execute
     * concurrently with other methods, the {@link ProcessingElement} subclass
     * must be annotated with {@link @ThreadSafe}.
     *
     * Override this method to implement a periodic process.
     */
    void onTime() {}

    /**
     * This trigger is fired when the following conditions occur:
     *
     * <ul>
     * <li>An event of eventType arrived to the PE instance
     * <li>numEvents have arrived since the last time this trigger
     * was fired -OR- time since last event is greater than interval.
     * </ul>
     *
     * When the trigger fires, the method <tt>trigger(EventType event)</tt> is
     * called. Where <tt>EventType</tt> matches the argument eventType.
     *
     * @param eventType
     *            - the type of event on which this trigger will fire.
     * @param numEvents
     *            - number of events since last trigger activation. Must be
     *            greater than zero. (Set to one to trigger on every input
     *            event.)
     * @param interval
     *            - minimum time between triggers.
     * @param timeUnit
     *            - the TimeUnit for the argument interval.
     */
    public void setTrigger(Class<? extends Event> eventType, int numEvents,
            long interval, TimeUnit timeUnit) {}

    /**
     * Set a timer that calls {@link #onTime()}.
     *
     * If {@code interval==0} the timer is disabled.
     *
     * @param interval
     *            in timeUnit
     * @param timeUnit
     *            the timeUnit of interval
     */
    public void setTimerInterval(long interval, TimeUnit timeUnit) {}

On Thu, Oct 13, 2011 at 9:41 AM, Leo Neumeyer <[email protected]> wrote:
> After some skype we converged to the following:
>
> We support synchronous and asynchronous method triggers. The
> synchronous trigger is activated when an input event arrives and the
> asynchronous one is triggered by a timer.
>
> Proposed solution:
>
> * Synchronous methods:
>
> Signature: processEvent(SomeEvent event)
> Default behavior: trigger on every event
> Configuration: configure triggers by Event type, synchronized with all
> processEvent() and periodicTask() methods unless PE is annotated with
> @ThreadSafe
>
> * Asynchronous method:
>
> Signature: periodicTask()
> Default behavior: no trigger
> Configuration: interval in time unit, synchronized with all
> processEvent() methods unless PE is annotated with @ThreadSafe
>
> -leo
>
>
> On Thu, Oct 13, 2011 at 7:21 AM, Matthieu Morel
> <[email protected]> wrote:
>> On Wed, Oct 12, 2011 at 8:18 PM, Leo Neumeyer <[email protected]> wrote:
>>
>>>
>>> ...
>>>
>>> >
>>> >> A few thoughts:
>>> >>
>>> >> Input/Output names may not make a lot of sense any longer. What about
>>> >> changing to the following:
>>> >>
>>> >> processEvent(Event1 event) with any of the synchronous configurations.
>>> >>
>>> >> timerTask() called when there is a timer task configured.
>>> >>
>>> >> The timer task needs to be synchronized with the processEvent method
>>> >> (unless the PE has a @ThreadSafe annotation), we can keep the internal dummy
>>> >> event to synchronize but call timerTask() without an Event instead. I'm sure
>>> >> there are other ways of doing this.
>>> >>
>>> >> Does this make sense? We need to think a bit.
>>> >>
>>> >
>>> > Indeed, we lose some consistency in the API by handling output triggers in 2
>>> > different methods, depending on the trigger (event count based or timer
>>> > based)
>>> >
>>> > The original design (as described in the S4 paper) uses a "processEvent"
>>> > method for input events and an "output" method for triggered outputs. Why
>>> > not just keep that design? For the dynamic overload dispatcher, that would
>>> > just mean removing the code for "processOutputEvent".
>>> >
>>>
>>> Not sure I understand what you propose. I think we are saying the same:
>>>
>>> the processEvent() method can be configured to be triggered:
>>>
>>> - on every matching input event
>>> - on every N matching input events
>>> - on input event if time since last event is greater than delta
>>>
>>
>> This is a part which is not clear to me. My understanding was that the
>> output method might be triggered at periodic intervals, not the input
>> processing method. You seem to want to mix both methods, which is a bit
>> confusing to me. But it is also possible.
>>
>>
>>>
>>> We still need another method for periodic tasks, maybe a good name is
>>> periodicTask()
>>>
>>
>> is it a periodic task that we need (and we may), or is it a periodically
>> invokable output method?
>>
>>
>>> This method is synchronized with processEvent() (unless PE is
>>> @ThreadSafe) and it doesn't need to pass an event (because it is
>>> asynchronous).
>>>
>>> So it's not just removing processOutputEvent(), we need an additional
>>> method for periodic tasks that is synchronized with processEvent().
>>>
>>
>> That's the "output" method in the S4 paper, no?
>>
>>
>> Matthieu
>>
>
>
> --
>
> -leo
>

--

-leo
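
P.S. To make the onEvent()/onTrigger()/onTime() contract at the top of this message concrete, here is a rough, self-contained sketch of how a subclass might use it. Everything in it is an assumption for illustration: Event, SketchPE, receive() and tick() are hypothetical stand-ins and not the actual ProcessingElement code in the repo. Only the count-based half of setTrigger() is modeled (the interval arguments are accepted but ignored), and no real timer thread is started; tick() just plays the role of the PE timer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the S4 Event class; not the real one.
class Event {}

class ClickEvent extends Event {}

// Hypothetical stand-in for ProcessingElement. Only the count-based part of
// setTrigger() is modeled; interval and timeUnit are accepted but ignored.
abstract class SketchPE {
    private final Map<Class<? extends Event>, Integer> thresholds = new HashMap<>();
    private final Map<Class<? extends Event>, Integer> counts = new HashMap<>();

    protected abstract void onEvent(Event event); // called on every event
    public abstract void onTrigger(Event event);  // called when a trigger fires
    void onTime() {}                              // periodic hook, no-op by default

    public void setTrigger(Class<? extends Event> eventType, int numEvents,
            long interval, TimeUnit timeUnit) {
        if (numEvents < 1) {
            throw new IllegalArgumentException("numEvents must be greater than zero");
        }
        thresholds.put(eventType, numEvents);
        counts.put(eventType, 0);
    }

    // Assumed driver method: a framework would call this for each incoming event.
    // "synchronized" models the default locking described in the Javadoc.
    public synchronized void receive(Event event) {
        onEvent(event);
        Integer threshold = thresholds.get(event.getClass());
        if (threshold == null) {
            return; // no trigger configured for this event type
        }
        int seen = counts.get(event.getClass()) + 1;
        if (seen >= threshold) {
            counts.put(event.getClass(), 0); // reset the count, then fire
            onTrigger(event);
        } else {
            counts.put(event.getClass(), seen);
        }
    }

    // Assumed timer entry point; shares the object lock with receive().
    public synchronized void tick() {
        onTime();
    }
}

// Example subclass that just counts how often each callback runs.
class CounterPE extends SketchPE {
    int events = 0;
    int triggers = 0;
    int ticks = 0;

    @Override protected void onEvent(Event event) { events++; }
    @Override public void onTrigger(Event event) { triggers++; }
    @Override void onTime() { ticks++; }
}

class TriggerSketchDemo {
    public static void main(String[] args) {
        CounterPE pe = new CounterPE();
        pe.setTrigger(ClickEvent.class, 3, 0, TimeUnit.SECONDS); // fire every 3rd event
        for (int i = 0; i < 7; i++) {
            pe.receive(new ClickEvent());
        }
        pe.tick(); // simulate one timer callback
        System.out.println(pe.events + " events, " + pe.triggers
                + " triggers, " + pe.ticks + " ticks");
    }
}
```

With numEvents set to 3, seven events reach onEvent() seven times and onTrigger() twice (after the 3rd and 6th event). Because receive() and tick() lock the same object, onTime() never overlaps with the event callbacks, which is the default behavior the Javadoc describes for PEs that are not annotated @ThreadSafe.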
