Hello Flinkers!
The program below produces the following error when run locally. I am
building it with Maven against Flink 0.10.0 and running it in
streaming-only local mode ("start-local-streaming.sh"). I have verified
that Kafka and the topic are working properly using the
kafka-console-*.sh scripts. What am I doing wrong? Any help would be
appreciated.

Caused by: java.lang.NumberFormatException: For input string: ""
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:601)
    at java.lang.Long.valueOf(Long.java:803)
    at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.getOffsetFromZooKeeper(ZookeeperOffsetHandler.java:125)
    at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.seekFetcherToInitialOffsets(ZookeeperOffsetHandler.java:88)
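
For what it's worth, the top of the trace can be reproduced in isolation: Long.valueOf is being called on an empty string inside getOffsetFromZooKeeper, so presumably the value read back from ZooKeeper was empty. The sketch below only illustrates that parsing behaviour; parseOffset is a hypothetical helper of my own, not something in Flink, and I have not checked how the connector actually reads the node.

```scala
import scala.util.Try

object EmptyOffsetSketch {
  // Hypothetical helper (not part of Flink): parse a stored offset,
  // treating an empty or malformed string as "no offset" instead of throwing.
  def parseOffset(raw: String): Option[Long] =
    Try(raw.trim.toLong).toOption

  def main(args: Array[String]): Unit = {
    // Same call as in the stack trace: fails with NumberFormatException on ""
    val direct = Try(java.lang.Long.valueOf(""))
    println(s"Long.valueOf on empty string failed: ${direct.isFailure}")

    // The defensive variant returns None for "" and Some(n) for a numeric string
    println(parseOffset(""))
    println(parseOffset("42"))
  }
}
```

If that reading is right, the empty value would belong to the offset stored for group "test", which might explain why the console scripts work while this job does not.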
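
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

def main(args: Array[String]) {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Kafka 0.8 consumer settings: broker, ZooKeeper and consumer group
  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "localhost:9092")
  properties.setProperty("zookeeper.connect", "localhost:2181")
  properties.setProperty("group.id", "test")
  val stream = env
    .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties))
  // Split each comma-separated record into its fields
  val counts = stream.map(f => f.split(","))
  // Attach a print sink; print(counts) would only print the DataStream object itself
  counts.print()
  env.execute()
}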
--
*Alex Rovner*
*Director, Data Engineering *
*o:* 646.759.0052
* <http://www.magnetic.com/>*