Thanks a lot, Ganesh! I'll have a look and report back if I get stuck.
Kind regards

On Mon, Jan 5, 2026 at 8:11 AM Ganesh Sivakumar <[email protected]>
wrote:

> Hey Marco, the best pattern for your use case would be to separate data
> processing from inference (invoking your agent). Implement a custom
> `RemoteModelHandler`
> <https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RemoteModelHandler>
> for calling the agent and use it with the RunInference transform. The
> pipeline would look like:
> Read(stocks data) --> RunInference(your agent handler) --> Sink.
>
> Thanks,
> Ganesh.
>
> On Mon, Jan 5, 2026 at 1:04 PM Marc _ <[email protected]> wrote:
>
>> Hi all,
>> I have an ADK agent running on Cloud Run and wanted to invoke it via
>> Dataflow.
>> I have the following pipeline and DoFn, but I am getting this exception.
>>
>> Could anyone advise?
>>
>> Kind regards
>> Marco
>>
>> EOF when reading a line [while running 'ClouodagentRun-ptransform-32']
>> Traceback (most recent call last):
>>   File "apache_beam/runners/common.py", line 1498, in apache_beam.runners.common.DoFnRunner.process
>>   File "apache_beam/runners/common.py", line 685, in apache_beam.runners.common.SimpleInvoker.invoke_process
>>   File "/template/shareloader/modules/obb_utils.py", line 726, in process
>>     return runner.run(self.amain(element))
>>            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>>   File "/usr/local/lib/python3.11/asyncio/runners.py", line 118, in run
>>     return self._loop.run_until_complete(task)
>>            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>>   File "/usr/local/lib/python3.11/asyncio/base_events.py", line 654, in run_until_complete
>>     return future.result()
>>            ^^^^^^^^^^^^^^^
>>   File "/template/shareloader/modules/obb_utils.py", line 713, in amain
>>     await self.chat(client, self.SESSION_ID)
>>   File "/template/shareloader/modules/obb_utils.py", line 690, in chat
>>     raise e
>>   File "/template/shareloader/modules/obb_utils.py", line 683, in chat
>>     user_input = await asyncio.to_thread(input, f"[{self.USER_ID}]: ")
>>                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>>   File "/usr/local/lib/python3.11/asyncio/threads.py", line 25, in to_thread
>>     return await loop.run_in_executor(None, func_call)
>>            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>>   File "/usr/local/lib/python3.11/concurrent/futures/thread.py", line 58, in run
>>     result = self.fn(*self.args, **self.kwargs)
>>              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>> EOFError: EOF when reading a line
>>
>> def run_gcloud_agent(pipeline, debugSink):
>>     from shareloader.modules.obb_utils import AsyncCloudRunAgent
>>     (pipeline | 'Sourcinig prompt' >> beam.Create(["Run a technical analysis for today's stock picks and give me your recommendations"])
>>             | 'ClouodagentRun' >> beam.ParDo(AsyncCloudRunAgent())
>>              |  debugSink
>>             )
>>
>>
>> class AsyncCloudRunAgent(AsyncProcess):
>>
>>     def __init__(self):
>>         # --- Configuration (Dynamic) ---
>>         self.APP_URL = "https://stock-agent-service-682143946483.us-central1.run.app";
>>         self.USER_ID = "user_123"
>>         # Generate a single session ID for the entire conversation loop
>>         self.SESSION_ID = f"session_{datetime.now().strftime('%Y%m%d%H%M%S')}"
>>         self.APP_NAME = "stock_agent"
>>
>>
>>
>>     # --- Authentication Function (ASYNC) ---
>>
>>     async def get_auth_token(self) -> str:
>>         """
>>         Programmatically fetches an ID token for the Cloud Run service audience.
>>         'target_audience' should be the URL of your Cloud Run service.
>>         """
>>         # Run the synchronous google-auth call in a thread to keep it async-friendly
>>         loop = asyncio.get_event_loop()
>>
>>         def fetch_token():
>>             auth_req = google.auth.transport.requests.Request()
>>             # This automatically uses the Dataflow Worker Service Account
>>             return id_token.fetch_id_token(auth_req, self.APP_URL)
>>
>>         try:
>>             token = await loop.run_in_executor(None, fetch_token)
>>             return token
>>         except Exception as e:
>>             raise RuntimeError(f"Failed to fetch ID token: {e}")
>>     # --- API Interaction Functions (ASYNC) ---
>>
>>     async def make_request(self, client: httpx.AsyncClient, method: str, endpoint: str, data: Dict[str, Any] = None) -> httpx.Response:
>>         """Helper function for authenticated asynchronous requests using httpx."""
>>         token = await self.get_auth_token()
>>         headers = {
>>             "Authorization": f"Bearer {token}",
>>             "Content-Type": "application/json"
>>         }
>>         url = f"{self.APP_URL}{endpoint}"
>>
>>         try:
>>             if method.upper() == 'POST':
>>                 response = await client.post(url, headers=headers, json=data)
>>             elif method.upper() == 'DELETE':
>>                 response = await client.delete(url, headers=headers)
>>             else:
>>                 raise ValueError(f"Unsupported method: {method}")
>>
>>             response.raise_for_status()
>>             return response
>>         except httpx.HTTPStatusError as errh:
>>             print(f"\n❌ **HTTP ERROR:** Status {response.status_code} for {url}")
>>             print(f"❌ **Server Response (Raw):**\n{response.text}")
>>             raise
>>         except httpx.RequestError as err:
>>             print(f"\n❌ An unexpected request error occurred: {err}")
>>             raise
>>
>>     async def run_agent_request(self, client: httpx.AsyncClient, session_id: str, message: str):
>>         """Executes a single POST request to the /run_sse endpoint."""
>>
>>         print(f"\n[User] -> Sending message: '{message}'")
>>
>>         run_data = {
>>             "app_name": self.APP_NAME,
>>             "user_id": self.USER_ID,
>>             "session_id": session_id,
>>             "new_message": {"role": "user", "parts": [{"text": message}]},
>>             "streaming": False
>>         }
>>
>>         try:
>>             response = await self.make_request(client, "POST", "/run_sse", data=run_data)
>>             current_status = response.status_code
>>             # print(f"**Request Status Code:** {current_status}")
>>
>>             raw_text = response.text.strip()
>>
>>             # Multi-line SSE parsing logic
>>             data_lines = [
>>                 line.strip()
>>                 for line in raw_text.split('\n')
>>                 if line.strip().startswith("data:")
>>             ]
>>
>>             if not data_lines:
>>                 raise json.JSONDecodeError("No 'data:' lines found in 200 response.", raw_text, 0)
>>
>>             last_data_line = data_lines[-1]
>>             json_payload = last_data_line[len("data:"):].strip()
>>             agent_response = json.loads(json_payload)
>>
>>             # Extract the final text
>>             final_text = agent_response.get('content', {}).get('parts', [{}])[0].get('text', 'Agent response structure not recognized.')
>>
>>             print(f"[Agent] -> {final_text}")
>>
>>         except json.JSONDecodeError as e:
>>             print(f"\n🚨 **JSON PARSING FAILED**!")
>>             print(f"   Error: {e}")
>>             print("   --- RAW SERVER CONTENT ---")
>>             print(raw_text)
>>             print("   --------------------------")
>>
>>         except Exception as e:
>>             print(f"❌ Agent request failed: {e}")
>>
>>     # --- Interactive Chat Loop ---
>>
>>     async def chat(self, client: httpx.AsyncClient, session_id: str):
>>         """Runs the main conversation loop, handling user input asynchronously."""
>>         print("--- 💬 Start Chatting ---")
>>
>>         try:
>>             # Use asyncio.to_thread to run blocking input() without freezing the event loop
>>             user_input = await asyncio.to_thread(input, f"[{self.USER_ID}]: ")
>>
>>             # Send the message to the agent
>>             await self.run_agent_request(client, session_id, user_input)
>>
>>         except Exception as e:
>>             print(f"An unexpected error occurred in the loop: {e}")
>>             raise e
>>
>>     # --- Main Logic (ASYNC) ---
>>
>>     async def amain(self, element):
>>         """Main asynchronous function to set up the session and start the loop."""
>>         print(f"\n🤖 Starting Interactive Client with Session ID: **{self.SESSION_ID}**")
>>         session_data = {"state": {"preferred_language": "English", "visit_count": 5}}
>>         current_session_endpoint = f"/apps/{self.APP_NAME}/users/{self.USER_ID}/sessions/{self.SESSION_ID}"
>>
>>         # httpx.AsyncClient is used as a context manager to manage connections
>>         async with httpx.AsyncClient(timeout=30.0) as client:
>>
>>             # 1. Create Session
>>             print("\n## 1. Creating Session")
>>             try:
>>                 await self.make_request(client, "POST", current_session_endpoint, data=session_data)
>>                 print(f"✅ Session created successfully. Status 200.")
>>             except Exception as e:
>>                 print(f"❌ Could not start session: {e}")
>>                 return
>>
>>             # 2. Start the Interactive Loop
>>             await self.chat(client, self.SESSION_ID)
>>
>>             # 3. Cleanup: Delete Session (Best Practice)
>>             print(f"\n## 3. Deleting Session: {self.SESSION_ID}")
>>             try:
>>                 await self.make_request(client, "DELETE", current_session_endpoint)
>>                 print("✅ Session deleted successfully.")
>>             except Exception as e:
>>                 print(f"⚠️ Warning: Failed to delete session. {e}")
>>
>>     def process(self, element: str):
>>         logging.info(f'Input elements:{element}')
>>         with asyncio.Runner() as runner:
>>             return runner.run(self.amain(element))
>>
>>
>>

Reply via email to