Hello,

Thanks for your reply. Does that mean every current and future I/O connector supported via the multi-language pipelines framework requires its own IPython magic? It would be a lot more convenient if key connectors were supported natively, as I have had a really good user experience with PubSub.
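For example, the kind of interactive flow I have in mind works out of the box with the PubSub connector. A rough sketch (the subscription path and recording duration below are placeholders, not from a real project):

import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

options = PipelineOptions(streaming=True)
p = beam.Pipeline(InteractiveRunner(), options=options)

messages = (
    p
    | "Read from Pub/Sub" >> ReadFromPubSub(
        subscription="projects/my-project/subscriptions/my-subscription")
    | "Decode" >> beam.Map(lambda msg: msg.decode("utf-8"))
)

# Record the unbounded source for a bounded duration, then inspect the
# recorded elements interactively.
ib.options.recording_duration = "60s"
ib.show(messages)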
Cheers,
Jaehyeon

On Fri, 8 Mar 2024 at 05:59, Ning Kang via user <[email protected]> wrote:

> The Python SDK's ReadFromKafka is an external transform implemented in
> Java, similar to SqlTransform. InteractiveRunner doesn't support it.
>
> That being said, if you want interactive support for external transforms,
> you may follow the workaround used for SQL (
> https://cloud.google.com/dataflow/docs/guides/notebook-advanced#beam-sql).
> The source code is
> https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive/sql
> .
>
> Ning.
>
> On Wed, Mar 6, 2024 at 9:50 PM Jaehyeon Kim <[email protected]> wrote:
>
>> Hello,
>>
>> I'm playing with the interactive runner in a notebook, with the Flink
>> runner as the underlying runner. I wonder whether it can read messages
>> from Kafka. I checked the example notebook
>> <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/examples/Interactive%20Beam%20Running%20on%20Flink.ipynb>
>> and it works. However, I cannot read Kafka messages; the pipeline fails
>> with the following error.
>>
>> KeyError: 'beam:transform:org.apache.beam:kafka_read_with_metadata:v2'
>>
>> Cheers,
>> Jaehyeon
>>
>> *Here is the source.*
>>
>> import os
>>
>> import apache_beam as beam
>> from apache_beam.io import kafka
>> from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
>> from apache_beam.runners.interactive import interactive_runner
>> from apache_beam.runners.portability import flink_runner
>>
>> pipeline_opts = {
>>     "job_name": "kafka-io",
>>     "environment_type": "LOOPBACK",
>>     "streaming": True,
>>     "parallelism": 3,
>>     "experiments": [
>>         "use_deprecated_read"
>>     ],  ## https://github.com/apache/beam/issues/20979
>>     "checkpointing_interval": "60000",
>> }
>> options = PipelineOptions([], **pipeline_opts)
>> # Required; otherwise it complains when importing worker functions.
>> options.view_as(SetupOptions).save_main_session = True
>>
>> p = beam.Pipeline(
>>     interactive_runner.InteractiveRunner(underlying_runner=flink_runner.FlinkRunner()),
>>     options=options,
>> )
>> events = (
>>     p
>>     | "Read from Kafka"
>>     >> kafka.ReadFromKafka(
>>         consumer_config={
>>             "bootstrap.servers": os.getenv(
>>                 "BOOTSTRAP_SERVERS",
>>                 "host.docker.internal:29092",
>>             ),
>>             "auto.offset.reset": "earliest",
>>             # "enable.auto.commit": "true",
>>             "group.id": "kafka-io",
>>         },
>>         topics=["website-visit"],
>>     )
>>     | "Decode messages" >> beam.Map(decode_message)
>>     | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
>> )
>> # decode_message, parse_json and EventLog are defined elsewhere in the notebook.
>> results = p.run()
>> results.wait_until_finish()
>>
>> *And here is the full error message.*
>>
>> WARNING:apache_beam.options.pipeline_options:Discarding invalid overrides:
>> {'checkpointing_interval': '60000'}
>>
>> ---------------------------------------------------------------------------
>> KeyError                                  Traceback (most recent call last)
>> Cell In[17], line 36
>>      15 p = beam.Pipeline(
>>      16     interactive_runner.InteractiveRunner(underlying_runner=flink_runner.FlinkRunner()), options=options
>>      17 )
>>      18 events = (
>>      19     p
>>      20     | "Read from Kafka"
>>     (...)
34 | "Parse elements" >> >> beam.Map(parse_json).with_output_types(EventLog) 35 )---> 36 results = >> p.run() 37 result.wait_until_finish() 38 # >> ib.options.recording_duration = "120s" 39 # ib.show(events) >> >> File >> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:586 >> >> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=585>, >> in Pipeline.run(self, test_runner_api) 584 finally: 585 >> shutil.rmtree(tmpdir)--> 586 return self.runner.run_pipeline(self, >> self._options) 587 finally: 588 if not is_in_ipython(): >> >> File >> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/interactive_runner.py:148 >> >> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/interactive_runner.py#line=147>, >> in InteractiveRunner.run_pipeline(self, pipeline, options) 145 if >> isinstance(self._underlying_runner, FlinkRunner): 146 >> self.configure_for_flink(user_pipeline, options)--> 148 pipeline_instrument >> = inst.build_pipeline_instrument(pipeline, options) 150 # The >> user_pipeline analyzed might be None if the pipeline given has nothing >> 151 # to be cached and tracing back to the user defined pipeline is >> impossible. 152 # When it's None, there is no need to cache including the >> background 153 # caching job and no result to track since no background >> caching job is 154 # started at all. 155 if user_pipeline: 156 # >> Should use the underlying runner and run asynchronously. >> >> File >> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py:756 >> >> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py#line=755>, >> in build_pipeline_instrument(pipeline, options) 742 def >> build_pipeline_instrument(pipeline, options=None): 743 """Creates >> PipelineInstrument for a pipeline and its options with cache. 744 745 >> Throughout the process, the returned PipelineInstrument snapshots the >> given (...) 754 runner pipeline to apply interactivity. 755 >> """--> 756 pi = PipelineInstrument(pipeline, options) 757 >> pi.preprocess() 758 pi.instrument() # Instruments the pipeline only >> once. >> >> File >> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py:71 >> >> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py#line=70>, >> in PipelineInstrument.__init__(self, pipeline, options) 67 if >> background_caching_job.has_source_to_cache(self._user_pipeline): 68 >> self._cache_manager = ie.current_env().get_cache_manager( 69 >> self._user_pipeline)---> 71 self._background_caching_pipeline = >> beam.pipeline.Pipeline.from_runner_api( 72 pipeline.to_runner_api(), >> pipeline.runner, options) 73 ie.current_env().add_derived_pipeline( >> 74 self._pipeline, self._background_caching_pipeline) 76 # Snapshot >> of original pipeline information. 
>> >> File >> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1020 >> >> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=1019>, >> in Pipeline.from_runner_api(proto, runner, options, return_context) 1018 >> if proto.root_transform_ids: 1019 root_transform_id, = >> proto.root_transform_ids-> 1020 p.transforms_stack = >> [context.transforms.get_by_id(root_transform_id)] 1021 else: 1022 >> p.transforms_stack = [AppliedPTransform(None, None, '', None)] >> >> File >> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py:114 >> >> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py#line=113>, >> in _PipelineContextMap.get_by_id(self, id) 111 def get_by_id(self, id): >> 112 # type: (str) -> PortableObjectT 113 if id not in >> self._id_to_obj:--> 114 self._id_to_obj[id] = >> self._obj_type.from_runner_api( 115 self._id_to_proto[id], >> self._pipeline_context) 116 return self._id_to_obj[id] >> >> File >> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1456 >> >> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=1455>, >> in AppliedPTransform.from_runner_api(proto, context) 1454 result.parts = >> [] 1455 for transform_id in proto.subtransforms:-> 1456 part = >> context.transforms.get_by_id(transform_id) 1457 part.parent = result >> 1458 result.add_part(part) >> >> File >> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py:114 >> >> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py#line=113>, >> in _PipelineContextMap.get_by_id(self, id) 111 def get_by_id(self, id): >> 112 # type: (str) -> PortableObjectT 113 if id not in >> self._id_to_obj:--> 114 self._id_to_obj[id] = >> self._obj_type.from_runner_api( 115 self._id_to_proto[id], >> self._pipeline_context) 116 return self._id_to_obj[id] >> >> File >> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1456 >> >> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=1455>, >> in AppliedPTransform.from_runner_api(proto, context) 1454 result.parts = >> [] 1455 for transform_id in proto.subtransforms:-> 1456 part = >> context.transforms.get_by_id(transform_id) 1457 part.parent = result >> 1458 result.add_part(part) >> >> File >> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py:114 >> >> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py#line=113>, >> in _PipelineContextMap.get_by_id(self, id) 111 def get_by_id(self, id): >> 112 # type: (str) -> PortableObjectT 113 if id not in >> self._id_to_obj:--> 114 self._id_to_obj[id] = >> self._obj_type.from_runner_api( 115 self._id_to_proto[id], >> self._pipeline_context) 116 return self._id_to_obj[id] >> >> File >> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1426 >> >> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=1425>, >> in AppliedPTransform.from_runner_api(proto, context) 1419 >> side_input_tags = 
[] 1421 main_inputs = { 1422 tag: >> context.pcollections.get_by_id(id) 1423 for (tag, id) in >> proto.inputs.items() if tag not in side_input_tags 1424 }-> 1426 transform >> = ptransform.PTransform.from_runner_api(proto, context) 1427 if transform >> and proto.environment_id: 1428 resource_hints = >> context.environments.get_by_id( 1429 >> proto.environment_id).resource_hints() >> >> File >> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py:769 >> >> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py#line=768>, >> in PTransform.from_runner_api(cls, proto, context) 767 if proto is None >> or proto.spec is None or not proto.spec.urn: 768 return None--> 769 >> parameter_type, constructor = cls._known_urns[proto.spec.urn] 771 return >> constructor( 772 proto, 773 >> proto_utils.parse_Bytes(proto.spec.payload, parameter_type), 774 >> context) >> KeyError: 'beam:transform:org.apache.beam:kafka_read_with_metadata:v2' >> >> >>
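P.S. In the meantime, I'll try running the same pipeline directly on the Flink runner, skipping the interactive wrapper, since the KeyError above is raised while the interactive instrumentation rebuilds the pipeline from its proto representation. A rough, untested sketch that reuses the options and helper functions from the snippet above:

import os

import apache_beam as beam
from apache_beam.io import kafka
from apache_beam.runners.portability import flink_runner

# Plain FlinkRunner run: no interactive instrumentation, so the pipeline
# is never reconstructed from protos on the Python side.
p = beam.Pipeline(flink_runner.FlinkRunner(), options=options)
events = (
    p
    | "Read from Kafka" >> kafka.ReadFromKafka(
        consumer_config={
            "bootstrap.servers": os.getenv(
                "BOOTSTRAP_SERVERS", "host.docker.internal:29092"),
            "auto.offset.reset": "earliest",
            "group.id": "kafka-io",
        },
        topics=["website-visit"],
    )
    | "Decode messages" >> beam.Map(decode_message)
    | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
)
p.run().wait_until_finish()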
