Kaniska,

If your Kafka cluster is running 0.9, you can also try the native KafkaIO
<https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L88>
merged recently into Beam. It supports reading from Kafka, and I am working
on Sink support (this week).
See TopHashtagsExample.java
<https://github.com/apache/incubator-beam/pull/142/commits/f5c809d5d3551c4fcb64bc7bcde0c64f8dd76e0a#diff-796ac0dad9e90975cfea2e2b05a90d69R121>
for a complete example (it writes results back to Kafka in a ParDo() rather
than in a Sink). To use AvroCoder, your consumer test will have something like:

    pipeline.apply(KafkaIO.read()
        .withBootstrapServers(options.getBroker())
        .withTopics(ImmutableList.of(TOPICS))
        .withValueCoder(AvroCoder.of(User.class))
        ...

Raghu.

On Wed, Apr 27, 2016 at 10:05 AM, Maximilian Michels <[email protected]> wrote:

> Hi Kaniska,
>
> Thanks for your mail. First of all, let us state clearly the problem
> you are trying to solve.
>
> Do you want to use Avro serialization or Flink serialization? If you
> use the TypeInformationSerializationSchema you use Flink's
> serialization stack - no Avro involved then. You have to be consistent
> and stick with either one. Otherwise problems are bound to happen, as
> Flink's serialization doesn't understand Avro's serialization and
> vice versa.
>
> From your initial question it appears you want to read/write Avro
> serialized data from/to Kafka. So let's see how to do this:
>
> ============
> A custom class
> ============
>
> Let's assume you have a class like this:
>
> public class MyCounts implements Serializable {
>
>     private String word;
>     private long count;
>
>     public MyCounts() {}
>
>     public MyCounts(String word, long count) {
>         this.word = word;
>         this.count = count;
>     }
>
>     @Override
>     public String toString() {
>         return "MyCounts{" +
>                 "word='" + word + '\'' +
>                 ", count=" + count +
>                 '}';
>     }
> }
>
>
> =================================
> The Serialization / Deserialization Schema
> =================================
>
> This is the schema for the Kafka Producer/Consumer to
> serialize/deserialize data:
>
> public class AvroSerializationDeserializationSchema<T>
>         implements SerializationSchema<T>, DeserializationSchema<T> {
>
>     private final Class<T> avroType;
>
>     private final AvroCoder<T> coder;
>     private transient ByteArrayOutputStream out;
>
>     public AvroSerializationDeserializationSchema(Class<T> clazz) {
>         this.avroType = clazz;
>         this.coder = AvroCoder.of(clazz);
>         this.out = new ByteArrayOutputStream();
>     }
>
>     @Override
>     public byte[] serialize(T element) {
>         if (out == null) {
>             out = new ByteArrayOutputStream();
>         }
>         try {
>             out.reset();
>             coder.encode(element, out, Coder.Context.NESTED);
>         } catch (IOException e) {
>             throw new RuntimeException("Avro encoding failed.", e);
>         }
>         return out.toByteArray();
>     }
>
>     @Override
>     public T deserialize(byte[] message) throws IOException {
>         return coder.decode(new ByteArrayInputStream(message),
>             Coder.Context.NESTED);
>     }
>
>     @Override
>     public boolean isEndOfStream(T nextElement) {
>         return false;
>     }
>
>     @Override
>     public TypeInformation<T> getProducedType() {
>         return TypeExtractor.getForClass(avroType);
>     }
> }
>
> ======================================
> Writing some Avro serialized data to a Kafka topic
> ======================================
>
> Pipeline pipeline = Pipeline.create(options);
>
> PCollection<MyCounts> words =
>     pipeline.apply(Create.of(
>         new MyCounts("word", 1L),
>         new MyCounts("another", 2L),
>         new MyCounts("yet another", 3L)));
>
> FlinkKafkaProducer08<MyCounts> kafkaSink =
>     new FlinkKafkaProducer08<>(options.getKafkaOutputTopic(),
>         new AvroSerializationDeserializationSchema<>(MyCounts.class), props);
>
> words.apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));
>
> pipeline.run();
>
> Let's execute that.
>
> ====================================
> Reading Avro serialized data from a Kafka topic
> ====================================
>
> Now it's time to read the data back from Kafka:
>
> Pipeline pipeline = Pipeline.create(options);
>
> FlinkKafkaConsumer08<MyCounts> kafkaConsumer = new FlinkKafkaConsumer08<>(
>     options.getKafkaTopic(),
>     new AvroSerializationDeserializationSchema<>(MyCounts.class),
>     props);
>
> PCollection<MyCounts> words = pipeline
>     .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
>     .apply(ParDo.of(new PrintFn()));
>
> pipeline.run();
>
> ===================
> Executing the examples
> ===================
>
> It prints:
>
> MyCounts{word='word', count=1}
> MyCounts{word='another', count=2}
> MyCounts{word='yet another', count=3}
>
>
> Let me know if that helps you! I've omitted the Kafka props and
> options for brevity.
>
> I hope that we will soon have native Kafka IO for both reading and
> writing to Kafka available in Beam.
>
> Cheers,
> Max
>
> On Wed, Apr 27, 2016 at 4:21 AM, kaniska Mandal
> <[email protected]> wrote:
> > Sorry for cluttering the post with some code. I have attached a couple
> > of java files and one Avro-generated POJO.
> >
> > I am facing some issues while reading / writing data using Flink's
> > Deser / Ser schema.
> >
> >
> > A) << Producer >> BeamKafkaFlinkAvroProducerTest
> >
> >>> If I use KafkaProducer directly (i.e. call produceSimpleData()),
> >>> things are working fine (just for testing).
> >
> >>> Using FlinkKafkaProducer as an UnboundedSource (this is what I should do):
> >
> > produceAvroData2() { ...
> >
> > 1) First, if I use
> >     AvroSerializationSchema schema = new AvroSerializationSchema(Test.class);
> > i.e. essentially using Avro's org.apache.avro.specific.SpecificDatumWriter,
> > I face the following error:
> >
> > Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to
> > org.apache.avro.generic.IndexedRecord
> >     at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
> >     at org.apache.avro.generic.GenericData.getField(GenericData.java:595)
> >     at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)
> >     at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
> >
> > 2) Next, if I use TypeInformationSerializationSchema (irrespective of
> > AvroCoder in the pipeline), things apparently work fine,
> > as the Kafka test consumer tool prints the message:
> > java.lang.String{"uname": "Joe", "id": 6}
> >
> >
> > B) << Consumer >> BeamKafkaFlinkAvroConsumerTest
> >
> >> I understand we should either use TypeInformationSerializationSchema in
> >> both consumer and producer, OR
> > use AvroDeserializationSchema and AvroSerializationSchema in the Consumer
> > and Producer respectively !!
> >
> > But, irrespective of using AvroDeserializationSchema or
> > TypeInformationSerializationSchema, I get the following exception:
> >
> > Exception in thread "main" java.lang.NullPointerException: null value in
> > entry: V=null
> >     at com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)
> >     at com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)
> >     at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)
> >     at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)
> >     at org.apache.beam.sdk.coders.CoderRegistry.getDefaultOutputCoder(CoderRegistry.java:221)
> >
> > Any clues what's wrong here? Maybe I'm missing some simple step.
> > I'm trying to make a simple Avro schema work here, and eventually we need
> > to deal with a very complex Avro schema.
> >
> > Much appreciate your help.
> >
> > Thanks
> > Kaniska
> >
> > On Tue, Apr 26, 2016 at 9:05 AM, Maximilian Michels <[email protected]> wrote:
> >>
> >> Hi Kaniska,
> >>
> >> To read data from Kafka, you need to supply a DeserializationSchema.
> >> Here is one you could use:
> >> https://gist.github.com/StephanEwen/d515e10dd1c609f70bed
> >>
> >> Similarly, to write data into Kafka using the Producer, you will need
> >> a SerializationSchema. You need to serialize your data into bytes
> >> using your Avro schema. Actually, you could use the AvroCoder which is
> >> supplied in Beam for this. Or you implement your own analogue to the
> >> DeserializationSchema above.
> >>
> >> - Max
> >>
> >>
> >> On Tue, Apr 26, 2016 at 8:43 AM, kaniska Mandal
> >> <[email protected]> wrote:
> >> > Hi,
> >> >
> >> > I followed the example in AvroITCase.java, which reads/writes Avro data
> >> > from/to the filesystem.
> >> >> I don't want to update AvroIO to embed a Kafka consumer / producer.
> >> >
> >> > I also looked into https://issues.apache.org/jira/browse/FLINK-2597
> >> >> But I couldn't instantiate TypeInformationSerializationSchema in the
> >> >> following FlinkKafkaConsumer / Producer, as I am not sure how to get
> >> >> access to the ExecutionConfig.
> >> >
> >> > Essentially, I need help changing the following code in my Beam pipeline
> >> >> to read Avro data from Kafka and deserialize it into my custom object.
> >> >
> >> > public static UnboundedSource<String, CheckpointMark> consumeMessages() {
> >> >     FlinkKafkaConsumer08<String> kafkaConsumer = new
> >> >         FlinkKafkaConsumer08<>(options.getKafkaTopic(), ? , props);
> >> >
> >> >     return UnboundedFlinkSource.of(kafkaConsumer);
> >> > }
> >> >
> >> >> And how do I write Avro data into Kafka?
> >> > public static void produceData() {
> >> >     FlinkKafkaProducer08<String> kafkaSink =
> >> >         new FlinkKafkaProducer08<>(TOPIC, ? , props);
> >> >
> >> >     Pipeline pipeline = Pipeline.create(options);
> >> >     pipeline
> >> >         .apply(Create.of(
> >> >             new User("Joe", 3, "red"),
> >> >             new User("Mary", 4, "blue"),
> >> >             new User("Mark", 1, "green"),
> >> >             new User("Julia", 5, "purple"))
> >> >             .withCoder(AvroCoder.of(User.class)))
> >> >
> >> >         .apply(transformationToWriteToKafkaSink());
> >> >
> >> >     pipeline.run();
> >> > }
> >> >
> >> > Thanks
> >> > Kaniska
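
For reference, here is a minimal, untested sketch of how the "?" placeholders
in the two skeletons above could be filled in using the
AvroSerializationDeserializationSchema from Max's reply. The Avro-generated
User class, options and TOPIC are assumed to exist as in this thread; the
broker/zookeeper addresses and the group id are placeholder values for a local
Kafka 0.8 setup:

    // Sketch only -- not tested. Fills the "?" placeholders with the
    // AvroSerializationDeserializationSchema shown earlier in this thread.
    static Properties props = new Properties();
    static {
        props.setProperty("bootstrap.servers", "localhost:9092");  // assumed broker address
        props.setProperty("zookeeper.connect", "localhost:2181");  // required by the 0.8 consumer
        props.setProperty("group.id", "avro-beam-test");           // required by the 0.8 consumer
    }

    public static UnboundedSource<User, CheckpointMark> consumeMessages() {
        // Deserializes Kafka messages straight into User objects.
        FlinkKafkaConsumer08<User> kafkaConsumer = new FlinkKafkaConsumer08<>(
            options.getKafkaTopic(),
            new AvroSerializationDeserializationSchema<>(User.class),  // from Max's reply
            props);

        return UnboundedFlinkSource.of(kafkaConsumer);
    }

    public static void produceData() {
        // Serializes User objects with the same schema before writing to Kafka.
        FlinkKafkaProducer08<User> kafkaSink = new FlinkKafkaProducer08<>(
            TOPIC,
            new AvroSerializationDeserializationSchema<>(User.class),  // from Max's reply
            props);

        Pipeline pipeline = Pipeline.create(options);
        pipeline
            .apply(Create.of(
                new User("Joe", 3, "red"),
                new User("Mary", 4, "blue"))
                .withCoder(AvroCoder.of(User.class)))
            .apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));  // as in Max's writing example

        pipeline.run();
    }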

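On the side question of how to obtain an ExecutionConfig for
TypeInformationSerializationSchema (if you pick Flink's serialization stack
instead of Avro, per Max's note about sticking with one stack): a freshly
constructed ExecutionConfig is sufficient when the schema is built standalone.
A small sketch, assuming the MyCounts class, options and props from above:

    // Sketch: building Flink's TypeInformationSerializationSchema without access
    // to a StreamExecutionEnvironment -- a default ExecutionConfig is enough here.
    TypeInformation<MyCounts> typeInfo = TypeExtractor.getForClass(MyCounts.class);
    TypeInformationSerializationSchema<MyCounts> schema =
        new TypeInformationSerializationSchema<>(typeInfo, new ExecutionConfig());

    // Note: this uses Flink's serializers, not Avro, so the same schema must be
    // used on both the producer and the consumer side.
    FlinkKafkaConsumer08<MyCounts> kafkaConsumer =
        new FlinkKafkaConsumer08<>(options.getKafkaTopic(), schema, props);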