Hello, I am trying to use a custom source function (declaration given below) for DataStream. if I add the source to stream using add source:
val stream = env.addSource(new QueryOneSource(args))
I get following error: Any explanations and help ??
Error:(14, 31) could not find implicit value for evidence parameter of type
org.apache.flink.api.common.typeinfo.TypeInformation[org.mpi.debs.Tuple]
val stream = env.addSource(new QueryOneSource(args))
^
Error:(14, 31) not enough arguments for method addSource: (implicit
evidence$15: scala.reflect.ClassTag[org.mpi.debs.Tuple], implicit evidence$16:
org.apache.flink.api.common.typeinfo.TypeInformation[org.mpi.debs.Tuple])org.apache.flink.streaming.api.scala.DataStream[org.mpi.debs.Tuple].
Unspecified value parameter evidence$16.
val stream = env.addSource(new QueryOneSource(args))
^
class QueryOneSource(filenames: Array[String]) extends SourceFunction[Tuple] {
val nextTuple: Tuple // case class Tuple(id: Long, value: Int)
override def run(ctx: SourceContext[Tuple]) = {
while (true) {
nextRecord()
ctx.collect(this.nextTuple)
}
}
override def cancel() = { }
}
override def nextRecord() = {
}
}
Best,
Ankur Sharma
Information Systems Group
3.15 E1.1 Universität des Saarlandes
66123, Saarbrücken Germany
Email: ankur.sha...@mpi-inf.mpg.de <mailto:ankur.sha...@mpi-inf.mpg.de>
an...@stud.uni-saarland.de <mailto:an...@stud.uni-saarland.de>
smime.p7s
Description: S/MIME cryptographic signature
