Hi, I am trying to write a minimal Kafka consumer in Scala and got this far:
➜ scala git:(kafka_exp_001) ✗ cat KafkaConsumer.scala
package io.allthingsdata.kafkaConsumer

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
import org.apache.flink.api.common.typeinfo._
//import org.apache.flink.streaming.api.windowing.time.Time

object KafkaConsumer {
  def main(args: Array[String]) {

    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val valueDeserializer = new SimpleStringSchema()
    val props = new java.util.Properties()

    // create a Kafka Consumer
    val kafkaConsumer: FlinkKafkaConsumer082[String] =
      new FlinkKafkaConsumer082(
        "Topic1",
        valueDeserializer,
        props
      )

    // get input data
    val messageStream: DataStream[String] = env.addSource(kafkaConsumer) // supply typeInfo ?

    // do something with it
    val messages = messageStream.
      map ( s => "Kafka and Flink say: $s" )

    // execute and print result
    messages.print()
  }
}

/* based on this Java example code

ParameterTool parameterTool = ParameterTool.fromArgs(args);

DataStream<String> messageStream = env
  .addSource(new FlinkKafkaConsumer082<>(
    parameterTool.getRequired("topic"),
    new SimpleStringSchema(),
    parameterTool.getProperties()));

Once a DataStream is created, you can transform it as you like. For example,
let us pad every word with a fixed prefix, and print to stdout:

messageStream
  .rebalance()
  .map ( s -> "Kafka and Flink says: " + s)
  .print();
*/

When trying to compile in sbt I get these error messages:

```
> compile
[info] Compiling 1 Scala source to /Users/peter_v/data/github/petervandenabeele/flink-sbt/target/scala-2.10/classes...
[error] /Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:28: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
[error] val messageStream: DataStream[String] = env.addSource(kafkaConsumer) // supply typeInfo ?
[error] ^
[error] /Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:32: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
[error] map ( s => "Kafka and Flink say: $s" )
[error] ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 0 s, completed Dec 19, 2015 5:11:56 PM
```

When inspecting DataStreamSource addSource, I read:

/**
 * Ads a data source with a custom type information thus opening a
 * {@link DataStream}. Only in very special cases does the user need to
 * support type information. Otherwise use
 * {@link #addSource(org.apache.flink.streaming.api.functions.source.SourceFunction)}
 *

I did try to supply a `BasicTypeInfo.STRING_TYPE_INFO` as typeInfo argument, but that does not solve it.

When trying:

`val messageStream: DataStream[String] = env.addSource(kafkaConsumer, BasicTypeInfo.STRING_TYPE_INFO) // supply typeInfo ?`

I get:

> compile
[info] Compiling 1 Scala source to /Users/peter_v/data/github/petervandenabeele/flink-sbt/target/scala-2.10/classes...
[error] /Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:28: overloaded method value addSource with alternatives:
[error] [T](function: org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext[T] => Unit)(implicit evidence$17: scala.reflect.ClassTag[T], implicit evidence$18: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T] <and>
[error] [T](function: org.apache.flink.streaming.api.functions.source.SourceFunction[T])(implicit evidence$15: scala.reflect.ClassTag[T], implicit evidence$16: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
[error] cannot be applied to (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082[String], org.apache.flink.api.common.typeinfo.BasicTypeInfo[String])
[error] val messageStream: DataStream[String] = env.addSource(kafkaConsumer, BasicTypeInfo.STRING_TYPE_INFO) // supply typeInfo ?
[error] ^
[error] /Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:32: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
[error] map ( s => "Kafka and Flink say: $s" )
[error] ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 0 s, completed Dec 20, 2015 3:13:04 PM
>

The build.sbt I am using is:

```
➜ flink-sbt git:(kafka_exp_001) ✗ cat build.sbt
val flink_scala_0_10_0 = "org.apache.flink" % "flink-scala" % "0.10.0"
val flink_clients_0_10_0 = "org.apache.flink" % "flink-clients" % "0.10.0"
val flink_streaming_scala_0_10_0 = "org.apache.flink" % "flink-streaming-scala" % "0.10.0"
val flink_connector_kafka_0_10_0 = "org.apache.flink" % "flink-connector-kafka" % "0.10.0"

lazy val commonSettings = Seq(
  organization := "io.allthingsdata",
  version := "0.1.0",
  scalaVersion := "2.10.6"
)

lazy val root = (project in file(".")).
  settings(commonSettings: _*).
  settings(
    name := "flink-sbt",
    fork in run := true,
    libraryDependencies += flink_scala_0_10_0,
    libraryDependencies += flink_clients_0_10_0,
    libraryDependencies += flink_streaming_scala_0_10_0,
    libraryDependencies += flink_connector_kafka_0_10_0
  )
```

Any hints on how to make this minimal example work would be very much appreciated.

Many thanks :-)

Peter