Kafka Version is 0.10.0

On Tue, Aug 29, 2017 at 6:43 AM, Sridhar Chellappa <flinken...@gmail.com>
wrote:

> 1.3.0
>
> On Mon, Aug 28, 2017 at 10:09 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Which Flink version are you using (so that line numbers can be matched
>> with source code) ?
>>
>> On Mon, Aug 28, 2017 at 9:16 AM, Sridhar Chellappa <flinken...@gmail.com>
>> wrote:
>>
>>> DataStream<MyKafkaMessage> MyKafkaMessageDataStream = env.addSource(
>>>                 getStreamSource(env, parameterTool);
>>>         );
>>>
>>>
>>>
>>>         public RichParallelSourceFunction<MyKafkaMessage>
>>> getStreamSource(StreamExecutionEnvironment env, ParameterTool
>>> parameterTool) {
>>>
>>>            // MyKAfkaMessage is a ProtoBuf message
>>>             
>>> env.getConfig().registerTypeWithKryoSerializer(MyKafkaMessage.class,
>>> ProtobufSerializer.class);
>>>
>>>             KafkaDataSource<MyKafkaMessage> flinkCepConsumer =
>>>                     new KafkaDataSource<MyKafkaMessage>(parameterTool,
>>> new MyKafkaMessageSerDeSchema());
>>>
>>>             return flinkCepConsumer;
>>>         }
>>>
>>>
>>> public class KafkaDataSource<T> extends FlinkKafkaConsumer010<T> {
>>>
>>>     public KafkaDataSource(ParameterTool parameterTool,
>>> DeserializationSchema<T> deserializer) {
>>>         super(
>>>                 Arrays.asList(parameterTool.ge
>>> tRequired("topic").split(",")),
>>>                 deserializer,
>>>                 parameterTool.getProperties()
>>>         );
>>>
>>>     }
>>>
>>> }
>>>
>>> public class MyKafkaMessageSerDeSchema implements
>>> DeserializationSchema<MyKafkaMessage>, SerializationSchema<MyKafkaMessage>
>>> {
>>>
>>>     @Override
>>>     public MyKafkaMessage deserialize(byte[] message) throws IOException
>>> {
>>>         MyKafkaMessage MyKafkaMessage = null;
>>>         try {
>>>             MyKafkaMessage =  MyKafkaMessage.parseFrom(message);
>>>         } catch (InvalidProtocolBufferException e) {
>>>             e.printStackTrace();
>>>         } finally {
>>>             return MyKafkaMessage;
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public boolean isEndOfStream(MyKafkaMessage nextElement) {
>>>         return false;
>>>     }
>>>
>>>     @Override
>>>     public TypeInformation<MyKafkaMessage> getProducedType() {
>>>         return null;
>>>     }
>>>
>>>     @Override
>>>     public byte[] serialize(MyKafkaMessage element) {
>>>         return new byte[0];
>>>     }
>>> }
>>>
>>> On Mon, Aug 28, 2017 at 8:26 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Which version of Flink / Kafka are you using ?
>>>>
>>>> Can you show the snippet of code where you create the DataStream ?
>>>>
>>>> Cheers
>>>>
>>>> On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa <
>>>> flinken...@gmail.com> wrote:
>>>>
>>>>> Folks,
>>>>>
>>>>> I have a KafkaConsumer that I am trying to read messages from. When I
>>>>> try to create a DataStream from the KafkConsumer (env.addSource()) I get
>>>>> the following exception :
>>>>>
>>>>> Any idea on how can this happen?
>>>>>
>>>>> java.lang.NullPointerException
>>>>>   at 
>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>>>>>   at 
>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>>>>   at 
>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>>>>   at 
>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>>>>   at 
>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>>>>   at 
>>>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>>>>>   at 
>>>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
>>>>>   at 
>>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
>>>>>   at 
>>>>> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
>>>>>   at 
>>>>> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
>>>>>   at 
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>>>>>   at 
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>>>>>   at 
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>>>>   at 
>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>>>>>   at 
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>>   at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to