Hi Ella,

It seems like, the package related to indexes.base is not installed in the
workers. Could you try one of the methods in "Managing Python Pipeline
Dependencies" [1], to stage that dependency?

Ahmet

[1] https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/

On Thu, Jun 21, 2018 at 9:40 AM, OrielResearch Eila Arich-Landkof <
[email protected]> wrote:

> Hello all,
>
> Exploring that issue (Local runner - works great and Dataflow fails),
> there might be a mismatch between the apache_beam version and the dataflow
> version
>
> Please let me know what your thoughts are. if it is a version issue, what
> updates should be executed? how do I cover the installation on the datalab
> VM and the Google Cloud Platform.
>
> Running the following command / or a different command on the shell? on
> datalab?
>
> I tried running this on the datalab and it didnt solve the issue (*see
> below the full logs report*)
>
> pip install --upgrade apache_beam google-cloud-dataflow
>
> Please advice.
>
> Thanks,
> Eila
>
>
> *All logs:*
>
>
> INFO:root:Staging the SDK tarball from PyPI to 
> gs://archs4/staging/label-archs4-tsv.1529598693.453095/dataflow_python_sdk.tar
> INFO:root:Executing command: ['/usr/local/envs/py2env/bin/python', '-m', 
> 'pip', 'install', '--download', '/tmp/tmp5MM5wr', 
> 'google-cloud-dataflow==2.0.0', '--no-binary', ':all:', '--no-deps']
> INFO:root:file copy from /tmp/tmp5MM5wr/google-cloud-dataflow-2.0.0.tar.gz to 
> gs://archs4/staging/label-archs4-tsv.1529598693.453095/dataflow_python_sdk.tar.
> INFO:oauth2client.client:Attempting refresh to obtain initial access_token
> INFO:oauth2client.client:Attempting refresh to obtain initial access_token
> INFO:root:Create job: <Job
>  createTime: u'2018-06-21T16:31:51.304121Z'
>  currentStateTime: u'1970-01-01T00:00:00Z'
>  id: u'2018-06-21_09_31_50-17545183031487377678'
>  location: u'us-central1'
>  name: u'label-archs4-tsv'
>  projectId: u'orielresearch-188115'
>  stageStates: []
>  steps: []
>  tempFiles: []
>  type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>
> INFO:root:Created job with id: [2018-06-21_09_31_50-17545183031487377678]
> INFO:root:To access the Dataflow monitoring console, please navigate to 
> https://console.developers.google.com/project/orielresearch-188115/dataflow/job/2018-06-21_09_31_50-17545183031487377678
> INFO:root:Job 2018-06-21_09_31_50-17545183031487377678 is in state 
> JOB_STATE_PENDING
> INFO:root:2018-06-21T16:31:50.476Z: JOB_MESSAGE_DETAILED: Autoscaling is 
> enabled for job 2018-06-21_09_31_50-17545183031487377678. The number of 
> workers will be between 1 and 1000.
> INFO:root:2018-06-21T16:31:50.506Z: JOB_MESSAGE_DETAILED: Autoscaling was 
> automatically enabled for job 2018-06-21_09_31_50-17545183031487377678.
> INFO:root:2018-06-21T16:31:53.079Z: JOB_MESSAGE_DETAILED: Checking required 
> Cloud APIs are enabled.
> INFO:root:2018-06-21T16:31:53.385Z: JOB_MESSAGE_DETAILED: Checking 
> permissions granted to controller Service Account.
> INFO:root:2018-06-21T16:31:54.161Z: JOB_MESSAGE_BASIC: Worker configuration: 
> n1-standard-1 in us-central1-b.
> INFO:root:2018-06-21T16:31:54.910Z: JOB_MESSAGE_DETAILED: Expanding 
> CoGroupByKey operations into optimizable parts.
> INFO:root:2018-06-21T16:31:54.936Z: JOB_MESSAGE_DEBUG: Combiner lifting 
> skipped for step writing to TSV files/Write/WriteImpl/GroupByKey: GroupByKey 
> not followed by a combiner.
> INFO:root:2018-06-21T16:31:54.968Z: JOB_MESSAGE_DETAILED: Expanding 
> GroupByKey operations into optimizable parts.
> INFO:root:2018-06-21T16:31:54.992Z: JOB_MESSAGE_DETAILED: Lifting 
> ValueCombiningMappingFns into MergeBucketsMappingFns
> INFO:root:2018-06-21T16:31:55.056Z: JOB_MESSAGE_DEBUG: Annotating graph with 
> Autotuner information.
> INFO:root:2018-06-21T16:31:55.168Z: JOB_MESSAGE_DETAILED: Fusing adjacent 
> ParDo, Read, Write, and Flatten operations
> INFO:root:2018-06-21T16:31:55.195Z: JOB_MESSAGE_DETAILED: Fusing consumer 
> create more columns into Extract the rows from dataframe
> INFO:root:2018-06-21T16:31:55.221Z: JOB_MESSAGE_DETAILED: Fusing consumer 
> writing to TSV files/Write/WriteImpl/GroupByKey/Reify into writing to TSV 
> files/Write/WriteImpl/WindowInto(WindowIntoFn)
> INFO:root:2018-06-21T16:31:55.244Z: JOB_MESSAGE_DETAILED: Fusing consumer 
> writing to TSV files/Write/WriteImpl/GroupByKey/Write into writing to TSV 
> files/Write/WriteImpl/GroupByKey/Reify
> INFO:root:2018-06-21T16:31:55.271Z: JOB_MESSAGE_DETAILED: Fusing consumer 
> writing to TSV files/Write/WriteImpl/WriteBundles/Do into writing to TSV 
> files/Write/WriteImpl/GroupByKey/GroupByWindow
> INFO:root:2018-06-21T16:31:55.303Z: JOB_MESSAGE_DETAILED: Fusing consumer 
> writing to TSV files/Write/WriteImpl/Map(<lambda at iobase.py:895>) into 
> create more columns
> INFO:root:2018-06-21T16:31:55.328Z: JOB_MESSAGE_DETAILED: Fusing consumer 
> writing to TSV files/Write/WriteImpl/WindowInto(WindowIntoFn) into writing to 
> TSV files/Write/WriteImpl/Map(<lambda at iobase.py:895>)
> INFO:root:2018-06-21T16:31:55.341Z: JOB_MESSAGE_DETAILED: Fusing consumer 
> writing to TSV files/Write/WriteImpl/GroupByKey/GroupByWindow into writing to 
> TSV files/Write/WriteImpl/GroupByKey/Read
> INFO:root:2018-06-21T16:31:55.365Z: JOB_MESSAGE_DETAILED: Fusing consumer 
> writing to TSV files/Write/WriteImpl/InitializeWrite into writing to TSV 
> files/Write/WriteImpl/DoOnce/Read
> INFO:root:2018-06-21T16:31:55.396Z: JOB_MESSAGE_DEBUG: Workflow config is 
> missing a default resource spec.
> INFO:root:2018-06-21T16:31:55.432Z: JOB_MESSAGE_DEBUG: Adding StepResource 
> setup and teardown to workflow graph.
> INFO:root:2018-06-21T16:31:55.461Z: JOB_MESSAGE_DEBUG: Adding workflow start 
> and stop steps.
> INFO:root:2018-06-21T16:31:55.486Z: JOB_MESSAGE_DEBUG: Assigning stage ids.
> INFO:root:2018-06-21T16:31:55.641Z: JOB_MESSAGE_DEBUG: Executing wait step 
> start15
> INFO:root:Job 2018-06-21_09_31_50-17545183031487377678 is in state 
> JOB_STATE_RUNNING
> INFO:root:2018-06-21T16:31:55.701Z: JOB_MESSAGE_BASIC: Executing operation 
> writing to TSV files/Write/WriteImpl/DoOnce/Read+writing to TSV 
> files/Write/WriteImpl/InitializeWrite
> INFO:root:2018-06-21T16:31:55.727Z: JOB_MESSAGE_BASIC: Executing operation 
> writing to TSV files/Write/WriteImpl/GroupByKey/Create
> INFO:root:2018-06-21T16:31:55.739Z: JOB_MESSAGE_DEBUG: Starting worker pool 
> setup.
> INFO:root:2018-06-21T16:31:55.753Z: JOB_MESSAGE_BASIC: Starting 1 workers in 
> us-central1-b...
> INFO:root:2018-06-21T16:31:55.839Z: JOB_MESSAGE_DEBUG: Value "writing to TSV 
> files/Write/WriteImpl/GroupByKey/Session" materialized.
> INFO:root:2018-06-21T16:31:55.901Z: JOB_MESSAGE_BASIC: Executing operation 
> Extract the rows from dataframe+create more columns+writing to TSV 
> files/Write/WriteImpl/Map(<lambda at iobase.py:895>)+writing to TSV 
> files/Write/WriteImpl/WindowInto(WindowIntoFn)+writing to TSV 
> files/Write/WriteImpl/GroupByKey/Reify+writing to TSV 
> files/Write/WriteImpl/GroupByKey/Write
> INFO:root:2018-06-21T16:31:56.332Z: JOB_MESSAGE_BASIC: BigQuery export job 
> "dataflow_job_576766793008965363" started. You can check its status with the 
> bq tool: "bq show -j --project_id=orielresearch-188115 
> dataflow_job_576766793008965363".
> INFO:root:2018-06-21T16:32:03.683Z: JOB_MESSAGE_DETAILED: Autoscaling: Raised 
> the number of workers to 0 based on the rate of progress in the currently 
> running step(s).
> INFO:root:2018-06-21T16:32:14.181Z: JOB_MESSAGE_DETAILED: Autoscaling: Raised 
> the number of workers to 1 based on the rate of progress in the currently 
> running step(s).
> INFO:root:2018-06-21T16:32:26.827Z: JOB_MESSAGE_DETAILED: BigQuery export job 
> progress: "dataflow_job_576766793008965363" observed total of 1 exported 
> files thus far.
> INFO:root:2018-06-21T16:32:26.850Z: JOB_MESSAGE_BASIC: BigQuery export job 
> finished: "dataflow_job_576766793008965363"
> INFO:root:2018-06-21T16:32:33.078Z: JOB_MESSAGE_DETAILED: Workers have 
> started successfully.
> INFO:root:2018-06-21T16:35:35.511Z: JOB_MESSAGE_ERROR: Traceback (most recent 
> call last):
>   File 
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 
> 581, in do_work
>     work_executor.execute()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", 
> line 166, in execute
>     op.start()
>   File "dataflow_worker/operations.py", line 283, in 
> dataflow_worker.operations.DoOperation.start 
> (dataflow_worker/operations.c:10680)
>     def start(self):
>   File "dataflow_worker/operations.py", line 284, in 
> dataflow_worker.operations.DoOperation.start 
> (dataflow_worker/operations.c:10574)
>     with self.scoped_start_state:
>   File "dataflow_worker/operations.py", line 289, in 
> dataflow_worker.operations.DoOperation.start 
> (dataflow_worker/operations.c:9775)
>     pickler.loads(self.spec.serialized_fn))
>   File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", 
> line 225, in loads
>     return dill.loads(s)
>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in 
> loads
>     return load(file)
>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in 
> load
>     obj = pik.load()
>   File "/usr/lib/python2.7/pickle.py", line 858, in load
>     dispatch[key](self)
>   File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
>     klass = self.find_class(module, name)
>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in 
> find_class
>     return StockUnpickler.find_class(self, module, name)
>   File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
>     __import__(module)
> ImportError: No module named indexes.base
>
> INFO:root:2018-06-21T16:35:38.897Z: JOB_MESSAGE_ERROR: Traceback (most recent 
> call last):
>   File 
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 
> 581, in do_work
>     work_executor.execute()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", 
> line 166, in execute
>     op.start()
>   File "dataflow_worker/operations.py", line 283, in 
> dataflow_worker.operations.DoOperation.start 
> (dataflow_worker/operations.c:10680)
>     def start(self):
>   File "dataflow_worker/operations.py", line 284, in 
> dataflow_worker.operations.DoOperation.start 
> (dataflow_worker/operations.c:10574)
>     with self.scoped_start_state:
>   File "dataflow_worker/operations.py", line 289, in 
> dataflow_worker.operations.DoOperation.start 
> (dataflow_worker/operations.c:9775)
>     pickler.loads(self.spec.serialized_fn))
>   File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", 
> line 225, in loads
>     return dill.loads(s)
>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in 
> loads
>     return load(file)
>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in 
> load
>     obj = pik.load()
>   File "/usr/lib/python2.7/pickle.py", line 858, in load
>     dispatch[key](self)
>   File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
>     klass = self.find_class(module, name)
>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in 
> find_class
>     return StockUnpickler.find_class(self, module, name)
>   File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
>     __import__(module)
> ImportError: No module named indexes.base
>
> INFO:root:2018-06-21T16:35:42.245Z: JOB_MESSAGE_ERROR: Traceback (most recent 
> call last):
>   File 
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 
> 581, in do_work
>     work_executor.execute()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", 
> line 166, in execute
>     op.start()
>   File "dataflow_worker/operations.py", line 283, in 
> dataflow_worker.operations.DoOperation.start 
> (dataflow_worker/operations.c:10680)
>     def start(self):
>   File "dataflow_worker/operations.py", line 284, in 
> dataflow_worker.operations.DoOperation.start 
> (dataflow_worker/operations.c:10574)
>     with self.scoped_start_state:
>   File "dataflow_worker/operations.py", line 289, in 
> dataflow_worker.operations.DoOperation.start 
> (dataflow_worker/operations.c:9775)
>     pickler.loads(self.spec.serialized_fn))
>   File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", 
> line 225, in loads
>     return dill.loads(s)
>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in 
> loads
>     return load(file)
>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in 
> load
>     obj = pik.load()
>   File "/usr/lib/python2.7/pickle.py", line 858, in load
>     dispatch[key](self)
>   File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
>     klass = self.find_class(module, name)
>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in 
> find_class
>     return StockUnpickler.find_class(self, module, name)
>   File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
>     __import__(module)
> ImportError: No module named indexes.base
>
> INFO:root:2018-06-21T16:35:45.619Z: JOB_MESSAGE_ERROR: Traceback (most recent 
> call last):
>   File 
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 
> 581, in do_work
>     work_executor.execute()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", 
> line 166, in execute
>     op.start()
>   File "dataflow_worker/operations.py", line 283, in 
> dataflow_worker.operations.DoOperation.start 
> (dataflow_worker/operations.c:10680)
>     def start(self):
>   File "dataflow_worker/operations.py", line 284, in 
> dataflow_worker.operations.DoOperation.start 
> (dataflow_worker/operations.c:10574)
>     with self.scoped_start_state:
>   File "dataflow_worker/operations.py", line 289, in 
> dataflow_worker.operations.DoOperation.start 
> (dataflow_worker/operations.c:9775)
>     pickler.loads(self.spec.serialized_fn))
>   File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", 
> line 225, in loads
>     return dill.loads(s)
>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in 
> loads
>     return load(file)
>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in 
> load
>     obj = pik.load()
>   File "/usr/lib/python2.7/pickle.py", line 858, in load
>     dispatch[key](self)
>   File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
>     klass = self.find_class(module, name)
>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in 
> find_class
>     return StockUnpickler.find_class(self, module, name)
>   File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
>     __import__(module)
> ImportError: No module named indexes.base
>
> INFO:root:2018-06-21T16:35:45.668Z: JOB_MESSAGE_DEBUG: Executing failure step 
> failure14
> INFO:root:2018-06-21T16:35:45.695Z: JOB_MESSAGE_ERROR: Workflow failed. 
> Causes: S04:Extract the rows from dataframe+create more columns+writing to 
> TSV files/Write/WriteImpl/Map(<lambda at iobase.py:895>)+writing to TSV 
> files/Write/WriteImpl/WindowInto(WindowIntoFn)+writing to TSV 
> files/Write/WriteImpl/GroupByKey/Reify+writing to TSV 
> files/Write/WriteImpl/GroupByKey/Write failed., A work item was attempted 4 
> times without success. Each time the worker eventually lost contact with the 
> service. The work item was attempted on:
>   label-archs4-tsv-06210931-a4r1-harness-rlqz,
>   label-archs4-tsv-06210931-a4r1-harness-rlqz,
>   label-archs4-tsv-06210931-a4r1-harness-rlqz,
>   label-archs4-tsv-06210931-a4r1-harness-rlqz
> INFO:root:2018-06-21T16:35:45.799Z: JOB_MESSAGE_DETAILED: Cleaning up.
> INFO:root:2018-06-21T16:35:46Z: JOB_MESSAGE_DEBUG: Starting worker pool 
> teardown.
> INFO:root:2018-06-21T16:35:46.027Z: JOB_MESSAGE_BASIC: Stopping worker pool...
>
>
>
> On Wed, Jun 20, 2018 at 5:02 PM, OrielResearch Eila Arich-Landkof <
> [email protected]> wrote:
>
>> Hello,
>>
>> I am running the following pipeline on the local runner with no issues.
>>
>> logging.info('Define the pipeline')
>> p =  beam.Pipeline(options=options)
>> samplePath = outputPath
>> ExploreData = (p | "Extract the rows from dataframe" >> beam.io.Read(
>> beam.io.BigQuerySource('archs4.Debug_annotation'))
>>                  | "create more columns" >> beam.ParDo(CreateColForSampleF
>> n(colListSubset,outputPath)))
>> (ExploreData | 'writing to TSV files' >> beam.io.WriteToText('gs://arch
>> s4/output/dataExploration.txt',file_name_suffix='.tsv',num_
>> shards=1,append_trailing_newlines=True,header=colListStrHeader))
>>
>>
>> Running on Dataflow fires the below error. I don't have any idea where to
>> look for the issue. The error is not pointing to my pipeline code but to
>> apache beam modules.
>> I will try debugging using elimination. Please let me know if you have
>> any direction for me.
>>
>> Many thanks,
>> Eila
>>
>>
>> ======================================================
>>
>> DataflowRuntimeExceptionTraceback (most recent call 
>> last)<ipython-input-151-1e5aeb8b7d9b> in <module>()----> 1 
>> p.run().wait_until_finish()
>> /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.pyc
>>  in wait_until_finish(self, duration)    776         raise 
>> DataflowRuntimeException(    777             'Dataflow pipeline failed. 
>> State: %s, Error:\n%s' %--> 778             (self.state, 
>> getattr(self._runner, 'last_error_msg', None)), self)    779     return 
>> self.state    780
>> DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
>> Traceback (most recent call last):
>>   File 
>> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", 
>> line 581, in do_work
>>     work_executor.execute()
>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", 
>> line 166, in execute
>>     op.start()
>>   File "dataflow_worker/operations.py", line 283, in 
>> dataflow_worker.operations.DoOperation.start 
>> (dataflow_worker/operations.c:10680)
>>     def start(self):
>>   File "dataflow_worker/operations.py", line 284, in 
>> dataflow_worker.operations.DoOperation.start 
>> (dataflow_worker/operations.c:10574)
>>     with self.scoped_start_state:
>>   File "dataflow_worker/operations.py", line 289, in 
>> dataflow_worker.operations.DoOperation.start 
>> (dataflow_worker/operations.c:9775)
>>     pickler.loads(self.spec.serialized_fn))
>>   File 
>> "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", 
>> line 225, in loads
>>     return dill.loads(s)
>>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in 
>> loads
>>     return load(file)
>>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in 
>> load
>>     obj = pik.load()
>>   File "/usr/lib/python2.7/pickle.py", line 858, in load
>>     dispatch[key](self)
>>   File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
>>     klass = self.find_class(module, name)
>>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in 
>> find_class
>>     return StockUnpickler.find_class(self, module, name)
>>   File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
>>     __import__(module)
>> ImportError: No module named indexes.base
>>
>> ======================================================
>>
>> --
>> Eila
>> www.orielresearch.org
>> https://www.meetu <https://www.meetup.com/Deep-Learning-In-Production/>
>> p.co <https://www.meetup.com/Deep-Learning-In-Production/>m/Deep-Le
>> arning-In-Production/
>> <https://www.meetup.com/Deep-Learning-In-Production/>
>>
>>
>>
>
>
> --
> Eila
> www.orielresearch.org
> https://www.meetu <https://www.meetup.com/Deep-Learning-In-Production/>
> p.co <https://www.meetup.com/Deep-Learning-In-Production/>m/Deep-
> Learning-In-Production/
> <https://www.meetup.com/Deep-Learning-In-Production/>
>
>
>

Reply via email to