Hi Mich,

It seems that the type of your DataStream is still wrong.
If you want to specify four fields, the DataStream type should usually look
like DataStream[(Type1, Type2, Type3, Type4)], not DataStream[String].
You can try that.
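
For example, here is a minimal sketch of turning each comma-separated line
into a four-field tuple. The object name ParsePrices, the parse helper and
the field types (three Strings and a Float) are only assumptions based on
the sample line you posted, not something taken from your code:

```scala
// Hypothetical helper: the record layout key,ticker,timeissued,price is
// taken from the sample line in this thread; names and types are assumed.
object ParsePrices {
  type Price = (String, String, String, Float)

  // Split one comma-separated line into a four-field tuple.
  // Returns None when the line does not have exactly four fields
  // or when the price is not numeric.
  def parse(line: String): Option[Price] =
    line.split(",", -1).map(_.trim) match {
      case Array(key, ticker, timeIssued, price) =>
        scala.util.Try(price.toFloat).toOption
          .map(p => (key, ticker, timeIssued, p))
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    val sample =
      "05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48"
    println(parse(sample))              // a well-formed line
    println(parse("not,enough,fields")) // a malformed line gives None
  }
}
```

With the Scala DataStream API imported (org.apache.flink.streaming.api.scala._),
something like dataStream.flatMap(line => ParsePrices.parse(line).toList)
should then give you a stream of four-field tuples instead of a
DataStream[String]. Note also that, judging by the compiler message, the
second argument to registerDataStream is currently streamExecEnv; it needs
to be the stream itself.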

Thanks, vino

2018-08-02 6:44 GMT+08:00 Mich Talebzadeh <mich.talebza...@gmail.com>:

> Changed as suggested
>
>   val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>   val dataStream = streamExecEnv
>     .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
>   val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>   tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker, 'timeissued, 'price)
>
> Still the same error
>
> [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_
> streaming/target/scala-2.11/classes...
> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/
> myPackage/md_streaming.scala:139: overloaded method value
> registerDataStream with alternatives:
> [error]   [T](name: String, dataStream: 
> org.apache.flink.streaming.api.datastream.DataStream[T],
> fields: String)Unit <and>
> [error]   [T](name: String, dataStream: org.apache.flink.streaming.
> api.datastream.DataStream[T])Unit
> [error]  cannot be applied to (String, org.apache.flink.streaming.
> api.environment.StreamExecutionEnvironment, Symbol, Symbol, Symbol,
> Symbol)
> [error]   tableEnv.registerDataStream("table1", streamExecEnv, 'key,
> 'ticker, 'timeissued, 'price)
> [error]            ^
> [error] one error found
> [error] (compile:compileIncremental) Compilation failed
> [error] Total time: 3 s, completed Aug 1, 2018 11:40:47 PM
>
> Thanks anyway.
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 1 Aug 2018 at 23:34, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> You have to pass the StreamExecutionEnvironment to the
>> getTableEnvironment() method, not the DataStream (or DataStreamSource).
>> Change
>>
>> val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
>>
>> to
>>
>> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>>
>> Best,
>> Fabian
>>
>> 2018-08-02 0:10 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:
>>
>>> Hi,
>>>
>>> FYI, these are my imports
>>>
>>> import java.util.Properties
>>> import java.util.Arrays
>>> import org.apache.flink.api.common.functions.MapFunction
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>>> import org.apache.flink.streaming.api.scala
>>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>>> import org.apache.flink.table.api.TableEnvironment
>>> import org.apache.flink.table.api.scala._
>>> import org.apache.flink.api.scala._
>>> import org.apache.kafka.clients.consumer.ConsumerConfig
>>> import org.apache.kafka.clients.consumer.ConsumerRecord
>>> import org.apache.kafka.clients.consumer.KafkaConsumer
>>> import org.apache.flink.core.fs.FileSystem
>>> import org.apache.flink.streaming.api.TimeCharacteristic
>>> import org.slf4j.LoggerFactory
>>> import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
>>> import java.util.Calendar
>>> import java.util.Date
>>> import java.text.DateFormat
>>> import java.text.SimpleDateFormat
>>> import org.apache.log4j.Logger
>>> import org.apache.log4j.Level
>>> import sys.process.stringSeqToProcess
>>> import java.io.File
>>>
>>> And this is the simple code
>>>
>>>     val properties = new Properties()
>>>     properties.setProperty("bootstrap.servers", bootstrapServers)
>>>     properties.setProperty("zookeeper.connect", zookeeperConnect)
>>>     properties.setProperty("group.id", flinkAppName)
>>>     properties.setProperty("auto.offset.reset", "latest")
>>>     val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>     val dataStream = streamExecEnv
>>>       .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
>>>     val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
>>>     tableEnv.registerDataStream("table1", dataStream, 'key, 'ticker, 'timeissued, 'price)
>>>
>>> And this is the compilation error
>>>
>>> [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_
>>> streaming/target/scala-2.11/classes...
>>> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/
>>> myPackage/md_streaming.scala:138: overloaded method value
>>> getTableEnvironment with alternatives:
>>> [error]   (executionEnvironment: org.apache.flink.streaming.api.scala.
>>> StreamExecutionEnvironment)org.apache.flink.table.api.scala.StreamTableEnvironment
>>> <and>
>>> [error]   (executionEnvironment: org.apache.flink.streaming.
>>> api.environment.StreamExecutionEnvironment)org.apache.flink.table.api.java.StreamTableEnvironment
>>> <and>
>>> [error]   (executionEnvironment: org.apache.flink.api.scala.
>>> ExecutionEnvironment)org.apache.flink.table.api.scala.BatchTableEnvironment
>>> <and>
>>> [error]   (executionEnvironment: org.apache.flink.api.java.
>>> ExecutionEnvironment)org.apache.flink.table.api.java.
>>> BatchTableEnvironment
>>> [error]  cannot be applied to (org.apache.flink.streaming.
>>> api.datastream.DataStreamSource[String])
>>> [error]   val tableEnv = TableEnvironment.getTableEnvironment(
>>> dataStream)
>>> [error]                                   ^
>>> [error] one error found
>>> [error] (compile:compileIncremental) Compilation failed
>>> [error] Total time: 3 s, completed Aug 1, 2018 11:02:33 PM
>>> Completed compiling
>>>
>>> which is really strange
>>>
>>> Dr Mich Talebzadeh
>>>
>>> On Wed, 1 Aug 2018 at 13:42, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi, I think you are mixing Java and Scala dependencies.
>>>>
>>>> org.apache.flink.streaming.api.datastream.DataStream is the DataStream
>>>> of the Java DataStream API.
>>>> You should use the DataStream of the Scala DataStream API.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2018-08-01 14:01 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>> I believe I followed Hequn's suggestion, and I tried again with these imports:
>>>>>
>>>>> import org.apache.flink.table.api.Table
>>>>> import org.apache.flink.table.api.TableEnvironment
>>>>>
>>>>> import org.apache.flink.table.api.scala._
>>>>>
>>>>> Unfortunately I am still getting the same error!
>>>>>
>>>>> [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_
>>>>> streaming/target/scala-2.11/classes...
>>>>> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/
>>>>> myPackage/md_streaming.scala:151: overloaded method value
>>>>> fromDataStream with alternatives:
>>>>> [error]   [T](dataStream: 
>>>>> org.apache.flink.streaming.api.datastream.DataStream[T],
>>>>> fields: String)org.apache.flink.table.api.Table <and>
>>>>> [error]   [T](dataStream: org.apache.flink.streaming.
>>>>> api.datastream.DataStream[T])org.apache.flink.table.api.Table
>>>>> [error]  cannot be applied to (org.apache.flink.streaming.
>>>>> api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol,
>>>>> Symbol)
>>>>> [error]   val table1: Table = tableEnv.fromDataStream(dataStream,
>>>>> 'key, 'ticker, 'timeissued, 'price)
>>>>> [error]                                ^
>>>>> [error] one error found
>>>>> [error] (compile:compileIncremental) Compilation failed
>>>>> [error] Total time: 3 s, completed Aug 1, 2018 12:59:44 PM
>>>>> Completed compiling
>>>>>
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>> On Wed, 1 Aug 2018 at 10:03, Timo Walther <twal...@apache.org> wrote:
>>>>>
>>>>>> If these two imports are the only imports that you added, then you
>>>>>> did not follow Hequn's advice or the link that I sent you.
>>>>>>
>>>>>> You need to add the underscore imports to let Scala do its magic.
>>>>>>
>>>>>> Timo
>>>>>>
>>>>>>
>>>>>> On 01.08.18 at 10:28, Mich Talebzadeh wrote:
>>>>>>
>>>>>> Hi Timo,
>>>>>>
>>>>>> These are my two flink table related imports
>>>>>>
>>>>>> import org.apache.flink.table.api.Table
>>>>>> import org.apache.flink.table.api.TableEnvironment
>>>>>>
>>>>>> And these are my dependencies building with SBT
>>>>>>
>>>>>> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
>>>>>> libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6"
>>>>>> libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
>>>>>> libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
>>>>>> libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
>>>>>> libraryDependencies += "org.apache.flink" %%
>>>>>> "flink-connector-kafka-0.11" % "1.5.0"
>>>>>> libraryDependencies += "org.apache.flink" %%
>>>>>> "flink-connector-kafka-base" % "1.5.0"
>>>>>> libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
>>>>>> libraryDependencies += "org.apache.kafka" % "kafka-clients" %
>>>>>> "0.11.0.0"
>>>>>> libraryDependencies += "org.apache.flink" %% "flink-streaming-java" %
>>>>>> "1.5.0" % "provided"
>>>>>>
>>>>>> libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % "provided"
>>>>>> libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"
>>>>>>
>>>>>> There appears to be a conflict somewhere that causes this error
>>>>>>
>>>>>> [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_
>>>>>> streaming/target/scala-2.11/classes...
>>>>>> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/
>>>>>> myPackage/md_streaming.scala:152: overloaded method value
>>>>>> fromDataStream with alternatives:
>>>>>> [error]   [T](dataStream: 
>>>>>> org.apache.flink.streaming.api.datastream.DataStream[T],
>>>>>> fields: String)org.apache.flink.table.api.Table <and>
>>>>>> [error]   [T](dataStream: org.apache.flink.streaming.
>>>>>> api.datastream.DataStream[T])org.apache.flink.table.api.Table
>>>>>> [error]  cannot be applied to (org.apache.flink.streaming.
>>>>>> api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol,
>>>>>> Symbol)
>>>>>> [error]   val table1: Table = tableEnv.fromDataStream(dataStream,
>>>>>> 'key, 'ticker, 'timeissued, 'price)
>>>>>> [error]                                ^
>>>>>> [error] one error found
>>>>>> [error] (compile:compileIncremental) Compilation failed
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>>> Dr Mich Talebzadeh
>>>>>>
>>>>>> On Wed, 1 Aug 2018 at 09:17, Timo Walther <twal...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Mich,
>>>>>>>
>>>>>>> I would check your imports again [1]. This is a pure compiler issue
>>>>>>> that is unrelated to your actual data stream. Also check your project
>>>>>>> dependencies.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Timo
>>>>>>>
>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-
>>>>>>> master/dev/table/common.html#implicit-conversion-for-scala
>>>>>>>
>>>>>>> On 01.08.18 at 09:30, Mich Talebzadeh wrote:
>>>>>>>
>>>>>>>
>>>>>>> Hi both,
>>>>>>>
>>>>>>> I added the import as Hequn suggested.
>>>>>>>
>>>>>>> My stream is very simple and consists of 4 values separated by ","
>>>>>>> as below
>>>>>>>
>>>>>>> 05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48
>>>>>>>
>>>>>>> So this is what I have been trying to do
>>>>>>>
>>>>>>> Code
>>>>>>>
>>>>>>>     val dataStream =  streamExecEnv
>>>>>>>        .addSource(new FlinkKafkaConsumer011[String](topicsValue,
>>>>>>> new SimpleStringSchema(), properties))
>>>>>>>  //
>>>>>>>  //
>>>>>>>   val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>>>>>>>   val table1: Table = tableEnv.fromDataStream(dataStream, 'key,
>>>>>>> 'ticker, 'timeissued, 'price)
>>>>>>>
>>>>>>> Note the four columns in the table1 definition.
>>>>>>>
>>>>>>> And this is the error being thrown
>>>>>>>
>>>>>>> [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_
>>>>>>> streaming/target/scala-2.11/classes...
>>>>>>> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/
>>>>>>> myPackage/md_streaming.scala:152: overloaded method value
>>>>>>> fromDataStream with alternatives:
>>>>>>> [error]   [T](dataStream: 
>>>>>>> org.apache.flink.streaming.api.datastream.DataStream[T],
>>>>>>> fields: String)org.apache.flink.table.api.Table <and>
>>>>>>> [error]   [T](dataStream: org.apache.flink.streaming.
>>>>>>> api.datastream.DataStream[T])org.apache.flink.table.api.Table
>>>>>>> [error]  cannot be applied to (org.apache.flink.streaming.
>>>>>>> api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol,
>>>>>>> Symbol)
>>>>>>> [error]   val table1: Table = tableEnv.fromDataStream(dataStream,
>>>>>>> 'key, 'ticker, 'timeissued, 'price)
>>>>>>> [error]                                ^
>>>>>>> [error] one error found
>>>>>>> [error] (compile:compileIncremental) Compilation failed
>>>>>>>
>>>>>>> I suspect dataStream may not be compatible with this operation?
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Dr Mich Talebzadeh
>>>>>>>
>>>>>>> On Wed, 1 Aug 2018 at 04:51, Hequn Cheng <chenghe...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi, Mich
>>>>>>>>
>>>>>>>> You can try adding "import org.apache.flink.table.api.scala._", so
>>>>>>>> that the Symbol can be recognized as an Expression.
>>>>>>>>
>>>>>>>> Best, Hequn
>>>>>>>>
>>>>>>>> On Wed, Aug 1, 2018 at 6:16 AM, Mich Talebzadeh <
>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I am following this example
>>>>>>>>>
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-
>>>>>>>>> release-1.5/dev/table/common.html#integration-with-
>>>>>>>>> datastream-and-dataset-api
>>>>>>>>>
>>>>>>>>> This is my dataStream which is built on a Kafka topic
>>>>>>>>>
>>>>>>>>>    //
>>>>>>>>>     //Create a Kafka consumer
>>>>>>>>>     //
>>>>>>>>>     val dataStream =  streamExecEnv
>>>>>>>>>        .addSource(new FlinkKafkaConsumer011[String](topicsValue,
>>>>>>>>> new SimpleStringSchema(), properties))
>>>>>>>>>  //
>>>>>>>>>  //
>>>>>>>>>   val tableEnv = TableEnvironment.getTableEnvironment(
>>>>>>>>> streamExecEnv)
>>>>>>>>>   val table1: Table = tableEnv.fromDataStream(dataStream, 'key,
>>>>>>>>> 'ticker, 'timeissued, 'price)
>>>>>>>>>
>>>>>>>>> While compiling it throws this error
>>>>>>>>>
>>>>>>>>> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/
>>>>>>>>> myPackage/md_streaming.scala:169: overloaded method value
>>>>>>>>> fromDataStream with alternatives:
>>>>>>>>> [error]   [T](dataStream: 
>>>>>>>>> org.apache.flink.streaming.api.datastream.DataStream[T],
>>>>>>>>> fields: String)org.apache.flink.table.api.Table <and>
>>>>>>>>> [error]   [T](dataStream: org.apache.flink.streaming.
>>>>>>>>> api.datastream.DataStream[T])org.apache.flink.table.api.Table
>>>>>>>>> [error]  cannot be applied to (org.apache.flink.streaming.
>>>>>>>>> api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol,
>>>>>>>>> Symbol)
>>>>>>>>> [error]   val table1: Table = tableEnv.fromDataStream(dataStream,
>>>>>>>>> 'key, 'ticker, 'timeissued, 'price)
>>>>>>>>> [error]                                ^
>>>>>>>>> [error] one error found
>>>>>>>>> [error] (compile:compileIncremental) Compilation failed
>>>>>>>>>
>>>>>>>>> The topic is very simple: it contains comma-separated prices. I
>>>>>>>>> tried MapFunction and flatMap, but neither worked!
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Dr Mich Talebzadeh
