After switching from sbt to Maven I got these errors:

Error:(63, 37) could not find implicit value for evidence parameter of type
org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls]
    val namedStream = dataStream.map((value:String) => {

Error:(63, 37) not enough arguments for method map:
(implicit evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls])
org.apache.flink.streaming.api.scala.DataStream[org.apache.flink.quickstart.DataStreamtotableapi.Calls].
Unspecified value parameter evidence$7.
    val namedStream = dataStream.map((value:String) => {

Should I file a bug report?
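For what it's worth, this error means no implicit TypeInformation[Calls] is in
scope at the map call. In the Scala DataStream API that evidence normally comes
from the wildcard import org.apache.flink.streaming.api.scala._, so after a
build switch the usual suspects are a dropped wildcard import or a Maven module
that only depends on flink-streaming-java instead of flink-streaming-scala. A
minimal sketch of a map that compiles, assuming a stock quickstart setup
(TypeInfoCheck and the two-field Calls here are placeholders, not the real
21-column schema):

import org.apache.flink.streaming.api.scala._   // brings the implicit TypeInformation factory into scope

object TypeInfoCheck {
  case class Calls(a: String, b: String)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = env.fromElements("x,y", "u,v")

    // With the wildcard import above, the evidence parameter of map() is
    // derived automatically for the case class. It could also be supplied
    // explicitly, e.g.:
    //   implicit val callsTypeInfo: TypeInformation[Calls] = createTypeInformation[Calls]
    val namedStream: DataStream[Calls] = dataStream.map { value =>
      val columns = value.split(",")
      Calls(columns(0), columns(1))
    }

    namedStream.print()
    env.execute("typeinfo check")
  }
}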
On Tue, Mar 20, 2018 at 9:30 AM, karim amer <karim.amer...@gmail.com> wrote:

> Hi Fabian,
>
> Sorry if I confused you. The first error is from Nico's code, not my code
> or snippet. I am still having the original problem in my snippet, where it
> writes a blank csv file even though I get
>
>   [success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
>
> after running the job.
>
> Cheers,
> karim
>
> On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Karim,
>>
>> I cannot find a method invocation tableEnv.registerDataStream("myTable2",
>> set, 'A, 'B, 'C ) as shown in the error message in your example.
>> It would help if you would keep the error message and code consistent.
>> Otherwise it's not possible to figure out what's going on.
>>
>> Best, Fabian
>>
>> 2018-03-20 0:24 GMT+01:00 karim amer <karim.amer...@gmail.com>:
>>
>>> Hi Nico,
>>>
>>> I tried to reproduce your code, but registerDataStream keeps failing to
>>> register the fields even though I am following your code and the docs.
>>> Here is the error:
>>>
>>> [error]  found   : Symbol
>>> [error]  required: org.apache.flink.table.expressions.Expression
>>> [error]     tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
>>> [error]
>>>
>>> I think my code snippet was misleading. Here is the full snippet.
>>> Changing the name from table didn't fix it for me.
>>>
>>> import org.apache.flink.streaming.api.scala._
>>> import org.apache.flink.api.java.utils.ParameterTool
>>> import org.apache.flink.core.fs.FileSystem.WriteMode
>>> import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>> import org.apache.flink.streaming.api.windowing.time.Time
>>> import org.apache.flink.table.api.{Table, TableEnvironment}
>>> import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
>>> import org.apache.flink.types.Row
>>>
>>> object datastreamtotableapi {
>>>
>>>   case class Calls(a: String, b: String, c: String, d: String, e: String,
>>>                    f: String, g: String, h: String, i: String, j: String,
>>>                    k: String, l: String, m: String, n: String, p: String,
>>>                    q: String, r: String, s: String, t: String, v: String,
>>>                    w: String)
>>>
>>>   def main(args: Array[String]) {
>>>
>>>     val params = ParameterTool.fromArgs(args)
>>>     val input = params.getRequired("input")
>>>
>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>     env.setParallelism(1)
>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>
>>>     val dataStream = env.readTextFile(input)
>>>
>>>     val namedStream = dataStream.map((value: String) => {
>>>       val columns = value.split(",")
>>>       Calls(columns(0), columns(1), columns(2), columns(3), columns(4),
>>>             columns(5), columns(6), columns(7), columns(8), columns(9),
>>>             columns(10), columns(11), columns(12), columns(13), columns(14),
>>>             columns(15), columns(16), columns(17), columns(18), columns(19),
>>>             columns(20))
>>>     })
>>>
>>>     val cleanedStream = namedStream.filter(_.j == " ").filter(_.k == " ")
>>>
>>>     val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(
>>>       new BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
>>>         override def extractTimestamp(element: Calls): Long =
>>>           (element.j.concat(element.k)).toLong
>>>       })
>>>
>>>     tableEnv.registerDataStream("CDRS", watermarkedStream)
>>>     val results = tableEnv.sqlQuery(
>>>       """
>>>         |SELECT
>>>         |  a
>>>         |  FROM CDRS
>>>       """.stripMargin)
>>>
>>>     val result: Table = results
>>>
>>>     val path = "file:///Users/test/1.txt"
>>>     val sink: TableSink[Row] = new CsvTableSink(
>>>       path,                            // output path
>>>       fieldDelim = "|",                // optional: delimit files by '|'
>>>       numFiles = 1,                    // optional: write to a single file
>>>       writeMode = WriteMode.OVERWRITE)
>>>
>>>     result.writeToSink(sink)
>>>
>>>     env.execute("this job")
>>>   }
>>> }
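A note on the Symbol error quoted above: the registerDataStream overload that
takes field expressions ('A, 'B, 'C) relies on the implicit conversion from
Scala Symbols to table API Expressions, and that conversion lives in
org.apache.flink.table.api.scala._, which has to be imported in addition to the
streaming wildcard import. A small sketch against the Flink 1.4/1.5-era API
used in this thread (RegisterWithFields and the tuple contents are made up for
illustration):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._   // Symbol -> Expression conversions for 'A, 'B, 'C

object RegisterWithFields {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val stream: DataStream[(Int, Long, String)] =
      env.fromElements((1, 1L, "Hi"), (2, 2L, "Hello"))

    // With the table.api.scala._ import in scope, the Symbols compile as field expressions.
    tableEnv.registerDataStream("myTable2", stream, 'A, 'B, 'C)

    val result = tableEnv.sqlQuery("SELECT A, C FROM myTable2")
    result.toAppendStream[(Int, String)].print()

    env.execute("register with fields")
  }
}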
>>>
>>> On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber <n...@data-artisans.com> wrote:
>>>
>>>> Hi Karim,
>>>> when I was trying to reproduce your code, I got an exception with the
>>>> name 'table' being used - by replacing it and completing the job with
>>>> some input, I did see the csv file popping up. Also, the job was
>>>> crashing when the file 1.txt already existed.
>>>>
>>>> The code I used (running Flink 1.5-SNAPSHOT):
>>>>
>>>>   def main(args: Array[String]) {
>>>>     // set up the streaming execution environment
>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>>
>>>>     val stream: DataStream[(Int, Long, String)] =
>>>>       get3TupleDataStream(env)
>>>>         .assignAscendingTimestamps(_._2)
>>>>     tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)
>>>>
>>>>     val results = tableEnv.sqlQuery(
>>>>       """
>>>>         |SELECT
>>>>         |  A, C
>>>>         |  FROM mytable
>>>>       """.stripMargin)
>>>>
>>>>     val result: Table = results
>>>>
>>>>     val path = "file:///tmp/test/1.txt"
>>>>     val sink: TableSink[Row] = new CsvTableSink(
>>>>       path,                               // output path
>>>>       fieldDelim = "|",                   // optional: delimit files by '|'
>>>>       numFiles = 1,                       // optional: write to a single file
>>>>       writeMode = WriteMode.NO_OVERWRITE)
>>>>
>>>>     result.writeToSink(sink)
>>>>
>>>>     env.execute("this job")
>>>>   }
>>>>
>>>>   def get3TupleDataStream(env: StreamExecutionEnvironment):
>>>>       DataStream[(Int, Long, String)] = {
>>>>     val data = new mutable.MutableList[(Int, Long, String)]
>>>>     data.+=((1, 1L, "Hi"))
>>>>     data.+=((2, 2L, "Hello"))
>>>>     data.+=((3, 2L, "Hello world"))
>>>>     data.+=((4, 3L, "Hello world, how are you?"))
>>>>     data.+=((5, 3L, "I am fine."))
>>>>     data.+=((6, 3L, "Luke Skywalker"))
>>>>     env.fromCollection(data)
>>>>   }
>>>>
>>>> Nico
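Two details from Nico's reply are easy to miss: "table" is a reserved word for
Flink's SQL parser, which is likely the exception he hit for that name, and
CsvTableSink with WriteMode.NO_OVERWRITE makes the job fail if the target file
already exists. A compact sketch combining both points, again against the
1.5-era API (the object name, field names, and the /tmp path are illustrative):

import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.types.Row

object CsvSinkCheck {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val stream: DataStream[(Int, String)] = env.fromElements((1, "Hi"), (2, "Hello"))

    // "table" is reserved in SQL, so register under a different name, or
    // escape the identifier with backticks in the query: SELECT ... FROM `table`
    tableEnv.registerDataStream("cdrs", stream, 'a, 'b)

    val result: Table = tableEnv.sqlQuery("SELECT a, b FROM cdrs")

    // NO_OVERWRITE fails if the file already exists; OVERWRITE replaces it.
    val sink: TableSink[Row] = new CsvTableSink(
      "file:///tmp/test/1.txt",        // hypothetical output path
      fieldDelim = "|",
      numFiles = 1,
      writeMode = WriteMode.OVERWRITE)

    result.writeToSink(sink)
    env.execute("csv sink check")
  }
}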
>>>>
>>>> On 16/03/18 22:50, karim amer wrote:
>>>> > Hi There,
>>>> >
>>>> > I am trying to write a CSV sink to disk but it's not getting written.
>>>> > I think the file is getting overwritten or truncated once the stream
>>>> > process finishes. Does anyone know why the file is getting overwritten
>>>> > or truncated, and how I can fix this?
>>>> >
>>>> > tableEnv.registerDataStream("table", watermarkedStream)
>>>> >
>>>> > val results = tableEnv.sqlQuery(
>>>> >   """
>>>> >     |SELECT
>>>> >     |  A
>>>> >     |  FROM table
>>>> >   """.stripMargin)
>>>> >
>>>> > val result: Table = results
>>>> >
>>>> > val path = "file:///path/test/1.txt"
>>>> > val sink: TableSink[Row] = new CsvTableSink(
>>>> >   path,                               // output path
>>>> >   fieldDelim = "|",                   // optional: delimit files by '|'
>>>> >   numFiles = 1,                       // optional: write to a single file
>>>> >   writeMode = WriteMode.NO_OVERWRITE)
>>>> >
>>>> > result.writeToSink(sink)
>>>> >
>>>> > env.execute("this job")
>>>> >
>>>> > Thanks
>>>>
>>>
>>> --
>>> karim amer
>>>
>>
>
> --
> karim amer
>

--
karim amer