Yep, that guide looks good to me. https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_vertex_ai.ipynb is normally the doc page I'd recommend for this.
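If it helps, the Dataflow side stays pretty small once the endpoint exists. A minimal sketch, assuming a deployed Vertex AI endpoint that accepts plain-text instances (PROJECT, REGION, and ENDPOINT_ID are placeholders):

```
import apache_beam as beam
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.vertex_ai_inference import VertexAIModelHandlerJSON

# Points RunInference at an existing Vertex AI endpoint; the worker
# service account needs permission to call it.
model_handler = VertexAIModelHandlerJSON(
    endpoint_id='ENDPOINT_ID',
    project='PROJECT',
    location='REGION',
)

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create(['What is 1+2?'])
        | RunInference(model_handler)
        | beam.Map(print)
    )
```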
On Tue, Oct 21, 2025 at 4:29 PM Marc _ <[email protected]> wrote:

> Thanks XQ/Danny, works like a charm!
> One further question - apologies, I should probably check the docs -
> I now want to go to the next step and deploy my own Agent on Vertex AI
> and call it from Dataflow.
> Am I correct that for this use case the sample to follow is this one?
>
> https://github.com/blueviggen/beam-remote-llm-examples/blob/main/gemma_runinference_example.ipynb
>
> If not, do you have other examples?
>
> Kind regards
> Marco
>
> On Sat, Oct 18, 2025 at 1:50 PM XQ Hu <[email protected]> wrote:
>
>> This happens a lot for the local Prism runner. Have you tried to run
>> this with Dataflow?
>>
>> On Sat, Oct 18, 2025 at 7:01 AM Marc _ <[email protected]> wrote:
>>
>>> Hi Danny,
>>> thanks a lot! It worked like a charm for some simple instructions and
>>> low-token requests.
>>> But when using a complex system instruction which requires some work
>>> on the model side, I am getting back this:
>>>
>>> Traceback (most recent call last):
>>>   File "C:\Users\Marco\envs\obb-dataflow\Lib\threading.py", line 1045, in _bootstrap_inner
>>>     self.run()
>>>   File "C:\Users\Marco\envs\obb-dataflow\Lib\threading.py", line 982, in run
>>>     self._target(*self._args, **self._kwargs)
>>>   File "C:\Users\Marco\envs\obb-dataflow\Lib\site-packages\apache_beam\runners\portability\portable_runner.py", line 533, in read_messages
>>>     for message in self._message_stream:
>>>   File "C:\Users\Marco\envs\obb-dataflow\Lib\site-packages\grpc\_channel.py", line 543, in __next__
>>>     return self._next()
>>>            ^^^^^^^^^^^^
>>>   File "C:\Users\Marco\envs\obb-dataflow\Lib\site-packages\grpc\_channel.py", line 969, in _next
>>>     raise self
>>> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
>>>   status = StatusCode.DEADLINE_EXCEEDED
>>>   details = "Deadline Exceeded"
>>>   debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-10-18T10:58:33.9731013+00:00"}"
>>>
>>> Any idea how I can get around it? Can I somehow control how long to
>>> wait for a model response?
>>>
>>> Kind regards
>>> Marco
>>>
>>> On Mon, Oct 13, 2025 at 4:43 PM Danny McCormick <[email protected]> wrote:
>>>
>>>> The model handler lets you pass in any inference function you want.
>>>> For example, the notebook uses the default one defined here -
>>>> https://github.com/apache/beam/blob/08b480000ec859292d0f7bbadafb72328d3e9e16/sdks/python/apache_beam/ml/inference/gemini_inference.py#L54
>>>>
>>>> So you could define an inference function which passes in additional
>>>> config, e.g. from https://pypi.org/project/google-genai/
>>>>
>>>> ```
>>>> config=types.GenerateContentConfig(
>>>>     system_instruction='I say high, you say low',
>>>>     max_output_tokens=3,
>>>>     temperature=0.3,
>>>> ),
>>>> ```
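>>>>
>>>> To make that concrete, a custom inference function might look
>>>> something like this (rough, untested sketch - the model name and
>>>> API_KEY are placeholders, and the arguments mirror the default
>>>> function linked above):
>>>>
>>>> ```
>>>> from google.genai import types
>>>> from apache_beam.ml.inference.gemini_inference import GeminiModelHandler
>>>>
>>>> # Same shape as the default inference function, plus a config that
>>>> # carries the system instruction.
>>>> def generate_with_config(model_name, batch, model, inference_args):
>>>>     return model.models.generate_content(
>>>>         model=model_name,
>>>>         contents=batch,
>>>>         config=types.GenerateContentConfig(
>>>>             system_instruction='I say high, you say low',
>>>>             max_output_tokens=1024,
>>>>             temperature=0.3,
>>>>         ),
>>>>     )
>>>>
>>>> model_handler = GeminiModelHandler(
>>>>     model_name='gemini-2.0-flash',  # placeholder model name
>>>>     request_fn=generate_with_config,
>>>>     api_key=API_KEY,  # placeholder
>>>> )
>>>> ```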
>>>>
>>>> On Sat, Oct 11, 2025 at 3:46 PM Marc _ <[email protected]> wrote:
>>>>
>>>>> Hello,
>>>>> I'm a muppet. I did not notice this in the colab sample
>>>>> https://github.com/blueviggen/beam-remote-llm-examples/blob/main/gemini_runinference_example.ipynb
>>>>>
>>>>> # Only supported for genai package 1.21.1 or earlier
>>>>> output_text = gemini_response.content.parts[0].text
>>>>>
>>>>> Using that package I can run the sample pipeline on my local machine.
>>>>> I have further questions on the GeminiModelHandler, as I could not
>>>>> find anything via Google.
>>>>>
>>>>> How can I specify system instructions? I was able to do so with the
>>>>> OpenAIHandler - below is a snippet copied from a mail Danny sent me a
>>>>> few months ago:
>>>>>
>>>>> import openai
>>>>> from apache_beam.ml.inference.base import ModelHandler
>>>>>
>>>>> class SampleOpenAIHandler(ModelHandler):
>>>>>     """ModelHandler that sends a batch of prompts to the OpenAI API
>>>>>     for remote inference."""
>>>>>
>>>>>     def __init__(self, oai_key, llm_instructions):
>>>>>         self.oai_key = oai_key
>>>>>         self.llm_instructions = llm_instructions
>>>>>
>>>>>     def load_model(self):
>>>>>         """Initiate the OpenAI API client."""
>>>>>         return openai.OpenAI(api_key=self.oai_key)
>>>>>
>>>>>     def run_inference(self, batch, model, inference_args=None):
>>>>>         # Sends the prompt plus the system instructions to the
>>>>>         # Responses API and returns the generated text.
>>>>>         response = model.responses.create(
>>>>>             model="gpt-4o",
>>>>>             instructions=self.llm_instructions,
>>>>>             input=batch[0],
>>>>>         )
>>>>>         return [response.output_text]
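>>>>>
>>>>> For context, I wire that handler into the pipeline roughly like this
>>>>> (simplified sketch - OAI_KEY and the prompt are placeholders):
>>>>>
>>>>> ```
>>>>> import apache_beam as beam
>>>>> from apache_beam.ml.inference.base import RunInference
>>>>>
>>>>> with beam.Pipeline() as p:
>>>>>     _ = (
>>>>>         p
>>>>>         | beam.Create(['What is 1+2?'])
>>>>>         | RunInference(
>>>>>             SampleOpenAIHandler(OAI_KEY, 'Answer with just the number.'))
>>>>>         | beam.Map(print)
>>>>>     )
>>>>> ```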
>>>>>
>>>>> Kind regards
>>>>> Marco
>>>>>
>>>>> On Fri, Oct 10, 2025 at 10:51 PM Marc _ <[email protected]> wrote:
>>>>>
>>>>>> Danny / XQ,
>>>>>> I got some setback.
>>>>>> I copied the colab gemini sample
>>>>>> https://github.com/blueviggen/beam-remote-llm-examples/blob/main/gemini_runinference_example.ipynb
>>>>>>
>>>>>> I have added the following lines to the post processor to figure out
>>>>>> what is going on:
>>>>>>
>>>>>> input_prompt = element.example
>>>>>>
>>>>>> # The API response is in `element.inference`
>>>>>> # Path to text: response -> candidates -> content -> parts -> text
>>>>>> gemini_inference = element.inference
>>>>>> print(f'element.inference is {gemini_inference}')
>>>>>> print(gemini_inference[1])
>>>>>>
>>>>>> The code works fine in colab, and I can see that the response
>>>>>> matches the docs:
>>>>>>
>>>>>> element.inference is ('candidates', [Candidate(content=Content(parts=[Part(video_metadata=None, thought=None, inline_data=None, file_data=None, thought_signature=None, code_execution_result=None, executable_code=None, function_call=None, function_response=None, text='```json\n{\n  "question": "What is 1+2?",\n  "answer": 3\n}\n```')], role='model'), citation_metadata=None, finish_message=None, token_count=None, finish_reason=<FinishReason.STOP: 'STOP'>, url_context_metadata=None, avg_logprobs=None, grounding_metadata=None, index=0, logprobs_result=None, safety_ratings=None)])
>>>>>>
>>>>>> But when I run the same pipeline on GCP Dataflow, it seems I can
>>>>>> only capture the first HttpResponse with all the headers; the rest
>>>>>> of the response is gone:
>>>>>>
>>>>>> element.inference is ('sdk_http_response', HttpResponse( headers=<dict len=10> ))
>>>>>>
>>>>>> <https://console.cloud.google.com/logs/query;query=resource.type%3D%22dataflow_step%22%20resource.labels.job_id%3D%222025-10-10_14_40_13-2699715226748068892%22%20logName%3D%22projects%2Fdatascience-projects%2Flogs%2Fdataflow.googleapis.com%252Fworker%22%20resource.labels.step_id%3D%2528%22PostProcess%22%2529%20timestamp%20%3E%3D%20%222025-10-10T21:40:13.801Z%22%20timestamp%20%3C%3D%20%222025-10-10T21:46:28.995Z%22%20severity%3E%3DDEFAULT;timeRange=2025-10-10T21:44:25.660899877Z%2F2025-10-10T21:44:25.660899877Z--PT1H;storageScope=project;pinnedLogId=2025-10-10T21:44:25.660899877Z%2F6182574922792223526:174769:0:14686?hl=en&project=datascience-projects>
>>>>>>
>>>>>> Plus, the response is not a Candidate but an HttpResponse, and hence
>>>>>> the text-extraction code above fails miserably.
>>>>>>
>>>>>> Is it because GCP is running multiple workers and somehow I am only
>>>>>> capturing the first 'streamed' response from the model?
>>>>>>
>>>>>> Kind regards
>>>>>> Marco
>>>>>>
>>>>>> On Thu, Oct 9, 2025 at 2:51 PM Marc _ <[email protected]> wrote:
>>>>>>
>>>>>>> Thanks Danny / XQ, will have a look and report back if I am stuck.
>>>>>>> kr
>>>>>>>
>>>>>>> On Thu, Oct 9, 2025 at 2:22 PM Danny McCormick via user <[email protected]> wrote:
>>>>>>>
>>>>>>>> If you're using the Dataflow runner,
>>>>>>>> https://cloud.google.com/dataflow/docs/concepts/security-and-permissions#permissions
>>>>>>>> has some info as well - basically you'd want to make sure the
>>>>>>>> worker service account has access to the Vertex endpoint you're
>>>>>>>> using.
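>>>>>>>>
>>>>>>>> If you want the job to run as a specific worker service account,
>>>>>>>> you can set it on the pipeline options, something like this
>>>>>>>> (sketch - the project, region, and account name are placeholders):
>>>>>>>>
>>>>>>>> ```
>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>
>>>>>>>> options = PipelineOptions(
>>>>>>>>     runner='DataflowRunner',
>>>>>>>>     project='PROJECT',
>>>>>>>>     region='REGION',
>>>>>>>>     # This account needs a role that grants Vertex AI access,
>>>>>>>>     # e.g. roles/aiplatform.user.
>>>>>>>>     service_account_email='[email protected]',
>>>>>>>> )
>>>>>>>> ```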
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Danny
>>>>>>>>
>>>>>>>> On Thu, Oct 9, 2025 at 9:18 AM XQ Hu via user <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> I think
>>>>>>>>> https://cloud.google.com/dataflow/docs/notebooks/run_inference_vertex_ai
>>>>>>>>> has more details for you to get started.
>>>>>>>>>
>>>>>>>>> On Thu, Oct 9, 2025 at 7:14 AM Marc _ <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hello all,
>>>>>>>>>> I want to port this example to a real Dataflow pipeline I am
>>>>>>>>>> running, as I want to move from OpenAI to Dataflow:
>>>>>>>>>>
>>>>>>>>>> https://github.com/blueviggen/beam-remote-llm-examples/blob/main/gemma_runinference_example.ipynb
>>>>>>>>>>
>>>>>>>>>> Could anyone advise on the authentication side for accessing
>>>>>>>>>> Vertex AI?
>>>>>>>>>>
>>>>>>>>>> Kind regards
>>>>>>>>>> Marco
