Last month I posted some example code showing how to do Functional Reactive Programming (FRP) in Oz. That code was incomplete because it did not avoid "glitches". For example, in the expression (a + (a div 3)), when a new event arrives for a, exactly one new output should be calculated. Because of thread scheduling, we might instead get a temporary wrong output first (e.g., if the + is scheduled before the div).
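To make the glitch concrete, here is a small sequential sketch. It is not part of the FRP code: the names Expr, OldA, NewA, Correct and Glitch are made up for illustration, and the values 3 and 6 are arbitrary. It computes the value that should be output when a changes from 3 to 6, and the stale value that results if the + runs before the div has seen the new a.

```oz
declare
% The example expression a + (a div 3)
fun {Expr A} A + (A div 3) end
OldA = 3
NewA = 6
% Both operands see the new a: 6 + (6 div 3) = 8
Correct = {Expr NewA}
% The + sees the new a but the old div result: 6 + (3 div 3) = 7
Glitch = NewA + (OldA div 3)
in
{Browse correct(Correct)#glitch(Glitch)}
```

The 7 is the kind of temporary output that should never be visible from outside.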
Here is an updated version of FRP that avoids this problem by doing global synchronization. It works as follows: when a new input event arrives, the computation is allowed to run until all its threads are suspended simultaneously. At that point, we know that the output is correct. We use computation spaces to detect when all threads are suspended simultaneously.

Peter
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Functional Reactive Programming in Oz
% This version implements FRP with a global synchronization
% that avoids glitches

% Author: Peter Van Roy
% Date: May 30, 2006

% FRP is a generalization of functional programming where an
% argument value can be replaced by a more recent value (this
% is called an "event") and any changes have to be propagated
% to other functions. If a new input value arrives and the
% output does not change then no new output value is generated.

% This is a kind of dataflow programming. It can handle
% nondeterminism, so it is strictly more expressive than
% declarative concurrency. It can be expressed with two new
% concepts: (1) nondeterministic choice and (2) synchronizing
% on a set of threads all suspending at once. The first is
% implemented here using WaitTwo and the second is implemented
% here using computation spaces with the askVerbose operation.

% FRP is similar in expressiveness to concurrent logic
% programming. The main difference is that FRP adds scheduling
% constraints to avoid temporarily violating invariants.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% First part: Local FRP operation

% Function with two input streams and one output stream
% If a new event arrives on the input, a new event is generated
% on the output, but only if the output changes

% Waits until at least one argument is bound
% Fairly returns 1 if X is bound or 2 if Y is bound
declare
fun {WaitTwo X Y}
   {Record.waitOr X#Y}
end

% When a stream terminates with nil, it means there are no more changes
% Remembers last output and generates new output only if it is different
declare
fun {TwoArgOp F}
   fun {$ As Bs}
      thread
         fun {OpLoop Aval Bval Oval As Bs}
            case {WaitTwo As Bs}
            of 1 then
               case As
               of A|Ar then Nval={F A Bval} in
                  if Oval==Nval then
                     {OpLoop A Bval Oval Ar Bs}
                  else
                     Nval|{OpLoop A Bval Nval Ar Bs}
                  end
               [] nil then
                  % Slightly nonoptimal: May give repeats if one stream is done
                  % (Fixing this is an exercise for the reader!)
                  {Map Bs fun {$ B} {F Aval B} end}
               end
            [] 2 then
               case Bs
               of B|Br then Nval={F Aval B} in
                  if Oval==Nval then
                     {OpLoop Aval B Oval As Br}
                  else
                     Nval|{OpLoop Aval B Nval As Br}
                  end
               [] nil then
                  % Slightly nonoptimal: May give repeats if one stream is done
                  {Map As fun {$ A} {F A Bval} end}
               end
            end
         end
         fun {FirstValue As}
            case As
            of A|Ar then A#Ar
            else
               if {IsInt As} then As#nil end
            end
         end
         % Wait until each stream has its first value
         Aval#Anext={FirstValue As}
         Bval#Bnext={FirstValue Bs}
         Oval={F Aval Bval}
      in
         % Execution loop
         Oval|{OpLoop Aval Bval Oval Anext Bnext}
      end
   end
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Second part: Global FRP synchronization

% Avoid glitches by using computation spaces to synchronize
% on suspension of all threads. The output is read only when
% all threads are suspended, which avoids glitches.
% Return current last element of a stream
declare
fun {End S}
   case S of X|S2 then
      if {IsDet S2} then {End S2} else X end
   end
end

% Return unbound tail of a stream
declare
fun {Tail S}
   if {IsDet S} then {Tail S.2} else S end
end

% One step of a calculation
% Inject a new input event; then calculate internally and
% possibly generate a new output event. Avoid glitches
% by using askVerbose to wait until all threads inside the
% computation space are simultaneously suspended. Then
% detect whether a new output event was generated or not.
declare
proc {Step X}
   Z={Tail Bs} % Get unbound tail of output stream
in
   {Port.send P1 X} % Inject an input event
   {Wait {Space.askVerbose Sp}} % Wait for fixpoint
   % Detect if an output event was generated:
   if {IsDet Z} then
      {Browse outevent({End Bs})}
   else
      {Browse nooutevent}
   end
end

% Example calculation inside of a computation space
declare P1 As P2 Bs Sp in
{NewPort As P1} % Input event stream
{NewPort Bs P2} % Output event stream
Sp={Space.new
    proc {$ _}
       Add Div Rs
    in
       % Define calculation
       % Example: r=(a div 2)+(a div 3)
       Add={TwoArgOp fun {$ A B} A+B end}
       Div={TwoArgOp fun {$ A B} A div B end}
       Rs={Add {Div As 2} {Div As 3}}
       thread for X in Rs do {Port.send P2 X} end end
    end}

% Example: Insert some input events
{Step 10}
{Step 11}
{Step 12}
{Step 13}
{Step 14}

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
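As a cross-check for the example run, the same expression can be evaluated sequentially for the five inputs. This is only a reference calculation in plain functional Oz, outside the computation space. The function name R is made up for illustration.

```oz
declare
% Sequential reference for the example r=(a div 2)+(a div 3)
fun {R A} (A div 2) + (A div 3) end
in
% One reference value per input event 10, 11, 12, 13, 14
{Browse {Map [10 11 12 13 14] R}}   % shows [8 8 10 10 11]
```

Since an output event is generated only when the output changes, inputs 11 and 13 repeat the previous value. Assuming the program behaves as described, those are the steps that should report nooutevent.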