Hi Fabian
Sorry if i confused you The first error is from Nico's code Not my code or
snippet
I am still having the original problem in my snippet where it's writing a
blank csv file even though i get
[success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
After running the job

Cheers,
karim

On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Karim,
>
> I cannot find a method invocation "tableEnv.registerDataStream("myTable2",
> set, 'A, 'B, 'C )" as shown in the error message in your example.
> It would help if you would keep error message and code consistent.
> Otherwise it's not possible to figure out what's going on.
>
> Best, Fabian
>
> 2018-03-20 0:24 GMT+01:00 karim amer <karim.amer...@gmail.com>:
>
>> Hi Nico,
>>
>> I tried to reproduce your code but registerDataStream keeps failing to
>> register the fields even though i am following your code and the Docs.
>> here is the error
>> [error]  found   : Symbol
>> [error]  required: org.apache.flink.table.expressions.Expression
>> [error]     tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
>> [error]
>> I think my code snippet was misleading. Here is the full snippet Changing
>> the name from table didn't fix it for
>>
>> import org.apache.flink.streaming.api.scala._
>> import org.apache.flink.api.java.utils.ParameterTool
>> import org.apache.flink.core.fs.FileSystem.WriteMode
>> import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
>> import 
>> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> import org.apache.flink.streaming.api.windowing.time.Time
>> import org.apache.flink.table.api.{Table, TableEnvironment}
>> import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
>> import org.apache.flink.types.Row
>>
>>
>>
>> object datastreamtotableapi {
>>
>>   case class Calls(a: String,
>>                    b: String,
>>                    c: String,
>>                    d: String,
>>                    e: String,
>>                    f: String,
>>                    g: String,
>>                    h: String,
>>                    i: String,
>>                    j: String,
>>                    k: String,
>>                    l: String,
>>                    m: String,
>>                    n: String,
>>                    p: String,
>>                    q: String,
>>                    r: String,
>>                    s: String,
>>                    t: String,
>>                    v: String,
>>                    w: String)
>>
>>
>>   def main(args: Array[String]) {
>>
>>     val params = ParameterTool.fromArgs(args)
>>     val input = params.getRequired("input")
>>
>>
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>     env.setParallelism(1)
>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>
>>     val dataStream = env.readTextFile(input)
>>
>>     val namedStream = dataStream.map((value:String) => {
>>
>>       val columns = value.split(",")
>>       Calls(columns(0), columns(1),columns(2),columns(3), 
>> columns(4),columns(5),
>>         columns(6), columns(7),columns(8),columns(9), columns(10), 
>> columns(11),
>>         columns(12), columns(13),columns(14),columns(15), columns(16), 
>> columns(17),
>>         columns(18),columns(19), columns(20)
>>       )
>>     })
>>
>>
>>    val cleanedStream =  namedStream.filter(_.j == " ").filter(_.k==" ")
>>
>>    val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(new 
>> BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
>>      override def extractTimestamp(element: Calls): Long = 
>> (element.j.concat(element.k)).toLong
>>    })
>>
>>
>>
>>     tableEnv.registerDataStream("CDRS", watermarkedStream)
>>     val results = tableEnv.sqlQuery( """
>>                                       |SELECT
>>                                       | a
>>                                       | FROM CDRS
>>                                        """.stripMargin)
>>
>>
>>     val result: Table = results
>>
>>     val path = "file:///Users/test/1.txt"
>>     val sink :TableSink[Row]=   new CsvTableSink(
>>       path,                             // output path
>>       fieldDelim = "|",                 // optional: delimit files by '|'
>>       numFiles = 1,                     // optional: write to a single file
>>       writeMode = WriteMode.OVERWRITE)
>>
>>     result.writeToSink(sink)
>>
>>
>>     env.execute("this job")
>>
>>   }
>> }
>>
>>
>>
>>
>> On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber <n...@data-artisans.com>
>> wrote:
>>
>>> Hi Karim,
>>> when I was trying to reproduce your code, I got an exception with the
>>> name 'table' being used - by replacing it and completing the job with
>>> some input, I did see the csv file popping up. Also, the job was
>>> crashing when the file 1.txt already existed.
>>>
>>> The code I used (running Flink 1.5-SNAPSHOT):
>>>
>>>   def main(args: Array[String]) {
>>>     // set up the streaming execution environment
>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>
>>>     val stream: DataStream[(Int, Long, String)] =
>>> get3TupleDataStream(env)
>>>       .assignAscendingTimestamps(_._2)
>>>     tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)
>>>
>>>     val results = tableEnv.sqlQuery( """
>>>                                        |SELECT
>>>                                        | A,C
>>>                                        | FROM mytable
>>>                                      """.stripMargin)
>>>
>>>     val result: Table = results
>>>
>>>     val path = "file:///tmp/test/1.txt"
>>>     val sink :TableSink[Row]=   new CsvTableSink(
>>>       path,                             // output path
>>>       fieldDelim = "|",                 // optional: delimit files by '|'
>>>       numFiles = 1,                     // optional: write to a single
>>> file
>>>       writeMode = WriteMode.NO_OVERWRITE)
>>>
>>>     result.writeToSink(sink)
>>>
>>>     env.execute("this job")
>>>   }
>>>
>>>   def get3TupleDataStream(env: StreamExecutionEnvironment):
>>> DataStream[(Int, Long, String)] = {
>>>     val data = new mutable.MutableList[(Int, Long, String)]
>>>     data.+=((1, 1L, "Hi"))
>>>     data.+=((2, 2L, "Hello"))
>>>     data.+=((3, 2L, "Hello world"))
>>>     data.+=((4, 3L, "Hello world, how are you?"))
>>>     data.+=((5, 3L, "I am fine."))
>>>     data.+=((6, 3L, "Luke Skywalker"))
>>>     env.fromCollection(data)
>>>   }
>>>
>>>
>>> Nico
>>>
>>> On 16/03/18 22:50, karim amer wrote:
>>> > Hi There,
>>> >
>>> >  I am trying to write a CSVsink to disk but it's not getting written. I
>>> > think the file is getting overwritten or truncated once The Stream
>>> > process finishes. Does anyone know why the file is getting overwritten
>>> > or truncated and how can i fix this ?
>>> >
>>> >
>>> > tableEnv.registerDataStream("table", watermarkedStream)
>>> >
>>> > val results = tableEnv.sqlQuery( """
>>> > |SELECT
>>> > | A
>>> > | FROM table
>>> > """.stripMargin)
>>> >
>>> >
>>> >
>>> > val result: Table = results
>>> >
>>> > val path = "file:///path/test/1.txt"
>>> > val sink :TableSink[Row]=   new CsvTableSink(
>>> >   path, // output path
>>> > fieldDelim = "|", // optional: delimit files by '|'
>>> > numFiles = 1, // optional: write to a single file
>>> > writeMode = WriteMode.NO_OVERWRITE)
>>> >
>>> > result.writeToSink(sink)
>>> >
>>> > env.execute("this job")
>>> >
>>> >
>>> >
>>> >
>>> > Thanks
>>>
>>>
>>
>>
>> --
>> karim amer
>>
>>
>


-- 
karim amer

Reply via email to