If you're using Vertex to deploy a generative model (anything Gemini-based), I would recommend routing through the Gemini Remote Model Handler (https://github.com/apache/beam/blob/master/sdks/python/apache_beam/ml/inference/gemini_inference.py), since routing generative calls through the Vertex API is being removed next summer.
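A minimal sketch of that routing (hedged: it assumes GeminiModelHandler takes a model name, a request function, and project/location for Vertex-routed calls, and that generate_from_string is the default request function in the module linked above; check the source for the exact constructor):

```python
# Sketch: RunInference through Beam's Gemini model handler rather than the
# Vertex AI model handler. Constructor arguments are assumed from
# apache_beam/ml/inference/gemini_inference.py; verify against your Beam version.
import apache_beam as beam
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.gemini_inference import (
    GeminiModelHandler, generate_from_string)

handler = GeminiModelHandler(
    model_name='gemini-2.0-flash',    # any Gemini model id
    request_fn=generate_from_string,  # the default request function
    project='my-project',             # placeholder project/location
    location='us-central1',
)

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create(['What is 1+2?'])
        | RunInference(handler)
        | beam.Map(print))
```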
On Wed, Oct 22, 2025 at 10:27 AM Danny McCormick via user <[email protected]> wrote:

Yep, that guide looks good to me. https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_vertex_ai.ipynb is normally the doc page I'd recommend for this.

On Tue, Oct 21, 2025 at 4:29 PM Marc _ <[email protected]> wrote:

Thanks XQ/Danny, works like a charm!

One further question (apologies, I should probably check the docs): I now want to go to the next step, deploy my own agent on Vertex AI, and call it from Dataflow. Am I correct that for this use case the sample to follow is this one?

https://github.com/blueviggen/beam-remote-llm-examples/blob/main/gemma_runinference_example.ipynb

If not, do you have other examples?

Kind regards,
Marco

On Sat, Oct 18, 2025 at 1:50 PM XQ Hu <[email protected]> wrote:

This happens a lot for the local Prism runner. Have you tried to run this with Dataflow?

On Sat, Oct 18, 2025 at 7:01 AM Marc _ <[email protected]> wrote:

Hi Danny,

Thanks a lot! It worked like a charm for some simple instructions and low-token requests. But when using a complex system instruction that requires some work on the model side, I am getting back this:

Traceback (most recent call last):
  File "C:\Users\Marco\envs\obb-dataflow\Lib\threading.py", line 1045, in _bootstrap_inner
    self.run()
  File "C:\Users\Marco\envs\obb-dataflow\Lib\threading.py", line 982, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\Marco\envs\obb-dataflow\Lib\site-packages\apache_beam\runners\portability\portable_runner.py", line 533, in read_messages
    for message in self._message_stream:
  File "C:\Users\Marco\envs\obb-dataflow\Lib\site-packages\grpc\_channel.py", line 543, in __next__
    return self._next()
           ^^^^^^^^^^^^
  File "C:\Users\Marco\envs\obb-dataflow\Lib\site-packages\grpc\_channel.py", line 969, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.DEADLINE_EXCEEDED
    details = "Deadline Exceeded"
    debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-10-18T10:58:33.9731013+00:00"}"
>

Any idea how I can get around it? Can I somehow control how long to wait for a model response?

Kind regards,
Marco

On Mon, Oct 13, 2025 at 4:43 PM Danny McCormick <[email protected]> wrote:

The model handler lets you pass in any inference function you want. For example, the notebook uses the default one defined here:
https://github.com/apache/beam/blob/08b480000ec859292d0f7bbadafb72328d3e9e16/sdks/python/apache_beam/ml/inference/gemini_inference.py#L54

So you could define an inference function which passes in additional config, e.g. from https://pypi.org/project/google-genai/:

```
config=types.GenerateContentConfig(
    system_instruction='I say high, you say low',
    max_output_tokens=3,
    temperature=0.3,
),
```
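Fleshing Danny's snippet out into a full request function is one way to set system instructions (a sketch only: the (model_name, batch, model, inference_args) signature is assumed to mirror generate_from_string in gemini_inference.py, so verify it against your Beam version; the instruction text is a placeholder):

```python
# Sketch: a custom request function for GeminiModelHandler that adds
# GenerateContentConfig. The signature is assumed to match
# generate_from_string in apache_beam/ml/inference/gemini_inference.py.
from google.genai import types

def generate_with_config(model_name, batch, model, inference_args):
    # 'model' here is the google-genai client the handler manages.
    return model.models.generate_content(
        model=model_name,
        contents=batch,
        config=types.GenerateContentConfig(
            system_instruction='I say high, you say low',  # placeholder
            max_output_tokens=3,
            temperature=0.3,
        ),
    )

# Then: GeminiModelHandler(model_name=..., request_fn=generate_with_config, ...)
```

On the Deadline Exceeded question: if the timeout is on the model call itself rather than on Prism's local control channel (XQ's point above), the google-genai client can be constructed with http_options=types.HttpOptions(timeout=...); I believe that value is in milliseconds, but check the google-genai docs before relying on it.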
On Sat, Oct 11, 2025 at 3:46 PM Marc _ <[email protected]> wrote:

Hello,

I'm a muppet. I did not notice this in the colab sample https://github.com/blueviggen/beam-remote-llm-examples/blob/main/gemini_runinference_example.ipynb:

# Only supported for genai package 1.21.1 or earlier
output_text = gemini_response.content.parts[0].text

Using that package version I can run the sample pipeline on my local machine. I have further questions on the GeminiModelHandler, as I could not find anything via Google.

How can I specify system instructions? I was able to do so with the OpenAI handler; below is a snippet copied from a previous mail Danny sent me a few months ago:

import openai
from apache_beam.ml.inference.base import ModelHandler

class SampleOpenAIHandler(ModelHandler):
    """Model handler that sends a batch of prompts to the OpenAI API
    for remote inference."""

    def __init__(self, oai_key, llm_instructions):
        self.oai_key = oai_key
        self.llm_instructions = llm_instructions

    def load_model(self):
        """Initiate the OpenAI API client."""
        return openai.OpenAI(api_key=self.oai_key)

    def run_inference(self, batch, model, inference_args=None):
        response = model.responses.create(
            model="gpt-4o",
            instructions=self.llm_instructions,
            input=batch[0],
        )
        return [response.output_text]

Kind regards,
Marco

On Fri, Oct 10, 2025 at 10:51 PM Marc _ <[email protected]> wrote:

Danny / XQ,

I've hit some setback. I copied the colab Gemini sample:
https://github.com/blueviggen/beam-remote-llm-examples/blob/main/gemini_runinference_example.ipynb

I have added the following lines to the post-processor to figure out what is going on:

input_prompt = element.example

# The API response is in `element.inference`
# Path to text: response -> candidates -> content -> parts -> text
gemini_inference = element.inference
print(f'element.inference is {gemini_inference}')
print(gemini_inference[1])

The code works fine in colab and I can see that the response matches the docs:

element.inference is ('candidates', [Candidate(content=Content(parts=[Part(video_metadata=None, thought=None, inline_data=None, file_data=None, thought_signature=None, code_execution_result=None, executable_code=None, function_call=None, function_response=None, text='```json\n{\n "question": "What is 1+2?",\n "answer": 3\n}\n```')], role='model'), citation_metadata=None, finish_message=None, token_count=None, finish_reason=<FinishReason.STOP: 'STOP'>, url_context_metadata=None, avg_logprobs=None, grounding_metadata=None, index=0, logprobs_result=None, safety_ratings=None)])

But when I run the same pipeline on GCP Dataflow, it seems I can only capture the first HttpResponse with all the headers; the rest of the response is gone. Plus, the response is not a Candidate but an HttpResponse, and hence the code above fails miserably:

element.inference is ('sdk_http_response', HttpResponse(headers=<dict len=10>))

Is it because GCP is running multiple workers and somehow I am only capturing the first 'streamed' response from the model?

Kind regards,
Marco
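A post-processing sketch against the working (colab) shape above; an assumption throughout: it unpacks element.inference as the ('candidates', [Candidate(...)]) field tuple from Marco's printout, and that shape varies across google-genai versions, so it guards rather than indexing blindly:

```python
# Sketch: extract text from the PredictionResult emitted by the Gemini
# handler, assuming the ('candidates', [Candidate(...)]) tuple printed above.
import logging

import apache_beam as beam

class ExtractText(beam.DoFn):
    def process(self, element):
        field, value = element.inference
        if field == 'candidates' and value:
            # candidates -> content -> parts -> text
            yield value[0].content.parts[0].text
        else:
            # e.g. ('sdk_http_response', HttpResponse(...)) as seen on
            # Dataflow: log it instead of crashing the bundle.
            logging.warning('Unexpected inference field: %r', element.inference)
```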
On Thu, Oct 9, 2025 at 2:51 PM Marc _ <[email protected]> wrote:

Thanks Danny / XQ, I will have a look and report back if I get stuck.

Kind regards

On Thu, Oct 9, 2025 at 2:22 PM Danny McCormick via user <[email protected]> wrote:

If you're using the Dataflow runner, https://cloud.google.com/dataflow/docs/concepts/security-and-permissions#permissions has some info as well; basically, you'd want to make sure the worker service account has access to the Vertex endpoint you're using.

Thanks,
Danny

On Thu, Oct 9, 2025 at 9:18 AM XQ Hu via user <[email protected]> wrote:

I think https://cloud.google.com/dataflow/docs/notebooks/run_inference_vertex_ai has more details for you to get started.

On Thu, Oct 9, 2025 at 7:14 AM Marc _ <[email protected]> wrote:

Hello all,

I want to port this example to a real Dataflow pipeline I am running, as I want to move from OpenAI to Dataflow:

https://github.com/blueviggen/beam-remote-llm-examples/blob/main/gemma_runinference_example.ipynb

Could anyone advise on the authentication side for accessing Vertex AI?

Kind regards,
Marco
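For the service-account setup Danny describes, a hedged sketch (the project, bucket, region, and service-account email are all placeholders; roles/aiplatform.user is the usual role for calling Vertex AI, but confirm against the permissions doc above):

```python
# Sketch: run the pipeline on Dataflow with a worker service account that
# can call Vertex AI. All ids below are placeholders.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner='DataflowRunner',
    project='my-project',
    region='us-central1',
    temp_location='gs://my-bucket/tmp',
    # Workers run as this account; it needs Vertex AI access, e.g.
    # roles/aiplatform.user on the project (see the permissions doc above).
    service_account_email='[email protected]',
)
```

The matching one-off grant would look something like gcloud projects add-iam-policy-binding my-project --member="serviceAccount:[email protected]" --role="roles/aiplatform.user" (again, placeholders).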
