Hi Pawel, could you tell us which version of the Beam Python SDK you are using?
For the record, this looks like a known issue: https://issues.apache.org/jira/browse/BEAM-6860

Kyle Weaver | Software Engineer | github.com/ibzib

On Wed, Sep 11, 2019 at 6:33 AM Paweł Kordek wrote:
> Hi,
>
> I was developing a simple pipeline where I aggregate records by key and
> sum values for a predefined window. I was getting some errors, and after
> checking, I am getting exactly the same errors when running the Wikipedia
> example from the Beam repo. The output is as follows:
> -------------------------------------------
> INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
> INFO:root:==================== <function annotate_downstream_side_inputs at 0x7f333fc1fe60> ====================
> INFO:root:==================== <function fix_side_input_pcoll_coders at 0x7f333fc1ff80> ====================
> INFO:root:==================== <function lift_combiners at 0x7f333fc1d050> ====================
> INFO:root:==================== <function expand_sdf at 0x7f333fc1d0e0> ====================
> INFO:root:==================== <function expand_gbk at 0x7f333fc1d170> ====================
> INFO:root:==================== <function sink_flattens at 0x7f333fc1d290> ====================
> INFO:root:==================== <function greedily_fuse at 0x7f333fc1d320> ====================
> INFO:root:==================== <function read_to_impulse at 0x7f333fc1d3b0> ====================
> INFO:root:==================== <function impulse_to_input at 0x7f333fc1d440> ====================
> INFO:root:==================== <function inject_timer_pcollections at 0x7f333fc1d5f0> ====================
> INFO:root:==================== <function sort_stages at 0x7f333fc1d680> ====================
> INFO:root:==================== <function window_pcollection_coders at 0x7f333fc1d710> ====================
> INFO:root:Running ((((((ref_AppliedPTransform_ReadFromText/Read_3)+(ref_AppliedPTransform_ComputeTopSessions/ExtractUserAndTimestamp_5))+(ref_AppliedPTransform_ComputeTopSessions/Filter(<lambda at top_wikipedia_sessions.py:127>)_6))+(ref_AppliedPTransform_ComputeTopSessions/ComputeSessions/ComputeSessionsWindow_8))+(ref_AppliedPTransform_ComputeTopSessions/ComputeSessions/PerElement/PerElement:PairWithVoid_10))+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Precombine))+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Group/Write)
> INFO:root:Running (((((((ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Group/Read)+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Merge))+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/ExtractOutputs))+(ref_AppliedPTransform_ComputeTopSessions/SessionsToStrings_18))+(ref_AppliedPTransform_ComputeTopSessions/TopPerMonth/TopPerMonthWindow_20))+(ref_AppliedPTransform_ComputeTopSessions/TopPerMonth/Top/KeyWithVoid_22))+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Precombine))+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Group/Write)
> INFO:root:Running (((ref_AppliedPTransform_WriteToText/Write/WriteImpl/DoOnce/Read_36)+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/InitializeWrite_37))+(ref_PCollection_PCollection_19/Write))+(ref_PCollection_PCollection_20/Write)
> INFO:root:Running ((((((((ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Group/Read)+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Merge))+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/ExtractOutputs))+(ref_AppliedPTransform_ComputeTopSessions/TopPerMonth/Top/UnKey_30))+(ref_AppliedPTransform_ComputeTopSessions/FormatOutput_31))+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/WriteBundles_38))+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/Pair_39))+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/WindowInto(WindowIntoFn)_40))+(WriteToText/Write/WriteImpl/GroupByKey/Write)
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 829, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
>   File "apache_beam/runners/common.py", line 403, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
>   File "apache_beam/runners/common.py", line 406, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
>   File "apache_beam/runners/common.py", line 982, in apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
>   File "apache_beam/runners/worker/operations.py", line 142, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 122, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
>   File "apache_beam/runners/worker/opcounters.py", line 196, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
>   File "apache_beam/runners/worker/opcounters.py", line 214, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
>   File "apache_beam/coders/coder_impl.py", line 1014, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1030, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 814, in apache_beam.coders.coder_impl.SequenceCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 828, in apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 145, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 494, in apache_beam.coders.coder_impl.IntervalWindowCoderImpl.estimate_size
> TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>   File "top_wikipedia_sessions.py", line 171, in <module>
>     run()
>   File "top_wikipedia_sessions.py", line 166, in run
>     | WriteToText(known_args.output))
>   File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 426, in __exit__
>     self.run().wait_until_finish()
>   File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 406, in run
>     self._options).run(False)
>   File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 419, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 129, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 366, in run_pipeline
>     default_environment=self._default_environment))
>   File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 373, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 455, in run_stages
>     stage_context.safe_coders)
>   File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 733, in _run_stage
>     result, splits = bundle_manager.process_bundle(data_input, data_output)
>   File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 1688, in process_bundle
>     part, expected_outputs), part_inputs):
>   File "/home/pawel/miniconda3/envs/beam/lib/python3.7/concurrent/futures/_base.py", line 598, in result_iterator
>     yield fs.pop().result()
>   File "/home/pawel/miniconda3/envs/beam/lib/python3.7/concurrent/futures/_base.py", line 435, in result
>     return self.__get_result()
>   File "/home/pawel/miniconda3/envs/beam/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
>     raise self._exception
>   File "/home/pawel/miniconda3/envs/beam/lib/python3.7/concurrent/futures/thread.py", line 57, in run
>     result = self.fn(*self.args, **self.kwargs)
>   File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 1688, in <lambda>
>     part, expected_outputs), part_inputs):
>   File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 1626, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 1080, in push
>     response = self.worker.do_instruction(request)
>   File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 343, in do_instruction
>     request.instruction_id)
>   File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 369, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 598, in process_bundle
>     op.finish()
>   File "apache_beam/runners/worker/operations.py", line 611, in apache_beam.runners.worker.operations.DoOperation.finish
>   File "apache_beam/runners/worker/operations.py", line 612, in apache_beam.runners.worker.operations.DoOperation.finish
>   File "apache_beam/runners/worker/operations.py", line 613, in apache_beam.runners.worker.operations.DoOperation.finish
>   File "apache_beam/runners/common.py", line 847, in apache_beam.runners.common.DoFnRunner.finish
>   File "apache_beam/runners/common.py", line 831, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
>   File "apache_beam/runners/common.py", line 872, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam/runners/common.py", line 829, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
>   File "apache_beam/runners/common.py", line 403, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
>   File "apache_beam/runners/common.py", line 406, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
>   File "apache_beam/runners/common.py", line 982, in apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
>   File "apache_beam/runners/worker/operations.py", line 142, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 122, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
>   File "apache_beam/runners/worker/opcounters.py", line 196, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
>   File "apache_beam/runners/worker/opcounters.py", line 214, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
>   File "apache_beam/coders/coder_impl.py", line 1014, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1030, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 814, in apache_beam.coders.coder_impl.SequenceCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 828, in apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 145, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 494, in apache_beam.coders.coder_impl.IntervalWindowCoderImpl.estimate_size
> TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase [while running 'WriteToText/Write/WriteImpl/WriteBundles']
> -------------------------------------------------------
>
> To run it, I downloaded a single JSON file with Wiki data and ran it as
> follows (from BEAM_REPO/sdks/python/apache_beam/examples/complete):
>
> python top_wikipedia_sessions.py --input /data/wiki/wiki_data-000000000492.json --output /tmp/beam/wiki
>
> This fails somewhere close to the end; in fact, I can find some results
> (possibly complete) in
> /tmp/beam/beam-temp-wiki-5971176ad49411e9b16448ba4ef75ccc/c18aa9d5-221f-4dc3-bc1a-83c101ee54ba.wiki
>
> I tried to find the exact cause in the code, but I don't understand Beam's
> codebase well enough, so I would appreciate some hints or explanations.
> There is a question on SO
> <https://stackoverflow.com/questions/54745869/how-to-create-a-dataflow-pipeline-from-pub-sub-to-gcs-in-python/54791913#54791913>,
> but that doesn't help much in explaining why this error is observed.
>
> Thanks!
> Pawel
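For anyone following the thread who is less familiar with windowing: the aggregation Pawel describes (summing values per key within a predefined window) can be illustrated without Beam at all. This is a hypothetical plain-Python sketch, not Beam's API, and it does not reproduce the bug. The record shape (key, value, timestamp in seconds) and the function names are made up for illustration. It only mimics fixed-window assignment, where each timestamp lands in a half-open interval [start, start + size) and start is the timestamp minus its remainder modulo the window size.

```python
from collections import defaultdict
from typing import DefaultDict, Dict, Iterable, Tuple

# Hypothetical record shape for illustration: (key, value, event time in seconds).
Record = Tuple[str, int, float]
# Output key: (record key, window start, window end), window = [start, end).
WindowKey = Tuple[str, float, float]


def fixed_window_start(timestamp: float, size: float, offset: float = 0.0) -> float:
    """Return the start of the fixed window [start, start + size) containing timestamp."""
    # Python's % returns a non-negative result for a positive size,
    # so negative timestamps are also assigned to the correct window.
    return timestamp - (timestamp - offset) % size


def sum_per_key_per_window(records: Iterable[Record], size: float) -> Dict[WindowKey, int]:
    """Sum values per (key, window): fixed windowing followed by a per-key sum."""
    totals: DefaultDict[WindowKey, int] = defaultdict(int)
    for key, value, timestamp in records:
        start = fixed_window_start(timestamp, size)
        totals[(key, start, start + size)] += value
    return dict(totals)


if __name__ == "__main__":
    sample = [("a", 1, 5.0), ("a", 2, 59.0), ("a", 3, 61.0), ("b", 4, 10.0)]
    for (key, start, end), total in sorted(sum_per_key_per_window(sample, 60.0).items()):
        print(f"{key} [{start}, {end}): {total}")
```

With 60-second windows, the two "a" records at 5 s and 59 s share the window [0, 60) and sum to 3, while the "a" record at 61 s falls into [60, 120) on its own. The transform names in the log above (ComputeSessionsWindow, TopPerMonthWindow) suggest the Wikipedia example uses session-style and per-month windowing, which this fixed-window sketch does not model.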
