Thanks, Ziyad. That was a cut-and-paste error. Anyway, I figured out a solution to the issue: all of my Flink dependencies were pointing at 1.3.1, and pointing them at 1.3.0 resolved it.
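For anyone reading the quoted code below: the cut-and-paste mismatch is that the program builds `props` from the environment but then hands `parameterTool` values to the consumer. A minimal sketch of keeping the two consistent could look like the following. It uses only the JDK; `KafkaEnvConfig`, `required`, and `topics` are made-up names for illustration, the sample values in `main` are placeholders, and the Flink call appears only as a comment because it needs the Flink Kafka connector on the classpath.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, not part of the original program.
class KafkaEnvConfig {

    // Fail fast with a readable message. Properties.setProperty(key, null)
    // would otherwise throw a bare NullPointerException.
    static String required(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value.trim();
    }

    // Same three keys as getpropsFromEnv() in the quoted program.
    static Properties fromEnv(Map<String, String> env) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", required(env, "KAFKA_ADDRESS"));
        props.setProperty("group.id", required(env, "CONSUMER_GROUP_ID"));
        props.setProperty("topic", required(env, "KAFKA_TOPIC"));
        return props;
    }

    // Split the comma-separated "topic" property into a clean list.
    static List<String> topics(Properties props) {
        List<String> result = new ArrayList<>();
        for (String t : props.getProperty("topic").split(",")) {
            if (!t.trim().isEmpty()) {
                result.add(t.trim());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Placeholder values; a real job would pass System.getenv() instead.
        Map<String, String> env = new HashMap<>();
        env.put("KAFKA_ADDRESS", "localhost:9092");
        env.put("CONSUMER_GROUP_ID", "demo-group");
        env.put("KAFKA_TOPIC", "logs, metrics");

        Properties props = fromEnv(env);
        System.out.println("topics=" + topics(props)
                + " group=" + props.getProperty("group.id"));

        // In the job, the same objects would replace the parameterTool calls:
        // new FlinkKafkaConsumer010<LogMessage>(
        //         topics(props), new LogDeserializationSchema(), props);
    }
}
```

The point of the design is that the topic list and the consumer properties come from one `Properties` object, so they cannot drift apart the way the pasted snippet did.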
On Wed, Jul 12, 2017 at 2:17 AM, Ziyad Muhammed <mmzi...@gmail.com> wrote:

> Hi Sridhar
>
> Are you using *ParameterTool* to set the properties? I couldn't see it in
> your code, but you use it in the below line:
>
> FlinkKafkaConsumer010<LogMessage> flinkConsumer =
>     new FlinkKafkaConsumer010<LogMessage>(
>         Arrays.asList(parameterTool.getRequired("topic").split(",")),
>         new LogDeserializationSchema(),
>         parameterTool.getProperties());
>
> Make sure that the correct properties are passed to FlinkKafkaConsumer.
>
> Best
> Ziyad
>
> Best Regards
> *Ziyad Muhammed Mohiyudheen*
> 407, Internationales Studienzentrum Berlin
> Theodor-Heuss-Platz 5
> 14052 Berlin
> *Ph: +49 176 6587 3343*
> *Mail to*: mmzi...@gmail.com
>
> On Tue, Jul 11, 2017 at 9:12 AM, Sridhar Chellappa <flinken...@gmail.com>
> wrote:
>
>> I am pretty sure I am doing something wrong here. Just that I do not
>> understand why?
>>
>> I wrote a small program that reads messages from Kafka and prints them out.
>>
>> public class Main {
>>
>>     private static final int CHECKPOINT_INTERVAL = 100000;
>>
>>     private static Properties getpropsFromEnv() {
>>         Properties props = new Properties();
>>         props.setProperty("bootstrap.servers", System.getenv("KAFKA_ADDRESS"));
>>         props.setProperty("group.id", System.getenv("CONSUMER_GROUP_ID"));
>>         props.setProperty("topic", System.getenv("KAFKA_TOPIC"));
>>         return props;
>>     }
>>
>>     public static void main(String[] args) throws Exception {
>>
>>         Properties props = getpropsFromEnv();
>>
>>         StreamExecutionEnvironment env =
>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.enableCheckpointing(CHECKPOINT_INTERVAL);
>>         env.setParallelism(1);
>>         FlinkKafkaConsumer010<LogMessage> flinkConsumer =
>>             new FlinkKafkaConsumer010<LogMessage>(
>>                 Arrays.asList(parameterTool.getRequired("topic").split(",")),
>>                 new LogDeserializationSchema(),
>>                 parameterTool.getProperties()
>>             );
>>
>>         DataStream<LogMessage> logMessageDataStream = env.addSource(flinkConsumer);
>>         logMessageDataStream.print();
>>
>>         env.execute("SomeJob");
>>     }
>> }
>>
>> public class LogDeserializationSchema implements
>>         DeserializationSchema<LogMessage> {
>>
>>     @Override
>>     public LogMessage deserialize(byte[] message) {
>>         LogMessage logMessage = null;
>>         try {
>>             logMessage = LogMessage.parseFrom(message);
>>         } catch (InvalidProtocolBufferException e) {
>>             e.printStackTrace();
>>         } finally {
>>             return logMessage;
>>         }
>>     }
>>
>>     @Override
>>     public boolean isEndOfStream(LogMessage nextElement) {
>>         return false;
>>     }
>>
>>     @Override
>>     public TypeInformation<LogMessage> getProducedType() {
>>         return TypeExtractor.getForClass(LogMessage.class);
>>     }
>> }
>>
>> When I run this program, I do not see any messages being read by the
>> consumer.
>>
>> Things to note:
>>
>> 1. I ran kafka-console-consumer using the same Kafka parameters and saw
>> continuous output.
>>
>> 2. My Gradle file has the following dependencies:
>>
>> dependencies {
>>     compile group: 'ch.qos.logback', name: 'logback-classic', version: '1.1.7'
>>
>>     compile 'org.aeonbits.owner:owner:1.0.9'
>>     compile group: 'com.mashape.unirest', name: 'unirest-java', version: '1.4.9' // For driver Suspension
>>     compile group: 'joda-time', name: 'joda-time', version: '2.9.4'
>>     compile 'com.google.protobuf:protobuf-java-util:3.1.0'
>>
>>     /*
>>      * Flink Dependencies
>>      */
>>     compile group: 'org.apache.flink', name: 'flink-java', version: '1.3.0'
>>     compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.10', version: '1.3.0'
>>     compile group: 'org.apache.flink', name: 'flink-streaming-java_2.10', version: '1.3.0'
>>     compile group: 'org.apache.flink', name: 'flink-clients_2.10', version: '1.3.0'
>> }
>>
>> Can someone please help?