Hi,
I am trying to build a streaming Beam pipeline in Python that consumes
messages from Kafka and then runs further stages that fetch data from
other sources and aggregate it. Here is what I have set up so far, step
by step:
1. Run a Kafka instance on localhost:9092:
    ./bin/kafka-server-start.sh ./config/server.properties
2. Run the Beam Flink job server using Docker:
    docker run --net=host apache/beam_flink1.10_job_server:latest
3. Run the Beam Kafka pipeline:
    import apache_beam as beam
    from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
    from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

    if __name__ == '__main__':
        options = PipelineOptions([
            "--job_endpoint=localhost:8099",
            "--environment_type=LOOPBACK",
            "--streaming",
            "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
        ])
        options = options.view_as(StandardOptions)
        options.streaming = True
        pipeline = beam.Pipeline(options=options)
        result = (
            pipeline
            | "Read from kafka" >> ReadFromKafka(
                consumer_config={
                    "bootstrap.servers": 'localhost:9092',
                },
                topics=['mytopic'],
                expansion_service='localhost:8097',
            )
            | beam.Map(print)
        )
        pipeline.run()
4. Publish a new message using kafka-console-producer.sh:
    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
    >tryme
After I publish this test message, the Beam pipeline picks it up but
then crashes with this error:
RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
    at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
    at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
    at org.apache.beam
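
My unconfirmed guess is that the test message I published above has no
key (as far as I know, the console producer sends a null key unless
parse.key=true is set), and that the byte[] coder applied to the key
cannot handle null. The Python sketch below is only a toy model of that
suspicion, not Beam's actual coder code. The names encode_bytes,
encode_nullable_bytes and encode_record are hypothetical and exist only
for this illustration.

```python
import struct
from typing import Optional


def encode_bytes(value: Optional[bytes]) -> bytes:
    """Toy strict coder: 4-byte big-endian length, then payload. Rejects None."""
    if value is None:
        # Mirrors the message in the stack trace; this is a model, not Beam code.
        raise ValueError("cannot encode a null byte[]")
    return struct.pack(">I", len(value)) + value


def encode_nullable_bytes(value: Optional[bytes]) -> bytes:
    """Toy nullable wrapper: one marker byte, then the payload if present."""
    if value is None:
        return b"\x00"
    return b"\x01" + encode_bytes(value)


def encode_record(key: Optional[bytes], value: Optional[bytes], nullable: bool) -> bytes:
    """Encode a (key, value) pair with either the strict or the nullable coder."""
    enc = encode_nullable_bytes if nullable else encode_bytes
    return enc(key) + enc(value)


if __name__ == "__main__":
    # A record shaped like my test message: value "tryme", no key (assumed).
    try:
        encode_record(None, b"tryme", nullable=False)
    except ValueError as err:
        print("strict coder failed:", err)
    print("nullable coder ok:", encode_record(None, b"tryme", nullable=True))
```

If this model matches what is happening, then publishing the message
with a key, or reading the topic with a coder that tolerates null keys,
should avoid the crash, but I have not verified either.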
Regards,
Ayush Sharma.