Yicong-Huang commented on code in PR #4893:
URL: https://github.com/apache/texera/pull/4893#discussion_r3179287539


##########
amber/src/main/python/core/architecture/managers/test_console_message_manager.py:
##########
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime, timedelta
+
+from core.architecture.managers.console_message_manager import 
ConsoleMessageManager
+from proto.org.apache.texera.amber.engine.architecture.rpc import (
+    ConsoleMessage,
+    ConsoleMessageType,
+)
+
+
+def _msg(title: str) -> ConsoleMessage:
+    return ConsoleMessage(
+        worker_id="w0",
+        timestamp=datetime.now(),
+        msg_type=ConsoleMessageType.PRINT,
+        source="src",
+        title=title,
+        message=title,
+    )
+
+
+class TestConsoleMessageManager:
+    def test_initially_force_flush_drains_empty(self):
+        mgr = ConsoleMessageManager()
+        # No messages put yet — force_flush still yields zero items.
+        assert list(mgr.get_messages(force_flush=True)) == []
+
+    def test_force_flush_drains_all_buffered_in_order(self):
+        mgr = ConsoleMessageManager()
+        for t in ("a", "b", "c"):
+            mgr.put_message(_msg(t))
+        flushed = list(mgr.get_messages(force_flush=True))
+        assert [m.title for m in flushed] == ["a", "b", "c"]
+        # A second drain must come back empty — get() is consumptive.
+        assert list(mgr.get_messages(force_flush=True)) == []
+
+    def test_get_without_flush_below_threshold_yields_nothing(self):
+        # Below max_message_num (default 10) and within max_flush_interval
+        # (default 500ms) — the underlying TimedBuffer should withhold output.
+        mgr = ConsoleMessageManager()
+        mgr.put_message(_msg("only"))

Review Comment:
   Pinned in fd0acff5ae: `mgr.print_buf._last_output_time = datetime.now()` is 
set explicitly right before the `get_messages(force_flush=False)` call, so the 
`>= 1s` flush branch in `TimedBuffer` can't fire if the rest of the test 
happens to run more than ~1s after `ConsoleMessageManager()` construction.



##########
amber/src/main/python/core/architecture/managers/test_console_message_manager.py:
##########
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime, timedelta
+
+from core.architecture.managers.console_message_manager import 
ConsoleMessageManager
+from proto.org.apache.texera.amber.engine.architecture.rpc import (
+    ConsoleMessage,
+    ConsoleMessageType,
+)
+
+
+def _msg(title: str) -> ConsoleMessage:
+    return ConsoleMessage(
+        worker_id="w0",
+        timestamp=datetime.now(),
+        msg_type=ConsoleMessageType.PRINT,
+        source="src",
+        title=title,
+        message=title,
+    )
+
+
+class TestConsoleMessageManager:
+    def test_initially_force_flush_drains_empty(self):
+        mgr = ConsoleMessageManager()
+        # No messages put yet — force_flush still yields zero items.
+        assert list(mgr.get_messages(force_flush=True)) == []
+
+    def test_force_flush_drains_all_buffered_in_order(self):
+        mgr = ConsoleMessageManager()
+        for t in ("a", "b", "c"):
+            mgr.put_message(_msg(t))
+        flushed = list(mgr.get_messages(force_flush=True))
+        assert [m.title for m in flushed] == ["a", "b", "c"]
+        # A second drain must come back empty — get() is consumptive.
+        assert list(mgr.get_messages(force_flush=True)) == []
+
+    def test_get_without_flush_below_threshold_yields_nothing(self):
+        # Below max_message_num (default 10) and within max_flush_interval
+        # (default 500ms) — the underlying TimedBuffer should withhold output.
+        mgr = ConsoleMessageManager()
+        mgr.put_message(_msg("only"))
+        assert list(mgr.get_messages(force_flush=False)) == []
+        # The withheld message must still be drainable on a force flush.
+        assert [m.title for m in mgr.get_messages(force_flush=True)] == 
["only"]
+
+    def test_get_without_flush_at_or_over_max_message_num_drains(self):
+        # Once buffered count crosses max_message_num (default 10), the
+        # buffer should auto-flush even without force_flush=True.
+        mgr = ConsoleMessageManager()
+        for i in range(10):
+            mgr.put_message(_msg(f"m{i}"))
+        flushed = [m.title for m in mgr.get_messages(force_flush=False)]
+        assert flushed == [f"m{i}" for i in range(10)]
+
+    def test_get_drains_when_last_output_time_is_stale(self, monkeypatch):
+        # Backdate the buffer's `_last_output_time` so the >=500ms branch
+        # fires even with a single message and force_flush=False.
+        mgr = ConsoleMessageManager()
+        mgr.put_message(_msg("stale"))
+        mgr.print_buf._last_output_time = datetime.now() - timedelta(seconds=2)
+        flushed = [m.title for m in mgr.get_messages(force_flush=False)]
+        assert flushed == ["stale"]

Review Comment:
   Removed in fd0acff5ae: dropped the unused `monkeypatch` parameter from 
`test_get_drains_when_last_output_time_is_stale`. The test backdates 
`_last_output_time` directly and never touches `datetime.now`, so the fixture 
wasn't doing anything.



##########
amber/src/main/python/core/architecture/managers/test_embedded_control_message_manager.py:
##########
@@ -0,0 +1,192 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from types import SimpleNamespace
+from unittest.mock import MagicMock
+
+import pytest
+
+from core.architecture.managers.embedded_control_message_manager import (
+    EmbeddedControlMessageManager,
+)
+from proto.org.apache.texera.amber.core import (
+    ActorVirtualIdentity,
+    ChannelIdentity,
+    EmbeddedControlMessageIdentity,
+)
+from proto.org.apache.texera.amber.engine.architecture.rpc import (
+    EmbeddedControlMessage,
+    EmbeddedControlMessageType,
+)
+
+
+SELF_ID = ActorVirtualIdentity(name="self")
+
+
+def _channel(from_name: str, to_name: str = "self", is_control: bool = False):
+    return ChannelIdentity(
+        from_worker_id=ActorVirtualIdentity(name=from_name),
+        to_worker_id=ActorVirtualIdentity(name=to_name),
+        is_control=is_control,
+    )
+
+
+def _make_ecm(
+    ecm_type: EmbeddedControlMessageType,
+    scope=None,
+) -> EmbeddedControlMessage:
+    # Reuse the same identity *object* across calls so the dict in
+    # `EmbeddedControlMessageManager.ecm_received` aggregates under one key.
+    return EmbeddedControlMessage(
+        id=EmbeddedControlMessageIdentity(id="ecm-1"),
+        ecm_type=ecm_type,
+        scope=scope or [],
+    )

Review Comment:
   Comment fixed in fd0acff5ae: replaced the inaccurate "Reuse the same 
identity *object*" claim with one that describes what actually happens — 
`_make_ecm` does construct a fresh `EmbeddedControlMessageIdentity(id="ecm-1")` 
each call, but the proto's dataclass-style equality / hash means the dict in 
`ecm_received` still aggregates them under one key.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to