Yicong-Huang commented on code in PR #4685:
URL: https://github.com/apache/texera/pull/4685#discussion_r3189062266


##########
amber/src/main/python/core/runnables/data_processor.py:
##########
@@ -75,77 +75,58 @@ def run(self) -> None:
                 self.process_tuple()
 
     def process_internal_marker(self, internal_marker: InternalMarker) -> None:
-        try:
-            executor = self._context.executor_manager.executor
-            port_id = 
self._context.tuple_processing_manager.get_input_port_id()
-            with replace_print(
-                self._context.worker_id,
-                self._context.console_message_manager.print_buf,
-            ):
-                if isinstance(internal_marker, StartChannel):
-                    
self._set_output_state(executor.produce_state_on_start(port_id))
-                elif isinstance(internal_marker, EndChannel):
-                    
self._set_output_state(executor.produce_state_on_finish(port_id))
-                    self._switch_context()
-                    self._set_output_tuple(executor.on_finish(port_id))
-
-        except Exception as err:
-            logger.exception(err)
-            exc_info = sys.exc_info()
-            self._context.exception_manager.set_exception_info(exc_info)
-            self._report_exception(exc_info)
-
-        finally:
-            self._switch_context()
+        with self._executor_session() as (executor, port_id):
+            if isinstance(internal_marker, StartChannel):
+                
self._set_output_state(executor.produce_state_on_start(port_id))
+            elif isinstance(internal_marker, EndChannel):
+                
self._set_output_state(executor.produce_state_on_finish(port_id))
+                # Flush the state to MainLoop before producing tuples so the
+                # state and the tuple stream don't share a single switch.
+                self._switch_context()
+                self._set_output_tuple(executor.on_finish(port_id))
 
     def process_state(self, state: State) -> None:
         """
         Process an input marker by invoking appropriate state
         or tuple generation based on the marker type.
         """
+        with self._executor_session() as (executor, port_id):
+            self._set_output_state(executor.process_state(state, port_id))
+
+    def process_tuple(self) -> None:
+        """
+        Process an input tuple by invoking the executor's tuple processing 
method.
+        """
+        finished_current = 
self._context.tuple_processing_manager.finished_current
+        while not finished_current.is_set():
+            with self._executor_session() as (executor, port_id):
+                tuple_ = 
self._context.tuple_processing_manager.get_input_tuple()
+                self._set_output_tuple(executor.process_tuple(tuple_, port_id))
+
+    @contextmanager
+    def _executor_session(self):
+        """
+        Open one executor invocation: hand back (executor, port_id) under a
+        print-capture session, route any exception into the exception
+        manager, and always switch back to MainLoop on exit. Reporting and
+        the post-resolution yield happen in `_post_switch_context_checks`
+        so they live in one place rather than being duplicated in every
+        process_* method.
+        """
         try:
             executor = self._context.executor_manager.executor
             port_id = 
self._context.tuple_processing_manager.get_input_port_id()
             with replace_print(
                 self._context.worker_id,
                 self._context.console_message_manager.print_buf,
             ):
-                self._set_output_state(executor.process_state(state, port_id))
-
+                yield executor, port_id
         except Exception as err:
             logger.exception(err)
-            exc_info = sys.exc_info()
-            self._context.exception_manager.set_exception_info(exc_info)
-            self._report_exception(exc_info)
-
+            self._context.exception_manager.set_exception_info(sys.exc_info())

Review Comment:
   Confirmed and fixed in 5ec1dc2a1a. Moving the exception report to after the
context switch (the "post-switch centralization") broke MainLoop's
pause-with-error contract. `_check_exception` flushes pending console messages
and then immediately enters `EXCEPTION_PAUSE`. As a result, anything queued only
after the worker resumed (i.e. once the exception was resolved) reached the
controller too late. `_report_exception(exc_info)` is now back inside the
`except` clause of `_executor_session`, ahead of the switch in the `finally`
clause. `_post_switch_context_checks` and `_pre_loop_checks`, which existed only
to host the deferred report, have been removed. `run()` and `_switch_context`
now call `_check_and_process_debug_command` directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to