Never mind after importing import org.apache.flink.api.scala._
theses errors went away and i still have the original problem. Sorry my bad On Tue, Mar 20, 2018 at 11:04 AM, karim amer <karim.amer...@gmail.com> wrote: > To clarify should i file a bug report on sbt hiding the errors in the > previous email ? > > On Tue, Mar 20, 2018 at 9:44 AM, karim amer <karim.amer...@gmail.com> > wrote: > >> After switching to Maven from Sbt I got these errors >> Error:(63, 37) could not find implicit value for evidence parameter of >> type org.apache.flink.api.common.typeinfo.TypeInformation[org.apa >> che.flink.quickstart.DataStreamtotableapi.Calls] >> val namedStream = dataStream.map((value:String) => { >> >> >> Error:(63, 37) not enough arguments for method map: (implicit evidence$7: >> org.apache.flink.api.common.typeinfo.TypeInformation[org.apa >> che.flink.quickstart.DataStreamtotableapi.Calls])org.apache. >> flink.streaming.api.scala.DataStream[org.apache.flink. >> quickstart.DataStreamtotableapi.Calls]. >> Unspecified value parameter evidence$7. >> val namedStream = dataStream.map((value:String) => { >> >> >> Should i file a bug report ? >> >> On Tue, Mar 20, 2018 at 9:30 AM, karim amer <karim.amer...@gmail.com> >> wrote: >> >>> Hi Fabian >>> Sorry if i confused you The first error is from Nico's code Not my code >>> or snippet >>> I am still having the original problem in my snippet where it's writing >>> a blank csv file even though i get >>> [success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM >>> After running the job >>> >>> Cheers, >>> karim >>> >>> On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske <fhue...@gmail.com> >>> wrote: >>> >>>> Hi Karim, >>>> >>>> I cannot find a method invocation "tableEnv.registerDataStream("myTable2", >>>> set, 'A, 'B, 'C )" as shown in the error message in your example. >>>> It would help if you would keep error message and code consistent. >>>> Otherwise it's not possible to figure out what's going on. >>>> >>>> Best, Fabian >>>> >>>> 2018-03-20 0:24 GMT+01:00 karim amer <karim.amer...@gmail.com>: >>>> >>>>> Hi Nico, >>>>> >>>>> I tried to reproduce your code but registerDataStream keeps failing >>>>> to register the fields even though i am following your code and the Docs. >>>>> here is the error >>>>> [error] found : Symbol >>>>> [error] required: org.apache.flink.table.expressions.Expression >>>>> [error] tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C ) >>>>> [error] >>>>> I think my code snippet was misleading. Here is the full snippet >>>>> Changing the name from table didn't fix it for >>>>> >>>>> import org.apache.flink.streaming.api.scala._ >>>>> import org.apache.flink.api.java.utils.ParameterTool >>>>> import org.apache.flink.core.fs.FileSystem.WriteMode >>>>> import org.apache.flink.streaming.api.{CheckpointingMode, >>>>> TimeCharacteristic} >>>>> import >>>>> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor >>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment >>>>> import org.apache.flink.streaming.api.windowing.time.Time >>>>> import org.apache.flink.table.api.{Table, TableEnvironment} >>>>> import org.apache.flink.table.sinks.{CsvTableSink, TableSink} >>>>> import org.apache.flink.types.Row >>>>> >>>>> >>>>> >>>>> object datastreamtotableapi { >>>>> >>>>> case class Calls(a: String, >>>>> b: String, >>>>> c: String, >>>>> d: String, >>>>> e: String, >>>>> f: String, >>>>> g: String, >>>>> h: String, >>>>> i: String, >>>>> j: String, >>>>> k: String, >>>>> l: String, >>>>> m: String, >>>>> n: String, >>>>> p: String, >>>>> q: String, >>>>> r: String, >>>>> s: String, >>>>> t: String, >>>>> v: String, >>>>> w: String) >>>>> >>>>> >>>>> def main(args: Array[String]) { >>>>> >>>>> val params = ParameterTool.fromArgs(args) >>>>> val input = params.getRequired("input") >>>>> >>>>> >>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment >>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) >>>>> env.setParallelism(1) >>>>> val tableEnv = TableEnvironment.getTableEnvironment(env) >>>>> >>>>> val dataStream = env.readTextFile(input) >>>>> >>>>> val namedStream = dataStream.map((value:String) => { >>>>> >>>>> val columns = value.split(",") >>>>> Calls(columns(0), columns(1),columns(2),columns(3), >>>>> columns(4),columns(5), >>>>> columns(6), columns(7),columns(8),columns(9), columns(10), >>>>> columns(11), >>>>> columns(12), columns(13),columns(14),columns(15), columns(16), >>>>> columns(17), >>>>> columns(18),columns(19), columns(20) >>>>> ) >>>>> }) >>>>> >>>>> >>>>> val cleanedStream = namedStream.filter(_.j == " ").filter(_.k==" ") >>>>> >>>>> val watermarkedStream = >>>>> cleanedStream.assignTimestampsAndWatermarks(new >>>>> BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) { >>>>> override def extractTimestamp(element: Calls): Long = >>>>> (element.j.concat(element.k)).toLong >>>>> }) >>>>> >>>>> >>>>> >>>>> tableEnv.registerDataStream("CDRS", watermarkedStream) >>>>> val results = tableEnv.sqlQuery( """ >>>>> |SELECT >>>>> | a >>>>> | FROM CDRS >>>>> """.stripMargin) >>>>> >>>>> >>>>> val result: Table = results >>>>> >>>>> val path = "file:///Users/test/1.txt" >>>>> val sink :TableSink[Row]= new CsvTableSink( >>>>> path, // output path >>>>> fieldDelim = "|", // optional: delimit files by '|' >>>>> numFiles = 1, // optional: write to a single >>>>> file >>>>> writeMode = WriteMode.OVERWRITE) >>>>> >>>>> result.writeToSink(sink) >>>>> >>>>> >>>>> env.execute("this job") >>>>> >>>>> } >>>>> } >>>>> >>>>> >>>>> >>>>> >>>>> On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber <n...@data-artisans.com> >>>>> wrote: >>>>> >>>>>> Hi Karim, >>>>>> when I was trying to reproduce your code, I got an exception with the >>>>>> name 'table' being used - by replacing it and completing the job with >>>>>> some input, I did see the csv file popping up. Also, the job was >>>>>> crashing when the file 1.txt already existed. >>>>>> >>>>>> The code I used (running Flink 1.5-SNAPSHOT): >>>>>> >>>>>> def main(args: Array[String]) { >>>>>> // set up the streaming execution environment >>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment >>>>>> val tableEnv = TableEnvironment.getTableEnvironment(env) >>>>>> >>>>>> val stream: DataStream[(Int, Long, String)] = >>>>>> get3TupleDataStream(env) >>>>>> .assignAscendingTimestamps(_._2) >>>>>> tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C) >>>>>> >>>>>> val results = tableEnv.sqlQuery( """ >>>>>> |SELECT >>>>>> | A,C >>>>>> | FROM mytable >>>>>> """.stripMargin) >>>>>> >>>>>> val result: Table = results >>>>>> >>>>>> val path = "file:///tmp/test/1.txt" >>>>>> val sink :TableSink[Row]= new CsvTableSink( >>>>>> path, // output path >>>>>> fieldDelim = "|", // optional: delimit files by >>>>>> '|' >>>>>> numFiles = 1, // optional: write to a >>>>>> single file >>>>>> writeMode = WriteMode.NO_OVERWRITE) >>>>>> >>>>>> result.writeToSink(sink) >>>>>> >>>>>> env.execute("this job") >>>>>> } >>>>>> >>>>>> def get3TupleDataStream(env: StreamExecutionEnvironment): >>>>>> DataStream[(Int, Long, String)] = { >>>>>> val data = new mutable.MutableList[(Int, Long, String)] >>>>>> data.+=((1, 1L, "Hi")) >>>>>> data.+=((2, 2L, "Hello")) >>>>>> data.+=((3, 2L, "Hello world")) >>>>>> data.+=((4, 3L, "Hello world, how are you?")) >>>>>> data.+=((5, 3L, "I am fine.")) >>>>>> data.+=((6, 3L, "Luke Skywalker")) >>>>>> env.fromCollection(data) >>>>>> } >>>>>> >>>>>> >>>>>> Nico >>>>>> >>>>>> On 16/03/18 22:50, karim amer wrote: >>>>>> > Hi There, >>>>>> > >>>>>> > I am trying to write a CSVsink to disk but it's not getting >>>>>> written. I >>>>>> > think the file is getting overwritten or truncated once The Stream >>>>>> > process finishes. Does anyone know why the file is getting >>>>>> overwritten >>>>>> > or truncated and how can i fix this ? >>>>>> > >>>>>> > >>>>>> > tableEnv.registerDataStream("table", watermarkedStream) >>>>>> > >>>>>> > val results = tableEnv.sqlQuery( """ >>>>>> > |SELECT >>>>>> > | A >>>>>> > | FROM table >>>>>> > """.stripMargin) >>>>>> > >>>>>> > >>>>>> > >>>>>> > val result: Table = results >>>>>> > >>>>>> > val path = "file:///path/test/1.txt" >>>>>> > val sink :TableSink[Row]= new CsvTableSink( >>>>>> > path, // output path >>>>>> > fieldDelim = "|", // optional: delimit files by '|' >>>>>> > numFiles = 1, // optional: write to a single file >>>>>> > writeMode = WriteMode.NO_OVERWRITE) >>>>>> > >>>>>> > result.writeToSink(sink) >>>>>> > >>>>>> > env.execute("this job") >>>>>> > >>>>>> > >>>>>> > >>>>>> > >>>>>> > Thanks >>>>>> >>>>>> >>>>> >>>>> >>>>> -- >>>>> karim amer >>>>> >>>>> >>>> >>> >>> >>> -- >>> karim amer >>> >>> >> >> >> -- >> karim amer >> >> > > > -- > karim amer > > -- karim amer