Hi Raghu,

When the KafkaIO sink code is merged , can you please add couple of
examples to demonstrate that :

1) KafkaIO works fine with FinkPipelineRunner

> the reason I am asking this; earlier when I tried to port KafkaIO code
(added a custom Sink code - copied from KafkaWriter) from spark-runner into
beam sdk; I ended up registering 'KafkaIO.Write.Bound.class'   inside
FlinkStreamingTransformTranslators
(I don't want to update the Flink-Runner core API)

> It was a dirty hack and somehow worked (didn't test Avro that time) , so
I quickly adapted Max's suggestion on FlinkUnboundedSource & Sink approach
- which is very clean and worked for simple non-avro data

> Is Max's contrib 'FlinkUnboudedSink' going to be merged as well ?

2) KafkaIO can serialize an Avro-generated POJO (e.g. sample attached here)
 into byte-array and then corresponding Sink can deserialize it
effortlessly .

> this is a very important use case for Network Industry where thousands of
Sensor-generated machine-data are converted into Avro and sent to Kafka
> so I am trying to make it work seamlessly inside Beam-Flink Pipeline ( as
explained in the other reply to the thread - based on Max's feedback )

** I am not yet able to Ser / DeSer the attached avro , even after
following Max's suggestions and even after

returning new AvroTypeInfo(avroType) , in method
AvroSerializationDeserializationSchema#getProducedType()

** I have posted the error messages in other response message


Thanks

Kaniska

On Wed, Apr 27, 2016 at 9:18 PM, Raghu Angadi <[email protected]> wrote:

> Oh,  does not work with 0.8.x as it it uses new consumer api in 0.9x.
>
> On Wed, Apr 27, 2016 at 4:35 PM, kaniska Mandal <[email protected]>
> wrote:
>
>> Hi Raghu,
>>
>> Thanks much for the update
>>
>> We are using kafka 0.8.x
>>
>> Can you please also add a working example (beam+flink+kafka) with
>> Avro-generated Pojo ?
>>
>> Kaniska
>>
>> On Wed, Apr 27, 2016 at 12:14 PM, Raghu Angadi <[email protected]>
>> wrote:
>>
>>> Kaniska,
>>>
>>> If your kafka cluster is running 0.9, you can also try native KafkaIO
>>> <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L88>
>>> merged recently into Beam. It supports reading from Kafka and I am working
>>> on Sink support (this week).
>>>
>>> See TopHashtagsExample.java
>>> <https://github.com/apache/incubator-beam/pull/142/commits/f5c809d5d3551c4fcb64bc7bcde0c64f8dd76e0a#diff-796ac0dad9e90975cfea2e2b05a90d69R121>
>>> for a complete example (it writes results back to Kafka in ParDo() rather
>>> than a Sink). To use AvroCoder, your consumer test will have something like
>>>
>>>    pipline.apply(KafkaIO.read()
>>>             .withBootstrapServers(options.getBroker())
>>>             .withTopics(ImmutableList.of(TOPICS))
>>>             .withValueCoder(AvroCoder.of(User.class))
>>>            ...
>>>
>>> Raghu.
>>>
>>>
>>>
>>> On Wed, Apr 27, 2016 at 10:05 AM, Maximilian Michels <[email protected]>
>>> wrote:
>>>
>>>> Hi Kaniska,
>>>>
>>>> Thanks for your mail. First of all, let us state clearly the problem
>>>> you are trying to solve.
>>>>
>>>> Do you want to use Avro serialization or Flink serialization? If you
>>>> use the TypeInformationSerializationSchema you use Flink's
>>>> serialization stack - no Avro involved then. You have to be consistent
>>>> and stick with either one. Otherwise problems are bound to happen as
>>>> the Flink serialization doesn't understand Avro's serialization and
>>>> also the other way around.
>>>>
>>>> From your initial question it appears you want to read/write Avro
>>>> serialized data from/to Kafka. So let's see how to do this:
>>>>
>>>> ============
>>>> A custom class
>>>> ============
>>>>
>>>> Let's assume you have a class like this:
>>>>
>>>>   public class MyCounts implements Serializable {
>>>>
>>>>     private String word;
>>>>     private long count;
>>>>
>>>>     public MyCounts() {}
>>>>
>>>>     public MyCounts(String word, long count) {
>>>>       this.word = word;
>>>>       this.count = count;
>>>>     }
>>>>
>>>>     @Override
>>>>     public String toString() {
>>>>       return "MyCounts{" +
>>>>           "word='" + word + '\'' +
>>>>           ", count=" + count +
>>>>           '}';
>>>>     }
>>>>   }
>>>>
>>>>
>>>> =================================
>>>> The Serialization / Deserialization Schema
>>>> =================================
>>>>
>>>> This is the schema for the Kafka Producer/Consumer to
>>>> serialize/deserialize data:
>>>>
>>>>   public class AvroSerializationDeserializationSchema<T>
>>>>       implements SerializationSchema<T>, DeserializationSchema<T> {
>>>>
>>>>     private final Class<T> avroType;
>>>>
>>>>     private final AvroCoder<T> coder;
>>>>     private transient ByteArrayOutputStream out;
>>>>
>>>>     public AvroSerializationDeserializationSchema(Class<T> clazz) {
>>>>       this.avroType = clazz;
>>>>       this.coder = AvroCoder.of(clazz);
>>>>       this.out = new ByteArrayOutputStream();
>>>>     }
>>>>
>>>>     @Override
>>>>     public byte[] serialize(T element) {
>>>>       if (out == null) {
>>>>         out = new ByteArrayOutputStream();
>>>>       }
>>>>       try {
>>>>         out.reset();
>>>>         coder.encode(element, out, Coder.Context.NESTED);
>>>>       } catch (IOException e) {
>>>>         throw new RuntimeException("Avro encoding failed.", e);
>>>>       }
>>>>       return out.toByteArray();
>>>>     }
>>>>
>>>>     @Override
>>>>     public T deserialize(byte[] message) throws IOException {
>>>>       return coder.decode(new ByteArrayInputStream(message),
>>>> Coder.Context.NESTED);
>>>>     }
>>>>
>>>>     @Override
>>>>     public boolean isEndOfStream(T nextElement) {
>>>>       return false;
>>>>     }
>>>>
>>>>     @Override
>>>>     public TypeInformation<T> getProducedType() {
>>>>       return TypeExtractor.getForClass(avroType);
>>>>     }
>>>>   }
>>>>
>>>> ======================================
>>>> Writing some Avro serialized data to a Kafka topic
>>>> ======================================
>>>>
>>>>     Pipeline pipeline = Pipeline.create(options);
>>>>
>>>>     PCollection<MyCounts> words =
>>>>         pipeline.apply(Create.of(
>>>>             new MyCounts("word", 1L),
>>>>             new MyCounts("another", 2L),
>>>>             new MyCounts("yet another", 3L)));
>>>>
>>>>     FlinkKafkaProducer08<MyCounts> kafkaSink =
>>>>         new FlinkKafkaProducer08<>(options.getKafkaOutputTopic(),
>>>>             new
>>>> AvroSerializationDeserializationSchema<>(MyCounts.class), props);
>>>>
>>>>     words.apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));
>>>>
>>>>     pipeline.run();
>>>>
>>>> Let's execute that.
>>>>
>>>> ====================================
>>>> Reading Avro serialized data from a Kafka topic
>>>> ====================================
>>>>
>>>> Now time to read back data from Kafka:
>>>>
>>>>     Pipeline pipeline = Pipeline.create(options);
>>>>
>>>>     FlinkKafkaConsumer08<MyCounts> kafkaConsumer = new
>>>> FlinkKafkaConsumer08<>(
>>>>         options.getKafkaTopic(),
>>>>         new AvroSerializationDeserializationSchema<>(MyCounts.class),
>>>> props);
>>>>
>>>>     PCollection<MyCounts> words = pipeline
>>>>         .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
>>>>         .apply(ParDo.of(new PrintFn()));
>>>>
>>>>     pipeline.run();
>>>>
>>>> ===================
>>>> Executing the examples
>>>> ===================
>>>>
>>>> It prints:
>>>>
>>>>     MyCounts{word='word', count=1}
>>>>     MyCounts{word='another', count=2}
>>>>     MyCounts{word='yet another', count=3}
>>>>
>>>>
>>>> Let me know if that helps you! I've omitted the Kafka props and
>>>> options for brevity.
>>>>
>>>> I hope that we will soon have native Kafka IO for both reading and
>>>> writing to Kafka available in Beam.
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> On Wed, Apr 27, 2016 at 4:21 AM, kaniska Mandal
>>>> <[email protected]> wrote:
>>>> > Sorry for cluttering the post with some code.
>>>> > I have attached couples of java file and one Avro-generated pojo.
>>>> >
>>>> > I am facing some issues while reading / writing data using  flink's
>>>> DeSer /
>>>> > Ser schema.
>>>> >
>>>> >
>>>> > A) << producer >> BeamKafkaFlinkAvroProducerTest
>>>> >
>>>> >>> if I use  KafkaProducer directly (i.e. call produceSimpleData()  ) ,
>>>> >>> things are working fine   (just for testing )
>>>> >
>>>> >>> Using FlinkKafkaProducer as UnboundedSource  (this is what I should
>>>> do)
>>>> >
>>>> > produceAvroData2() { ...
>>>> >
>>>> > 1) First, if I use >> AvroSerializationSchema schema = new
>>>> > AvroSerializationSchema(Test.class);
>>>> >
>>>> > i.e. essentially using Avro’s
>>>> org.apache.avro.specific.SpecificDatumWriter ;
>>>> > I face following error >>
>>>> >
>>>> > Caused by: java.lang.ClassCastException: java.lang.String cannot be
>>>> cast to
>>>> > org.apache.avro.generic.IndexedRecord
>>>> >
>>>> > at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
>>>> >
>>>> > at org.apache.avro.generic.GenericData.getField(GenericData.java:595)
>>>> >
>>>> > at
>>>> >
>>>> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)
>>>> >
>>>> > at
>>>> >
>>>> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
>>>> >
>>>> >
>>>> > 2) Next,  if I use TypeInformationSerializationSchema (irrespective of
>>>> > AvroCoder in Pipeline) , things apparently work fine
>>>> >
>>>> > as Kafka test consumer tool prints the message  >>
>>>> > java.lang.String{"uname": "Joe", "id": 6}
>>>> >
>>>> >
>>>> > B) <<Consumer>> ,  BeamKafkaFlinkAvroConsumerTest
>>>> >
>>>> >>> I understand we should either use
>>>> TypeInformationSerializationSchema in
>>>> >>> both consumer and producer OR
>>>> >
>>>> > should use AvroDeserializationSchema and AvroSerializationSchema in
>>>> Consumer
>>>> > and Producer respectively !!
>>>> >
>>>> > But, irrespective of using AvroDeserializationSchema or
>>>> > TypeInformationSerializationSchema, I get the following exception >>
>>>> >
>>>> > Exception in thread "main" java.lang.NullPointerException: null value
>>>> in
>>>> > entry: V=null
>>>> >
>>>> > at
>>>> >
>>>> com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)
>>>> >
>>>> > at
>>>> >
>>>> com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)
>>>> >
>>>> > at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)
>>>> >
>>>> > at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)
>>>> >
>>>> > at
>>>> >
>>>> org.apache.beam.sdk.coders.CoderRegistry.getDefaultOutputCoder(CoderRegistry.java:221)
>>>> >
>>>> >
>>>> > Any clues whats wrong here ?   May be missing some simple step
>>>> > .. Trying to make a simple Avro schema work here and eventually we
>>>> need to
>>>> > deal with very complex Avro schema.
>>>> >
>>>> > Much appreciate your help.
>>>> >
>>>> > Thanks
>>>> > Kaniska
>>>> >
>>>> > On Tue, Apr 26, 2016 at 9:05 AM, Maximilian Michels <[email protected]>
>>>> wrote:
>>>> >>
>>>> >> Hi Kaniska,
>>>> >>
>>>> >> To read data from Kafka, you need to supply a DeserializationSchema.
>>>> >> Here is one you could use:
>>>> >> https://gist.github.com/StephanEwen/d515e10dd1c609f70bed
>>>> >>
>>>> >> Similarly, to write data into Kafka using the Producer, you will need
>>>> >> a SerializationSchema. You need to serialize your data into bytes
>>>> >> using your Avro schema. Actually, you could use the AvroCoder which
>>>> is
>>>> >> supplied in Beam for this. Or you implement your own analogue to the
>>>> >> DeserializationSchema above.
>>>> >>
>>>> >> - Max
>>>> >>
>>>> >>
>>>> >> On Tue, Apr 26, 2016 at 8:43 AM, kaniska Mandal
>>>> >> <[email protected]> wrote:
>>>> >> > Hi,
>>>> >> >
>>>> >> > I followed the example in AvroITCase.java - which reads/writes
>>>> Avro data
>>>> >> > from/to  filesystem.
>>>> >> >> I don't want to update AvroIO to embed Kafka consumer / producer
>>>> >> >
>>>> >> > Also looked into -
>>>> https://issues.apache.org/jira/browse/FLINK-2597
>>>> >> >> But I couldn't instantiate TypeInformationSerializationSchema in
>>>> >> >> following
>>>> >> >> FlinkKafkaConsumer / Producer - as I am not sure how do get
>>>> access to
>>>> >> >> ExecutionConfig
>>>> >> >
>>>> >> > Essentially, need help to change the following code in Beam
>>>> Pipeline
>>>> >> >> to read Avro data from Kafka and deserialize into my custom
>>>> object.
>>>> >> >
>>>> >> > public static UnboundedSource<String, CheckpointMark>
>>>> consumeMessages()
>>>> >> > {
>>>> >> >         FlinkKafkaConsumer08<String> kafkaConsumer = new
>>>> >> > FlinkKafkaConsumer08<>(options.getKafkaTopic(),
>>>> >> >                 ? , props);
>>>> >> >
>>>> >> >         return UnboundedFlinkSource.of(kafkaConsumer);
>>>> >> >     }
>>>> >> >
>>>> >> >>  and how write Avro data into kafka ?
>>>> >> > public static void produceData(){
>>>> >> >         FlinkKafkaProducer08<String> kafkaSink =
>>>> >> >                 new FlinkKafkaProducer08<>(TOPIC, ? , props);
>>>> >> >
>>>> >> >         Pipeline pipeline = Pipeline.create(options);
>>>> >> >         pipeline
>>>> >> >         .apply(Create.of(
>>>> >> >                 new User("Joe", 3, "red"),
>>>> >> >                 new User("Mary", 4, "blue"),
>>>> >> >                 new User("Mark", 1, "green"),
>>>> >> >                 new User("Julia", 5, "purple"))
>>>> >> >             .withCoder(AvroCoder.of(User.class)))
>>>> >> >
>>>> >> >         .apply(transformationToWriteToKafkaSink());
>>>> >> >
>>>> >> >         pipeline.run();
>>>> >> >
>>>> >> >     }
>>>> >> >
>>>> >> > Thanks
>>>> >> > Kaniska
>>>> >
>>>> >
>>>>
>>>
>>>
>>
>

Attachment: Test.java
Description: Binary data

Reply via email to