[ 
https://issues.apache.org/jira/browse/BEAM-5927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Bradshaw resolved BEAM-5927.
-----------------------------------
       Resolution: Duplicate
    Fix Version/s: Not applicable

> FnApiRunner KeyError
> --------------------
>
>                 Key: BEAM-5927
>                 URL: https://issues.apache.org/jira/browse/BEAM-5927
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Udi Meiri
>            Assignee: Robert Bradshaw
>            Priority: Major
>             Fix For: Not applicable
>
>
> Code to recreate (modified slightly from 
> https://lists.apache.org/thread.html/e910bfe702a3c8c5b0902f5f1c2c51fb7b2574f1b4abc4d9efab4e0f@%3Cdev.beam.apache.org%3E):
> import apache_beam as beam
> import argparse
> from apache_beam import transforms
> from apache_beam import pvalue
> from apache_beam.options import pipeline_options
> import logging
> def _copy_number(number, side=None):
>   print '_copy_number:', number, side
>   yield number
> def fn_sum(values):
>   #print 'values', values
>   return sum(values)
> def run(argv=None):
>   parser = argparse.ArgumentParser()
>   _, pipeline_args = parser.parse_known_args(argv)
>   options = pipeline_options.PipelineOptions(pipeline_args)
>   #options.view_as(pipeline_options.StandardOptions).streaming = True
>   numbers = [1, 2]
>   with beam.Pipeline(options=options) as p:
>     sum_1 = (p
>              | 'ReadNumber1' >> transforms.Create(numbers)
>              | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))
>     sum_2 = (p
>              | 'ReadNumber2' >> transforms.Create(numbers)
>              | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
>              | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))
>     _ = ((sum_1, sum_2)
>          | beam.Flatten()
>          | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
>          | beam.io.WriteToText('gs://foo/sum'))
> logging.getLogger().setLevel(logging.INFO)
> run()
> Console:
> $ python test.py
> INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
> INFO:root:==================== <function annotate_downstream_side_inputs at 0x7f2b7088e758> ====================
> INFO:root:==================== <function fix_side_input_pcoll_coders at 0x7f2b7088e7d0> ====================
> INFO:root:==================== <function lift_combiners at 0x7f2b7088e5f0> ====================
> INFO:root:==================== <function expand_gbk at 0x7f2b7088e668> ====================
> INFO:root:==================== <function sink_flattens at 0x7f2b7088e6e0> ====================
> INFO:root:==================== <function greedily_fuse at 0x7f2b7088e848> ====================
> INFO:root:==================== <function impulse_to_input at 0x7f2b7088e578> ====================
> INFO:root:==================== <function inject_timer_pcollections at 0x7f2b7088e8c0> ====================
> INFO:root:==================== <function sort_stages at 0x7f2b7088e938> ====================
> INFO:root:Running ((ref_AppliedPTransform_ReadNumber1/Read_3)+((ref_AppliedPTransform_CalculateSum1/KeyWithVoid_5)+(CalculateSum1/CombinePerKey/Precombine)))+(CalculateSum1/CombinePerKey/Group/Write)
> INFO:root:Running ((CalculateSum1/CombinePerKey/Group/Read)+(CalculateSum1/CombinePerKey/Merge))+((CalculateSum1/CombinePerKey/ExtractOutputs)+((ref_AppliedPTransform_CalculateSum1/UnKey_13)+(ref_PCollection_PCollection_7/Write)))
> INFO:root:Running ((ref_AppliedPTransform_CalculateSum1/DoOnce/Read_15)+(((ref_AppliedPTransform_CalculateSum1/InjectDefault_16)+(ref_PCollection_PCollection_9/Write))+(Flatten/Transcode/0)))+(Flatten/Write/0)
> INFO:root:Running ((ref_AppliedPTransform_ReadNumber2/Read_18)+((ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_19)+((ref_AppliedPTransform_CalculateSum2/KeyWithVoid_21)+(CalculateSum2/CombinePerKey/Precombine))))+(CalculateSum2/CombinePerKey/Group/Write)
> Traceback (most recent call last):
>   File "test.py", line 41, in <module>
>     run()
>   File "test.py", line 38, in run
>     | beam.io.WriteToText('gs://foo/sum'))
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", line 425, in __exit__
>     self.run().wait_until_finish()
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", line 405, in run
>     self._options).run(False)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", line 418, in run
>     return self.runner.run_pipeline(self)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", line 139, in run_pipeline
>     return runner.run_pipeline(pipeline)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 238, in run_pipeline
>     return self.run_via_runner_api(pipeline.to_runner_api())
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 241, in run_via_runner_api
>     return self.run_stages(*self.create_stages(pipeline_proto))
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 1020, in run_stages
>     pcoll_buffers, safe_coders)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 1096, in run_stage
>     pipeline_components.pcollections[actual_pcoll_id].coder_id]]
> KeyError: u'coder_4'



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to