Great, thanks for reporting back! Best, Fabian
2018-03-20 22:40 GMT+01:00 karim amer <karim.amer...@gmail.com>:

> Never mind, I found the error and it has nothing to do with Flink.
> Sorry
>
> On Tue, Mar 20, 2018 at 12:12 PM, karim amer <karim.amer...@gmail.com> wrote:
>
>> Here is the output after fixing the Scala issues:
>>
>> https://gist.github.com/karimamer/9e3bcf0a6d9110c01caa2ebd14aa7a8c
>>
>> On Tue, Mar 20, 2018 at 11:39 AM, karim amer <karim.amer...@gmail.com> wrote:
>>
>>> Never mind, after importing
>>>
>>> import org.apache.flink.api.scala._
>>>
>>> these errors went away and I still have the original problem.
>>> Sorry, my bad.
>>>
>>> On Tue, Mar 20, 2018 at 11:04 AM, karim amer <karim.amer...@gmail.com> wrote:
>>>
>>>> To clarify: should I file a bug report on sbt hiding the errors in the previous email?
>>>>
>>>> On Tue, Mar 20, 2018 at 9:44 AM, karim amer <karim.amer...@gmail.com> wrote:
>>>>
>>>>> After switching to Maven from sbt I got these errors:
>>>>>
>>>>> Error:(63, 37) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls]
>>>>>     val namedStream = dataStream.map((value:String) => {
>>>>>
>>>>> Error:(63, 37) not enough arguments for method map: (implicit evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls])org.apache.flink.streaming.api.scala.DataStream[org.apache.flink.quickstart.DataStreamtotableapi.Calls].
>>>>> Unspecified value parameter evidence$7.
>>>>>     val namedStream = dataStream.map((value:String) => {
>>>>>
>>>>> Should I file a bug report?
>>>>>
>>>>> On Tue, Mar 20, 2018 at 9:30 AM, karim amer <karim.amer...@gmail.com> wrote:
>>>>>
>>>>>> Hi Fabian,
>>>>>> Sorry if I confused you. The first error is from Nico's code, not my code or snippet.
>>>>>> I am still having the original problem in my snippet: it writes a blank CSV file even though I get
>>>>>> [success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
>>>>>> after running the job.
>>>>>>
>>>>>> Cheers,
>>>>>> karim
>>>>>>
>>>>>> On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Karim,
>>>>>>>
>>>>>>> I cannot find a method invocation "tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )" as shown in the error message in your example.
>>>>>>> It would help if you kept the error message and the code consistent. Otherwise it's not possible to figure out what's going on.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> 2018-03-20 0:24 GMT+01:00 karim amer <karim.amer...@gmail.com>:
>>>>>>>
>>>>>>>> Hi Nico,
>>>>>>>>
>>>>>>>> I tried to reproduce your code, but registerDataStream keeps failing to register the fields even though I am following your code and the docs.
>>>>>>>> Here is the error:
>>>>>>>>
>>>>>>>> [error] found   : Symbol
>>>>>>>> [error] required: org.apache.flink.table.expressions.Expression
>>>>>>>> [error]     tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
>>>>>>>> [error]
>>>>>>>>
>>>>>>>> I think my code snippet was misleading. Here is the full snippet.
>>>>>>>> Changing the name from table didn't fix it for me.
>>>>>>>>
>>>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>>>> import org.apache.flink.api.java.utils.ParameterTool
>>>>>>>> import org.apache.flink.core.fs.FileSystem.WriteMode
>>>>>>>> import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
>>>>>>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>>>>>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>>>>>> import org.apache.flink.streaming.api.windowing.time.Time
>>>>>>>> import org.apache.flink.table.api.{Table, TableEnvironment}
>>>>>>>> import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
>>>>>>>> import org.apache.flink.types.Row
>>>>>>>>
>>>>>>>> object datastreamtotableapi {
>>>>>>>>
>>>>>>>>   case class Calls(a: String, b: String, c: String, d: String, e: String,
>>>>>>>>                    f: String, g: String, h: String, i: String, j: String,
>>>>>>>>                    k: String, l: String, m: String, n: String, p: String,
>>>>>>>>                    q: String, r: String, s: String, t: String, v: String,
>>>>>>>>                    w: String)
>>>>>>>>
>>>>>>>>   def main(args: Array[String]) {
>>>>>>>>
>>>>>>>>     val params = ParameterTool.fromArgs(args)
>>>>>>>>     val input = params.getRequired("input")
>>>>>>>>
>>>>>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>>>>>     env.setParallelism(1)
>>>>>>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>>>>>>
>>>>>>>>     val dataStream = env.readTextFile(input)
>>>>>>>>
>>>>>>>>     val namedStream = dataStream.map((value: String) => {
>>>>>>>>       val columns = value.split(",")
>>>>>>>>       Calls(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5),
>>>>>>>>         columns(6), columns(7), columns(8), columns(9), columns(10), columns(11),
>>>>>>>>         columns(12), columns(13), columns(14), columns(15), columns(16), columns(17),
>>>>>>>>         columns(18), columns(19), columns(20))
>>>>>>>>     })
>>>>>>>>
>>>>>>>>     val cleanedStream = namedStream.filter(_.j == " ").filter(_.k == " ")
>>>>>>>>
>>>>>>>>     val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(
>>>>>>>>       new BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
>>>>>>>>         override def extractTimestamp(element: Calls): Long =
>>>>>>>>           (element.j.concat(element.k)).toLong
>>>>>>>>       })
>>>>>>>>
>>>>>>>>     tableEnv.registerDataStream("CDRS", watermarkedStream)
>>>>>>>>
>>>>>>>>     val results = tableEnv.sqlQuery(
>>>>>>>>       """
>>>>>>>>         |SELECT
>>>>>>>>         |  a
>>>>>>>>         |  FROM CDRS
>>>>>>>>       """.stripMargin)
>>>>>>>>
>>>>>>>>     val result: Table = results
>>>>>>>>
>>>>>>>>     val path = "file:///Users/test/1.txt"
>>>>>>>>     val sink: TableSink[Row] = new CsvTableSink(
>>>>>>>>       path,                             // output path
>>>>>>>>       fieldDelim = "|",                 // optional: delimit files by '|'
>>>>>>>>       numFiles = 1,                     // optional: write to a single file
>>>>>>>>       writeMode = WriteMode.OVERWRITE)
>>>>>>>>
>>>>>>>>     result.writeToSink(sink)
>>>>>>>>
>>>>>>>>     env.execute("this job")
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber <n...@data-artisans.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Karim,
>>>>>>>>> when I was trying to reproduce your code, I got an exception with the
>>>>>>>>> name 'table' being used - by replacing it and completing the job with
>>>>>>>>> some input, I did see the csv file popping up. Also, the job was
>>>>>>>>> crashing when the file 1.txt already existed.
>>>>>>>>>
>>>>>>>>> The code I used (running Flink 1.5-SNAPSHOT):
>>>>>>>>>
>>>>>>>>>   def main(args: Array[String]) {
>>>>>>>>>     // set up the streaming execution environment
>>>>>>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>>>>>>>
>>>>>>>>>     val stream: DataStream[(Int, Long, String)] = get3TupleDataStream(env)
>>>>>>>>>       .assignAscendingTimestamps(_._2)
>>>>>>>>>     tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)
>>>>>>>>>
>>>>>>>>>     val results = tableEnv.sqlQuery(
>>>>>>>>>       """
>>>>>>>>>         |SELECT
>>>>>>>>>         |  A, C
>>>>>>>>>         |  FROM mytable
>>>>>>>>>       """.stripMargin)
>>>>>>>>>
>>>>>>>>>     val result: Table = results
>>>>>>>>>
>>>>>>>>>     val path = "file:///tmp/test/1.txt"
>>>>>>>>>     val sink: TableSink[Row] = new CsvTableSink(
>>>>>>>>>       path,                              // output path
>>>>>>>>>       fieldDelim = "|",                  // optional: delimit files by '|'
>>>>>>>>>       numFiles = 1,                      // optional: write to a single file
>>>>>>>>>       writeMode = WriteMode.NO_OVERWRITE)
>>>>>>>>>
>>>>>>>>>     result.writeToSink(sink)
>>>>>>>>>
>>>>>>>>>     env.execute("this job")
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
>>>>>>>>>     val data = new mutable.MutableList[(Int, Long, String)]
>>>>>>>>>     data.+=((1, 1L, "Hi"))
>>>>>>>>>     data.+=((2, 2L, "Hello"))
>>>>>>>>>     data.+=((3, 2L, "Hello world"))
>>>>>>>>>     data.+=((4, 3L, "Hello world, how are you?"))
>>>>>>>>>     data.+=((5, 3L, "I am fine."))
>>>>>>>>>     data.+=((6, 3L, "Luke Skywalker"))
>>>>>>>>>     env.fromCollection(data)
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>> Nico
>>>>>>>>>
>>>>>>>>> On 16/03/18 22:50, karim amer wrote:
>>>>>>>>> > Hi There,
>>>>>>>>> >
>>>>>>>>> > I am trying to write a CSV sink to disk but it's not getting written.
>>>>>>>>> > I think the file is getting overwritten or truncated once the stream
>>>>>>>>> > process finishes. Does anyone know why the file is getting overwritten
>>>>>>>>> > or truncated and how I can fix this?
>>>>>>>>> >
>>>>>>>>> > tableEnv.registerDataStream("table", watermarkedStream)
>>>>>>>>> >
>>>>>>>>> > val results = tableEnv.sqlQuery(
>>>>>>>>> >   """
>>>>>>>>> >     |SELECT
>>>>>>>>> >     |  A
>>>>>>>>> >     |  FROM table
>>>>>>>>> >   """.stripMargin)
>>>>>>>>> >
>>>>>>>>> > val result: Table = results
>>>>>>>>> >
>>>>>>>>> > val path = "file:///path/test/1.txt"
>>>>>>>>> > val sink: TableSink[Row] = new CsvTableSink(
>>>>>>>>> >   path,                              // output path
>>>>>>>>> >   fieldDelim = "|",                  // optional: delimit files by '|'
>>>>>>>>> >   numFiles = 1,                      // optional: write to a single file
>>>>>>>>> >   writeMode = WriteMode.NO_OVERWRITE)
>>>>>>>>> >
>>>>>>>>> > result.writeToSink(sink)
>>>>>>>>> >
>>>>>>>>> > env.execute("this job")
>>>>>>>>> >
>>>>>>>>> > Thanks
>>>>>>>>
>>>>>>>> --
>>>>>>>> karim amer
>
> --
> karim amer
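
For anyone who lands on this thread with the same two compile errors, here is a minimal, self-contained sketch of how the wildcard imports resolve them, written against the Flink 1.5-era Scala APIs used above. The object name, the three-field Call case class, and the output path are made up for illustration and are not from the poster's project. The streaming Scala wildcard import (or org.apache.flink.api.scala._, which the poster used) brings the implicit TypeInformation evidence that map needs, and the Table API Scala wildcard import brings the implicit conversion from Scala Symbols such as 'A to Expression, which is what the "found: Symbol, required: Expression" error is complaining about.

import org.apache.flink.streaming.api.scala._        // implicit TypeInformation evidence for fromElements/map
import org.apache.flink.table.api.scala._            // implicit Symbol -> Expression conversion for 'A, 'B, 'C
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.types.Row

object RegisterFieldsExample {                        // hypothetical name, not the poster's project

  // simplified stand-in for the 21-field Calls case class from the thread
  case class Call(a: String, b: String, c: String)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Without a Flink Scala wildcard import in scope, this map fails with
    // "could not find implicit value for evidence parameter of type TypeInformation[Call]".
    val calls: DataStream[Call] = env
      .fromElements("1,foo,bar", "2,baz,qux")
      .map { line =>
        val cols = line.split(",")
        Call(cols(0), cols(1), cols(2))
      }

    // Without org.apache.flink.table.api.scala._, the Symbols below fail with
    // "found: Symbol, required: org.apache.flink.table.expressions.Expression".
    // Case-class fields are renamed positionally; the table name avoids the reserved word "table".
    tableEnv.registerDataStream("myTable2", calls, 'A, 'B, 'C)

    val result: Table = tableEnv.sqlQuery("SELECT A, C FROM myTable2")

    // Example output path; OVERWRITE so a re-run does not fail on an existing file.
    val sink: TableSink[Row] = new CsvTableSink(
      "/tmp/myTable2.csv",
      fieldDelim = "|",
      numFiles = 1,
      writeMode = WriteMode.OVERWRITE)
    result.writeToSink(sink)

    env.execute("register-fields example")
  }
}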
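Two smaller points from the thread are also worth spelling out. First, table is a reserved word in SQL, which is the exception Nico hit with the first snippet; his working version registers the stream as "mytable" instead, and the later snippet uses "CDRS". Flink SQL also allows escaping reserved identifiers with backticks, but renaming is the simpler fix. Second, with WriteMode.NO_OVERWRITE the job fails when the output file already exists, which is what Nico observed with 1.txt and why the later snippet switched to WriteMode.OVERWRITE.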