I would suggest also including a more recent fix [1] or using the workaround mentioned in [2].
Thanks,
Cham

[1] https://github.com/apache/beam/pull/14306
[2] https://issues.apache.org/jira/browse/BEAM-11862?focusedCommentId=17305920&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17305920

On Tue, Mar 30, 2021 at 1:23 PM Brian Hulette <[email protected]> wrote:

> +Chamikara Jayalath <[email protected]>
>
> Could you try with beam 2.27.0 or 2.28.0? I think that this PR [1] may
> have addressed the issue. It avoids the problematic code when the pipeline
> is multi-language [2].
>
> [1] https://github.com/apache/beam/pull/13536
> [2]
> https://github.com/apache/beam/blob/7eff49fae34e8d3c50716f5da14fa6bcc607fc67/sdks/python/apache_beam/pipeline.py#L524
>
> On Tue, Mar 30, 2021 at 12:55 PM Maria-Irina Sandu <[email protected]>
> wrote:
>
>> I'm trying to write to a Kafka topic using WriteToKafka module from
>> apache_beam.io.kafka.
>> The error I get is:
>>
>>>   File "predict.py", line 162, in <module>
>>>     run()
>>>   File "predict.py", line 158, in run
>>>     topic = 'int.fitbit_explore.video_recommendations'))
>>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 580, in __exit__
>>>     self.result = self.run()
>>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 530, in run
>>>     self._options).run(False)
>>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 902, in from_runner_api
>>>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 116, in get_by_id
>>>     self._id_to_proto[id], self._pipeline_context)
>>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1252, in from_runner_api
>>>     part = context.transforms.get_by_id(transform_id)
>>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 116, in get_by_id
>>>     self._id_to_proto[id], self._pipeline_context)
>>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1252, in from_runner_api
>>>     part = context.transforms.get_by_id(transform_id)
>>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 116, in get_by_id
>>>     self._id_to_proto[id], self._pipeline_context)
>>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1252, in from_runner_api
>>>     part = context.transforms.get_by_id(transform_id)
>>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 116, in get_by_id
>>>     self._id_to_proto[id], self._pipeline_context)
>>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1252, in from_runner_api
>>>     part = context.transforms.get_by_id(transform_id)
>>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 116, in get_by_id
>>>     self._id_to_proto[id], self._pipeline_context)
>>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1229, in from_runner_api
>>>     transform = ptransform.PTransform.from_runner_api(proto, context)
>>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 733, in from_runner_api
>>>     context)
>>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/transforms/core.py", line 1420, in from_runner_api_parameter
>>>     pardo_payload.do_fn, context).serialized_dofn_data())
>>>   File "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/transforms/core.py", line 1493, in from_runner_api
>>>     raise ValueError('Unexpected DoFn type: %s' % spec.urn)
>>> ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1
>>
>> The pipeline looks like this:
>>
>>> pipeline_options = PipelineOptions(argv)
>>> with beam.Pipeline(options=pipeline_options) as p:
>>>     _ = (p | 'Create' >> beam.Create(['Start'])
>>>            | 'Read MDAU' >> beam.io.textio.ReadFromText("gs://fit-recommend-system-testpy/saved_model/dummy_mdau.txt")
>>>            | 'Predict' >> beam.ParDo(PredictDoFn())
>>>            | 'EncodeThrift' >> beam.ParDo(ThriftEncodeDoFn())
>>>            | 'WriteToKafka' >> WriteToKafka(producer_config = {'bootstrap.servers' : '<fitbit-bootstrap-server>:9092'},
>>>                                             topic = '<internal_fitbit_topic>'))
>>
>> I replaced the bootstrap server and topic values with placeholders here
>> because I'm not sure if I should show them or not.
>>
>> The ThriftEncodeDoFn function seems to work.
>> It produces a tuple of bytes
>> and it looks like this:
>>
>> class ThriftEncodeDoFn(beam.DoFn):
>>     def encode(self, element):
>>         video = VideosAndRatings()
>>         video.videoId = str(element['videoId'])
>>         video.rating = 5
>>         video.index = 1
>>         videosList = [video]
>>         recommendations = RecommendationsKafkaMessage()
>>         recommendations.userId = str(element['userId'])
>>         recommendations.videos = videosList
>>         recommendations.category = "DISCOVER_WORKOUTS"
>>         print(recommendations.userId, recommendations.category)
>>         trans = TTransport.TMemoryBuffer()
>>         proto = TBinaryProtocol.TBinaryProtocol(trans)
>>         recommendations.write(proto)
>>         encoded_data = bytes(trans.getvalue())
>>         encoded_key = str(element['userId']).encode()
>>         return encoded_key, encoded_data
>>
>>     def process(self, element) -> Iterable[Tuple[bytes,bytes]]:
>>         try:
>>             encoded_key, encoded_data = self.encode(element)
>>             yield (encoded_key, encoded_data)
>>         except Exception as e:
>>             print("encoding didn't work", e)
>>             yield TaggedOutput('encode_errors', f'element={element}, error={e}')
>>
>> The command I use to run the pipeline is this:
>>
>> python3 predict.py \
>>   --work-dir gs://fit-recommend-system-testpy/saved_model \
>>   --batch \
>>   --project fit-recommend-system-int \
>>   --runner DataflowRunner \
>>   --setup_file ./setup.py \
>>   --subnetwork https://www.googleapis.com/compute/v1/projects/<fitbit-internal-subnetwork> \
>>   --job_name prediction \
>>   --region us-central1 \
>>   --temp_location gs://fit-recommend-system-testpy/temp \
>>   --staging_location gs://fit-recommend-system-testpy/staging \
>>   --no_use_public_ips \
>>   --sdk_harness_container_image_overrides ".*java.*,gcr.io/cloud-dataflow/v1beta3/beam_java8_sdk:2.26.0" \
>>   --service_account_email [email protected]
>>
>> And I have installed apache beam with python3 -m pip install
>> apache_beam[gcp]==2.26.0.
>>
>> Any help with this is much appreciated!
>>
>> Best regards,
>>
>> Irina
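
Brian's suggestion in the thread comes down to checking which Beam release is installed before retrying the pipeline. The following is only a minimal sketch of such a check, not something taken from the thread: the helper names and the MIN_SUGGESTED constant are hypothetical, and the 2.27.0 threshold simply mirrors the versions Brian names, which he says "may" contain the fix.

```python
import re

# Assumption: 2.27.0 is the first release worth retrying with, per Brian's
# message. The thread does not confirm that it resolves the error.
MIN_SUGGESTED = (2, 27, 0)


def parse_version(version: str) -> tuple:
    """Return the leading MAJOR.MINOR.PATCH numbers of a version string."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"Unrecognised version string: {version!r}")
    return tuple(int(part) for part in match.groups())


def is_at_least_suggested(version: str) -> bool:
    """True if `version` is at or above the release suggested in the thread."""
    return parse_version(version) >= MIN_SUGGESTED


if __name__ == "__main__":
    # With Beam installed, apache_beam.__version__ could be passed instead of
    # this literal, which is the version pinned in the original question.
    print(is_at_least_suggested("2.26.0"))
```

Note that the command in the original question also pins the Java SDK harness image to 2.26.0 via --sdk_harness_container_image_overrides, so that value would presumably need to change together with the Python package version.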
