Index: lib/sqlalchemy/orm/query.py
===================================================================
--- lib/sqlalchemy/orm/query.py	(revision 3986)
+++ lib/sqlalchemy/orm/query.py	(working copy)
@@ -50,6 +50,7 @@
         self._limit = None
         self._statement = None
         self._params = {}
+        self._yield_per = None
         self._criterion = None
         self._having = None
         self._column_aggregate = None
@@ -106,7 +107,21 @@
         q = self._clone()
         q._current_path = path
         return q
+    
+    def yield_per(self, count):
+        """yield only ``count`` rows at a time.
         
+        WARNING: use this method with caution; result sets
+        which contain the same instance more than once may yield
+        the same instance twice in different batches, leading to 
+        attrbiutes or collections being overwritten or not
+        loaded properly.  Set this number high enough such that
+        instances can be fully loaded from a single result batch.
+        """
+        q = self._clone()
+        q._yield_per = count
+        return q
+        
     def get(self, ident, **kwargs):
         """Return an instance of the object based on the given
         identifier, or None if not found.
@@ -749,12 +764,12 @@
     
     def _execute_and_instances(self, querycontext):
         result = self.session.execute(querycontext.statement, params=self._params, mapper=self.mapper, instance=self._refresh_instance)
-        try:
-            return iter(self.instances(result, querycontext=querycontext))
-        finally:
-            result.close()
+        return self.iterate_instances(result, querycontext=querycontext)
 
     def instances(self, cursor, *mappers_or_columns, **kwargs):
+        return list(self.iterate_instances(cursor, *mappers_or_columns, **kwargs))
+        
+    def iterate_instances(self, cursor, *mappers_or_columns, **kwargs):
         session = self.session
 
         context = kwargs.pop('querycontext', None)
@@ -808,26 +823,36 @@
                 else:
                     raise exceptions.InvalidRequestError("Invalid column expression '%r'" % m)
 
-        context.progress = util.Set()    
-        if tuples:
-            rows = util.OrderedSet()
-            for row in cursor.fetchall():
-                rows.add(tuple(proc(context, row) for proc in process))
-        else:
-            rows = util.UniqueAppender([])
-            for row in cursor.fetchall():
-                rows.append(main(context, row))
+        while True:
+            context.progress = util.Set()    
+            if self._yield_per:
+                fetch = cursor.fetchmany(self._yield_per)
+            else:
+                fetch = cursor.fetchall()
+            
+            if not fetch:
+                return
+                    
+            if tuples:
+                rows = util.OrderedSet()
+                for row in fetch:
+                    rows.add(tuple(proc(context, row) for proc in process))
+            else:
+                rows = util.UniqueAppender([])
+                for row in fetch:
+                    rows.append(main(context, row))
 
-        if context.refresh_instance and context.only_load_props and context.refresh_instance in context.progress:
-            context.refresh_instance.commit(context.only_load_props)
-            context.progress.remove(context.refresh_instance)
+            if context.refresh_instance and context.only_load_props and context.refresh_instance in context.progress:
+                context.refresh_instance.commit(context.only_load_props)
+                context.progress.remove(context.refresh_instance)
 
-        for ii in context.progress:
-            context.attributes.get(('populating_mapper', ii), _state_mapper(ii))._post_instance(context, ii)
-            ii.commit_all()
+            for ii in context.progress:
+                context.attributes.get(('populating_mapper', ii), _state_mapper(ii))._post_instance(context, ii)
+                ii.commit_all()
+            
+            for row in rows:
+                yield row
 
-        return list(rows)
-
     def _get(self, key=None, ident=None, refresh_instance=None, lockmode=None, only_load_props=None):
         lockmode = lockmode or self._lockmode
         if not self._populate_existing and not refresh_instance and not self.mapper.always_refresh and lockmode is None:
