After switching from sbt to Maven, I get these errors:
Error:(63, 37) could not find implicit value for evidence parameter of type
org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls]
    val namedStream = dataStream.map((value:String) => {


Error:(63, 37) not enough arguments for method map: (implicit evidence$7:
org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls])org.apache.flink.streaming.api.scala.DataStream[org.apache.flink.quickstart.DataStreamtotableapi.Calls].
Unspecified value parameter evidence$7.
    val namedStream = dataStream.map((value:String) => {


Should I file a bug report?
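
For anyone hitting the same message, here is a minimal, Flink-free sketch of what the compiler is complaining about. TypeInfo, MiniStream, and Implicits are hypothetical stand-ins and not Flink classes, and Calls is cut down to two fields. In Flink's Scala API the corresponding implicits normally come from the wildcard import org.apache.flink.streaming.api.scala._, so if that import is present and the evidence still cannot be found, the Maven setup (dependencies or Scala version) may be worth checking before filing a bug. That last part is an assumption, not something confirmed in this thread.

```scala
// Hypothetical stand-in for Flink's TypeInformation; illustration only.
trait TypeInfo[T] { def describe: String }

// Hypothetical stand-in for a stream whose map needs evidence for the result type.
final case class MiniStream[A](items: List[A]) {
  // `B: TypeInfo` is a context bound; the compiler rewrites it into an
  // extra parameter list `(implicit evidence: TypeInfo[B])`.
  def map[B: TypeInfo](f: A => B): MiniStream[B] = MiniStream(items.map(f))
}

// Reduced version of the case class from the thread.
final case class Calls(a: String, b: String)

object Implicits {
  // Plays the role of the implicits that a wildcard import brings into scope.
  implicit val callsInfo: TypeInfo[Calls] = new TypeInfo[Calls] {
    def describe: String = "Calls(a: String, b: String)"
  }
}

object EvidenceDemo {
  import Implicits._ // without this import, the call to `map` below does not compile

  def parse(lines: List[String]): MiniStream[Calls] =
    MiniStream(lines).map { line =>
      val columns = line.split(",")
      Calls(columns(0), columns(1))
    }
}
```

Removing the import in EvidenceDemo should make the compiler reject the map call because no implicit TypeInfo[Calls] is in scope, which is the same kind of failure as the errors above.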

On Tue, Mar 20, 2018 at 9:30 AM, karim amer <karim.amer...@gmail.com> wrote:

> Hi Fabian,
> Sorry if I confused you. The first error is from Nico's code, not from my
> code or snippet.
> I am still having the original problem in my snippet, where it writes a
> blank CSV file even though I get
> [success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
> after running the job.
>
> Cheers,
> karim
>
> On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Karim,
>>
>> I cannot find the method invocation "tableEnv.registerDataStream("myTable2",
>> set, 'A, 'B, 'C )" shown in the error message anywhere in your example.
>> It would help if you kept the error message and the code consistent.
>> Otherwise it's not possible to figure out what's going on.
>>
>> Best, Fabian
>>
>> 2018-03-20 0:24 GMT+01:00 karim amer <karim.amer...@gmail.com>:
>>
>>> Hi Nico,
>>>
>>> I tried to reproduce your code, but registerDataStream keeps failing to
>>> register the fields even though I am following your code and the docs.
>>> Here is the error:
>>> [error]  found   : Symbol
>>> [error]  required: org.apache.flink.table.expressions.Expression
>>> [error]     tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
>>> [error]
>>> I think my code snippet was misleading. Here is the full snippet.
>>> Changing the name from "table" didn't fix it for me.
>>>
>>> import org.apache.flink.streaming.api.scala._
>>> import org.apache.flink.api.java.utils.ParameterTool
>>> import org.apache.flink.core.fs.FileSystem.WriteMode
>>> import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>> import org.apache.flink.streaming.api.windowing.time.Time
>>> import org.apache.flink.table.api.{Table, TableEnvironment}
>>> import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
>>> import org.apache.flink.types.Row
>>>
>>>
>>>
>>> object datastreamtotableapi {
>>>
>>>   case class Calls(a: String,
>>>                    b: String,
>>>                    c: String,
>>>                    d: String,
>>>                    e: String,
>>>                    f: String,
>>>                    g: String,
>>>                    h: String,
>>>                    i: String,
>>>                    j: String,
>>>                    k: String,
>>>                    l: String,
>>>                    m: String,
>>>                    n: String,
>>>                    p: String,
>>>                    q: String,
>>>                    r: String,
>>>                    s: String,
>>>                    t: String,
>>>                    v: String,
>>>                    w: String)
>>>
>>>
>>>   def main(args: Array[String]): Unit = {
>>>
>>>     val params = ParameterTool.fromArgs(args)
>>>     val input = params.getRequired("input")
>>>
>>>
>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>     env.setParallelism(1)
>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>
>>>     val dataStream = env.readTextFile(input)
>>>
>>>     val namedStream = dataStream.map((value:String) => {
>>>
>>>       val columns = value.split(",")
>>>       Calls(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5),
>>>         columns(6), columns(7), columns(8), columns(9), columns(10), columns(11),
>>>         columns(12), columns(13), columns(14), columns(15), columns(16), columns(17),
>>>         columns(18), columns(19), columns(20)
>>>       )
>>>     })
>>>
>>>
>>>     val cleanedStream = namedStream.filter(_.j == " ").filter(_.k == " ")
>>>
>>>     val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(
>>>       new BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
>>>         override def extractTimestamp(element: Calls): Long =
>>>           element.j.concat(element.k).toLong
>>>       })
>>>
>>>
>>>
>>>     tableEnv.registerDataStream("CDRS", watermarkedStream)
>>>     val results = tableEnv.sqlQuery( """
>>>                                       |SELECT
>>>                                       | a
>>>                                       | FROM CDRS
>>>                                        """.stripMargin)
>>>
>>>
>>>     val result: Table = results
>>>
>>>     val path = "file:///Users/test/1.txt"
>>>     val sink: TableSink[Row] = new CsvTableSink(
>>>       path,                             // output path
>>>       fieldDelim = "|",                 // optional: delimit files by '|'
>>>       numFiles = 1,                     // optional: write to a single file
>>>       writeMode = WriteMode.OVERWRITE)
>>>
>>>     result.writeToSink(sink)
>>>
>>>
>>>     env.execute("this job")
>>>
>>>   }
>>> }
>>>
>>>
>>>
>>>
>>> On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber <n...@data-artisans.com>
>>> wrote:
>>>
>>>> Hi Karim,
>>>> when I was trying to reproduce your code, I got an exception with the
>>>> name 'table' being used - by replacing it and completing the job with
>>>> some input, I did see the csv file popping up. Also, the job was
>>>> crashing when the file 1.txt already existed.
>>>>
>>>> The code I used (running Flink 1.5-SNAPSHOT):
>>>>
>>>>   def main(args: Array[String]): Unit = {
>>>>     // set up the streaming execution environment
>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>>
>>>>     val stream: DataStream[(Int, Long, String)] =
>>>>       get3TupleDataStream(env)
>>>>         .assignAscendingTimestamps(_._2)
>>>>     tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)
>>>>
>>>>     val results = tableEnv.sqlQuery( """
>>>>                                        |SELECT
>>>>                                        | A,C
>>>>                                        | FROM mytable
>>>>                                      """.stripMargin)
>>>>
>>>>     val result: Table = results
>>>>
>>>>     val path = "file:///tmp/test/1.txt"
>>>>     val sink: TableSink[Row] = new CsvTableSink(
>>>>       path,                             // output path
>>>>       fieldDelim = "|",                 // optional: delimit files by '|'
>>>>       numFiles = 1,                     // optional: write to a single file
>>>>       writeMode = WriteMode.NO_OVERWRITE)
>>>>
>>>>     result.writeToSink(sink)
>>>>
>>>>     env.execute("this job")
>>>>   }
>>>>
>>>>   def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
>>>>     val data = new mutable.MutableList[(Int, Long, String)]
>>>>     data.+=((1, 1L, "Hi"))
>>>>     data.+=((2, 2L, "Hello"))
>>>>     data.+=((3, 2L, "Hello world"))
>>>>     data.+=((4, 3L, "Hello world, how are you?"))
>>>>     data.+=((5, 3L, "I am fine."))
>>>>     data.+=((6, 3L, "Luke Skywalker"))
>>>>     env.fromCollection(data)
>>>>   }
>>>>
>>>>
>>>> Nico
>>>>
>>>> On 16/03/18 22:50, karim amer wrote:
>>>> > Hi There,
>>>> >
>>>> > I am trying to write a CsvTableSink to disk, but it's not getting
>>>> > written. I think the file is getting overwritten or truncated once the
>>>> > stream process finishes. Does anyone know why the file is getting
>>>> > overwritten or truncated, and how I can fix this?
>>>> >
>>>> >
>>>> > tableEnv.registerDataStream("table", watermarkedStream)
>>>> >
>>>> > val results = tableEnv.sqlQuery( """
>>>> > |SELECT
>>>> > | A
>>>> > | FROM table
>>>> > """.stripMargin)
>>>> >
>>>> >
>>>> >
>>>> > val result: Table = results
>>>> >
>>>> > val path = "file:///path/test/1.txt"
>>>> > val sink :TableSink[Row]=   new CsvTableSink(
>>>> >   path, // output path
>>>> > fieldDelim = "|", // optional: delimit files by '|'
>>>> > numFiles = 1, // optional: write to a single file
>>>> > writeMode = WriteMode.NO_OVERWRITE)
>>>> >
>>>> > result.writeToSink(sink)
>>>> >
>>>> > env.execute("this job")
>>>> >
>>>> >
>>>> >
>>>> >
>>>> > Thanks
>>>>
>>>>
>>>
>>>
>>> --
>>> karim amer
>>>
>>>
>>
>
>
> --
> karim amer
>
>


-- 
karim amer
