Last month I posted some example code showing how to do
Functional Reactive Programming in Oz.  That code was
incomplete because it did not avoid "glitches".  For example,
in the expression (a + (a div 3)), when a new event arrives for a,
exactly one new output should be calculated.  Because of
thread scheduling, we might instead see a temporary wrong output
(e.g., if the + is scheduled before the div, it adds the new a
to the old value of (a div 3)).

Here is an updated version of FRP that avoids this problem
by doing a global synchronization.  It works as follows: when
a new input event arrives, the computation is allowed to run
until all its threads are suspended at the same time.  At that
point, we know that the output is correct.  We use computation
spaces to detect this simultaneous suspension.



% Functional Reactive Programming in Oz

% This version implements FRP with a global synchronization
% that avoids glitches

% Author: Peter Van Roy
% Date: May 30, 2006

% FRP is a generalization of functional programming where an
% argument value can be replaced by a more recent value (this
% is called an "event") and any changes have to be propagated
% to other functions. If a new input value arrives and the
% output does not change then no new output value is generated.
% This is a kind of dataflow programming.  It can handle
% nondeterminism, so it is strictly more expressive than
% declarative concurrency.  It can be expressed with two new
% concepts: (1) nondeterministic choice and (2) synchronizing
% on a set of threads all suspending at once.  The first is
% implemented here using WaitTwo and the second is implemented
% here using computation spaces with the askVerbose operation.

% FRP is similar in expressiveness to concurrent logic
% programming.  The main difference is that FRP adds scheduling
% constraints to avoid temporarily violating invariants.


% First part: Local FRP operation

% Function with two input streams and one output stream
% If a new event arrives on the input, a new event is generated
% on the output, but only if the output changes

% Waits until at least one argument is bound
% Fairly returns 1 if X is bound or 2 if Y is bound
declare
fun {WaitTwo X Y}
   {Record.waitOr X#Y}
end
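
% Usage sketch (not in the original post; X and Y are illustrative
% names): WaitTwo returns as soon as either argument is bound, so
% binding only Y should make the Browser show 2.
local X Y in
   thread {Browse {WaitTwo X Y}} end
   Y=unit
end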

% When a stream terminates with nil, it means there are no more changes
% Remembers last output and generates new output only if it is different
fun {TwoArgOp F}
   fun {$ As Bs}
      % Run in a separate thread so the call returns its output stream at once
      thread
         fun {OpLoop Aval Bval Oval As Bs}
            case {WaitTwo As Bs}
            of 1 then
               case As of A|Ar then Nval={F A Bval} in
                  if Oval==Nval then
                     {OpLoop A Bval Oval Ar Bs}
                  else
                     Nval|{OpLoop A Bval Nval Ar Bs}
                  end
               [] nil then
                  % Slightly nonoptimal: May give repeats if one stream is done
                  % (Fixing this is an exercise for the reader!)
                  {Map Bs fun {$ B} {F Aval B} end}
               end
            [] 2 then
               case Bs of B|Br then Nval={F Aval B} in
                  if Oval==Nval then
                     {OpLoop Aval B Oval As Br}
                  else
                     Nval|{OpLoop Aval B Nval As Br}
                  end
               [] nil then
                  % Slightly nonoptimal: May give repeats if one stream is done
                  {Map As fun {$ A} {F A Bval} end}
               end
            end
         end
         % A stream argument yields its first element and its rest;
         % an integer argument is treated as a constant
         fun {FirstValue As}
            case As of A|Ar then A#Ar else if {IsInt As} then As#nil end end
         end
         % Wait until each stream has its first value
         Aval#Anext={FirstValue As}
         Bval#Bnext={FirstValue Bs}
         Oval={F Aval Bval}
      in
         % Execution loop
         Oval|{OpLoop Aval Bval Oval Anext Bnext}
      end
   end
end
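
% Usage sketch (not in the original post; Add1 and the input values
% are made up for illustration).  A list serves as a finished stream
% and an integer as a constant, so adding 10 to each element of
% [1 2 3] should eventually show [11 12 13].
local Add1 in
   Add1={TwoArgOp fun {$ A B} A+B end}
   {Browse {Add1 [1 2 3] 10}}
end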


% Second part: Global FRP synchronization

% Avoid glitches by using computation spaces to synchronize
% on suspension of all threads.  The output is read only when
% all threads are suspended, which avoids glitches.
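
% Worked example (values taken from the example inputs below): for
% r=(a div 2)+(a div 3), when a changes from 11 to 12:
%    before the event:   (11 div 2)+(11 div 3) = 5+3 = 8
%    after the event:    (12 div 2)+(12 div 3) = 6+4 = 10
% If + runs when only one div has seen the new a, the output stream
% briefly carries 6+3 = 9 or 5+4 = 9.  Reading the output only after
% all threads have suspended should report 10 and skip the
% intermediate 9.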

% Return current last element of a stream
fun {End S}
   case S of X|S2 then
      if {IsDet S2} then {End S2} else X end
   end
end

% Return unbound tail of a stream
fun {Tail S}
   if {IsDet S} then {Tail S.2} else S end
end
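
% Usage sketch (not in the original post; S is an illustrative name):
% for a stream whose tail is still unbound, End returns the most
% recent element and Tail returns the unbound tail itself.
local S in
   S=1|2|3|_
   {Browse {End S}}           % should show 3
   {Browse {IsDet {Tail S}}}  % should show false
end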

% One step of a calculation
% Inject a new input event; then calculate internally and
% possibly generate a new output event.  Avoid glitches
% by using askVerbose to wait until all threads inside the
% computation space are simultaneously suspended.  Then
% detect whether a new output event was generated or not.
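% Note: Step refers to P1, Bs and Sp, which are declared in the
% example below, so feed that declaration before this definition.
declare
proc {Step X}
   Z={Tail Bs} % Get unbound tail of output stream
in
   {Port.send P1 X} % Inject an input event
   {Wait {Space.askVerbose Sp}} % Wait for fixpoint
   % Detect if an output event was generated:
   if {IsDet Z} then {Browse outevent({End Bs})}
   else {Browse nooutevent} end
end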

% Example calculation inside of a computation space
declare P1 As P2 Bs Sp in
{NewPort As P1} % Input event stream
{NewPort Bs P2} % Output event stream
Sp={Space.new proc {$ _} Add Div Rs in
                 % Define calculation
                 % Example: r=(a div 2)+(a div 3)
                 Add={TwoArgOp fun {$ A B} A+B end}
                 Div={TwoArgOp fun {$ A B} A div B end}
                 Rs={Add {Div As 2} {Div As 3}}
                 thread for X in Rs do {Port.send P2 X} end end
              end}

% Example: Insert some input events
{Step 10}
{Step 11}
{Step 12}
{Step 13}
{Step 14}
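
% For reference (plain arithmetic, not captured output), the value of
% r=(a div 2)+(a div 3) for these inputs is:
%    a=10: 5+3 = 8     a=11: 5+3 = 8     a=12: 6+4 = 10
%    a=13: 6+4 = 10    a=14: 7+4 = 11
% so one would expect Step to report an output event for 10, 12 and 14
% and no output event for 11 and 13.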
