Hi Gary,

What do you think of this implementation for the multi-processor
package?  This is very similar in spirit to how I designed the IO
redirection in the relax controller window of the GUI.  I think I have
all bases covered - uni-processor, multi-processor on one machine
(multi-cpu and/or multi-core), multi-processor on clusters, and all
fabrics within the prompt, script and GUI modes.  This should also be
abstracted so that all multi-processor fabrics are handled in the same
way.  And it should be unaffected by any type of IO redirection,
teeing, etc. on the master.  This way the multi-processor package
does not interfere with any IO manipulation set up by the importing
program.  For testing the uni-processor, I use:

$ ./relax test_suite/system_tests/scripts/model_free/omp_model_free.py

For the multi-processor I use:

$ mpirun -np 6 ./relax --multi mpi4py test_suite/system_tests/scripts/model_free/omp_model_free.py

And for the GUI, I use:

$ mpirun -np 6 ./relax --multi mpi4py -g

then choose the menu 'User functions->script' and select the file
test_suite/system_tests/scripts/model_free/omp_model_free.py.  I still
have some debugging of the IO streams to do for when you run the
multi-processor in the prompt mode:

$ mpirun -np 6 ./relax --multi mpi4py
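
For reviewing the design, here is a minimal standalone sketch of the
capture-and-replay idea from the commit quoted below.  It is not the
relax code: RedirectText, capture() and replay() are hypothetical names
used only for illustration, and the try/finally restore is an
assumption of this sketch rather than what was committed.

```python
import sys


class RedirectText(object):
    """Hypothetical stand-in for Redirect_text: store text plus a stream number."""

    def __init__(self, data, token='', stream=0):
        self.data = data        # Shared list, so STDOUT and STDERR stay interleaved.
        self.token = token      # Text added after each newline character.
        self.stream = stream    # 0 for STDOUT, 1 for STDERR.

    def flush(self):
        """Nothing to flush, the text is only stored."""

    def write(self, string):
        # Mirror the replace() call in the commit, then store text and stream number.
        self.data.append([string.replace('\n', '\n' + self.token), self.stream])


def capture(func, tokens=('S  |', 'S E|')):
    """Slave side: run func() with both streams captured, returning the stored pairs."""
    io_data = []
    orig_stdout, orig_stderr = sys.stdout, sys.stderr
    sys.stdout = RedirectText(io_data, token=tokens[0], stream=0)
    sys.stderr = RedirectText(io_data, token=tokens[1], stream=1)
    try:
        func()
    finally:
        # Always restore the original streams (an assumption of this sketch).
        sys.stdout, sys.stderr = orig_stdout, orig_stderr
    return io_data


def replay(io_data, out=None, err=None):
    """Master side: write the stored text to the two streams in the original order."""
    if out is None:
        out = sys.stdout
    if err is None:
        err = sys.stderr
    for text, stream in io_data:
        if stream == 0:
            out.write(text)
        else:
            err.write(text)
```

On a slave, capture() would wrap the calculation and the returned list
would travel back with the result command.  The master then calls
replay() on it, so its own sys.stdout and sys.stderr are never replaced.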

Cheers,

Edward




On 28 February 2012 18:51,  <[email protected]> wrote:
> Author: bugman
> Date: Tue Feb 28 18:51:09 2012
> New Revision: 15395
>
> URL: http://svn.gna.org/viewcvs/relax?rev=15395&view=rev
> Log:
> Big fix for the IO streams when running in the multi-processor mpi4py mode!
>
> This affects all multi-processor implementations, excluding the standard
> uni-processor.  The entire stream capture design has been overhauled.  The
> IO streams of the master processor are no longer touched, whereas those of
> the slaves are captured, stored in a list of lists where the original
> 'S  |' and 'S E|' tokens are prepended to each line as well as a number
> identifying the stream, stored in the results command object, and unpacked
> and sent to sys.stdout and sys.stderr in the correct order by the master.
>
> This fixes the implementation for both running on the GUI (the slaves are no
> longer dumping text to the terminal where the GUI was launched from), and on
> clusters where the IO is dumped on the processing node rather than back on
> the master node.
>
>
> Modified:
>    1.3/multi/multi_processor_base.py
>    1.3/multi/processor.py
>    1.3/multi/processor_io.py
>
> Modified: 1.3/multi/multi_processor_base.py
> URL: http://svn.gna.org/viewcvs/relax/1.3/multi/multi_processor_base.py?rev=15395&r1=15394&r2=15395&view=diff
> ==============================================================================
> --- 1.3/multi/multi_processor_base.py (original)
> +++ 1.3/multi/multi_processor_base.py Tue Feb 28 18:51:09 2012
> @@ -1,7 +1,7 @@
>  ###############################################################################
>  #                                                                             #
>  # Copyright (C) 2007 Gary S Thompson (https://gna.org/users/varioustoxins)    #
> -# Copyright (C) 2011 Edward d'Auvergne                                        #
> +# Copyright (C) 2011-2012 Edward d'Auvergne                                   #
>  #                                                                             #
>  # This file is part of the program relax.                                     #
>  #                                                                             #
> @@ -46,13 +46,36 @@
>
>
>  class Batched_result_command(Result_command):
> -    def __init__(self, processor, result_commands, completed=True):
> +    def __init__(self, processor, result_commands, io_data=None, completed=True):
>         super(Batched_result_command, self).__init__(processor=processor, completed=completed)
>         self.result_commands = result_commands
>
> +        # Store the IO data to print out via the run() method called by the master.
> +        self.io_data = io_data
> +
>
>     def run(self, processor, batched_memo):
> +        """The results command to be run by the master.
> +
> +        @param processor:       The processor instance.
> +        @type processor:        Processor instance
> +        @param batched_memo:    The batched memo object.
> +        @type batched_memo:     Memo instance
> +        """
> +
> +        # First check that we are on the master.
>         processor.assert_on_master()
> +
> +        # Unravel the IO stream data on the master in the correct order.
> +        for line, stream in self.io_data:
> +            if stream == 0:
> +                sys.stdout.write(line)
> +            else:
> +                sys.stderr.write(line)
> +
> +        if batched_memo != None:
> +            msg = "batched result commands shouldn't have memo values, memo: " + repr(batched_memo)
> +
>         if batched_memo != None:
>             msg = "batched result commands shouldn't have memo values, memo: " + repr(batched_memo)
>             raise ValueError(msg)
> @@ -154,9 +177,6 @@
>
>         # Execute the base class method.
>         super(Multi_processor, self).pre_run()
> -
> -        # Capture the standard IO streams for the master and slaves.
> -        self.capture_stdio()
>
>
>     #FIXME: fill out generic result processing move to processor
> @@ -230,13 +250,18 @@
>                         self.result_list = None
>
>                     for i, command in enumerate(commands):
> -
> -                        #raise Exception('dummy')
> +                        # Capture the standard IO streams for the slaves.
> +                        self.stdio_capture()
> +
> +                        # Execute the calculation.
>                         completed = (i == last_command)
>                         command.run(self, completed)
>
> +                        # Restore the IO.
> +                        self.stdio_restore()
> +
>                     if self.batched_returns:
> -                        self.return_object(Batched_result_command(processor=self, result_commands=self.result_list))
> +                        self.return_object(Batched_result_command(processor=self, result_commands=self.result_list, io_data=self.io_data))
>                         self.result_list = None
>
>                 except:
>
> Modified: 1.3/multi/processor.py
> URL: http://svn.gna.org/viewcvs/relax/1.3/multi/processor.py?rev=15395&r1=15394&r2=15395&view=diff
> ==============================================================================
> --- 1.3/multi/processor.py (original)
> +++ 1.3/multi/processor.py Tue Feb 28 18:51:09 2012
> @@ -1,7 +1,7 @@
>  ###############################################################################
>  #                                                                             #
>  # Copyright (C) 2007 Gary S Thompson (https://gna.org/users/varioustoxins)    #
> -# Copyright (C) 2011 Edward d'Auvergne                                        #
> +# Copyright (C) 2011-2012 Edward d'Auvergne                                   #
>  #                                                                             #
>  # This file is part of the program relax.                                     #
>  #                                                                             #
> @@ -103,7 +103,7 @@
>  import traceback, textwrap
>
>  # relax module imports.
> -from multi.processor_io import PrependStringIO, IO_filter
> +from multi.processor_io import IO_filter, Redirect_text
>  from relax_errors import RelaxError
>
>
> @@ -419,9 +419,6 @@
>         self._processor_size = processor_size
>         '''Number of slave processors available in this processor.'''
>
> -        # Capture the STDIO.
> -        self.std_stdio_capture()
> -
>
>     def abort(self):
>         '''Shutdown the multi processor in exceptional conditions - designed for overriding.
> @@ -462,32 +459,6 @@
>         raise_unimplemented(self.add_to_queue)
>
>
> -    def capture_stdio(self, stdio_capture=None):
> -        '''Enable capture of the STDOUT and STDERR by self.stdio_capture or user supplied streams.
> -
> -        @note:  Both or neither stream has to be replaced you can't just replace one!
> -
> -        @keyword stdio_capture: A pair of file like objects used to replace sys.stdout and sys.stderr respectively.
> -        @type stdio_capture:    list of two file-like objects
> -        '''
> -
> -        # Store the original STDOUT and STDERR for restoring later on.
> -        self.orig_stdout = sys.stdout
> -        self.orig_stderr = sys.stderr
> -
> -        # Default to self.stdio_capture if stdio_capture is not supplied.
> -        if stdio_capture == None:
> -            stdio_capture = self.stdio_capture
> -
> -        # First flush.
> -        sys.stdout.flush()
> -        sys.stderr.flush()
> -
> -        # Then redirect IO.
> -        sys.stdout = stdio_capture[0]
> -        sys.stderr = stdio_capture[1]
> -
> -
>     # FIXME is this used?
>  #    def exit(self):
>  #        raise_unimplemented(self.exit)
> @@ -523,16 +494,6 @@
>         '''
>
>         raise_unimplemented(self.get_name)
> -
> -
> -    def get_stdio_capture(self):
> -        '''Get the file like objects currently replacing sys.stdout and sys.stderr.
> -
> -        @return:    The file like objects currently replacing sys.stdout and sys.stderr.
> -        @rtype:     tuple of two file-like objects
> -        '''
> -
> -        return self.stdio_capture
>
>
>     def get_stdio_pre_strings(self):
> @@ -713,36 +674,34 @@
>         raise_unimplemented(self.run_queue)
>
>
> -    def std_stdio_capture(self, pre_strings=None):
> -        '''Get the default sys.stdout and sys.stderr replacements.
> -
> -        On the master the replacement prepend output with 'MM S]' or MM E]' for the STDOUT and STDERR channels respectively on slaves the outputs are replaced by StringIO objects that prepend 'NN S]' or NN E]' for STDOUT and STDERR where NN is the rank of the processor.
> -
> -        @note:  By default STDOUT and STDERR are conjoined as otherwise the context of STDOUT and STDERR messages are lost.
> -        @todo:  Improve segregation of sys.sdout and sys.stderr.
> -
> -        @keyword pre_strings:   Pre strings for the sys.stdout and sys.stderr channels.
> -        @type pre_strings:      list of 2 str
> -        @return:                File like objects to replace STDOUT and STDERR respectively in order.
> -        @rtype:                 tuple of two file-like objects
> -        '''
> +    def stdio_capture(self):
> +        """Enable capture of the STDOUT and STDERR.
> +
> +        This is currently used to capture the IO streams of the slaves to return back to the master.
> +        """
> +
> +        # Store the original STDOUT and STDERR for restoring later on.
> +        self.orig_stdout = sys.stdout
> +        self.orig_stderr = sys.stderr
> +
> +        # The data object.
> +        self.io_data = []
>
>         # Get the strings to prepend to the IO streams.
> -        if pre_strings == None:
> -            pre_strings = self.get_stdio_pre_strings()
> -
> -        # The master processor.
> -        if self.rank() == 0:
> -            stdout_capture = IO_filter(pre_strings[0], sys.stdout)
> -            stderr_capture = IO_filter(pre_strings[1], sys.stderr)
> -
> -        # The slaves.
> -        else:
> -            stdout_capture = PrependStringIO(pre_strings[0])
> -            stderr_capture = PrependStringIO(pre_strings[1], stream=stdout_capture)
> -
> -        # Store the captured IO streams.
> -        self.stdio_capture = (stdout_capture, stderr_capture)
> +        pre_strings = self.get_stdio_pre_strings()
> +
> +        # Then redirect IO.
> +        sys.stdout = Redirect_text(self.io_data, token=pre_strings[0], stream=0)
> +        sys.stderr = Redirect_text(self.io_data, token=pre_strings[1], stream=1)
> +
> +
> +    def stdio_restore(self):
> +        """Restore the original STDOUT and STDERR streams."""
> +
> +        # Restore the original streams.
> +        sys.stdout = self.orig_stdout
> +        sys.stderr = self.orig_stderr
> +
>
>
>  class Processor_box(object):
> @@ -901,7 +860,6 @@
>     def __init__(self, processor, string, completed):
>         '''Initialiser.
>
> -        @see:   multi.processor.Processor.std_stdio_capture.
>         @todo:  Check inherited parameters are documented.
>
>         @param string:  A string to return the master processor for output to STDOUT (note the
>
> Modified: 1.3/multi/processor_io.py
> URL: http://svn.gna.org/viewcvs/relax/1.3/multi/processor_io.py?rev=15395&r1=15394&r2=15395&view=diff
> ==============================================================================
> --- 1.3/multi/processor_io.py (original)
> +++ 1.3/multi/processor_io.py Tue Feb 28 18:51:09 2012
> @@ -1,6 +1,7 @@
>  ###############################################################################
>  #                                                                             #
>  # Copyright (C) 2007 Gary S Thompson (https://gna.org/users/varioustoxins)    #
> +# Copyright (C) 2012 Edward d'Auvergne                                        #
>  #                                                                             #
>  # This file is part of the program relax.                                     #
>  #                                                                             #
> @@ -121,3 +122,44 @@
>         # Flush both STDOUT and STDERR.
>         sys.stdout.flush()
>         sys.stderr.flush()
> +
> +
> +
> +class Redirect_text(object):
> +    """Store the data of the IO streams, prepending a token to each line of written text."""
> +
> +    def __init__(self, data, token='', stream=0):
> +        """Set up the text redirection object.
> +
> +        @param data:        The data object to store all IO in.
> +        @type data:         list of lists
> +        @param token:       The string to add to the end of all newlines.
> +        @type token:        str
> +        @keyword stream:    The type of steam (0 for STDOUT and 1 for STDERR).
> +        @type stream:       int
> +        """
> +
> +        # Store the args.
> +        self.data = data
> +        self.token = token
> +        self.stream = stream
> +
> +
> +    def flush(self):
> +        """Dummy flush method."""
> +
> +
> +    def write(self, string):
> +        """Replacement write() method.
> +
> +        This prepends the token to each line of STDOUT and STDERR and stores the result together with the stream number.
> +
> +        @param string:  The text to write.
> +        @type string:   str
> +        """
> +
> +        # Append the token to all newline chars.
> +        string = string.replace('\n', '\n' + self.token)
> +
> +        # Store the text.
> +        self.data.append([string, self.stream])
>
>
> _______________________________________________
> relax (http://nmr-relax.com)
>
> This is the relax-commits mailing list
> [email protected]
>
> To unsubscribe from this list, get a password
> reminder, or change your subscription options,
> visit the list information page at
> https://mail.gna.org/listinfo/relax-commits

_______________________________________________
relax (http://nmr-relax.com)

This is the relax-devel mailing list
[email protected]

To unsubscribe from this list, get a password
reminder, or change your subscription options,
visit the list information page at
https://mail.gna.org/listinfo/relax-devel
