The GeminiModelHandler accepts any inference function you pass in. The
notebook, for example, uses the default one defined here:
https://github.com/apache/beam/blob/08b480000ec859292d0f7bbadafb72328d3e9e16/sdks/python/apache_beam/ml/inference/gemini_inference.py#L54

So you could define your own inference function that passes additional
config to the API call, e.g. (from https://pypi.org/project/google-genai/):

```
config=types.GenerateContentConfig(
    system_instruction='I say high, you say low',
    max_output_tokens=3,
    temperature=0.3,
),
```
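
As a rough sketch of what such a function could look like: the factory
below returns a function with the same four-argument shape as the default
one linked above (model name, batch, client, inference args) and attaches a
fixed config to every call. `make_request_fn` is a made-up name for
illustration, and the `request_fn` keyword in the commented wiring is my
reading of the handler's constructor, so check it against your Beam version.
The config is written as a plain dict on the assumption that google-genai
accepts one in place of `types.GenerateContentConfig`.

```python
from typing import Any, Callable, Optional, Sequence


def make_request_fn(config: Any) -> Callable[..., Any]:
    """Build an inference function that always sends `config` to Gemini.

    `make_request_fn` is a hypothetical helper, not part of Beam.
    """
    def request_fn(
        model_name: str,
        batch: Sequence[Any],
        model: Any,  # expected to be a google.genai.Client
        inference_args: Optional[dict] = None,
    ) -> Any:
        kwargs = dict(inference_args or {})
        # Per-call inference args may still override the fixed config.
        kwargs.setdefault("config", config)
        return model.models.generate_content(
            model=model_name, contents=batch, **kwargs)

    return request_fn


# Same settings as the snippet above, written as a dict.
custom_fn = make_request_fn({
    "system_instruction": "I say high, you say low",
    "max_output_tokens": 3,
    "temperature": 0.3,
})

# Assumed wiring (untested here; argument names may differ by Beam version):
# handler = GeminiModelHandler(
#     model_name=..., request_fn=custom_fn, api_key=...)
```

Using a factory keeps the system instruction out of the pipeline elements,
so the same handler code can be reused with different instructions.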

On Sat, Oct 11, 2025 at 3:46 PM Marc _ <[email protected]> wrote:

> Hello,
> I'm a muppet. I did not notice this in the Colab sample:
> https://github.com/blueviggen/beam-remote-llm-examples/blob/main/gemini_runinference_example.ipynb
>
> # Only supported for genai package 1.21.1 or earlier
> output_text = gemini_response.content.parts[0].text
>
> Using that package I can run the sample pipeline on my local machine.
> I have further questions on the GeminiModelHandler, as I could not find
> anything via Google.
>
> How can I specify system instructions? I was able to do so with the
> OpenAIHandler; below is a snippet copied from a previous mail Danny sent me
> a few months ago:
>
> import openai
> from apache_beam.ml.inference.base import ModelHandler
>
> class SampleOpenAIHandler(ModelHandler):
>   """Model handler that sends prompts to the OpenAI Responses API
>   for remote inference, using fixed system instructions."""
>   def __init__(self, oai_key, llm_instructions):
>     self.oai_key = oai_key
>     self.llm_instructions = llm_instructions
>
>   def load_model(self):
>     """Initiate the OAI API client."""
>     client = openai.OpenAI(
>         api_key=self.oai_key,
>     )
>     return client
>
>   def run_inference(self, batch, model, inference_args=None):
>     # Note: only batch[0] is sent, so this assumes batches of size 1.
>     response = model.responses.create(
>         model="gpt-4o",
>         instructions=self.llm_instructions,
>         input=batch[0],
>     )
>     return [response.output_text]
>
>
>
> Kind regards
> Marco
>
>
> On Fri, Oct 10, 2025 at 10:51 PM Marc _ <[email protected]> wrote:
>
>> Danny / XQ,
>> I have hit a setback.
>> I copied the Colab Gemini sample:
>> https://github.com/blueviggen/beam-remote-llm-examples/blob/main/gemini_runinference_example.ipynb
>>
>> I have added the following lines to the post-processor to figure out what
>> is going on:
>>
>> input_prompt = element.example
>>
>> # The API response is in `element.inference`
>> # Path to text: response -> candidates -> content -> parts -> text
>> gemini_inference = element.inference
>> print(f'element.inference is {gemini_inference}')
>> print(gemini_inference[1])
>>
>> The code works fine in Colab, and I can see that the response matches the
>> docs:
>> element.inference is ('candidates',
>> [Candidate(content=Content(parts=[Part(video_metadata=None, thought=None,
>> inline_data=None, file_data=None, thought_signature=None,
>> code_execution_result=None, executable_code=None, function_call=None,
>> function_response=None, text='```json\n{\n "question": "What is 1+2?",\n
>> "answer": 3\n}\n```')], role='model'), citation_metadata=None,
>> finish_message=None, token_count=None, finish_reason=<FinishReason.STOP:
>> 'STOP'>, url_context_metadata=None, avg_logprobs=None,
>> grounding_metadata=None, index=0, logprobs_result=None,
>> safety_ratings=None)])
>>
>> But when I run the same pipeline on GCP Dataflow, it seems I can
>> only capture the first HttpResponse with all its headers; the rest of the
>> response is gone.
>> Also, the response is not a Candidate but an HttpResponse, and hence the
>> following code fails.
>>
>> Is it because GCP is running multiple workers and somehow I am only
>> capturing the first 'streamed' response from the model?
>>
>> Kind regards
>>   Marco
>>
>>
>>
>>
>> element.inference is ('sdk_http_response', HttpResponse( headers=<dict
>> len=10> ))
>>
>> <https://console.cloud.google.com/logs/query;query=resource.type%3D%22dataflow_step%22%20resource.labels.job_id%3D%222025-10-10_14_40_13-2699715226748068892%22%20logName%3D%22projects%2Fdatascience-projects%2Flogs%2Fdataflow.googleapis.com%252Fworker%22%20resource.labels.step_id%3D%2528%22PostProcess%22%2529%20timestamp%20%3E%3D%20%222025-10-10T21:40:13.801Z%22%20timestamp%20%3C%3D%20%222025-10-10T21:46:28.995Z%22%20severity%3E%3DDEFAULT;timeRange=2025-10-10T21:44:25.660899877Z%2F2025-10-10T21:44:25.660899877Z--PT1H;storageScope=project;pinnedLogId=2025-10-10T21:44:25.660899877Z%2F6182574922792223526:174769:0:14686?hl=en&project=datascience-projects>
>>
>> On Thu, Oct 9, 2025 at 2:51 PM Marc _ <[email protected]> wrote:
>>
>>> Thanks Danny / XQ, I will have a look and report back if I am stuck.
>>> Kind regards
>>>
>>> On Thu, Oct 9, 2025 at 2:22 PM Danny McCormick via user <
>>> [email protected]> wrote:
>>>
>>>> If you're using the Dataflow runner,
>>>> https://cloud.google.com/dataflow/docs/concepts/security-and-permissions#permissions
>>>> has some info as well. Basically, you'd want to make sure the worker
>>>> service account has access to the Vertex endpoint you're using.
>>>>
>>>> Thanks,
>>>> Danny
>>>>
>>>> On Thu, Oct 9, 2025 at 9:18 AM XQ Hu via user <[email protected]>
>>>> wrote:
>>>>
>>>>> I think
>>>>> https://cloud.google.com/dataflow/docs/notebooks/run_inference_vertex_ai
>>>>> has more details for you to get started.
>>>>>
>>>>> On Thu, Oct 9, 2025 at 7:14 AM Marc _ <[email protected]> wrote:
>>>>>
>>>>>> Hello all,
>>>>>> I want to port this example to a real Dataflow pipeline I am
>>>>>> running, as I want to move from
>>>>>> OpenAI to Dataflow:
>>>>>>
>>>>>>
>>>>>> https://github.com/blueviggen/beam-remote-llm-examples/blob/main/gemma_runinference_example.ipynb
>>>>>>
>>>>>> Could anyone advise on the authentication side for accessing VertexAI?
>>>>>>
>>>>>> Kind regards
>>>>>> Marco
>>>>>>
>>>>>
