Hi Mich, It seems that the type of your DataStream stream is always wrong. If you want to specify four fields, usually the DataStream type should be similar: DataStream[(Type1, Type2, Type3, Type4)], not DataStream[String], you can try it.
Thanks, vino 2018-08-02 6:44 GMT+08:00 Mich Talebzadeh <mich.talebza...@gmail.com>: > Changed as suggested > > val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment > val dataStream = streamExecEnv > .addSource(new FlinkKafkaConsumer011[String](topicsValue, new > SimpleStringSchema(), properties)) > val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv) > tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker, > 'timeissued, 'price) > > Still the same error > > [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_ > streaming/target/scala-2.11/classes... > [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/ > myPackage/md_streaming.scala:139: overloaded method value > registerDataStream with alternatives: > [error] [T](name: String, dataStream: > org.apache.flink.streaming.api.datastream.DataStream[T], > fields: String)Unit <and> > [error] [T](name: String, dataStream: org.apache.flink.streaming. > api.datastream.DataStream[T])Unit > [error] cannot be applied to (String, org.apache.flink.streaming. > api.environment.StreamExecutionEnvironment, Symbol, Symbol, Symbol, > Symbol) > [error] tableEnv.registerDataStream("table1", streamExecEnv, 'key, > 'ticker, 'timeissued, 'price) > [error] ^ > [error] one error found > [error] (compile:compileIncremental) Compilation failed > [error] Total time: 3 s, completed Aug 1, 2018 11:40:47 PM > > Thanks anyway. > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > > On Wed, 1 Aug 2018 at 23:34, Fabian Hueske <fhue...@gmail.com> wrote: > >> Hi, >> >> You have to pass the StreamExecutionEnvironment to the >> getTableEnvironment() method, not the DataStream (or DataStreamSource). >> Change >> >> val tableEnv = TableEnvironment.getTableEnvironment(dataStream) >> >> to >> >> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv) >> >> Best, >> Fabian >> >> 2018-08-02 0:10 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>: >> >>> Hi, >>> >>> FYI, these are my imports >>> >>> import java.util.Properties >>> import java.util.Arrays >>> import org.apache.flink.api.common.functions.MapFunction >>> import org.apache.flink.streaming.api.environment. >>> StreamExecutionEnvironment >>> import org.apache.flink.streaming.api.scala >>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema >>> import org.apache.flink.table.api.TableEnvironment >>> import org.apache.flink.table.api.scala._ >>> import org.apache.flink.api.scala._ >>> import org.apache.kafka.clients.consumer.ConsumerConfig >>> import org.apache.kafka.clients.consumer.ConsumerRecord >>> import org.apache.kafka.clients.consumer.KafkaConsumer >>> import org.apache.flink.core.fs.FileSystem >>> import org.apache.flink.streaming.api.TimeCharacteristic >>> import org.slf4j.LoggerFactory >>> import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, >>> FlinkKafkaProducer011} >>> import java.util.Calendar >>> import java.util.Date >>> import java.text.DateFormat >>> import java.text.SimpleDateFormat >>> import org.apache.log4j.Logger >>> import org.apache.log4j.Level >>> import sys.process.stringSeqToProcess >>> import java.io.File >>> >>> And this is the simple code >>> >>> val properties = new Properties() >>> properties.setProperty("bootstrap.servers", bootstrapServers) >>> properties.setProperty("zookeeper.connect", zookeeperConnect) >>> properties.setProperty("group.id", flinkAppName) >>> properties.setProperty("auto.offset.reset", "latest") >>> val streamExecEnv = StreamExecutionEnvironment. >>> getExecutionEnvironment >>> val dataStream = streamExecEnv >>> .addSource(new FlinkKafkaConsumer011[String](topicsValue, new >>> SimpleStringSchema(), properties)) >>> val tableEnv = TableEnvironment.getTableEnvironment(dataStream) >>> tableEnv.registerDataStream("table1", dataStream, 'key, 'ticker, >>> 'timeissued, 'price) >>> >>> And this is the compilation error >>> >>> info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_ >>> streaming/target/scala-2.11/classes... >>> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/ >>> myPackage/md_streaming.scala:138: overloaded method value >>> getTableEnvironment with alternatives: >>> [error] (executionEnvironment: org.apache.flink.streaming.api.scala. >>> StreamExecutionEnvironment)org.apache.flink.table.api.scala.StreamTableEnvironment >>> <and> >>> [error] (executionEnvironment: org.apache.flink.streaming. >>> api.environment.StreamExecutionEnvironment)org.apache.flink.table.api.java.StreamTableEnvironment >>> <and> >>> [error] (executionEnvironment: org.apache.flink.api.scala. >>> ExecutionEnvironment)org.apache.flink.table.api.scala.BatchTableEnvironment >>> <and> >>> [error] (executionEnvironment: org.apache.flink.api.java. >>> ExecutionEnvironment)org.apache.flink.table.api.java. >>> BatchTableEnvironment >>> [error] cannot be applied to (org.apache.flink.streaming. >>> api.datastream.DataStreamSource[String]) >>> [error] val tableEnv = TableEnvironment.getTableEnvironment( >>> dataStream) >>> [error] ^ >>> [error] one error found >>> [error] (compile:compileIncremental) Compilation failed >>> [error] Total time: 3 s, completed Aug 1, 2018 11:02:33 PM >>> Completed compiling >>> >>> which is really strange >>> >>> Dr Mich Talebzadeh >>> >>> >>> >>> LinkedIn * >>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>> >>> >>> >>> http://talebzadehmich.wordpress.com >>> >>> >>> *Disclaimer:* Use it at your own risk. Any and all responsibility for >>> any loss, damage or destruction of data or any other property which may >>> arise from relying on this email's technical content is explicitly >>> disclaimed. The author will in no case be liable for any monetary damages >>> arising from such loss, damage or destruction. >>> >>> >>> >>> >>> On Wed, 1 Aug 2018 at 13:42, Fabian Hueske <fhue...@gmail.com> wrote: >>> >>>> Hi I think you are mixing Java and Scala dependencies. >>>> >>>> org.apache.flink.streaming.api.datastream.DataStream is the DataStream >>>> of the Java DataStream API. >>>> You should use the DataStream of the Scala DataStream API. >>>> >>>> Best, Fabian >>>> >>>> 2018-08-01 14:01 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>: >>>> >>>>> Hi, >>>>> >>>>> I believed I tried Hequn's suggestion and tried again >>>>> >>>>> import org.apache.flink.table.api.Table >>>>> import org.apache.flink.table.api.TableEnvironment >>>>> >>>>> *import org.apache.flink.table.api.scala._* >>>>> Unfortunately I am still getting the same error! >>>>> >>>>> [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_ >>>>> streaming/target/scala-2.11/classes... >>>>> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/ >>>>> myPackage/md_streaming.scala:151: overloaded method value >>>>> fromDataStream with alternatives: >>>>> [error] [T](dataStream: >>>>> org.apache.flink.streaming.api.datastream.DataStream[T], >>>>> fields: String)org.apache.flink.table.api.Table <and> >>>>> [error] [T](dataStream: org.apache.flink.streaming. >>>>> api.datastream.DataStream[T])org.apache.flink.table.api.Table >>>>> [error] cannot be applied to (org.apache.flink.streaming. >>>>> api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol, >>>>> Symbol) >>>>> [error] val table1: Table = tableEnv.fromDataStream(dataStream, >>>>> 'key, 'ticker, 'timeissued, 'price) >>>>> [error] ^ >>>>> [error] one error found >>>>> [error] (compile:compileIncremental) Compilation failed >>>>> [error] Total time: 3 s, completed Aug 1, 2018 12:59:44 PM >>>>> Completed compiling >>>>> >>>>> >>>>> Dr Mich Talebzadeh >>>>> >>>>> >>>>> >>>>> LinkedIn * >>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>>>> >>>>> >>>>> >>>>> http://talebzadehmich.wordpress.com >>>>> >>>>> >>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for >>>>> any loss, damage or destruction of data or any other property which may >>>>> arise from relying on this email's technical content is explicitly >>>>> disclaimed. The author will in no case be liable for any monetary damages >>>>> arising from such loss, damage or destruction. >>>>> >>>>> >>>>> >>>>> >>>>> On Wed, 1 Aug 2018 at 10:03, Timo Walther <twal...@apache.org> wrote: >>>>> >>>>>> If these two imports are the only imports that you added, then you >>>>>> did not follow Hequn's advice or the link that I sent you. >>>>>> >>>>>> You need to add the underscore imports to let Scala do its magic. >>>>>> >>>>>> Timo >>>>>> >>>>>> >>>>>> Am 01.08.18 um 10:28 schrieb Mich Talebzadeh: >>>>>> >>>>>> Hi Timo, >>>>>> >>>>>> These are my two flink table related imports >>>>>> >>>>>> import org.apache.flink.table.api.Table >>>>>> import org.apache.flink.table.api.TableEnvironment >>>>>> >>>>>> And these are my dependencies building with SBT >>>>>> >>>>>> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1" >>>>>> libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6" >>>>>> libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6" >>>>>> libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6" >>>>>> libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6" >>>>>> libraryDependencies += "org.apache.flink" %% >>>>>> "flink-connector-kafka-0.11" % "1.5.0" >>>>>> libraryDependencies += "org.apache.flink" %% >>>>>> "flink-connector-kafka-base" % "1.5.0" >>>>>> libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0" >>>>>> libraryDependencies += "org.apache.kafka" % "kafka-clients" % >>>>>> "0.11.0.0" >>>>>> libraryDependencies += "org.apache.flink" %% "flink-streaming-java" % >>>>>> "1.5.0" % "provided" >>>>>> >>>>>> *libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" >>>>>> % "provided" *libraryDependencies += "org.apache.kafka" %% "kafka" % >>>>>> "0.11.0.0" >>>>>> >>>>>> There appears to be conflict somewhere that cause this error >>>>>> >>>>>> [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_ >>>>>> streaming/target/scala-2.11/classes... >>>>>> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/ >>>>>> myPackage/md_streaming.scala:152: overloaded method value >>>>>> fromDataStream with alternatives: >>>>>> [error] [T](dataStream: >>>>>> org.apache.flink.streaming.api.datastream.DataStream[T], >>>>>> fields: String)org.apache.flink.table.api.Table <and> >>>>>> [error] [T](dataStream: org.apache.flink.streaming. >>>>>> api.datastream.DataStream[T])org.apache.flink.table.api.Table >>>>>> [error] cannot be applied to (org.apache.flink.streaming. >>>>>> api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol, >>>>>> Symbol) >>>>>> [error] val table1: Table = tableEnv.fromDataStream(dataStream, >>>>>> 'key, 'ticker, 'timeissued, 'price) >>>>>> [error] ^ >>>>>> [error] one error found >>>>>> [error] (compile:compileIncremental) Compilation failed >>>>>> >>>>>> Thanks >>>>>> >>>>>> >>>>>> Dr Mich Talebzadeh >>>>>> >>>>>> >>>>>> >>>>>> LinkedIn * >>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>>>>> >>>>>> >>>>>> >>>>>> http://talebzadehmich.wordpress.com >>>>>> >>>>>> >>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility >>>>>> for any loss, damage or destruction of data or any other property which >>>>>> may >>>>>> arise from relying on this email's technical content is explicitly >>>>>> disclaimed. The author will in no case be liable for any monetary damages >>>>>> arising from such loss, damage or destruction. >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> On Wed, 1 Aug 2018 at 09:17, Timo Walther <twal...@apache.org> wrote: >>>>>> >>>>>>> Hi Mich, >>>>>>> >>>>>>> I would check you imports again [1]. This is a pure compiler issue >>>>>>> that is unrelated to your actual data stream. Also check your project >>>>>>> dependencies. >>>>>>> >>>>>>> Regards, >>>>>>> Timo >>>>>>> >>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs- >>>>>>> master/dev/table/common.html#implicit-conversion-for-scala >>>>>>> >>>>>>> Am 01.08.18 um 09:30 schrieb Mich Talebzadeh: >>>>>>> >>>>>>> >>>>>>> Hi both, >>>>>>> >>>>>>> I added the import as Hequn suggested. >>>>>>> >>>>>>> My stream is very simple and consists of 4 values separated by "," >>>>>>> as below >>>>>>> >>>>>>> 05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48 >>>>>>> >>>>>>> So this is what I have been trying to do >>>>>>> >>>>>>> Code >>>>>>> >>>>>>> val dataStream = streamExecEnv >>>>>>> .addSource(new FlinkKafkaConsumer011[String](topicsValue, >>>>>>> new SimpleStringSchema(), properties)) >>>>>>> // >>>>>>> // >>>>>>> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv) >>>>>>> val table1: Table = tableEnv.fromDataStream(dataStream, 'key, >>>>>>> 'ticker, 'timeissued, 'price) >>>>>>> >>>>>>> note those four columns in Table1 definition >>>>>>> >>>>>>> And this is the error being thrown >>>>>>> >>>>>>> [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_ >>>>>>> streaming/target/scala-2.11/classes... >>>>>>> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/ >>>>>>> myPackage/md_streaming.scala:152: overloaded method value >>>>>>> fromDataStream with alternatives: >>>>>>> [error] [T](dataStream: >>>>>>> org.apache.flink.streaming.api.datastream.DataStream[T], >>>>>>> fields: String)org.apache.flink.table.api.Table <and> >>>>>>> [error] [T](dataStream: org.apache.flink.streaming. >>>>>>> api.datastream.DataStream[T])org.apache.flink.table.api.Table >>>>>>> [error] cannot be applied to (org.apache.flink.streaming. >>>>>>> api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol, >>>>>>> Symbol) >>>>>>> [error] val table1: Table = tableEnv.fromDataStream(dataStream, >>>>>>> 'key, 'ticker, 'timeissued, 'price) >>>>>>> [error] ^ >>>>>>> [error] one error found >>>>>>> [error] (compile:compileIncremental) Compilation failed >>>>>>> >>>>>>> I suspect dataStream may not be compatible with this operation? >>>>>>> >>>>>>> Regards, >>>>>>> >>>>>>> Dr Mich Talebzadeh >>>>>>> >>>>>>> >>>>>>> >>>>>>> LinkedIn * >>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>>>>>> >>>>>>> >>>>>>> >>>>>>> http://talebzadehmich.wordpress.com >>>>>>> >>>>>>> >>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility >>>>>>> for any loss, damage or destruction of data or any other property which >>>>>>> may >>>>>>> arise from relying on this email's technical content is explicitly >>>>>>> disclaimed. The author will in no case be liable for any monetary >>>>>>> damages >>>>>>> arising from such loss, damage or destruction. >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Wed, 1 Aug 2018 at 04:51, Hequn Cheng <chenghe...@gmail.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi, Mich >>>>>>>> >>>>>>>> You can try adding "import org.apache.flink.table.api.scala._", so >>>>>>>> that the Symbol can be recognized as an Expression. >>>>>>>> >>>>>>>> Best, Hequn >>>>>>>> >>>>>>>> On Wed, Aug 1, 2018 at 6:16 AM, Mich Talebzadeh < >>>>>>>> mich.talebza...@gmail.com> wrote: >>>>>>>> >>>>>>>>> Hi, >>>>>>>>> >>>>>>>>> I am following this example >>>>>>>>> >>>>>>>>> https://ci.apache.org/projects/flink/flink-docs- >>>>>>>>> release-1.5/dev/table/common.html#integration-with- >>>>>>>>> datastream-and-dataset-api >>>>>>>>> >>>>>>>>> This is my dataStream which is built on a Kafka topic >>>>>>>>> >>>>>>>>> // >>>>>>>>> //Create a Kafka consumer >>>>>>>>> // >>>>>>>>> val dataStream = streamExecEnv >>>>>>>>> .addSource(new FlinkKafkaConsumer011[String](topicsValue, >>>>>>>>> new SimpleStringSchema(), properties)) >>>>>>>>> // >>>>>>>>> // >>>>>>>>> val tableEnv = TableEnvironment.getTableEnvironment( >>>>>>>>> streamExecEnv) >>>>>>>>> val table1: Table = tableEnv.fromDataStream(dataStream, 'key, >>>>>>>>> 'ticker, 'timeissued, 'price) >>>>>>>>> >>>>>>>>> While compiling it throws this error >>>>>>>>> >>>>>>>>> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/ >>>>>>>>> myPackage/md_streaming.scala:169: overloaded method value >>>>>>>>> fromDataStream with alternatives: >>>>>>>>> [error] [T](dataStream: >>>>>>>>> org.apache.flink.streaming.api.datastream.DataStream[T], >>>>>>>>> fields: String)org.apache.flink.table.api.Table <and> >>>>>>>>> [error] [T](dataStream: org.apache.flink.streaming. >>>>>>>>> api.datastream.DataStream[T])org.apache.flink.table.api.Table >>>>>>>>> [error] cannot be applied to (org.apache.flink.streaming. >>>>>>>>> api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol, >>>>>>>>> Symbol) >>>>>>>>> [error] val table1: Table = tableEnv.fromDataStream(dataStream, >>>>>>>>> 'key, 'ticker, 'timeissued, 'price) >>>>>>>>> [error] ^ >>>>>>>>> [error] one error found >>>>>>>>> [error] (compile:compileIncremental) Compilation failed >>>>>>>>> >>>>>>>>> The topic is very simple, it is comma separated prices. I tried >>>>>>>>> mapFunction and flatMap but neither worked! >>>>>>>>> >>>>>>>>> Thanks, >>>>>>>>> >>>>>>>>> >>>>>>>>> Dr Mich Talebzadeh >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> LinkedIn * >>>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> http://talebzadehmich.wordpress.com >>>>>>>>> >>>>>>>>> >>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility >>>>>>>>> for any loss, damage or destruction of data or any other property >>>>>>>>> which may >>>>>>>>> arise from relying on this email's technical content is explicitly >>>>>>>>> disclaimed. The author will in no case be liable for any monetary >>>>>>>>> damages >>>>>>>>> arising from such loss, damage or destruction. >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>> >>