aglinxinyuan commented on code in PR #4560:
URL: https://github.com/apache/texera/pull/4560#discussion_r3165761207


##########
amber/src/main/python/core/runnables/data_processor.py:
##########
@@ -49,20 +49,17 @@ def run(self) -> None:
         with self._context.tuple_processing_manager.context_switch_condition:
             self._context.tuple_processing_manager.context_switch_condition.wait()
         self._running.set()
-        self._switch_context()
         while self._running.is_set():
-            marker = self._context.tuple_processing_manager.get_internal_marker()
-            state = self._context.state_processing_manager.get_input_state()
-            tuple_ = self._context.tuple_processing_manager.current_input_tuple
-            if marker is not None:
-                self.process_internal_marker(marker)
-            elif state is not None:
-                self.process_state(state)
-            elif tuple_ is not None:
+            tpm = self._context.tuple_processing_manager
+            spm = self._context.state_processing_manager
+            if tpm.current_internal_marker is not None:
+                self.process_internal_marker(tpm.get_internal_marker())
+            elif spm.current_input_state is not None:
+                self.process_state(spm.get_input_state())

Review Comment:
   Added a `queued != 1` runtime check in `d890004c11`. MainLoop is
single-threaded, and `_process_state` / `_process_tuple` / `_process_ecm` each
populate exactly one slot and then switch, so exactly one slot is always set.
The if/elif order is only dispatch, not priority.



##########
amber/src/main/python/core/runnables/test_main_loop.py:
##########
@@ -1231,3 +1247,297 @@ def channel_size(channel: ChannelIdentity) -> int:
             "test-1"
         ] == b"pickle    " + pickle.dumps(mock_binary_tuple["test-1"])
         reraise()
+
+    @pytest.mark.timeout(2)
+    def test_process_state_can_emit_multiple_states(
+        self,
+        main_loop,
+        output_queue,
+        mock_data_output_channel,
+        monkeypatch,
+    ):
+        # Stub-level coverage of the single-switch state handshake. Each
+        # call to the (stubbed) _switch_context simulates DataProc
+        # consuming the queued input state and writing
+        # current_output_state, mirroring what real DataProc.process_state
+        # does between MainLoop's switches.
+        class DummyExecutor:
+            @staticmethod
+            def process_state(state: State, port: int) -> State:
+                output_state = State()
+                output_state.add("value", state["value"] + 1)
+                output_state.add("port", port)
+                return output_state
+
+        main_loop.context.executor_manager.executor = DummyExecutor()
+        monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: None)
+        monkeypatch.setattr(
+            main_loop.context.output_manager,
+            "emit_state",
+            lambda state: [(mock_data_output_channel.to_worker_id, StateFrame(state))],
+        )
+
+        def fake_switch_context():
+            current_input_state = (
+                main_loop.context.state_processing_manager.current_input_state
+            )
+            if current_input_state is not None:
+                main_loop.context.state_processing_manager.current_output_state = (
+                    DummyExecutor.process_state(current_input_state, 0)
+                )
+
+        monkeypatch.setattr(main_loop, "_switch_context", fake_switch_context)
+
+        first_state = State()
+        first_state.add("value", 1)
+        second_state = State()
+        second_state.add("value", 41)
+
+        main_loop._process_state(first_state)
+        main_loop._process_state(second_state)
+
+        first_output: DataElement = output_queue.get()
+        second_output: DataElement = output_queue.get()
+
+        assert first_output.tag == mock_data_output_channel
+        assert isinstance(first_output.payload, StateFrame)
+        assert first_output.payload.frame["value"] == 2
+        assert first_output.payload.frame["port"] == 0
+
+        assert second_output.tag == mock_data_output_channel
+        assert isinstance(second_output.payload, StateFrame)
+        assert second_output.payload.frame["value"] == 42
+        assert second_output.payload.frame["port"] == 0
+
+    @pytest.mark.timeout(5)
+    def test_main_loop_thread_can_process_state(
+        self,
+        mock_data_output_channel,
+        mock_control_output_channel,
+        input_queue,
+        output_queue,
+        main_loop,
+        main_loop_thread,
+        mock_assign_input_port,
+        mock_assign_output_port,
+        mock_add_input_channel,
+        mock_add_partitioning,
+        mock_initialize_executor,
+        mock_state_data_elements,
+        mock_end_of_upstream,
+        command_sequence,
+        reraise,
+    ):
+        # End-to-end coverage of the state-processing path through the real
+        # MainLoop + DataProcessor threads. The single-switch state handshake
+        # in MainLoop.process_input_state means each state is emitted in its
+        # own cycle (no lag), and an EndChannel ECM after the last state
+        # produces an additional output via produce_state_on_finish.
+        main_loop_thread.start()
+
+        for setup_msg in [
+            mock_assign_input_port,
+            mock_assign_output_port,
+            mock_add_input_channel,
+            mock_add_partitioning,
+            mock_initialize_executor,
+        ]:
+            input_queue.put(setup_msg)
+            assert output_queue.get() == DCMElement(
+                tag=mock_control_output_channel,
+                payload=DirectControlMessagePayloadV2(
+                    return_invocation=ReturnInvocation(
+                        command_id=command_sequence,
+                        return_value=ControlReturn(empty_return=EmptyReturn()),
+                    )
+                ),
+            )
+
+        # Replace the EchoOperator that mock_initialize_executor loaded
+        # with an in-process executor that tags processed states and emits
+        # a finish marker on EndChannel. Going through the
+        # InitializeExecutor RPC above sets up the rest of the worker
+        # state (output schema, partitioning bookkeeping); swapping the
+        # executor instance here lets the test observe whether
+        # process_state actually runs without depending on Python's
+        # cross-test module caching for the loaded operator class.
+        class StateProcessingExecutor:
+            @staticmethod
+            def process_tuple(tuple_, port):
+                yield tuple_
+
+            @staticmethod
+            def process_state(state: State, port: int) -> State:
+                new_state = State()
+                for key, value in state.__dict__.items():
+                    if key != "schema":
+                        new_state.add(key, value)
+                new_state.add("processed_marker", "executed")
+                new_state.add("port", port)
+                return new_state
+
+            @staticmethod
+            def produce_state_on_finish(port: int) -> State:
+                finish_state = State()
+                finish_state.add("finish_marker", "produce_state_on_finish_ran")
+                return finish_state
+
+            @staticmethod
+            def on_finish(port):
+                yield
+
+            @staticmethod
+            def close():
+                pass

Review Comment:
   Done — moved to a module-level `_StateProcessingExecutor` + 
`state_processing_executor` fixture in `d890004c11`. Both tests share it now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to