Great, thanks for reporting back!

Best, Fabian

2018-03-20 22:40 GMT+01:00 karim amer <karim.amer...@gmail.com>:

> Never mind, I found the error; it has nothing to do with Flink.
> Sorry.
>
> On Tue, Mar 20, 2018 at 12:12 PM, karim amer <karim.amer...@gmail.com>
> wrote:
>
>> Here is the output after fixing the Scala issues:
>>
>> https://gist.github.com/karimamer/9e3bcf0a6d9110c01caa2ebd14aa7a8c
>>
>> On Tue, Mar 20, 2018 at 11:39 AM, karim amer <karim.amer...@gmail.com>
>> wrote:
>>
>>> Never mind, after importing
>>>
>>> import org.apache.flink.api.scala._
>>>
>>> these errors went away and I still have the original problem.
>>> Sorry, my bad.
>>>
>>> On Tue, Mar 20, 2018 at 11:04 AM, karim amer <karim.amer...@gmail.com>
>>> wrote:
>>>
>>>> To clarify: should I file a bug report about sbt hiding the errors
>>>> shown in the previous email?
>>>>
>>>> On Tue, Mar 20, 2018 at 9:44 AM, karim amer <karim.amer...@gmail.com>
>>>> wrote:
>>>>
>>>>> After switching from sbt to Maven, I got these errors:
>>>>> Error:(63, 37) could not find implicit value for evidence parameter of
>>>>> type org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls]
>>>>>     val namedStream = dataStream.map((value:String) => {
>>>>>
>>>>> Error:(63, 37) not enough arguments for method map: (implicit
>>>>> evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls])org.apache.flink.streaming.api.scala.DataStream[org.apache.flink.quickstart.DataStreamtotableapi.Calls].
>>>>> Unspecified value parameter evidence$7.
>>>>>     val namedStream = dataStream.map((value:String) => {
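>>>>>
>>>>> A note for readers of the archive: these two messages are the classic
>>>>> symptom of using the Scala DataStream API without its wildcard import,
>>>>> which supplies the implicit TypeInformation that map() needs, as the
>>>>> follow-up further up this thread confirms. A minimal, self-contained
>>>>> sketch, assuming Flink 1.4/1.5 era APIs; the object name, sample
>>>>> values, and the simplified two-field stand-in for the 21-field Calls
>>>>> class are illustrative, not from the original job:
>>>>>
>>>>> // Supplies the implicit TypeInformation factories that map/filter need.
>>>>> import org.apache.flink.streaming.api.scala._
>>>>>
>>>>> object EvidenceImportSketch {
>>>>>   // Illustrative two-field stand-in, not the original case class.
>>>>>   case class Calls(a: String, b: String)
>>>>>
>>>>>   def main(args: Array[String]): Unit = {
>>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>     val dataStream: DataStream[String] = env.fromElements("x,y", "u,v")
>>>>>     // Without the wildcard import above, this map() fails with
>>>>>     // "could not find implicit value for evidence parameter".
>>>>>     val namedStream: DataStream[Calls] = dataStream.map { value =>
>>>>>       val columns = value.split(",")
>>>>>       Calls(columns(0), columns(1))
>>>>>     }
>>>>>     namedStream.print()
>>>>>     env.execute("evidence import sketch")
>>>>>   }
>>>>> }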
>>>>>
>>>>>
>>>>> Should I file a bug report?
>>>>>
>>>>> On Tue, Mar 20, 2018 at 9:30 AM, karim amer <karim.amer...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Fabian,
>>>>>> Sorry if I confused you: the first error is from Nico's code, not my
>>>>>> code or snippet.
>>>>>> I am still having the original problem in my snippet: it writes a
>>>>>> blank CSV file even though I get
>>>>>> [success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
>>>>>> after running the job.
>>>>>>
>>>>>> Cheers,
>>>>>> karim
>>>>>>
>>>>>> On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske <fhue...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Karim,
>>>>>>>
>>>>>>> I cannot find a method invocation "tableEnv.registerDataStream("myTable2",
>>>>>>> set, 'A, 'B, 'C )" as shown in the error message in your example.
>>>>>>> It would help if you kept the error message and code consistent;
>>>>>>> otherwise it's not possible to figure out what's going on.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> 2018-03-20 0:24 GMT+01:00 karim amer <karim.amer...@gmail.com>:
>>>>>>>
>>>>>>>> Hi Nico,
>>>>>>>>
>>>>>>>> I tried to reproduce your code, but registerDataStream keeps
>>>>>>>> failing to register the fields, even though I am following your code
>>>>>>>> and the docs. Here is the error:
>>>>>>>>
>>>>>>>> [error]  found   : Symbol
>>>>>>>> [error]  required: org.apache.flink.table.expressions.Expression
>>>>>>>> [error]     tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
>>>>>>>> [error]
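>>>>>>>>
>>>>>>>> A note for readers of the archive: the "found Symbol / required
>>>>>>>> Expression" mismatch usually means the Table API's Scala implicits
>>>>>>>> are not in scope; org.apache.flink.table.api.scala._ provides the
>>>>>>>> conversion that turns 'A, 'B, 'C into Expressions. A minimal sketch,
>>>>>>>> assuming Flink 1.4/1.5 era APIs; the table name and sample data are
>>>>>>>> illustrative:
>>>>>>>>
>>>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>>>> import org.apache.flink.table.api.TableEnvironment
>>>>>>>> // Implicit conversions from Scala Symbols to Table API Expressions.
>>>>>>>> import org.apache.flink.table.api.scala._
>>>>>>>>
>>>>>>>> object SymbolFieldsSketch {
>>>>>>>>   def main(args: Array[String]): Unit = {
>>>>>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>>>>>>     val set = env.fromElements((1, 1L, "Hi"), (2, 2L, "Hello"))
>>>>>>>>     // Compiles only with table.api.scala._ in scope; otherwise
>>>>>>>>     // 'A stays a plain Symbol and this call fails to typecheck.
>>>>>>>>     tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C)
>>>>>>>>     val result = tableEnv.scan("myTable2").select('A, 'C)
>>>>>>>>     tableEnv.toAppendStream[(Int, String)](result).print()
>>>>>>>>     env.execute("symbol fields sketch")
>>>>>>>>   }
>>>>>>>> }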
>>>>>>>> I think my code snippet was misleading; here is the full snippet.
>>>>>>>> Changing the name from "table" didn't fix it for me.
>>>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>>>> import org.apache.flink.api.java.utils.ParameterTool
>>>>>>>> import org.apache.flink.core.fs.FileSystem.WriteMode
>>>>>>>> import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
>>>>>>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>>>>>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>>>>>> import org.apache.flink.streaming.api.windowing.time.Time
>>>>>>>> import org.apache.flink.table.api.{Table, TableEnvironment}
>>>>>>>> import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
>>>>>>>> import org.apache.flink.types.Row
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> object datastreamtotableapi {
>>>>>>>>
>>>>>>>>   case class Calls(a: String,
>>>>>>>>                    b: String,
>>>>>>>>                    c: String,
>>>>>>>>                    d: String,
>>>>>>>>                    e: String,
>>>>>>>>                    f: String,
>>>>>>>>                    g: String,
>>>>>>>>                    h: String,
>>>>>>>>                    i: String,
>>>>>>>>                    j: String,
>>>>>>>>                    k: String,
>>>>>>>>                    l: String,
>>>>>>>>                    m: String,
>>>>>>>>                    n: String,
>>>>>>>>                    p: String,
>>>>>>>>                    q: String,
>>>>>>>>                    r: String,
>>>>>>>>                    s: String,
>>>>>>>>                    t: String,
>>>>>>>>                    v: String,
>>>>>>>>                    w: String)
>>>>>>>>
>>>>>>>>
>>>>>>>>   def main(args: Array[String]) {
>>>>>>>>
>>>>>>>>     val params = ParameterTool.fromArgs(args)
>>>>>>>>     val input = params.getRequired("input")
>>>>>>>>
>>>>>>>>
>>>>>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>>>>>     env.setParallelism(1)
>>>>>>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>>>>>>
>>>>>>>>     val dataStream = env.readTextFile(input)
>>>>>>>>
>>>>>>>>     val namedStream = dataStream.map((value:String) => {
>>>>>>>>
>>>>>>>>       val columns = value.split(",")
>>>>>>>>       Calls(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5),
>>>>>>>>         columns(6), columns(7), columns(8), columns(9), columns(10), columns(11),
>>>>>>>>         columns(12), columns(13), columns(14), columns(15), columns(16), columns(17),
>>>>>>>>         columns(18), columns(19), columns(20))
>>>>>>>>     })
>>>>>>>>
>>>>>>>>
>>>>>>>>     val cleanedStream = namedStream.filter(_.j == " ").filter(_.k == " ")
>>>>>>>>
>>>>>>>>     val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(
>>>>>>>>       new BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
>>>>>>>>         override def extractTimestamp(element: Calls): Long =
>>>>>>>>           element.j.concat(element.k).toLong
>>>>>>>>       })
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>     tableEnv.registerDataStream("CDRS", watermarkedStream)
>>>>>>>>     val results = tableEnv.sqlQuery( """
>>>>>>>>                                       |SELECT
>>>>>>>>                                       | a
>>>>>>>>                                       | FROM CDRS
>>>>>>>>                                        """.stripMargin)
>>>>>>>>
>>>>>>>>
>>>>>>>>     val result: Table = results
>>>>>>>>
>>>>>>>>     val path = "file:///Users/test/1.txt"
>>>>>>>>     val sink: TableSink[Row] = new CsvTableSink(
>>>>>>>>       path,                // output path
>>>>>>>>       fieldDelim = "|",    // optional: delimit fields by '|'
>>>>>>>>       numFiles = 1,        // optional: write to a single file
>>>>>>>>       writeMode = WriteMode.OVERWRITE)
>>>>>>>>
>>>>>>>>     result.writeToSink(sink)
>>>>>>>>
>>>>>>>>
>>>>>>>>     env.execute("this job")
>>>>>>>>
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber <
>>>>>>>> n...@data-artisans.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Karim,
>>>>>>>>> when I was trying to reproduce your code, I got an exception caused
>>>>>>>>> by the name 'table' being used; after replacing it and completing
>>>>>>>>> the job with some input, I did see the CSV file popping up. Also,
>>>>>>>>> the job was crashing when the file 1.txt already existed.
>>>>>>>>>
>>>>>>>>> The code I used (running Flink 1.5-SNAPSHOT):
>>>>>>>>>
>>>>>>>>>   def main(args: Array[String]) {
>>>>>>>>>     // set up the streaming execution environment
>>>>>>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>>>>>>>
>>>>>>>>>     val stream: DataStream[(Int, Long, String)] = get3TupleDataStream(env)
>>>>>>>>>       .assignAscendingTimestamps(_._2)
>>>>>>>>>     tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)
>>>>>>>>>
>>>>>>>>>     val results = tableEnv.sqlQuery( """
>>>>>>>>>                                        |SELECT
>>>>>>>>>                                        | A,C
>>>>>>>>>                                        | FROM mytable
>>>>>>>>>                                      """.stripMargin)
>>>>>>>>>
>>>>>>>>>     val result: Table = results
>>>>>>>>>
>>>>>>>>>     val path = "file:///tmp/test/1.txt"
>>>>>>>>>     val sink: TableSink[Row] = new CsvTableSink(
>>>>>>>>>       path,                // output path
>>>>>>>>>       fieldDelim = "|",    // optional: delimit fields by '|'
>>>>>>>>>       numFiles = 1,        // optional: write to a single file
>>>>>>>>>       writeMode = WriteMode.NO_OVERWRITE)
>>>>>>>>>
>>>>>>>>>     result.writeToSink(sink)
>>>>>>>>>
>>>>>>>>>     env.execute("this job")
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
>>>>>>>>>     val data = new mutable.MutableList[(Int, Long, String)]
>>>>>>>>>     data.+=((1, 1L, "Hi"))
>>>>>>>>>     data.+=((2, 2L, "Hello"))
>>>>>>>>>     data.+=((3, 2L, "Hello world"))
>>>>>>>>>     data.+=((4, 3L, "Hello world, how are you?"))
>>>>>>>>>     data.+=((5, 3L, "I am fine."))
>>>>>>>>>     data.+=((6, 3L, "Luke Skywalker"))
>>>>>>>>>     env.fromCollection(data)
>>>>>>>>>   }
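>>>>>>>>>
>>>>>>>>> For completeness, the imports this excerpt presumably relies on;
>>>>>>>>> they are not shown above, so treat this list as an educated guess
>>>>>>>>> based on what the code uses:
>>>>>>>>>
>>>>>>>>> import scala.collection.mutable
>>>>>>>>> // DataStream API plus implicit TypeInformation for the tuples.
>>>>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>>>>> import org.apache.flink.table.api.{Table, TableEnvironment}
>>>>>>>>> // Symbol -> Expression implicits for 'A, 'B, 'C.
>>>>>>>>> import org.apache.flink.table.api.scala._
>>>>>>>>> import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
>>>>>>>>> import org.apache.flink.core.fs.FileSystem.WriteMode
>>>>>>>>> import org.apache.flink.types.Row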
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Nico
>>>>>>>>>
>>>>>>>>> On 16/03/18 22:50, karim amer wrote:
>>>>>>>>> > Hi there,
>>>>>>>>> >
>>>>>>>>> > I am trying to write a CSV sink to disk, but the file is not
>>>>>>>>> > getting written. I think it is getting overwritten or truncated
>>>>>>>>> > once the stream process finishes. Does anyone know why that
>>>>>>>>> > happens and how I can fix it?
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > tableEnv.registerDataStream("table", watermarkedStream)
>>>>>>>>> >
>>>>>>>>> > val results = tableEnv.sqlQuery( """
>>>>>>>>> >                                    |SELECT
>>>>>>>>> >                                    | A
>>>>>>>>> >                                    | FROM table
>>>>>>>>> >                                  """.stripMargin)
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > val result: Table = results
>>>>>>>>> >
>>>>>>>>> > val path = "file:///path/test/1.txt"
>>>>>>>>> > val sink: TableSink[Row] = new CsvTableSink(
>>>>>>>>> >   path,              // output path
>>>>>>>>> >   fieldDelim = "|",  // optional: delimit fields by '|'
>>>>>>>>> >   numFiles = 1,      // optional: write to a single file
>>>>>>>>> >   writeMode = WriteMode.NO_OVERWRITE)
>>>>>>>>> >
>>>>>>>>> > result.writeToSink(sink)
>>>>>>>>> >
>>>>>>>>> > env.execute("this job")
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > Thanks
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> karim amer
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> karim amer
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> karim amer
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> karim amer
>>>>
>>>>
>>>
>>>
>>> --
>>> karim amer
>>>
>>>
>>
>>
>> --
>> karim amer
>>
>>
>
>
> --
> karim amer
>
>
