aglinxinyuan commented on code in PR #4206:
URL: https://github.com/apache/texera/pull/4206#discussion_r3353567234


##########
amber/src/main/python/core/models/operator.py:
##########
@@ -291,3 +291,30 @@ def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]
             time, or None.
         """
         yield
+
+
+class LoopStartOperator(TableOperator):
+    @overrides.final
+    def process_state(self, state: State, port: int) -> Optional[State]:
+        if "LoopStartStateURI" in state:
+            state["loop_counter"] += 1
+            return state
+        self.state.update(state)
+        return None
+
+    @overrides.final
+    def produce_state_on_finish(self, port: int) -> State:
+        from pickle import dumps
+
+        self.state["table"] = dumps(Table(self._TableOperator__table_data[port]))

Review Comment:
   Both issues fixed in e281c61b4c.
   
   **1. Name-mangled access.** Added a protected
`TableOperator._buffered_table(port)` accessor. Inside the class body,
`self.__table_data` resolves normally, so renaming `TableOperator` no longer
breaks callers. `LoopStartOperator.produce_state_on_finish` now goes through
the accessor instead of `self._TableOperator__table_data[port]`.
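
To illustrate why the accessor is less brittle, here is a minimal sketch of Python's name mangling. The two classes are stripped-down stand-ins, not the Texera classes; the dict-of-lists buffer and the `rows_via_*` method names are assumptions made up for the example.

```python
class TableOperator:
    """Stand-in for the real base class (assumed shape, for illustration only)."""

    def __init__(self):
        # A leading double underscore is mangled at compile time, so this
        # attribute is actually stored as _TableOperator__table_data.
        self.__table_data = {0: [{"a": 1}, {"a": 2}]}

    def _buffered_table(self, port):
        # Inside the defining class the private name resolves normally,
        # whatever the class happens to be called.
        return self.__table_data[port]


class LoopStartOperator(TableOperator):
    def rows_via_accessor(self, port):
        # Survives a rename of the base class.
        return self._buffered_table(port)

    def rows_via_mangled_name(self, port):
        # Works today, but hard-codes the base class name in the attribute.
        return self._TableOperator__table_data[port]

    def rows_via_private_name(self, port):
        # Mangled to _LoopStartOperator__table_data here, which does not exist.
        return self.__table_data[port]


op = LoopStartOperator()
rows = op.rows_via_accessor(0)
```

In this sketch `rows_via_private_name` raises `AttributeError`, which is why a subclass has to choose between the hard-coded mangled name and an accessor on the base class.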
   
   **2. Pickle as RCE surface.** Swapped the bytes format from pickle to Apache
Arrow IPC, which is structured and typed, carries no callable payload, and
raises on malformed input at read time. Two new helpers in
`core/models/table.py`:
   
   * `table_to_ipc_bytes(table) -> bytes` (sender side, used by Loop Start)
   * `table_from_ipc_bytes(buf) -> Table` (receiver side, emitted by the 
codegen for Loop End)
   
   The codegen in `LoopEndOpDesc.scala` now emits `from core.models.table 
import table_from_ipc_bytes; self.state["table"] = 
table_from_ipc_bytes(self.state["table"])` in place of the prior `from pickle 
import loads` lines. The wire shape (bytes-in-`state["table"]`) is unchanged; 
only the format swaps.
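
As background on why the `loads` call was worth removing, the sketch below shows the general mechanism behind the pickle concern. It is a generic, harmless demonstration and is unrelated to any code in this PR; the `Payload` class is made up for the example.

```python
import pickle


class Payload:
    def __reduce__(self):
        # pickle records (callable, args); loads() then calls callable(*args).
        # A harmless expression is used here, but the sender picks the callable.
        return (eval, ("6 * 7",))


blob = pickle.dumps(Payload())
# Deserializing runs eval("6 * 7") and returns its result, not a Payload.
result = pickle.loads(blob)
```

Whoever controls the bytes controls what runs on the receiver, which is the property the Arrow IPC format does not have.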
   
   **Tests:**
   * New `core/models/test_loop_operators.py` (8 tests, all green): pins the 
accessor, the Arrow IPC round-trip across mixed/single-row/empty tables, that 
the serialized bytes parse as an Arrow IPC stream (stronger than a 
pickle-prefix check), that malformed input raises at parse time, and the 
end-to-end Loop Start sender path.
   * Extended `LoopOpDescsSpec`: asserts the generated Loop End source imports 
`table_from_ipc_bytes` and contains no `pickle` reference at all.
   
   Diff scoped to 5 files (operator.py, table.py, test_loop_operators.py,
LoopEndOpDesc.scala, LoopOpDescsSpec.scala), with no unrelated churn. The
URI-in-state alternative was considered but rejected because it would be
roughly 4× the diff and need new cleanup plumbing; the targeted pickle→Arrow
swap addresses both concerns.


