Oh, KafkaIO does not work with 0.8.x, as it uses the new consumer API in 0.9.x.

On Wed, Apr 27, 2016 at 4:35 PM, kaniska Mandal <[email protected]>
wrote:

> Hi Raghu,
>
> Thanks much for the update
>
> We are using kafka 0.8.x
>
> Can you please also add a working example (beam+flink+kafka) with
> Avro-generated Pojo ?
>
> Kaniska
>
> On Wed, Apr 27, 2016 at 12:14 PM, Raghu Angadi <[email protected]> wrote:
>
>> Kaniska,
>>
>> If your kafka cluster is running 0.9, you can also try native KafkaIO
>> <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L88>
>> merged recently into Beam. It supports reading from Kafka and I am working
>> on Sink support (this week).
>>
>> See TopHashtagsExample.java
>> <https://github.com/apache/incubator-beam/pull/142/commits/f5c809d5d3551c4fcb64bc7bcde0c64f8dd76e0a#diff-796ac0dad9e90975cfea2e2b05a90d69R121>
>> for a complete example (it writes results back to Kafka in ParDo() rather
>> than a Sink). To use AvroCoder, your consumer test will have something like
>>
>>    pipeline.apply(KafkaIO.read()
>>             .withBootstrapServers(options.getBroker())
>>             .withTopics(ImmutableList.of(TOPICS))
>>             .withValueCoder(AvroCoder.of(User.class))
>>            ...
>>
>> Raghu.
>>
>>
>>
>> On Wed, Apr 27, 2016 at 10:05 AM, Maximilian Michels <[email protected]>
>> wrote:
>>
>>> Hi Kaniska,
>>>
>>> Thanks for your mail. First of all, let us state clearly the problem
>>> you are trying to solve.
>>>
>>> Do you want to use Avro serialization or Flink serialization? If you
>>> use the TypeInformationSerializationSchema you use Flink's
>>> serialization stack - no Avro involved then. You have to be consistent
>>> and stick with either one. Otherwise problems are bound to happen as
>>> the Flink serialization doesn't understand Avro's serialization and
>>> also the other way around.
>>>
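
To make the consistency point above concrete, here is a minimal sketch that uses only JDK classes. It does not use Flink or Avro: encodeA/decodeA are a hypothetical hand-rolled format standing in for one serializer, and Java object serialization stands in for the other. All class and method names are made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;

// Hypothetical demo class: two unrelated binary formats, JDK only.
class FormatMismatchDemo {

  // "Format A" (an assumption for this sketch): a UTF string followed by a long.
  static byte[] encodeA(String word, long count) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream data = new DataOutputStream(bytes);
      data.writeUTF(word);
      data.writeLong(count);
      data.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new IllegalStateException("Encoding failed", e);
    }
  }

  // Reads "Format A" back in the same field order it was written.
  static String decodeA(byte[] message) {
    try {
      DataInputStream data = new DataInputStream(new ByteArrayInputStream(message));
      String word = data.readUTF();
      long count = data.readLong();
      return word + "=" + count;
    } catch (IOException e) {
      throw new IllegalStateException("Decoding failed", e);
    }
  }

  // "Format B": Java object serialization. Returns true only if the bytes
  // can be read as a Java serialization stream.
  static boolean readableAsJavaSerialization(byte[] message) {
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(message))) {
      in.readObject();
      return true;
    } catch (IOException | ClassNotFoundException e) {
      return false; // the reader does not understand the writer's format
    }
  }

  public static void main(String[] args) {
    byte[] message = encodeA("word", 1L);
    System.out.println("same format:  " + decodeA(message));
    System.out.println("other format: " + readableAsJavaSerialization(message));
  }
}
```

The matching reader recovers the record, while the mismatched reader rejects the same bytes. The same applies, by analogy, when one end of a Kafka topic uses Flink's serialization and the other uses Avro.
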
>>> From your initial question it appears you want to read/write Avro
>>> serialized data from/to Kafka. So let's see how to do this:
>>>
>>> ============
>>> A custom class
>>> ============
>>>
>>> Let's assume you have a class like this:
>>>
>>>   public class MyCounts implements Serializable {
>>>
>>>     private String word;
>>>     private long count;
>>>
>>>     public MyCounts() {}
>>>
>>>     public MyCounts(String word, long count) {
>>>       this.word = word;
>>>       this.count = count;
>>>     }
>>>
>>>     @Override
>>>     public String toString() {
>>>       return "MyCounts{" +
>>>           "word='" + word + '\'' +
>>>           ", count=" + count +
>>>           '}';
>>>     }
>>>   }
>>>
>>>
>>> =================================
>>> The Serialization / Deserialization Schema
>>> =================================
>>>
>>> This is the schema for the Kafka Producer/Consumer to
>>> serialize/deserialize data:
>>>
>>>   public class AvroSerializationDeserializationSchema<T>
>>>       implements SerializationSchema<T>, DeserializationSchema<T> {
>>>
>>>     private final Class<T> avroType;
>>>
>>>     private final AvroCoder<T> coder;
>>>     private transient ByteArrayOutputStream out;
>>>
>>>     public AvroSerializationDeserializationSchema(Class<T> clazz) {
>>>       this.avroType = clazz;
>>>       this.coder = AvroCoder.of(clazz);
>>>       this.out = new ByteArrayOutputStream();
>>>     }
>>>
>>>     @Override
>>>     public byte[] serialize(T element) {
>>>       if (out == null) {
>>>         out = new ByteArrayOutputStream();
>>>       }
>>>       try {
>>>         out.reset();
>>>         coder.encode(element, out, Coder.Context.NESTED);
>>>       } catch (IOException e) {
>>>         throw new RuntimeException("Avro encoding failed.", e);
>>>       }
>>>       return out.toByteArray();
>>>     }
>>>
>>>     @Override
>>>     public T deserialize(byte[] message) throws IOException {
>>>       return coder.decode(new ByteArrayInputStream(message),
>>>           Coder.Context.NESTED);
>>>     }
>>>
>>>     @Override
>>>     public boolean isEndOfStream(T nextElement) {
>>>       return false;
>>>     }
>>>
>>>     @Override
>>>     public TypeInformation<T> getProducedType() {
>>>       return TypeExtractor.getForClass(avroType);
>>>     }
>>>   }
>>>
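
A note on the null check in serialize() above: the buffer field is transient, so it is not carried along when the schema object is copied by Java serialization, and the constructor does not run again on the copy. I am assuming here that the schema is shipped to the workers that way. The sketch below reproduces just that effect with JDK classes; BufferingEncoder is a hypothetical stand-in, not Flink or Beam code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a serializable schema object with a reusable buffer.
class BufferingEncoder implements Serializable {
  private static final long serialVersionUID = 1L;

  // transient: skipped by Java serialization, so it is null on a deserialized copy.
  private transient ByteArrayOutputStream out = new ByteArrayOutputStream();

  boolean hasBuffer() {
    return out != null;
  }

  byte[] encode(String element) {
    if (out == null) {
      out = new ByteArrayOutputStream(); // re-create lazily on a deserialized copy
    }
    out.reset(); // reuse the same buffer across calls
    byte[] bytes = element.getBytes(StandardCharsets.UTF_8);
    out.write(bytes, 0, bytes.length);
    return out.toByteArray();
  }

  // Round-trips the object through Java serialization, roughly what happens
  // when an object is sent to another JVM.
  static BufferingEncoder copyViaJavaSerialization(BufferingEncoder original) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
        oos.writeObject(original);
      }
      try (ObjectInputStream ois =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (BufferingEncoder) ois.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Java serialization round trip failed", e);
    }
  }

  public static void main(String[] args) {
    BufferingEncoder copy = copyViaJavaSerialization(new BufferingEncoder());
    System.out.println("buffer present after copy: " + copy.hasBuffer());
    System.out.println("encoded length: " + copy.encode("word").length);
  }
}
```

Without the null check in encode(), the first call on the copy would fail with a NullPointerException, which is why the schema above re-creates the stream on demand.
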
>>> ======================================
>>> Writing some Avro serialized data to a Kafka topic
>>> ======================================
>>>
>>>     Pipeline pipeline = Pipeline.create(options);
>>>
>>>     PCollection<MyCounts> words =
>>>         pipeline.apply(Create.of(
>>>             new MyCounts("word", 1L),
>>>             new MyCounts("another", 2L),
>>>             new MyCounts("yet another", 3L)));
>>>
>>>     FlinkKafkaProducer08<MyCounts> kafkaSink =
>>>         new FlinkKafkaProducer08<>(options.getKafkaOutputTopic(),
>>>             new AvroSerializationDeserializationSchema<>(MyCounts.class),
>>>             props);
>>>
>>>     words.apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));
>>>
>>>     pipeline.run();
>>>
>>> Let's execute that.
>>>
>>> ====================================
>>> Reading Avro serialized data from a Kafka topic
>>> ====================================
>>>
>>> Now it is time to read the data back from Kafka:
>>>
>>>     Pipeline pipeline = Pipeline.create(options);
>>>
>>>     FlinkKafkaConsumer08<MyCounts> kafkaConsumer = new FlinkKafkaConsumer08<>(
>>>         options.getKafkaTopic(),
>>>         new AvroSerializationDeserializationSchema<>(MyCounts.class),
>>>         props);
>>>
>>>     PCollection<MyCounts> words = pipeline
>>>         .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
>>>         .apply(ParDo.of(new PrintFn()));
>>>
>>>     pipeline.run();
>>>
>>> ===================
>>> Executing the examples
>>> ===================
>>>
>>> It prints:
>>>
>>>     MyCounts{word='word', count=1}
>>>     MyCounts{word='another', count=2}
>>>     MyCounts{word='yet another', count=3}
>>>
>>>
>>> Let me know if that helps you! I've omitted the Kafka props and
>>> options for brevity.
>>>
>>> I hope that we will soon have native Kafka IO for both reading and
>>> writing to Kafka available in Beam.
>>>
>>> Cheers,
>>> Max
>>>
>>> On Wed, Apr 27, 2016 at 4:21 AM, kaniska Mandal
>>> <[email protected]> wrote:
>>> > Sorry for cluttering the post with some code.
>>> > I have attached a couple of Java files and one Avro-generated POJO.
>>> >
>>> > I am facing some issues while reading / writing data using Flink's
>>> > DeSer / Ser schema.
>>> >
>>> >
>>> > A) << producer >> BeamKafkaFlinkAvroProducerTest
>>> >
>>> >>> if I use  KafkaProducer directly (i.e. call produceSimpleData()  ) ,
>>> >>> things are working fine   (just for testing )
>>> >
>>> >>> Using FlinkKafkaProducer as UnboundedSource (this is what I should do)
>>> >
>>> > produceAvroData2() { ...
>>> >
>>> > 1) First, if I use >> AvroSerializationSchema schema = new
>>> > AvroSerializationSchema(Test.class);
>>> >
>>> > i.e. essentially using Avro’s org.apache.avro.specific.SpecificDatumWriter,
>>> > I face the following error >>
>>> >
>>> > Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to
>>> > org.apache.avro.generic.IndexedRecord
>>> >   at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
>>> >   at org.apache.avro.generic.GenericData.getField(GenericData.java:595)
>>> >   at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)
>>> >   at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
>>> >
>>> >
>>> > 2) Next,  if I use TypeInformationSerializationSchema (irrespective of
>>> > AvroCoder in Pipeline) , things apparently work fine
>>> >
>>> > as Kafka test consumer tool prints the message  >>
>>> > java.lang.String{"uname": "Joe", "id": 6}
>>> >
>>> >
>>> > B) <<Consumer>> ,  BeamKafkaFlinkAvroConsumerTest
>>> >
>>> >>> I understand we should either use TypeInformationSerializationSchema
>>> >>> in both consumer and producer OR
>>> >
>>> > should use AvroDeserializationSchema and AvroSerializationSchema in
>>> > Consumer and Producer respectively !!
>>> >
>>> > But, irrespective of using AvroDeserializationSchema or
>>> > TypeInformationSerializationSchema, I get the following exception >>
>>> >
>>> > Exception in thread "main" java.lang.NullPointerException: null value in
>>> > entry: V=null
>>> >   at com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)
>>> >   at com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)
>>> >   at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)
>>> >   at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)
>>> >   at org.apache.beam.sdk.coders.CoderRegistry.getDefaultOutputCoder(CoderRegistry.java:221)
>>> >
>>> >
>>> > Any clues what's wrong here? Maybe I am missing some simple step.
>>> > I am trying to make a simple Avro schema work here, and eventually we
>>> > need to deal with a very complex Avro schema.
>>> >
>>> > Much appreciate your help.
>>> >
>>> > Thanks
>>> > Kaniska
>>> >
>>> > On Tue, Apr 26, 2016 at 9:05 AM, Maximilian Michels <[email protected]>
>>> wrote:
>>> >>
>>> >> Hi Kaniska,
>>> >>
>>> >> To read data from Kafka, you need to supply a DeserializationSchema.
>>> >> Here is one you could use:
>>> >> https://gist.github.com/StephanEwen/d515e10dd1c609f70bed
>>> >>
>>> >> Similarly, to write data into Kafka using the Producer, you will need
>>> >> a SerializationSchema. You need to serialize your data into bytes
>>> >> using your Avro schema. Actually, you could use the AvroCoder which is
>>> >> supplied in Beam for this. Or you implement your own analogue to the
>>> >> DeserializationSchema above.
>>> >>
>>> >> - Max
>>> >>
>>> >>
>>> >> On Tue, Apr 26, 2016 at 8:43 AM, kaniska Mandal
>>> >> <[email protected]> wrote:
>>> >> > Hi,
>>> >> >
>>> >> > I followed the example in AvroITCase.java - which reads/writes Avro
>>> >> > data from/to the filesystem.
>>> >> >> I don't want to update AvroIO to embed Kafka consumer / producer
>>> >> >
>>> >> > Also looked into - https://issues.apache.org/jira/browse/FLINK-2597
>>> >> >> But I couldn't instantiate TypeInformationSerializationSchema in
>>> >> >> the following FlinkKafkaConsumer / Producer, as I am not sure how
>>> >> >> to get access to ExecutionConfig
>>> >> >
>>> >> > Essentially, need help to change the following code in Beam Pipeline
>>> >> >> to read Avro data from Kafka and deserialize into my custom object.
>>> >> >
>>> >> > public static UnboundedSource<String, CheckpointMark> consumeMessages() {
>>> >> >     FlinkKafkaConsumer08<String> kafkaConsumer =
>>> >> >         new FlinkKafkaConsumer08<>(options.getKafkaTopic(), ? , props);
>>> >> >
>>> >> >     return UnboundedFlinkSource.of(kafkaConsumer);
>>> >> > }
>>> >> >
>>> >> >> and how to write Avro data into Kafka?
>>> >> > public static void produceData(){
>>> >> >         FlinkKafkaProducer08<String> kafkaSink =
>>> >> >                 new FlinkKafkaProducer08<>(TOPIC, ? , props);
>>> >> >
>>> >> >         Pipeline pipeline = Pipeline.create(options);
>>> >> >         pipeline
>>> >> >         .apply(Create.of(
>>> >> >                 new User("Joe", 3, "red"),
>>> >> >                 new User("Mary", 4, "blue"),
>>> >> >                 new User("Mark", 1, "green"),
>>> >> >                 new User("Julia", 5, "purple"))
>>> >> >             .withCoder(AvroCoder.of(User.class)))
>>> >> >
>>> >> >         .apply(transformationToWriteToKafkaSink());
>>> >> >
>>> >> >         pipeline.run();
>>> >> >
>>> >> >     }
>>> >> >
>>> >> > Thanks
>>> >> > Kaniska
>>> >
>>> >
>>>
>>
>>
>
