aglinxinyuan commented on code in PR #5900:
URL: https://github.com/apache/texera/pull/5900#discussion_r3457100202
##########
amber/src/main/python/core/runnables/network_sender.py:
##########
@@ -100,7 +106,12 @@ def _send_data(self, to: ChannelIdentity, data_payload: DataPayload) -> None:
elif isinstance(data_payload, StateFrame):
data_header = PythonDataHeader(tag=to, payload_type="State")
table = pa.Table.from_pydict(
- {State.CONTENT: [data_payload.frame.to_json()]},
+ {
+ State.CONTENT: [data_payload.frame.to_json()],
Review Comment:
Good catch, and accurate: `PythonProxyServer` deserializes incoming Python
state via `StateFrame(State.fromTuple(...))`, and Scala's `StateFrame(frame:
State)` carries no loop fields, so loop bookkeeping doesn't survive a
Python→Scala→Python hop.
That's intentional for this PR, which is **dormant**: with no loop
operators, `loop_counter` is always 0, so Scala dropping or defaulting it is
observationally identical. There's also no short-table risk: this PR makes
**both** Python and Scala `toTuple()` emit the full 4 columns, so every State
Arrow table on the wire has 4 columns and the receiver never raises a `KeyError`.
Preserving loop bookkeeping **end-to-end** is part of the loop-operator PR,
where the durable channel is the Iceberg materialization columns (read directly
by the Python materialization reader), not the live Scala bridge. Extending
Scala `StateFrame` (or confirming loops only use the materialization path) will
land there.
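To make the "observationally identical" argument concrete, here is a minimal sketch in plain Python. It is not the Texera code: `Frame`, `to_columns`, `from_columns`, and `scala_hop` are hypothetical stand-ins, the column names are assumed, a dict of lists stands in for the Arrow table, and the Scala side is assumed to re-emit the loop columns with defaults of 0 and `None`.

```python
from dataclasses import dataclass
from typing import Optional

# Assumed column names; the real constants live on `State` in the PR.
CONTENT = "content"
LOOP_COUNTER = "loop_counter"
LOOP_START_ID = "loop_start_id"
LOOP_START_STATE_URI = "loop_start_state_uri"


@dataclass
class Frame:
    """Hypothetical stand-in for a Python-side StateFrame."""
    content: str
    loop_counter: int = 0
    loop_start_id: Optional[str] = None
    loop_start_state_uri: Optional[str] = None


def to_columns(frame: Frame) -> dict:
    """Always emit all four columns, one row each."""
    return {
        CONTENT: [frame.content],
        LOOP_COUNTER: [frame.loop_counter],
        LOOP_START_ID: [frame.loop_start_id],
        LOOP_START_STATE_URI: [frame.loop_start_state_uri],
    }


def from_columns(cols: dict) -> Frame:
    """Rebuild a frame from row 0 of each column."""
    return Frame(
        content=cols[CONTENT][0],
        loop_counter=int(cols[LOOP_COUNTER][0]),
        loop_start_id=cols[LOOP_START_ID][0],
        loop_start_state_uri=cols[LOOP_START_STATE_URI][0],
    )


def scala_hop(cols: dict) -> dict:
    """Stand-in for the Scala bridge: keeps only the content and
    re-emits four columns with default loop fields (an assumption)."""
    return to_columns(Frame(content=cols[CONTENT][0]))


# Without loop operators every frame carries the defaults, so the hop is lossless.
dormant = Frame('{"k": 1}')
assert from_columns(scala_hop(to_columns(dormant))) == dormant

# Once a loop sets the fields, the same hop resets them.
looping = Frame('{"k": 1}', loop_counter=3, loop_start_id="loop-1")
assert from_columns(scala_hop(to_columns(looping))) != looping
```

The second assertion is the case the loop-operator PR would have to handle, either by extending the bridge or by routing loop state through materialization only.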
##########
amber/src/main/python/core/runnables/network_receiver.py:
##########
@@ -96,7 +96,12 @@ def data_handler(command: bytes, table: Table) -> int:
"Data",
lambda _: DataFrame(table),
"State",
- lambda _: StateFrame(State.from_json(table[State.CONTENT][0].as_py())),
+ lambda _: StateFrame(
+ State.from_json(table[State.CONTENT][0].as_py()),
+ loop_counter=int(table[State.LOOP_COUNTER][0].as_py()),
+ loop_start_id=table[State.LOOP_START_ID][0].as_py(),
+ loop_start_state_uri=table[State.LOOP_START_STATE_URI][0].as_py(),
Review Comment:
Good catch, and accurate: `PythonProxyServer` deserializes incoming Python
state via `StateFrame(State.fromTuple(...))`, and Scala's `StateFrame(frame:
State)` carries no loop fields, so loop bookkeeping doesn't survive a
Python→Scala→Python hop.
That's intentional for this PR, which is **dormant**: with no loop
operators, `loop_counter` is always 0, so Scala dropping or defaulting it is
observationally identical. There's also no short-table risk: this PR makes
**both** Python and Scala `toTuple()` emit the full 4 columns, so every State
Arrow table on the wire has 4 columns and the receiver never raises a `KeyError`.
Preserving loop bookkeeping **end-to-end** is part of the loop-operator PR,
where the durable channel is the Iceberg materialization columns (read directly
by the Python materialization reader), not the live Scala bridge. Extending
Scala `StateFrame` (or confirming loops only use the materialization path) will
land there.
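The short-table concern on the receiver side can be illustrated the same way. This is only a sketch under assumptions: `decode_state` is a hypothetical helper, the column names are guesses at the `State.*` constants, and a dict of lists stands in for the Arrow table (indexing a missing key raises `KeyError` here, which is the failure mode named above).

```python
# Assumed column names standing in for the State.* constants.
FULL_COLUMNS = ("content", "loop_counter", "loop_start_id", "loop_start_state_uri")


def decode_state(columns: dict) -> dict:
    """Read row 0 of every expected column; a missing column raises KeyError."""
    return {
        "content": columns["content"][0],
        "loop_counter": int(columns["loop_counter"][0]),
        "loop_start_id": columns["loop_start_id"][0],
        "loop_start_state_uri": columns["loop_start_state_uri"][0],
    }


def is_full_width(columns: dict) -> bool:
    """True when every column the receiver reads is present."""
    return all(name in columns for name in FULL_COLUMNS)


# A table as both senders are said to emit it after this PR.
full = {
    "content": ['{"k": 1}'],
    "loop_counter": [0],
    "loop_start_id": [None],
    "loop_start_state_uri": [None],
}

# A one-column table, as a sender would have produced before this PR.
short = {"content": ['{"k": 1}']}

decoded = decode_state(full)

try:
    decode_state(short)
    short_table_failed = False
except KeyError:
    short_table_failed = True
```

As long as every sender emits all four columns, the `short` case never reaches the receiver, which is why no defensive column check is needed in the handler.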