Yicong-Huang commented on code in PR #4560:
URL: https://github.com/apache/texera/pull/4560#discussion_r3165586388


##########
amber/src/main/python/core/runnables/test_main_loop.py:
##########
@@ -1231,3 +1247,297 @@ def channel_size(channel: ChannelIdentity) -> int:
             "test-1"
         ] == b"pickle    " + pickle.dumps(mock_binary_tuple["test-1"])
         reraise()
+
+    @pytest.mark.timeout(2)
+    def test_process_state_can_emit_multiple_states(
+        self,
+        main_loop,
+        output_queue,
+        mock_data_output_channel,
+        monkeypatch,
+    ):
+        # Stub-level coverage of the single-switch state handshake. Each
+        # call to the (stubbed) _switch_context simulates DataProc
+        # consuming the queued input state and writing
+        # current_output_state, mirroring what real DataProc.process_state
+        # does between MainLoop's switches.
+        class DummyExecutor:
+            @staticmethod
+            def process_state(state: State, port: int) -> State:
+                output_state = State()
+                output_state.add("value", state["value"] + 1)
+                output_state.add("port", port)
+                return output_state
+
+        main_loop.context.executor_manager.executor = DummyExecutor()
+        monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: None)
+        monkeypatch.setattr(
+            main_loop.context.output_manager,
+            "emit_state",
+            lambda state: [(mock_data_output_channel.to_worker_id, StateFrame(state))],
+        )
+
+        def fake_switch_context():
+            current_input_state = (
+                main_loop.context.state_processing_manager.current_input_state
+            )
+            if current_input_state is not None:
+                main_loop.context.state_processing_manager.current_output_state = (
+                    DummyExecutor.process_state(current_input_state, 0)
+                )
+
+        monkeypatch.setattr(main_loop, "_switch_context", fake_switch_context)
+
+        first_state = State()
+        first_state.add("value", 1)
+        second_state = State()
+        second_state.add("value", 41)
+
+        main_loop._process_state(first_state)
+        main_loop._process_state(second_state)
+
+        first_output: DataElement = output_queue.get()
+        second_output: DataElement = output_queue.get()
+
+        assert first_output.tag == mock_data_output_channel
+        assert isinstance(first_output.payload, StateFrame)
+        assert first_output.payload.frame["value"] == 2
+        assert first_output.payload.frame["port"] == 0
+
+        assert second_output.tag == mock_data_output_channel
+        assert isinstance(second_output.payload, StateFrame)
+        assert second_output.payload.frame["value"] == 42
+        assert second_output.payload.frame["port"] == 0
+
+    @pytest.mark.timeout(5)

Review Comment:
   Suggest a shorter timeout; these events should complete within milliseconds.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to