Here is the output after fixing the Scala issues: https://gist.github.com/karimamer/9e3bcf0a6d9110c01caa2ebd14aa7a8c
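For anyone else hitting the "could not find implicit value for evidence parameter" error discussed below: Flink's Scala `DataStream.map` takes an implicit `TypeInformation` evidence parameter that `import org.apache.flink.api.scala._` supplies. Here is a minimal pure-Scala sketch of that mechanism — `TypeInfo`, `Stream`, and `Implicits` are toy stand-ins, not Flink's actual classes:

```scala
// Toy model of how Flink types its Scala DataStream.map:
// the real signature is roughly
//   def map[R: TypeInformation](fun: T => R): DataStream[R]
// i.e. it needs an implicit TypeInformation[R] in scope.

trait TypeInfo[T] { def name: String }

class Stream[T](val data: List[T]) {
  // Mirrors the implicit "evidence" parameter on Flink's map.
  def map[R](fun: T => R)(implicit evidence: TypeInfo[R]): Stream[R] =
    new Stream(data.map(fun))
}

// Stand-in for the implicits that `import org.apache.flink.api.scala._`
// brings into scope in a real Flink job.
object Implicits {
  implicit def anyTypeInfo[T]: TypeInfo[T] =
    new TypeInfo[T] { def name = "generic" }
}

object Demo {
  def main(args: Array[String]): Unit = {
    // Without this import, compilation fails with
    // "could not find implicit value for evidence parameter".
    import Implicits._
    val s = new Stream(List("1,2", "3,4"))
    val firstCols = s.map(line => line.split(",")(0))
    println(firstCols.data.mkString(","))  // 1,3
  }
}
```

The point is that the error is a compile-time symptom of a missing implicit, not a bug in sbt or Maven — which is why adding the single import made all the errors disappear.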
On Tue, Mar 20, 2018 at 11:39 AM, karim amer <karim.amer...@gmail.com> wrote:

> Never mind — after importing
>
>     import org.apache.flink.api.scala._
>
> these errors went away, and I still have the original problem. Sorry, my bad.

On Tue, Mar 20, 2018 at 11:04 AM, karim amer <karim.amer...@gmail.com> wrote:

> To clarify: should I file a bug report on sbt hiding the errors in the
> previous email?

On Tue, Mar 20, 2018 at 9:44 AM, karim amer <karim.amer...@gmail.com> wrote:

> After switching from sbt to Maven I got these errors:
>
>     Error:(63, 37) could not find implicit value for evidence parameter of type
>     org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls]
>       val namedStream = dataStream.map((value:String) => {
>
>     Error:(63, 37) not enough arguments for method map: (implicit evidence$7:
>     org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls])
>     org.apache.flink.streaming.api.scala.DataStream[org.apache.flink.quickstart.DataStreamtotableapi.Calls].
>     Unspecified value parameter evidence$7.
>       val namedStream = dataStream.map((value:String) => {
>
> Should I file a bug report?

On Tue, Mar 20, 2018 at 9:30 AM, karim amer <karim.amer...@gmail.com> wrote:

> Hi Fabian,
>
> Sorry if I confused you. The first error is from Nico's code, not my code
> or snippet. I am still having the original problem in my snippet, where it
> writes a blank CSV file even though I get
>
>     [success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
>
> after running the job.
>
> Cheers,
> karim

On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Karim,
>
> I cannot find a method invocation tableEnv.registerDataStream("myTable2",
> set, 'A, 'B, 'C ) as shown in the error message in your example.
> It would help if you kept the error message and code consistent.
> Otherwise it's not possible to figure out what's going on.
>
> Best, Fabian

2018-03-20 0:24 GMT+01:00 karim amer <karim.amer...@gmail.com>:

> Hi Nico,
>
> I tried to reproduce your code, but registerDataStream keeps failing to
> register the fields even though I am following your code and the docs.
> Here is the error:
>
>     [error] found   : Symbol
>     [error] required: org.apache.flink.table.expressions.Expression
>     [error] tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
>
> I think my code snippet was misleading. Here is the full snippet.
> Changing the name from "table" didn't fix it for me.
>
>     import org.apache.flink.streaming.api.scala._
>     import org.apache.flink.api.java.utils.ParameterTool
>     import org.apache.flink.core.fs.FileSystem.WriteMode
>     import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
>     import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>     import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>     import org.apache.flink.streaming.api.windowing.time.Time
>     import org.apache.flink.table.api.{Table, TableEnvironment}
>     import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
>     import org.apache.flink.types.Row
>
>     object datastreamtotableapi {
>
>       case class Calls(a: String, b: String, c: String, d: String, e: String,
>                        f: String, g: String, h: String, i: String, j: String,
>                        k: String, l: String, m: String, n: String, p: String,
>                        q: String, r: String, s: String, t: String, v: String,
>                        w: String)
>
>       def main(args: Array[String]) {
>
>         val params = ParameterTool.fromArgs(args)
>         val input = params.getRequired("input")
>
>         val env = StreamExecutionEnvironment.getExecutionEnvironment
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>         env.setParallelism(1)
>         val tableEnv = TableEnvironment.getTableEnvironment(env)
>
>         val dataStream = env.readTextFile(input)
>
>         val namedStream = dataStream.map((value: String) => {
>           val columns = value.split(",")
>           Calls(columns(0), columns(1), columns(2), columns(3), columns(4),
>             columns(5), columns(6), columns(7), columns(8), columns(9),
>             columns(10), columns(11), columns(12), columns(13), columns(14),
>             columns(15), columns(16), columns(17), columns(18), columns(19),
>             columns(20))
>         })
>
>         val cleanedStream = namedStream.filter(_.j == " ").filter(_.k == " ")
>
>         val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(
>           new BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
>             override def extractTimestamp(element: Calls): Long =
>               (element.j.concat(element.k)).toLong
>           })
>
>         tableEnv.registerDataStream("CDRS", watermarkedStream)
>         val results = tableEnv.sqlQuery(
>           """
>             |SELECT
>             |  a
>             |FROM CDRS
>           """.stripMargin)
>
>         val result: Table = results
>
>         val path = "file:///Users/test/1.txt"
>         val sink: TableSink[Row] = new CsvTableSink(
>           path,              // output path
>           fieldDelim = "|",  // optional: delimit files by '|'
>           numFiles = 1,      // optional: write to a single file
>           writeMode = WriteMode.OVERWRITE)
>
>         result.writeToSink(sink)
>
>         env.execute("this job")
>       }
>     }

On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber <n...@data-artisans.com> wrote:

> Hi Karim,
> when I was trying to reproduce your code, I got an exception with the name
> 'table' being used. By replacing it and completing the job with some input,
> I did see the CSV file popping up. Also, the job was crashing when the file
> 1.txt already existed.
>
> The code I used (running Flink 1.5-SNAPSHOT):
>
>     def main(args: Array[String]) {
>       // set up the streaming execution environment
>       val env = StreamExecutionEnvironment.getExecutionEnvironment
>       val tableEnv = TableEnvironment.getTableEnvironment(env)
>
>       val stream: DataStream[(Int, Long, String)] = get3TupleDataStream(env)
>         .assignAscendingTimestamps(_._2)
>       tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)
>
>       val results = tableEnv.sqlQuery(
>         """
>           |SELECT
>           |  A, C
>           |FROM mytable
>         """.stripMargin)
>
>       val result: Table = results
>
>       val path = "file:///tmp/test/1.txt"
>       val sink: TableSink[Row] = new CsvTableSink(
>         path,              // output path
>         fieldDelim = "|",  // optional: delimit files by '|'
>         numFiles = 1,      // optional: write to a single file
>         writeMode = WriteMode.NO_OVERWRITE)
>
>       result.writeToSink(sink)
>
>       env.execute("this job")
>     }
>
>     def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
>       val data = new mutable.MutableList[(Int, Long, String)]
>       data.+=((1, 1L, "Hi"))
>       data.+=((2, 2L, "Hello"))
>       data.+=((3, 2L, "Hello world"))
>       data.+=((4, 3L, "Hello world, how are you?"))
>       data.+=((5, 3L, "I am fine."))
>       data.+=((6, 3L, "Luke Skywalker"))
>       env.fromCollection(data)
>     }
>
> Nico

On 16/03/18 22:50, karim amer wrote:

> Hi there,
>
> I am trying to write a CsvTableSink to disk, but it's not getting written.
> I think the file is getting overwritten or truncated once the stream
> process finishes. Does anyone know why the file is getting overwritten or
> truncated, and how can I fix this?
>
>     tableEnv.registerDataStream("table", watermarkedStream)
>
>     val results = tableEnv.sqlQuery(
>       """
>         |SELECT
>         |  A
>         |FROM table
>       """.stripMargin)
>
>     val result: Table = results
>
>     val path = "file:///path/test/1.txt"
>     val sink: TableSink[Row] = new CsvTableSink(
>       path,              // output path
>       fieldDelim = "|",  // optional: delimit files by '|'
>       numFiles = 1,      // optional: write to a single file
>       writeMode = WriteMode.NO_OVERWRITE)
>
>     result.writeToSink(sink)
>
>     env.execute("this job")
>
> Thanks

-- 
karim amer
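The "found: Symbol, required: org.apache.flink.table.expressions.Expression" error earlier in the thread is left unresolved; in the Flink 1.x Scala Table API the usual fix is `import org.apache.flink.table.api.scala._`, which brings an implicit Symbol-to-Expression conversion into scope so that `'A, 'B, 'C` type-checks. Below is a Flink-free toy sketch of that pattern — `FieldExpr`, `TableImplicits`, and `registerFields` are hypothetical stand-ins, not Flink's real API:

```scala
import scala.language.implicitConversions

// Toy stand-in for org.apache.flink.table.expressions.Expression.
final case class FieldExpr(name: String)

// Stand-in for the implicit conversion that the Table API's
// `import org.apache.flink.table.api.scala._` would provide.
object TableImplicits {
  implicit def symbol2FieldExpr(sym: Symbol): FieldExpr = FieldExpr(sym.name)
}

object RegisterDemo {
  // Accepts Expressions only, like registerDataStream's field arguments.
  def registerFields(table: String, fields: FieldExpr*): String =
    s"$table(${fields.map(_.name).mkString(", ")})"

  def main(args: Array[String]): Unit = {
    // Without this import, passing 'A, 'B, 'C fails to compile with
    // "found: Symbol, required: FieldExpr" -- the same shape of error
    // karim saw with the real Table API.
    import TableImplicits._
    println(registerFields("myTable2", 'A, 'B, 'C))  // myTable2(A, B, C)
  }
}
```

The takeaway is that the error is a missing-implicit problem at the call site, not a problem with `registerDataStream` itself, which is consistent with the earlier evidence-parameter errors being cured by an import as well.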