Changed as suggested
val streamExecEnv =
StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = streamExecEnv
.addSource(new
FlinkKafkaConsumer011[String](topicsValue, new
SimpleStringSchema(), properties))
val tableEnv =
TableEnvironment.getTableEnvironment(streamExecEnv)
tableEnv.registerDataStream("table1", streamExecEnv, 'key,
'ticker, 'timeissued, 'price)
Still the same error
[info] Compiling 1 Scala source to
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:139:
overloaded method value registerDataStream with alternatives:
[error] [T](name: String, dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T],
fields: String)Unit <and>
[error] [T](name: String, dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T])Unit
[error] cannot be applied to (String,
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment,
Symbol, Symbol, Symbol, Symbol)
[error] tableEnv.registerDataStream("table1", streamExecEnv,
'key, 'ticker, 'timeissued, 'price)
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed Aug 1, 2018 11:40:47 PM
Thanks anyway.
Dr Mich Talebzadeh
LinkedIn
/https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw/
http://talebzadehmich.wordpress.com
*Disclaimer:* Use it at your own risk.Any and all
responsibility for any loss, damage or destruction of data
or any other property which may arise from relying on this
email's technical content is explicitly disclaimed. The
author will in no case be liable for any monetary damages
arising from such loss, damage or destruction.
On Wed, 1 Aug 2018 at 23:34, Fabian Hueske
<[email protected] <mailto:[email protected]>> wrote:
Hi,
You have to pass the StreamExecutionEnvironment to the
getTableEnvironment() method, not the DataStream (or
DataStreamSource).
Change
val tableEnv =
TableEnvironment.getTableEnvironment(dataStream)
to
val tableEnv =
TableEnvironment.getTableEnvironment(streamExecEnv)
Best,
Fabian
2018-08-02 0:10 GMT+02:00 Mich Talebzadeh
<[email protected]
<mailto:[email protected]>>:
Hi,
FYI, these are my imports
import java.util.Properties
import java.util.Arrays
import org.apache.flink.api.common.functions.MapFunction
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala
import
org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.TimeCharacteristic
import org.slf4j.LoggerFactory
import
org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011,
FlinkKafkaProducer011}
import java.util.Calendar
import java.util.Date
import java.text.DateFormat
import java.text.SimpleDateFormat
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess
import java.io.File
And this is the simple code
val properties = new Properties()
properties.setProperty("bootstrap.servers",
bootstrapServers)
properties.setProperty("zookeeper.connect",
zookeeperConnect)
properties.setProperty("group.id
<http://group.id>", flinkAppName)
properties.setProperty("auto.offset.reset", "latest")
val streamExecEnv =
StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = streamExecEnv
.addSource(new
FlinkKafkaConsumer011[String](topicsValue, new
SimpleStringSchema(), properties))
val tableEnv =
TableEnvironment.getTableEnvironment(dataStream)
tableEnv.registerDataStream("table1", dataStream,
'key, 'ticker, 'timeissued, 'price)
And this is the compilation error
info] Compiling 1 Scala source to
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:138:
overloaded method value getTableEnvironment with
alternatives:
[error] (executionEnvironment:
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment)org.apache.flink.table.api.scala.StreamTableEnvironment
<and>
[error] (executionEnvironment:
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment)org.apache.flink.table.api.java.StreamTableEnvironment
<and>
[error] (executionEnvironment:
org.apache.flink.api.scala.ExecutionEnvironment)org.apache.flink.table.api.scala.BatchTableEnvironment
<and>
[error] (executionEnvironment:
org.apache.flink.api.java.ExecutionEnvironment)org.apache.flink.table.api.java.BatchTableEnvironment
[error] cannot be applied to
(org.apache.flink.streaming.api.datastream.DataStreamSource[String])
[error] val tableEnv =
TableEnvironment.getTableEnvironment(dataStream)
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed Aug 1, 2018
11:02:33 PM
Completed compiling
which is really strange
Dr Mich Talebzadeh
LinkedIn
/https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw/
http://talebzadehmich.wordpress.com
*Disclaimer:* Use it at your own risk.Any and all
responsibility for any loss, damage or destruction
of data or any other property which may arise from
relying on this email's technical content is
explicitly disclaimed. The author will in no case be
liable for any monetary damages arising from such
loss, damage or destruction.
On Wed, 1 Aug 2018 at 13:42, Fabian Hueske
<[email protected] <mailto:[email protected]>> wrote:
Hi I think you are mixing Java and Scala
dependencies.
org.apache.flink.streaming.api.datastream.DataStream
is the DataStream of the Java DataStream API.
You should use the DataStream of the Scala
DataStream API.
Best, Fabian
2018-08-01 14:01 GMT+02:00 Mich Talebzadeh
<[email protected]
<mailto:[email protected]>>:
Hi,
I believed I tried Hequn's suggestion and
tried again
import org.apache.flink.table.api.Table
import
org.apache.flink.table.api.TableEnvironment
*import org.apache.flink.table.api.scala._
*
Unfortunately I am still getting the same error!
[info] Compiling 1 Scala source to
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:151:
overloaded method value fromDataStream with
alternatives:
[error] [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T],
fields:
String)org.apache.flink.table.api.Table <and>
[error] [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
[error] cannot be applied to
(org.apache.flink.streaming.api.datastream.DataStreamSource[String],
Symbol, Symbol, Symbol, Symbol)
[error] val table1: Table =
tableEnv.fromDataStream(dataStream, 'key,
'ticker, 'timeissued, 'price)
[error] ^
[error] one error found
[error] (compile:compileIncremental)
Compilation failed
[error] Total time: 3 s, completed Aug 1,
2018 12:59:44 PM
Completed compiling
Dr Mich Talebzadeh
LinkedIn
/https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw/
http://talebzadehmich.wordpress.com
*Disclaimer:* Use it at your own risk.Any
and all responsibility for any loss, damage
or destruction of data or any other property
which may arise from relying on this
email's technical content is explicitly
disclaimed. The author will in no case be
liable for any monetary damages arising from
such loss, damage or destruction.
On Wed, 1 Aug 2018 at 10:03, Timo Walther
<[email protected]
<mailto:[email protected]>> wrote:
If these two imports are the only
imports that you added, then you did not
follow Hequn's advice or the link that I
sent you.
You need to add the underscore imports
to let Scala do its magic.
Timo
Am 01.08.18 um 10:28 schrieb Mich
Talebzadeh:
Hi Timo,
These are my two flink table related
imports
import org.apache.flink.table.api.Table
import
org.apache.flink.table.api.TableEnvironment
And these are my dependencies building
with SBT
libraryDependencies +=
"org.apache.hadoop" % "hadoop-core" %
"1.2.1"
libraryDependencies +=
"org.apache.hbase" % "hbase" % "1.2.6"
libraryDependencies +=
"org.apache.hbase" % "hbase-client" %
"1.2.6"
libraryDependencies +=
"org.apache.hbase" % "hbase-common" %
"1.2.6"
libraryDependencies +=
"org.apache.hbase" % "hbase-server" %
"1.2.6"
libraryDependencies +=
"org.apache.flink" %%
"flink-connector-kafka-0.11" % "1.5.0"
libraryDependencies +=
"org.apache.flink" %%
"flink-connector-kafka-base" % "1.5.0"
libraryDependencies +=
"org.apache.flink" %% "flink-scala" %
"1.5.0"
libraryDependencies +=
"org.apache.kafka" % "kafka-clients" %
"0.11.0.0"
libraryDependencies +=
"org.apache.flink" %%
"flink-streaming-java" % "1.5.0" %
"provided"
*libraryDependencies +=
"org.apache.flink" %% "flink-table" %
"1.5.0" % "provided"
*libraryDependencies +=
"org.apache.kafka" %% "kafka" % "0.11.0.0"
There appears to be conflict somewhere
that cause this error
[info] Compiling 1 Scala source to
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152:
overloaded method value fromDataStream
with alternatives:
[error] [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T],
fields:
String)org.apache.flink.table.api.Table
<and>
[error] [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
[error] cannot be applied to
(org.apache.flink.streaming.api.datastream.DataStreamSource[String],
Symbol, Symbol, Symbol, Symbol)
[error] val table1: Table =
tableEnv.fromDataStream(dataStream,
'key, 'ticker, 'timeissued, 'price)
[error] ^
[error] one error found
[error] (compile:compileIncremental)
Compilation failed
Thanks
Dr Mich Talebzadeh
LinkedIn
/https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw/
http://talebzadehmich.wordpress.com
*Disclaimer:* Use it at your own
risk.Any and all responsibility for any
loss, damage or destruction of data or
any other property which may arise from
relying on this
email's technical content is explicitly
disclaimed. The author will in no case
be liable for any monetary damages
arising from such loss, damage or
destruction.
On Wed, 1 Aug 2018 at 09:17, Timo
Walther <[email protected]
<mailto:[email protected]>> wrote:
Hi Mich,
I would check you imports again
[1]. This is a pure compiler issue
that is unrelated to your actual
data stream. Also check your
project dependencies.
Regards,
Timo
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#implicit-conversion-for-scala
Am 01.08.18 um 09:30 schrieb Mich
Talebzadeh:
Hi both,
I added the import as Hequn suggested.
My stream is very simple and
consists of 4 values separated by
"," as below
05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48
So this is what I have been trying
to do
Code
val dataStream = streamExecEnv
.addSource(new
FlinkKafkaConsumer011[String](topicsValue,
new SimpleStringSchema(), properties))
//
//
val tableEnv =
TableEnvironment.getTableEnvironment(streamExecEnv)
val table1: Table =
tableEnv.fromDataStream(dataStream,
'key, 'ticker, 'timeissued, 'price)
note those four columns in Table1
definition
And this is the error being thrown
[info] Compiling 1 Scala source to
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152:
overloaded method value
fromDataStream with alternatives:
[error] [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T],
fields:
String)org.apache.flink.table.api.Table
<and>
[error] [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
[error] cannot be applied to
(org.apache.flink.streaming.api.datastream.DataStreamSource[String],
Symbol, Symbol, Symbol, Symbol)
[error] val table1: Table =
tableEnv.fromDataStream(dataStream,
'key, 'ticker, 'timeissued, 'price)
[error] ^
[error] one error found
[error]
(compile:compileIncremental)
Compilation failed
I suspect dataStream may not be
compatible with this operation?
Regards,
Dr Mich Talebzadeh
LinkedIn
/https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw/
http://talebzadehmich.wordpress.com
*Disclaimer:* Use it at your own
risk.Any and all responsibility
for any loss, damage or
destruction of data or any other
property which may arise from
relying on this
email's technical content is
explicitly disclaimed. The author
will in no case be liable for any
monetary damages arising from such
loss, damage or destruction.
On Wed, 1 Aug 2018 at 04:51, Hequn
Cheng <[email protected]
<mailto:[email protected]>> wrote:
Hi, Mich
You can try adding "import
org.apache.flink.table.api.scala._",
so that the Symbol can be
recognized as an Expression.
Best, Hequn
On Wed, Aug 1, 2018 at 6:16
AM, Mich