Hi,

I was developing a simple pipeline that aggregates records by key and sums the values over a predefined window. I was getting some errors, and after checking, I get exactly the same error when running the Wikipedia example from the Beam repo. The output is as follows:

-------------------------------------------
INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO:root:==================== <function annotate_downstream_side_inputs at 0x7f333fc1fe60> ====================
INFO:root:==================== <function fix_side_input_pcoll_coders at 0x7f333fc1ff80> ====================
INFO:root:==================== <function lift_combiners at 0x7f333fc1d050> ====================
INFO:root:==================== <function expand_sdf at 0x7f333fc1d0e0> ====================
INFO:root:==================== <function expand_gbk at 0x7f333fc1d170> ====================
INFO:root:==================== <function sink_flattens at 0x7f333fc1d290> ====================
INFO:root:==================== <function greedily_fuse at 0x7f333fc1d320> ====================
INFO:root:==================== <function read_to_impulse at 0x7f333fc1d3b0> ====================
INFO:root:==================== <function impulse_to_input at 0x7f333fc1d440> ====================
INFO:root:==================== <function inject_timer_pcollections at 0x7f333fc1d5f0> ====================
INFO:root:==================== <function sort_stages at 0x7f333fc1d680> ====================
INFO:root:==================== <function window_pcollection_coders at 0x7f333fc1d710> ====================
INFO:root:Running ((((((ref_AppliedPTransform_ReadFromText/Read_3)+(ref_AppliedPTransform_ComputeTopSessions/ExtractUserAndTimestamp_5))+(ref_AppliedPTransform_ComputeTopSessions/Filter(<lambda at top_wikipedia_sessions.py:127>)_6))+(ref_AppliedPTransform_ComputeTopSessions/ComputeSessions/ComputeSessionsWindow_8))+(ref_AppliedPTransform_ComputeTopSessions/ComputeSessions/PerElement/PerElement:PairWithVoid_10))+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Precombine))+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Group/Write)
INFO:root:Running (((((((ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Group/Read)+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Merge))+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/ExtractOutputs))+(ref_AppliedPTransform_ComputeTopSessions/SessionsToStrings_18))+(ref_AppliedPTransform_ComputeTopSessions/TopPerMonth/TopPerMonthWindow_20))+(ref_AppliedPTransform_ComputeTopSessions/TopPerMonth/Top/KeyWithVoid_22))+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Precombine))+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Group/Write)
INFO:root:Running (((ref_AppliedPTransform_WriteToText/Write/WriteImpl/DoOnce/Read_36)+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/InitializeWrite_37))+(ref_PCollection_PCollection_19/Write))+(ref_PCollection_PCollection_20/Write)
INFO:root:Running ((((((((ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Group/Read)+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Merge))+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/ExtractOutputs))+(ref_AppliedPTransform_ComputeTopSessions/TopPerMonth/Top/UnKey_30))+(ref_AppliedPTransform_ComputeTopSessions/FormatOutput_31))+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/WriteBundles_38))+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/Pair_39))+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/WindowInto(WindowIntoFn)_40))+(WriteToText/Write/WriteImpl/GroupByKey/Write)
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 829, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 403, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 406, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 982, in apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
  File "apache_beam/runners/worker/operations.py", line 142, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 122, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
  File "apache_beam/runners/worker/opcounters.py", line 196, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
  File "apache_beam/runners/worker/opcounters.py", line 214, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
  File "apache_beam/coders/coder_impl.py", line 1014, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1030, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 814, in apache_beam.coders.coder_impl.SequenceCoderImpl.estimate_size
  File "apache_beam/coders/coder_impl.py", line 828, in apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 145, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 494, in apache_beam.coders.coder_impl.IntervalWindowCoderImpl.estimate_size
TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "top_wikipedia_sessions.py", line 171, in <module>
    run()
  File "top_wikipedia_sessions.py", line 166, in run
    | WriteToText(known_args.output))
  File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 426, in __exit__
    self.run().wait_until_finish()
  File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 406, in run
    self._options).run(False)
  File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 419, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 129, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 366, in run_pipeline
    default_environment=self._default_environment))
  File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 373, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 455, in run_stages
    stage_context.safe_coders)
  File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 733, in _run_stage
    result, splits = bundle_manager.process_bundle(data_input, data_output)
  File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 1688, in process_bundle
    part, expected_outputs), part_inputs):
  File "/home/pawel/miniconda3/envs/beam/lib/python3.7/concurrent/futures/_base.py", line 598, in result_iterator
    yield fs.pop().result()
  File "/home/pawel/miniconda3/envs/beam/lib/python3.7/concurrent/futures/_base.py", line 435, in result
    return self.__get_result()
  File "/home/pawel/miniconda3/envs/beam/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/home/pawel/miniconda3/envs/beam/lib/python3.7/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 1688, in <lambda>
    part, expected_outputs), part_inputs):
  File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 1626, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 1080, in push
    response = self.worker.do_instruction(request)
  File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 343, in do_instruction
    request.instruction_id)
  File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 369, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 598, in process_bundle
    op.finish()
  File "apache_beam/runners/worker/operations.py", line 611, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 612, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 613, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/common.py", line 847, in apache_beam.runners.common.DoFnRunner.finish
  File "apache_beam/runners/common.py", line 831, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 872, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 829, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 403, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 406, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 982, in apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
  File "apache_beam/runners/worker/operations.py", line 142, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 122, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
  File "apache_beam/runners/worker/opcounters.py", line 196, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
  File "apache_beam/runners/worker/opcounters.py", line 214, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
  File "apache_beam/coders/coder_impl.py", line 1014, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1030, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 814, in apache_beam.coders.coder_impl.SequenceCoderImpl.estimate_size
  File "apache_beam/coders/coder_impl.py", line 828, in apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 145, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 494, in apache_beam.coders.coder_impl.IntervalWindowCoderImpl.estimate_size
TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase [while running 'WriteToText/Write/WriteImpl/WriteBundles']
-------------------------------------------------------

To run it, I downloaded a single JSON file with Wiki data and ran the example as follows (from BEAM_REPO/sdks/python/apache_beam/examples/complete):

*python top_wikipedia_sessions.py --input /data/wiki/wiki_data-000000000492.json --output /tmp/beam/wiki*

This fails somewhere close to the end; in fact, I can find some results (possibly complete) in

*/tmp/beam/beam-temp-wiki-5971176ad49411e9b16448ba4ef75ccc/c18aa9d5-221f-4dc3-bc1a-83c101ee54ba.wiki*

I tried to find the exact cause in the code, but I don't understand Beam's codebase well enough, so I would appreciate some hints or explanations. There is a question on SO <https://stackoverflow.com/questions/54745869/how-to-create-a-dataflow-pipeline-from-pub-sub-to-gcs-in-python/54791913#54791913>, but it doesn't help much in explaining why this error is observed.

Thanks!
Pawel