Hi Fabian,

Sorry if I confused you. The first error is from Nico's code, not from my
code or snippet. I am still having the original problem with my snippet: it
writes a blank CSV file even though I get

[success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM

after running the job.

Cheers,
karim

On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Karim,
>
> I cannot find a method invocation "tableEnv.registerDataStream("myTable2",
> set, 'A, 'B, 'C )" as shown in the error message in your example.
> It would help if you would keep error message and code consistent.
> Otherwise it's not possible to figure out what's going on.
>
> Best, Fabian
>
> 2018-03-20 0:24 GMT+01:00 karim amer <karim.amer...@gmail.com>:
>
>> Hi Nico,
>>
>> I tried to reproduce your code but registerDataStream keeps failing to
>> register the fields even though i am following your code and the Docs.
>> here is the error
>> [error] found : Symbol
>> [error] required: org.apache.flink.table.expressions.Expression
>> [error] tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
>> [error]
>> I think my code snippet was misleading. Here is the full snippet Changing
>> the name from table didn't fix it for
>>
>> import org.apache.flink.streaming.api.scala._
>> import org.apache.flink.api.java.utils.ParameterTool
>> import org.apache.flink.core.fs.FileSystem.WriteMode
>> import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> import org.apache.flink.streaming.api.windowing.time.Time
>> import org.apache.flink.table.api.{Table, TableEnvironment}
>> import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
>> import org.apache.flink.types.Row
>>
>> object datastreamtotableapi {
>>
>>   case class Calls(a: String,
>>                    b: String,
>>                    c: String,
>>                    d: String,
>>                    e: String,
>>                    f: String,
>>                    g: String,
>>                    h: String,
>>                    i: String,
>>                    j: String,
>>                    k: String,
>>                    l: String,
>>                    m: String,
>>                    n: String,
>>                    p: String,
>>                    q: String,
>>                    r: String,
>>                    s: String,
>>                    t: String,
>>                    v: String,
>>                    w: String)
>>
>>   def main(args: Array[String]) {
>>
>>     val params = ParameterTool.fromArgs(args)
>>     val input = params.getRequired("input")
>>
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>     env.setParallelism(1)
>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>
>>     val dataStream = env.readTextFile(input)
>>
>>     val namedStream = dataStream.map((value:String) => {
>>
>>       val columns = value.split(",")
>>       Calls(columns(0), columns(1),columns(2),columns(3), columns(4),columns(5),
>>         columns(6), columns(7),columns(8),columns(9), columns(10), columns(11),
>>         columns(12), columns(13),columns(14),columns(15), columns(16), columns(17),
>>         columns(18),columns(19), columns(20)
>>       )
>>     })
>>
>>     val cleanedStream = namedStream.filter(_.j == " ").filter(_.k==" ")
>>
>>     val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(
>>       new BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
>>         override def extractTimestamp(element: Calls): Long =
>>           (element.j.concat(element.k)).toLong
>>       })
>>
>>     tableEnv.registerDataStream("CDRS", watermarkedStream)
>>     val results = tableEnv.sqlQuery( """
>>       |SELECT
>>       | a
>>       | FROM CDRS
>>       """.stripMargin)
>>
>>     val result: Table = results
>>
>>     val path = "file:///Users/test/1.txt"
>>     val sink :TableSink[Row]= new CsvTableSink(
>>       path, // output path
>>       fieldDelim = "|", // optional: delimit files by '|'
>>       numFiles = 1, // optional: write to a single file
>>       writeMode = WriteMode.OVERWRITE)
>>
>>     result.writeToSink(sink)
>>
>>     env.execute("this job")
>>
>>   }
>> }
>>
>> On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber <n...@data-artisans.com> wrote:
>>
>>> Hi Karim,
>>> when I was trying to reproduce your code, I got an exception with the
>>> name 'table' being used - by replacing it and completing the job with
>>> some input, I did see the csv file popping up. Also, the job was
>>> crashing when the file 1.txt already existed.
>>>
>>> The code I used (running Flink 1.5-SNAPSHOT):
>>>
>>> def main(args: Array[String]) {
>>>   // set up the streaming execution environment
>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>   val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>
>>>   val stream: DataStream[(Int, Long, String)] = get3TupleDataStream(env)
>>>     .assignAscendingTimestamps(_._2)
>>>   tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)
>>>
>>>   val results = tableEnv.sqlQuery( """
>>>     |SELECT
>>>     | A,C
>>>     | FROM mytable
>>>     """.stripMargin)
>>>
>>>   val result: Table = results
>>>
>>>   val path = "file:///tmp/test/1.txt"
>>>   val sink :TableSink[Row]= new CsvTableSink(
>>>     path, // output path
>>>     fieldDelim = "|", // optional: delimit files by '|'
>>>     numFiles = 1, // optional: write to a single file
>>>     writeMode = WriteMode.NO_OVERWRITE)
>>>
>>>   result.writeToSink(sink)
>>>
>>>   env.execute("this job")
>>> }
>>>
>>> def get3TupleDataStream(env: StreamExecutionEnvironment):
>>>     DataStream[(Int, Long, String)] = {
>>>   val data = new mutable.MutableList[(Int, Long, String)]
>>>   data.+=((1, 1L, "Hi"))
>>>   data.+=((2, 2L, "Hello"))
>>>   data.+=((3, 2L, "Hello world"))
>>>   data.+=((4, 3L, "Hello world, how are you?"))
>>>   data.+=((5, 3L, "I am fine."))
>>>   data.+=((6, 3L, "Luke Skywalker"))
>>>   env.fromCollection(data)
>>> }
>>>
>>> Nico
>>>
>>> On 16/03/18 22:50, karim amer wrote:
>>> > Hi There,
>>> >
>>> > I am trying to write a CSVsink to disk but it's not getting written. I
>>> > think the file is getting overwritten or truncated once The Stream
>>> > process finishes. Does anyone know why the file is getting overwritten
>>> > or truncated and how can i fix this ?
>>> >
>>> > tableEnv.registerDataStream("table", watermarkedStream)
>>> >
>>> > val results = tableEnv.sqlQuery( """
>>> >   |SELECT
>>> >   | A
>>> >   | FROM table
>>> >   """.stripMargin)
>>> >
>>> > val result: Table = results
>>> >
>>> > val path = "file:///path/test/1.txt"
>>> > val sink :TableSink[Row]= new CsvTableSink(
>>> >   path, // output path
>>> >   fieldDelim = "|", // optional: delimit files by '|'
>>> >   numFiles = 1, // optional: write to a single file
>>> >   writeMode = WriteMode.NO_OVERWRITE)
>>> >
>>> > result.writeToSink(sink)
>>> >
>>> > env.execute("this job")
>>> >
>>> > Thanks
>>>
>>
>> --
>> karim amer
>

--
karim amer
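
One possible explanation for the blank file, offered as a guess from the full
snippet and not something confirmed in this thread: the two filters keep only
rows whose j and k fields are exactly a single space, so with ordinary input
nothing may reach the sink. The sketch below is plain Scala with no Flink
dependency. Call, keepAsWritten, keepNonBlank and timestamp are hypothetical
names, the sample values are made up, and "drop the blank rows" is only an
assumption about what the filter was meant to do.

```scala
import scala.util.Try

object BlankCsvSketch {
  // Hypothetical two-field stand-in for the 21-field Calls case class.
  final case class Call(j: String, k: String)

  // The filter as written in the snippet: keeps ONLY rows whose j and k
  // are exactly one space.
  def keepAsWritten(rows: Seq[Call]): Seq[Call] =
    rows.filter(_.j == " ").filter(_.k == " ")

  // Assumed intent (not confirmed in the thread): drop rows with blank j or k.
  def keepNonBlank(rows: Seq[Call]): Seq[Call] =
    rows.filter(_.j.trim.nonEmpty).filter(_.k.trim.nonEmpty)

  // Same expression as extractTimestamp in the snippet, wrapped in Try so a
  // parse failure shows up as a value instead of an exception.
  def timestamp(c: Call): Try[Long] = Try(c.j.concat(c.k).toLong)

  def main(args: Array[String]): Unit = {
    // Made-up sample rows: one with digits, one with single-space fields.
    val rows = Seq(Call("20180320", "092806"), Call(" ", " "))
    println(keepAsWritten(rows).map(timestamp)) // only the blank row survives
    println(keepNonBlank(rows).map(timestamp))  // only the numeric row survives
  }
}
```

If the input never contains single-space j and k values, keepAsWritten returns
an empty sequence, which would match a job that succeeds but writes no rows.
If such rows do exist, the toLong call on them fails, so the filter and the
timestamp extractor look inconsistent with each other either way.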