Hi
The following code works for me. Mind you, I have amended the code slightly.
A few questions:
1 - Where are you running it from? A local PC or the GCP console?
2 - Has it ever run before?
3 - Can you show the command line you are using to kick off the process?
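For question 3, this is roughly the set of flags I would expect to see on a Dataflow launch. It is only a sketch: `build_dataflow_args` is a helper made up for illustration (not part of Beam), and the project, region and bucket values are hypothetical placeholders. The flags themselves (`--runner`, `--project`, `--region`, `--temp_location`, `--requirements_file`) are standard Beam pipeline options.

```python
import shlex


def build_dataflow_args(project, region, bucket, requirements_file="requirements.txt"):
    """Assemble typical flags for launching a Beam pipeline on Dataflow.

    Hypothetical helper: all argument values are placeholders to substitute.
    """
    return [
        "--runner=DataflowRunner",
        f"--project={project}",
        f"--region={region}",
        # Dataflow needs a GCS location for temporary files.
        f"--temp_location=gs://{bucket}/tmp",
        # Workers install the packages listed here when they start up.
        f"--requirements_file={requirements_file}",
    ]


if __name__ == "__main__":
    args = build_dataflow_args("my-project", "europe-west2", "my-bucket")
    # Print the full command line, shell-quoted.
    print(shlex.join(["python", "pipeline.py", *args]))
```

Comparing your actual command against a list like this is usually the quickest way to spot a missing or mistyped option.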
I have built your code using Cloud Build and run it using another YAML file.
Below is the code I have used. Do you have a GitHub repo you could share so I
can play around with your code?
HTH
Marco
import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

EDGAR_QUARTERLY_URL = 'gs://mm_dataflow_bucket/inputs/all_data_utilities_df.csv'


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    # No custom arguments are defined, so every flag ends up in pipeline_args.
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    # Pickle the main module's globals so that workers can access them.
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    logging.info('starting pipeline..')
    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | 'ReadTextFile' >> beam.io.textio.ReadFromText(EDGAR_QUARTERLY_URL)
         | 'Log it out' >> beam.Map(logging.info))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
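As written, `run()` creates an argparse parser but never adds any arguments, so every command-line flag is passed straight to the pipeline options. If you would rather not hardcode the GCS path, something like the sketch below splits your own flags from Beam's using `parse_known_args`. The `--input` flag and the `split_args` function are hypothetical names, used only for illustration.

```python
import argparse


def split_args(argv):
    """Separate our own flags from the ones meant for Beam's PipelineOptions."""
    parser = argparse.ArgumentParser()
    # Hypothetical flag so the input path is not hardcoded in the script.
    parser.add_argument(
        "--input",
        default="gs://mm_dataflow_bucket/inputs/all_data_utilities_df.csv",
        help="File to read with ReadFromText.",
    )
    # parse_known_args returns (namespace, leftover_args); the leftovers
    # are the flags to hand on to PipelineOptions(pipeline_args).
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args


if __name__ == "__main__":
    known, rest = split_args(["--input", "gs://my-bucket/data.csv", "--runner=DataflowRunner"])
    print(known.input)  # gs://my-bucket/data.csv
    print(rest)         # ['--runner=DataflowRunner']
```

Inside `run()` you would then read from `known_args.input` and build the options with `PipelineOptions(pipeline_args)`, so the same script can point at a different file without editing the code.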
On Tue, Aug 10, 2021 at 2:29 PM Irina Sandu <[email protected]> wrote:
> Hi all!
> We are trying to run a simple Dataflow job which just reads from a GCS
> file. The job starts and runs for approximately an hour, after which it
> terminates with the following error:
>
> Workflow failed. Causes: The Dataflow job appears to be stuck because no
> worker activity has been seen in the last 1h. Please check the worker logs
> in Stackdriver Logging. You can also get help with Cloud Dataflow at
> https://cloud.google.com/dataflow/support.
>
> This is how the pipeline is run:
> pipeline_options = PipelineOptions(argv)
> rec_options = pipeline_options.view_as(RecOptions)
> filename = 'gs://<path-to-file-in-gcs>'
>
> p = beam.Pipeline(options=pipeline_options)
> _ = (p
> | 'ReadTextFile' >> beam.io.textio.ReadFromText(filename))
> p.run().wait_until_finish()
>
> In our requirements file we only have: apache_beam[gcp]==2.26.0
>
> Do you have any idea why this might happen?
>