To clarify: should I file a bug report about sbt hiding the errors shown in my previous email?
On Tue, Mar 20, 2018 at 9:44 AM, karim amer <karim.amer...@gmail.com> wrote:

> After switching to Maven from Sbt I got these errors
> Error:(63, 37) could not find implicit value for evidence parameter of
> type org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls]
>     val namedStream = dataStream.map((value:String) => {
>
>
> Error:(63, 37) not enough arguments for method map: (implicit evidence$7:
> org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls])
> org.apache.flink.streaming.api.scala.DataStream[org.apache.flink.quickstart.DataStreamtotableapi.Calls].
> Unspecified value parameter evidence$7.
>     val namedStream = dataStream.map((value:String) => {
>
>
> Should i file a bug report ?
>
> On Tue, Mar 20, 2018 at 9:30 AM, karim amer <karim.amer...@gmail.com>
> wrote:
>
>> Hi Fabian
>> Sorry if i confused you The first error is from Nico's code Not my code
>> or snippet
>> I am still having the original problem in my snippet where it's writing a
>> blank csv file even though i get
>> [success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
>> After running the job
>>
>> Cheers,
>> karim
>>
>> On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Karim,
>>>
>>> I cannot find a method invocation "tableEnv.registerDataStream("myTable2",
>>> set, 'A, 'B, 'C )" as shown in the error message in your example.
>>> It would help if you would keep error message and code consistent.
>>> Otherwise it's not possible to figure out what's going on.
>>>
>>> Best, Fabian
>>>
>>> 2018-03-20 0:24 GMT+01:00 karim amer <karim.amer...@gmail.com>:
>>>
>>>> Hi Nico,
>>>>
>>>> I tried to reproduce your code but registerDataStream keeps failing to
>>>> register the fields even though i am following your code and the Docs.
>>>> here is the error
>>>> [error]  found   : Symbol
>>>> [error]  required: org.apache.flink.table.expressions.Expression
>>>> [error]     tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
>>>> [error]
>>>> I think my code snippet was misleading. Here is the full snippet
>>>> Changing the name from table didn't fix it for
>>>>
>>>> import org.apache.flink.streaming.api.scala._
>>>> import org.apache.flink.api.java.utils.ParameterTool
>>>> import org.apache.flink.core.fs.FileSystem.WriteMode
>>>> import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
>>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>> import org.apache.flink.streaming.api.windowing.time.Time
>>>> import org.apache.flink.table.api.{Table, TableEnvironment}
>>>> import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
>>>> import org.apache.flink.types.Row
>>>>
>>>>
>>>>
>>>> object datastreamtotableapi {
>>>>
>>>>   case class Calls(a: String,
>>>>                    b: String,
>>>>                    c: String,
>>>>                    d: String,
>>>>                    e: String,
>>>>                    f: String,
>>>>                    g: String,
>>>>                    h: String,
>>>>                    i: String,
>>>>                    j: String,
>>>>                    k: String,
>>>>                    l: String,
>>>>                    m: String,
>>>>                    n: String,
>>>>                    p: String,
>>>>                    q: String,
>>>>                    r: String,
>>>>                    s: String,
>>>>                    t: String,
>>>>                    v: String,
>>>>                    w: String)
>>>>
>>>>
>>>>   def main(args: Array[String]) {
>>>>
>>>>     val params = ParameterTool.fromArgs(args)
>>>>     val input = params.getRequired("input")
>>>>
>>>>
>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>     env.setParallelism(1)
>>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>>
>>>>     val dataStream = env.readTextFile(input)
>>>>
>>>>     val namedStream = dataStream.map((value:String) => {
>>>>
>>>>       val columns = value.split(",")
>>>>       Calls(columns(0), columns(1),columns(2),columns(3),
>>>>         columns(4),columns(5),
>>>>         columns(6), columns(7),columns(8),columns(9), columns(10),
>>>>         columns(11),
>>>>         columns(12), columns(13),columns(14),columns(15), columns(16),
>>>>         columns(17),
>>>>         columns(18),columns(19), columns(20)
>>>>       )
>>>>     })
>>>>
>>>>
>>>>     val cleanedStream = namedStream.filter(_.j == " ").filter(_.k==" ")
>>>>
>>>>     val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(new
>>>>         BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
>>>>       override def extractTimestamp(element: Calls): Long =
>>>>         (element.j.concat(element.k)).toLong
>>>>     })
>>>>
>>>>
>>>>
>>>>     tableEnv.registerDataStream("CDRS", watermarkedStream)
>>>>     val results = tableEnv.sqlQuery( """
>>>>                                       |SELECT
>>>>                                       | a
>>>>                                       | FROM CDRS
>>>>                                        """.stripMargin)
>>>>
>>>>
>>>>     val result: Table = results
>>>>
>>>>     val path = "file:///Users/test/1.txt"
>>>>     val sink :TableSink[Row]= new CsvTableSink(
>>>>       path, // output path
>>>>       fieldDelim = "|", // optional: delimit files by '|'
>>>>       numFiles = 1, // optional: write to a single file
>>>>       writeMode = WriteMode.OVERWRITE)
>>>>
>>>>     result.writeToSink(sink)
>>>>
>>>>
>>>>     env.execute("this job")
>>>>
>>>>   }
>>>> }
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber <n...@data-artisans.com>
>>>> wrote:
>>>>
>>>>> Hi Karim,
>>>>> when I was trying to reproduce your code, I got an exception with the
>>>>> name 'table' being used - by replacing it and completing the job with
>>>>> some input, I did see the csv file popping up. Also, the job was
>>>>> crashing when the file 1.txt already existed.
>>>>>
>>>>> The code I used (running Flink 1.5-SNAPSHOT):
>>>>>
>>>>>   def main(args: Array[String]) {
>>>>>     // set up the streaming execution environment
>>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>>>
>>>>>     val stream: DataStream[(Int, Long, String)] =
>>>>>       get3TupleDataStream(env)
>>>>>         .assignAscendingTimestamps(_._2)
>>>>>     tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)
>>>>>
>>>>>     val results = tableEnv.sqlQuery( """
>>>>>                                        |SELECT
>>>>>                                        | A,C
>>>>>                                        | FROM mytable
>>>>>                                      """.stripMargin)
>>>>>
>>>>>     val result: Table = results
>>>>>
>>>>>     val path = "file:///tmp/test/1.txt"
>>>>>     val sink :TableSink[Row]= new CsvTableSink(
>>>>>       path, // output path
>>>>>       fieldDelim = "|", // optional: delimit files by '|'
>>>>>       numFiles = 1, // optional: write to a single file
>>>>>       writeMode = WriteMode.NO_OVERWRITE)
>>>>>
>>>>>     result.writeToSink(sink)
>>>>>
>>>>>     env.execute("this job")
>>>>>   }
>>>>>
>>>>>   def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
>>>>>     val data = new mutable.MutableList[(Int, Long, String)]
>>>>>     data.+=((1, 1L, "Hi"))
>>>>>     data.+=((2, 2L, "Hello"))
>>>>>     data.+=((3, 2L, "Hello world"))
>>>>>     data.+=((4, 3L, "Hello world, how are you?"))
>>>>>     data.+=((5, 3L, "I am fine."))
>>>>>     data.+=((6, 3L, "Luke Skywalker"))
>>>>>     env.fromCollection(data)
>>>>>   }
>>>>>
>>>>>
>>>>> Nico
>>>>>
>>>>> On 16/03/18 22:50, karim amer wrote:
>>>>> > Hi There,
>>>>> >
>>>>> > I am trying to write a CSVsink to disk but it's not getting written. I
>>>>> > think the file is getting overwritten or truncated once The Stream
>>>>> > process finishes. Does anyone know why the file is getting overwritten
>>>>> > or truncated and how can i fix this ?
>>>>> >
>>>>> >
>>>>> > tableEnv.registerDataStream("table", watermarkedStream)
>>>>> >
>>>>> > val results = tableEnv.sqlQuery( """
>>>>> >                                   |SELECT
>>>>> >                                   | A
>>>>> >                                   | FROM table
>>>>> >                                 """.stripMargin)
>>>>> >
>>>>> >
>>>>> >
>>>>> > val result: Table = results
>>>>> >
>>>>> > val path = "file:///path/test/1.txt"
>>>>> > val sink :TableSink[Row]= new CsvTableSink(
>>>>> >   path, // output path
>>>>> >   fieldDelim = "|", // optional: delimit files by '|'
>>>>> >   numFiles = 1, // optional: write to a single file
>>>>> >   writeMode = WriteMode.NO_OVERWRITE)
>>>>> >
>>>>> > result.writeToSink(sink)
>>>>> >
>>>>> > env.execute("this job")
>>>>> >
>>>>> >
>>>>> >
>>>>> >
>>>>> > Thanks
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> karim amer
>>>>
>>>>
>>>
>>
>>
>> --
>> karim amer
>>
>>
>
>
> --
> karim amer
>
>
--
karim amer