Re: error with DirectRunner

2019-01-10 Thread Allie Chen
Thank you so much for starting work on this!



Re: error with DirectRunner

2019-01-10 Thread Robert Bradshaw
https://github.com/apache/beam/pull/7456



Re: error with DirectRunner

2019-01-10 Thread Robert Bradshaw
Sorry this got lost. I filed
https://issues.apache.org/jira/browse/BEAM-6404; hopefully it'll be an
easy fix.



Re: error with DirectRunner

2019-01-09 Thread Allie Chen
Greetings!

May I ask whether there is any plan to work on this issue? Or if I just use
`BundleBasedDirectRunner` instead of `DirectRunner`, will there be any
performance issues/caveats I should worry about?

Thanks!
Allie
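
For readers who want to try the runner swap asked about above, here is a minimal sketch of selecting the runner through the `--runner` pipeline flag rather than editing Beam's source. The helper `with_default_runner` is hypothetical (it is not part of Beam), and it assumes that your Beam version accepts `BundleBasedDirectRunner` as a runner name. It says nothing about performance; that question is still open in this thread.

```python
import argparse


def with_default_runner(argv, runner='BundleBasedDirectRunner'):
  """Return argv with --runner appended unless the caller already set one.

  Hypothetical helper for illustration only; not a Beam API.
  """
  parser = argparse.ArgumentParser(add_help=False)
  parser.add_argument('--runner', default=None)
  known, _ = parser.parse_known_args(argv)
  if known.runner is not None:
    # Respect an explicit choice such as --runner=DataflowRunner.
    return list(argv)
  return list(argv) + ['--runner=%s' % runner]


# The result would be handed to PipelineOptions as in the example pipeline:
#   options = pipeline_options.PipelineOptions(with_default_runner(pipeline_args))
print(with_default_runner(['--temp_location=/tmp/beam']))
print(with_default_runner(['--runner', 'DataflowRunner']))
```

Passing the flag keeps the workaround out of the pipeline code, so it can be dropped once the fix lands.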



Re: error with DirectRunner

2018-10-30 Thread Udi Meiri
+Robert Bradshaw I would be happy to debug and fix this, but I'd need
more guidance on where to look.





Re: error with DirectRunner

2018-10-30 Thread Udi Meiri
Created https://issues.apache.org/jira/browse/BEAM-5927





Re: error with DirectRunner

2018-10-30 Thread Lukasz Cwik
Udi, do you know if we have a bug tracking this issue?

If not, can you file one referencing this e-mail thread?



Re: error with DirectRunner

2018-10-30 Thread Allie Chen
Thanks Udi. I agree, since the pipeline works fine after removing either the
side input or the last Flatten and combine operation.



Re: error with DirectRunner

2018-10-29 Thread Udi Meiri
This looks like a FnApiRunner bug.
When I override use_fnapi_runner = False in direct_runner.py, the pipeline
works.

It seems like either the side-input to _copy_number or the Flatten
operation is the culprit.





error with DirectRunner

2018-10-29 Thread Allie Chen
Hi,

I have a project that started failing with DirectRunner, but works well
using DataflowRunner (last working version is 2.4). The error message I
received is:
line 1088, in run_stage
  pipeline_components.pcollections[actual_pcoll_id].coder_id]]
KeyError: u'ref_Coder_WindowedValueCoder_1'

I have simplified the pipeline to the following example. Can someone please
take a look? Many thanks!

Allie


import apache_beam as beam
import argparse
from apache_beam import transforms
from apache_beam import pvalue
from apache_beam.options import pipeline_options


def _copy_number(number, side=None):
  yield number


def fn_sum(values):
  return sum(values)


def run(argv=None):
  parser = argparse.ArgumentParser()
  _, pipeline_args = parser.parse_known_args(argv)
  options = pipeline_options.PipelineOptions(pipeline_args)
  numbers = [1, 2]
  with beam.Pipeline(options=options) as p:
    sum_1 = (p
             | 'ReadNumber1' >> transforms.Create(numbers)
             | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))

    sum_2 = (p
             | 'ReadNumber2' >> transforms.Create(numbers)
             | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
             | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))

    _ = ((sum_1, sum_2)
         | beam.Flatten()
         | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
         | beam.io.WriteToText('gs://BUCKET/sum'))
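
As a reference point when comparing runners, the value this pipeline should produce can be worked out without Beam. The sketch below mirrors the three combine steps in plain Python. It assumes the whole input is combined in a single call, which is safe here only because `sum` gives the same result however the runner groups partial results. It does not reproduce the `KeyError`; it only shows what a working runner is expected to write.

```python
def _copy_number(number, side=None):
  yield number


def fn_sum(values):
  return sum(values)


numbers = [1, 2]

# CalculateSum1: global sum of the first Create.
sum_1 = fn_sum(numbers)

# ParDo with sum_1 as a singleton side input; the side value is ignored.
copied = [out for n in numbers for out in _copy_number(n, sum_1)]

# CalculateSum2: global sum of the copied elements.
sum_2 = fn_sum(copied)

# Flatten of the two single-element results, then CalculateSum3.
sum_3 = fn_sum([sum_1, sum_2])

print(sum_1, sum_2, sum_3)  # prints: 3 3 6
```

A run that succeeds on any runner should therefore write the single value 6 to the output files.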