Hi Jack, thanks for the info. I'd better explain my case for clarity.
What I have now in my infra is:
- a custom Agent running on Vertex AI / Agent Engine
- a Gateway API that exposes my Agent's functionality via REST endpoints
- a GCP Dataflow ETL which loads some data and sends it to Gemini via RunInference / GeminiModelHandler

What I am missing is how to integrate my Gateway API (or my Agent Engine, if possible). GeminiModelHandler lets me call Gemini directly, while what I want is to call my agent, which runs on top of Gemini. Or maybe I am complicating things and there is a better way to integrate a custom Agent with Dataflow?
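To make this concrete, what I had in mind is a custom ModelHandler whose run_inference posts each prompt to my Gateway API, along the same lines as the OpenAI handler I pasted earlier in this thread. A minimal sketch - the endpoint path, payload keys, and class name here are all made up, not my real Gateway contract:

```
import requests

from apache_beam.ml.inference.base import ModelHandler, PredictionResult


class GatewayAgentHandler(ModelHandler):
    """Sends each prompt to a custom agent behind a REST gateway."""

    def __init__(self, endpoint, api_key):
        # e.g. 'https://my-gateway.example.com/agent:invoke' (hypothetical)
        self.endpoint = endpoint
        self.api_key = api_key

    def load_model(self):
        # One HTTP session per worker, reused across batches.
        session = requests.Session()
        session.headers.update({'Authorization': f'Bearer {self.api_key}'})
        return session

    def run_inference(self, batch, model, inference_args=None):
        for prompt in batch:
            # 'input' / 'output' are assumptions about the gateway's payload.
            response = model.post(self.endpoint, json={'input': prompt}, timeout=300)
            response.raise_for_status()
            yield PredictionResult(prompt, response.json()['output'])
```

Is that the intended pattern, or is there first-class Agent Engine support in RunInference that I should use instead?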
Thanks in advance and regards,
Marco

On Wed, Oct 22, 2025 at 4:07 PM Jack McCluskey <[email protected]> wrote:

> If you're using Vertex to deploy a generative model (anything
> Gemini-based), I would recommend routing through the Gemini remote model
> handler (
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/ml/inference/gemini_inference.py),
> since routing generative calls through the Vertex API is being removed
> next summer.
>
> On Wed, Oct 22, 2025 at 10:27 AM Danny McCormick via user <[email protected]> wrote:
>
>> Yep, that guide looks good to me.
>> https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_vertex_ai.ipynb
>> is normally the doc page I'd recommend for this.
>>
>> On Tue, Oct 21, 2025 at 4:29 PM Marc _ <[email protected]> wrote:
>>
>>> Thanks XQ/Danny, works like a charm!
>>> One further question - apologies, I should probably check the docs -
>>> I now want to go to the next step and deploy my own Agent on Vertex AI
>>> and call it from Dataflow.
>>> Am I correct that for this use case the sample to follow is this one?
>>>
>>> https://github.com/blueviggen/beam-remote-llm-examples/blob/main/gemma_runinference_example.ipynb
>>>
>>> If not, do you have other examples?
>>>
>>> Kind regards
>>> Marco
>>>
>>> On Sat, Oct 18, 2025 at 1:50 PM XQ Hu <[email protected]> wrote:
>>>
>>>> This happens a lot for the local Prism runner. Have you tried to run
>>>> this with Dataflow?
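>>>>
>>>> Switching is usually just a pipeline-options change; a minimal sketch
>>>> (project, region, and bucket are placeholders):
>>>>
>>>> ```
>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>
>>>> options = PipelineOptions(
>>>>     runner='DataflowRunner',
>>>>     project='my-gcp-project',       # placeholder
>>>>     region='us-central1',           # placeholder
>>>>     temp_location='gs://my-bucket/tmp',
>>>> )
>>>> ```
>>>>
>>>> The DEADLINE_EXCEEDED in your traceback is raised from the portable
>>>> runner's gRPC message stream, so it shouldn't follow you to the
>>>> managed service.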
>>>>
>>>> On Sat, Oct 18, 2025 at 7:01 AM Marc _ <[email protected]> wrote:
>>>>
>>>>> Hi Danny,
>>>>> thanks a lot! It worked like a charm for some simple instructions and
>>>>> low-token requests. But when using a complex system instruction, which
>>>>> requires some work on the model side, I am getting back this:
>>>>>
>>>>> Traceback (most recent call last):
>>>>>   File "C:\Users\Marco\envs\obb-dataflow\Lib\threading.py", line 1045, in _bootstrap_inner
>>>>>     self.run()
>>>>>   File "C:\Users\Marco\envs\obb-dataflow\Lib\threading.py", line 982, in run
>>>>>     self._target(*self._args, **self._kwargs)
>>>>>   File "C:\Users\Marco\envs\obb-dataflow\Lib\site-packages\apache_beam\runners\portability\portable_runner.py", line 533, in read_messages
>>>>>     for message in self._message_stream:
>>>>>   File "C:\Users\Marco\envs\obb-dataflow\Lib\site-packages\grpc\_channel.py", line 543, in __next__
>>>>>     return self._next()
>>>>>            ^^^^^^^^^^^^
>>>>>   File "C:\Users\Marco\envs\obb-dataflow\Lib\site-packages\grpc\_channel.py", line 969, in _next
>>>>>     raise self
>>>>> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
>>>>>   status = StatusCode.DEADLINE_EXCEEDED
>>>>>   details = "Deadline Exceeded"
>>>>>   debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-10-18T10:58:33.9731013+00:00"}"
>>>>> >
>>>>>
>>>>> Any idea how I can get around it? Can I somehow control how long to
>>>>> wait for a model response?
>>>>>
>>>>> Kind regards
>>>>> Marco
>>>>>
>>>>> On Mon, Oct 13, 2025 at 4:43 PM Danny McCormick <[email protected]> wrote:
>>>>>
>>>>>> The model handler lets you pass in any inference function you want.
>>>>>> For example, the notebook uses the default one defined here:
>>>>>> https://github.com/apache/beam/blob/08b480000ec859292d0f7bbadafb72328d3e9e16/sdks/python/apache_beam/ml/inference/gemini_inference.py#L54
>>>>>>
>>>>>> So you could define an inference function which passes in additional
>>>>>> config, e.g. from https://pypi.org/project/google-genai/:
>>>>>>
>>>>>> ```
>>>>>> config=types.GenerateContentConfig(
>>>>>>     system_instruction='I say high, you say low',
>>>>>>     max_output_tokens=3,
>>>>>>     temperature=0.3,
>>>>>> ),
>>>>>> ```
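>>>>>>
>>>>>> Put together, a minimal sketch of such a function - it mirrors the
>>>>>> default linked above, and I'm assuming the handler's request_fn hook
>>>>>> and that its signature hasn't changed:
>>>>>>
>>>>>> ```
>>>>>> from google.genai import types
>>>>>>
>>>>>> def generate_with_instructions(model_name, batch, client, inference_args):
>>>>>>     # Identical to the default inference function, plus a config.
>>>>>>     return client.models.generate_content(
>>>>>>         model=model_name,
>>>>>>         contents=batch,
>>>>>>         config=types.GenerateContentConfig(
>>>>>>             system_instruction='I say high, you say low',
>>>>>>             max_output_tokens=3,
>>>>>>             temperature=0.3,
>>>>>>         ))
>>>>>> ```
>>>>>>
>>>>>> and then pass it to GeminiModelHandler when you construct the handler.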
>>>>>>
>>>>>> On Sat, Oct 11, 2025 at 3:46 PM Marc _ <[email protected]> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>> I'm a muppet - I did not notice this in the colab sample
>>>>>>> https://github.com/blueviggen/beam-remote-llm-examples/blob/main/gemini_runinference_example.ipynb:
>>>>>>>
>>>>>>> # Only supported for genai package 1.21.1 or earlier
>>>>>>> output_text = gemini_response.content.parts[0].text
>>>>>>>
>>>>>>> Using that package version, I can run the sample pipeline on my local
>>>>>>> machine. I have further questions on the GeminiModelHandler, as I
>>>>>>> could not find anything via Google.
>>>>>>>
>>>>>>> How can I specify system instructions? I was able to do so with the
>>>>>>> OpenAIHandler - below is a snippet copied from a mail Danny sent me a
>>>>>>> few months ago:
>>>>>>>
>>>>>>> import openai
>>>>>>> from apache_beam.ml.inference.base import ModelHandler
>>>>>>>
>>>>>>> class SampleOpenAIHandler(ModelHandler):
>>>>>>>     """Sends each batch of prompts to the OpenAI API for remote inference."""
>>>>>>>
>>>>>>>     def __init__(self, oai_key, llm_instructions):
>>>>>>>         self.oai_key = oai_key
>>>>>>>         self.llm_instructions = llm_instructions
>>>>>>>
>>>>>>>     def load_model(self):
>>>>>>>         """Initiate the OpenAI API client."""
>>>>>>>         return openai.OpenAI(api_key=self.oai_key)
>>>>>>>
>>>>>>>     def run_inference(self, batch, model, inference_args=None):
>>>>>>>         response = model.responses.create(
>>>>>>>             model="gpt-4o",
>>>>>>>             instructions=self.llm_instructions,
>>>>>>>             input=batch[0],
>>>>>>>         )
>>>>>>>         return [response.output_text]
>>>>>>>
>>>>>>> Kind regards
>>>>>>> Marco
>>>>>>>
>>>>>>> On Fri, Oct 10, 2025 at 10:51 PM Marc _ <[email protected]> wrote:
>>>>>>>
>>>>>>>> Danny / XQ,
>>>>>>>> I've hit a setback. I copied the colab Gemini sample
>>>>>>>> https://github.com/blueviggen/beam-remote-llm-examples/blob/main/gemini_runinference_example.ipynb
>>>>>>>> and added the following lines to the post-processor to figure out
>>>>>>>> what is going on:
>>>>>>>>
>>>>>>>> input_prompt = element.example
>>>>>>>>
>>>>>>>> # The API response is in `element.inference`.
>>>>>>>> # Path to text: response -> candidates -> content -> parts -> text
>>>>>>>> gemini_inference = element.inference
>>>>>>>> print(f'element.inference is {gemini_inference}')
>>>>>>>> print(gemini_inference[1])
>>>>>>>>
>>>>>>>> The code works fine in colab, and I can see that the response matches
>>>>>>>> the docs:
>>>>>>>>
>>>>>>>> element.inference is ('candidates', [Candidate(content=Content(parts=[Part(video_metadata=None, thought=None, inline_data=None, file_data=None, thought_signature=None, code_execution_result=None, executable_code=None, function_call=None, function_response=None, text='```json\n{\n "question": "What is 1+2?",\n "answer": 3\n}\n```')], role='model'), citation_metadata=None, finish_message=None, token_count=None, finish_reason=<FinishReason.STOP: 'STOP'>, url_context_metadata=None, avg_logprobs=None, grounding_metadata=None, index=0, logprobs_result=None, safety_ratings=None)])
>>>>>>>>
>>>>>>>> But when I run the same pipeline on GCP Dataflow, it seems I can only
>>>>>>>> capture the first HttpResponse with all the headers; the rest of the
>>>>>>>> response is gone. Plus, the response is not a Candidate but an
>>>>>>>> HttpResponse, so the code above fails miserably:
>>>>>>>>
>>>>>>>> element.inference is ('sdk_http_response', HttpResponse( headers=<dict len=10> ))
>>>>>>>>
>>>>>>>> Is it because GCP is running multiple workers and somehow I am only
>>>>>>>> capturing the first 'streamed' response from the model?
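>>>>>>>>
>>>>>>>> One thing I still want to rule out is a google-genai version
>>>>>>>> mismatch between my laptop and the Dataflow workers, given the
>>>>>>>> "1.21.1 or earlier" note above. A sketch of how I'd pin it on the
>>>>>>>> workers - the exact version being the right one is my guess:
>>>>>>>>
>>>>>>>> ```
>>>>>>>> # requirements.txt contains: google-genai==1.21.1
>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>
>>>>>>>> options = PipelineOptions(
>>>>>>>>     runner='DataflowRunner',
>>>>>>>>     requirements_file='requirements.txt',
>>>>>>>> )
>>>>>>>> ```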
>>>>>>>>
>>>>>>>> Kind regards
>>>>>>>> Marco
>>>>>>>>
>>>>>>>> On Thu, Oct 9, 2025 at 2:51 PM Marc _ <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Thanks Danny / XQ, I will have a look and report back if I am stuck.
>>>>>>>>> kr
>>>>>>>>>
>>>>>>>>> On Thu, Oct 9, 2025 at 2:22 PM Danny McCormick via user <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> If you're using the Dataflow runner,
>>>>>>>>>> https://cloud.google.com/dataflow/docs/concepts/security-and-permissions#permissions
>>>>>>>>>> has some info as well - basically you'd want to make sure the
>>>>>>>>>> worker service account has access to the Vertex endpoint you're
>>>>>>>>>> using.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Danny
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 9, 2025 at 9:18 AM XQ Hu via user <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> I think
>>>>>>>>>>> https://cloud.google.com/dataflow/docs/notebooks/run_inference_vertex_ai
>>>>>>>>>>> has more details for you to get started.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 9, 2025 at 7:14 AM Marc _ <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello all,
>>>>>>>>>>>> I want to port this example to a real Dataflow pipeline I am
>>>>>>>>>>>> running, as I want to move off OpenAI:
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/blueviggen/beam-remote-llm-examples/blob/main/gemma_runinference_example.ipynb
>>>>>>>>>>>>
>>>>>>>>>>>> Could anyone advise on the authentication side for accessing
>>>>>>>>>>>> Vertex AI?
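>>>>>>>>>>>>
>>>>>>>>>>>> Locally I can at least confirm that Application Default
>>>>>>>>>>>> Credentials resolve before I submit the job - a minimal check
>>>>>>>>>>>> (google-auth is already pulled in as a dependency):
>>>>>>>>>>>>
>>>>>>>>>>>> ```
>>>>>>>>>>>> import google.auth
>>>>>>>>>>>>
>>>>>>>>>>>> # Resolves ADC, e.g. from `gcloud auth application-default login`
>>>>>>>>>>>> credentials, project = google.auth.default()
>>>>>>>>>>>> print(project)
>>>>>>>>>>>> ```
>>>>>>>>>>>>
>>>>>>>>>>>> but I am not sure what the workers will need once the pipeline
>>>>>>>>>>>> runs on Dataflow.
>>>>>>>>>>>>
>>>>>>>>>>>> Kind regards
>>>>>>>>>>>> Marco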
