Re: Dataflow not able to find a module specified using extra_package

2023-12-19 Thread Sumit Desai via user
Thanks Anand and Robert. Using extra_packages and specifying it as a list
worked.
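
For reference, roughly the shape that ended up working, as a minimal sketch
(the paths below are placeholders for the real ones in the snippet further
down this thread, and the dict is expanded into PipelineOptions keyword
arguments):

from apache_beam.options.pipeline_options import PipelineOptions

# The option is 'extra_packages' (plural) and it takes a list of package paths.
pipeline_options = {
    'runner': 'DataflowRunner',
    'project': 'my-gcp-project',  # placeholder
    'setup_file': './setup.py',  # placeholder
    'extra_packages': ['./uplight-telemetry-1.0.0.tar.gz'],  # a list, not a string
}
options = PipelineOptions(**pipeline_options)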

Regards,
Sumit Desai

On Tue, Dec 19, 2023 at 11:45 PM Robert Bradshaw via user <user@beam.apache.org> wrote:

> And should it be a list of strings, rather than a string?
>
> On Tue, Dec 19, 2023 at 10:10 AM Anand Inguva via user <user@beam.apache.org> wrote:
>
>> Can you try passing `extra_packages` instead of `extra_package` when
>> passing pipeline options as a dict?


Re: pubsubliteio is super slow

2023-12-19 Thread hsy...@gmail.com
Do you have a ticket?

And in the Pub/Sub Lite metrics it shows NO_CLIENT_TOKENS.

On Tue, Dec 19, 2023 at 1:39 PM Nirav Patel  wrote:

> We have. Yes, it is super slow. I tested the Python and Java IO versions as
> well, besides the Beam IO. We reported this problem to Google.
>
> On Tue, Dec 19, 2023 at 10:17 AM hsy...@gmail.com wrote:
>
>> Is anyone using Pub/Sub Lite? I find it super slow, 5 messages/sec, and
>> there are no options for me to tune the performance.
>>
>


Re: pubsubliteio is super slow

2023-12-19 Thread Nirav Patel
We have. Yes, it is super slow. I tested the Python and Java IO versions as
well, besides the Beam IO. We reported this problem to Google.

On Tue, Dec 19, 2023 at 10:17 AM hsy...@gmail.com  wrote:

> Is anyone using Pub/Sub Lite? I find it super slow, 5 messages/sec, and there
> are no options for me to tune the performance.
>


pubsubliteio is super slow

2023-12-19 Thread hsy...@gmail.com
Is anyone using Pub/Sub Lite? I find it super slow, 5 messages/sec, and there
are no options for me to tune the performance.


Re: Dataflow not able to find a module specified using extra_package

2023-12-19 Thread Robert Bradshaw via user
And should it be a list of strings, rather than a string?

On Tue, Dec 19, 2023 at 10:10 AM Anand Inguva via user wrote:

> Can you try passing `extra_packages` instead of `extra_package` when
> passing pipeline options as a dict?


Re: Dataflow not able to find a module specified using extra_package

2023-12-19 Thread Anand Inguva via user
Can you try passing `extra_packages` instead of `extra_package` when
passing pipeline options as a dict?


Dataflow not able to find a module specified using extra_package

2023-12-19 Thread Sumit Desai via user
Hi all,
I have created a Dataflow pipeline in batch mode using the Apache Beam Python
SDK. I am using one non-public dependency, 'uplight-telemetry'. I have
specified it using the parameter extra_package while creating the
pipeline_options object. However, the pipeline loading is failing with the
error *No module named 'uplight_telemetry'*.
The code to create pipeline_options is as follows:

def __create_pipeline_options_dataflow(job_name):
    # Set up the Dataflow runner options
    gcp_project_id = os.environ.get(GCP_PROJECT_ID)
    current_dir = os.path.dirname(os.path.abspath(__file__))
    print("current_dir=", current_dir)
    setup_file_path = os.path.join(current_dir, '..', '..', 'setup.py')
    print("Set-up file path=", setup_file_path)
    # TODO: Move file to proper location
    uplight_telemetry_tar_file_path = os.path.join(
        current_dir, '..', '..', '..', 'non-public-dependencies',
        'uplight-telemetry-1.0.0.tar.gz')
    # TODO: Move to environmental variables
    pipeline_options = {
        'project': gcp_project_id,
        'region': "us-east1",
        'job_name': job_name,  # Provide a unique job name
        'temp_location': f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/temp',
        'staging_location': f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/staging',
        'runner': 'DataflowRunner',
        'save_main_session': True,
        'service_account_email': os.environ.get(SERVICE_ACCOUNT),
        # 'network': f'projects/{gcp_project_id}/global/networks/default',
        'subnetwork': os.environ.get(SUBNETWORK_URL),
        'setup_file': setup_file_path,
        'extra_package': uplight_telemetry_tar_file_path
        # 'template_location': 'gcr.io/dataflow-templates-base/python310-template-launcher-base'
    }
    print("Pipeline created for job-name", job_name)
    logger.debug(f"pipeline_options created as {pipeline_options}")
    return pipeline_options

Why is it not trying to install this package from extra_package?


Processing data from Kafka. Python

2023-12-19 Thread Поротиков Станислав Вячеславович via user
I'm trying to read data from Kafka, do some processing, and then write new
data to another Kafka topic.
The problem is that the task appears to be stuck at the processing stage.
In the logs I can see it reading data from Kafka constantly, but no new data
appears in the sink Kafka topic.
Could you help me figure out what I did wrong?

My pipeline:
import logging

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions

# source_config, source_topic, sink_topic and kafka_process_expansion_service
# are defined elsewhere in my code.
pipeline_flink_environment = [
    "--runner=FlinkRunner",
    "--flink_submit_uber_jar",
    "--streaming",
    "--flink_master=localhost:8081",
    "--environment_type=PROCESS",
    "--environment_config={\"command\":\"/opt/apache/beam/boot\"}"
]


def run():
    pipeline_options = PipelineOptions(pipeline_flink_environment)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        kafka_message = (
            pipeline
            | 'Read topic from Kafka' >> ReadFromKafka(
                consumer_config=source_config,
                topics=[source_topic],
                expansion_service=kafka_process_expansion_service)
            | beam.WindowInto(beam.window.FixedWindows(15))
            | 'Group elements' >> beam.GroupByKey()
            | 'Write data to Kafka' >> WriteToKafka(
                producer_config=source_config,
                topic=sink_topic,
                expansion_service=kafka_process_expansion_service)
        )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
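
One possibility worth checking (this is an assumption, not something
established in this thread): with the default trigger, the GroupByKey only
emits a window's contents after the watermark passes the end of that
15-second window, so if the Kafka source's watermark never advances, nothing
would ever reach the sink topic. A minimal sketch of the windowing step with
an explicit repeating processing-time trigger, which fires even while the
watermark is stuck (kafka_messages is a hypothetical name for the PCollection
produced by ReadFromKafka above):

import apache_beam as beam
from apache_beam.transforms.trigger import (AccumulationMode,
                                            AfterProcessingTime, Repeatedly)

# Same 15-second windows as above, but with a repeating processing-time trigger
# so the downstream GroupByKey emits panes without waiting for the watermark.
windowed_messages = (
    kafka_messages  # hypothetical: the PCollection read from Kafka
    | 'Window with trigger' >> beam.WindowInto(
        beam.window.FixedWindows(15),
        trigger=Repeatedly(AfterProcessingTime(15)),
        accumulation_mode=AccumulationMode.DISCARDING)
)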



Just a few lines of the logs, where I can see it connects to the Python worker:

2023-12-19 08:18:04,634 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - 
Un-registering task and sending final execution state FINISHED to JobManager 
for task Source: Impulse -> [3]Read topic from 
Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor),
 KafkaIO.ReadSourceDescriptors} (1/1)#0 
856b8acfe73098d7075a2636a645f66d_cbc357ccb763df2852fee8c4fc7d55f2_0_0.
2023-12-19 08:18:05,581 INFO  
org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - Beam Fn 
Logging client connected.
2023-12-19 08:18:05,626 WARN  
/usr/local/lib/python3.9/site-packages/apache_beam/options/pipeline_options.py:291
 [] - Not setting flag with value None: app_name
2023-12-19 08:18:05,627 WARN  
/usr/local/lib/python3.9/site-packages/apache_beam/options/pipeline_options.py:291
 [] - Not setting flag with value None: flink_conf_dir
2023-12-19 08:18:05,628 INFO  
/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker_main.py:111
 [] - semi_persistent_directory: /tmp
2023-12-19 08:18:05,628 WARN  
/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker_main.py:356
 [] - No session file found: /tmp/staged/pickled_main_session. Functions 
defined in __main__ (interactive session) may fail.
2023-12-19 08:18:05,629 WARN  
/usr/local/lib/python3.9/site-packages/apache_beam/options/pipeline_options.py:367
 [] - Discarding unparseable args: ['--direct_runner_use_stacked_bundle', 
'--options_id=1', '--pipeline_type_check']
2023-12-19 08:18:05,629 INFO  
/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker_main.py:135
 [] - Pipeline_options: {'streaming': True, 'job_name': 
'BeamApp-flink-1219081730-11566b15', 'gcp_oauth_scopes': 
['https://www.googleapis.com/auth/bigquery', 
'https://www.googleapis.com/auth/cloud-platform', 
'https://www.googleapis.com/auth/devstorage.full_control', 
'https://www.googleapis.com/auth/userinfo.email', 
'https://www.googleapis.com/auth/datastore', 
'https://www.googleapis.com/auth/spanner.admin', 
'https://www.googleapis.com/auth/spanner.data', 
'https://www.googleapis.com/auth/bigquery', 
'https://www.googleapis.com/auth/cloud-platform', 
'https://www.googleapis.com/auth/devstorage.full_control', 
'https://www.googleapis.com/auth/userinfo.email', 
'https://www.googleapis.com/auth/datastore', 
'https://www.googleapis.com/auth/spanner.admin', 
'https://www.googleapis.com/auth/spanner.data'], 
'default_sdk_harness_log_level': 'DEBUG', 'experiments': ['beam_fn_api'], 
'sdk_location': 'container', 'environment_type': 'PROCESS', 
'environment_config': '{"command":"/opt/apache/beam/boot"}', 
'sdk_worker_parallelism': '1', 'environment_cache_millis': '0', 
'flink_submit_uber_jar': True}
2023-12-19 08:18:05,672 INFO  
/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/statecache.py:234
 [] - Creating state cache with size 104857600
2023-12-19 08:18:05,672 INFO  
/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py:187
 [] - Creating insecure control channel for localhost:35427.
2023-12-19 08:18:05,679 INFO