BTW it doesn't seem to be related to the BQ sink. My job is now failing even
without that part (and it wasn't failing earlier today):
def test_error(bq_table: str) -> bool:
    # bq_table is unused in this version; the BQ sink from the earlier
    # repro has been removed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    class GenData(beam.DoFn):
        def process(self, _):
            for _ in range(20000):
                yield {'a': 1, 'b': 2}

    pipeline = beam.Pipeline(options=PipelineOptions(
        project='my-project',
        temp_location='gs://my-bucket/temp',
        staging_location='gs://my-bucket/staging',
        runner='DataflowRunner'
    ))
    # pipeline = beam.Pipeline()

    (
        pipeline
        | 'Empty start' >> beam.Create([''])
        | 'Generate Data' >> beam.ParDo(GenData())
        | 'print' >> beam.Map(print)
    )

    result = pipeline.run()
    result.wait_until_finish()

    return True

test_error(
    bq_table='my-project:my_dataset.my_table'
)
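
For reference, a quick way to record the client-side versions at submission
time; given the version-skew theory below, apache-beam and dill are the
interesting ones (this is just a diagnostic sketch, not a fix):

import apache_beam
import dill

# Compare these against what the Dataflow job page / worker logs report.
print('apache-beam:', apache_beam.__version__)
print('dill:', dill.__version__)
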
On Tue, Feb 4, 2020 at 11:21 AM Alan Krumholz <[email protected]>
wrote:
> Here is a test job that sometimes fails and sometimes doesn't (but most
> of the time it fails)...
> There seems to be something stochastic causing this, as after several
> tests a couple of them did succeed...
>
>
> def test_error(bq_table: str) -> bool:
>
>     import apache_beam as beam
>     from apache_beam.options.pipeline_options import PipelineOptions
>
>     class GenData(beam.DoFn):
>         def process(self, _):
>             for _ in range(20000):
>                 yield {'a': 1, 'b': 2}
>
>     def get_bigquery_schema():
>         from apache_beam.io.gcp.internal.clients import bigquery
>
>         table_schema = bigquery.TableSchema()
>         columns = [
>             ['a', 'integer', 'nullable'],
>             ['b', 'integer', 'nullable']
>         ]
>
>         for column in columns:
>             column_schema = bigquery.TableFieldSchema()
>             column_schema.name = column[0]
>             column_schema.type = column[1]
>             column_schema.mode = column[2]
>             table_schema.fields.append(column_schema)
>
>         return table_schema
>
>     pipeline = beam.Pipeline(options=PipelineOptions(
>         project='my-project',
>         temp_location='gs://my-bucket/temp',
>         staging_location='gs://my-bucket/staging',
>         runner='DataflowRunner'
>     ))
>     # pipeline = beam.Pipeline()
>
>     (
>         pipeline
>         | 'Empty start' >> beam.Create([''])
>         | 'Generate Data' >> beam.ParDo(GenData())
>         # | 'print' >> beam.Map(print)
>         | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
>             project=bq_table.split(':')[0],
>             dataset=bq_table.split(':')[1].split('.')[0],
>             table=bq_table.split(':')[1].split('.')[1],
>             schema=get_bigquery_schema(),
>             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
>             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
>     )
>
>     result = pipeline.run()
>     result.wait_until_finish()
>
>     return True
>
> test_error(
>     bq_table='my-project:my_dataset.my_table'
> )
>
> On Tue, Feb 4, 2020 at 10:04 AM Alan Krumholz <[email protected]>
> wrote:
>
>> I tried breaking my pipeline apart. It seems the step that breaks it is:
>> beam.io.WriteToBigQuery
>>
>> Let me see if I can create a self-contained example that breaks, so I can
>> share it with you.
>>
>> Thanks!
>>
>> On Tue, Feb 4, 2020 at 9:53 AM Pablo Estrada <[email protected]> wrote:
>>
>>> Hm, that's odd. No changes to the pipeline? Are you able to share some of
>>> the code?
>>>
>>> +Udi Meiri <[email protected]>, do you have any idea what could be going
>>> on here?
>>>
>>> On Tue, Feb 4, 2020 at 9:25 AM Alan Krumholz <[email protected]>
>>> wrote:
>>>
>>>> Hi Pablo,
>>>> This is strange... it doesn't seem to be the latest Beam release, as last
>>>> night the job was already using 2.19.0. I wonder if it was some release
>>>> from the Dataflow team (not Beam related):
>>>>
>>>> Job type: Batch
>>>> Job status: Succeeded
>>>> SDK version: Apache Beam Python 3.5 SDK 2.19.0
>>>> Region: us-central1
>>>> Start time: February 3, 2020 at 9:28:35 PM GMT-8
>>>> Elapsed time: 5 min 11 sec
>>>>
>>>> On Tue, Feb 4, 2020 at 9:15 AM Pablo Estrada <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Alan,
>>>>> could it be that you're picking up the new Apache Beam 2.19.0 release?
>>>>> Could you try depending on Beam 2.18.0 to see whether the issue is tied
>>>>> to the new release?
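>>>>>
>>>>> For example, that pin could look something like this in the
>>>>> requirements you submit the job with (illustrative; adjust to your
>>>>> dependency setup):
>>>>>
>>>>>     apache-beam[gcp]==2.18.0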
>>>>>
>>>>> If something was working and no longer works, it sounds like a bug.
>>>>> This may have to do with how we pickle (dill / cloudpickle); see this
>>>>> question:
>>>>> https://stackoverflow.com/questions/42960637/python-3-5-dill-pickling-unpickling-on-different-servers-keyerror-classtype
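>>>>>
>>>>> To make that concrete, a minimal sketch of the round trip involved
>>>>> (dill.dumps when the job graph is built, dill.loads on the worker;
>>>>> the DoFn here is just illustrative):
>>>>>
>>>>>     import dill
>>>>>
>>>>>     class GenData(object):
>>>>>         def process(self, _):
>>>>>             yield {'a': 1, 'b': 2}
>>>>>
>>>>>     # Client side: serialize the DoFn into the job payload.
>>>>>     payload = dill.dumps(GenData)
>>>>>
>>>>>     # Worker side: deserialize it during DoOperation.setup. If the
>>>>>     # worker's dill version differs from the client's, this step can
>>>>>     # fail with KeyError: 'ClassType' as in the traceback below.
>>>>>     dill.loads(payload)
>>>>>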
>>>>> Best
>>>>> -P.
>>>>>
>>>>> On Tue, Feb 4, 2020 at 6:22 AM Alan Krumholz <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I was running a Dataflow job in GCP last night and it was running
>>>>>> fine.
>>>>>> This morning the exact same job is failing with the following error:
>>>>>>
>>>>>> Error message from worker:
>>>>>>
>>>>>> Traceback (most recent call last):
>>>>>>   File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 286, in loads
>>>>>>     return dill.loads(s)
>>>>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 275, in loads
>>>>>>     return load(file, ignore, **kwds)
>>>>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 270, in load
>>>>>>     return Unpickler(file, ignore=ignore, **kwds).load()
>>>>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 472, in load
>>>>>>     obj = StockUnpickler.load(self)
>>>>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 577, in _load_type
>>>>>>     return _reverse_typemap[name]
>>>>>> KeyError: 'ClassType'
>>>>>>
>>>>>> During handling of the above exception, another exception occurred:
>>>>>>
>>>>>> Traceback (most recent call last):
>>>>>>   File "/usr/local/lib/python3.5/site-packages/dataflow_worker/batchworker.py", line 648, in do_work
>>>>>>     work_executor.execute()
>>>>>>   File "/usr/local/lib/python3.5/site-packages/dataflow_worker/executor.py", line 176, in execute
>>>>>>     op.start()
>>>>>>   File "apache_beam/runners/worker/operations.py", line 649, in apache_beam.runners.worker.operations.DoOperation.start
>>>>>>   File "apache_beam/runners/worker/operations.py", line 651, in apache_beam.runners.worker.operations.DoOperation.start
>>>>>>   File "apache_beam/runners/worker/operations.py", line 652, in apache_beam.runners.worker.operations.DoOperation.start
>>>>>>   File "apache_beam/runners/worker/operations.py", line 261, in apache_beam.runners.worker.operations.Operation.start
>>>>>>   File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.Operation.start
>>>>>>   File "apache_beam/runners/worker/operations.py", line 597, in apache_beam.runners.worker.operations.DoOperation.setup
>>>>>>   File "apache_beam/runners/worker/operations.py", line 602, in apache_beam.runners.worker.operations.DoOperation.setup
>>>>>>   File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 290, in loads
>>>>>>     return dill.loads(s)
>>>>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 275, in loads
>>>>>>     return load(file, ignore, **kwds)
>>>>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 270, in load
>>>>>>     return Unpickler(file, ignore=ignore, **kwds).load()
>>>>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 472, in load
>>>>>>     obj = StockUnpickler.load(self)
>>>>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 577, in _load_type
>>>>>>     return _reverse_typemap[name]
>>>>>> KeyError: 'ClassType'
>>>>>>
>>>>>>
>>>>>> If I use a local runner, it still runs fine.
>>>>>> Is anyone else experiencing something similar today? (Or does anyone
>>>>>> know how to fix this?)
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>