Hi Jack,
  thanks for the info. I'd better explain my case for clarity.

What I have now in my infrastructure is:
- A custom Agent running on Vertex AI / Agent Engine
- A Gateway API that exposes my Agent functionality via REST endpoints
- A GCP Dataflow ETL which loads some data and sends it to Gemini via
RunInference / GeminiModelHandler

What I am missing is how to integrate my Gateway API (or my Agent Engine,
if possible) into that Dataflow pipeline.

GeminiModelHandler will allow me to call Gemini directly, while what I
want is to call my agent, which runs on top of Gemini.

Or maybe I am complicating things and there is a better way to integrate a
custom Agent with Dataflow?
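
To make the question concrete, this is roughly the kind of handler I imagine
writing: a minimal sketch, assuming the gateway accepts a JSON POST and
returns JSON. AgentGatewayHandler, post_json, the "prompt"/"answer" field
names and the URL are all hypothetical, and authentication is left out.

```python
import json
import urllib.request

try:
    from apache_beam.ml.inference.base import ModelHandler
except ImportError:
    # Fallback so the sketch can be read and exercised without Beam installed.
    ModelHandler = object


def post_json(url, payload, timeout_secs):
    """POST a JSON payload and decode the JSON reply (stdlib only)."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout_secs) as response:
        return json.loads(response.read().decode("utf-8"))


class AgentGatewayHandler(ModelHandler):
    """Sends each prompt to a REST gateway that sits in front of the agent."""

    def __init__(self, endpoint_url, timeout_secs=60.0, transport=post_json):
        super().__init__()
        self.endpoint_url = endpoint_url
        self.timeout_secs = timeout_secs
        self.transport = transport

    def load_model(self):
        # Nothing heavy to load: the "model" is just the HTTP transport.
        return self.transport

    def run_inference(self, batch, model, inference_args=None):
        # One request per element. "prompt" and "answer" are hypothetical
        # field names; the real gateway contract may differ.
        return [
            model(self.endpoint_url, {"prompt": prompt}, self.timeout_secs)["answer"]
            for prompt in batch
        ]
```

It would then be used as RunInference(AgentGatewayHandler("https://my-gateway.example/agent"))
in place of the GeminiModelHandler. Passing the transport in as an argument
keeps the handler testable without a live endpoint.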

Thanks in advance and regards
 Marco

On Wed, Oct 22, 2025 at 4:07 PM Jack McCluskey <[email protected]>
wrote:

> If you're using Vertex to deploy a generative model (anything
> Gemini-based) I would recommend routing through the Gemini Remote Model
> Handler (
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/ml/inference/gemini_inference.py)
> since routing generative calls through the Vertex API is being removed next
> summer.
>
> On Wed, Oct 22, 2025 at 10:27 AM Danny McCormick via user <
> [email protected]> wrote:
>
>> Yep, that guide looks good to me.
>> https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_vertex_ai.ipynb
>> is normally the doc page I'd recommend for this.
>>
>> On Tue, Oct 21, 2025 at 4:29 PM Marc _ <[email protected]> wrote:
>>
>>> Thanks XQ/Danny, works like a charm!
>>> One further question (apologies, I should probably check the docs):
>>> I now want to go to the next step and deploy my own Agent on Vertex AI
>>> and call it from Dataflow.
>>> Am I correct that for this use case the sample to follow is this one?
>>>
>>>
>>> https://github.com/blueviggen/beam-remote-llm-examples/blob/main/gemma_runinference_example.ipynb
>>>
>>> If not, do you have other examples?
>>>
>>> Kind regards
>>>   Marco
>>>
>>>
>>>
>>> On Sat, Oct 18, 2025 at 1:50 PM XQ Hu <[email protected]> wrote:
>>>
>>>> This happens a lot for the local Prism runner. Have you tried to run
>>>> this with Dataflow?
>>>>
>>>> On Sat, Oct 18, 2025 at 7:01 AM Marc _ <[email protected]> wrote:
>>>>
>>>>> Hi Danny,
>>>>>  thanks a lot! It worked like a charm for some simple instructions and
>>>>> low-token requests.
>>>>> But when using a complex system instruction which requires some work
>>>>> on the model side, I am getting back this:
>>>>>
>>>>> Traceback (most recent call last):
>>>>>   File "C:\Users\Marco\envs\obb-dataflow\Lib\threading.py", line 1045, in _bootstrap_inner
>>>>>     self.run()
>>>>>   File "C:\Users\Marco\envs\obb-dataflow\Lib\threading.py", line 982, in run
>>>>>     self._target(*self._args, **self._kwargs)
>>>>>   File "C:\Users\Marco\envs\obb-dataflow\Lib\site-packages\apache_beam\runners\portability\portable_runner.py", line 533, in read_messages
>>>>>     for message in self._message_stream:
>>>>>   File "C:\Users\Marco\envs\obb-dataflow\Lib\site-packages\grpc\_channel.py", line 543, in __next__
>>>>>     return self._next()
>>>>>            ^^^^^^^^^^^^
>>>>>   File "C:\Users\Marco\envs\obb-dataflow\Lib\site-packages\grpc\_channel.py", line 969, in _next
>>>>>     raise self
>>>>> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
>>>>> status = StatusCode.DEADLINE_EXCEEDED
>>>>> details = "Deadline Exceeded"
>>>>> debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-10-18T10:58:33.9731013+00:00"}"
>>>>> >
>>>>>
>>>>> Any idea how I can get around it? Can I somehow control how long to
>>>>> wait for a model response?
>>>>>
>>>>> Kind regards
>>>>>   Marco
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Oct 13, 2025 at 4:43 PM Danny McCormick <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> The model handler lets you pass in any inference function you want.
>>>>>> For example, the notebook uses the default one defined here -
>>>>>> https://github.com/apache/beam/blob/08b480000ec859292d0f7bbadafb72328d3e9e16/sdks/python/apache_beam/ml/inference/gemini_inference.py#L54
>>>>>>
>>>>>> So you could define an inference function which passes in additional
>>>>>> config, e.g. from https://pypi.org/project/google-genai/
>>>>>>
>>>>>> ```
>>>>>> config=types.GenerateContentConfig(
>>>>>>         system_instruction='I say high, you say low',
>>>>>>         max_output_tokens=3,
>>>>>>         temperature=0.3,
>>>>>>     ),
>>>>>> ```
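
To make this concrete, a custom inference function could look roughly like
the sketch below. It assumes the same four-argument shape as the default
function linked above (model name, batch, client, inference args) and that
the client accepts the config as a plain dict; the function name
generate_with_system_instruction is hypothetical.

```python
def generate_with_system_instruction(model_name, batch, model, inference_args):
    """Call generate_content with a fixed config added to every request."""
    config = {
        "system_instruction": "I say high, you say low",
        "max_output_tokens": 3,
        "temperature": 0.3,
    }
    extra_args = dict(inference_args or {})
    # A caller-supplied config (a dict in this sketch) overrides the defaults.
    config.update(extra_args.pop("config", {}))
    return model.models.generate_content(
        model=model_name, contents=batch, config=config, **extra_args)


# Hypothetical wiring; check the parameter names against your Beam version:
# handler = GeminiModelHandler(
#     model_name="<your model>",
#     request_fn=generate_with_system_instruction,
#     project="<your project>",
#     location="<your region>",
# )
```

Keeping the config inside the function means the system instruction travels
with the handler to the workers rather than being passed per element.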
>>>>>>
>>>>>> On Sat, Oct 11, 2025 at 3:46 PM Marc _ <[email protected]> wrote:
>>>>>>
>>>>>>> Hello
>>>>>>>  I'm a muppet. I did not notice this in the Colab sample
>>>>>>> https://github.com/blueviggen/beam-remote-llm-examples/blob/main/gemini_runinference_example.ipynb
>>>>>>>
>>>>>>> # Only supported for genai package 1.21.1 or earlier
>>>>>>> output_text = gemini_response.content.parts[0].text
>>>>>>>
>>>>>>> Using that package I can run the sample pipeline on my local machine.
>>>>>>> I have further questions on the GeminiModelHandler, as I could not
>>>>>>> find anything via Google.
>>>>>>>
>>>>>>> How can I specify system instructions? I was able to do so with the
>>>>>>> OpenAI handler; below is a snippet copied from a previous mail Danny
>>>>>>> sent me a few months ago:
>>>>>>>
>>>>>>> import openai
>>>>>>> from apache_beam.ml.inference.base import ModelHandler
>>>>>>>
>>>>>>>
>>>>>>> class SampleOpenAIHandler(ModelHandler):
>>>>>>>   """ModelHandler that sends each prompt in a batch to the OpenAI
>>>>>>>   Responses API for remote inference."""
>>>>>>>   def __init__(self, oai_key, llm_instructions):
>>>>>>>     super().__init__()
>>>>>>>     self.oai_key = oai_key
>>>>>>>     self.llm_instructions = llm_instructions
>>>>>>>
>>>>>>>   def load_model(self):
>>>>>>>     """Initiate the OpenAI API client."""
>>>>>>>     return openai.OpenAI(api_key=self.oai_key)
>>>>>>>
>>>>>>>   def run_inference(self, batch, model, inference_args=None):
>>>>>>>     # One request per element, so batches larger than one are not
>>>>>>>     # silently truncated to their first prompt.
>>>>>>>     return [
>>>>>>>         model.responses.create(
>>>>>>>             model="gpt-4o",
>>>>>>>             instructions=self.llm_instructions,
>>>>>>>             input=prompt,
>>>>>>>         ).output_text for prompt in batch
>>>>>>>     ]
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Kind regards
>>>>>>> Marco
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Oct 10, 2025 at 10:51 PM Marc _ <[email protected]> wrote:
>>>>>>>
>>>>>>>> Danny / XQ
>>>>>>>>   I have hit a setback.
>>>>>>>> I copied the Colab Gemini sample:
>>>>>>>> https://github.com/blueviggen/beam-remote-llm-examples/blob/main/gemini_runinference_example.ipynb
>>>>>>>>
>>>>>>>> I have added the following lines to the post-processor to figure out
>>>>>>>> what is going on:
>>>>>>>>
>>>>>>>> input_prompt = element.example
>>>>>>>>
>>>>>>>> # The API response is in `element.inference`
>>>>>>>> # Path to text: response -> candidates -> content -> parts -> text
>>>>>>>> gemini_inference = element.inference
>>>>>>>> print(f'element.inference is {gemini_inference}')
>>>>>>>> print(gemini_inference[1])
>>>>>>>>
>>>>>>>> The code works fine in Colab and I can see that the response
>>>>>>>> matches the docs:
>>>>>>>> element.inference is ('candidates',
>>>>>>>> [Candidate(content=Content(parts=[Part(video_metadata=None,
>>>>>>>> thought=None, inline_data=None, file_data=None, thought_signature=None,
>>>>>>>> code_execution_result=None, executable_code=None, function_call=None,
>>>>>>>> function_response=None,
>>>>>>>> text='```json\n{\n "question": "What is 1+2?",\n "answer": 3\n}\n```')],
>>>>>>>> role='model'), citation_metadata=None, finish_message=None,
>>>>>>>> token_count=None, finish_reason=<FinishReason.STOP: 'STOP'>,
>>>>>>>> url_context_metadata=None, avg_logprobs=None, grounding_metadata=None,
>>>>>>>> index=0, logprobs_result=None, safety_ratings=None)])
>>>>>>>>
>>>>>>>> But when I run the same pipeline on GCP Dataflow, it seems I can
>>>>>>>> only capture the first HttpResponse with all headers; the rest of the
>>>>>>>> response is gone.
>>>>>>>> Plus, the response is not a Candidate but an HttpResponse, and
>>>>>>>> hence the code that follows fails.
>>>>>>>>
>>>>>>>> Is it because GCP is running multiple workers and somehow I am only
>>>>>>>> capturing the first 'streamed' response from the model?
>>>>>>>>
>>>>>>>> Kind regards
>>>>>>>>   Marco
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> element.inference is ('sdk_http_response', HttpResponse(
>>>>>>>> headers=<dict len=10> ))
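
The two printouts differ mainly in the first element of the tuple
('candidates' in Colab, 'sdk_http_response' on Dataflow), which may mean the
post-processor is handed one (field name, value) pair of the response rather
than the whole response object. That reading is an assumption and is not
confirmed anywhere in this thread. Under it, a defensive post-processing
helper could look like this minimal sketch; extract_text is a hypothetical
name, and the stand-in objects only mimic the attribute path
candidates -> content -> parts -> text quoted above.

```python
from types import SimpleNamespace


def extract_text(inference):
    """Return the first candidate's text, or None when it is not present."""
    if isinstance(inference, tuple) and len(inference) == 2:
        # A (field_name, value) pair such as ('candidates', [...]).
        field_name, value = inference
        if field_name != "candidates":
            return None  # e.g. ('sdk_http_response', HttpResponse(...))
        candidates = value
    elif hasattr(inference, "candidates"):
        # A full response object.
        candidates = inference.candidates
    else:
        # Assume a single Candidate-like object.
        candidates = [inference]
    if not candidates:
        return None
    content = getattr(candidates[0], "content", None)
    parts = getattr(content, "parts", None) or []
    texts = [part.text for part in parts if getattr(part, "text", None)]
    return "".join(texts) or None


# Stand-in for a Candidate, used only to exercise the helper.
fake_candidate = SimpleNamespace(
    content=SimpleNamespace(parts=[SimpleNamespace(text="hello")]))
print(extract_text(("candidates", [fake_candidate])))  # hello
print(extract_text(("sdk_http_response", object())))   # None
```

Returning None for pairs that carry no text lets the pipeline filter them out
instead of failing on the first element that is not a Candidate.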
>>>>>>>>
>>>>>>>> <https://console.cloud.google.com/logs/query;query=resource.type%3D%22dataflow_step%22%20resource.labels.job_id%3D%222025-10-10_14_40_13-2699715226748068892%22%20logName%3D%22projects%2Fdatascience-projects%2Flogs%2Fdataflow.googleapis.com%252Fworker%22%20resource.labels.step_id%3D%2528%22PostProcess%22%2529%20timestamp%20%3E%3D%20%222025-10-10T21:40:13.801Z%22%20timestamp%20%3C%3D%20%222025-10-10T21:46:28.995Z%22%20severity%3E%3DDEFAULT;timeRange=2025-10-10T21:44:25.660899877Z%2F2025-10-10T21:44:25.660899877Z--PT1H;storageScope=project;pinnedLogId=2025-10-10T21:44:25.660899877Z%2F6182574922792223526:174769:0:14686?hl=en&project=datascience-projects>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 9, 2025 at 2:51 PM Marc _ <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Thanks Danny / XQ, I will have a look and report back if I am stuck.
>>>>>>>>> Kind regards
>>>>>>>>>
>>>>>>>>> On Thu, Oct 9, 2025 at 2:22 PM Danny McCormick via user <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> If you're using the Dataflow runner,
>>>>>>>>>> https://cloud.google.com/dataflow/docs/concepts/security-and-permissions#permissions
>>>>>>>>>> has some info as well - basically you'd want to make sure the worker
>>>>>>>>>> service account has access to the Vertex endpoint you're using.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Danny
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 9, 2025 at 9:18 AM XQ Hu via user <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> I think
>>>>>>>>>>> https://cloud.google.com/dataflow/docs/notebooks/run_inference_vertex_ai
>>>>>>>>>>> has more details for you to get started.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 9, 2025 at 7:14 AM Marc _ <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello all
>>>>>>>>>>>>  I want to port this example to a real Dataflow pipeline I am
>>>>>>>>>>>> running, as I want to move from OpenAI to Dataflow.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/blueviggen/beam-remote-llm-examples/blob/main/gemma_runinference_example.ipynb
>>>>>>>>>>>>
>>>>>>>>>>>> Could anyone advise on the authentication side for accessing
>>>>>>>>>>>> VertexAI?
>>>>>>>>>>>>
>>>>>>>>>>>> Kind regards
>>>>>>>>>>>> Marco
>>>>>>>>>>>>
>>>>>>>>>>>
