Hello,

Thanks for your reply. Does that mean every single current and future I/O
connector that is supported via the multi-language pipelines framework
requires its own IPython magic? It'd be a lot more convenient if key
connectors were supported natively, as I have had a really good user
experience with PubSub.

Cheers,
Jaehyeon

On Fri, 8 Mar 2024 at 05:59, Ning Kang via user <[email protected]>
wrote:

> Python SDK's ReadFromKafka is an external transform implemented in Java,
> which is similar to SqlTransform. InteractiveRunner doesn't support it.
>
> That being said, if you want to implement an interactive interaction with
> external transforms, you may follow the workaround for SQL (
> https://cloud.google.com/dataflow/docs/guides/notebook-advanced#beam-sql).
> The source code is
> https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive/sql
> .
>
>
> Ning.
>
> On Wed, Mar 6, 2024 at 9:50 PM Jaehyeon Kim <[email protected]> wrote:
>
>> Hello,
>>
>> I'm playing with the interactive runner on a notebook and the flink
>> runner is used as the underlying runner. I wonder if it can read messages
>> from Kafka. I checked the example notebook
>> <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/examples/Interactive%20Beam%20Running%20on%20Flink.ipynb>
>>  and
>> it works. However I cannot read Kafka messages with the following error.
>>
>>  KeyError: 'beam:transform:org.apache.beam:kafka_read_with_metadata:v2'
>>
>> Cheers,
>> Jaehyeon
>>
>> *Here is the source.*
>>
>> pipeline_opts = {
>>     "job_name": "kafka-io",
>>     "environment_type": "LOOPBACK",
>>     "streaming": True,
>>     "parallelism": 3,
>>     "experiments": [
>>         "use_deprecated_read"
>>     ],  ## https://github.com/apache/beam/issues/20979
>>     "checkpointing_interval": "60000",
>> }
>> options = PipelineOptions([], **pipeline_opts)
>> # Required; otherwise it will complain when importing worker functions
>> options.view_as(SetupOptions).save_main_session = True
>>
>> p = beam.Pipeline(
>>     
>> interactive_runner.InteractiveRunner(underlying_runner=flink_runner.FlinkRunner()),
>> options=options
>> )
>> events = (
>>     p
>>     | "Read from Kafka"
>>     >> kafka.ReadFromKafka(
>>         consumer_config={
>>             "bootstrap.servers": os.getenv(
>>                 "BOOTSTRAP_SERVERS",
>>                 "host.docker.internal:29092",
>>             ),
>>             "auto.offset.reset": "earliest",
>>             # "enable.auto.commit": "true",
>>             "group.id": "kafka-io",
>>         },
>>         topics=["website-visit"],
>>     )
>>     | "Decode messages" >> beam.Map(decode_message)
>>     | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog
>> )
>> )
>> result = p.run()
>> result.wait_until_finish()
>>
>> *And here is the full error message.*
>>
>> WARNING:apache_beam.options.pipeline_options:Discarding invalid overrides: 
>> {'checkpointing_interval': '60000'}
>>
>> ---------------------------------------------------------------------------KeyError
>>                                   Traceback (most recent call last)
>> Cell In[17], line 36     15 p = beam.Pipeline(     16     
>> interactive_runner.InteractiveRunner(underlying_runner=flink_runner.FlinkRunner()),
>>  options=options     17 )     18 events = (     19     p     20     | "Read 
>> from Kafka"   (...)     34     | "Parse elements" >> 
>> beam.Map(parse_json).with_output_types(EventLog)     35 )---> 36 results = 
>> p.run()     37 result.wait_until_finish()     38 # 
>> ib.options.recording_duration = "120s"     39 # ib.show(events)
>>
>> File 
>> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:586
>>  
>> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=585>,
>>  in Pipeline.run(self, test_runner_api)    584     finally:    585       
>> shutil.rmtree(tmpdir)--> 586   return self.runner.run_pipeline(self, 
>> self._options)    587 finally:    588   if not is_in_ipython():
>>
>> File 
>> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/interactive_runner.py:148
>>  
>> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/interactive_runner.py#line=147>,
>>  in InteractiveRunner.run_pipeline(self, pipeline, options)    145 if 
>> isinstance(self._underlying_runner, FlinkRunner):    146   
>> self.configure_for_flink(user_pipeline, options)--> 148 pipeline_instrument 
>> = inst.build_pipeline_instrument(pipeline, options)    150 # The 
>> user_pipeline analyzed might be None if the pipeline given has nothing    
>> 151 # to be cached and tracing back to the user defined pipeline is 
>> impossible.    152 # When it's None, there is no need to cache including the 
>> background    153 # caching job and no result to track since no background 
>> caching job is    154 # started at all.    155 if user_pipeline:    156   # 
>> Should use the underlying runner and run asynchronously.
>>
>> File 
>> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py:756
>>  
>> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py#line=755>,
>>  in build_pipeline_instrument(pipeline, options)    742 def 
>> build_pipeline_instrument(pipeline, options=None):    743   """Creates 
>> PipelineInstrument for a pipeline and its options with cache.    744     745 
>>   Throughout the process, the returned PipelineInstrument snapshots the 
>> given   (...)    754   runner pipeline to apply interactivity.    755   
>> """--> 756   pi = PipelineInstrument(pipeline, options)    757   
>> pi.preprocess()    758   pi.instrument()  # Instruments the pipeline only 
>> once.
>>
>> File 
>> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py:71
>>  
>> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py#line=70>,
>>  in PipelineInstrument.__init__(self, pipeline, options)     67 if 
>> background_caching_job.has_source_to_cache(self._user_pipeline):     68   
>> self._cache_manager = ie.current_env().get_cache_manager(     69       
>> self._user_pipeline)---> 71 self._background_caching_pipeline = 
>> beam.pipeline.Pipeline.from_runner_api(     72     pipeline.to_runner_api(), 
>> pipeline.runner, options)     73 ie.current_env().add_derived_pipeline(     
>> 74     self._pipeline, self._background_caching_pipeline)     76 # Snapshot 
>> of original pipeline information.
>>
>> File 
>> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1020
>>  
>> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=1019>,
>>  in Pipeline.from_runner_api(proto, runner, options, return_context)   1018 
>> if proto.root_transform_ids:   1019   root_transform_id, = 
>> proto.root_transform_ids-> 1020   p.transforms_stack = 
>> [context.transforms.get_by_id(root_transform_id)]   1021 else:   1022   
>> p.transforms_stack = [AppliedPTransform(None, None, '', None)]
>>
>> File 
>> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py:114
>>  
>> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py#line=113>,
>>  in _PipelineContextMap.get_by_id(self, id)    111 def get_by_id(self, id):  
>>   112   # type: (str) -> PortableObjectT    113   if id not in 
>> self._id_to_obj:--> 114     self._id_to_obj[id] = 
>> self._obj_type.from_runner_api(    115         self._id_to_proto[id], 
>> self._pipeline_context)    116   return self._id_to_obj[id]
>>
>> File 
>> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1456
>>  
>> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=1455>,
>>  in AppliedPTransform.from_runner_api(proto, context)   1454 result.parts = 
>> []   1455 for transform_id in proto.subtransforms:-> 1456   part = 
>> context.transforms.get_by_id(transform_id)   1457   part.parent = result   
>> 1458   result.add_part(part)
>>
>> File 
>> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py:114
>>  
>> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py#line=113>,
>>  in _PipelineContextMap.get_by_id(self, id)    111 def get_by_id(self, id):  
>>   112   # type: (str) -> PortableObjectT    113   if id not in 
>> self._id_to_obj:--> 114     self._id_to_obj[id] = 
>> self._obj_type.from_runner_api(    115         self._id_to_proto[id], 
>> self._pipeline_context)    116   return self._id_to_obj[id]
>>
>> File 
>> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1456
>>  
>> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=1455>,
>>  in AppliedPTransform.from_runner_api(proto, context)   1454 result.parts = 
>> []   1455 for transform_id in proto.subtransforms:-> 1456   part = 
>> context.transforms.get_by_id(transform_id)   1457   part.parent = result   
>> 1458   result.add_part(part)
>>
>> File 
>> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py:114
>>  
>> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py#line=113>,
>>  in _PipelineContextMap.get_by_id(self, id)    111 def get_by_id(self, id):  
>>   112   # type: (str) -> PortableObjectT    113   if id not in 
>> self._id_to_obj:--> 114     self._id_to_obj[id] = 
>> self._obj_type.from_runner_api(    115         self._id_to_proto[id], 
>> self._pipeline_context)    116   return self._id_to_obj[id]
>>
>> File 
>> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1426
>>  
>> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=1425>,
>>  in AppliedPTransform.from_runner_api(proto, context)   1419   
>> side_input_tags = []   1421 main_inputs = {   1422     tag: 
>> context.pcollections.get_by_id(id)   1423     for (tag, id) in 
>> proto.inputs.items() if tag not in side_input_tags   1424 }-> 1426 transform 
>> = ptransform.PTransform.from_runner_api(proto, context)   1427 if transform 
>> and proto.environment_id:   1428   resource_hints = 
>> context.environments.get_by_id(   1429       
>> proto.environment_id).resource_hints()
>>
>> File 
>> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py:769
>>  
>> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py#line=768>,
>>  in PTransform.from_runner_api(cls, proto, context)    767 if proto is None 
>> or proto.spec is None or not proto.spec.urn:    768   return None--> 769 
>> parameter_type, constructor = cls._known_urns[proto.spec.urn]    771 return 
>> constructor(    772     proto,    773     
>> proto_utils.parse_Bytes(proto.spec.payload, parameter_type),    774     
>> context)
>> KeyError: 'beam:transform:org.apache.beam:kafka_read_with_metadata:v2'
>>
>>
>>

Reply via email to