Re: error with DirectRunner
Thank you so much for starting work on this!

On Thu, Jan 10, 2019 at 5:55 AM Robert Bradshaw wrote:
> https://github.com/apache/beam/pull/7456
Re: error with DirectRunner
https://github.com/apache/beam/pull/7456

On Thu, Jan 10, 2019 at 10:59 AM Robert Bradshaw wrote:
> Sorry this got lost. I filed
> https://issues.apache.org/jira/browse/BEAM-6404; hopefully it'll be an
> easy fix.
Re: error with DirectRunner
Sorry this got lost. I filed https://issues.apache.org/jira/browse/BEAM-6404; hopefully it'll be an easy fix.

On Wed, Jan 9, 2019 at 8:33 PM Allie Chen wrote:
> Greetings!
>
> May I ask whether there is any plan to work on this issue? Or if I just use
> `BundleBasedDirectRunner` instead of `DirectRunner`, will there be any
> performance issues/caveats I should worry about?
>
> Thanks!
> Allie
Re: error with DirectRunner
Greetings!

May I ask whether there is any plan to work on this issue? Or if I just use `BundleBasedDirectRunner` instead of `DirectRunner`, will there be any performance issues/caveats I should worry about?

Thanks!
Allie

On Tue, Oct 30, 2018 at 8:13 PM Udi Meiri wrote:
> +Robert Bradshaw I would be happy to debug and fix this, but I'd need more
> guidance on where to look.
Re: error with DirectRunner
+Robert Bradshaw I would be happy to debug and fix this, but I'd need more guidance on where to look.

On Tue, Oct 30, 2018 at 4:07 PM Udi Meiri wrote:
> Created https://issues.apache.org/jira/browse/BEAM-5927
Re: error with DirectRunner
Created https://issues.apache.org/jira/browse/BEAM-5927

On Tue, Oct 30, 2018 at 1:13 PM Lukasz Cwik wrote:
> Udi, do you know if we have a bug tracking this issue?
>
> If not, can you file one referencing this e-mail thread?
Re: error with DirectRunner
Udi, do you know if we have a bug tracking this issue?

If not, can you file one referencing this e-mail thread?

On Tue, Oct 30, 2018 at 6:33 AM Allie Chen wrote:
> Thanks Udi. I agree, since it works fine removing either the side input or
> the last flatten and combine operation.
Re: error with DirectRunner
Thanks Udi. I agree, since it works fine removing either the side input or the last flatten and combine operation.

On Mon, Oct 29, 2018 at 9:02 PM Udi Meiri wrote:
> This looks like a FnApiRunner bug.
> When I override use_fnapi_runner = False in direct_runner.py the pipeline
> works.
>
> It seems like either the side-input to _copy_number or the Flatten
> operation is the culprit.
Re: error with DirectRunner
This looks like a FnApiRunner bug.
When I override use_fnapi_runner = False in direct_runner.py the pipeline works.

It seems like either the side-input to _copy_number or the Flatten operation is the culprit.

On Mon, Oct 29, 2018 at 2:37 PM Allie Chen wrote:
> I have a project that started failing with DirectRunner, but works well
> using DataflowRunner (last working version is 2.4).
> [...]
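A possible way to try the same thing without editing direct_runner.py is to ask for the non-FnApi runner by name. The sketch below is only an illustration under assumptions: it assumes the installed Beam release accepts `BundleBasedDirectRunner` (the runner named elsewhere in this thread) as a `--runner` value, and the helper `with_bundle_based_runner` is a hypothetical name, not part of Beam. Whether this matches the `use_fnapi_runner = False` override exactly has not been verified here.

```python
import argparse


def with_bundle_based_runner(pipeline_args):
    """Return a copy of pipeline_args that selects BundleBasedDirectRunner.

    Hypothetical helper (not a Beam API). Any existing --runner flag is
    dropped so that the override is the only runner setting left.
    """
    kept = []
    skip_next = False
    for arg in pipeline_args:
        if skip_next:
            # This is the value that followed a bare '--runner' flag.
            skip_next = False
            continue
        if arg == '--runner':
            skip_next = True
            continue
        if arg.startswith('--runner='):
            continue
        kept.append(arg)
    return kept + ['--runner=BundleBasedDirectRunner']


def run(argv=None):
    # Imported lazily so the helper above can be exercised without Beam.
    import apache_beam as beam
    from apache_beam.options import pipeline_options

    parser = argparse.ArgumentParser()
    _, pipeline_args = parser.parse_known_args(argv)
    options = pipeline_options.PipelineOptions(
        with_bundle_based_runner(pipeline_args))
    with beam.Pipeline(options=options) as p:
        # Placeholder body; substitute the pipeline that fails on DirectRunner.
        _ = (p
             | 'ReadNumbers' >> beam.Create([1, 2])
             | 'Sum' >> beam.CombineGlobally(sum))
```

If the pipeline passes with this runner and fails with plain `DirectRunner`, that would be consistent with the FnApiRunner diagnosis above.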
error with DirectRunner
Hi,

I have a project that started failing with DirectRunner, but works well using DataflowRunner (last working version is 2.4). The error message I received is:

    line 1088, in run_stage
      pipeline_components.pcollections[actual_pcoll_id].coder_id]]
    KeyError: u'ref_Coder_WindowedValueCoder_1'

I have simplified the pipeline to the following example. Can someone please take a look? Many thanks!

Allie

    import apache_beam as beam
    import argparse
    from apache_beam import transforms
    from apache_beam import pvalue
    from apache_beam.options import pipeline_options


    def _copy_number(number, side=None):
        yield number


    def fn_sum(values):
        return sum(values)


    def run(argv=None):
        parser = argparse.ArgumentParser()
        _, pipeline_args = parser.parse_known_args(argv)
        options = pipeline_options.PipelineOptions(pipeline_args)
        numbers = [1, 2]
        with beam.Pipeline(options=options) as p:
            sum_1 = (p
                     | 'ReadNumber1' >> transforms.Create(numbers)
                     | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))

            sum_2 = (p
                     | 'ReadNumber2' >> transforms.Create(numbers)
                     | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
                     | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))

            _ = ((sum_1, sum_2)
                 | beam.Flatten()
                 | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
                 | beam.io.WriteToText('gs://BUCKET/sum'))
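For readers unfamiliar with the shape of this error, here is a plain-Python sketch of the lookup pattern visible in the traceback. It is not Beam's code: the dictionaries, the `coders` mapping, the PCollection ids, and the id `ref_Coder_VarIntCoder_1` are hypothetical stand-ins. Only `pcollections`, `coder_id`, and the missing key `ref_Coder_WindowedValueCoder_1` come from the message above, and the traceback is too truncated to show which mapping is actually being indexed.

```python
# Hypothetical stand-in for the pipeline's PCollection table: each entry
# records the id of the coder used for that PCollection.
pcollections = {
    'pcoll_sum_1': {'coder_id': 'ref_Coder_VarIntCoder_1'},
    'pcoll_flatten': {'coder_id': 'ref_Coder_WindowedValueCoder_1'},
}

# Hypothetical stand-in for the mapping that is indexed by coder id.
# 'ref_Coder_WindowedValueCoder_1' is deliberately absent.
coders = {
    'ref_Coder_VarIntCoder_1': 'VarIntCoder',
}


def coder_for(pcoll_id):
    """Nested lookup in the same style as the traceback line."""
    return coders[pcollections[pcoll_id]['coder_id']]


def describe_lookup(pcoll_id):
    """Report whether the coder id referenced by a PCollection is known."""
    coder_id = pcollections[pcoll_id]['coder_id']
    if coder_id in coders:
        return 'ok: %s' % coders[coder_id]
    return 'missing coder id: %s' % coder_id
```

Calling `coder_for('pcoll_flatten')` raises `KeyError` with the missing coder id as its argument, which is the same symptom as in the report: a PCollection refers to a coder id that the mapping being indexed does not contain.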