Hi Max,

Thanks again for pointing out that I missed the AvroCoder inside the SerDeserSchema.
Could you please try the attached Avro-generated POJO, Test.java?
>> just to identify whether there is any issue with the SerDeserSchema or
whether I am missing a dependency
>> note that I am still working against your branch of the flink-runner
(checked out 5 days ago) to resolve FlinkUnboundedSink (btw, has it been
merged back to main? / when is it scheduled to be merged?)

*Here are the current results:*

(A) Data Producer:
I made the required changes in the producer code per your suggestion.
But when I try to produce data for the attached Avro POJO, I now hit the
following exception:
Caused by: java.lang.RuntimeException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$WriteSinkStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:215)
    at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$WriteSinkStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:212)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
    ... 9 more
Caused by: java.lang.NullPointerException: in com.xyz.schemas.Test null of com.xyz.schemas.Test
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
    at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:265)
    at com.xyz.topology.netflow.beam.AvroSerializationDeserializationSchema.serialize(AvroSerializationDeserializationSchema.java:41)
    at org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper.serializeValue(KeyedSerializationSchemaWrapper.java:41)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:252)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
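For reference, after the change the producer is wired roughly like this on my
side (just a sketch; TOPIC and the Kafka props are placeholders for my setup):

AvroSerializationDeserializationSchema<Test> schema =
    new AvroSerializationDeserializationSchema<>(Test.class);
FlinkKafkaProducer08<Test> kafkaSink =
    new FlinkKafkaProducer08<>(TOPIC, schema, props);

pipeline.apply(Create.of(new Test("Joe", 6)))
    .apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));
pipeline.run();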
** Note that when I revert back to using the AvroCoder along with
TypeInformationSerializationSchema:

pipeline.apply(Create.of(new Test("Joe", 6))
    *.withCoder(AvroCoder.of(Test.class))*)
    .apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));

** the exception doesn't occur and the message is produced properly.
(B) Data Consumer:
I made the changes per your suggestion.
==================================
AvroSerializationDeserializationSchema<Test> schema =
    new AvroSerializationDeserializationSchema<>(Test.class);
FlinkKafkaConsumer08<Test> kafkaConsumer =
    new FlinkKafkaConsumer08<>(TOPIC, schema, props);

PCollection<Test> users = pipeline.apply(
    Read.named("StreamingWordCount").from(UnboundedFlinkSource.of(kafkaConsumer)));
PCollection<Long> counts = users.apply(ParDo.of(new PrintFn()));
PipelineResult result = pipeline.run();
==================================
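(For context, PrintFn is just a trivial DoFn that logs each element; roughly
along these lines on my side - the exact body is not important here:)

static class PrintFn extends DoFn<Test, Long> {
    @Override
    public void processElement(ProcessContext c) {
        // log the received element and emit a placeholder count
        System.out.println(c.element());
        c.output(1L);
    }
}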
** Then I executed the Producer with my original code, so that the messages
are generated properly (kafkaSink here uses TypeInformationSerializationSchema):

pipeline.apply(Create.of(new Test("Joe", 6))
    *.withCoder(AvroCoder.of(Test.class))*)
    .apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));
**

I still get the error on the consumer side ...
Exception in thread "main" java.lang.NullPointerException: null value in entry: InputT=null
    at com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)
    at com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)
    at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)
    at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)
    at org.apache.beam.sdk.coders.CoderRegistry.getTypeToCoderBindings(CoderRegistry.java:809)
    at org.apache.beam.sdk.coders.CoderRegistry.getDefaultCoder(CoderRegistry.java:204)
    at org.apache.beam.sdk.transforms.ParDo$Bound.getDefaultOutputCoder(ParDo.java:792)
On Wed, Apr 27, 2016 at 10:05 AM, Maximilian Michels <[email protected]> wrote:
> Hi Kaniska,
>
> Thanks for your mail. First of all, let us state clearly the problem
> you are trying to solve.
>
> Do you want to use Avro serialization or Flink serialization? If you
> use the TypeInformationSerializationSchema, you use Flink's
> serialization stack - no Avro is involved then. You have to be
> consistent and stick with one of them. Otherwise problems are bound to
> happen, as Flink's serialization doesn't understand Avro's
> serialization, and vice versa.
>
> From your initial question it appears you want to read/write Avro
> serialized data from/to Kafka. So let's see how to do this:
>
> ============
> A custom class
> ============
>
> Let's assume you have a class like this:
>
> public class MyCounts implements Serializable {
>
>     private String word;
>     private long count;
>
>     public MyCounts() {}
>
>     public MyCounts(String word, long count) {
>         this.word = word;
>         this.count = count;
>     }
>
>     @Override
>     public String toString() {
>         return "MyCounts{" +
>                 "word='" + word + '\'' +
>                 ", count=" + count +
>                 '}';
>     }
> }
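> (The empty constructor is not just boilerplate, by the way: Avro's
> reflection-based decoding needs a no-arg constructor to instantiate the
> class.)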
>
>
> =================================
> The Serialization / Deserialization Schema
> =================================
>
> This is the schema for the Kafka Producer/Consumer to
> serialize/deserialize data:
>
> public class AvroSerializationDeserializationSchema<T>
>         implements SerializationSchema<T>, DeserializationSchema<T> {
>
>     private final Class<T> avroType;
>
>     private final AvroCoder<T> coder;
>     private transient ByteArrayOutputStream out;
>
>     public AvroSerializationDeserializationSchema(Class<T> clazz) {
>         this.avroType = clazz;
>         this.coder = AvroCoder.of(clazz);
>         this.out = new ByteArrayOutputStream();
>     }
>
>     @Override
>     public byte[] serialize(T element) {
>         if (out == null) {
>             out = new ByteArrayOutputStream();
>         }
>         try {
>             out.reset();
>             coder.encode(element, out, Coder.Context.NESTED);
>         } catch (IOException e) {
>             throw new RuntimeException("Avro encoding failed.", e);
>         }
>         return out.toByteArray();
>     }
>
>     @Override
>     public T deserialize(byte[] message) throws IOException {
>         return coder.decode(new ByteArrayInputStream(message), Coder.Context.NESTED);
>     }
>
>     @Override
>     public boolean isEndOfStream(T nextElement) {
>         return false;
>     }
>
>     @Override
>     public TypeInformation<T> getProducedType() {
>         return TypeExtractor.getForClass(avroType);
>     }
> }
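> (A note on the transient field: the schema instance itself gets serialized
> and shipped to the workers, and a ByteArrayOutputStream is not serializable.
> That is why "out" is transient and serialize() recreates it when it comes
> back as null.)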
>
> ======================================
> Writing some Avro serialized data to a Kafka topic
> ======================================
>
> Pipeline pipeline = Pipeline.create(options);
>
> PCollection<MyCounts> words =
>     pipeline.apply(Create.of(
>         new MyCounts("word", 1L),
>         new MyCounts("another", 2L),
>         new MyCounts("yet another", 3L)));
>
> FlinkKafkaProducer08<MyCounts> kafkaSink =
>     new FlinkKafkaProducer08<>(options.getKafkaOutputTopic(),
>         new AvroSerializationDeserializationSchema<>(MyCounts.class), props);
>
> words.apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));
>
> pipeline.run();
>
> Let's execute that.
>
> ====================================
> Reading Avro serialized data from a Kafka topic
> ====================================
>
> Now it's time to read the data back from Kafka:
>
> Pipeline pipeline = Pipeline.create(options);
>
> FlinkKafkaConsumer08<MyCounts> kafkaConsumer =
>     new FlinkKafkaConsumer08<>(
>         options.getKafkaTopic(),
>         new AvroSerializationDeserializationSchema<>(MyCounts.class),
>         props);
>
> PCollection<MyCounts> words = pipeline
>     .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
>     .apply(ParDo.of(new PrintFn()));
>
> pipeline.run();
>
> ===================
> Executing the examples
> ===================
>
> It prints:
>
> MyCounts{word='word', count=1}
> MyCounts{word='another', count=2}
> MyCounts{word='yet another', count=3}
>
>
> Let me know if that helps you! I've omitted the Kafka props and
> options for brevity.
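>
> In case it is useful, the props object is just the usual Kafka
> configuration, along these lines (hostnames and the group id are
> placeholders for your setup):
>
> Properties props = new Properties();
> // Kafka broker(s)
> props.setProperty("bootstrap.servers", "localhost:9092");
> // required by the 0.8 consumer
> props.setProperty("zookeeper.connect", "localhost:2181");
> // consumer group used by the FlinkKafkaConsumer08
> props.setProperty("group.id", "avro-kafka-example");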
>
> I hope that we will soon have native Kafka IO for both reading and
> writing to Kafka available in Beam.
>
> Cheers,
> Max
>
> On Wed, Apr 27, 2016 at 4:21 AM, kaniska Mandal
> <[email protected]> wrote:
> > Sorry for cluttering the post with some code.
> > I have attached a couple of Java files and one Avro-generated POJO.
> >
> > I am facing some issues while reading / writing data using Flink's
> > DeSer / Ser schema.
> >
> >
> > A) << producer >> BeamKafkaFlinkAvroProducerTest
> >
> >>> if I use KafkaProducer directly (i.e. call produceSimpleData()),
> >>> things work fine (just for testing)
> >
> >>> using FlinkKafkaProducer as the unbounded sink (this is what I should do):
> >
> > produceAvroData2() { ...
> >
> > 1) First, if I use
> > >> AvroSerializationSchema schema = new AvroSerializationSchema(Test.class);
> > i.e. essentially using Avro's org.apache.avro.specific.SpecificDatumWriter,
> > I face the following error >>
> >
> > Caused by: java.lang.ClassCastException: java.lang.String cannot be cast
> > to org.apache.avro.generic.IndexedRecord
> >     at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
> >     at org.apache.avro.generic.GenericData.getField(GenericData.java:595)
> >     at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)
> >     at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
> >
> >
> > 2) Next, if I use TypeInformationSerializationSchema (irrespective of the
> > AvroCoder in the Pipeline), things apparently work fine,
> >
> > as the Kafka test consumer tool prints the message >>
> > java.lang.String{"uname": "Joe", "id": 6}
> >
> >
> > B) << Consumer >> BeamKafkaFlinkAvroConsumerTest
> >
> >>> I understand we should either use TypeInformationSerializationSchema in
> >>> both the consumer and the producer, OR
> >>> use AvroDeserializationSchema and AvroSerializationSchema in the
> >>> Consumer and Producer respectively !!
> >
> > But, irrespective of using AvroDeserializationSchema or
> > TypeInformationSerializationSchema, I get the following exception >>
> >
> > Exception in thread "main" java.lang.NullPointerException: null value in
> > entry: V=null
> >     at com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)
> >     at com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)
> >     at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)
> >     at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)
> >     at org.apache.beam.sdk.coders.CoderRegistry.getDefaultOutputCoder(CoderRegistry.java:221)
> >
> >
> > Any clues what's wrong here? Maybe I am missing some simple step.
> > I am trying to make a simple Avro schema work here, and eventually we
> > need to deal with very complex Avro schemas.
> >
> > Your help is much appreciated.
> >
> > Thanks
> > Kaniska
> >
> > On Tue, Apr 26, 2016 at 9:05 AM, Maximilian Michels <[email protected]> wrote:
> >>
> >> Hi Kaniska,
> >>
> >> To read data from Kafka, you need to supply a DeserializationSchema.
> >> Here is one you could use:
> >> https://gist.github.com/StephanEwen/d515e10dd1c609f70bed
> >>
> >> Similarly, to write data into Kafka using the Producer, you will need
> >> a SerializationSchema. You need to serialize your data into bytes
> >> using your Avro schema. Actually, you could use the AvroCoder which is
> >> supplied in Beam for this. Or you can implement your own, analogous to
> >> the DeserializationSchema above.
> >>
> >> - Max
> >>
> >>
> >> On Tue, Apr 26, 2016 at 8:43 AM, kaniska Mandal
> >> <[email protected]> wrote:
> >> > Hi,
> >> >
> >> > I followed the example in AvroITCase.java - which reads/writes Avro
> >> > data from/to the filesystem.
> >> >> I don't want to update AvroIO to embed a Kafka consumer / producer
> >> >
> >> > Also looked into https://issues.apache.org/jira/browse/FLINK-2597
> >> >> But I couldn't instantiate TypeInformationSerializationSchema in the
> >> >> following FlinkKafkaConsumer / Producer - as I am not sure how to get
> >> >> access to the ExecutionConfig
> >> >
> >> > Essentially, I need help to change the following code in the Beam Pipeline
> >> >> to read Avro data from Kafka and deserialize it into my custom object.
> >> >
> >> > public static UnboundedSource<String, CheckpointMark> consumeMessages() {
> >> >     FlinkKafkaConsumer08<String> kafkaConsumer =
> >> >         new FlinkKafkaConsumer08<>(options.getKafkaTopic(), ? , props);
> >> >
> >> >     return UnboundedFlinkSource.of(kafkaConsumer);
> >> > }
> >> >
> >> >> and how to write Avro data into Kafka?
> >> > public static void produceData() {
> >> >     FlinkKafkaProducer08<String> kafkaSink =
> >> >         new FlinkKafkaProducer08<>(TOPIC, ? , props);
> >> >
> >> >     Pipeline pipeline = Pipeline.create(options);
> >> >     pipeline
> >> >         .apply(Create.of(
> >> >             new User("Joe", 3, "red"),
> >> >             new User("Mary", 4, "blue"),
> >> >             new User("Mark", 1, "green"),
> >> >             new User("Julia", 5, "purple"))
> >> >             .withCoder(AvroCoder.of(User.class)))
> >> >
> >> >         .apply(transformationToWriteToKafkaSink());
> >> >
> >> >     pipeline.run();
> >> > }
> >> >
> >> > Thanks
> >> > Kaniska
> >
> >
>
[Attachment: Test.java]
