Hi Kaniska,
Thanks for your mail. First of all, let's clearly state the problem
you are trying to solve.
Do you want to use Avro serialization or Flink serialization? If you
use the TypeInformationSerializationSchema, you are using Flink's
serialization stack and Avro is not involved at all. You have to be
consistent and stick with one or the other. Otherwise problems are
bound to happen, because Flink's serialization doesn't understand
Avro's format and vice versa.
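To illustrate why mixing the two stacks fails, here is a minimal sketch that uses only the JDK. It does not use Flink or Avro: "format A" (JDK object-stream framing) and "format B" (a plain DataOutputStream layout) are hypothetical stand-ins for two serialization stacks that do not understand each other, and all class and method names are made up for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

/** Hypothetical demo: two incompatible byte layouts for the same (word, count) record. */
final class FormatMismatchSketch {

    /** Format A (stand-in): JDK object-stream framing, i.e. a stream header plus block data. */
    static byte[] encodeA(String word, long count) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeUTF(word);
            out.writeLong(count);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Format B (stand-in): length-prefixed string followed by an 8-byte long, no header. */
    static byte[] encodeB(String word, long count) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(word);
            out.writeLong(count);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads format A and returns "word=count"; throws if the bytes are not format A. */
    static String decodeA(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readUTF() + "=" + in.readLong();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads format B and returns "word=count". */
    static String decodeB(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readUTF() + "=" + in.readLong();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns true only if the format A reader accepts the given bytes. */
    static boolean canDecodeA(byte[] data) {
        try {
            decodeA(data);
            return true;
        } catch (UncheckedIOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] written = encodeB("word", 1L);
        // Same format on both sides: the record comes back intact.
        System.out.println(decodeB(written));
        // Writer used format B, reader expects format A: the read is rejected.
        System.out.println("format A reader accepts it: " + canDecodeA(written));
    }
}
```

The same holds for your job: whichever schema the producer uses to write to the topic, the consumer has to use its counterpart.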
From your initial question it appears you want to read/write Avro
serialized data from/to Kafka. So let's see how to do this:
============
A custom class
============
Let's assume you have a class like this:
public class MyCounts implements Serializable {
    private String word;
    private long count;
    public MyCounts() {}
    public MyCounts(String word, long count) {
        this.word = word;
        this.count = count;
    }
    @Override
    public String toString() {
        return "MyCounts{" +
            "word='" + word + '\'' +
            ", count=" + count +
            '}';
    }
}
=================================
The Serialization / Deserialization Schema
=================================
This is the schema for the Kafka Producer/Consumer to
serialize/deserialize data:
public class AvroSerializationDeserializationSchema<T>
        implements SerializationSchema<T>, DeserializationSchema<T> {
    private final Class<T> avroType;
    private final AvroCoder<T> coder;
    // ByteArrayOutputStream is not Serializable, so the buffer is transient
    // and gets re-created lazily in serialize().
    private transient ByteArrayOutputStream out;
    public AvroSerializationDeserializationSchema(Class<T> clazz) {
        this.avroType = clazz;
        this.coder = AvroCoder.of(clazz);
        this.out = new ByteArrayOutputStream();
    }
    @Override
    public byte[] serialize(T element) {
        if (out == null) {
            out = new ByteArrayOutputStream();
        }
        try {
            // Reuse the buffer across calls; drop bytes from the previous element.
            out.reset();
            coder.encode(element, out, Coder.Context.NESTED);
        } catch (IOException e) {
            throw new RuntimeException("Avro encoding failed.", e);
        }
        return out.toByteArray();
    }
    @Override
    public T deserialize(byte[] message) throws IOException {
        return coder.decode(new ByteArrayInputStream(message),
            Coder.Context.NESTED);
    }
    @Override
    public boolean isEndOfStream(T nextElement) {
        // A Kafka topic is unbounded, so no element ends the stream.
        return false;
    }
    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }
}
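A note on the `out == null` check in serialize(): the schema instance is shipped to the workers via Java serialization, and transient fields come back as null because constructors are not run on deserialization. Here is a small JDK-only sketch of that pattern. The class ReusableBufferEncoder and its methods are hypothetical stand-ins for the schema above, and it encodes plain strings instead of Avro records.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for a serializable schema that owns a reusable buffer. */
final class ReusableBufferEncoder implements Serializable {
    private static final long serialVersionUID = 1L;

    // ByteArrayOutputStream is not Serializable, so the field must be transient.
    private transient ByteArrayOutputStream out;

    ReusableBufferEncoder() {
        this.out = new ByteArrayOutputStream();
    }

    byte[] encode(String value) {
        if (out == null) {
            // The constructor did not run on deserialization: re-create the buffer.
            out = new ByteArrayOutputStream();
        }
        out.reset(); // drop bytes left over from the previous call
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }

    /** Exposed only so the demo can observe the state of the transient field. */
    boolean hasBuffer() {
        return out != null;
    }

    /** Round-trips the encoder through Java serialization, like shipping it to a worker. */
    static ReusableBufferEncoder copyViaSerialization(ReusableBufferEncoder original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
                oos.writeObject(original);
            }
            try (ObjectInputStream ois = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ReusableBufferEncoder) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed.", e);
        }
    }

    public static void main(String[] args) {
        ReusableBufferEncoder copy = copyViaSerialization(new ReusableBufferEncoder());
        System.out.println("buffer present after copy: " + copy.hasBuffer());
        System.out.println(new String(copy.encode("word"), StandardCharsets.UTF_8));
    }
}
```

Without the null check, the first serialize() call on a worker would fail with a NullPointerException.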
======================================
Writing some Avro serialized data to a Kafka topic
======================================
Pipeline pipeline = Pipeline.create(options);
PCollection<MyCounts> words =
    pipeline.apply(Create.of(
        new MyCounts("word", 1L),
        new MyCounts("another", 2L),
        new MyCounts("yet another", 3L)));
FlinkKafkaProducer08<MyCounts> kafkaSink =
    new FlinkKafkaProducer08<>(options.getKafkaOutputTopic(),
        new AvroSerializationDeserializationSchema<>(MyCounts.class), props);
words.apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));
pipeline.run();
Let's execute that.
====================================
Reading Avro serialized data from a Kafka topic
====================================
Now it's time to read the data back from Kafka:
Pipeline pipeline = Pipeline.create(options);
FlinkKafkaConsumer08<MyCounts> kafkaConsumer = new FlinkKafkaConsumer08<>(
    options.getKafkaTopic(),
    new AvroSerializationDeserializationSchema<>(MyCounts.class), props);
PCollection<MyCounts> words = pipeline
    .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
    .apply(ParDo.of(new PrintFn()));
pipeline.run();
===================
Executing the examples
===================
After running the producer pipeline, the consumer pipeline prints:
MyCounts{word='word', count=1}
MyCounts{word='another', count=2}
MyCounts{word='yet another', count=3}
Let me know if that helps you! I've omitted the Kafka props and
options for brevity.
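For completeness, here is a minimal sketch of what the omitted props could look like. The three keys are the ones the Kafka 0.8 connector expects as far as I know. The host names, ports, and the group id are assumptions for a local test setup, and the class name KafkaPropsSketch is made up for this example.

```java
import java.util.Properties;

/** Hypothetical helper that builds the Kafka properties omitted in the examples. */
final class KafkaPropsSketch {

    static Properties kafkaProps() {
        Properties props = new Properties();
        // Assumed local ZooKeeper; the 0.8 consumer uses it to commit offsets.
        props.setProperty("zookeeper.connect", "localhost:2181");
        // Assumed local Kafka broker, used by both the producer and the consumer.
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Made-up consumer group id; pick your own.
        props.setProperty("group.id", "beam-avro-example");
        return props;
    }

    public static void main(String[] args) {
        kafkaProps().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Replace the values with the ones for your cluster and pass the resulting object as `props` in the snippets above.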
I hope that we will soon have native Kafka IO in Beam, both for
reading from and for writing to Kafka.
Cheers,
Max
On Wed, Apr 27, 2016 at 4:21 AM, kaniska Mandal
<[email protected]> wrote:
> Sorry for cluttering the post with some code.
> I have attached a couple of Java files and one Avro-generated POJO.
>
> I am facing some issues while reading / writing data using flink's DeSer /
> Ser schema.
>
>
> A) << producer >> BeamKafkaFlinkAvroProducerTest
>
>>> if I use KafkaProducer directly (i.e. call produceSimpleData() ) ,
>>> things are working fine (just for testing )
>
>>> Using FlinkKafkaProducer as UnboundedSource (this is what I should do)
>
> produceAvroData2() { ...
>
> 1) First, if I use >> AvroSerializationSchema schema = new
> AvroSerializationSchema(Test.class);
>
> i.e. essentially using Avro’s org.apache.avro.specific.SpecificDatumWriter ;
> I face following error >>
>
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to
> org.apache.avro.generic.IndexedRecord
>
> at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
>
> at org.apache.avro.generic.GenericData.getField(GenericData.java:595)
>
> at
> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)
>
> at
> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
>
>
> 2) Next, if I use TypeInformationSerializationSchema (irrespective of
> AvroCoder in Pipeline) , things apparently work fine
>
> as Kafka test consumer tool prints the message >>
> java.lang.String{"uname": "Joe", "id": 6}
>
>
> B) <<Consumer>> , BeamKafkaFlinkAvroConsumerTest
>
>>> I understand we should either use TypeInformationSerializationSchema in
>>> both consumer and producer OR
>
> should use AvroDeserializationSchema and AvroSerializationSchema in Consumer
> and Producer respectively !!
>
> But, irrespective of using AvroDeserializationSchema or
> TypeInformationSerializationSchema, I get the following exception >>
>
> Exception in thread "main" java.lang.NullPointerException: null value in
> entry: V=null
>
> at
> com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)
>
> at
> com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)
>
> at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)
>
> at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)
>
> at
> org.apache.beam.sdk.coders.CoderRegistry.getDefaultOutputCoder(CoderRegistry.java:221)
>
>
> Any clues what's wrong here? I may be missing some simple step.
> I'm trying to make a simple Avro schema work here, and eventually we need to
> deal with very complex Avro schemas.
>
> Much appreciate your help.
>
> Thanks
> Kaniska
>
> On Tue, Apr 26, 2016 at 9:05 AM, Maximilian Michels <[email protected]> wrote:
>>
>> Hi Kaniska,
>>
>> To read data from Kafka, you need to supply a DeserializationSchema.
>> Here is one you could use:
>> https://gist.github.com/StephanEwen/d515e10dd1c609f70bed
>>
>> Similarly, to write data into Kafka using the Producer, you will need
>> a SerializationSchema. You need to serialize your data into bytes
>> using your Avro schema. Actually, you could use the AvroCoder which is
>> supplied in Beam for this. Or you implement your own analogue to the
>> DeserializationSchema above.
>>
>> - Max
>>
>>
>> On Tue, Apr 26, 2016 at 8:43 AM, kaniska Mandal
>> <[email protected]> wrote:
>> > Hi,
>> >
>> > I followed the example in AvroITCase.java - which reads/writes Avro data
>> > from/to filesystem.
>> >> I don't want to update AvroIO to embed Kafka consumer / producer
>> >
>> > Also looked into - https://issues.apache.org/jira/browse/FLINK-2597
>> >> But I couldn't instantiate TypeInformationSerializationSchema in the
>> >> following FlinkKafkaConsumer / Producer, as I am not sure how to get
>> >> access to ExecutionConfig
>> >
>> > Essentially, need help to change the following code in Beam Pipeline
>> >> to read Avro data from Kafka and deserialize into my custom object.
>> >
>> > public static UnboundedSource<String, CheckpointMark> consumeMessages()
>> > {
>> > FlinkKafkaConsumer08<String> kafkaConsumer = new
>> > FlinkKafkaConsumer08<>(options.getKafkaTopic(),
>> > ? , props);
>> >
>> > return UnboundedFlinkSource.of(kafkaConsumer);
>> > }
>> >
>> >> and how to write Avro data into Kafka?
>> > public static void produceData(){
>> > FlinkKafkaProducer08<String> kafkaSink =
>> > new FlinkKafkaProducer08<>(TOPIC, ? , props);
>> >
>> > Pipeline pipeline = Pipeline.create(options);
>> > pipeline
>> > .apply(Create.of(
>> > new User("Joe", 3, "red"),
>> > new User("Mary", 4, "blue"),
>> > new User("Mark", 1, "green"),
>> > new User("Julia", 5, "purple"))
>> > .withCoder(AvroCoder.of(User.class)))
>> >
>> > .apply(transformationToWriteToKafkaSink());
>> >
>> > pipeline.run();
>> >
>> > }
>> >
>> > Thanks
>> > Kaniska
>
>