Hi,
the following code works for me - mind you, I have amended your code
slightly.
A few questions:
1 - where are you running it from? Local PC or GCP console?
2 - has it ever run before?
3 - can you show the command line you are using to kick off the process?

I have built your code using Cloud Build and run it using another yaml.
Below is the code I used. Do you have a GitHub repo you can share so I
can play around with your code?
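For reference, the build step is just a standard Cloud Build submit; the
cloudbuild.yaml name below is my own config file (a placeholder, not
something from your setup):

```shell
# Submit the current directory to Cloud Build; cloudbuild.yaml is my own
# config (placeholder name) that installs Beam and launches the pipeline.
gcloud builds submit --config cloudbuild.yaml .
```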

hth
 Marco

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

# Input file on GCS (my own test bucket).
EDGAR_QUARTERLY_URL = 'gs://mm_dataflow_bucket/inputs/all_data_utilities_df.csv'


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    # Pass the leftover args through to Beam instead of dropping them.
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    logging.info('starting pipeline..')
    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | 'ReadTextFile' >> beam.io.textio.ReadFromText(EDGAR_QUARTERLY_URL)
         | 'Log it out' >> beam.Map(logging.info))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()


On Tue, Aug 10, 2021 at 2:29 PM Irina Sandu <[email protected]> wrote:

> Hi all!
> We are trying to run a simple Dataflow job which just reads from a GCS
> file. The job starts and runs for approximately an hour, after which it
> terminates with the following error:
>
> Workflow failed. Causes: The Dataflow job appears to be stuck because no
> worker activity has been seen in the last 1h. Please check the worker logs
> in Stackdriver Logging. You can also get help with Cloud Dataflow at
> https://cloud.google.com/dataflow/support.
>
> This is how the pipeline is run:
>     pipeline_options = PipelineOptions(argv)
>     rec_options = pipeline_options.view_as(RecOptions)
>     filename = 'gs://<path-to-file-in-gcs>'
>
>     p = beam.Pipeline(options=pipeline_options)
>     _ = (p
>          | 'ReadTextFile' >> beam.io.textio.ReadFromText(filename))
>     p.run().wait_until_finish()
>
> In our requirements file we only have: apache_beam[gcp]==2.26.0
>
> Do you have any idea why this might happen?
>
