Hello,
I am running the following pipeline on the local runner with no issues.
logging.info('Define the pipeline')
p = beam.Pipeline(options=options)
samplePath = outputPath
ExploreData = (p
    | "Extract the rows from dataframe" >> beam.io.Read(
          beam.io.BigQuerySource('archs4.Debug_annotation'))
    | "create more columns" >> beam.ParDo(
          CreateColForSampleFn(colListSubset, outputPath)))
(ExploreData | 'writing to TSV files' >> beam.io.WriteToText(
    'gs://archs4/output/dataExploration.txt',
    file_name_suffix='.tsv',
    num_shards=1,
    append_trailing_newlines=True,
    header=colListStrHeader))
Running the same pipeline on Dataflow raises the error below. I don't know
where to look for the issue: the traceback does not point to my pipeline
code but to Apache Beam modules.
I will try debugging by elimination. Please let me know if you have any
direction for me.
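As a first elimination step, I am thinking of round-tripping the DoFn
through Beam's own dill-based pickler, since that is where the worker
fails (pickler.loads in the traceback below). This is only a sketch: if it
fails locally, the DoFn or its constructor arguments are the problem; if
it only fails on the worker, that would point to a package-version
mismatch between my notebook environment and the workers.

# Sketch of a local elimination check: serialize and deserialize the DoFn
# with the same pickler the Dataflow worker uses. colListSubset and
# outputPath are the same values used in the pipeline above.
from apache_beam.internal import pickler

fn = CreateColForSampleFn(colListSubset, outputPath)
serialized_fn = pickler.dumps(fn)
pickler.loads(serialized_fn)  # the worker fails at this call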
Many thanks,
Eila
======================================================
DataflowRuntimeExceptionTraceback (most recent call last)
<ipython-input-151-1e5aeb8b7d9b> in <module>()
----> 1 p.run().wait_until_finish()

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.pyc in wait_until_finish(self, duration)
    776       raise DataflowRuntimeException(
    777           'Dataflow pipeline failed. State: %s, Error:\n%s' %
--> 778           (self.state, getattr(self._runner, 'last_error_msg', None)), self)
    779     return self.state
    780

DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
    op.start()
  File "dataflow_worker/operations.py", line 283, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10680)
    def start(self):
  File "dataflow_worker/operations.py", line 284, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10574)
    with self.scoped_start_state:
  File "dataflow_worker/operations.py", line 289, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:9775)
    pickler.loads(self.spec.serialized_fn))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
    return load(file)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named indexes.base
======================================================
--
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/