Author: bugman
Date: Fri Mar 23 12:37:27 2012
New Revision: 15613
URL: http://svn.gna.org/viewcvs/relax?rev=15613&view=rev
Log:
Shifted the run_command_queue() and run_queue() methods from the
Multi_processor class to the Processor class.
Modified:
1.3/multi/multi_processor_base.py
1.3/multi/processor.py
Modified: 1.3/multi/multi_processor_base.py
URL:
http://svn.gna.org/viewcvs/relax/1.3/multi/multi_processor_base.py?rev=15613&r1=15612&r2=15613&view=diff
==============================================================================
--- 1.3/multi/multi_processor_base.py (original)
+++ 1.3/multi/multi_processor_base.py Fri Mar 23 12:37:27 2012
@@ -183,69 +183,6 @@
def return_result_command(self, result_object):
raise_unimplemented(self.slave_queue_result)
-
-
- #TODO: move up a level add virtaul send and revieve functions
- def run_command_queue(self, queue):
- """Process all commands on the queue and wait for completion.
-
- @param queue: The command queue.
- @type queue: list of Command instances
- """
-
- # This must only be run on the master processor.
- self.assert_on_master()
-
- running_set = set()
- idle_set = set([i for i in range(1, self.processor_size()+1)])
-
- if self.threaded_result_processing:
- result_queue = Threaded_result_queue(self)
- else:
- result_queue = Immediate_result_queue(self)
-
- while len(queue) != 0:
-
- while len(idle_set) != 0:
- if len(queue) != 0:
- command = queue.pop()
- dest = idle_set.pop()
- self.master_queue_command(command=command, dest=dest)
- running_set.add(dest)
- else:
- break
-
- # Loop until the queue of calculations is depleted.
- while len(running_set) != 0:
- # Get the result.
- result = self.master_receive_result()
-
- # Debugging print out.
- if verbosity.level():
- print('\nIdle set: %s' % idle_set)
- print('Running set: %s' % running_set)
-
- # Shift the processor rank to the idle set.
- if result.completed:
- idle_set.add(result.rank)
- running_set.remove(result.rank)
-
- # Add to the result queue for instant or threaded processing.
- result_queue.put(result)
-
- # Process the threaded results.
- if self.threaded_result_processing:
- result_queue.run_all()
-
-
- #TODO: move up a level
- def run_queue(self):
- #FIXME: need a finally here to cleanup exceptions states
- lqueue = self.chunk_queue(self.command_queue)
- self.run_command_queue(lqueue)
-
- del self.command_queue[:]
- self.memo_map.clear()
def slave_receive_commands(self):
Modified: 1.3/multi/processor.py
URL:
http://svn.gna.org/viewcvs/relax/1.3/multi/processor.py?rev=15613&r1=15612&r2=15613&view=diff
==============================================================================
--- 1.3/multi/processor.py (original)
+++ 1.3/multi/processor.py Fri Mar 23 12:37:27 2012
@@ -103,6 +103,7 @@
# multi module imports.
from multi.misc import Capturing_exception, raise_unimplemented, Verbosity;
verbosity = Verbosity()
+from multi.multi_processor_base import Threaded_result_queue
from multi.processor_io import Redirect_text
from multi.result_commands import Batched_result_command, Null_result_command, Result_exception
from multi.slave_commands import Slave_storage_command
@@ -536,6 +537,58 @@
self.run_command_queue(queue)
+ def run_command_queue(self, queue):
+ """Process all commands on the queue and wait for completion.
+
+ @param queue: The command queue.
+ @type queue: list of Command instances
+ """
+
+ # This must only be run on the master processor.
+ self.assert_on_master()
+
+ running_set = set()
+ idle_set = set([i for i in range(1, self.processor_size()+1)])
+
+ if self.threaded_result_processing:
+ result_queue = Threaded_result_queue(self)
+ else:
+ result_queue = Immediate_result_queue(self)
+
+ while len(queue) != 0:
+
+ while len(idle_set) != 0:
+ if len(queue) != 0:
+ command = queue.pop()
+ dest = idle_set.pop()
+ self.master_queue_command(command=command, dest=dest)
+ running_set.add(dest)
+ else:
+ break
+
+ # Loop until the queue of calculations is depleted.
+ while len(running_set) != 0:
+ # Get the result.
+ result = self.master_receive_result()
+
+ # Debugging print out.
+ if verbosity.level():
+ print('\nIdle set: %s' % idle_set)
+ print('Running set: %s' % running_set)
+
+ # Shift the processor rank to the idle set.
+ if result.completed:
+ idle_set.add(result.rank)
+ running_set.remove(result.rank)
+
+ # Add to the result queue for instant or threaded processing.
+ result_queue.put(result)
+
+ # Process the threaded results.
+ if self.threaded_result_processing:
+ result_queue.run_all()
+
+
def run_queue(self):
"""Run the processor queue - an abstract method.
@@ -543,7 +596,12 @@
thread to block until the command has completed.
"""
- raise_unimplemented(self.run_queue)
+ #FIXME: need a finally here to cleanup exceptions states
+ lqueue = self.chunk_queue(self.command_queue)
+ self.run_command_queue(lqueue)
+
+ del self.command_queue[:]
+ self.memo_map.clear()
def stdio_capture(self):
_______________________________________________
relax (http://nmr-relax.com)
This is the relax-commits mailing list
[email protected]
To unsubscribe from this list, get a password
reminder, or change your subscription options,
visit the list information page at
https://mail.gna.org/listinfo/relax-commits