Thanks a lot, Ganesh! I'll have a look and report back if I get stuck. Kind regards
On Mon, Jan 5, 2026 at 8:11 AM Ganesh Sivakumar <[email protected]> wrote: > Hey Marco, best pattern for your use case would be to separate data > processing and inference(invoking your agent). implement a custom ` > RemoteModelHandler > <https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RemoteModelHandler>` > for calling the agent and use it with RunInference transform. Pipeline like > Read(stocks data) ---> RunInference(your agent handler) --> Sink. > > Thanks, > Ganesh. > > On Mon, Jan 5, 2026 at 1:04 PM Marc _ <[email protected]> wrote: > >> Hi all >> i have an ADK agent running on CloudRun and wanted to invoke it via >> dataflow >> I have the following pipeline and DoFn but i am getting this exception >> >> Anyone could advise? >> >> Kind regards >> Marco >> >> EOF when reading a line [while running 'ClouodagentRun-ptransform-32'] >> Traceback (most recent call last): File "apache_beam/runners/common.py", >> line 1498, in apache_beam.runners.common.DoFnRunner.process File >> "apache_beam/runners/common.py", line 685, in >> apache_beam.runners.common.SimpleInvoker.invoke_process File >> "/template/shareloader/modules/obb_utils.py", line 726, in process return >> runner.run(self.amain(element)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File >> "/usr/local/lib/python3.11/asyncio/runners.py", line 118, in run return >> self._loop.run_until_complete(task) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ >> File "/usr/local/lib/python3.11/asyncio/base_events.py", line 654, in >> run_until_complete return future.result() ^^^^^^^^^^^^^^^ File >> "/template/shareloader/modules/obb_utils.py", line 713, in amain await >> self.chat(client, self.SESSION_ID) File >> "/template/shareloader/modules/obb_utils.py", line 690, in chat raise e >> File "/template/shareloader/modules/obb_utils.py", line 683, in chat >> user_input = await asyncio.to_thread(input, f"[{self.USER_ID}]: ") >> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 
File >> "/usr/local/lib/python3.11/asyncio/threads.py", line 25, in to_thread >> return await loop.run_in_executor(None, func_call) >> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File >> "/usr/local/lib/python3.11/concurrent/futures/thread.py", line 58, in run >> result = self.fn(*self.args, **self.kwargs) >> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ EOFError: EOF when reading a line >> >> def run_gcloud_agent(pipeline, debugSink): >> from shareloader.modules.obb_utils import AsyncCloudRunAgent >> (pipeline | 'Sourcinig prompt' >> beam.Create(["Run a technical analysis >> for today's stock picks and give me your recommendations"]) >> | 'ClouodagentRun' >> beam.ParDo(AsyncCloudRunAgent()) >> | debugSink >> ) >> >> >> class AsyncCloudRunAgent(AsyncProcess): >> >> def __init__(self): >> # --- Configuration (Dynamic) --- >> self.APP_URL = >> "https://stock-agent-service-682143946483.us-central1.run.app" >> self.USER_ID = "user_123" >> # Generate a single session ID for the entire conversation loop >> self.SESSION_ID = >> f"session_{datetime.now().strftime('%Y%m%d%H%M%S')}" >> self.APP_NAME = "stock_agent" >> >> >> >> # --- Authentication Function (ASYNC) --- >> >> async def get_auth_token(self) -> str: >> """ >> Programmatically fetches an ID token for the Cloud Run service >> audience. >> 'target_audience' should be the URL of your Cloud Run service. 
>> """ >> # Run the synchronous google-auth call in a thread to keep it >> async-friendly >> loop = asyncio.get_event_loop() >> >> def fetch_token(): >> auth_req = google.auth.transport.requests.Request() >> # This automatically uses the Dataflow Worker Service Account >> return id_token.fetch_id_token(auth_req, self.APP_URL) >> >> try: >> token = await loop.run_in_executor(None, fetch_token) >> return token >> except Exception as e: >> raise RuntimeError(f"Failed to fetch ID token: {e}") >> # --- API Interaction Functions (ASYNC) --- >> >> async def make_request(self, client: httpx.AsyncClient, method: str, >> endpoint: str, data: Dict[str, Any] = None) -> httpx.Response: >> """Helper function for authenticated asynchronous requests using >> httpx.""" >> token = await self.get_auth_token() >> headers = { >> "Authorization": f"Bearer {token}", >> "Content-Type": "application/json" >> } >> url = f"{self.APP_URL}{endpoint}" >> >> try: >> if method.upper() == 'POST': >> response = await client.post(url, headers=headers, json=data) >> elif method.upper() == 'DELETE': >> response = await client.delete(url, headers=headers) >> else: >> raise ValueError(f"Unsupported method: {method}") >> >> response.raise_for_status() >> return response >> except httpx.HTTPStatusError as errh: >> print(f"\n❌ **HTTP ERROR:** Status {response.status_code} for >> {url}") >> print(f"❌ **Server Response (Raw):**\n{response.text}") >> raise >> except httpx.RequestError as err: >> print(f"\n❌ An unexpected request error occurred: {err}") >> raise >> >> async def run_agent_request(self, client: httpx.AsyncClient, session_id: >> str, message: str): >> """Executes a single POST request to the /run_sse endpoint.""" >> >> print(f"\n[User] -> Sending message: '{message}'") >> >> run_data = { >> "app_name": self.APP_NAME, >> "user_id": self.USER_ID, >> "session_id": session_id, >> "new_message": {"role": "user", "parts": [{"text": message}]}, >> "streaming": False >> } >> >> try: >> response = await 
self.make_request(client, "POST", "/run_sse", >> data=run_data) >> current_status = response.status_code >> # print(f"**Request Status Code:** {current_status}") >> >> raw_text = response.text.strip() >> >> # Multi-line SSE parsing logic >> data_lines = [ >> line.strip() >> for line in raw_text.split('\n') >> if line.strip().startswith("data:") >> ] >> >> if not data_lines: >> raise json.JSONDecodeError("No 'data:' lines found in 200 >> response.", raw_text, 0) >> >> last_data_line = data_lines[-1] >> json_payload = last_data_line[len("data:"):].strip() >> agent_response = json.loads(json_payload) >> >> # Extract the final text >> final_text = agent_response.get('content', {}).get('parts', >> [{}])[0].get('text', 'Agent response structure not recognized.') >> >> print(f"[Agent] -> {final_text}") >> >> except json.JSONDecodeError as e: >> print(f"\n🚨 **JSON PARSING FAILED**!") >> print(f" Error: {e}") >> print(" --- RAW SERVER CONTENT ---") >> print(raw_text) >> print(" --------------------------") >> >> except Exception as e: >> print(f"❌ Agent request failed: {e}") >> >> # --- Interactive Chat Loop --- >> >> async def chat(self, client: httpx.AsyncClient, session_id: str): >> """Runs the main conversation loop, handling user input >> asynchronously.""" >> print("--- 💬 Start Chatting ---") >> >> try: >> # Use asyncio.to_thread to run blocking input() without freezing >> the event loop >> user_input = await asyncio.to_thread(input, f"[{self.USER_ID}]: >> ") >> >> # Send the message to the agent >> await self.run_agent_request(client, session_id, user_input) >> >> except Exception as e: >> print(f"An unexpected error occurred in the loop: {e}") >> raise e >> >> # --- Main Logic (ASYNC) --- >> >> async def amain(self, element): >> """Main asynchronous function to set up the session and start the >> loop.""" >> print(f"\n🤖 Starting Interactive Client with Session ID: >> **{self.SESSION_ID}**") >> session_data = {"state": {"preferred_language": "English", >> 
"visit_count": 5}} >> current_session_endpoint = >> f"/apps/{self.APP_NAME}/users/{self.USER_ID}/sessions/{self.SESSION_ID}" >> >> # httpx.AsyncClient is used as a context manager to manage >> connections >> async with httpx.AsyncClient(timeout=30.0) as client: >> >> # 1. Create Session >> print("\n## 1. Creating Session") >> try: >> await self.make_request(client, "POST", >> current_session_endpoint, data=session_data) >> print(f"✅ Session created successfully. Status 200.") >> except Exception as e: >> print(f"❌ Could not start session: {e}") >> return >> >> # 2. Start the Interactive Loop >> await self.chat(client, self.SESSION_ID) >> >> # 3. Cleanup: Delete Session (Best Practice) >> print(f"\n## 3. Deleting Session: {self.SESSION_ID}") >> try: >> await self.make_request(client, "DELETE", >> current_session_endpoint) >> print("✅ Session deleted successfully.") >> except Exception as e: >> print(f"⚠️ Warning: Failed to delete session. {e}") >> >> def process(self, element: str): >> logging.info(f'Input elements:{element}') >> with asyncio.Runner() as runner: >> return runner.run(self.amain(element)) >> >> >>
