Hey Marco, best pattern for your use case would be to separate data processing and inference(invoking your agent). implement a custom ` RemoteModelHandler <https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RemoteModelHandler>` for calling the agent and use it with RunInference transform. Pipeline like Read(stocks data) ---> RunInference(your agent handler) --> Sink.
Thanks, Ganesh. On Mon, Jan 5, 2026 at 1:04 PM Marc _ <[email protected]> wrote: > Hi all > i have an ADK agent running on CloudRun and wanted to invoke it via > dataflow > I have the following pipeline and DoFn but i am getting this exception > > Anyone could advise? > > Kind regards > Marco > > EOF when reading a line [while running 'ClouodagentRun-ptransform-32'] > Traceback (most recent call last): File "apache_beam/runners/common.py", > line 1498, in apache_beam.runners.common.DoFnRunner.process File > "apache_beam/runners/common.py", line 685, in > apache_beam.runners.common.SimpleInvoker.invoke_process File > "/template/shareloader/modules/obb_utils.py", line 726, in process return > runner.run(self.amain(element)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File > "/usr/local/lib/python3.11/asyncio/runners.py", line 118, in run return > self._loop.run_until_complete(task) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ > File "/usr/local/lib/python3.11/asyncio/base_events.py", line 654, in > run_until_complete return future.result() ^^^^^^^^^^^^^^^ File > "/template/shareloader/modules/obb_utils.py", line 713, in amain await > self.chat(client, self.SESSION_ID) File > "/template/shareloader/modules/obb_utils.py", line 690, in chat raise e > File "/template/shareloader/modules/obb_utils.py", line 683, in chat > user_input = await asyncio.to_thread(input, f"[{self.USER_ID}]: ") > ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File > "/usr/local/lib/python3.11/asyncio/threads.py", line 25, in to_thread > return await loop.run_in_executor(None, func_call) > ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File > "/usr/local/lib/python3.11/concurrent/futures/thread.py", line 58, in run > result = self.fn(*self.args, **self.kwargs) > ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ EOFError: EOF when reading a line > > def run_gcloud_agent(pipeline, debugSink): > from shareloader.modules.obb_utils import AsyncCloudRunAgent > (pipeline | 'Sourcinig prompt' >> beam.Create(["Run a technical analysis > for today's stock picks and give me your recommendations"]) > | 'ClouodagentRun' >> beam.ParDo(AsyncCloudRunAgent()) > | debugSink > ) > > > class AsyncCloudRunAgent(AsyncProcess): > > def __init__(self): > # --- Configuration (Dynamic) --- > self.APP_URL = > "https://stock-agent-service-682143946483.us-central1.run.app" > self.USER_ID = "user_123" > # Generate a single session ID for the entire conversation loop > self.SESSION_ID = f"session_{datetime.now().strftime('%Y%m%d%H%M%S')}" > self.APP_NAME = "stock_agent" > > > > # --- Authentication Function (ASYNC) --- > > async def get_auth_token(self) -> str: > """ > Programmatically fetches an ID token for the Cloud Run service > audience. > 'target_audience' should be the URL of your Cloud Run service. > """ > # Run the synchronous google-auth call in a thread to keep it > async-friendly > loop = asyncio.get_event_loop() > > def fetch_token(): > auth_req = google.auth.transport.requests.Request() > # This automatically uses the Dataflow Worker Service Account > return id_token.fetch_id_token(auth_req, self.APP_URL) > > try: > token = await loop.run_in_executor(None, fetch_token) > return token > except Exception as e: > raise RuntimeError(f"Failed to fetch ID token: {e}") > # --- API Interaction Functions (ASYNC) --- > > async def make_request(self, client: httpx.AsyncClient, method: str, > endpoint: str, data: Dict[str, Any] = None) -> httpx.Response: > """Helper function for authenticated asynchronous requests using > httpx.""" > token = await self.get_auth_token() > headers = { > "Authorization": f"Bearer {token}", > "Content-Type": "application/json" > } > url = f"{self.APP_URL}{endpoint}" > > try: > if method.upper() == 'POST': > response = await client.post(url, headers=headers, json=data) > elif method.upper() == 'DELETE': > response = await client.delete(url, headers=headers) > else: > raise ValueError(f"Unsupported method: {method}") > > response.raise_for_status() > return response > except httpx.HTTPStatusError as errh: > print(f"\n❌ **HTTP ERROR:** Status {response.status_code} for > {url}") > print(f"❌ **Server Response (Raw):**\n{response.text}") > raise > except httpx.RequestError as err: > print(f"\n❌ An unexpected request error occurred: {err}") > raise > > async def run_agent_request(self, client: httpx.AsyncClient, session_id: > str, message: str): > """Executes a single POST request to the /run_sse endpoint.""" > > print(f"\n[User] -> Sending message: '{message}'") > > run_data = { > "app_name": self.APP_NAME, > "user_id": self.USER_ID, > "session_id": session_id, > "new_message": {"role": "user", "parts": [{"text": message}]}, > "streaming": False > } > > try: > response = await self.make_request(client, "POST", "/run_sse", > data=run_data) > current_status = response.status_code > # print(f"**Request Status Code:** {current_status}") > > raw_text = response.text.strip() > > # Multi-line SSE parsing logic > data_lines = [ > line.strip() > for line in raw_text.split('\n') > if line.strip().startswith("data:") > ] > > if not data_lines: > raise json.JSONDecodeError("No 'data:' lines found in 200 > response.", raw_text, 0) > > last_data_line = data_lines[-1] > json_payload = last_data_line[len("data:"):].strip() > agent_response = json.loads(json_payload) > > # Extract the final text > final_text = agent_response.get('content', {}).get('parts', > [{}])[0].get('text', 'Agent response structure not recognized.') > > print(f"[Agent] -> {final_text}") > > except json.JSONDecodeError as e: > print(f"\n🚨 **JSON PARSING FAILED**!") > print(f" Error: {e}") > print(" --- RAW SERVER CONTENT ---") > print(raw_text) > print(" --------------------------") > > except Exception as e: > print(f"❌ Agent request failed: {e}") > > # --- Interactive Chat Loop --- > > async def chat(self, client: httpx.AsyncClient, session_id: str): > """Runs the main conversation loop, handling user input > asynchronously.""" > print("--- 💬 Start Chatting ---") > > try: > # Use asyncio.to_thread to run blocking input() without freezing > the event loop > user_input = await asyncio.to_thread(input, f"[{self.USER_ID}]: ") > > # Send the message to the agent > await self.run_agent_request(client, session_id, user_input) > > except Exception as e: > print(f"An unexpected error occurred in the loop: {e}") > raise e > > # --- Main Logic (ASYNC) --- > > async def amain(self, element): > """Main asynchronous function to set up the session and start the > loop.""" > print(f"\n🤖 Starting Interactive Client with Session ID: > **{self.SESSION_ID}**") > session_data = {"state": {"preferred_language": "English", > "visit_count": 5}} > current_session_endpoint = > f"/apps/{self.APP_NAME}/users/{self.USER_ID}/sessions/{self.SESSION_ID}" > > # httpx.AsyncClient is used as a context manager to manage connections > async with httpx.AsyncClient(timeout=30.0) as client: > > # 1. Create Session > print("\n## 1. Creating Session") > try: > await self.make_request(client, "POST", > current_session_endpoint, data=session_data) > print(f"✅ Session created successfully. Status 200.") > except Exception as e: > print(f"❌ Could not start session: {e}") > return > > # 2. Start the Interactive Loop > await self.chat(client, self.SESSION_ID) > > # 3. Cleanup: Delete Session (Best Practice) > print(f"\n## 3. Deleting Session: {self.SESSION_ID}") > try: > await self.make_request(client, "DELETE", > current_session_endpoint) > print("✅ Session deleted successfully.") > except Exception as e: > print(f"⚠️ Warning: Failed to delete session. {e}") > > def process(self, element: str): > logging.info(f'Input elements:{element}') > with asyncio.Runner() as runner: > return runner.run(self.amain(element)) > > >
