Whenever you use Scala and there is a Scala-specific class, use it.

remove: import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
add: import org.apache.flink.streaming.api.scala._

This will use org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.
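For reference, a minimal end-to-end sketch of what the corrected job could look like once the Scala imports are in place. The broker address, topic name and the parsing of the four comma-separated fields are illustrative assumptions, not Mich's actual code:

    import java.util.Properties

    import org.apache.flink.streaming.api.scala._                  // Scala DataStream API (environment + implicits)
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._                      // Symbol -> Expression implicits for 'key, 'ticker, ...
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    object MdStreamingSketch {
      def main(args: Array[String]): Unit = {
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "localhost:9092")   // assumed broker
        properties.setProperty("group.id", "md_streaming")              // assumed group id

        val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment   // Scala environment

        // Kafka source delivering lines like: key,ticker,timeissued,price
        val dataStream = streamExecEnv
          .addSource(new FlinkKafkaConsumer011[String]("md", new SimpleStringSchema(), properties))
          .map { line =>
            val Array(key, ticker, timeissued, price) = line.split(",")
            (key, ticker, timeissued, price.toDouble)
          }

        // Scala environment -> Scala StreamTableEnvironment
        val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
        // register the DataStream (not the environment) with its field names
        tableEnv.registerDataStream("table1", dataStream, 'key, 'ticker, 'timeissued, 'price)

        streamExecEnv.execute("md_streaming")
      }
    }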

Timo

On 02.08.18 at 09:47, Mich Talebzadeh wrote:
Tremendous. Many thanks.

I have put the sbt build file and the Scala code here:

https://github.com/michTalebzadeh/Flink

Regards,

Dr Mich Talebzadeh

LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.



On Thu, 2 Aug 2018 at 08:27, Timo Walther <[email protected]> wrote:

    Hi Mich,

    could you share your project with us (maybe on github)? Then we
    can import it and debug what the problem is.

    Regards,
    Timo

    On 02.08.18 at 07:37, Mich Talebzadeh wrote:
    Hi Jorn,

    Here you go, the dependencies:

    libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
    libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6"
    libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
    libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
    libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
    libraryDependencies += "org.apache.flink" %%
    "flink-connector-kafka-0.11" % "1.5.0"
    libraryDependencies += "org.apache.flink" %%
    "flink-connector-kafka-base" % "1.5.0"
    libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
    libraryDependencies += "org.apache.kafka" % "kafka-clients" %
    "0.11.0.0"
    libraryDependencies += "org.apache.flink" %%
    "flink-streaming-scala" % "1.5.0"
    libraryDependencies += "org.apache.flink" %% "flink-table" %
    "1.5.0" % "provided"
    libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"

    Thanks




    On Thu, 2 Aug 2018 at 06:19, Jörn Franke <[email protected]> wrote:


        How does your build.sbt look, especially the dependencies?
        On 2. Aug 2018, at 00:44, Mich Talebzadeh <[email protected]> wrote:

        Changed as suggested

        val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
        val dataStream = streamExecEnv
          .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
        val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
        tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker, 'timeissued, 'price)

        Still the same error

        [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
        [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:139: overloaded method value registerDataStream with alternatives:
        [error]   [T](name: String, dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)Unit <and>
        [error]   [T](name: String, dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])Unit
        [error]  cannot be applied to (String, org.apache.flink.streaming.api.environment.StreamExecutionEnvironment, Symbol, Symbol, Symbol, Symbol)
        [error] tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker, 'timeissued, 'price)
        [error]            ^
        [error] one error found
        [error] (compile:compileIncremental) Compilation failed
        [error] Total time: 3 s, completed Aug 1, 2018 11:40:47 PM

        Thanks anyway.




        On Wed, 1 Aug 2018 at 23:34, Fabian Hueske <[email protected]> wrote:

            Hi,

            You have to pass the StreamExecutionEnvironment to the
            getTableEnvironment() method, not the DataStream (or
            DataStreamSource).
            Change

            val tableEnv = TableEnvironment.getTableEnvironment(dataStream)

            to

            val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)

            Best,
            Fabian
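            (For clarity, a short sketch of how the overload resolution plays out, reusing the
            variable names from the thread: with the Java environment imported the call returns
            the Java table environment, while with the Scala imports it returns the Scala one,
            which is the variant that accepts 'key, 'ticker, ... as field expressions.)

            // org.apache.flink.streaming.api.environment.StreamExecutionEnvironment (Java)
            //   -> org.apache.flink.table.api.java.StreamTableEnvironment
            // org.apache.flink.streaming.api.scala.StreamExecutionEnvironment (Scala)
            //   -> org.apache.flink.table.api.scala.StreamTableEnvironment
            val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)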

            2018-08-02 0:10 GMT+02:00 Mich Talebzadeh <[email protected]>:

                Hi,

                FYI, these are my imports

                import java.util.Properties
                import java.util.Arrays
                import org.apache.flink.api.common.functions.MapFunction
                import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
                import org.apache.flink.streaming.api.scala
                import org.apache.flink.streaming.util.serialization.SimpleStringSchema
                import org.apache.flink.table.api.TableEnvironment
                import org.apache.flink.table.api.scala._
                import org.apache.flink.api.scala._
                import org.apache.kafka.clients.consumer.ConsumerConfig
                import org.apache.kafka.clients.consumer.ConsumerRecord
                import org.apache.kafka.clients.consumer.KafkaConsumer
                import org.apache.flink.core.fs.FileSystem
                import org.apache.flink.streaming.api.TimeCharacteristic
                import org.slf4j.LoggerFactory
                import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
                import java.util.Calendar
                import java.util.Date
                import java.text.DateFormat
                import java.text.SimpleDateFormat
                import org.apache.log4j.Logger
                import org.apache.log4j.Level
                import sys.process.stringSeqToProcess
                import java.io.File

                And this is the simple code

                val properties = new Properties()
                properties.setProperty("bootstrap.servers", bootstrapServers)
                properties.setProperty("zookeeper.connect", zookeeperConnect)
                properties.setProperty("group.id", flinkAppName)
                properties.setProperty("auto.offset.reset", "latest")
                val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
                val dataStream = streamExecEnv
                  .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
                val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
                tableEnv.registerDataStream("table1", dataStream, 'key, 'ticker, 'timeissued, 'price)

                And this is the compilation error

                [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
                [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:138: overloaded method value getTableEnvironment with alternatives:
                [error]   (executionEnvironment: org.apache.flink.streaming.api.scala.StreamExecutionEnvironment)org.apache.flink.table.api.scala.StreamTableEnvironment <and>
                [error]   (executionEnvironment: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment)org.apache.flink.table.api.java.StreamTableEnvironment <and>
                [error]   (executionEnvironment: org.apache.flink.api.scala.ExecutionEnvironment)org.apache.flink.table.api.scala.BatchTableEnvironment <and>
                [error]   (executionEnvironment: org.apache.flink.api.java.ExecutionEnvironment)org.apache.flink.table.api.java.BatchTableEnvironment
                [error]  cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String])
                [error]   val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
                [error]                                   ^
                [error] one error found
                [error] (compile:compileIncremental) Compilation failed
                [error] Total time: 3 s, completed Aug 1, 2018 11:02:33 PM
                Completed compiling

                which is really strange




                On Wed, 1 Aug 2018 at 13:42, Fabian Hueske <[email protected]> wrote:

                    Hi, I think you are mixing Java and Scala dependencies.

                    org.apache.flink.streaming.api.datastream.DataStream
                    is the DataStream of the Java DataStream API.
                    You should use the DataStream of the Scala
                    DataStream API.

                    Best, Fabian
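                    (As a small illustration of the distinction, a sketch rather than Mich's code:
                    with the Scala API import, the environment and the streams it produces are the
                    Scala types that the Scala Table API expects.)

                    import org.apache.flink.streaming.api.scala._

                    val env = StreamExecutionEnvironment.getExecutionEnvironment  // Scala environment
                    // this is org.apache.flink.streaming.api.scala.DataStream[String],
                    // not org.apache.flink.streaming.api.datastream.DataStream[String]
                    val lines: DataStream[String] = env.fromElements("a,b,c,1.0")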

                    2018-08-01 14:01 GMT+02:00 Mich Talebzadeh <[email protected]>:

                        Hi,

                        I believe I tried Hequn's suggestion, and tried again:

                        import org.apache.flink.table.api.Table
                        import org.apache.flink.table.api.TableEnvironment
                        import org.apache.flink.table.api.scala._
                        Unfortunately I am still getting the same error!

                        [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
                        [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:151: overloaded method value fromDataStream with alternatives:
                        [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)org.apache.flink.table.api.Table <and>
                        [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
                        [error]  cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol, Symbol)
                        [error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
                        [error]                                ^
                        [error] one error found
                        [error] (compile:compileIncremental) Compilation failed
                        [error] Total time: 3 s, completed Aug 1, 2018 12:59:44 PM
                        Completed compiling





                        On Wed, 1 Aug 2018 at 10:03, Timo Walther <[email protected]> wrote:

                            If these two imports are the only
                            imports that you added, then you did not
                            follow Hequn's advice or the link that I
                            sent you.

                            You need to add the underscore imports
                            to let Scala do its magic.

                            Timo


                            On 01.08.18 at 10:28, Mich Talebzadeh wrote:
                            Hi Timo,

                            These are my two flink table related
                            imports

                            import org.apache.flink.table.api.Table
                            import org.apache.flink.table.api.TableEnvironment

                            And these are my dependencies building
                            with SBT

                            libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
                            libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6"
                            libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
                            libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
                            libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
                            libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.5.0"
                            libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" % "1.5.0"
                            libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
                            libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
                            libraryDependencies += "org.apache.flink" %% "flink-streaming-java" % "1.5.0" % "provided"
                            libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % "provided"
                            libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"

                            There appears to be a conflict somewhere that causes this error:

                            [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
                            [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152: overloaded method value fromDataStream with alternatives:
                            [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)org.apache.flink.table.api.Table <and>
                            [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
                            [error]  cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol, Symbol)
                            [error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
                            [error]                                ^
                            [error] one error found
                            [error] (compile:compileIncremental) Compilation failed

                            Thanks





                            On Wed, 1 Aug 2018 at 09:17, Timo Walther <[email protected]> wrote:

                                Hi Mich,

                                I would check your imports again [1]. This is a pure compiler issue that is unrelated to your actual data stream. Also check your project dependencies.

                                Regards,
                                Timo

                                [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#implicit-conversion-for-scala

                                On 01.08.18 at 09:30, Mich Talebzadeh wrote:

                                Hi both,

                                I added the import as Hequn suggested.

                                My stream is very simple and
                                consists of 4 values separated by
                                "," as below

                                
                                05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48

                                So this is what I have been trying
                                to do

                                Code

                                    val dataStream = streamExecEnv
                                      .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
                                    val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
                                    val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)

                                Note those four columns in the table1 definition.

                                And this is the error being thrown

                                [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
                                [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152: overloaded method value fromDataStream with alternatives:
                                [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)org.apache.flink.table.api.Table <and>
                                [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
                                [error]  cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol, Symbol)
                                [error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
                                [error]                                ^
                                [error] one error found
                                [error] (compile:compileIncremental) Compilation failed

                                I suspect dataStream may not be
                                compatible with this operation?

                                Regards,




                                On Wed, 1 Aug 2018 at 04:51, Hequn Cheng <[email protected]> wrote:

                                    Hi, Mich

                                    You can try adding "import
                                    org.apache.flink.table.api.scala._",
                                    so that the Symbol can be
                                    recognized as an Expression.

                                    Best, Hequn
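                                    (A brief sketch of what that import enables,
                                    assuming the tableEnv and dataStream from the
                                    thread are the Scala variants:)

                                    import org.apache.flink.table.api.scala._
                                    // the implicit conversion turns Scala Symbols
                                    // ('key, 'ticker, ...) into field Expressions
                                    val table1 = tableEnv.fromDataStream(
                                      dataStream, 'key, 'ticker, 'timeissued, 'price)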

