Hey Marco, the best pattern for your use case would be to separate data
processing from inference (invoking your agent). Implement a custom
`RemoteModelHandler
<https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RemoteModelHandler>`
for calling the agent and use it with the RunInference transform. The
pipeline would look like:
Read(stocks data) --> RunInference(your agent handler) --> Sink.
The EOFError itself comes from the `input()` call in `chat`: a Dataflow
worker has no interactive stdin, so send the pipeline element as the
message instead of prompting for it.

Thanks,
Ganesh.

On Mon, Jan 5, 2026 at 1:04 PM Marc _ <[email protected]> wrote:

> Hi all,
> I have an ADK agent running on Cloud Run and want to invoke it via
> Dataflow.
> I have the following pipeline and DoFn, but I am getting this exception.
>
> Could anyone advise?
>
> Kind regards
> Marco
>
> EOF when reading a line [while running 'ClouodagentRun-ptransform-32']
> Traceback (most recent call last): File "apache_beam/runners/common.py",
> line 1498, in apache_beam.runners.common.DoFnRunner.process File
> "apache_beam/runners/common.py", line 685, in
> apache_beam.runners.common.SimpleInvoker.invoke_process File
> "/template/shareloader/modules/obb_utils.py", line 726, in process return
> runner.run(self.amain(element)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File
> "/usr/local/lib/python3.11/asyncio/runners.py", line 118, in run return
> self._loop.run_until_complete(task) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
> File "/usr/local/lib/python3.11/asyncio/base_events.py", line 654, in
> run_until_complete return future.result() ^^^^^^^^^^^^^^^ File
> "/template/shareloader/modules/obb_utils.py", line 713, in amain await
> self.chat(client, self.SESSION_ID) File
> "/template/shareloader/modules/obb_utils.py", line 690, in chat raise e
> File "/template/shareloader/modules/obb_utils.py", line 683, in chat
> user_input = await asyncio.to_thread(input, f"[{self.USER_ID}]: ")
> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File
> "/usr/local/lib/python3.11/asyncio/threads.py", line 25, in to_thread
> return await loop.run_in_executor(None, func_call)
> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File
> "/usr/local/lib/python3.11/concurrent/futures/thread.py", line 58, in run
> result = self.fn(*self.args, **self.kwargs)
> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ EOFError: EOF when reading a line
>
> def run_gcloud_agent(pipeline, debugSink):
>     from shareloader.modules.obb_utils import AsyncCloudRunAgent
>     (pipeline | 'Sourcinig prompt' >> beam.Create(["Run a technical analysis 
> for today's stock picks and give me your recommendations"])
>             | 'ClouodagentRun' >> beam.ParDo(AsyncCloudRunAgent())
>              |  debugSink
>             )
>
>
> class AsyncCloudRunAgent(AsyncProcess):
>
>     def __init__(self):
>         # --- Configuration (Dynamic) ---
>         self.APP_URL = 
> "https://stock-agent-service-682143946483.us-central1.run.app";
>         self.USER_ID = "user_123"
>         # Generate a single session ID for the entire conversation loop
>         self.SESSION_ID = f"session_{datetime.now().strftime('%Y%m%d%H%M%S')}"
>         self.APP_NAME = "stock_agent"
>
>
>
>     # --- Authentication Function (ASYNC) ---
>
>     async def get_auth_token(self) -> str:
>         """
>         Programmatically fetches an ID token for the Cloud Run service 
> audience.
>         'target_audience' should be the URL of your Cloud Run service.
>         """
>         # Run the synchronous google-auth call in a thread to keep it 
> async-friendly
>         loop = asyncio.get_event_loop()
>
>         def fetch_token():
>             auth_req = google.auth.transport.requests.Request()
>             # This automatically uses the Dataflow Worker Service Account
>             return id_token.fetch_id_token(auth_req, self.APP_URL)
>
>         try:
>             token = await loop.run_in_executor(None, fetch_token)
>             return token
>         except Exception as e:
>             raise RuntimeError(f"Failed to fetch ID token: {e}")
>     # --- API Interaction Functions (ASYNC) ---
>
>     async def make_request(self, client: httpx.AsyncClient, method: str, 
> endpoint: str, data: Dict[str, Any] = None) -> httpx.Response:
>         """Helper function for authenticated asynchronous requests using 
> httpx."""
>         token = await self.get_auth_token()
>         headers = {
>             "Authorization": f"Bearer {token}",
>             "Content-Type": "application/json"
>         }
>         url = f"{self.APP_URL}{endpoint}"
>
>         try:
>             if method.upper() == 'POST':
>                 response = await client.post(url, headers=headers, json=data)
>             elif method.upper() == 'DELETE':
>                 response = await client.delete(url, headers=headers)
>             else:
>                 raise ValueError(f"Unsupported method: {method}")
>
>             response.raise_for_status()
>             return response
>         except httpx.HTTPStatusError as errh:
>             print(f"\n❌ **HTTP ERROR:** Status {response.status_code} for 
> {url}")
>             print(f"❌ **Server Response (Raw):**\n{response.text}")
>             raise
>         except httpx.RequestError as err:
>             print(f"\n❌ An unexpected request error occurred: {err}")
>             raise
>
>     async def run_agent_request(self, client: httpx.AsyncClient, session_id: 
> str, message: str):
>         """Executes a single POST request to the /run_sse endpoint."""
>
>         print(f"\n[User] -> Sending message: '{message}'")
>
>         run_data = {
>             "app_name": self.APP_NAME,
>             "user_id": self.USER_ID,
>             "session_id": session_id,
>             "new_message": {"role": "user", "parts": [{"text": message}]},
>             "streaming": False
>         }
>
>         try:
>             response = await self.make_request(client, "POST", "/run_sse", 
> data=run_data)
>             current_status = response.status_code
>             # print(f"**Request Status Code:** {current_status}")
>
>             raw_text = response.text.strip()
>
>             # Multi-line SSE parsing logic
>             data_lines = [
>                 line.strip()
>                 for line in raw_text.split('\n')
>                 if line.strip().startswith("data:")
>             ]
>
>             if not data_lines:
>                 raise json.JSONDecodeError("No 'data:' lines found in 200 
> response.", raw_text, 0)
>
>             last_data_line = data_lines[-1]
>             json_payload = last_data_line[len("data:"):].strip()
>             agent_response = json.loads(json_payload)
>
>             # Extract the final text
>             final_text = agent_response.get('content', {}).get('parts', 
> [{}])[0].get('text', 'Agent response structure not recognized.')
>
>             print(f"[Agent] -> {final_text}")
>
>         except json.JSONDecodeError as e:
>             print(f"\n🚨 **JSON PARSING FAILED**!")
>             print(f"   Error: {e}")
>             print("   --- RAW SERVER CONTENT ---")
>             print(raw_text)
>             print("   --------------------------")
>
>         except Exception as e:
>             print(f"❌ Agent request failed: {e}")
>
>     # --- Interactive Chat Loop ---
>
>     async def chat(self, client: httpx.AsyncClient, session_id: str):
>         """Runs the main conversation loop, handling user input 
> asynchronously."""
>         print("--- 💬 Start Chatting ---")
>
>         try:
>             # Use asyncio.to_thread to run blocking input() without freezing 
> the event loop
>             user_input = await asyncio.to_thread(input, f"[{self.USER_ID}]: ")
>
>             # Send the message to the agent
>             await self.run_agent_request(client, session_id, user_input)
>
>         except Exception as e:
>             print(f"An unexpected error occurred in the loop: {e}")
>             raise e
>
>     # --- Main Logic (ASYNC) ---
>
>     async def amain(self, element):
>         """Main asynchronous function to set up the session and start the 
> loop."""
>         print(f"\n🤖 Starting Interactive Client with Session ID: 
> **{self.SESSION_ID}**")
>         session_data = {"state": {"preferred_language": "English", 
> "visit_count": 5}}
>         current_session_endpoint = 
> f"/apps/{self.APP_NAME}/users/{self.USER_ID}/sessions/{self.SESSION_ID}"
>
>         # httpx.AsyncClient is used as a context manager to manage connections
>         async with httpx.AsyncClient(timeout=30.0) as client:
>
>             # 1. Create Session
>             print("\n## 1. Creating Session")
>             try:
>                 await self.make_request(client, "POST", 
> current_session_endpoint, data=session_data)
>                 print(f"✅ Session created successfully. Status 200.")
>             except Exception as e:
>                 print(f"❌ Could not start session: {e}")
>                 return
>
>             # 2. Start the Interactive Loop
>             await self.chat(client, self.SESSION_ID)
>
>             # 3. Cleanup: Delete Session (Best Practice)
>             print(f"\n## 3. Deleting Session: {self.SESSION_ID}")
>             try:
>                 await self.make_request(client, "DELETE", 
> current_session_endpoint)
>                 print("✅ Session deleted successfully.")
>             except Exception as e:
>                 print(f"⚠️ Warning: Failed to delete session. {e}")
>
>     def process(self, element: str):
>         logging.info(f'Input elements:{element}')
>         with asyncio.Runner() as runner:
>             return runner.run(self.amain(element))
>
>
>

Reply via email to