Hi Eila,

It seems like the package that provides indexes.base is not installed on the workers. Could you try one of the methods in "Managing Python Pipeline Dependencies" [1] to stage that dependency?
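For example, one of the methods described there is to list the extra packages in a requirements.txt and pass it with --requirements_file, so the workers pip-install the same versions at startup. A minimal sketch only; the project, bucket, and file names are placeholders, and I am guessing the missing module comes from pandas (indexes.base is a pandas submodule in some versions), so please pin whatever your DoFns actually import:

    # requirements.txt would pin the versions installed where the pipeline is built,
    # e.g. a single line like: pandas==0.19.2   (placeholder -- verify the real package/version)
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        '--runner=DataflowRunner',
        '--project=your-project-id',              # placeholder
        '--temp_location=gs://your-bucket/tmp',   # placeholder
        '--requirements_file=requirements.txt',   # staged and pip-installed on each worker
        '--save_main_session',                    # re-create the main module's imports on the workers
        # alternatives from [1]: '--setup_file=./setup.py' or '--extra_package=dist/mypkg.tar.gz'
    ])
    p = beam.Pipeline(options=options)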
Ahmet

[1] https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/

On Thu, Jun 21, 2018 at 9:40 AM, OrielResearch Eila Arich-Landkof <[email protected]> wrote:
> Hello all,
>
> Exploring this issue (the local runner works great and Dataflow fails), there might be a mismatch between the apache_beam version and the Dataflow version.
>
> Please let me know what your thoughts are. If it is a version issue, what updates should be executed, and how do I cover the installation on both the Datalab VM and Google Cloud Platform?
>
> Should I run the following command (or a different one)? And should it be run on the shell or in Datalab?
>
> I tried running this in Datalab and it didn't solve the issue (*see below the full logs report*):
>
> pip install --upgrade apache_beam google-cloud-dataflow
>
> Please advise.
>
> Thanks,
> Eila
>
> *All logs:*
>
> INFO:root:Staging the SDK tarball from PyPI to gs://archs4/staging/label-archs4-tsv.1529598693.453095/dataflow_python_sdk.tar
> INFO:root:Executing command: ['/usr/local/envs/py2env/bin/python', '-m', 'pip', 'install', '--download', '/tmp/tmp5MM5wr', 'google-cloud-dataflow==2.0.0', '--no-binary', ':all:', '--no-deps']
> INFO:root:file copy from /tmp/tmp5MM5wr/google-cloud-dataflow-2.0.0.tar.gz to gs://archs4/staging/label-archs4-tsv.1529598693.453095/dataflow_python_sdk.tar.
> INFO:oauth2client.client:Attempting refresh to obtain initial access_token
> INFO:oauth2client.client:Attempting refresh to obtain initial access_token
> INFO:root:Create job: <Job
>  createTime: u'2018-06-21T16:31:51.304121Z'
>  currentStateTime: u'1970-01-01T00:00:00Z'
>  id: u'2018-06-21_09_31_50-17545183031487377678'
>  location: u'us-central1'
>  name: u'label-archs4-tsv'
>  projectId: u'orielresearch-188115'
>  stageStates: []
>  steps: []
>  tempFiles: []
>  type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>
> INFO:root:Created job with id: [2018-06-21_09_31_50-17545183031487377678]
> INFO:root:To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/orielresearch-188115/dataflow/job/2018-06-21_09_31_50-17545183031487377678
> INFO:root:Job 2018-06-21_09_31_50-17545183031487377678 is in state JOB_STATE_PENDING
> INFO:root:2018-06-21T16:31:50.476Z: JOB_MESSAGE_DETAILED: Autoscaling is enabled for job 2018-06-21_09_31_50-17545183031487377678. The number of workers will be between 1 and 1000.
> INFO:root:2018-06-21T16:31:50.506Z: JOB_MESSAGE_DETAILED: Autoscaling was automatically enabled for job 2018-06-21_09_31_50-17545183031487377678.
> INFO:root:2018-06-21T16:31:53.079Z: JOB_MESSAGE_DETAILED: Checking required Cloud APIs are enabled.
> INFO:root:2018-06-21T16:31:53.385Z: JOB_MESSAGE_DETAILED: Checking permissions granted to controller Service Account.
> INFO:root:2018-06-21T16:31:54.161Z: JOB_MESSAGE_BASIC: Worker configuration: n1-standard-1 in us-central1-b.
> INFO:root:2018-06-21T16:31:54.910Z: JOB_MESSAGE_DETAILED: Expanding CoGroupByKey operations into optimizable parts.
> INFO:root:2018-06-21T16:31:54.936Z: JOB_MESSAGE_DEBUG: Combiner lifting skipped for step writing to TSV files/Write/WriteImpl/GroupByKey: GroupByKey not followed by a combiner.
> INFO:root:2018-06-21T16:31:54.968Z: JOB_MESSAGE_DETAILED: Expanding GroupByKey operations into optimizable parts.
> INFO:root:2018-06-21T16:31:54.992Z: JOB_MESSAGE_DETAILED: Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
> INFO:root:2018-06-21T16:31:55.056Z: JOB_MESSAGE_DEBUG: Annotating graph with Autotuner information.
> INFO:root:2018-06-21T16:31:55.168Z: JOB_MESSAGE_DETAILED: Fusing adjacent ParDo, Read, Write, and Flatten operations
> INFO:root:2018-06-21T16:31:55.195Z: JOB_MESSAGE_DETAILED: Fusing consumer create more columns into Extract the rows from dataframe
> INFO:root:2018-06-21T16:31:55.221Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/GroupByKey/Reify into writing to TSV files/Write/WriteImpl/WindowInto(WindowIntoFn)
> INFO:root:2018-06-21T16:31:55.244Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/GroupByKey/Write into writing to TSV files/Write/WriteImpl/GroupByKey/Reify
> INFO:root:2018-06-21T16:31:55.271Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/WriteBundles/Do into writing to TSV files/Write/WriteImpl/GroupByKey/GroupByWindow
> INFO:root:2018-06-21T16:31:55.303Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/Map(<lambda at iobase.py:895>) into create more columns
> INFO:root:2018-06-21T16:31:55.328Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/WindowInto(WindowIntoFn) into writing to TSV files/Write/WriteImpl/Map(<lambda at iobase.py:895>)
> INFO:root:2018-06-21T16:31:55.341Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/GroupByKey/GroupByWindow into writing to TSV files/Write/WriteImpl/GroupByKey/Read
> INFO:root:2018-06-21T16:31:55.365Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/InitializeWrite into writing to TSV files/Write/WriteImpl/DoOnce/Read
> INFO:root:2018-06-21T16:31:55.396Z: JOB_MESSAGE_DEBUG: Workflow config is missing a default resource spec.
> INFO:root:2018-06-21T16:31:55.432Z: JOB_MESSAGE_DEBUG: Adding StepResource setup and teardown to workflow graph.
> INFO:root:2018-06-21T16:31:55.461Z: JOB_MESSAGE_DEBUG: Adding workflow start and stop steps.
> INFO:root:2018-06-21T16:31:55.486Z: JOB_MESSAGE_DEBUG: Assigning stage ids.
> INFO:root:2018-06-21T16:31:55.641Z: JOB_MESSAGE_DEBUG: Executing wait step start15
> INFO:root:Job 2018-06-21_09_31_50-17545183031487377678 is in state JOB_STATE_RUNNING
> INFO:root:2018-06-21T16:31:55.701Z: JOB_MESSAGE_BASIC: Executing operation writing to TSV files/Write/WriteImpl/DoOnce/Read+writing to TSV files/Write/WriteImpl/InitializeWrite
> INFO:root:2018-06-21T16:31:55.727Z: JOB_MESSAGE_BASIC: Executing operation writing to TSV files/Write/WriteImpl/GroupByKey/Create
> INFO:root:2018-06-21T16:31:55.739Z: JOB_MESSAGE_DEBUG: Starting worker pool setup.
> INFO:root:2018-06-21T16:31:55.753Z: JOB_MESSAGE_BASIC: Starting 1 workers in us-central1-b...
> INFO:root:2018-06-21T16:31:55.839Z: JOB_MESSAGE_DEBUG: Value "writing to TSV files/Write/WriteImpl/GroupByKey/Session" materialized.
> INFO:root:2018-06-21T16:31:55.901Z: JOB_MESSAGE_BASIC: Executing operation Extract the rows from dataframe+create more columns+writing to TSV files/Write/WriteImpl/Map(<lambda at iobase.py:895>)+writing to TSV files/Write/WriteImpl/WindowInto(WindowIntoFn)+writing to TSV files/Write/WriteImpl/GroupByKey/Reify+writing to TSV files/Write/WriteImpl/GroupByKey/Write
> INFO:root:2018-06-21T16:31:56.332Z: JOB_MESSAGE_BASIC: BigQuery export job "dataflow_job_576766793008965363" started. You can check its status with the bq tool: "bq show -j --project_id=orielresearch-188115 dataflow_job_576766793008965363".
> INFO:root:2018-06-21T16:32:03.683Z: JOB_MESSAGE_DETAILED: Autoscaling: Raised the number of workers to 0 based on the rate of progress in the currently running step(s).
> INFO:root:2018-06-21T16:32:14.181Z: JOB_MESSAGE_DETAILED: Autoscaling: Raised the number of workers to 1 based on the rate of progress in the currently running step(s).
> INFO:root:2018-06-21T16:32:26.827Z: JOB_MESSAGE_DETAILED: BigQuery export job progress: "dataflow_job_576766793008965363" observed total of 1 exported files thus far.
> INFO:root:2018-06-21T16:32:26.850Z: JOB_MESSAGE_BASIC: BigQuery export job finished: "dataflow_job_576766793008965363"
> INFO:root:2018-06-21T16:32:33.078Z: JOB_MESSAGE_DETAILED: Workers have started successfully.
> INFO:root:2018-06-21T16:35:35.511Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
>     work_executor.execute()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
>     op.start()
>   File "dataflow_worker/operations.py", line 283, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10680)
>     def start(self):
>   File "dataflow_worker/operations.py", line 284, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10574)
>     with self.scoped_start_state:
>   File "dataflow_worker/operations.py", line 289, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:9775)
>     pickler.loads(self.spec.serialized_fn))
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads
>     return dill.loads(s)
>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
>     return load(file)
>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
>     obj = pik.load()
>   File "/usr/lib/python2.7/pickle.py", line 858, in load
>     dispatch[key](self)
>   File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
>     klass = self.find_class(module, name)
>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
>     return StockUnpickler.find_class(self, module, name)
>   File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
>     __import__(module)
> ImportError: No module named indexes.base
>
> [The identical JOB_MESSAGE_ERROR traceback, ending in "ImportError: No module named indexes.base", is repeated for the retried work item at 16:35:38.897Z, 16:35:42.245Z, and 16:35:45.619Z.]
>
> INFO:root:2018-06-21T16:35:45.668Z: JOB_MESSAGE_DEBUG: Executing failure step failure14
> INFO:root:2018-06-21T16:35:45.695Z: JOB_MESSAGE_ERROR: Workflow failed. Causes: S04:Extract the rows from dataframe+create more columns+writing to TSV files/Write/WriteImpl/Map(<lambda at iobase.py:895>)+writing to TSV files/Write/WriteImpl/WindowInto(WindowIntoFn)+writing to TSV files/Write/WriteImpl/GroupByKey/Reify+writing to TSV files/Write/WriteImpl/GroupByKey/Write failed., A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service. The work item was attempted on: label-archs4-tsv-06210931-a4r1-harness-rlqz, label-archs4-tsv-06210931-a4r1-harness-rlqz, label-archs4-tsv-06210931-a4r1-harness-rlqz, label-archs4-tsv-06210931-a4r1-harness-rlqz
> INFO:root:2018-06-21T16:35:45.799Z: JOB_MESSAGE_DETAILED: Cleaning up.
> INFO:root:2018-06-21T16:35:46Z: JOB_MESSAGE_DEBUG: Starting worker pool teardown.
> INFO:root:2018-06-21T16:35:46.027Z: JOB_MESSAGE_BASIC: Stopping worker pool...
>
> On Wed, Jun 20, 2018 at 5:02 PM, OrielResearch Eila Arich-Landkof <[email protected]> wrote:
>> Hello,
>>
>> I am running the following pipeline on the local runner with no issues.
>>
>> logging.info('Define the pipeline')
>> p = beam.Pipeline(options=options)
>> samplePath = outputPath
>> ExploreData = (p | "Extract the rows from dataframe" >> beam.io.Read(beam.io.BigQuerySource('archs4.Debug_annotation'))
>>                  | "create more columns" >> beam.ParDo(CreateColForSampleFn(colListSubset, outputPath)))
>> (ExploreData | 'writing to TSV files' >> beam.io.WriteToText('gs://archs4/output/dataExploration.txt', file_name_suffix='.tsv', num_shards=1, append_trailing_newlines=True, header=colListStrHeader))
>>
>> Running on Dataflow throws the error below. I don't have any idea where to look for the issue: the error does not point to my pipeline code but to Apache Beam modules.
>> I will try debugging by elimination. Please let me know if you have any direction for me.
>>
>> Many thanks,
>> Eila
>>
>> ======================================================
>>
>> DataflowRuntimeExceptionTraceback (most recent call last)
>> <ipython-input-151-1e5aeb8b7d9b> in <module>()
>> ----> 1 p.run().wait_until_finish()
>>
>> /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.pyc in wait_until_finish(self, duration)
>>     776       raise DataflowRuntimeException(
>>     777           'Dataflow pipeline failed. State: %s, Error:\n%s' %
>> --> 778           (self.state, getattr(self._runner, 'last_error_msg', None)), self)
>>     779     return self.state
>>     780
>>
>> DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
>> Traceback (most recent call last):
>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
>>     work_executor.execute()
>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
>>     op.start()
>>   File "dataflow_worker/operations.py", line 283, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10680)
>>     def start(self):
>>   File "dataflow_worker/operations.py", line 284, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10574)
>>     with self.scoped_start_state:
>>   File "dataflow_worker/operations.py", line 289, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:9775)
>>     pickler.loads(self.spec.serialized_fn))
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads
>>     return dill.loads(s)
>>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
>>     return load(file)
>>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
>>     obj = pik.load()
>>   File "/usr/lib/python2.7/pickle.py", line 858, in load
>>     dispatch[key](self)
>>   File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
>>     klass = self.find_class(module, name)
>>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
>>     return StockUnpickler.find_class(self, module, name)
>>   File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
>>     __import__(module)
>> ImportError: No module named indexes.base
>>
>> ======================================================
>>
>> --
>> Eila
>> www.orielresearch.org
>> https://www.meetup.com/Deep-Learning-In-Production/
>
> --
> Eila
> www.orielresearch.org
> https://www.meetup.com/Deep-Learning-In-Production/
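For the Datalab notebook flow quoted above, the same staging from [1] can also be set programmatically on the options object the pipeline already uses. A minimal sketch only, assuming `options` is the PipelineOptions instance passed to beam.Pipeline and that 'requirements.txt' (a placeholder path) pins the versions installed on the Datalab VM:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

    options = PipelineOptions()  # or reuse the options object already built in the notebook
    setup = options.view_as(SetupOptions)
    # Stage a requirements file so the Dataflow workers pip-install the same
    # package versions as the environment that pickles the DoFns
    # (e.g. produced with `pip freeze > requirements.txt` and trimmed to what the DoFns import).
    setup.requirements_file = 'requirements.txt'
    # Recreate the notebook's top-level imports on the workers.
    setup.save_main_session = True
    p = beam.Pipeline(options=options)

Whichever method is used, the point is that the workers end up with the same library versions as the environment that built and pickled the pipeline functions.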
