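Hi Peter,

The problem is that you import both the DataSet and the DataStream Scala API packages. Each package brings its own implicit for creating `TypeInformation` into scope. With both wildcard imports present, the compiler most likely cannot pick one, which would explain the "could not find implicit value for evidence parameter" errors. Remove the DataSet API import (`import org.apache.flink.api.scala._`) and keep `import org.apache.flink.streaming.api.scala._` to make the example work.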
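To illustrate why a second wildcard import can make an implicit disappear, here is a minimal plain-Scala sketch that needs no Flink dependency. `TypeInfo`, `DataSetApi`, `DataStreamApi`, `describe` and `run` are hypothetical stand-ins invented for this illustration. Only the name `createTypeInformation` mirrors the implicit that the two Flink packages provide, and that this name clash is the cause of your errors is my assumption rather than something I have verified against 0.10.0.

```scala
// Hypothetical stand-in for Flink's TypeInformation (not the real class).
trait TypeInfo[T] { def name: String }

// Stand-in for the DataSet package object (assumption: same implicit name).
object DataSetApi {
  implicit def createTypeInformation[T]: TypeInfo[T] =
    new TypeInfo[T] { val name = "from DataSetApi" }
}

// Stand-in for the DataStream package object (assumption: same implicit name).
object DataStreamApi {
  implicit def createTypeInformation[T]: TypeInfo[T] =
    new TypeInfo[T] { val name = "from DataStreamApi" }
}

object ImplicitImportDemo {
  // Only one wildcard import: exactly one implicit with this name is in scope.
  import DataStreamApi._
  // import DataSetApi._  // adding this second import is expected to break the call in run()

  // Plays the role of addSource/map: requires implicit evidence for T.
  def describe[T](value: T)(implicit info: TypeInfo[T]): String =
    s"$value has type info ${info.name}"

  // The implicit is resolved here, using the imports of this object.
  def run(): String = describe("hello")

  def main(args: Array[String]): Unit = println(run())
}
```

With the single import this compiles and the evidence comes from `DataStreamApi`. In the real program the equivalent step is keeping only the streaming import.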

On Sun, Dec 20, 2015 at 3:20 PM, Peter Vandenabeele <pe...@vandenabeele.com> wrote:

> Hi,
>
> I am trying to write a minimal Kafka consumer in Scala and got
> this far:
>
> ➜ scala git:(kafka_exp_001) ✗ cat KafkaConsumer.scala
> package io.allthingsdata.kafkaConsumer
>
> import org.apache.flink.api.scala._
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
> import org.apache.flink.api.common.typeinfo._
> //import org.apache.flink.streaming.api.windowing.time.Time
>
> object KafkaConsumer {
>   def main(args: Array[String]) {
>
>     // set up the execution environment
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>
>     val valueDeserializer = new SimpleStringSchema()
>     val props = new java.util.Properties()
>
>     // create a Kafka Consumer
>     val kafkaConsumer: FlinkKafkaConsumer082[String] =
>       new FlinkKafkaConsumer082(
>         "Topic1",
>         valueDeserializer,
>         props
>       )
>
>     // get input data
>     val messageStream: DataStream[String] = env.addSource(kafkaConsumer) // supply typeInfo ?
>
>     // do something with it
>     val messages = messageStream.
>       map ( s => "Kafka and Flink say: $s" )
>
>     // execute and print result
>     messages.print()
>   }
> }
>
> /* based on this Java example code
> ParameterTool parameterTool = ParameterTool.fromArgs(args);
>
> DataStream<String> messageStream = env
>   .addSource(new FlinkKafkaConsumer082<>(
>     parameterTool.getRequired("topic"),
>     new SimpleStringSchema(),
>     parameterTool.getProperties()));
>
> Once a DataStream is created, you can transform it as you like. For
> example, let us pad every word with a fixed prefix, and print to stdout:
>
> messageStream
>   .rebalance()
>   .map ( s -> “Kafka and Flink says: ” + s)
>   .print();
> */
>
>
> When trying to compile in sbt I get these error messages:
>
> ```
> > compile
> [info] Compiling 1 Scala source to /Users/peter_v/data/github/petervandenabeele/flink-sbt/target/scala-2.10/classes...
> [error] /Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:28: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
> [error] val messageStream: DataStream[String] = env.addSource(kafkaConsumer) // supply typeInfo ?
> [error] ^
> [error] /Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:32: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
> [error] map ( s => "Kafka and Flink say: $s" )
> [error] ^
> [error] two errors found
> [error] (compile:compileIncremental) Compilation failed
> [error] Total time: 0 s, completed Dec 19, 2015 5:11:56 PM
> ```
>
> When inspecting DataStreamSource addSource, I read:
>
> /**
>  * Ads a data source with a custom type information thus opening a
>  * {@link DataStream}. Only in very special cases does the user need to
>  * support type information. Otherwise use
>  * {@link #addSource(org.apache.flink.streaming.api.functions.source.SourceFunction)}
>  *
>
>
> I did try to supply a `BasicTypeInfo.STRING_TYPE_INFO` as typeInfo
> argument, but that does not solve it.
>
> When trying:
>
> `val messageStream: DataStream[String] = env.addSource(kafkaConsumer, BasicTypeInfo.STRING_TYPE_INFO) // supply typeInfo ?`
>
> I get:
>
> > compile
> [info] Compiling 1 Scala source to /Users/peter_v/data/github/petervandenabeele/flink-sbt/target/scala-2.10/classes...
> [error] /Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:28: overloaded method value addSource with alternatives:
> [error] [T](function: org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext[T] => Unit)(implicit evidence$17: scala.reflect.ClassTag[T], implicit evidence$18: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T] <and>
> [error] [T](function: org.apache.flink.streaming.api.functions.source.SourceFunction[T])(implicit evidence$15: scala.reflect.ClassTag[T], implicit evidence$16: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
> [error] cannot be applied to (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082[String], org.apache.flink.api.common.typeinfo.BasicTypeInfo[String])
> [error] val messageStream: DataStream[String] = env.addSource(kafkaConsumer, BasicTypeInfo.STRING_TYPE_INFO) // supply typeInfo ?
> [error] ^
> [error] /Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:32: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
> [error] map ( s => "Kafka and Flink say: $s" )
> [error] ^
> [error] two errors found
> [error] (compile:compileIncremental) Compilation failed
> [error] Total time: 0 s, completed Dec 20, 2015 3:13:04 PM
>
>
>
> The sbt.build I am using is:
>
> ```
> ➜ flink-sbt git:(kafka_exp_001) ✗ cat build.sbt
> val flink_scala_0_10_0 = "org.apache.flink" % "flink-scala" % "0.10.0"
> val flink_clients_0_10_0 = "org.apache.flink" % "flink-clients" % "0.10.0"
> val flink_streaming_scala_0_10_0 = "org.apache.flink" % "flink-streaming-scala" % "0.10.0"
> val flink_connector_kafka_0_10_0 = "org.apache.flink" % "flink-connector-kafka" % "0.10.0"
>
> lazy val commonSettings = Seq(
>   organization := "io.allthingsdata",
>   version := "0.1.0",
>   scalaVersion := "2.10.6"
> )
>
> lazy val root = (project in file(".")).
>   settings(commonSettings: _*).
>   settings(
>     name := "flink-sbt",
>     fork in run := true,
>     libraryDependencies += flink_scala_0_10_0,
>     libraryDependencies += flink_clients_0_10_0,
>     libraryDependencies += flink_streaming_scala_0_10_0,
>     libraryDependencies += flink_connector_kafka_0_10_0
>   )
> ```
>
> Any hints would be very much appreciated at how to make this minimal
> example work.
>
> Many thanks :-)
>
> Peter