Never mind after importing

import org.apache.flink.api.scala._

theses errors went away and i still have the original problem.
Sorry my bad

On Tue, Mar 20, 2018 at 11:04 AM, karim amer <karim.amer...@gmail.com>
wrote:

> To clarify should i file a bug report on sbt hiding the errors in the
> previous email ?
>
> On Tue, Mar 20, 2018 at 9:44 AM, karim amer <karim.amer...@gmail.com>
> wrote:
>
>> After switching to Maven from Sbt I got these errors
>> Error:(63, 37) could not find implicit value for evidence parameter of
>> type org.apache.flink.api.common.typeinfo.TypeInformation[org.apa
>> che.flink.quickstart.DataStreamtotableapi.Calls]
>>     val namedStream = dataStream.map((value:String) => {
>>
>>
>> Error:(63, 37) not enough arguments for method map: (implicit evidence$7:
>> org.apache.flink.api.common.typeinfo.TypeInformation[org.apa
>> che.flink.quickstart.DataStreamtotableapi.Calls])org.apache.
>> flink.streaming.api.scala.DataStream[org.apache.flink.
>> quickstart.DataStreamtotableapi.Calls].
>> Unspecified value parameter evidence$7.
>>     val namedStream = dataStream.map((value:String) => {
>>
>>
>> Should i file a bug report  ?
>>
>> On Tue, Mar 20, 2018 at 9:30 AM, karim amer <karim.amer...@gmail.com>
>> wrote:
>>
>>> Hi Fabian
>>> Sorry if i confused you The first error is from Nico's code Not my code
>>> or  snippet
>>> I am still having the original problem in my snippet where it's writing
>>> a blank csv file even though i get
>>> [success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
>>> After running the job
>>>
>>> Cheers,
>>> karim
>>>
>>> On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Hi Karim,
>>>>
>>>> I cannot find a method invocation "tableEnv.registerDataStream("myTable2",
>>>> set, 'A, 'B, 'C )" as shown in the error message in your example.
>>>> It would help if you would keep error message and code consistent.
>>>> Otherwise it's not possible to figure out what's going on.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2018-03-20 0:24 GMT+01:00 karim amer <karim.amer...@gmail.com>:
>>>>
>>>>> Hi Nico,
>>>>>
>>>>> I tried to reproduce your code but registerDataStream keeps failing
>>>>> to register the fields even though i am following your code and the Docs.
>>>>> here is the error
>>>>> [error]  found   : Symbol
>>>>> [error]  required: org.apache.flink.table.expressions.Expression
>>>>> [error]     tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
>>>>> [error]
>>>>> I think my code snippet was misleading. Here is the full snippet
>>>>> Changing the name from table didn't fix it for
>>>>>
>>>>> import org.apache.flink.streaming.api.scala._
>>>>> import org.apache.flink.api.java.utils.ParameterTool
>>>>> import org.apache.flink.core.fs.FileSystem.WriteMode
>>>>> import org.apache.flink.streaming.api.{CheckpointingMode, 
>>>>> TimeCharacteristic}
>>>>> import 
>>>>> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>>> import org.apache.flink.streaming.api.windowing.time.Time
>>>>> import org.apache.flink.table.api.{Table, TableEnvironment}
>>>>> import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
>>>>> import org.apache.flink.types.Row
>>>>>
>>>>>
>>>>>
>>>>> object datastreamtotableapi {
>>>>>
>>>>>   case class Calls(a: String,
>>>>>                    b: String,
>>>>>                    c: String,
>>>>>                    d: String,
>>>>>                    e: String,
>>>>>                    f: String,
>>>>>                    g: String,
>>>>>                    h: String,
>>>>>                    i: String,
>>>>>                    j: String,
>>>>>                    k: String,
>>>>>                    l: String,
>>>>>                    m: String,
>>>>>                    n: String,
>>>>>                    p: String,
>>>>>                    q: String,
>>>>>                    r: String,
>>>>>                    s: String,
>>>>>                    t: String,
>>>>>                    v: String,
>>>>>                    w: String)
>>>>>
>>>>>
>>>>>   def main(args: Array[String]) {
>>>>>
>>>>>     val params = ParameterTool.fromArgs(args)
>>>>>     val input = params.getRequired("input")
>>>>>
>>>>>
>>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>>     env.setParallelism(1)
>>>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>>>
>>>>>     val dataStream = env.readTextFile(input)
>>>>>
>>>>>     val namedStream = dataStream.map((value:String) => {
>>>>>
>>>>>       val columns = value.split(",")
>>>>>       Calls(columns(0), columns(1),columns(2),columns(3), 
>>>>> columns(4),columns(5),
>>>>>         columns(6), columns(7),columns(8),columns(9), columns(10), 
>>>>> columns(11),
>>>>>         columns(12), columns(13),columns(14),columns(15), columns(16), 
>>>>> columns(17),
>>>>>         columns(18),columns(19), columns(20)
>>>>>       )
>>>>>     })
>>>>>
>>>>>
>>>>>    val cleanedStream =  namedStream.filter(_.j == " ").filter(_.k==" ")
>>>>>
>>>>>    val watermarkedStream = 
>>>>> cleanedStream.assignTimestampsAndWatermarks(new 
>>>>> BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
>>>>>      override def extractTimestamp(element: Calls): Long = 
>>>>> (element.j.concat(element.k)).toLong
>>>>>    })
>>>>>
>>>>>
>>>>>
>>>>>     tableEnv.registerDataStream("CDRS", watermarkedStream)
>>>>>     val results = tableEnv.sqlQuery( """
>>>>>                                       |SELECT
>>>>>                                       | a
>>>>>                                       | FROM CDRS
>>>>>                                        """.stripMargin)
>>>>>
>>>>>
>>>>>     val result: Table = results
>>>>>
>>>>>     val path = "file:///Users/test/1.txt"
>>>>>     val sink :TableSink[Row]=   new CsvTableSink(
>>>>>       path,                             // output path
>>>>>       fieldDelim = "|",                 // optional: delimit files by '|'
>>>>>       numFiles = 1,                     // optional: write to a single 
>>>>> file
>>>>>       writeMode = WriteMode.OVERWRITE)
>>>>>
>>>>>     result.writeToSink(sink)
>>>>>
>>>>>
>>>>>     env.execute("this job")
>>>>>
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber <n...@data-artisans.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Karim,
>>>>>> when I was trying to reproduce your code, I got an exception with the
>>>>>> name 'table' being used - by replacing it and completing the job with
>>>>>> some input, I did see the csv file popping up. Also, the job was
>>>>>> crashing when the file 1.txt already existed.
>>>>>>
>>>>>> The code I used (running Flink 1.5-SNAPSHOT):
>>>>>>
>>>>>>   def main(args: Array[String]) {
>>>>>>     // set up the streaming execution environment
>>>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>>>>
>>>>>>     val stream: DataStream[(Int, Long, String)] =
>>>>>> get3TupleDataStream(env)
>>>>>>       .assignAscendingTimestamps(_._2)
>>>>>>     tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)
>>>>>>
>>>>>>     val results = tableEnv.sqlQuery( """
>>>>>>                                        |SELECT
>>>>>>                                        | A,C
>>>>>>                                        | FROM mytable
>>>>>>                                      """.stripMargin)
>>>>>>
>>>>>>     val result: Table = results
>>>>>>
>>>>>>     val path = "file:///tmp/test/1.txt"
>>>>>>     val sink :TableSink[Row]=   new CsvTableSink(
>>>>>>       path,                             // output path
>>>>>>       fieldDelim = "|",                 // optional: delimit files by
>>>>>> '|'
>>>>>>       numFiles = 1,                     // optional: write to a
>>>>>> single file
>>>>>>       writeMode = WriteMode.NO_OVERWRITE)
>>>>>>
>>>>>>     result.writeToSink(sink)
>>>>>>
>>>>>>     env.execute("this job")
>>>>>>   }
>>>>>>
>>>>>>   def get3TupleDataStream(env: StreamExecutionEnvironment):
>>>>>> DataStream[(Int, Long, String)] = {
>>>>>>     val data = new mutable.MutableList[(Int, Long, String)]
>>>>>>     data.+=((1, 1L, "Hi"))
>>>>>>     data.+=((2, 2L, "Hello"))
>>>>>>     data.+=((3, 2L, "Hello world"))
>>>>>>     data.+=((4, 3L, "Hello world, how are you?"))
>>>>>>     data.+=((5, 3L, "I am fine."))
>>>>>>     data.+=((6, 3L, "Luke Skywalker"))
>>>>>>     env.fromCollection(data)
>>>>>>   }
>>>>>>
>>>>>>
>>>>>> Nico
>>>>>>
>>>>>> On 16/03/18 22:50, karim amer wrote:
>>>>>> > Hi There,
>>>>>> >
>>>>>> >  I am trying to write a CSVsink to disk but it's not getting
>>>>>> written. I
>>>>>> > think the file is getting overwritten or truncated once The Stream
>>>>>> > process finishes. Does anyone know why the file is getting
>>>>>> overwritten
>>>>>> > or truncated and how can i fix this ?
>>>>>> >
>>>>>> >
>>>>>> > tableEnv.registerDataStream("table", watermarkedStream)
>>>>>> >
>>>>>> > val results = tableEnv.sqlQuery( """
>>>>>> > |SELECT
>>>>>> > | A
>>>>>> > | FROM table
>>>>>> > """.stripMargin)
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > val result: Table = results
>>>>>> >
>>>>>> > val path = "file:///path/test/1.txt"
>>>>>> > val sink :TableSink[Row]=   new CsvTableSink(
>>>>>> >   path, // output path
>>>>>> > fieldDelim = "|", // optional: delimit files by '|'
>>>>>> > numFiles = 1, // optional: write to a single file
>>>>>> > writeMode = WriteMode.NO_OVERWRITE)
>>>>>> >
>>>>>> > result.writeToSink(sink)
>>>>>> >
>>>>>> > env.execute("this job")
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > Thanks
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> karim amer
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> karim amer
>>>
>>>
>>
>>
>> --
>> karim amer
>>
>>
>
>
> --
> karim amer
>
>


-- 
karim amer

Reply via email to