aglinxinyuan commented on code in PR #4206:
URL: https://github.com/apache/texera/pull/4206#discussion_r3290943037
##########
amber/src/main/python/core/runnables/main_loop.py:
##########
@@ -87,19 +92,54 @@ def __init__(
target=self.data_processor.run, daemon=True,
name="data_processor_thread"
).start()
+ def _attach_loop_start_id(self, output_state: State) -> None:
+ if "LoopStartId" in output_state:
+ return
+ output_state["LoopStartId"] = self.context.worker_id.split("-", 1)[1].rsplit(
+ "-main-0", 1
+ )[0]
+ # The URI lives on the upstream operator's output port (which
+ # LoopStart's first materialization reader is reading from).
+ reader_runnables = (
+ self.context.input_manager.get_input_port_mat_reader_threads()
+ )
+ output_state["LoopStartStateURI"] = VFSURIFactory.state_uri(
+ next(iter(reader_runnables.values()))[0].uri
+ )
+
+ def _jump_to_loop_start(
+ self, executor: LoopEndOperator, controller_interface
+ ) -> None:
+ state = executor.state
+ controller_interface.jump_to_operator_region(
+ JumpToOperatorRegionRequest(OperatorIdentity(state["LoopStartId"]))
+ )
+ uri = state["LoopStartStateURI"]
+ # Strip the per-iteration scratch (`table`, `output`) and the
+ # loop metadata (`LoopStartId`, `LoopStartStateURI`) so only the
+ # user-visible loop state is written back to LoopStart's input.
+ for key in ("table", "output", "LoopStartId", "LoopStartStateURI"):
+ state.pop(key, None)
+ writer = DocumentFactory.create_document(uri, State.SCHEMA).writer("0")
+ writer.put_one(State(state).to_tuple())
+ writer.close()
+
def complete(self) -> None:
"""
Complete the DataProcessor, marking state to COMPLETED, and notify the
controller.
"""
# flush the buffered console prints
self._check_and_report_console_messages(force_flush=True)
- self.context.executor_manager.executor.close()
+ controller_interface = self._async_rpc_client.controller_stub()
+ executor = self.context.executor_manager.executor
+ if isinstance(executor, LoopEndOperator) and executor.condition():
+ self._jump_to_loop_start(executor, controller_interface)
Review Comment:
1. This is expected. We don't provide error handling for user-provided code;
this follows the same design as the Python UDF.
2. An infinite loop is valid.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]