I'm trying to write to a Kafka topic using the WriteToKafka transform from
apache_beam.io.kafka.
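For reference, the import is just:

    from apache_beam.io.kafka import WriteToKafka
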
The error I get is:

> File "predict.py", line 162, in <module>
> run()
> File "predict.py", line 158, in run
> topic = 'int.fitbit_explore.video_recommendations'))
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 580, in __exit__
> self.result = self.run()
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 530, in run
> self._options).run(False)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 902, in from_runner_api
> p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
> line 116, in get_by_id
> self._id_to_proto[id], self._pipeline_context)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 1252, in from_runner_api
> part = context.transforms.get_by_id(transform_id)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
> line 116, in get_by_id
> self._id_to_proto[id], self._pipeline_context)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 1252, in from_runner_api
> part = context.transforms.get_by_id(transform_id)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
> line 116, in get_by_id
> self._id_to_proto[id], self._pipeline_context)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 1252, in from_runner_api
> part = context.transforms.get_by_id(transform_id)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
> line 116, in get_by_id
> self._id_to_proto[id], self._pipeline_context)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 1252, in from_runner_api
> part = context.transforms.get_by_id(transform_id)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
> line 116, in get_by_id
> self._id_to_proto[id], self._pipeline_context)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 1229, in from_runner_api
> transform = ptransform.PTransform.from_runner_api(proto, context)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py",
> line 733, in from_runner_api
> context)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/transforms/core.py",
> line 1420, in from_runner_api_parameter
> pardo_payload.do_fn, context).serialized_dofn_data())
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/transforms/core.py",
> line 1493, in from_runner_api
> raise ValueError('Unexpected DoFn type: %s' % spec.urn)
> ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1


The pipeline looks like this:

> pipeline_options = PipelineOptions(argv)
> with beam.Pipeline(options=pipeline_options) as p:
>     _ = (p
>          | 'Create' >> beam.Create(['Start'])
>          | 'Read MDAU' >> beam.io.textio.ReadFromText(
>              "gs://fit-recommend-system-testpy/saved_model/dummy_mdau.txt")
>          | 'Predict' >> beam.ParDo(PredictDoFn())
>          | 'EncodeThrift' >> beam.ParDo(ThriftEncodeDoFn())
>          | 'WriteToKafka' >> WriteToKafka(
>              producer_config={'bootstrap.servers': '<fitbit-bootstrap-server>:9092'},
>              topic='<internal_fitbit_topic>'))


I've replaced the bootstrap server and topic values with placeholders here,
since I'm not sure whether I should share them.
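
In case it's relevant: my understanding is that WriteToKafka is a
cross-language transform backed by the Java SDK, and that it accepts an
explicit expansion_service argument. A minimal sketch of what I mean, with
the same placeholders as above (the localhost:8097 address is made up, not
something I've verified, and "encoded" stands for the output of the
EncodeThrift step):

    # Sketch only: point WriteToKafka at an explicitly started Java expansion
    # service instead of the auto-started default. 'localhost:8097' is a
    # placeholder address, and 'encoded' is the PCollection from EncodeThrift.
    _ = (encoded | 'WriteToKafka' >> WriteToKafka(
             producer_config={'bootstrap.servers': '<fitbit-bootstrap-server>:9092'},
             topic='<internal_fitbit_topic>',
             expansion_service='localhost:8097'))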

The ThriftEncodeDoFn itself seems to work: it produces a (key, value) tuple
of bytes. It looks like this:

from typing import Iterable, Tuple

import apache_beam as beam
from apache_beam.pvalue import TaggedOutput
from thrift.protocol import TBinaryProtocol
from thrift.transport import TTransport

# VideosAndRatings and RecommendationsKafkaMessage are our Thrift-generated
# classes, imported from our generated module.

class ThriftEncodeDoFn(beam.DoFn):
  def encode(self, element):
    video = VideosAndRatings()
    video.videoId = str(element['videoId'])
    video.rating = 5
    video.index = 1
    videosList = [video]

    recommendations = RecommendationsKafkaMessage()
    recommendations.userId = str(element['userId'])
    recommendations.videos = videosList
    recommendations.category = "DISCOVER_WORKOUTS"
    print(recommendations.userId, recommendations.category)

    # Serialize the message with Thrift's binary protocol.
    trans = TTransport.TMemoryBuffer()
    proto = TBinaryProtocol.TBinaryProtocol(trans)
    recommendations.write(proto)
    encoded_data = bytes(trans.getvalue())
    encoded_key = str(element['userId']).encode()
    return encoded_key, encoded_data

  def process(self, element) -> Iterable[Tuple[bytes, bytes]]:
    try:
      encoded_key, encoded_data = self.encode(element)
      yield (encoded_key, encoded_data)
    except Exception as e:
      print("encoding didn't work", e)
      yield TaggedOutput('encode_errors', f'element={element}, error={e}')
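
To sanity-check those bytes I decode them back locally with something like
this (a quick sketch that assumes the same generated Thrift classes):

    # Local sanity check (sketch): decode encoded_data back into a message.
    trans = TTransport.TMemoryBuffer(encoded_data)
    proto = TBinaryProtocol.TBinaryProtocol(trans)
    decoded = RecommendationsKafkaMessage()
    decoded.read(proto)
    print(decoded.userId, decoded.category)  # should match what was encoded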

The command I use to run the pipeline is this:

python3 predict.py \
  --work-dir gs://fit-recommend-system-testpy/saved_model \
  --batch \
  --project fit-recommend-system-int \
  --runner DataflowRunner \
  --setup_file ./setup.py \
  --subnetwork https://www.googleapis.com/compute/v1/projects/fit-networking-glob/regions/us-central1/subnetworks/<fitbit-internal-subnetwork> \
  --job_name prediction \
  --region us-central1 \
  --temp_location gs://fit-recommend-system-testpy/temp \
  --staging_location gs://fit-recommend-system-testpy/staging \
  --no_use_public_ips \
  --sdk_harness_container_image_overrides ".*java.*,gcr.io/cloud-dataflow/v1beta3/beam_java8_sdk:2.26.0" \
  --service_account_email [email protected]

I installed Apache Beam with python3 -m pip install
apache_beam[gcp]==2.26.0.
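As a quick sanity check, the version that's actually picked up looks right:

    import apache_beam
    print(apache_beam.__version__)  # prints 2.26.0 for me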

Any help with this is much appreciated!

Best regards,

Irina
