Database IAM authentication failing from Google Dataflow instance

2023-12-20 Thread Sumit Desai via user
Hi all,
I have a Python-based application that uses Apache Beam in batch mode
with Google Dataflow as the runner. Yesterday, I was facing an issue passing
environmental variables to Dataflow workers. I have temporarily commented
out uses of the non-public Python package which requires environmental
variables to function.

The first step of my pipeline is to read data from a database table as an
input PCollection. The library that I have used as the input connector
requires a DB built-in user and password, and this first step executes
successfully.

Now, in the second step, I want to update the DB rows (just 1 right now, for
testing) to IN_PROGRESS. Here, I am using an IAM user which I also use
outside of Dataflow. But I am getting an error in the Dataflow pipeline:

sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) connection to
server at "xx.xx.xxx.xx", port 5432 failed: FATAL: AlloyDB IAM user
authentication failed for user "{iam_user}".

I also tried creating a new IAM user corresponding to the service account I
am using for the workers and gave it the same permissions as the IAM user
outside of Dataflow. But I am still seeing the same error. From the logs,
I can see that the DB IP, DB name, and IAM user are being passed correctly.

Is there anything additional that I should be doing for an IAM user to
successfully connect to the DB?
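
For reference, the connection pattern I am attempting looks roughly like the
sketch below. Names are placeholders, SQLAlchemy 1.4+ is assumed, and it
relies on the access-token-as-password style of AlloyDB IAM authentication
(IAM auth enabled on the instance and the worker service account added as an
IAM database user), so please treat it as an illustration rather than exact
code -

import google.auth
from google.auth.transport.requests import Request
import sqlalchemy

def make_iam_engine(db_host, db_name, iam_user):
    # Credentials of whatever identity this code runs as
    # (on Dataflow, the worker service account).
    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    credentials.refresh(Request())  # populates credentials.token

    url = sqlalchemy.engine.url.URL.create(
        drivername="postgresql+psycopg2",
        username=iam_user,           # for a service account, typically the email
                                     # without the ".gserviceaccount.com" suffix
        password=credentials.token,  # short-lived access token instead of a password
        host=db_host,
        port=5432,
        database=db_name,
    )
    # Access tokens expire after ~1 hour, so a long-lived pool needs a refresh
    # strategy (e.g. a creator callable that fetches a fresh token).
    return sqlalchemy.create_engine(url)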

Thanks & Regards,
Sumit Desai


RE: Processing data from Kafka. Python

2023-12-20 Thread Поротиков Станислав Вячеславович via user
It seems to be fixed by adding an option to the Java expansion service:
"--experiments=use_deprecated_read"
I have found a related ticket: https://issues.apache.org/jira/browse/BEAM-11991
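
For anyone hitting the same thing, roughly how I wired it up (a sketch with
placeholder broker, topic, port, and jar version; whether the expansion
service jar accepts pipeline options after the port number depends on the
Beam version, so verify against your setup):

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

# Java expansion service started separately, e.g.:
#   java -jar beam-sdks-java-io-expansion-service-<beam-version>.jar 8097 \
#       --experiments=use_deprecated_read
kafka_expansion_service = "localhost:8097"

source_config = {"bootstrap.servers": "broker:9092"}  # placeholder
source_topic = "source-topic"                         # placeholder

def run():
    options = PipelineOptions(["--runner=FlinkRunner", "--streaming"])  # as before
    with beam.Pipeline(options=options) as p:
        _ = (
            p
            | "Read topic from Kafka" >> ReadFromKafka(
                consumer_config=source_config,
                topics=[source_topic],
                expansion_service=kafka_expansion_service,
            )
        )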

Best regards,
Stanislav Porotikov

From: Поротиков Станислав Вячеславович via user 
Sent: Tuesday, December 19, 2023 1:58 PM
To: user@beam.apache.org
Cc: Поротиков Станислав Вячеславович 
Subject: Processing data from Kafka. Python

I'm trying to read data from Kafka, do some processing, and then write new
data to another Kafka topic.
The problem is that the task appears to be stuck at the processing stage.
In the logs I can see it reading data from Kafka constantly, but no new data
appears in the sink Kafka topic.
Could you help me figure out what I did wrong?

My pipeline:
pipeline_flink_environment = [
    "--runner=FlinkRunner",
    "--flink_submit_uber_jar",
    "--streaming",
    "--flink_master=localhost:8081",
    "--environment_type=PROCESS",
    "--environment_config={\"command\":\"/opt/apache/beam/boot\"}"
]


def run():
    pipeline_options = PipelineOptions(pipeline_flink_environment)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        kafka_message = (
            pipeline
            | 'Read topic from Kafka' >> ReadFromKafka(
                consumer_config=source_config,
                topics=[source_topic],
                expansion_service=kafka_process_expansion_service
            )
            | beam.WindowInto(beam.window.FixedWindows(15))
            | 'Group elements' >> beam.GroupByKey()
            | 'Write data to Kafka' >> WriteToKafka(
                producer_config=source_config,
                topic=sink_topic,
                expansion_service=kafka_process_expansion_service
            )
        )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()



Just a few lines of the logs; I can see it connected to the Python worker:

2023-12-19 08:18:04,634 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - 
Un-registering task and sending final execution state FINISHED to JobManager 
for task Source: Impulse -> [3]Read topic from 
Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor),
 KafkaIO.ReadSourceDescriptors} (1/1)#0 
856b8acfe73098d7075a2636a645f66d_cbc357ccb763df2852fee8c4fc7d55f2_0_0.
2023-12-19 08:18:05,581 INFO  
org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - Beam Fn 
Logging client connected.
2023-12-19 08:18:05,626 WARN  
/usr/local/lib/python3.9/site-packages/apache_beam/options/pipeline_options.py:291
 [] - Not setting flag with value None: app_name
2023-12-19 08:18:05,627 WARN  
/usr/local/lib/python3.9/site-packages/apache_beam/options/pipeline_options.py:291
 [] - Not setting flag with value None: flink_conf_dir
2023-12-19 08:18:05,628 INFO  
/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker_main.py:111
 [] - semi_persistent_directory: /tmp
2023-12-19 08:18:05,628 WARN  
/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker_main.py:356
 [] - No session file found: /tmp/staged/pickled_main_session. Functions 
defined in __main__ (interactive session) may fail.
2023-12-19 08:18:05,629 WARN  
/usr/local/lib/python3.9/site-packages/apache_beam/options/pipeline_options.py:367
 [] - Discarding unparseable args: ['--direct_runner_use_stacked_bundle', 
'--options_id=1', '--pipeline_type_check']
2023-12-19 08:18:05,629 INFO  
/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker_main.py:135
 [] - Pipeline_options: {'streaming': True, 'job_name': 
'BeamApp-flink-1219081730-11566b15', 'gcp_oauth_scopes': 
['https://www.googleapis.com/auth/bigquery', 
'https://www.googleapis.com/auth/cloud-platform', 
'https://www.googleapis.com/auth/devstorage.full_control', 
'https://www.googleapis.com/auth/userinfo.email', 
'https://www.googleapis.com/auth/datastore', 
'https://www.googleapis.com/auth/spanner.admin', 
'https://www.googleapis.com/auth/spanner.data', 
'https://www.googleapis.com/auth/bigquery', 
'https://www.googleapis.com/auth/cloud-platform', 
'https://www.googleapis.com/auth/devstorage.full_control', 
'https://www.googleapis.com/auth/userinfo.email', 
'https://www.googleapis.com/auth/datastore', 
'https://www.googleapis.com/auth/spanner.admin', 
'https://www.googleapis.com/auth/spanner.data'], 
'default_sdk_harness_log_level': 'DEBUG', 'experiments': ['beam_fn_api'], 
'sdk_location': 'container', 'environment_type': 'PROCESS', 
'environment_config': '{"command":"/opt/apache/beam/boot"}', 
'sdk_worker_parallelism': '1', 'environment_cache_millis': '0', 
'flink_submit_uber_jar': True}
2023-12-19 08:18:05,672 INFO  

How to set flow control for pubsubliteio?

2023-12-20 Thread hsy...@gmail.com
How do I change the flow control config for pubsubliteio?

I saw the setting has been taken out as part of
https://issues.apache.org/jira/browse/BEAM-14129

But without setting up flow control correctly, my Beam app runs super slowly
ingesting from Pub/Sub Lite and gets a NO_CLIENT_TOKEN error on the server
side, which suggests increasing the flow control setting.


Re: Environmental variables not accessible in Dataflow pipeline

2023-12-20 Thread XQ Hu via user
Dataflow VMs cannot see your local env variables. I think you should use a
custom container:
https://cloud.google.com/dataflow/docs/guides/using-custom-containers. Here
is a sample project: https://github.com/google/dataflow-ml-starter
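
If a full custom image is more than you need, a lighter-weight pattern that
sometimes works (assuming the package reads the variable lazily rather than
at import time) is to pass the value as a regular pipeline option and export
it as an environment variable in DoFn.setup() on the worker. A rough sketch
with hypothetical option and class names:

import os
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument("--otel_service_name", default="")

class ExportEnvThenProcess(beam.DoFn):
    def __init__(self, otel_service_name):
        self.otel_service_name = otel_service_name

    def setup(self):
        # Runs on the worker before any elements are processed.
        if self.otel_service_name:
            os.environ["OTEL_SERVICE_NAME"] = self.otel_service_name

    def process(self, element):
        yield element

options = PipelineOptions()  # parsed from sys.argv in a real job
custom = options.view_as(CustomOptions)

with beam.Pipeline(options=options) as p:
    _ = (
        p
        | beam.Create(["bill-1", "bill-2"])
        | beam.ParDo(ExportEnvThenProcess(custom.otel_service_name))
    )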

On Wed, Dec 20, 2023 at 4:57 AM Sofia’s World  wrote:

> Hello Sumit
>  Thanks. Sorry...I guess if the value of the env variable is always the
> same u can pass it as job params?..though it doesn't sound like a
> viable option...
> Hth
>
> On Wed, 20 Dec 2023, 09:49 Sumit Desai,  wrote:
>
>> Hi Sofia,
>>
>> Thanks for the response. For now, we have decided not to use flex
>> template. Is there a way to pass environmental variables without using any
>> template?
>>
>> Thanks & Regards,
>> Sumit Desai
>>
>> On Wed, Dec 20, 2023 at 3:16 PM Sofia’s World 
>> wrote:
>>
>>> Hi
>>>  My 2 cents. .have u ever considered using flex templates to run your
>>> pipeline? Then you can pass all your parameters at runtime..
>>> (Apologies in advance if it does not cover your use case...)
>>>
>>> On Wed, 20 Dec 2023, 09:35 Sumit Desai via user, 
>>> wrote:
>>>
 Hi all,

 I have a Python application which is using Apache beam and Dataflow as
 runner. The application uses a non-public Python package
 'uplight-telemetry' which is configured using 'extra_packages' while
 creating pipeline_options object. This package expects an environmental
 variable named 'OTEL_SERVICE_NAME' and since this variable is not present
 in the Dataflow worker, it is resulting in an error during application
 startup.

 I am passing this variable using custom pipeline options. Code to
 create pipeline options is as follows-

 pipeline_options = ProcessBillRequests.CustomOptions(
 project=gcp_project_id,
 region="us-east1",
 job_name=job_name,
 
 temp_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/temp',
 
 staging_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/staging',
 runner='DataflowRunner',
 save_main_session=True,
 service_account_email= service_account,
 subnetwork=os.environ.get(SUBNETWORK_URL),
 extra_packages=[uplight_telemetry_tar_file_path],
 setup_file=setup_file_path,
 OTEL_SERVICE_NAME=otel_service_name,
 OTEL_RESOURCE_ATTRIBUTES=otel_resource_attributes
 # Set values for additional custom variables as needed
 )


 And the code that executes the pipeline is as follows-


 result = (
 pipeline
 | "ReadPendingRecordsFromDB" >> read_from_db
 | "Parse input PCollection" >> 
 beam.Map(ProcessBillRequests.parse_bill_data_requests)
 | "Fetch bills " >> 
 beam.ParDo(ProcessBillRequests.FetchBillInformation())
 )

 pipeline.run().wait_until_finish()

 Is there a way I can set the environmental variables in custom options
 available in the worker?

 Thanks & Regards,
 Sumit Desai

>>>


Re: Environmental variables not accessible in Dataflow pipeline

2023-12-20 Thread Sofia’s World
Hello Sumit,
 Thanks. Sorry... I guess if the value of the env variable is always the
same, you can pass it as a job param? ...though it doesn't sound like a
viable option...
Hth

On Wed, 20 Dec 2023, 09:49 Sumit Desai,  wrote:

> Hi Sofia,
>
> Thanks for the response. For now, we have decided not to use flex
> template. Is there a way to pass environmental variables without using any
> template?
>
> Thanks & Regards,
> Sumit Desai
>
> On Wed, Dec 20, 2023 at 3:16 PM Sofia’s World  wrote:
>
>> Hi
>>  My 2 cents. .have u ever considered using flex templates to run your
>> pipeline? Then you can pass all your parameters at runtime..
>> (Apologies in advance if it does not cover your use case...)
>>
>> On Wed, 20 Dec 2023, 09:35 Sumit Desai via user, 
>> wrote:
>>
>>> Hi all,
>>>
>>> I have a Python application which is using Apache beam and Dataflow as
>>> runner. The application uses a non-public Python package
>>> 'uplight-telemetry' which is configured using 'extra_packages' while
>>> creating pipeline_options object. This package expects an environmental
>>> variable named 'OTEL_SERVICE_NAME' and since this variable is not present
>>> in the Dataflow worker, it is resulting in an error during application
>>> startup.
>>>
>>> I am passing this variable using custom pipeline options. Code to create
>>> pipeline options is as follows-
>>>
>>> pipeline_options = ProcessBillRequests.CustomOptions(
>>> project=gcp_project_id,
>>> region="us-east1",
>>> job_name=job_name,
>>> 
>>> temp_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/temp',
>>> 
>>> staging_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/staging',
>>> runner='DataflowRunner',
>>> save_main_session=True,
>>> service_account_email= service_account,
>>> subnetwork=os.environ.get(SUBNETWORK_URL),
>>> extra_packages=[uplight_telemetry_tar_file_path],
>>> setup_file=setup_file_path,
>>> OTEL_SERVICE_NAME=otel_service_name,
>>> OTEL_RESOURCE_ATTRIBUTES=otel_resource_attributes
>>> # Set values for additional custom variables as needed
>>> )
>>>
>>>
>>> And the code that executes the pipeline is as follows-
>>>
>>>
>>> result = (
>>> pipeline
>>> | "ReadPendingRecordsFromDB" >> read_from_db
>>> | "Parse input PCollection" >> 
>>> beam.Map(ProcessBillRequests.parse_bill_data_requests)
>>> | "Fetch bills " >> 
>>> beam.ParDo(ProcessBillRequests.FetchBillInformation())
>>> )
>>>
>>> pipeline.run().wait_until_finish()
>>>
>>> Is there a way I can set the environmental variables in custom options
>>> available in the worker?
>>>
>>> Thanks & Regards,
>>> Sumit Desai
>>>
>>


Re: Environmental variables not accessible in Dataflow pipeline

2023-12-20 Thread Sumit Desai via user
Hi Sofia,

Thanks for the response. For now, we have decided not to use a flex template.
Is there a way to pass environmental variables without using any template?

Thanks & Regards,
Sumit Desai

On Wed, Dec 20, 2023 at 3:16 PM Sofia’s World  wrote:

> Hi
>  My 2 cents. .have u ever considered using flex templates to run your
> pipeline? Then you can pass all your parameters at runtime..
> (Apologies in advance if it does not cover your use case...)
>
> On Wed, 20 Dec 2023, 09:35 Sumit Desai via user, 
> wrote:
>
>> Hi all,
>>
>> I have a Python application which is using Apache beam and Dataflow as
>> runner. The application uses a non-public Python package
>> 'uplight-telemetry' which is configured using 'extra_packages' while
>> creating pipeline_options object. This package expects an environmental
>> variable named 'OTEL_SERVICE_NAME' and since this variable is not present
>> in the Dataflow worker, it is resulting in an error during application
>> startup.
>>
>> I am passing this variable using custom pipeline options. Code to create
>> pipeline options is as follows-
>>
>> pipeline_options = ProcessBillRequests.CustomOptions(
>> project=gcp_project_id,
>> region="us-east1",
>> job_name=job_name,
>> 
>> temp_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/temp',
>> 
>> staging_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/staging',
>> runner='DataflowRunner',
>> save_main_session=True,
>> service_account_email= service_account,
>> subnetwork=os.environ.get(SUBNETWORK_URL),
>> extra_packages=[uplight_telemetry_tar_file_path],
>> setup_file=setup_file_path,
>> OTEL_SERVICE_NAME=otel_service_name,
>> OTEL_RESOURCE_ATTRIBUTES=otel_resource_attributes
>> # Set values for additional custom variables as needed
>> )
>>
>>
>> And the code that executes the pipeline is as follows-
>>
>>
>> result = (
>> pipeline
>> | "ReadPendingRecordsFromDB" >> read_from_db
>> | "Parse input PCollection" >> 
>> beam.Map(ProcessBillRequests.parse_bill_data_requests)
>> | "Fetch bills " >> 
>> beam.ParDo(ProcessBillRequests.FetchBillInformation())
>> )
>>
>> pipeline.run().wait_until_finish()
>>
>> Is there a way I can set the environmental variables in custom options
>> available in the worker?
>>
>> Thanks & Regards,
>> Sumit Desai
>>
>


Re: Environmental variables not accessible in Dataflow pipeline

2023-12-20 Thread Sofia’s World
Hi,
 My 2 cents: have you ever considered using flex templates to run your
pipeline? Then you can pass all your parameters at runtime.
(Apologies in advance if it does not cover your use case...)

On Wed, 20 Dec 2023, 09:35 Sumit Desai via user, 
wrote:

> Hi all,
>
> I have a Python application which is using Apache beam and Dataflow as
> runner. The application uses a non-public Python package
> 'uplight-telemetry' which is configured using 'extra_packages' while
> creating pipeline_options object. This package expects an environmental
> variable named 'OTEL_SERVICE_NAME' and since this variable is not present
> in the Dataflow worker, it is resulting in an error during application
> startup.
>
> I am passing this variable using custom pipeline options. Code to create
> pipeline options is as follows-
>
> pipeline_options = ProcessBillRequests.CustomOptions(
> project=gcp_project_id,
> region="us-east1",
> job_name=job_name,
> 
> temp_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/temp',
> 
> staging_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/staging',
> runner='DataflowRunner',
> save_main_session=True,
> service_account_email= service_account,
> subnetwork=os.environ.get(SUBNETWORK_URL),
> extra_packages=[uplight_telemetry_tar_file_path],
> setup_file=setup_file_path,
> OTEL_SERVICE_NAME=otel_service_name,
> OTEL_RESOURCE_ATTRIBUTES=otel_resource_attributes
> # Set values for additional custom variables as needed
> )
>
>
> And the code that executes the pipeline is as follows-
>
>
> result = (
> pipeline
> | "ReadPendingRecordsFromDB" >> read_from_db
> | "Parse input PCollection" >> 
> beam.Map(ProcessBillRequests.parse_bill_data_requests)
> | "Fetch bills " >> 
> beam.ParDo(ProcessBillRequests.FetchBillInformation())
> )
>
> pipeline.run().wait_until_finish()
>
> Is there a way I can set the environmental variables in custom options
> available in the worker?
>
> Thanks & Regards,
> Sumit Desai
>


Environmental variables not accessible in Dataflow pipeline

2023-12-20 Thread Sumit Desai via user
Hi all,

I have a Python application which uses Apache Beam with Dataflow as the
runner. The application uses a non-public Python package,
'uplight-telemetry', which is configured using 'extra_packages' while
creating the pipeline_options object. This package expects an environmental
variable named 'OTEL_SERVICE_NAME', and since this variable is not present
on the Dataflow workers, it results in an error during application startup.

I am passing this variable using custom pipeline options. The code to
create the pipeline options is as follows:

pipeline_options = ProcessBillRequests.CustomOptions(
    project=gcp_project_id,
    region="us-east1",
    job_name=job_name,
    temp_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/temp',
    staging_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/staging',
    runner='DataflowRunner',
    save_main_session=True,
    service_account_email=service_account,
    subnetwork=os.environ.get(SUBNETWORK_URL),
    extra_packages=[uplight_telemetry_tar_file_path],
    setup_file=setup_file_path,
    OTEL_SERVICE_NAME=otel_service_name,
    OTEL_RESOURCE_ATTRIBUTES=otel_resource_attributes
    # Set values for additional custom variables as needed
)


And the code that executes the pipeline is as follows:


result = (
    pipeline
    | "ReadPendingRecordsFromDB" >> read_from_db
    | "Parse input PCollection" >> beam.Map(ProcessBillRequests.parse_bill_data_requests)
    | "Fetch bills " >> beam.ParDo(ProcessBillRequests.FetchBillInformation())
)

pipeline.run().wait_until_finish()

Is there a way I can make the environmental variables passed in custom
options available on the worker?

Thanks & Regards,
Sumit Desai