Hi Gary,

What do you think of this implementation for the multi-processor package? It is very similar in spirit to how I designed the IO redirection in the relax controller window of the GUI. I think I have all bases covered: uni-processor, multi-processor on one machine (multi-CPU and/or multi-core), multi-processor on clusters, and all fabrics within the prompt, script and GUI modes. It should also be abstracted so that all multi-processor fabrics are handled in the same way, and it should be unaffected by any type of IO redirection, teeing, etc. on the master. That way the multi-processor package does not interfere with any IO manipulation set up by the importing program.

For testing the uni-processor, I use:

$ ./relax test_suite/system_tests/scripts/model_free/omp_model_free.py

For the multi-processor I use:

$ mpirun -np 6 ./relax --multi mpi4py test_suite/system_tests/scripts/model_free/omp_model_free.py

And for the GUI, I use:

$ mpirun -np 6 ./relax --multi mpi4py -g

then choose the menu 'User functions->script' and select the file test_suite/system_tests/scripts/model_free/omp_model_free.py. I still have some debugging of the IO streams to do for when you run the multi-processor in the prompt mode:

$ mpirun -np 6 ./relax --multi mpi4py

Cheers,

Edward


On 28 February 2012 18:51, <[email protected]> wrote:
> Author: bugman
> Date: Tue Feb 28 18:51:09 2012
> New Revision: 15395
>
> URL: http://svn.gna.org/viewcvs/relax?rev=15395&view=rev
> Log:
> Big fix for the IO streams when running in the multi-processor mpi4py mode!
>
> This affects all multi-processor implementations, excluding the standard uni-processor. The entire
> stream capture design has been overhauled. The IO streams of the master processor are no longer
> touched, whereas those of the slaves are captured, stored in a list of lists where the original
> 'S |' and 'S E|' tokens are prepended to each line as well as a number identifying the stream,
> stored in the results command object, and unpacked and sent to sys.stdout and sys.stderr in the
> correct order by the master.
>
> This fixes the implementation for both running on the GUI (the slaves are no longer dumping text to
> the terminal where the GUI was launched from), and on clusters where the IO is dumped on the
> processing node rather than back on the master node.
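
To make the log message above a bit more concrete, here is a minimal standalone sketch of the capture-and-replay idea. It is not the relax code: RedirectText, capture(), replay() and slave_job() are hypothetical names used only for illustration, the write() behaviour is copied from the Redirect_text class in the diff, and the transfer of the data from slave to master (done via the results command object in the commit) is left out.

```python
import sys


class RedirectText(object):
    """Hypothetical stand-in for the Redirect_text class in the diff."""

    def __init__(self, data, token='', stream=0):
        self.data = data        # Shared list of [text, stream] pairs.
        self.token = token      # Text inserted after each newline.
        self.stream = stream    # 0 for STDOUT, 1 for STDERR.

    def flush(self):
        """Nothing to flush, the text is only stored."""

    def write(self, string):
        # Same replacement as in the diff: the token follows every newline.
        self.data.append([string.replace('\n', '\n' + self.token), self.stream])


def capture(func, tokens=('S |', 'S E|')):
    """Run func() with both streams captured and return the ordered IO data."""
    io_data = []
    orig_stdout, orig_stderr = sys.stdout, sys.stderr
    sys.stdout = RedirectText(io_data, token=tokens[0], stream=0)
    sys.stderr = RedirectText(io_data, token=tokens[1], stream=1)
    try:
        func()
    finally:
        # Always restore the original streams, even if func() fails.
        sys.stdout, sys.stderr = orig_stdout, orig_stderr
    return io_data


def replay(io_data, out=None, err=None):
    """Write the captured text back out in the order it was produced."""
    out = sys.stdout if out is None else out
    err = sys.stderr if err is None else err
    for text, stream in io_data:
        (out if stream == 0 else err).write(text)


def slave_job():
    """Hypothetical slave calculation writing to both streams."""
    sys.stdout.write("result\n")
    sys.stderr.write("warning\n")
    sys.stdout.write("done\n")


io_data = capture(slave_job)
```

Because both redirection objects append to the same list, the relative order of the STDOUT and STDERR text is preserved, and replay(io_data) on the master only needs the stream number to pick the right destination.
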
>
>
> Modified:
>     1.3/multi/multi_processor_base.py
>     1.3/multi/processor.py
>     1.3/multi/processor_io.py
>
> Modified: 1.3/multi/multi_processor_base.py
> URL: http://svn.gna.org/viewcvs/relax/1.3/multi/multi_processor_base.py?rev=15395&r1=15394&r2=15395&view=diff
> ==============================================================================
> --- 1.3/multi/multi_processor_base.py (original)
> +++ 1.3/multi/multi_processor_base.py Tue Feb 28 18:51:09 2012
> @@ -1,7 +1,7 @@
>  ###############################################################################
>  #                                                                             #
>  # Copyright (C) 2007 Gary S Thompson (https://gna.org/users/varioustoxins)    #
> -# Copyright (C) 2011 Edward d'Auvergne                                        #
> +# Copyright (C) 2011-2012 Edward d'Auvergne                                   #
>  #                                                                             #
>  # This file is part of the program relax.                                     #
>  #                                                                             #
> @@ -46,13 +46,36 @@
>
>
>  class Batched_result_command(Result_command):
> -    def __init__(self, processor, result_commands, completed=True):
> +    def __init__(self, processor, result_commands, io_data=None, completed=True):
>          super(Batched_result_command, self).__init__(processor=processor, completed=completed)
>          self.result_commands = result_commands
>
> +        # Store the IO data to print out via the run() method called by the master.
> +        self.io_data = io_data
> +
>
>      def run(self, processor, batched_memo):
> +        """The results command to be run by the master.
> +
> +        @param processor:       The processor instance.
> +        @type processor:        Processor instance
> +        @param batched_memo:    The batched memo object.
> +        @type batched_memo:     Memo instance
> +        """
> +
> +        # First check that we are on the master.
>          processor.assert_on_master()
> +
> +        # Unravel the IO stream data on the master in the correct order.
> +        for line, stream in self.io_data:
> +            if stream == 0:
> +                sys.stdout.write(line)
> +            else:
> +                sys.stderr.write(line)
> +
> +        if batched_memo != None:
> +            msg = "batched result commands shouldn't have memo values, memo: " + repr(batched_memo)
> +
>          if batched_memo != None:
>              msg = "batched result commands shouldn't have memo values, memo: " + repr(batched_memo)
>              raise ValueError(msg)
> @@ -154,9 +177,6 @@
>
>          # Execute the base class method.
>          super(Multi_processor, self).pre_run()
> -
> -        # Capture the standard IO streams for the master and slaves.
> -        self.capture_stdio()
>
>
>      #FIXME: fill out generic result processing move to processor
> @@ -230,13 +250,18 @@
>                          self.result_list = None
>
>                      for i, command in enumerate(commands):
> -
> -                        #raise Exception('dummy')
> +                        # Capture the standard IO streams for the slaves.
> +                        self.stdio_capture()
> +
> +                        # Execute the calculation.
>                          completed = (i == last_command)
>                          command.run(self, completed)
>
> +                        # Restore the IO.
> +                        self.stdio_restore()
> +
>                      if self.batched_returns:
> -                        self.return_object(Batched_result_command(processor=self, result_commands=self.result_list))
> +                        self.return_object(Batched_result_command(processor=self, result_commands=self.result_list, io_data=self.io_data))
>                          self.result_list = None
>
>                  except:
>
> Modified: 1.3/multi/processor.py
> URL: http://svn.gna.org/viewcvs/relax/1.3/multi/processor.py?rev=15395&r1=15394&r2=15395&view=diff
> ==============================================================================
> --- 1.3/multi/processor.py (original)
> +++ 1.3/multi/processor.py Tue Feb 28 18:51:09 2012
> @@ -1,7 +1,7 @@
>  ###############################################################################
>  #                                                                             #
>  # Copyright (C) 2007 Gary S Thompson (https://gna.org/users/varioustoxins)    #
> -# Copyright (C) 2011 Edward d'Auvergne                                        #
> +# Copyright (C) 2011-2012 Edward d'Auvergne                                   #
>  #                                                                             #
>  # This file is part of the program relax.                                     #
>  #                                                                             #
> @@ -103,7 +103,7 @@
>  import traceback, textwrap
>
>  # relax module imports.
> -from multi.processor_io import PrependStringIO, IO_filter
> +from multi.processor_io import IO_filter, Redirect_text
>  from relax_errors import RelaxError
>
>
> @@ -419,9 +419,6 @@
>          self._processor_size = processor_size
>          '''Number of slave processors available in this processor.'''
>
> -        # Capture the STDIO.
> -        self.std_stdio_capture()
> -
>
>      def abort(self):
>          '''Shutdown the multi processor in exceptional conditions - designed for overriding.
> @@ -462,32 +459,6 @@
>          raise_unimplemented(self.add_to_queue)
>
>
> -    def capture_stdio(self, stdio_capture=None):
> -        '''Enable capture of the STDOUT and STDERR by self.stdio_capture or user supplied streams.
> -
> -        @note:  Both or neither stream has to be replaced you can't just replace one!
> -
> -        @keyword stdio_capture: A pair of file like objects used to replace sys.stdout and sys.stderr respectively.
> -        @type stdio_capture:    list of two file-like objects
> -        '''
> -
> -        # Store the original STDOUT and STDERR for restoring later on.
> -        self.orig_stdout = sys.stdout
> -        self.orig_stderr = sys.stderr
> -
> -        # Default to self.stdio_capture if stdio_capture is not supplied.
> -        if stdio_capture == None:
> -            stdio_capture = self.stdio_capture
> -
> -        # First flush.
> -        sys.stdout.flush()
> -        sys.stderr.flush()
> -
> -        # Then redirect IO.
> -        sys.stdout = stdio_capture[0]
> -        sys.stderr = stdio_capture[1]
> -
> -
>      # FIXME is this used?
>      # def exit(self):
>      #     raise_unimplemented(self.exit)
> @@ -523,16 +494,6 @@
>          '''
>
>          raise_unimplemented(self.get_name)
> -
> -
> -    def get_stdio_capture(self):
> -        '''Get the file like objects currently replacing sys.stdout and sys.stderr.
> -
> -        @return:    The file like objects currently replacing sys.stdout and sys.stderr.
> -        @rtype:     tuple of two file-like objects
> -        '''
> -
> -        return self.stdio_capture
>
>
>      def get_stdio_pre_strings(self):
> @@ -713,36 +674,34 @@
>          raise_unimplemented(self.run_queue)
>
>
> -    def std_stdio_capture(self, pre_strings=None):
> -        '''Get the default sys.stdout and sys.stderr replacements.
> -
> -        On the master the replacement prepend output with 'MM S]' or MM E]' for the STDOUT and STDERR channels respectively on slaves the outputs are replaced by StringIO objects that prepend 'NN S]' or NN E]' for STDOUT and STDERR where NN is the rank of the processor.
> -
> -        @note:  By default STDOUT and STDERR are conjoined as otherwise the context of STDOUT and STDERR messages are lost.
> -        @todo:  Improve segregation of sys.sdout and sys.stderr.
> -
> -        @keyword pre_strings:   Pre strings for the sys.stdout and sys.stderr channels.
> -        @type pre_strings:      list of 2 str
> -        @return:                File like objects to replace STDOUT and STDERR respectively in order.
> -        @rtype:                 tuple of two file-like objects
> -        '''
> +    def stdio_capture(self):
> +        """Enable capture of the STDOUT and STDERR.
> +
> +        This is currently used to capture the IO streams of the slaves to return back to the master.
> +        """
> +
> +        # Store the original STDOUT and STDERR for restoring later on.
> +        self.orig_stdout = sys.stdout
> +        self.orig_stderr = sys.stderr
> +
> +        # The data object.
> +        self.io_data = []
>
>          # Get the strings to prepend to the IO streams.
> -        if pre_strings == None:
> -            pre_strings = self.get_stdio_pre_strings()
> -
> -        # The master processor.
> -        if self.rank() == 0:
> -            stdout_capture = IO_filter(pre_strings[0], sys.stdout)
> -            stderr_capture = IO_filter(pre_strings[1], sys.stderr)
> -
> -        # The slaves.
> -        else:
> -            stdout_capture = PrependStringIO(pre_strings[0])
> -            stderr_capture = PrependStringIO(pre_strings[1], stream=stdout_capture)
> -
> -        # Store the captured IO streams.
> - self.stdio_capture = (stdout_capture, stderr_capture) > + pre_strings = self.get_stdio_pre_strings() > + > + # Then redirect IO. > + sys.stdout = Redirect_text(self.io_data, token=pre_strings[0], > stream=0) > + sys.stderr = Redirect_text(self.io_data, token=pre_strings[1], > stream=1) > + > + > + def stdio_restore(self): > + """Restore the original STDOUT and STDERR streams.""" > + > + # Restore the original streams. > + sys.stdout = self.orig_stdout > + sys.stderr = self.orig_stderr > + > > > class Processor_box(object): > @@ -901,7 +860,6 @@ > def __init__(self, processor, string, completed): > '''Initialiser. > > - @see: multi.processor.Processor.std_stdio_capture. > @todo: Check inherited parameters are documented. > > @param string: A string to return the master processor for output to > STDOUT (note the > > Modified: 1.3/multi/processor_io.py > URL: > http://svn.gna.org/viewcvs/relax/1.3/multi/processor_io.py?rev=15395&r1=15394&r2=15395&view=diff > ============================================================================== > --- 1.3/multi/processor_io.py (original) > +++ 1.3/multi/processor_io.py Tue Feb 28 18:51:09 2012 > @@ -1,6 +1,7 @@ > ############################################################################### > # > # > # Copyright (C) 2007 Gary S Thompson (https://gna.org/users/varioustoxins) > # > +# Copyright (C) 2012 Edward d'Auvergne > # > # > # > # This file is part of the program relax. > # > # > # > @@ -121,3 +122,44 @@ > # Flush both STDOUT and STDERR. > sys.stdout.flush() > sys.stderr.flush() > + > + > + > +class Redirect_text(object): > + """Store the data of the IO streams, prepending a token to each line of > written text.""" > + > + def __init__(self, data, token='', stream=0): > + """Set up the text redirection object. > + > + @param data: The data object to store all IO in. > + @type data: list of lists > + @param token: The string to add to the end of all newlines. 
> +        @type token:        str
> +        @keyword stream:    The type of steam (0 for STDOUT and 1 for STDERR).
> +        @type stream:       int
> +        """
> +
> +        # Store the args.
> +        self.data = data
> +        self.token = token
> +        self.stream = stream
> +
> +
> +    def flush(self):
> +        """Dummy flush method."""
> +
> +
> +    def write(self, string):
> +        """Replacement write() method.
> +
> +        This prepends the token to each line of STDOUT and STDERR and stores the result together with the stream number.
> +
> +        @param string:  The text to write.
> +        @type string:   str
> +        """
> +
> +        # Append the token to all newline chars.
> +        string = string.replace('\n', '\n' + self.token)
> +
> +        # Store the text.
> +        self.data.append([string, self.stream])
>
>
> _______________________________________________
> relax (http://nmr-relax.com)
>
> This is the relax-commits mailing list
> [email protected]
>
> To unsubscribe from this list, get a password
> reminder, or change your subscription options,
> visit the list information page at
> https://mail.gna.org/listinfo/relax-commits

_______________________________________________
relax (http://nmr-relax.com)

This is the relax-devel mailing list
[email protected]

To unsubscribe from this list, get a password
reminder, or change your subscription options,
visit the list information page at
https://mail.gna.org/listinfo/relax-devel

