Thanks Ziyad. That was a cut-and-paste error. Anyway, I figured out a
solution to the issue. All of my Flink dependencies were pointing at 1.3.1.
Pointing them at 1.3.0 resolved the issue.
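
For anyone who finds this thread later, here is a minimal sketch of how the
cut-and-paste mix-up between props and parameterTool could be avoided by
building the Kafka settings in one place and failing fast when a value is
missing. The class KafkaSettings and its methods are hypothetical names made
up for this illustration; they are not part of Flink or of the program quoted
below, and the sketch uses only the JDK.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper for illustration only; not a Flink class.
final class KafkaSettings {

    private KafkaSettings() {}

    // Copies the required settings out of an environment-style map,
    // throwing a descriptive exception when one is missing or blank.
    static Properties fromEnv(Map<String, String> env) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", require(env, "KAFKA_ADDRESS"));
        props.setProperty("group.id", require(env, "CONSUMER_GROUP_ID"));
        props.setProperty("topic", require(env, "KAFKA_TOPIC"));
        return props;
    }

    // Splits the comma-separated "topic" property into a list of topic names.
    static List<String> topics(Properties props) {
        return Arrays.asList(props.getProperty("topic").split(","));
    }

    // Returns the trimmed value for the given variable name, or throws if absent.
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value.trim();
    }
}
```

With something like this, main would call KafkaSettings.fromEnv(System.getenv())
once and pass KafkaSettings.topics(props) and props to the FlinkKafkaConsumer010
constructor, so the consumer reads from the same Properties object that was
just populated.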

On Wed, Jul 12, 2017 at 2:17 AM, Ziyad Muhammed <mmzi...@gmail.com> wrote:

> Hi Sridhar
>
> Are you using *ParameterTool* to set the properties? I couldn't see it in
> your code, but you use it in the line below:
>
> FlinkKafkaConsumer010<LogMessage> flinkConsumer =
>                 new FlinkKafkaConsumer010<LogMessage>(
>                         Arrays.asList(parameterTool.getRequired("topic").split(",")),
>                         new LogDeserializationSchema(),
>                         parameterTool.getProperties());
>
>
> Make sure that the correct properties are passed to FlinkKafkaConsumer.
>
>
> Best
>
> Ziyad
>
>
>
> Best Regards
> *Ziyad Muhammed Mohiyudheen *
> 407, Internationales Studienzentrum Berlin
> Theodor-Heuss-Platz 5
> 14052 Berlin
> *Ph: +49 176 6587 3343*
> *Mail to*: *mmzi...@gmail.com*
>
> On Tue, Jul 11, 2017 at 9:12 AM, Sridhar Chellappa <flinken...@gmail.com>
> wrote:
>
>> I am pretty sure I am doing something wrong here; I just do not
>> understand what.
>>
>> I wrote a small program that reads messages from Kafka and prints them out.
>>
>>
>>
>> public class Main {
>>
>>     private static final int CHECKPOINT_INTERVAL = 100000;
>>
>>
>>     private static Properties getpropsFromEnv() {
>>         Properties props = new Properties();
>>         props.setProperty("bootstrap.servers", System.getenv("KAFKA_ADDRESS"));
>>         props.setProperty("group.id", System.getenv("CONSUMER_GROUP_ID"));
>>         props.setProperty("topic", System.getenv("KAFKA_TOPIC"));
>>         return props;
>>     }
>>
>>     public static void main(String[] args) throws Exception {
>>
>>
>>         Properties props = getpropsFromEnv();
>>
>>
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.enableCheckpointing(CHECKPOINT_INTERVAL);
>>         env.setParallelism(1);
>>         FlinkKafkaConsumer010<LogMessage> flinkConsumer =
>>                 new FlinkKafkaConsumer010<LogMessage>(
>>                         Arrays.asList(parameterTool.getRequired("topic").split(",")),
>>                         new LogDeserializationSchema(),
>>                         parameterTool.getProperties()
>>                     );
>>
>>         DataStream<LogMessage> logMessageDataStream = env.addSource(flinkConsumer);
>>         logMessageDataStream.print();
>>
>>         env.execute("SomeJob");
>>
>>     }
>> }
>>
>> public class LogDeserializationSchema implements DeserializationSchema<LogMessage> {
>>
>>     @Override
>>     public LogMessage deserialize(byte[] message) {
>>         LogMessage logMessage = null;
>>         try {
>>             logMessage =  LogMessage.parseFrom(message);
>>         } catch (InvalidProtocolBufferException e) {
>>             e.printStackTrace();
>>         } finally {
>>             return logMessage;
>>         }
>>     }
>>
>>     @Override
>>     public boolean isEndOfStream(LogMessage nextElement) {
>>         return false;
>>     }
>>
>>     @Override
>>     public TypeInformation<LogMessage> getProducedType() {
>>         return TypeExtractor.getForClass(LogMessage.class);
>>     }
>> }
>>
>>
>> When I run this program, I do not see any messages being read by the
>> consumer.
>>
>> Things to note:
>>
>> 1. I ran kafka-console-consumer using the same Kafka parameters and saw
>> continuous output.
>>
>> 2. My Gradle file has the following dependencies:
>>
>> dependencies {
>>     compile group: 'ch.qos.logback', name: 'logback-classic', version: '1.1.7'
>>
>>     compile 'org.aeonbits.owner:owner:1.0.9'
>>     compile group: 'com.mashape.unirest', name: 'unirest-java', version: '1.4.9' // For driver Suspension
>>     compile group: 'joda-time', name: 'joda-time', version: '2.9.4'
>>     compile 'com.google.protobuf:protobuf-java-util:3.1.0'
>>
>>     /*
>>      * Flink Dependencies
>>      */
>>     compile group: 'org.apache.flink', name: 'flink-java', version: '1.3.0'
>>     compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.10', version: '1.3.0'
>>     compile group: 'org.apache.flink', name: 'flink-streaming-java_2.10', version: '1.3.0'
>>     compile group: 'org.apache.flink', name: 'flink-clients_2.10', version: '1.3.0'
>> }
>>
>> Can someone please help?
>>
>>
>
