To clarify: should I file a bug report about sbt hiding the errors shown in
my previous email?
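
A side note on the blank CSV, independent of sbt versus Maven: in the full snippet quoted below, the stream is filtered with `_.j == " "` and `_.k == " "`, which keeps only records whose j and k fields are a single space, and the timestamp extractor then calls `toLong` on `j.concat(k)`. If the real input carries non-blank values in those fields (an assumption; the input file is not shown in this thread), every record would be dropped and the sink would have nothing to write. A minimal plain-Scala sketch of that behaviour, with a hypothetical three-field `Call` standing in for the 21-field `Calls` and made-up sample rows:

```scala
import scala.util.Try

// Hypothetical three-field stand-in for the 21-field Calls case class.
final case class Call(a: String, j: String, k: String)

object BlankCsvCheck {
  // Same predicates as the quoted job: keep only rows whose j and k
  // are exactly one space.
  def clean(rows: Seq[Call]): Seq[Call] =
    rows.filter(_.j == " ").filter(_.k == " ")

  // Same expression as the quoted extractTimestamp, wrapped in Try so a
  // parse failure is returned as a value instead of thrown.
  def timestamp(c: Call): Try[Long] = Try(c.j.concat(c.k).toLong)

  def main(args: Array[String]): Unit = {
    // Made-up sample rows: one with numeric j/k, one with blank j/k.
    val rows = Seq(Call("x", "20180316", "225000"), Call("y", " ", " "))
    val kept = clean(rows)
    println(s"rows kept: ${kept.size} of ${rows.size}")
    kept.foreach { c =>
      println(s"row ${c.a}: timestamp parsed = ${timestamp(c).isSuccess}")
    }
  }
}
```

If the intent was to drop records with blank fields, the predicates would presumably need `!=` rather than `==`; that is a guess at the intent, not something confirmed in the thread.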

On Tue, Mar 20, 2018 at 9:44 AM, karim amer <karim.amer...@gmail.com> wrote:

> After switching from sbt to Maven, I got these errors:
> Error:(63, 37) could not find implicit value for evidence parameter of type
> org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls]
>     val namedStream = dataStream.map((value:String) => {
>
>
> Error:(63, 37) not enough arguments for method map: (implicit evidence$7:
> org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls])org.apache.flink.streaming.api.scala.DataStream[org.apache.flink.quickstart.DataStreamtotableapi.Calls].
> Unspecified value parameter evidence$7.
>     val namedStream = dataStream.map((value:String) => {
>
>
> Should I file a bug report?
>
> On Tue, Mar 20, 2018 at 9:30 AM, karim amer <karim.amer...@gmail.com>
> wrote:
>
>> Hi Fabian
>> Sorry if I confused you. The first error is from Nico's code, not from my
>> code or snippet.
>> I am still having the original problem with my snippet: it writes a
>> blank CSV file even though I get
>> [success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
>> after running the job.
>>
>> Cheers,
>> karim
>>
>> On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Karim,
>>>
>>> I cannot find a method invocation "tableEnv.registerDataStream("myTable2",
>>> set, 'A, 'B, 'C )" as shown in the error message in your example.
>>> It would help if you would keep error message and code consistent.
>>> Otherwise it's not possible to figure out what's going on.
>>>
>>> Best, Fabian
>>>
>>> 2018-03-20 0:24 GMT+01:00 karim amer <karim.amer...@gmail.com>:
>>>
>>>> Hi Nico,
>>>>
>>>> I tried to reproduce your code, but registerDataStream keeps failing to
>>>> register the fields even though I am following your code and the docs.
>>>> Here is the error:
>>>> [error]  found   : Symbol
>>>> [error]  required: org.apache.flink.table.expressions.Expression
>>>> [error]     tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
>>>> [error]
>>>> I think my code snippet was misleading. Here is the full snippet.
>>>> Changing the name from "table" didn't fix it for me.
>>>>
>>>> import org.apache.flink.streaming.api.scala._
>>>> import org.apache.flink.api.java.utils.ParameterTool
>>>> import org.apache.flink.core.fs.FileSystem.WriteMode
>>>> import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
>>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>> import org.apache.flink.streaming.api.windowing.time.Time
>>>> import org.apache.flink.table.api.{Table, TableEnvironment}
>>>> import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
>>>> import org.apache.flink.types.Row
>>>>
>>>>
>>>>
>>>> object datastreamtotableapi {
>>>>
>>>>   case class Calls(a: String,
>>>>                    b: String,
>>>>                    c: String,
>>>>                    d: String,
>>>>                    e: String,
>>>>                    f: String,
>>>>                    g: String,
>>>>                    h: String,
>>>>                    i: String,
>>>>                    j: String,
>>>>                    k: String,
>>>>                    l: String,
>>>>                    m: String,
>>>>                    n: String,
>>>>                    p: String,
>>>>                    q: String,
>>>>                    r: String,
>>>>                    s: String,
>>>>                    t: String,
>>>>                    v: String,
>>>>                    w: String)
>>>>
>>>>
>>>>   def main(args: Array[String]) {
>>>>
>>>>     val params = ParameterTool.fromArgs(args)
>>>>     val input = params.getRequired("input")
>>>>
>>>>
>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>     env.setParallelism(1)
>>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>>
>>>>     val dataStream = env.readTextFile(input)
>>>>
>>>>     val namedStream = dataStream.map((value:String) => {
>>>>
>>>>       val columns = value.split(",")
>>>>       Calls(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5),
>>>>         columns(6), columns(7), columns(8), columns(9), columns(10), columns(11),
>>>>         columns(12), columns(13), columns(14), columns(15), columns(16), columns(17),
>>>>         columns(18), columns(19), columns(20)
>>>>       )
>>>>     })
>>>>
>>>>
>>>>    val cleanedStream =  namedStream.filter(_.j == " ").filter(_.k==" ")
>>>>
>>>>    val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(
>>>>      new BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
>>>>        override def extractTimestamp(element: Calls): Long =
>>>>          (element.j.concat(element.k)).toLong
>>>>      })
>>>>
>>>>
>>>>
>>>>     tableEnv.registerDataStream("CDRS", watermarkedStream)
>>>>     val results = tableEnv.sqlQuery( """
>>>>                                       |SELECT
>>>>                                       | a
>>>>                                       | FROM CDRS
>>>>                                        """.stripMargin)
>>>>
>>>>
>>>>     val result: Table = results
>>>>
>>>>     val path = "file:///Users/test/1.txt"
>>>>     val sink :TableSink[Row]=   new CsvTableSink(
>>>>       path,                             // output path
>>>>       fieldDelim = "|",                 // optional: delimit files by '|'
>>>>       numFiles = 1,                     // optional: write to a single file
>>>>       writeMode = WriteMode.OVERWRITE)
>>>>
>>>>     result.writeToSink(sink)
>>>>
>>>>
>>>>     env.execute("this job")
>>>>
>>>>   }
>>>> }
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber <n...@data-artisans.com>
>>>> wrote:
>>>>
>>>>> Hi Karim,
>>>>> when I was trying to reproduce your code, I got an exception with the
>>>>> name 'table' being used - by replacing it and completing the job with
>>>>> some input, I did see the csv file popping up. Also, the job was
>>>>> crashing when the file 1.txt already existed.
>>>>>
>>>>> The code I used (running Flink 1.5-SNAPSHOT):
>>>>>
>>>>>   def main(args: Array[String]) {
>>>>>     // set up the streaming execution environment
>>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>>>
>>>>>     val stream: DataStream[(Int, Long, String)] =
>>>>>       get3TupleDataStream(env)
>>>>>         .assignAscendingTimestamps(_._2)
>>>>>     tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)
>>>>>
>>>>>     val results = tableEnv.sqlQuery( """
>>>>>                                        |SELECT
>>>>>                                        | A,C
>>>>>                                        | FROM mytable
>>>>>                                      """.stripMargin)
>>>>>
>>>>>     val result: Table = results
>>>>>
>>>>>     val path = "file:///tmp/test/1.txt"
>>>>>     val sink :TableSink[Row]=   new CsvTableSink(
>>>>>       path,                             // output path
>>>>>       fieldDelim = "|",                 // optional: delimit files by '|'
>>>>>       numFiles = 1,                     // optional: write to a single file
>>>>>       writeMode = WriteMode.NO_OVERWRITE)
>>>>>
>>>>>     result.writeToSink(sink)
>>>>>
>>>>>     env.execute("this job")
>>>>>   }
>>>>>
>>>>>   def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
>>>>>     val data = new mutable.MutableList[(Int, Long, String)]
>>>>>     data.+=((1, 1L, "Hi"))
>>>>>     data.+=((2, 2L, "Hello"))
>>>>>     data.+=((3, 2L, "Hello world"))
>>>>>     data.+=((4, 3L, "Hello world, how are you?"))
>>>>>     data.+=((5, 3L, "I am fine."))
>>>>>     data.+=((6, 3L, "Luke Skywalker"))
>>>>>     env.fromCollection(data)
>>>>>   }
>>>>>
>>>>>
>>>>> Nico
>>>>>
>>>>> On 16/03/18 22:50, karim amer wrote:
>>>>> > Hi There,
>>>>> >
>>>>> > I am trying to write a CSV sink to disk, but it is not getting
>>>>> > written. I think the file is being overwritten or truncated once the
>>>>> > stream process finishes. Does anyone know why the file is being
>>>>> > overwritten or truncated, and how I can fix this?
>>>>> >
>>>>> >
>>>>> > tableEnv.registerDataStream("table", watermarkedStream)
>>>>> >
>>>>> > val results = tableEnv.sqlQuery( """
>>>>> > |SELECT
>>>>> > | A
>>>>> > | FROM table
>>>>> > """.stripMargin)
>>>>> >
>>>>> >
>>>>> >
>>>>> > val result: Table = results
>>>>> >
>>>>> > val path = "file:///path/test/1.txt"
>>>>> > val sink: TableSink[Row] = new CsvTableSink(
>>>>> >   path,             // output path
>>>>> >   fieldDelim = "|", // optional: delimit files by '|'
>>>>> >   numFiles = 1,     // optional: write to a single file
>>>>> >   writeMode = WriteMode.NO_OVERWRITE)
>>>>> >
>>>>> > result.writeToSink(sink)
>>>>> >
>>>>> > env.execute("this job")
>>>>> >
>>>>> >
>>>>> >
>>>>> >
>>>>> > Thanks
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> karim amer
>>>>
>>>>
>>>
>>
>>
>> --
>> karim amer
>>
>>
>
>
> --
> karim amer
>
>


-- 
karim amer
