Here is the output after fixing the Scala issues:

https://gist.github.com/karimamer/9e3bcf0a6d9110c01caa2ebd14aa7a8c
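
For anyone hitting the same compile errors quoted below, here is a minimal sketch, in plain Scala with no Flink dependency, of why a missing import can produce "could not find implicit value for evidence parameter". TypeInfo, Implicits, and mapAll are hypothetical stand-ins invented for this illustration; they are not Flink classes or methods, and Flink's real mechanism may differ in detail.

```scala
// Hypothetical stand-in for a type-information type class (NOT Flink's TypeInformation).
trait TypeInfo[T] { def name: String }

// Hypothetical stand-in for the implicits that a wildcard import brings into scope.
object Implicits {
  implicit val stringInfo: TypeInfo[String] = new TypeInfo[String] { val name = "String" }
  implicit val intInfo: TypeInfo[Int] = new TypeInfo[Int] { val name = "Int" }
}

object EvidenceSketch {
  // The context bound `R: TypeInfo` desugars to an extra implicit parameter list,
  // which the compiler reports as an "evidence parameter" when it cannot fill it.
  def mapAll[T, R: TypeInfo](items: Seq[T])(f: T => R): (Seq[R], String) =
    (items.map(f), implicitly[TypeInfo[R]].name)

  def main(args: Array[String]): Unit = {
    // Without this import the call below does not compile, because no
    // implicit TypeInfo[Int] is in scope.
    import Implicits._
    val (lengths, info) = mapAll(Seq("a", "bb"))(_.length)
    println(s"$lengths typed as $info")
  }
}
```

In this thread the analogous import was org.apache.flink.api.scala._, which made the two map errors go away.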

On Tue, Mar 20, 2018 at 11:39 AM, karim amer <karim.amer...@gmail.com>
wrote:

> Never mind. After importing
>
> import org.apache.flink.api.scala._
>
> these errors went away, but I still have the original problem.
> Sorry, my bad.
>
> On Tue, Mar 20, 2018 at 11:04 AM, karim amer <karim.amer...@gmail.com>
> wrote:
>
>> To clarify, should I file a bug report on sbt hiding the errors in the
>> previous email?
>>
>> On Tue, Mar 20, 2018 at 9:44 AM, karim amer <karim.amer...@gmail.com>
>> wrote:
>>
>>> After switching from sbt to Maven, I got these errors:
>>> Error:(63, 37) could not find implicit value for evidence parameter of
>>> type org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls]
>>>     val namedStream = dataStream.map((value:String) => {
>>>
>>>
>>> Error:(63, 37) not enough arguments for method map: (implicit
>>> evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls])org.apache.flink.streaming.api.scala.DataStream[org.apache.flink.quickstart.DataStreamtotableapi.Calls].
>>> Unspecified value parameter evidence$7.
>>>     val namedStream = dataStream.map((value:String) => {
>>>
>>>
>>> Should I file a bug report?
>>>
>>> On Tue, Mar 20, 2018 at 9:30 AM, karim amer <karim.amer...@gmail.com>
>>> wrote:
>>>
>>>> Hi Fabian,
>>>> Sorry if I confused you. The first error is from Nico's code, not from
>>>> my code or snippet.
>>>> I am still having the original problem in my snippet, where it writes
>>>> a blank CSV file even though I get
>>>> [success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
>>>> after running the job.
>>>>
>>>> Cheers,
>>>> karim
>>>>
>>>> On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Karim,
>>>>>
>>>>> I cannot find a method invocation "tableEnv.registerDataStream("myTable2",
>>>>> set, 'A, 'B, 'C )" as shown in the error message in your example.
>>>>> It would help if you kept the error message and the code consistent.
>>>>> Otherwise it's not possible to figure out what's going on.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2018-03-20 0:24 GMT+01:00 karim amer <karim.amer...@gmail.com>:
>>>>>
>>>>>> Hi Nico,
>>>>>>
>>>>>> I tried to reproduce your code, but registerDataStream keeps failing
>>>>>> to register the fields even though I am following your code and the docs.
>>>>>> Here is the error:
>>>>>> [error]  found   : Symbol
>>>>>> [error]  required: org.apache.flink.table.expressions.Expression
>>>>>> [error]     tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
>>>>>> [error]
>>>>>> I think my code snippet was misleading. Here is the full snippet.
>>>>>> Changing the name from "table" didn't fix it for me.
>>>>>>
>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>> import org.apache.flink.api.java.utils.ParameterTool
>>>>>> import org.apache.flink.core.fs.FileSystem.WriteMode
>>>>>> import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
>>>>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>>>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>>>> import org.apache.flink.streaming.api.windowing.time.Time
>>>>>> import org.apache.flink.table.api.{Table, TableEnvironment}
>>>>>> import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
>>>>>> import org.apache.flink.types.Row
>>>>>>
>>>>>>
>>>>>>
>>>>>> object datastreamtotableapi {
>>>>>>
>>>>>>   case class Calls(a: String,
>>>>>>                    b: String,
>>>>>>                    c: String,
>>>>>>                    d: String,
>>>>>>                    e: String,
>>>>>>                    f: String,
>>>>>>                    g: String,
>>>>>>                    h: String,
>>>>>>                    i: String,
>>>>>>                    j: String,
>>>>>>                    k: String,
>>>>>>                    l: String,
>>>>>>                    m: String,
>>>>>>                    n: String,
>>>>>>                    p: String,
>>>>>>                    q: String,
>>>>>>                    r: String,
>>>>>>                    s: String,
>>>>>>                    t: String,
>>>>>>                    v: String,
>>>>>>                    w: String)
>>>>>>
>>>>>>
>>>>>>   def main(args: Array[String]) {
>>>>>>
>>>>>>     val params = ParameterTool.fromArgs(args)
>>>>>>     val input = params.getRequired("input")
>>>>>>
>>>>>>
>>>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>>>     env.setParallelism(1)
>>>>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>>>>
>>>>>>     val dataStream = env.readTextFile(input)
>>>>>>
>>>>>>     val namedStream = dataStream.map((value:String) => {
>>>>>>
>>>>>>       val columns = value.split(",")
>>>>>>       Calls(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5),
>>>>>>         columns(6), columns(7), columns(8), columns(9), columns(10), columns(11),
>>>>>>         columns(12), columns(13), columns(14), columns(15), columns(16), columns(17),
>>>>>>         columns(18), columns(19), columns(20)
>>>>>>       )
>>>>>>     })
>>>>>>
>>>>>>
>>>>>>    val cleanedStream =  namedStream.filter(_.j == " ").filter(_.k==" ")
>>>>>>
>>>>>>    val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(
>>>>>>      new BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
>>>>>>        override def extractTimestamp(element: Calls): Long =
>>>>>>          (element.j.concat(element.k)).toLong
>>>>>>      })
>>>>>>
>>>>>>
>>>>>>
>>>>>>     tableEnv.registerDataStream("CDRS", watermarkedStream)
>>>>>>     val results = tableEnv.sqlQuery( """
>>>>>>                                       |SELECT
>>>>>>                                       | a
>>>>>>                                       | FROM CDRS
>>>>>>                                        """.stripMargin)
>>>>>>
>>>>>>
>>>>>>     val result: Table = results
>>>>>>
>>>>>>     val path = "file:///Users/test/1.txt"
>>>>>>     val sink :TableSink[Row]=   new CsvTableSink(
>>>>>>       path,                             // output path
>>>>>>       fieldDelim = "|",                 // optional: delimit files by '|'
>>>>>>       numFiles = 1,                     // optional: write to a single file
>>>>>>       writeMode = WriteMode.OVERWRITE)
>>>>>>
>>>>>>     result.writeToSink(sink)
>>>>>>
>>>>>>
>>>>>>     env.execute("this job")
>>>>>>
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber <n...@data-artisans.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Karim,
>>>>>>> when I was trying to reproduce your code, I got an exception with the
>>>>>>> name 'table' being used - by replacing it and completing the job with
>>>>>>> some input, I did see the csv file popping up. Also, the job was
>>>>>>> crashing when the file 1.txt already existed.
>>>>>>>
>>>>>>> The code I used (running Flink 1.5-SNAPSHOT):
>>>>>>>
>>>>>>>   def main(args: Array[String]) {
>>>>>>>     // set up the streaming execution environment
>>>>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>>>>>
>>>>>>>     val stream: DataStream[(Int, Long, String)] =
>>>>>>>       get3TupleDataStream(env)
>>>>>>>         .assignAscendingTimestamps(_._2)
>>>>>>>     tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)
>>>>>>>
>>>>>>>     val results = tableEnv.sqlQuery( """
>>>>>>>                                        |SELECT
>>>>>>>                                        | A,C
>>>>>>>                                        | FROM mytable
>>>>>>>                                      """.stripMargin)
>>>>>>>
>>>>>>>     val result: Table = results
>>>>>>>
>>>>>>>     val path = "file:///tmp/test/1.txt"
>>>>>>>     val sink :TableSink[Row]=   new CsvTableSink(
>>>>>>>       path,                             // output path
>>>>>>>       fieldDelim = "|",                 // optional: delimit files by '|'
>>>>>>>       numFiles = 1,                     // optional: write to a single file
>>>>>>>       writeMode = WriteMode.NO_OVERWRITE)
>>>>>>>
>>>>>>>     result.writeToSink(sink)
>>>>>>>
>>>>>>>     env.execute("this job")
>>>>>>>   }
>>>>>>>
>>>>>>>   def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
>>>>>>>     val data = new mutable.MutableList[(Int, Long, String)]
>>>>>>>     data.+=((1, 1L, "Hi"))
>>>>>>>     data.+=((2, 2L, "Hello"))
>>>>>>>     data.+=((3, 2L, "Hello world"))
>>>>>>>     data.+=((4, 3L, "Hello world, how are you?"))
>>>>>>>     data.+=((5, 3L, "I am fine."))
>>>>>>>     data.+=((6, 3L, "Luke Skywalker"))
>>>>>>>     env.fromCollection(data)
>>>>>>>   }
>>>>>>>
>>>>>>>
>>>>>>> Nico
>>>>>>>
>>>>>>> On 16/03/18 22:50, karim amer wrote:
>>>>>>> > Hi There,
>>>>>>> >
>>>>>>> > I am trying to write a CSV sink to disk, but it's not getting written. I
>>>>>>> > think the file is getting overwritten or truncated once the stream
>>>>>>> > process finishes. Does anyone know why the file is getting overwritten
>>>>>>> > or truncated, and how I can fix this?
>>>>>>> >
>>>>>>> >
>>>>>>> > tableEnv.registerDataStream("table", watermarkedStream)
>>>>>>> >
>>>>>>> > val results = tableEnv.sqlQuery( """
>>>>>>> > |SELECT
>>>>>>> > | A
>>>>>>> > | FROM table
>>>>>>> > """.stripMargin)
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> > val result: Table = results
>>>>>>> >
>>>>>>> > val path = "file:///path/test/1.txt"
>>>>>>> > val sink :TableSink[Row]=   new CsvTableSink(
>>>>>>> >   path, // output path
>>>>>>> >   fieldDelim = "|", // optional: delimit files by '|'
>>>>>>> >   numFiles = 1, // optional: write to a single file
>>>>>>> >   writeMode = WriteMode.NO_OVERWRITE)
>>>>>>> >
>>>>>>> > result.writeToSink(sink)
>>>>>>> >
>>>>>>> > env.execute("this job")
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> > Thanks
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> karim amer
>>>>>>
>>>>>>


-- 
karim amer
