Thanks XQ/Danny, works like a charm!
One further question - apologies i should probably check docs -
I want now to go to the next step and deploy my own Agent on VertexAI and
call it from dataflow.
Am i correct that for this usecase the sample to follow is this one?

https://github.com/blueviggen/beam-remote-llm-examples/blob/main/gemma_runinference_example.ipynb

If not, do you have other examples?

Kind regards
  Marco



On Sat, Oct 18, 2025 at 1:50 PM XQ Hu <[email protected]> wrote:

> This happens a lot for the local Prism runner. Have you tried to run this
> with Dataflow?
>
> On Sat, Oct 18, 2025 at 7:01 AM Marc _ <[email protected]> wrote:
>
>> HI Danny
>>  thank a lot!..  worked as a charm for some simple instructions and 'low
>> tokens request'
>> But , when using a complex system instruction which requires some work on
>> the Model side i am getting back this
>>
>> Traceback (most recent call last):
>>   File "C:\Users\Marco\envs\obb-dataflow\Lib\threading.py", line 1045, in
>> _bootstrap_inner
>>     self.run()
>>   File "C:\Users\Marco\envs\obb-dataflow\Lib\threading.py", line 982, in
>> run
>>     self._target(*self._args, **self._kwargs)
>>   File
>> "C:\Users\Marco\envs\obb-dataflow\Lib\site-packages\apache_beam\runners\portability\portable_runner.py",
>> line 533, in read_messages
>>     for message in self._message_stream:
>>   File
>> "C:\Users\Marco\envs\obb-dataflow\Lib\site-packages\grpc\_channel.py", line
>> 543, in __next__
>>     return self._next()
>>            ^^^^^^^^^^^^
>>   File
>> "C:\Users\Marco\envs\obb-dataflow\Lib\site-packages\grpc\_channel.py", line
>> 969, in _next
>>     raise self
>> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC
>> that terminated with:
>> status = StatusCode.DEADLINE_EXCEEDED
>> details = "Deadline Exceeded"
>> debug_error_string = "UNKNOWN:Error received from peer
>>  {grpc_message:"Deadline Exceeded", grpc_status:4,
>> created_time:"2025-10-18T10:58:33.9731013+00:00"}"
>> >
>>
>> Any idea on how i can get around it> can i somehow control how much to
>> wait for a Model response?
>>
>> Kind regards
>>   Marco
>>
>>
>>
>>
>> On Mon, Oct 13, 2025 at 4:43 PM Danny McCormick <
>> [email protected]> wrote:
>>
>>> The model handler lets you pass in any inference function you want. For
>>> example, the notebook uses the default one defined here -
>>> https://github.com/apache/beam/blob/08b480000ec859292d0f7bbadafb72328d3e9e16/sdks/python/apache_beam/ml/inference/gemini_inference.py#L54
>>>
>>> So you could define an inference function which passes in additional
>>> config, e.g. from https://pypi.org/project/google-genai/
>>>
>>> ```
>>> config=types.GenerateContentConfig(
>>>         system_instruction='I say high, you say low',
>>>         max_output_tokens=3,
>>>         temperature=0.3,
>>>     ),
>>> ```
>>>
>>> On Sat, Oct 11, 2025 at 3:46 PM Marc _ <[email protected]> wrote:
>>>
>>>> Hello
>>>>  i m a muppet. Did not notice this in the colab sample
>>>> https://github.com/blueviggen/beam-remote-llm-examples/blob/main/gemini_runinference_example.ipynb
>>>>
>>>> # Only supported for genai package 1.21.1 or earlier
>>>> output_text = gemini_response.content.parts[0].text
>>>>
>>>> Using that package i can run sample pipeline on my local machine
>>>> I have further questions on the GeminiModelHandler as i could not find
>>>> anything via google..
>>>>
>>>> How can i specify system instructions ? I was able to do so with the
>>>> OpenAIHandler - below a snippet copied from a prev mail Danny sent me few
>>>> months ago
>>>>
>>>> class SampleOpenAIHandler(ModelHandler):
>>>>   """DoFn that accepts a batch of images as bytearray
>>>>   and sends that batch to the Cloud Vision API for remote inference"""
>>>>   def __init__(self, oai_key, llm_instructions):
>>>>       self.oai_key = oai_key
>>>>       self.llm_instructions = llm_instructions
>>>>
>>>>   def load_model(self):
>>>>     """Initiate the Google Vision API client."""
>>>>     """Initiate the OAI API client."""
>>>>     client =  openai.OpenAI(
>>>>     # This is the default and can be omitted
>>>>         api_key=self.oai_key,
>>>>     )
>>>>     return client
>>>>
>>>>
>>>>   def run_inference(self, batch, model, inference):
>>>>
>>>>
>>>>     response = model.responses.create(
>>>>           model="gpt-4o",
>>>>           instructions=self.llm_instructions,
>>>>           input=batch[0],
>>>>       )
>>>>     return [response.output_text]
>>>>
>>>>
>>>>
>>>> Kind regards
>>>> Marco
>>>>
>>>>
>>>> On Fri, Oct 10, 2025 at 10:51 PM Marc _ <[email protected]> wrote:
>>>>
>>>>> Danny / XQ
>>>>>   got some setback.
>>>>> I copied the colab gemini sample,.
>>>>> https://github.com/blueviggen/beam-remote-llm-examples/blob/main/gemini_runinference_example.ipynb
>>>>>
>>>>> I Have added the following lines to post processor to figure out what
>>>>> is going on
>>>>>
>>>>> input_prompt = element.example
>>>>>
>>>>> # The API response is in `element.inference`
>>>>> # Path to text: response -> candidates -> content -> parts -> text
>>>>> gemini_inference = element.inference
>>>>> print(f'element.inference is {gemini_inference}')
>>>>> print(gemini_inference[1])
>>>>>
>>>>> The code works fine in colab and i can see that the response matches
>>>>> the docs
>>>>> element.inference is ('candidates',
>>>>> [Candidate(content=Content(parts=[Part(video_metadata=None, thought=None,
>>>>> inline_data=None, file_data=None, thought_signature=None,
>>>>> code_execution_result=None, executable_code=None, function_call=None,
>>>>> function_response=None, text='```json\n{\n "question": "What is 1+2?",\n
>>>>> "answer": 3\n}\n```')], role='model'), citation_metadata=None,
>>>>> finish_message=None, token_count=None, finish_reason=<FinishReason.STOP:
>>>>> 'STOP'>, url_context_metadata=None, avg_logprobs=None,
>>>>> grounding_metadata=None, index=0, logprobs_result=None,
>>>>> safety_ratings=None)])
>>>>>
>>>>> But when i run the same pipeline on GCP DataFlow, it seems i can
>>>>> only capture the first HttpResponse with all headers. the rest of the
>>>>> response is gone.......
>>>>> Plus, the response is not  a Candidate  but an HttpResponse and hence
>>>>> the following code fails miserably
>>>>>
>>>>> Is it because GCP is running multiple workers and somehow i am only
>>>>> capturing the first 'streamed' response from the model?
>>>>>
>>>>> Kind regards
>>>>>   Marco
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> element.inference is ('sdk_http_response', HttpResponse( headers=<dict
>>>>> len=10> ))
>>>>>
>>>>> <https://console.cloud.google.com/logs/query;query=resource.type%3D%22dataflow_step%22%20resource.labels.job_id%3D%222025-10-10_14_40_13-2699715226748068892%22%20logName%3D%22projects%2Fdatascience-projects%2Flogs%2Fdataflow.googleapis.com%252Fworker%22%20resource.labels.step_id%3D%2528%22PostProcess%22%2529%20timestamp%20%3E%3D%20%222025-10-10T21:40:13.801Z%22%20timestamp%20%3C%3D%20%222025-10-10T21:46:28.995Z%22%20severity%3E%3DDEFAULT;timeRange=2025-10-10T21:44:25.660899877Z%2F2025-10-10T21:44:25.660899877Z--PT1H;storageScope=project;pinnedLogId=2025-10-10T21:44:25.660899877Z%2F6182574922792223526:174769:0:14686?hl=en&project=datascience-projects>
>>>>>
>>>>> <https://console.cloud.google.com/logs/query;query=resource.type%3D%22dataflow_step%22%20resource.labels.job_id%3D%222025-10-10_14_40_13-2699715226748068892%22%20logName%3D%22projects%2Fdatascience-projects%2Flogs%2Fdataflow.googleapis.com%252Fworker%22%20resource.labels.step_id%3D%2528%22PostProcess%22%2529%20timestamp%20%3E%3D%20%222025-10-10T21:40:13.801Z%22%20timestamp%20%3C%3D%20%222025-10-10T21:46:28.995Z%22%20severity%3E%3DDEFAULT;timeRange=2025-10-10T21:44:25.660899877Z%2F2025-10-10T21:44:25.660899877Z--PT1H;storageScope=project;pinnedLogId=2025-10-10T21:44:25.660899877Z%2F6182574922792223526:174769:0:14686?hl=en&project=datascience-projects>
>>>>>
>>>>> <https://console.cloud.google.com/logs/query;query=resource.type%3D%22dataflow_step%22%20resource.labels.job_id%3D%222025-10-10_14_40_13-2699715226748068892%22%20logName%3D%22projects%2Fdatascience-projects%2Flogs%2Fdataflow.googleapis.com%252Fworker%22%20resource.labels.step_id%3D%2528%22PostProcess%22%2529%20timestamp%20%3E%3D%20%222025-10-10T21:40:13.801Z%22%20timestamp%20%3C%3D%20%222025-10-10T21:46:28.995Z%22%20severity%3E%3DDEFAULT;timeRange=2025-10-10T21:44:25.660899877Z%2F2025-10-10T21:44:25.660899877Z--PT1H;storageScope=project;pinnedLogId=2025-10-10T21:44:25.660899877Z%2F6182574922792223526:174769:0:14686?hl=en&project=datascience-projects>
>>>>>
>>>>> <https://console.cloud.google.com/logs/query;query=resource.type%3D%22dataflow_step%22%20resource.labels.job_id%3D%222025-10-10_14_40_13-2699715226748068892%22%20logName%3D%22projects%2Fdatascience-projects%2Flogs%2Fdataflow.googleapis.com%252Fworker%22%20resource.labels.step_id%3D%2528%22PostProcess%22%2529%20timestamp%20%3E%3D%20%222025-10-10T21:40:13.801Z%22%20timestamp%20%3C%3D%20%222025-10-10T21:46:28.995Z%22%20severity%3E%3DDEFAULT;timeRange=2025-10-10T21:44:25.660899877Z%2F2025-10-10T21:44:25.660899877Z--PT1H;storageScope=project;pinnedLogId=2025-10-10T21:44:25.660899877Z%2F6182574922792223526:174769:0:14686?hl=en&project=datascience-projects>
>>>>>
>>>>> <https://console.cloud.google.com/logs/query;query=resource.type%3D%22dataflow_step%22%20resource.labels.job_id%3D%222025-10-10_14_40_13-2699715226748068892%22%20logName%3D%22projects%2Fdatascience-projects%2Flogs%2Fdataflow.googleapis.com%252Fworker%22%20resource.labels.step_id%3D%2528%22PostProcess%22%2529%20timestamp%20%3E%3D%20%222025-10-10T21:40:13.801Z%22%20timestamp%20%3C%3D%20%222025-10-10T21:46:28.995Z%22%20severity%3E%3DDEFAULT;timeRange=2025-10-10T21:44:25.660899877Z%2F2025-10-10T21:44:25.660899877Z--PT1H;storageScope=project;pinnedLogId=2025-10-10T21:44:25.660899877Z%2F6182574922792223526:174769:0:14686?hl=en&project=datascience-projects>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Oct 9, 2025 at 2:51 PM Marc _ <[email protected]> wrote:
>>>>>
>>>>>> thanks Danny / XQ, will have a look and report back if i am stuck
>>>>>> kr
>>>>>>
>>>>>> On Thu, Oct 9, 2025 at 2:22 PM Danny McCormick via user <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> If you're using the Dataflow runner,
>>>>>>> https://cloud.google.com/dataflow/docs/concepts/security-and-permissions#permissions
>>>>>>>  has
>>>>>>> some info as well - basically you'd want to make sure the worker service
>>>>>>> account has access to the Vertex endpoint you're using.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Danny
>>>>>>>
>>>>>>> On Thu, Oct 9, 2025 at 9:18 AM XQ Hu via user <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I think
>>>>>>>> https://cloud.google.com/dataflow/docs/notebooks/run_inference_vertex_ai
>>>>>>>> has more details for you to get started.
>>>>>>>>
>>>>>>>> On Thu, Oct 9, 2025 at 7:14 AM Marc _ <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hello all
>>>>>>>>>  i want to port this example to a real dataflow pipeline i am
>>>>>>>>> running, as i want to move from
>>>>>>>>> OpenAI to dataflow
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> https://github.com/blueviggen/beam-remote-llm-examples/blob/main/gemma_runinference_example.ipynb
>>>>>>>>>
>>>>>>>>> Could anyone advise on the authentication side for accessing
>>>>>>>>> VertexAI?
>>>>>>>>>
>>>>>>>>> Kind regards
>>>>>>>>> Marco
>>>>>>>>>
>>>>>>>>

Reply via email to