Last month I posted some example code to show how to do
Functional Reactive Programming in Oz.  That code was
incomplete since it did not avoid "glitches".  For example,
in the expression (a + (a div 3)), if a new event arrives for a,
then a single new output has to be calculated.  Because of
thread scheduling, we might get a temporary wrong output
(e.g., if the + is scheduled before the div).

Here is an updated version of FRP that avoids this problem.
It correctly does global synchronization to avoid glitches.
It works as follows: when a new input event arrives, then
the computation is allowed to calculate until all its threads
suspend simultaneously.  At that point, we know that the
output is correct.  We use computation spaces to wait until
all threads suspend simultaneously.

Peter

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

% Functional Reactive Programming in Oz

% This version implements FRP with a global synchronization
% that avoids glitches

% Author: Peter Van Roy
% Date: May 30, 2006

% FRP is a generalization of functional programming where an
% argument value can be replaced by a more recent value (this
% is called an "event") and any changes have to be propagated
% to other functions. If a new input value arrives and the
% output does not change then no new output value is generated.
% This is a kind of dataflow programming.  It can handle
% nondeterminism, so it is strictly more expressive than
% declarative concurrency.  It can be expressed with two new
% concepts: (1) nondeterministic choice and (2) synchronizing
% on a set of threads all suspending at once.  The first is
% implemented here using WaitTwo and the second is implemented
% here using computation spaces with the askVerbose operation.

% FRP is similar in expressiveness to concurrent logic
% programming.  The main difference is that FRP adds scheduling
% constraints to avoid temporarily violating invariants.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

% First part: Local FRP operation

% Function with two input streams and one output stream
% If a new event arrives on the input, a new event is generated
% on the output, but only if the output changes

% Waits until at least one argument is bound
% Fairly returns 1 if X is bound or 2 if Y is bound
declare
fun {WaitTwo X Y}
   {Record.waitOr X#Y}
end

% When a stream terminates with nil, it means there are no more changes
% Remembers last output and generates new output only if it is different
declare
fun {TwoArgOp F}
   fun {$ As Bs}
      thread
         fun {OpLoop Aval Bval Oval As Bs}
            case {WaitTwo As Bs}
            of 1 then
               case As of A|Ar then Nval={F A Bval} in
                  if Oval==Nval then
                     {OpLoop A Bval Oval Ar Bs}
                  else
                     Nval|{OpLoop A Bval Nval Ar Bs}
                  end
            [] nil then
               % Slightly nonoptimal: May give repeats if one stream is done
               % (Fixing this is an exercise for the reader!)
               {Map Bs fun {$ B} {F Aval B} end}
            end
            [] 2 then
               case Bs of B|Br then Nval={F Aval B} in
                  if Oval==Nval then
                     {OpLoop Aval B Oval As Br}
                  else
                     Nval|{OpLoop Aval B Nval As Br}
                  end
               [] nil then
                  % Slightly nonoptimal: May give repeats if one stream is done
                  {Map As fun {$ A} {F A Bval} end}
               end
            end
         end
         fun {FirstValue As}
            case As of A|Ar then A#Ar else if {IsInt As} then As#nil end end
         end
         % Wait until each stream has its first value
         Aval#Anext={FirstValue As}
         Bval#Bnext={FirstValue Bs}
         Oval={F Aval Bval}
      in
         % Execution loop
         Oval|{OpLoop Aval Bval Oval Anext Bnext}
      end
   end
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

% Second part: Global FRP synchronization

% Avoid glitches by using computation spaces to synchronize
% on suspension of all threads.  The output is read only when
% all threads are suspended, which avoids glitches.

% Return current last element of a stream
declare
fun {End S}
   case S of X|S2 then
      if {IsDet S2} then {End S2} else X end
   end
end

% Return unbound tail of a stream
declare
fun {Tail S}
   if {IsDet S} then {Tail S.2} else S end
end

% One step of a calculation
% Inject a new input event; then calculate internally and
% possibly generate a new output event.  Avoid glitches
% by using askVerbose to wait until all threads inside the
% computation space are simultaneously suspended.  Then
% detect whether a new output event was generated or not.
declare
proc {Step X}
   Z={Tail Bs} % Get unbound tail of output stream
in
   {Port.send P1 X} % Inject an input event
   {Wait {Space.askVerbose Sp}} % Wait for fixpoint
   % Detect if an output event was generated:
   if {IsDet Z} then {Browse outevent({End Bs})}
   else {Browse nooutevent} end
end

% Example calculation inside of a computation space
declare P1 As P2 Bs Sp in
{NewPort As P1} % Input event stream
{NewPort Bs P2} % Output event stream
Sp={Space.new proc {$ _} Add Div Rs in
                 % Define calculation
                 % Example: r=(a div 2)+(a div 3)
                 Add={TwoArgOp fun {$ A B} A+B end}
                 Div={TwoArgOp fun {$ A B} A div B end}
                 Rs={Add {Div As 2} {Div As 3}}
                 thread for X in Rs do {Port.send P2 X} end end
              end}

% Example: Insert some input events
{Step 10}
{Step 11}
{Step 12}
{Step 13}
{Step 14}

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
_________________________________________________________________________________
mozart-users mailing list                               
mozart-users@ps.uni-sb.de
http://www.mozart-oz.org/mailman/listinfo/mozart-users

Reply via email to