Thank you all for the replies. I am really stuck and cannot proceed
further.
Yes, the trial message I published previously had a null key.
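To check my understanding of that earlier crash, here is a small pure-Python sketch of a strict byte-array encoding versus a nullable wrapper, in the spirit of the NullableCoder.of(ByteArrayCoder.of()) mentioned in the thread. This is not Beam's code: the function names and the marker bytes are assumptions I made up for illustration.

```python
# Pure-Python illustration only; this is NOT Beam's actual coder code.
# Function names and marker bytes are hypothetical.
from typing import Optional

NULL_MARKER = b"\x00"     # assumed marker byte meaning "no value"
PRESENT_MARKER = b"\x01"  # assumed marker byte meaning "value follows"


def encode_strict(value: Optional[bytes]) -> bytes:
    """Mimics a coder that has no representation for None."""
    if value is None:
        raise ValueError("cannot encode a null byte[]")
    return value


def encode_nullable(value: Optional[bytes]) -> bytes:
    """Wraps the strict encoding with a one-byte presence marker."""
    if value is None:
        return NULL_MARKER
    return PRESENT_MARKER + encode_strict(value)


def decode_nullable(data: bytes) -> Optional[bytes]:
    """Inverse of encode_nullable."""
    if not data:
        raise ValueError("empty input has no presence marker")
    if data[:1] == NULL_MARKER:
        return None
    return data[1:]
```

With keys and values that are never None the strict variant is enough, which matches the advice to publish non-null keys and values.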
But when I send a key:value pair through the producer using
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic \
  --property "parse.key=true" --property "key.separator=:"
> tryKey:tryValue
I do not get any error, but Beam does not print the received message. Here
is what my pipeline looks like:
result = (
    pipeline
    | "Read from kafka" >> ReadFromKafka(
        consumer_config={
            "bootstrap.servers": 'localhost:9092',
        },
        topics=['mytopic'],
        expansion_service='localhost:8097',
    )
    | "print" >> beam.Map(print)
)
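To double check what the producer command above should send for the line tryKey:tryValue, here is a rough sketch of how I understand parse.key and key.separator: the line is split at the first separator, and a line without a separator is rejected. The helper name split_producer_line is hypothetical and this is not Kafka's actual implementation.

```python
# Rough sketch of my understanding of parse.key / key.separator.
# split_producer_line is a hypothetical helper, not part of Kafka.
from typing import Tuple


def split_producer_line(line: str, key_separator: str = ":") -> Tuple[bytes, bytes]:
    """Split one console line into (key, value) bytes at the first separator."""
    idx = line.find(key_separator)
    if idx == -1:
        # Assumption: with parse.key=true, a line without a separator is an error.
        raise ValueError(f"no key separator {key_separator!r} in line {line!r}")
    key = line[:idx]
    value = line[idx + len(key_separator):]
    return key.encode("utf-8"), value.encode("utf-8")


if __name__ == "__main__":
    print(split_producer_line("tryKey:tryValue"))
```

If the pipeline delivers the record, I would expect beam.Map(print) to show a pair of byte strings similar to what this helper returns, but that is my assumption and not something I have confirmed.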
If this is not the right way to make Beam and Kafka communicate, please share
a working example that shows how a message published to Kafka is received
by Beam in streaming mode.
Regards,
Ayush Sharma
On Fri, Jul 17, 2020 at 11:39 PM Chamikara Jayalath <[email protected]>
wrote:
> Yes, seems like this is due to the key being null. XLang KafkaIO has to be
> updated to support this. You should not run into this error if you publish
> keys and values that are not null.
>
>
>
>
> On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <[email protected]> wrote:
>
>> +dev <[email protected]>
>>
>> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <[email protected]> wrote:
>>
>>> +Heejong Lee <[email protected]> +Chamikara Jayalath
>>> <[email protected]>
>>>
>>> Do you know if your trial record has an empty key or value?
>>> If so, then you hit a bug; it seems we missed supporting this
>>> use case.
>>>
>>> Heejong and Cham,
>>> It looks like, per the Javadoc, ByteArrayDeserializer and other
>>> Deserializers can return null[1, 2] and we aren't using
>>> NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that the
>>> non XLang KafkaIO does this correctly in its regular coder inference
>>> logic[4]. I filed BEAM-10529[5].
>>>
>>> 1:
>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
>>> 2:
>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
>>> 3:
>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
>>> 4:
>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
>>> 5: https://issues.apache.org/jira/browse/BEAM-10529
>>>
>>>
>>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <[email protected]>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to build a streaming beam pipeline in python which should
>>>> capture messages from kafka and then execute further stages of data
>>>> fetching from other sources and aggregation. The step-by-step process of
>>>> what I have built till now is:
>>>>
>>>> 1. Running Kafka instance on localhost:9092
>>>>
>>>>    ./bin/kafka-server-start.sh ./config/server.properties
>>>>
>>>> 2. Run beam-flink job server using docker
>>>>
>>>>    docker run --net=host apache/beam_flink1.10_job_server:latest
>>>>
>>>> 3. Run beam-kafka pipeline
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>>>> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>>>
>>>> if __name__ == '__main__':
>>>>     options = PipelineOptions([
>>>>         "--job_endpoint=localhost:8099",
>>>>         "--environment_type=LOOPBACK",
>>>>         "--streaming",
>>>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>>     ])
>>>>
>>>>     options = options.view_as(StandardOptions)
>>>>     options.streaming = True
>>>>
>>>>     pipeline = beam.Pipeline(options=options)
>>>>
>>>>     result = (
>>>>         pipeline
>>>>         | "Read from kafka" >> ReadFromKafka(
>>>>             consumer_config={
>>>>                 "bootstrap.servers": 'localhost:9092',
>>>>             },
>>>>             topics=['mytopic'],
>>>>             expansion_service='localhost:8097',
>>>>         )
>>>>         | beam.Map(print)
>>>>     )
>>>>
>>>>     pipeline.run()
>>>>
>>>>
>>>> 4. Publish new message using kafka-console-producer.sh
>>>>
>>>>    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>>>>    > tryme
>>>>
>>>> After publishing this trial message, the beam pipeline perceives the
>>>> message but crashes giving this error:
>>>>
>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException:
>>>> org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>> at
>>>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>> at
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>>> at
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>>> at
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>>> at
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>>> at
>>>> org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>>> at
>>>> org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown
>>>> Source)
>>>> at
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>>> at
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>>> at
>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>>> at org.apache.beam
>>>>
>>>> Regards,
>>>>
>>>> Ayush Sharma.
>>>>
>>>>