Re: Converting a DataStream into a Table throws error

2018-08-01 Thread Mich Talebzadeh
Hi Jorn,

Here are the dependencies:

libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" %
"1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" %
"1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" %
"1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" %
"provided"
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 2 Aug 2018 at 06:19, Jörn Franke  wrote:

>
> How does your build.sbt look, especially the dependencies?
> On 2. Aug 2018, at 00:44, Mich Talebzadeh 
> wrote:
>
> Changed as suggested
>
>val  streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>  val dataStream =  streamExecEnv
>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new
> SimpleStringSchema(), properties))
>   val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>   tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker,
> 'timeissued, 'price)
>
> Still the same error
>
> [info] Compiling 1 Scala source to
> /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
> [error]
> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:139:
> overloaded method value registerDataStream with alternatives:
> [error]   [T](name: String, dataStream:
> org.apache.flink.streaming.api.datastream.DataStream[T], fields:
> String)Unit 
> [error]   [T](name: String, dataStream:
> org.apache.flink.streaming.api.datastream.DataStream[T])Unit
> [error]  cannot be applied to (String,
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment,
> Symbol, Symbol, Symbol, Symbol)
> [error]   tableEnv.registerDataStream("table1", streamExecEnv, 'key,
> 'ticker, 'timeissued, 'price)
> [error]^
> [error] one error found
> [error] (compile:compileIncremental) Compilation failed
> [error] Total time: 3 s, completed Aug 1, 2018 11:40:47 PM
>
> Thanks anyway.
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 1 Aug 2018 at 23:34, Fabian Hueske  wrote:
>
>> Hi,
>>
>> You have to pass the StreamExecutionEnvironment to the
>> getTableEnvironment() method, not the DataStream (or DataStreamSource).
>> Change
>>
>> val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
>>
>> to
>>
>> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>>
>> Best,
>> Fabian
>>
>> 2018-08-02 0:10 GMT+02:00 Mich Talebzadeh :
>>
>>> Hi,
>>>
>>> FYI, these are my imports
>>>
>>> import java.util.Properties
>>> import java.util.Arrays
>>> import org.apache.flink.api.common.functions.MapFunction
>>> import
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>>> import org.apache.flink.streaming.api.scala
>>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>>> import org.apache.flink.table.api.TableEnvironment
>>> import org.apache.flink.table.api.scala._
>>> import org.apache.flink.api.scala._
>>> import org.apache.kafka.clients.consumer.ConsumerConfig
>>> import org.apache.kafka.clients.consumer.ConsumerRecord
>>> import org.apache.kafka.clients.consumer.KafkaConsumer
>>> import org.apache.flink.core.fs.FileSystem
>>> import org.apache.flink.streaming.api.TimeCharacteristic
>>> import org.slf4j.LoggerFactory
>>> import
>>> 

Re: Converting a DataStream into a Table throws error

2018-08-01 Thread Jörn Franke

How does your build.sbt look, especially the dependencies?
> On 2. Aug 2018, at 00:44, Mich Talebzadeh  wrote:
> 
> Changed as suggested
> 
>val  streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>  val dataStream =  streamExecEnv
>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new 
> SimpleStringSchema(), properties))
>   val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>   tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker, 
> 'timeissued, 'price)
> 
> Still the same error
> 
> [info] Compiling 1 Scala source to 
> /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
> [error] 
> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:139:
>  overloaded method value registerDataStream with alternatives:
> [error]   [T](name: String, dataStream: 
> org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)Unit 
> 
> [error]   [T](name: String, dataStream: 
> org.apache.flink.streaming.api.datastream.DataStream[T])Unit
> [error]  cannot be applied to (String, 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment, 
> Symbol, Symbol, Symbol, Symbol)
> [error]   tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker, 
> 'timeissued, 'price)
> [error]^
> [error] one error found
> [error] (compile:compileIncremental) Compilation failed
> [error] Total time: 3 s, completed Aug 1, 2018 11:40:47 PM
> 
> Thanks anyway.
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
>> On Wed, 1 Aug 2018 at 23:34, Fabian Hueske  wrote:
>> Hi,
>> 
>> You have to pass the StreamExecutionEnvironment to the getTableEnvironment() 
>> method, not the DataStream (or DataStreamSource).
>> Change 
>> 
>> val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
>> 
>> to
>> 
>> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>> 
>> Best,
>> Fabian
>> 
>> 2018-08-02 0:10 GMT+02:00 Mich Talebzadeh :
>>> Hi,
>>> 
>>> FYI, these are my imports
>>> 
>>> import java.util.Properties
>>> import java.util.Arrays
>>> import org.apache.flink.api.common.functions.MapFunction
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>>> import org.apache.flink.streaming.api.scala
>>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>>> import org.apache.flink.table.api.TableEnvironment
>>> import org.apache.flink.table.api.scala._
>>> import org.apache.flink.api.scala._
>>> import org.apache.kafka.clients.consumer.ConsumerConfig
>>> import org.apache.kafka.clients.consumer.ConsumerRecord
>>> import org.apache.kafka.clients.consumer.KafkaConsumer
>>> import org.apache.flink.core.fs.FileSystem
>>> import org.apache.flink.streaming.api.TimeCharacteristic
>>> import org.slf4j.LoggerFactory
>>> import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, 
>>> FlinkKafkaProducer011}
>>> import java.util.Calendar
>>> import java.util.Date
>>> import java.text.DateFormat
>>> import java.text.SimpleDateFormat
>>> import org.apache.log4j.Logger
>>> import org.apache.log4j.Level
>>> import sys.process.stringSeqToProcess
>>> import java.io.File
>>> 
>>> And this is the simple code
>>> 
>>> val properties = new Properties()
>>> properties.setProperty("bootstrap.servers", bootstrapServers)
>>> properties.setProperty("zookeeper.connect", zookeeperConnect)
>>> properties.setProperty("group.id", flinkAppName)
>>> properties.setProperty("auto.offset.reset", "latest")
>>> val  streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>> val dataStream =  streamExecEnv
>>>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new 
>>> SimpleStringSchema(), properties))
>>>   val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
>>>   tableEnv.registerDataStream("table1", dataStream, 'key, 'ticker, 
>>> 'timeissued, 'price)
>>> 
>>> And this is the compilation error
>>> 
>>> info] Compiling 1 Scala source to 
>>> /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
>>> [error] 
>>> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:138:
>>>  overloaded method value getTableEnvironment with alternatives:
>>> [error]   (executionEnvironment: 
>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment)org.apache.flink.table.api.scala.StreamTableEnvironment
>>>  
>>> [error]   (executionEnvironment: 
>>> 

Re: Delay in REST/UI readiness during JM recovery

2018-08-01 Thread vino yang
Hi Joey,

Currently the REST endpoints are hosted in the JM. Your scenario is a JM
failover, and your cluster is running a large number of jobs. It takes a
certain amount of time for ZK to conduct the leader election, then the JM
needs to wait for the TMs to register, and then all the jobs need to be
restored and started again. This can take a long time, and within this
period the JM can be quite busy, which can make the web services
unresponsive or slow to respond.
But 20-45 minutes is really long, so you first need to confirm what is
causing it. For example, if you reduce the number of jobs in the cluster by
half, does the web UI become available much faster?

Thanks, vino.

2018-08-02 3:39 GMT+08:00 Joey Echeverria :

> Sorry to ping my own thread, but has anyone else encountered this?
>
> -Joey
>
> > On Jul 30, 2018, at 11:10 AM, Joey Echeverria 
> wrote:
> >
> > I’m running Flink 1.5.0 in Kubernetes with HA enabled, but only a single
> Job Manager running. I’m using Zookeeper to store the fencing/leader
> information and S3 to store the job manager state. We’ve been running
> around 250 or so streaming jobs and we’ve noticed that if the job manager
> pod is deleted, it takes something like 20-45 minutes for the job manager’s
> REST endpoints and web UI to become available. Until it becomes available,
> we get a 503 response from the HTTP server with the message "Could not
> retrieve the redirect address of the current leader. Please try to
> refresh.”.
> >
> > Has anyone else run into this?
> >
> > Are there any configuration settings I should be looking at to speed up
> the availability of the HTTP endpoints?
> >
> > Thanks!
> >
> > -Joey
>
>


Re: Old job resurrected during HA failover

2018-08-01 Thread vino yang
Hi Elias,

Your analysis is correct. Yes, in theory the old job graph should be
deleted, but Flink currently locks the path and deletes it asynchronously,
so it cannot give you an acknowledgment of the deletion; this is a risk
point.

cc Till, users have encountered this problem before. I personally think
that the asynchronous deletion may be risky, because it can cause a
canceled job to be revived by the JM after the failover.

Thanks, vino.

2018-08-02 5:25 GMT+08:00 Elias Levy :

> I can see in the logs that JM 1 (10.210.22.167), the one that became
> leader after failover, thinks it deleted the
> 2a4eff355aef849c5ca37dbac04f2ff1 job from ZK when it was canceled:
>
> July 30th 2018, 15:32:27.231 Trying to cancel job with ID
> 2a4eff355aef849c5ca37dbac04f2ff1.
> July 30th 2018, 15:32:27.232 Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1)
> switched from state RESTARTING to CANCELED.
> July 30th 2018, 15:32:27.232 Stopping checkpoint coordinator for job
> 2a4eff355aef849c5ca37dbac04f2ff1
> July 30th 2018, 15:32:27.239 Removed job graph
> 2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper.
> July 30th 2018, 15:32:27.245 Removing /flink/cluster_1/checkpoints/
> 2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper
> July 30th 2018, 15:32:27.251 Removing /checkpoint-counter/
> 2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper
>
> Both /flink/cluster_1/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1
> and /flink/cluster_1/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1
> no longer exist, but for some reason the job graph is still there.
>
> Looking at the ZK logs I find the problem:
>
> July 30th 2018, 15:32:27.241 Got user-level KeeperException when
> processing sessionid:0x201d2330001 type:delete cxid:0x434c
> zxid:0x60009dd94 txntype:-1 reqpath:n/a Error Path:/flink/cluster_1/
> jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1 Error:KeeperErrorCode =
> Directory not empty for /flink/cluster_1/jobgraphs/
> 2a4eff355aef849c5ca37dbac04f2ff1
>
> Looking in ZK, we see:
>
> [zk: localhost:2181(CONNECTED) 0] ls /flink/cluster_1/jobgraphs/
> 2a4eff355aef849c5ca37dbac04f2ff1
> [d833418c-891a-4b5e-b983-080be803275c]
>
> From the comments in ZooKeeperStateHandleStore.java I gather that this
> child node is used as a deletion lock.  Looking at the contents of this
> ephemeral lock node:
>
> [zk: localhost:2181(CONNECTED) 16] get /flink/cluster_1/jobgraphs/
> 2a4eff355aef849c5ca37dbac04f2ff1/d833418c-891a-4b5e-b983-080be803275c
> *10.210.42.62*
> cZxid = 0x60002ffa7
> ctime = Tue Jun 12 20:01:26 UTC 2018
> mZxid = 0x60002ffa7
> mtime = Tue Jun 12 20:01:26 UTC 2018
> pZxid = 0x60002ffa7
> cversion = 0
> dataVersion = 0
> aclVersion = 0
> ephemeralOwner = 0x3003f4a0003
> dataLength = 12
> numChildren = 0
>
> and compared to the ephemeral node lock of the currently running job:
>
> [zk: localhost:2181(CONNECTED) 17] get /flink/cluster_1/jobgraphs/
> d77948df92813a68ea6dfd6783f40e7e/596a4add-9f5c-4113-99ec-9c942fe91172
> *10.210.22.167*
> cZxid = 0x60009df4b
> ctime = Mon Jul 30 23:01:04 UTC 2018
> mZxid = 0x60009df4b
> mtime = Mon Jul 30 23:01:04 UTC 2018
> pZxid = 0x60009df4b
> cversion = 0
> dataVersion = 0
> aclVersion = 0
> ephemeralOwner = 0x201d2330001
> dataLength = 13
> numChildren = 0
>
> Assuming the content of the nodes represent the owner, it seems the job
> graph for the old canceled job, 2a4eff355aef849c5ca37dbac04f2ff1, is
> locked by the previous JM leader, JM 2(10.210.42.62), while the running job
> locked by the current JM leader, JM 1 (10.210.22.167).
>
> Somehow the previous leader, JM 2, did not give up the lock when
> leadership failed over to JM 1.
>
> Shouldn't something call ZooKeeperStateHandleStore.releaseAll during HA
> failover to release the locks on the graphs?
>
>
> On Wed, Aug 1, 2018 at 9:49 AM Elias Levy 
> wrote:
>
>> Thanks for the reply.  Looking in ZK I see:
>>
>> [zk: localhost:2181(CONNECTED) 5] ls /flink/cluster_1/jobgraphs
>> [d77948df92813a68ea6dfd6783f40e7e, 2a4eff355aef849c5ca37dbac04f2ff1]
>>
>> Again we see HA state for job 2a4eff355aef849c5ca37dbac04f2ff1, even
>> though that job is no longer running (it was canceled while it was in a
>> loop attempting to restart, but failing because of a lack of cluster slots).
>>
>> Any idea why that may be the case?
>>
>>>


Re: Flink log and out files

2018-08-01 Thread vino yang
Hi Alexander,

Yes, usually if you configure the log output to go to a .log file, the .out
file will not have content, although some fatal errors, such as JVM exit
messages, may still be printed to the .out file.
If Flink logs appear in your .out file, and their format does not match the
format you configured in the logging configuration file, it is likely that
SLF4J has detected multiple logger implementation bindings among your jar
dependencies. This can make the log output behave abnormally; we have
encountered this before.

Thanks, vino.

2018-08-02 3:27 GMT+08:00 Alexander Smirnov :

> thanks guys,
>
> So, is it a correct statement - if my job doesn't write anything to
> stdout, the "*.out" file should be empty?
>
> for some reason it contains the same info as "log" and much more.
>
> For the "log" files, I can control rotation via log4j configuration, but
> how do I setup rotation for "out" files?
> Or, how do I disable them at all?
>
> I'm using 1.4.2
>
> Thank you,
> Alex
>
> On Wed, Aug 1, 2018 at 7:00 PM Andrey Zagrebin 
> wrote:
>
>> Hi Alexander,
>>
>> there is also a doc link where log configuration  is described:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/
>> logging.html
>> You can modify log configuration in conf directory according to logging
>> framework docs.
>>
>> Cheers,
>> Andrey
>>
>>
>> On 1 Aug 2018, at 17:30, vino yang  wrote:
>>
>> Hi Alexander:
>>
>> .log and .out are different. Usually, the .log file stores the log
>> information output by the log framework. Flink uses slf4j as the log
>> interface and supports log4j and logback configurations. The .out file
>> stores the STDOUT information. This information is usually output by you
>> calling some APIs such as the print sink API.
>>
>> Thanks, vino.
>>
>> 2018-08-01 23:19 GMT+08:00 Alexander Smirnov <
>> alexander.smirn...@gmail.com>:
>>
>>> Hi,
>>>
>>> could you please explain the difference between *.log and *.out files in
>>> Flink?
>>> What information is supposed to be in each of them?
>>> Is "log" a subset of "out"?
>>> How do I setup rotation with gzipping?
>>>
>>> Thank you,
>>> Alex
>>>
>>
>>
>>


Re: Converting a DataStream into a Table throws error

2018-08-01 Thread vino yang
Hi Mich,

It seems that the element type of your DataStream is still wrong.
If you want to register four fields, the DataStream type usually needs to be
something like DataStream[(Type1, Type2, Type3, Type4)], not DataStream[String];
you can try that.
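
For example, a rough, untested sketch, assuming the Kafka value is a plain CSV
string of the form key,ticker,timeissued,price (adjust the parsing and field
types to your actual data; topicsValue and properties are the ones from your
existing code):

import org.apache.flink.streaming.api.scala._   // Scala DataStream API, brings the implicit TypeInformation
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._       // enables the 'key, 'ticker, ... field expressions

val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)

// Parse each message into a 4-field tuple so that the four column names line up.
val parsedStream: DataStream[(String, String, String, Double)] = streamExecEnv
  .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
  .map { line =>
    val fields = line.split(",")
    (fields(0), fields(1), fields(2), fields(3).toDouble)
  }

// Register the DataStream itself (not the execution environment) under the four field names.
tableEnv.registerDataStream("table1", parsedStream, 'key, 'ticker, 'timeissued, 'price)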

Thanks, vino

2018-08-02 6:44 GMT+08:00 Mich Talebzadeh :

> Changed as suggested
>
>val  streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>  val dataStream =  streamExecEnv
>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new
> SimpleStringSchema(), properties))
>   val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>   tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker,
> 'timeissued, 'price)
>
> Still the same error
>
> [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_
> streaming/target/scala-2.11/classes...
> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/
> myPackage/md_streaming.scala:139: overloaded method value
> registerDataStream with alternatives:
> [error]   [T](name: String, dataStream: 
> org.apache.flink.streaming.api.datastream.DataStream[T],
> fields: String)Unit 
> [error]   [T](name: String, dataStream: org.apache.flink.streaming.
> api.datastream.DataStream[T])Unit
> [error]  cannot be applied to (String, org.apache.flink.streaming.
> api.environment.StreamExecutionEnvironment, Symbol, Symbol, Symbol,
> Symbol)
> [error]   tableEnv.registerDataStream("table1", streamExecEnv, 'key,
> 'ticker, 'timeissued, 'price)
> [error]^
> [error] one error found
> [error] (compile:compileIncremental) Compilation failed
> [error] Total time: 3 s, completed Aug 1, 2018 11:40:47 PM
>
> Thanks anyway.
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 1 Aug 2018 at 23:34, Fabian Hueske  wrote:
>
>> Hi,
>>
>> You have to pass the StreamExecutionEnvironment to the
>> getTableEnvironment() method, not the DataStream (or DataStreamSource).
>> Change
>>
>> val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
>>
>> to
>>
>> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>>
>> Best,
>> Fabian
>>
>> 2018-08-02 0:10 GMT+02:00 Mich Talebzadeh :
>>
>>> Hi,
>>>
>>> FYI, these are my imports
>>>
>>> import java.util.Properties
>>> import java.util.Arrays
>>> import org.apache.flink.api.common.functions.MapFunction
>>> import org.apache.flink.streaming.api.environment.
>>> StreamExecutionEnvironment
>>> import org.apache.flink.streaming.api.scala
>>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>>> import org.apache.flink.table.api.TableEnvironment
>>> import org.apache.flink.table.api.scala._
>>> import org.apache.flink.api.scala._
>>> import org.apache.kafka.clients.consumer.ConsumerConfig
>>> import org.apache.kafka.clients.consumer.ConsumerRecord
>>> import org.apache.kafka.clients.consumer.KafkaConsumer
>>> import org.apache.flink.core.fs.FileSystem
>>> import org.apache.flink.streaming.api.TimeCharacteristic
>>> import org.slf4j.LoggerFactory
>>> import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011,
>>> FlinkKafkaProducer011}
>>> import java.util.Calendar
>>> import java.util.Date
>>> import java.text.DateFormat
>>> import java.text.SimpleDateFormat
>>> import org.apache.log4j.Logger
>>> import org.apache.log4j.Level
>>> import sys.process.stringSeqToProcess
>>> import java.io.File
>>>
>>> And this is the simple code
>>>
>>> val properties = new Properties()
>>> properties.setProperty("bootstrap.servers", bootstrapServers)
>>> properties.setProperty("zookeeper.connect", zookeeperConnect)
>>> properties.setProperty("group.id", flinkAppName)
>>> properties.setProperty("auto.offset.reset", "latest")
>>> val  streamExecEnv = StreamExecutionEnvironment.
>>> getExecutionEnvironment
>>> val dataStream =  streamExecEnv
>>>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new
>>> SimpleStringSchema(), properties))
>>>   val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
>>>   tableEnv.registerDataStream("table1", dataStream, 'key, 'ticker,
>>> 'timeissued, 'price)
>>>
>>> And this is the compilation error
>>>
>>> info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_
>>> streaming/target/scala-2.11/classes...
>>> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/
>>> myPackage/md_streaming.scala:138: overloaded method value
>>> getTableEnvironment with 

Re: Converting a DataStream into a Table throws error

2018-08-01 Thread Mich Talebzadeh
Changed as suggested

   val  streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
 val dataStream =  streamExecEnv
   .addSource(new FlinkKafkaConsumer011[String](topicsValue, new
SimpleStringSchema(), properties))
  val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
  tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker,
'timeissued, 'price)

Still the same error

[info] Compiling 1 Scala source to
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:139:
overloaded method value registerDataStream with alternatives:
[error]   [T](name: String, dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T], fields:
String)Unit 
[error]   [T](name: String, dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T])Unit
[error]  cannot be applied to (String,
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment,
Symbol, Symbol, Symbol, Symbol)
[error]   tableEnv.registerDataStream("table1", streamExecEnv, 'key,
'ticker, 'timeissued, 'price)
[error]^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed Aug 1, 2018 11:40:47 PM

Thanks anyway.

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 1 Aug 2018 at 23:34, Fabian Hueske  wrote:

> Hi,
>
> You have to pass the StreamExecutionEnvironment to the
> getTableEnvironment() method, not the DataStream (or DataStreamSource).
> Change
>
> val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
>
> to
>
> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>
> Best,
> Fabian
>
> 2018-08-02 0:10 GMT+02:00 Mich Talebzadeh :
>
>> Hi,
>>
>> FYI, these are my imports
>>
>> import java.util.Properties
>> import java.util.Arrays
>> import org.apache.flink.api.common.functions.MapFunction
>> import
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>> import org.apache.flink.streaming.api.scala
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>> import org.apache.flink.table.api.TableEnvironment
>> import org.apache.flink.table.api.scala._
>> import org.apache.flink.api.scala._
>> import org.apache.kafka.clients.consumer.ConsumerConfig
>> import org.apache.kafka.clients.consumer.ConsumerRecord
>> import org.apache.kafka.clients.consumer.KafkaConsumer
>> import org.apache.flink.core.fs.FileSystem
>> import org.apache.flink.streaming.api.TimeCharacteristic
>> import org.slf4j.LoggerFactory
>> import
>> org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011,
>> FlinkKafkaProducer011}
>> import java.util.Calendar
>> import java.util.Date
>> import java.text.DateFormat
>> import java.text.SimpleDateFormat
>> import org.apache.log4j.Logger
>> import org.apache.log4j.Level
>> import sys.process.stringSeqToProcess
>> import java.io.File
>>
>> And this is the simple code
>>
>> val properties = new Properties()
>> properties.setProperty("bootstrap.servers", bootstrapServers)
>> properties.setProperty("zookeeper.connect", zookeeperConnect)
>> properties.setProperty("group.id", flinkAppName)
>> properties.setProperty("auto.offset.reset", "latest")
>> val  streamExecEnv =
>> StreamExecutionEnvironment.getExecutionEnvironment
>> val dataStream =  streamExecEnv
>>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new
>> SimpleStringSchema(), properties))
>>   val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
>>   tableEnv.registerDataStream("table1", dataStream, 'key, 'ticker,
>> 'timeissued, 'price)
>>
>> And this is the compilation error
>>
>> info] Compiling 1 Scala source to
>> /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
>> [error]
>> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:138:
>> overloaded method value getTableEnvironment with alternatives:
>> [error]   (executionEnvironment:
>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment)org.apache.flink.table.api.scala.StreamTableEnvironment
>> 
>> [error]   (executionEnvironment:
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment)org.apache.flink.table.api.java.StreamTableEnvironment
>> 
>> [error]   (executionEnvironment:
>> 

Re: Converting a DataStream into a Table throws error

2018-08-01 Thread Fabian Hueske
Hi,

You have to pass the StreamExecutionEnvironment to the
getTableEnvironment() method, not the DataStream (or DataStreamSource).
Change

val tableEnv = TableEnvironment.getTableEnvironment(dataStream)

to

val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
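
For completeness, a minimal, untested sketch of how the pieces then fit
together, staying on the Scala APIs throughout (mixing the Java and Scala
DataStream classes was the earlier issue in this thread; topicsValue and
properties are the ones from your code):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = streamExecEnv
  .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))

// The environment goes into getTableEnvironment() ...
val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
// ... and the DataStream (not the environment) goes into registerDataStream().
tableEnv.registerDataStream("table1", dataStream)
// Field expressions such as 'key, 'ticker can be added once the stream's
// element type actually carries those fields (for example a tuple or case class).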

Best,
Fabian

2018-08-02 0:10 GMT+02:00 Mich Talebzadeh :

> Hi,
>
> FYI, these are my imports
>
> import java.util.Properties
> import java.util.Arrays
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.streaming.api.environment.StreamExecutionEn
> vironment
> import org.apache.flink.streaming.api.scala
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala._
> import org.apache.flink.api.scala._
> import org.apache.kafka.clients.consumer.ConsumerConfig
> import org.apache.kafka.clients.consumer.ConsumerRecord
> import org.apache.kafka.clients.consumer.KafkaConsumer
> import org.apache.flink.core.fs.FileSystem
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.slf4j.LoggerFactory
> import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011,
> FlinkKafkaProducer011}
> import java.util.Calendar
> import java.util.Date
> import java.text.DateFormat
> import java.text.SimpleDateFormat
> import org.apache.log4j.Logger
> import org.apache.log4j.Level
> import sys.process.stringSeqToProcess
> import java.io.File
>
> And this is the simple code
>
> val properties = new Properties()
> properties.setProperty("bootstrap.servers", bootstrapServers)
> properties.setProperty("zookeeper.connect", zookeeperConnect)
> properties.setProperty("group.id", flinkAppName)
> properties.setProperty("auto.offset.reset", "latest")
> val  streamExecEnv = StreamExecutionEnvironment.get
> ExecutionEnvironment
> val dataStream =  streamExecEnv
>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new
> SimpleStringSchema(), properties))
>   val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
>   tableEnv.registerDataStream("table1", dataStream, 'key, 'ticker,
> 'timeissued, 'price)
>
> And this is the compilation error
>
> info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_
> streaming/target/scala-2.11/classes...
> [error] 
> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:138:
> overloaded method value getTableEnvironment with alternatives:
> [error]   (executionEnvironment: org.apache.flink.streaming.api
> .scala.StreamExecutionEnvironment)org.apache.flink.table.api.scala.StreamTableEnvironment
> 
> [error]   (executionEnvironment: org.apache.flink.streaming.api
> .environment.StreamExecutionEnvironment)org.apache.flink.table.api.java.StreamTableEnvironment
> 
> [error]   (executionEnvironment: org.apache.flink.api.scala.Exe
> cutionEnvironment)org.apache.flink.table.api.scala.BatchTableEnvironment
> 
> [error]   (executionEnvironment: org.apache.flink.api.java.Exec
> utionEnvironment)org.apache.flink.table.api.java.BatchTableEnvironment
> [error]  cannot be applied to (org.apache.flink.streaming.ap
> i.datastream.DataStreamSource[String])
> [error]   val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
> [error]   ^
> [error] one error found
> [error] (compile:compileIncremental) Compilation failed
> [error] Total time: 3 s, completed Aug 1, 2018 11:02:33 PM
> Completed compiling
>
> which is really strange
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 1 Aug 2018 at 13:42, Fabian Hueske  wrote:
>
>> Hi I think you are mixing Java and Scala dependencies.
>>
>> org.apache.flink.streaming.api.datastream.DataStream is the DataStream
>> of the Java DataStream API.
>> You should use the DataStream of the Scala DataStream API.
>>
>> Best, Fabian
>>
>> 2018-08-01 14:01 GMT+02:00 Mich Talebzadeh :
>>
>>> Hi,
>>>
>>> I believed I tried Hequn's suggestion and tried again
>>>
>>> import org.apache.flink.table.api.Table
>>> import org.apache.flink.table.api.TableEnvironment
>>>
>>> *import org.apache.flink.table.api.scala._*
>>> Unfortunately I am still getting the same error!
>>>
>>> [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_
>>> streaming/target/scala-2.11/classes...
>>> [error] 
>>> 

Re: Counting elements that appear "behind" the watermark

2018-08-01 Thread Julio Biason
Awesome, thanks Elias!
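
For reference, a rough, untested sketch of what you describe below, counting
elements whose timestamp is already behind the current watermark in a
ProcessFunction (the metric name is just an example):

import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Counts elements whose timestamp is already behind the current watermark.
class LateElementCounter[T] extends ProcessFunction[T, T] {
  @transient private var lateElements: Counter = _

  override def open(parameters: Configuration): Unit = {
    lateElements = getRuntimeContext.getMetricGroup.counter("lateElements")
  }

  override def processElement(value: T, ctx: ProcessFunction[T, T]#Context, out: Collector[T]): Unit = {
    val ts = ctx.timestamp()                               // element timestamp (null if none assigned)
    val watermark = ctx.timerService().currentWatermark()  // current watermark
    if (ts != null && ts.longValue() < watermark) {
      lateElements.inc()
    }
    out.collect(value)
  }
}

// Usage: stream.process(new LateElementCounter[MyEvent])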

On Tue, Jul 31, 2018 at 10:02 PM, Elias Levy 
wrote:

> Correct. Context gives you access to the element timestamp.
> But it also gives you access to the current watermark via
> timerService -> currentWatermark.
>
> On Tue, Jul 31, 2018 at 7:45 AM Julio Biason 
> wrote:
>
>> Thanks for the tips. Unfortunately, it seems `Context` only has
>> information from the element being processed
>> (https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java#L91)
>> and the RuntimeContext doesn't have access to any watermark information
>> (https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L57).
>>
>>


-- 
*Julio Biason*, Software Engineer
*AZION*  |  Deliver. Accelerate. Protect.
Office: +55 51 3083 8101   |  Mobile: +55 51
*99907 0554*


Re: Converting a DataStream into a Table throws error

2018-08-01 Thread Mich Talebzadeh
Hi,

FYI, these are my imports

import java.util.Properties
import java.util.Arrays
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.TimeCharacteristic
import org.slf4j.LoggerFactory
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011,
FlinkKafkaProducer011}
import java.util.Calendar
import java.util.Date
import java.text.DateFormat
import java.text.SimpleDateFormat
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess
import java.io.File

And this is the simple code

val properties = new Properties()
properties.setProperty("bootstrap.servers", bootstrapServers)
properties.setProperty("zookeeper.connect", zookeeperConnect)
properties.setProperty("group.id", flinkAppName)
properties.setProperty("auto.offset.reset", "latest")
val  streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream =  streamExecEnv
   .addSource(new FlinkKafkaConsumer011[String](topicsValue, new
SimpleStringSchema(), properties))
  val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
  tableEnv.registerDataStream("table1", dataStream, 'key, 'ticker,
'timeissued, 'price)

And this is the compilation error

info] Compiling 1 Scala source to
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:138:
overloaded method value getTableEnvironment with alternatives:
[error]   (executionEnvironment:
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment)org.apache.flink.table.api.scala.StreamTableEnvironment

[error]   (executionEnvironment:
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment)org.apache.flink.table.api.java.StreamTableEnvironment

[error]   (executionEnvironment:
org.apache.flink.api.scala.ExecutionEnvironment)org.apache.flink.table.api.scala.BatchTableEnvironment

[error]   (executionEnvironment:
org.apache.flink.api.java.ExecutionEnvironment)org.apache.flink.table.api.java.BatchTableEnvironment
[error]  cannot be applied to
(org.apache.flink.streaming.api.datastream.DataStreamSource[String])
[error]   val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
[error]   ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed Aug 1, 2018 11:02:33 PM
Completed compiling

which is really strange

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 1 Aug 2018 at 13:42, Fabian Hueske  wrote:

> Hi I think you are mixing Java and Scala dependencies.
>
> org.apache.flink.streaming.api.datastream.DataStream is the DataStream of
> the Java DataStream API.
> You should use the DataStream of the Scala DataStream API.
>
> Best, Fabian
>
> 2018-08-01 14:01 GMT+02:00 Mich Talebzadeh :
>
>> Hi,
>>
>> I believed I tried Hequn's suggestion and tried again
>>
>> import org.apache.flink.table.api.Table
>> import org.apache.flink.table.api.TableEnvironment
>>
>> *import org.apache.flink.table.api.scala._*
>> Unfortunately I am still getting the same error!
>>
>> [info] Compiling 1 Scala source to
>> /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
>> [error]
>> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:151:
>> overloaded method value fromDataStream with alternatives:
>> [error]   [T](dataStream:
>> org.apache.flink.streaming.api.datastream.DataStream[T], fields:
>> String)org.apache.flink.table.api.Table 
>> [error]   [T](dataStream:
>> org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
>> [error]  cannot be applied to
>> (org.apache.flink.streaming.api.datastream.DataStreamSource[String],
>> Symbol, Symbol, Symbol, Symbol)
>> [error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key,
>> 

Re: Old job resurrected during HA failover

2018-08-01 Thread Elias Levy
I can see in the logs that JM 1 (10.210.22.167), the one that became
leader after failover, thinks it deleted
the 2a4eff355aef849c5ca37dbac04f2ff1 job from ZK when it was canceled:

July 30th 2018, 15:32:27.231 Trying to cancel job with ID
2a4eff355aef849c5ca37dbac04f2ff1.
July 30th 2018, 15:32:27.232 Job Some Job
(2a4eff355aef849c5ca37dbac04f2ff1) switched from state RESTARTING to
CANCELED.
July 30th 2018, 15:32:27.232 Stopping checkpoint coordinator for job
2a4eff355aef849c5ca37dbac04f2ff1
July 30th 2018, 15:32:27.239 Removed job graph
2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper.
July 30th 2018, 15:32:27.245 Removing
/flink/cluster_1/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper
July 30th 2018, 15:32:27.251 Removing
/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper

Both /flink/cluster_1/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1
and /flink/cluster_1/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1 no
longer exist, but for some reason the job graph is still there.

Looking at the ZK logs I find the problem:

July 30th 2018, 15:32:27.241 Got user-level KeeperException when processing
sessionid:0x201d2330001 type:delete cxid:0x434c zxid:0x60009dd94
txntype:-1 reqpath:n/a Error
Path:/flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1
Error:KeeperErrorCode = Directory not empty for
/flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1

Looking in ZK, we see:

[zk: localhost:2181(CONNECTED) 0] ls
/flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1
[d833418c-891a-4b5e-b983-080be803275c]

From the comments in ZooKeeperStateHandleStore.java I gather that this
child node is used as a deletion lock.  Looking at the contents of this
ephemeral lock node:

[zk: localhost:2181(CONNECTED) 16] get
/flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1/d833418c-891a-4b5e-b983-080be803275c
*10.210.42.62*
cZxid = 0x60002ffa7
ctime = Tue Jun 12 20:01:26 UTC 2018
mZxid = 0x60002ffa7
mtime = Tue Jun 12 20:01:26 UTC 2018
pZxid = 0x60002ffa7
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x3003f4a0003
dataLength = 12
numChildren = 0

and compared to the ephemeral node lock of the currently running job:

[zk: localhost:2181(CONNECTED) 17] get
/flink/cluster_1/jobgraphs/d77948df92813a68ea6dfd6783f40e7e/596a4add-9f5c-4113-99ec-9c942fe91172
*10.210.22.167*
cZxid = 0x60009df4b
ctime = Mon Jul 30 23:01:04 UTC 2018
mZxid = 0x60009df4b
mtime = Mon Jul 30 23:01:04 UTC 2018
pZxid = 0x60009df4b
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x201d2330001
dataLength = 13
numChildren = 0

Assuming the content of the nodes represent the owner, it seems the job
graph for the old canceled job, 2a4eff355aef849c5ca37dbac04f2ff1, is locked
by the previous JM leader, JM 2(10.210.42.62), while the running job locked
by the current JM leader, JM 1 (10.210.22.167).

Somehow the previous leader, JM 2, did not give up the lock when leadership
failed over to JM 1.

Shouldn't something call ZooKeeperStateHandleStore.releaseAll during HA
failover to release the locks on the graphs?


On Wed, Aug 1, 2018 at 9:49 AM Elias Levy 
wrote:

> Thanks for the reply.  Looking in ZK I see:
>
> [zk: localhost:2181(CONNECTED) 5] ls /flink/cluster_1/jobgraphs
> [d77948df92813a68ea6dfd6783f40e7e, 2a4eff355aef849c5ca37dbac04f2ff1]
>
> Again we see HA state for job 2a4eff355aef849c5ca37dbac04f2ff1, even
> though that job is no longer running (it was canceled while it was in a
> loop attempting to restart, but failing because of a lack of cluster slots).
>
> Any idea why that may be the case?
>
>>


Re: Delay in REST/UI readiness during JM recovery

2018-08-01 Thread Joey Echeverria
Sorry to ping my own thread, but has anyone else encountered this?

-Joey

> On Jul 30, 2018, at 11:10 AM, Joey Echeverria  wrote:
> 
> I’m running Flink 1.5.0 in Kubernetes with HA enabled, but only a single Job 
> Manager running. I’m using Zookeeper to store the fencing/leader information 
> and S3 to store the job manager state. We’ve been running around 250 or so 
> streaming jobs and we’ve noticed that if the job manager pod is deleted, it 
> takes something like 20-45 minutes for the job manager’s REST endpoints and 
> web UI to become available. Until it becomes available, we get a 503 response 
> from the HTTP server with the message "Could not retrieve the redirect 
> address of the current leader. Please try to refresh.”.
> 
> Has anyone else run into this?
> 
> Are there any configuration settings I should be looking at to speed up the 
> availability of the HTTP endpoints?
> 
> Thanks!
> 
> -Joey



Re: Flink log and out files

2018-08-01 Thread Alexander Smirnov
thanks guys,

So, is it a correct statement - if my job doesn't write anything to stdout,
the "*.out" file should be empty?

for some reason it contains the same info as "log" and much more.

For the "log" files, I can control rotation via log4j configuration, but
how do I setup rotation for "out" files?
Or, how do I disable them at all?

I'm using 1.4.2

Thank you,
Alex

On Wed, Aug 1, 2018 at 7:00 PM Andrey Zagrebin 
wrote:

> Hi Alexander,
>
> there is also a doc link where log configuration  is described:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/logging.html
> You can modify log configuration in conf directory according to logging
> framework docs.
>
> Cheers,
> Andrey
>
>
> On 1 Aug 2018, at 17:30, vino yang  wrote:
>
> Hi Alexander:
>
> .log and .out are different. Usually, the .log file stores the log
> information output by the log framework. Flink uses slf4j as the log
> interface and supports log4j and logback configurations. The .out file
> stores the STDOUT information. This information is usually output by you
> calling some APIs such as the print sink API.
>
> Thanks, vino.
>
> 2018-08-01 23:19 GMT+08:00 Alexander Smirnov  >:
>
>> Hi,
>>
>> could you please explain the difference between *.log and *.out files in
>> Flink?
>> What information is supposed to be in each of them?
>> Is "log" a subset of "out"?
>> How do I setup rotation with gzipping?
>>
>> Thank you,
>> Alex
>>
>
>
>


Re: Old job resurrected during HA failover

2018-08-01 Thread Elias Levy
Vino,

Thanks for the reply.  Looking in ZK I see:

[zk: localhost:2181(CONNECTED) 5] ls /flink/cluster_1/jobgraphs
[d77948df92813a68ea6dfd6783f40e7e, 2a4eff355aef849c5ca37dbac04f2ff1]

Again we see HA state for job 2a4eff355aef849c5ca37dbac04f2ff1, even though
that job is no longer running (it was canceled while it was in a loop
attempting to restart, but failing because of a lack of cluster slots).

Any idea why that may be the case?


On Wed, Aug 1, 2018 at 8:38 AM vino yang  wrote:

> If a job is explicitly canceled, its jobgraph node on ZK will be deleted.
> However, it is worth noting here that Flink enables a background thread to
> asynchronously delete the jobGraph node,
> so there may be cases where it cannot be deleted.
> On the other hand, the jobgraph node on ZK is the only basis for the JM
> leader to restore the job.
> There may be an unexpected recovery or an old job resurrection.
>


Re: Dynamical Windows

2018-08-01 Thread antonio saldivar
Awesome, thank you very much. I will try to do it with a key selector to send
the key from the front end.

On Wed, 1 Aug 2018 at 11:57, vino yang ()
wrote:

> Sorry, the KeySelector's Java doc is here :
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/java/functions/KeySelector.html
>
> 2018-08-01 23:57 GMT+08:00 vino yang :
>
>> Hi antonio,
>>
>> The keyBy API can accept a KeySelector [1], which is an interface you can
>> implement to specify the key for your business logic.
>>
>> I think you can use it and implement its getKey method. In that method,
>> you can access an external system (such as ZooKeeper) to obtain a dynamic key.
>>
>> It's just an idea, you can try it.
>>
>> Thanks, vino.
>>
>>
>> 2018-08-01 23:46 GMT+08:00 antonio saldivar :
>>
>>> Hello
>>>
>>> I am developing a Flink 1.4.2 application currently with sliding windows
>>> (Example below)
>>> I want to ask if there is a way to create the window time dynamically
>>> also the key has to change in some Use Cases and we don't want to create an
>>> specific window for each UC
>>>
>>> I want to send those values from the front end
>>>
>>> SingleOutputStreamOperator windowed = ObjectDTO
>>>
>>> .keyBy("Key")
>>>
>>> .timeWindow(Time.minutes(10),Time.minutes(1))
>>>
>>> .trigger(new AlertTrigger(env.getStreamTimeCharacteristic()))
>>>
>>> .aggregate(new TxnAggregator(), new TxnWindowFn())
>>>
>>> .name("TEN_MINUTES_WINDOW")
>>>
>>>
>>> Thank you
>>> Best Regards
>>>
>>
>>
>


Re: Flink log and out files

2018-08-01 Thread Andrey Zagrebin
Hi Alexander,

there is also a doc link where log configuration  is described:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/logging.html
 

You can modify log configuration in conf directory according to logging 
framework docs.

Cheers,
Andrey

> On 1 Aug 2018, at 17:30, vino yang  wrote:
> 
> Hi Alexander:
> 
> .log and .out are different. Usually, the .log file stores the log 
> information output by the log framework. Flink uses slf4j as the log 
> interface and supports log4j and logback configurations. The .out file stores 
> the STDOUT information. This information is usually output by you calling 
> some APIs such as the print sink API.
> 
> Thanks, vino.
> 
> 2018-08-01 23:19 GMT+08:00 Alexander Smirnov  >:
> Hi,
> 
> could you please explain the difference between *.log and *.out files in 
> Flink?
> What information is supposed to be in each of them?
> Is "log" a subset of "out"?
> How do I setup rotation with gzipping?
> 
> Thank you,
> Alex
> 



Re: Dynamical Windows

2018-08-01 Thread vino yang
Sorry, the KeySelector's Java doc is here :
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/java/functions/KeySelector.html

2018-08-01 23:57 GMT+08:00 vino yang :

> Hi antonio,
>
> The keyBy API can accept a KeySelector [1], which is an interface you can
> implement to specify the key for your business logic.
>
> I think you can use it and implement its getKey method. In that method, you
> can access an external system (such as ZooKeeper) to obtain a dynamic key.
>
> It's just an idea, you can try it.
>
> Thanks, vino.
>
>
> 2018-08-01 23:46 GMT+08:00 antonio saldivar :
>
>> Hello
>>
>> I am developing a Flink 1.4.2 application currently with sliding windows
>> (Example below)
>> I want to ask if there is a way to create the window time dynamically
>> also the key has to change in some Use Cases and we don't want to create an
>> specific window for each UC
>>
>> I want to send those values from the front end
>>
>> SingleOutputStreamOperator windowed = ObjectDTO
>>
>> .keyBy("Key")
>>
>> .timeWindow(Time.minutes(10),Time.minutes(1))
>>
>> .trigger(new AlertTrigger(env.getStreamTimeCharacteristic()))
>>
>> .aggregate(new TxnAggregator(), new TxnWindowFn())
>>
>> .name("TEN_MINUTES_WINDOW")
>>
>>
>> Thank you
>> Best Regards
>>
>
>


Re: Dynamical Windows

2018-08-01 Thread vino yang
Hi antonio,

The keyBy API can accept a KeySelector [1], which is an interface you can
implement to specify the key for your business logic.

I think you can use it and implement its getKey method. In that method, you
can access an external system (such as ZooKeeper) to obtain a dynamic key.

It's just an idea, you can try it.
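
A rough, untested sketch of that idea (the TxnEvent type and its fields are
only placeholders for your ObjectDTO; how the key field is chosen, for example
from a value sent by the front end or a lookup in ZooKeeper, is up to you):

import org.apache.flink.api.java.functions.KeySelector

// Placeholder event type; replace with your ObjectDTO.
case class TxnEvent(accountId: String, cardId: String, amount: Double)

// A KeySelector whose key field is chosen when the job is built,
// for example from a value sent by the front end.
class ConfigurableKeySelector(keyField: String) extends KeySelector[TxnEvent, String] {
  override def getKey(event: TxnEvent): String = keyField match {
    case "accountId" => event.accountId
    case "cardId"    => event.cardId
    case other       => throw new IllegalArgumentException(s"Unknown key field: $other")
  }
}

// Usage (Java DataStream API, as in your snippet):
//   stream.keyBy(new ConfigurableKeySelector(keyField))
//         .timeWindow(Time.minutes(10), Time.minutes(1))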

Thanks, vino.


2018-08-01 23:46 GMT+08:00 antonio saldivar :

> Hello
>
> I am developing a Flink 1.4.2 application, currently with sliding windows
> (example below).
> I want to ask if there is a way to set the window time dynamically. The key also
> has to change in some use cases, and we don't want to create a
> specific window for each UC.
>
> I want to send those values from the front end
>
> SingleOutputStreamOperator windowed = ObjectDTO
>
> .keyBy("Key")
>
> .timeWindow(Time.minutes(10),Time.minutes(1))
>
> .trigger(new AlertTrigger(env.getStreamTimeCharacteristic()))
>
> .aggregate(new TxnAggregator(), new TxnWindowFn())
>
> .name("TEN_MINUTES_WINDOW")
>
>
> Thank you
> Best Regards
>


Dynamical Windows

2018-08-01 Thread antonio saldivar
Hello

I am developing a Flink 1.4.2 application, currently with sliding windows
(example below).
I want to ask if there is a way to set the window time dynamically. The key also
has to change in some use cases, and we don't want to create a
specific window for each UC.

I want to send those values from the front end

SingleOutputStreamOperator windowed = ObjectDTO

.keyBy("Key")

.timeWindow(Time.minutes(10),Time.minutes(1))

.trigger(new AlertTrigger(env.getStreamTimeCharacteristic()))

.aggregate(new TxnAggregator(), new TxnWindowFn())

.name("TEN_MINUTES_WINDOW")


Thank you
Best Regards


Re: Rest API calls

2018-08-01 Thread Rong Rong
Hi Yuvraj,

Vino is right, having a customized function is probably the easiest at this
moment.

Alternatively, I think what you are looking for is very similar to the
proposed side-input feature for the DataStream API [2].
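
As a rough illustration of the "customized function" route (untested; the
service URL, query parameter, and result handling are placeholders):

import java.net.{HttpURLConnection, URL}
import scala.io.Source
import org.apache.flink.api.common.functions.RichMapFunction

// Calls an external REST service for every element. This blocks the task thread;
// for higher throughput the async I/O operator mentioned below may be a better fit.
class EnrichViaRest(serviceUrl: String) extends RichMapFunction[String, String] {
  override def map(value: String): String = {
    val conn = new URL(s"$serviceUrl?q=$value").openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("GET")
    try {
      val body = Source.fromInputStream(conn.getInputStream).mkString
      s"$value|$body"
    } finally {
      conn.disconnect()
    }
  }
}

// Usage: stream.map(new EnrichViaRest("http://example.com/lookup"))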

Thanks,
Rong

[2] FLIP-17:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API

On Wed, Aug 1, 2018 at 5:10 AM vino yang  wrote:

> Hi yuvraj,
>
> Sorry, I misread your question. In many UDF contexts, such as
> MapFunction, ProcessFunction, etc., you can call the REST API as a
> client.
> Also, if you want to improve performance, async I/O may help you [1].
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/asyncio.html
>
> Thanks, vino.
>
> 2018-08-01 11:55 GMT+08:00 yuvraj singh <19yuvrajsing...@gmail.com>:
>
>> Hi vino , thanks for the information .
>> But I was looking for the use case where I need to call a web service on
>> the stream .
>>
>> Thanks
>> Yubraj Singh
>>
>> On Wed, Aug 1, 2018, 8:32 AM vino yang  wrote:
>>
>>> Hi yuvraj,
>>>
>>> The documentation of Flink REST API is here :
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/rest_api.html#monitoring-rest-api
>>>
>>> Thanks, vino.
>>>
>>> 2018-08-01 3:29 GMT+08:00 yuvraj singh <19yuvrajsing...@gmail.com>:
>>>
 Hi I have a use case where I need to call rest apis from a flink . I am
 not getting much context form internet , please help me on this .

 Thanks


>>>
>


Re: Old job resurrected during HA failover

2018-08-01 Thread vino yang
Hi Elias,

If a job is explicitly canceled, its job graph node on ZK will be deleted.
However, it is worth noting that Flink uses a background thread to delete
the job graph node asynchronously, so there may be cases where it does not
actually get deleted.
On the other hand, the job graph node on ZK is the only basis on which the
JM leader restores jobs, so a leftover node can lead to an unexpected
recovery, i.e. an old job being resurrected.

Thanks, vino.

2018-08-01 23:13 GMT+08:00 Elias Levy :

> For the second time in as many months we've had an old job resurrected
> during HA failover in a 1.4.2 standalone cluster.  Failover was initiated
> when the leading JM lost its connection to ZK.  I opened FLINK-10011
>  with the details.
>
> We are using S3 with the Presto adapter as our distributed store.  After
> we cleaned up the cluster by shutting down the two jobs started after
> failover and starting a new job from the last known good checkpoint from
> the single job running in the cluster before failover, the HA recovery
> directory looks as follows:
>
> s3cmd ls s3://bucket/flink/cluster_1/recovery/
>  DIR s3://bucket/flink/cluster_1/recovery/some_job/
> 2018-07-31 17:33 35553 s3://bucket/flink/cluster_1/recovery/
> completedCheckpoint12e06bef01c5
> 2018-07-31 17:34 35553 s3://bucket/flink/cluster_1/recovery/
> completedCheckpoint187e0d2ae7cb
> 2018-07-31 17:32 35553 s3://bucket/flink/cluster_1/recovery/
> completedCheckpoint22fc8ca46f02
> 2018-06-12 20:01 284626 s3://bucket/flink/cluster_1/recovery/
> submittedJobGraph7f627a661cec
> 2018-07-30 23:01 285257 s3://bucket/flink/cluster_1/recovery/
> submittedJobGraphf3767780c00c
>
> submittedJobGraph7f627a661cec appears to be job
> 2a4eff355aef849c5ca37dbac04f2ff1, the long running job that failed during
> the ZK failover
>
> submittedJobGraphf3767780c00c appears to be job
> d77948df92813a68ea6dfd6783f40e7e, the job we started restoring from a
> checkpoint after shutting down the duplicate jobs
>
> Should submittedJobGraph7f627a661cec exist in the recovery directory if
> 2a4eff355aef849c5ca37dbac04f2ff1 is no longer running?
>
>
>


Re: Flink log and out files

2018-08-01 Thread vino yang
Hi Alexander:

.log and .out are different. Usually, the .log file stores the log
information written by the logging framework; Flink uses SLF4J as the
logging interface and supports log4j and logback configurations. The .out
file stores the STDOUT output, which is usually produced by calls
such as the print() sink API.
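
A trivial, untested illustration of the difference:

import org.slf4j.{Logger, LoggerFactory}

object LoggingExample {
  // Messages sent through the logging framework end up in the .log file.
  private val LOG: Logger = LoggerFactory.getLogger(LoggingExample.getClass)

  def logIt(): Unit = LOG.info("this line goes to the .log file")

  // Anything written to STDOUT, such as println or the print() sink, ends up in the .out file.
  def printIt(): Unit = println("this line goes to the .out file")
}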

Thanks, vino.

2018-08-01 23:19 GMT+08:00 Alexander Smirnov :

> Hi,
>
> could you please explain the difference between *.log and *.out files in
> Flink?
> What information is supposed to be in each of them?
> Is "log" a subset of "out"?
> How do I setup rotation with gzipping?
>
> Thank you,
> Alex
>


Flink log and out files

2018-08-01 Thread Alexander Smirnov
Hi,

could you please explain the difference between *.log and *.out files in
Flink?
What information is supposed to be in each of them?
Is "log" a subset of "out"?
How do I setup rotation with gzipping?

Thank you,
Alex


Old job resurrected during HA failover

2018-08-01 Thread Elias Levy
For the second time in as many months we've had an old job resurrected
during HA failover in a 1.4.2 standalone cluster.  Failover was initiated
when the leading JM lost its connection to ZK.  I opened FLINK-10011
 with the details.

We are using S3 with the Presto adapter as our distributed store.  After we
cleaned up the cluster by shutting down the two jobs started after failover
and starting a new job from the last known good checkpoint from the single
job running in the cluster before failover, the HA recovery directory looks
as follows:

s3cmd ls s3://bucket/flink/cluster_1/recovery/
 DIR s3://bucket/flink/cluster_1/recovery/some_job/
2018-07-31 17:33 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint12e06bef01c5
2018-07-31 17:34 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint187e0d2ae7cb
2018-07-31 17:32 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint22fc8ca46f02
2018-06-12 20:01 284626 s3://bucket/flink/cluster_1/recovery/submittedJobGraph7f627a661cec
2018-07-30 23:01 285257 s3://bucket/flink/cluster_1/recovery/submittedJobGraphf3767780c00c

submittedJobGraph7f627a661cec appears to be job
2a4eff355aef849c5ca37dbac04f2ff1, the long running job that failed during
the ZK failover

submittedJobGraphf3767780c00c appears to be job
d77948df92813a68ea6dfd6783f40e7e, the job we started restoring from a
checkpoint after shutting down the duplicate jobs

Should submittedJobGraph7f627a661cec exist in the recovery directory if
2a4eff355aef849c5ca37dbac04f2ff1 is no longer running?


Re: Could not retrieve the redirect address - No REST endpoint has been started

2018-08-01 Thread Andrey Zagrebin
Hi Pedro,

when you restart the cluster, do you keep Web UI open?
and does Web UI work eventually after restart and getting this error?

Cheers,
Andrey

> On 1 Aug 2018, at 11:56, PedroMrChaves  wrote:
> 
> Hello,
> 
> I have a running standalone Flink cluster with 2 task managers and 2 job
> managers (one task manager and one job manager per machine).
> Sometimes, when I restart the cluster, I get the following error message:
> java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkException: No REST endpoint has been started for
> the JobManager.
>at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>at
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:442)
>at akka.dispatch.OnComplete.internal(Future.scala:258)
>at akka.dispatch.OnComplete.internal(Future.scala:256)
>at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>at
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>at scala.concurrent.Promise$class.complete(Promise.scala:55)
>at
> scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:157)
>at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
>at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
>at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>at
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
>at
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
>at
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
>at
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
>at
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>at
> scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
>at
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>at
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
>at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
>at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:97)
>at
> akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:982)
>at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:446)
>at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.util.FlinkException: No REST endpoint has been
> started for the JobManager.
> 
> which prevents the access to the web interface. 
> 
> I am using version 1.4.2
> 
> Any idea on what might be causing this?
> 
> Regards,
> Pedro.
> 
> 
> 
> 
> 
> -
> Best Regards,
> Pedro Chaves
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Access to Kafka Event Time

2018-08-01 Thread Hequn Cheng
Hi Vishal,

> We have a use case where multiple topics are streamed to hdfs and we
would want to create buckets based on ingestion time
If I understand correctly, you want to create buckets based on event time.
Maybe you can use a window[1]. For example, a tumbling window of 5 minutes
groups rows into 5-minute intervals, and you can get the window start time
(TUMBLE_START(time_attr, interval)) and end time (TUMBLE_END(time_attr,
interval)) when outputting the data.
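Not from the original mail, but a minimal sketch of such a query, assuming a
table "Events" has been registered on the stream with an event-time attribute
"ts" (both names are only illustrative):

// tableEnv is assumed to be a StreamTableEnvironment with "Events" registered
val bucketed = tableEnv.sqlQuery(
  """
    |SELECT
    |  TUMBLE_START(ts, INTERVAL '5' MINUTE) AS bucket_start,
    |  TUMBLE_END(ts, INTERVAL '5' MINUTE)   AS bucket_end,
    |  COUNT(*) AS events_in_bucket
    |FROM Events
    |GROUP BY TUMBLE(ts, INTERVAL '5' MINUTE)
  """.stripMargin)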

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#group-windows

On Wed, Aug 1, 2018 at 8:21 PM, Vishal Santoshi 
wrote:

> Any feedback?
>
> On Tue, Jul 31, 2018, 10:20 AM Vishal Santoshi 
> wrote:
>
>> In fact it may be available elsewhere too ( for example ProcessFunction
>> etc ) but we have no need to create one; it is just a data relay ( kafka
>> to hdfs ) and any intermediate processing should be avoided if possible
>> IMHO.
>>
>> On Tue, Jul 31, 2018 at 9:10 AM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> We have a use case where multiple topics are streamed to hdfs and we
>>> would want to create buckets based on ingestion time ( the time the events
>>> were pushed to kafka ). Our producers to kafka will set the event time
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
>>>
>>> suggests that the "previousElementTimeStamp" will provide that
>>> timestamp provided the "EventTime" characteristic is set. It also provides
>>> the element. In our case the element will expose a setIngestionTime(long
>>> time) method. Is the element in this method
>>>
>>> public long extractTimestamp(Long element, long previousElementTimestamp)
>>>
>>>  passed by reference and can it be safely ( losslessly ) mutated for
>>> downstream operators ?
>>>
>>>
>>> That said there is another place where that record time stamp is
>>> available.
>>>
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L141
>>>
>>> Is it possible to change the signature of the
>>>
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java#L46
>>>
>>> to add record timestamp as the last argument ?
>>>
>>> Regards,
>>>
>>> Vishal
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>


Re: Converting a DataStream into a Table throws error

2018-08-01 Thread Fabian Hueske
Hi, I think you are mixing Java and Scala dependencies.

org.apache.flink.streaming.api.datastream.DataStream is the DataStream of
the Java DataStream API.
You should use the DataStream of the Scala DataStream API.
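A minimal sketch of that fix (not from the original mail; topic name, field
names and Kafka settings are only illustrative). Note the two wildcard imports,
and that the CSV line is split into a 4-field tuple before calling
fromDataStream, since a plain String carries only a single field:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._                 // Scala DataStream API + TypeInformation implicits
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._                     // Symbol -> Expression conversions for 'key, 'ticker, ...

object KafkaToTable {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment   // Scala environment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")  // illustrative only
    properties.setProperty("group.id", "md_streaming")

    // this is the Scala DataStream, not org.apache.flink.streaming.api.datastream.DataStream
    val lines: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer011[String]("md_topic", new SimpleStringSchema(), properties))

    // split "key,ticker,timeissued,price" into a 4-field tuple
    val parsed = lines.map { line =>
      val a = line.split(",")
      (a(0), a(1), a(2), a(3).toDouble)
    }

    val tableEnv = TableEnvironment.getTableEnvironment(env)
    val table1 = tableEnv.fromDataStream(parsed, 'key, 'ticker, 'timeissued, 'price)
    table1.printSchema()

    env.execute("kafka-to-table")
  }
}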

Best, Fabian

2018-08-01 14:01 GMT+02:00 Mich Talebzadeh :

> Hi,
>
> I believed I tried Hequn's suggestion and tried again
>
> import org.apache.flink.table.api.Table
> import org.apache.flink.table.api.TableEnvironment
>
> *import org.apache.flink.table.api.scala._*
> Unfortunately I am still getting the same error!
>
> [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_
> streaming/target/scala-2.11/classes...
> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/
> myPackage/md_streaming.scala:151: overloaded method value fromDataStream
> with alternatives:
> [error]   [T](dataStream: 
> org.apache.flink.streaming.api.datastream.DataStream[T],
> fields: String)org.apache.flink.table.api.Table 
> [error]   [T](dataStream: org.apache.flink.streaming.
> api.datastream.DataStream[T])org.apache.flink.table.api.Table
> [error]  cannot be applied to 
> (org.apache.flink.streaming.api.datastream.DataStreamSource[String],
> Symbol, Symbol, Symbol, Symbol)
> [error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key,
> 'ticker, 'timeissued, 'price)
> [error]^
> [error] one error found
> [error] (compile:compileIncremental) Compilation failed
> [error] Total time: 3 s, completed Aug 1, 2018 12:59:44 PM
> Completed compiling
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 1 Aug 2018 at 10:03, Timo Walther  wrote:
>
>> If these two imports are the only imports that you added, then you did
>> not follow Hequn's advice or the link that I sent you.
>>
>> You need to add the underscore imports to let Scala do its magic.
>>
>> Timo
>>
>>
>> Am 01.08.18 um 10:28 schrieb Mich Talebzadeh:
>>
>> Hi Timo,
>>
>> These are my two flink table related imports
>>
>> import org.apache.flink.table.api.Table
>> import org.apache.flink.table.api.TableEnvironment
>>
>> And these are my dependencies building with SBT
>>
>> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
>> libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6"
>> libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
>> libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
>> libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
>> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11"
>> % "1.5.0"
>> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base"
>> % "1.5.0"
>> libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
>> libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
>> libraryDependencies += "org.apache.flink" %% "flink-streaming-java" %
>> "1.5.0" % "provided"
>>
>> libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" %
>> "provided"
>> libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"
>>
>> There appears to be conflict somewhere that cause this error
>>
>> [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_
>> streaming/target/scala-2.11/classes...
>> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/
>> myPackage/md_streaming.scala:152: overloaded method value fromDataStream
>> with alternatives:
>> [error]   [T](dataStream: 
>> org.apache.flink.streaming.api.datastream.DataStream[T],
>> fields: String)org.apache.flink.table.api.Table 
>> [error]   [T](dataStream: org.apache.flink.streaming.
>> api.datastream.DataStream[T])org.apache.flink.table.api.Table
>> [error]  cannot be applied to 
>> (org.apache.flink.streaming.api.datastream.DataStreamSource[String],
>> Symbol, Symbol, Symbol, Symbol)
>> [error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key,
>> 'ticker, 'timeissued, 'price)
>> [error]^
>> [error] one error found
>> [error] (compile:compileIncremental) Compilation failed
>>
>> Thanks
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or 

Re: Access to Kafka Event Time

2018-08-01 Thread Vishal Santoshi
Any feedback?

On Tue, Jul 31, 2018, 10:20 AM Vishal Santoshi 
wrote:

> In fact it may be available elsewhere too ( for example ProcessFunction
> etc ) but we have no need to create one; it is just a data relay ( kafka
> to hdfs ) and any intermediate processing should be avoided if possible
> IMHO.
>
> On Tue, Jul 31, 2018 at 9:10 AM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> We have a use case where multiple topics are streamed to hdfs and we would
>> want to create buckets based on ingestion time ( the time the events were
>> pushed to kafka ). Our producers to kafka will set the event time
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
>>
>> suggests that the "previousElementTimeStamp" will provide that
>> timestamp provided the "EventTime" characteristic is set. It also provides
>> the element. In our case the element will expose a setIngestionTime(long
>> time) method. Is the element in this method
>>
>> public long extractTimestamp(Long element, long previousElementTimestamp)
>>
>>  passed by reference and can it be safely ( losslessly ) mutated for
>> downstream operators ?
>>
>>
>> That said there is another place where that record time stamp is
>> available.
>>
>>
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L141
>>
>> Is it possible to change the signature of the
>>
>>
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java#L46
>>
>> to add record timestamp as the last argument ?
>>
>> Regards,
>>
>> Vishal
>>
>>
>>
>>
>>
>>
>>
>


Re: Default Restart Strategy Not Work With Checkpointing

2018-08-01 Thread Chesnay Schepler

Please see FLINK-9143 .

On 01.08.2018 14:07, Paul Lam wrote:

Hi,
I’m running a Flink 1.5.0 standalone cluster on which `restart-strategy` was 
set to `failure-rate`, and the web frontend shows that the JobManager and the 
TaskManagers are following this configuration, but streaming jobs with 
checkpointing enabled are still using the fixed delay strategy with no respect 
to the default restart strategy (no explicit overwrites in the user code).

I read the source code and found a possible explanation for this (but I am not
very sure): the client generates the JobGraph without respect to flink-conf.yaml
and sets the restart strategy to fixed delay if checkpointing is on, while the
server side (JobMaster) follows the default restart strategy configured in
flink-conf.yaml but gives the one in the JobGraph a higher priority, so it is
always overwritten by the fixed delay strategy.

If I understand correctly, this might be a bug. Is there any suggestion to
avoid it for now?

Best regards,
Paul Lam
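Not part of the original exchange, but one possible stop-gap until FLINK-9143 is
resolved is to set the intended strategy explicitly in the job code, so that the
JobGraph carries the failure-rate strategy instead of the implicit fixed-delay
default. A minimal sketch (the concrete values are only illustrative):

// inside the job's main(), before building the topology
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000)

// mirror the failure-rate settings from flink-conf.yaml explicitly in the job,
// so they take precedence over the fixed-delay strategy implied by checkpointing
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3,                  // max failures per measurement interval
  Time.minutes(5),    // measurement interval
  Time.seconds(10)))  // delay between restart attempts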





Re: Rest API calls

2018-08-01 Thread vino yang
Hi yuvraj,

Sorry, I misread your question. In many UDF contexts, such as MapFunction,
ProcessFunction, etc., you can call a REST service as a client.
Also, if you want to improve performance, async I/O may help you [1].

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/asyncio.html
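A rough sketch of that pattern (not from the thread; the REST call below is a
placeholder you would replace with a real, ideally non-blocking, HTTP client):

import java.util.concurrent.TimeUnit

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

class RestEnrichment extends AsyncFunction[String, (String, String)] {

  // execution context for the placeholder call; a truly async client would bring its own
  implicit lazy val ec: ExecutionContext = ExecutionContext.global

  // placeholder: swap in your actual web service call here
  private def callRestService(key: String): String = s"response-for-$key"

  override def asyncInvoke(input: String, resultFuture: ResultFuture[(String, String)]): Unit = {
    Future(callRestService(input)).onComplete {
      case Success(resp) => resultFuture.complete(Iterable((input, resp)))
      case Failure(t)    => resultFuture.completeExceptionally(t)
    }
  }
}

// usage: at most 100 in-flight requests, 5 s timeout per request
// val enriched = AsyncDataStream.unorderedWait(stream, new RestEnrichment, 5000, TimeUnit.MILLISECONDS, 100)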

Thanks, vino.

2018-08-01 11:55 GMT+08:00 yuvraj singh <19yuvrajsing...@gmail.com>:

> Hi vino , thanks for the information .
> But I was looking for the use case where I need to call a web service on
> the stream .
>
> Thanks
> Yubraj Singh
>
> On Wed, Aug 1, 2018, 8:32 AM vino yang  wrote:
>
>> Hi yuvraj,
>>
>> The documentation of Flink REST API is here : https://ci.apache.org/
>> projects/flink/flink-docs-release-1.5/monitoring/rest_
>> api.html#monitoring-rest-api
>>
>> Thanks, vino.
>>
>> 2018-08-01 3:29 GMT+08:00 yuvraj singh <19yuvrajsing...@gmail.com>:
>>
>>> Hi I have a use case where I need to call rest apis from a flink . I am
>>> not getting much context form internet , please help me on this .
>>>
>>> Thanks
>>>
>>>
>>


Default Restart Strategy Not Work With Checkpointing

2018-08-01 Thread Paul Lam
Hi, 
I’m running a Flink 1.5.0 standalone cluster on which `restart-strategy` was 
set to `failure-rate`, and the web frontend shows that the JobManager and the 
TaskManagers are following this configuration, but streaming jobs with 
checkpointing enabled are still using the fixed delay strategy with no respect 
to the default restart strategy (no explicit overwrites in the user code). 

I read the source code and found a possible explanation for this (but I am not
very sure): the client generates the JobGraph without respect to flink-conf.yaml
and sets the restart strategy to fixed delay if checkpointing is on, while the
server side (JobMaster) follows the default restart strategy configured in
flink-conf.yaml but gives the one in the JobGraph a higher priority, so it is
always overwritten by the fixed delay strategy.

If I understand correctly, this might be a bug. Is there any suggestion to
avoid it for now?

Best regards,
Paul Lam

Re: Converting a DataStream into a Table throws error

2018-08-01 Thread Mich Talebzadeh
Hi,

I believed I tried Hequn's suggestion and tried again

import org.apache.flink.table.api.Table
import org.apache.flink.table.api.TableEnvironment

*import org.apache.flink.table.api.scala._*
Unfortunately I am still getting the same error!

[info] Compiling 1 Scala source to
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:151:
overloaded method value fromDataStream with alternatives:
[error]   [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T], fields:
String)org.apache.flink.table.api.Table 
[error]   [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
[error]  cannot be applied to
(org.apache.flink.streaming.api.datastream.DataStreamSource[String],
Symbol, Symbol, Symbol, Symbol)
[error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key,
'ticker, 'timeissued, 'price)
[error]^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed Aug 1, 2018 12:59:44 PM
Completed compiling


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 1 Aug 2018 at 10:03, Timo Walther  wrote:

> If these two imports are the only imports that you added, then you did not
> follow Hequn's advice or the link that I sent you.
>
> You need to add the underscore imports to let Scala do its magic.
>
> Timo
>
>
> Am 01.08.18 um 10:28 schrieb Mich Talebzadeh:
>
> Hi Timo,
>
> These are my two flink table related imports
>
> import org.apache.flink.table.api.Table
> import org.apache.flink.table.api.TableEnvironment
>
> And these are my dependencies building with SBT
>
> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
> libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6"
> libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
> libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
> libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11"
> % "1.5.0"
> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base"
> % "1.5.0"
> libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
> libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
> libraryDependencies += "org.apache.flink" %% "flink-streaming-java" %
> "1.5.0" % "provided"
>
> libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" %
> "provided"
> libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"
>
> There appears to be conflict somewhere that cause this error
>
> [info] Compiling 1 Scala source to
> /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
> [error]
> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152:
> overloaded method value fromDataStream with alternatives:
> [error]   [T](dataStream:
> org.apache.flink.streaming.api.datastream.DataStream[T], fields:
> String)org.apache.flink.table.api.Table 
> [error]   [T](dataStream:
> org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
> [error]  cannot be applied to
> (org.apache.flink.streaming.api.datastream.DataStreamSource[String],
> Symbol, Symbol, Symbol, Symbol)
> [error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key,
> 'ticker, 'timeissued, 'price)
> [error]^
> [error] one error found
> [error] (compile:compileIncremental) Compilation failed
>
> Thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 1 Aug 2018 at 09:17, Timo Walther  wrote:
>
>> Hi Mich,
>>
>> I would check your imports again [1]. This is a pure compiler issue that
>> is unrelated to your actual data stream. Also check your 

Could not retrieve the redirect address - No REST endpoint has been started

2018-08-01 Thread PedroMrChaves
Hello,

I have a running standalone Flink cluster with 2 task managers and 2 job
managers (one task manager and one job manager per machine).
Sometimes, when I restart the cluster, I get the following error message:
java.util.concurrent.CompletionException:
org.apache.flink.util.FlinkException: No REST endpoint has been started for
the JobManager.
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:442)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at
scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:157)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at
scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
at
scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
at
scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
at
scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
at
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at
scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
at
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:97)
at
akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:982)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:446)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.util.FlinkException: No REST endpoint has been
started for the JobManager.

which prevents the access to the web interface. 

I am using version 1.4.2

Any idea on what might be causing this?

Regards,
Pedro.





-
Best Regards,
Pedro Chaves
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Service discovery for flink-metrics-prometheus

2018-08-01 Thread Dongwon Kim
Hi all,

I also suffer from the lack of service discovery for flink-metrics-prometheus 
while using YARN for deployment, Prometheus for instrumentation, and Flink for 
stream processing.
I just uploaded a Python script for this purpose here: 
https://github.com/eastcirclek/flink-service-discovery 

Hope it can be helpful for your use case.

Best,

- Dongwon




> On 8 Jan 2018, at 7:27 PM, Chesnay Schepler  wrote:
> 
> Yes, the logs are the only way to find out which port the reporter is bound 
> to. 
> 
> We may be able to display this information in the web-UI, but it isn't very 
> high on my list and will probably require 
> modifications to the reporter interface. 
> 
> On 06.01.2018 04:24, Kien Truong wrote:
>> Hi,
>> 
>> We are using YARN for deployment, so the combination of host for the 
>> Prometheus reporters can be really random depending on how the containers 
>> are co-located.  
>> 
>> One option we thought of was scraping the log for this information, but it 
>> can be really messy in the long run.
>> 
>> Regards,
>> Kien
>> 
>> Sent from TypeApp 
>> On Jan 5, 2018, at 03:53, Stephan Ewen > > wrote:
>> How are you running deploying your Flink processes? For Service Discovery 
>> for Prometheus on Kubernetes, there are a few articles out there...
>> 
>> On Thu, Jan 4, 2018 at 3:52 PM, Aljoscha Krettek > > wrote: 
>> I'm not aware of how this is typically done but maybe Chesnay (cc'ed) has an 
>> idea. 
>> 
>> > On 14. Dec 2017, at 16:55, Kien Truong < duckientru...@gmail.com 
>> > > wrote: 
>> > 
>> > Hi, 
>> > 
>> > Does anyone have recommendations about integrating 
>> > flink-metrics-prometheus with some SD mechanism 
>> > 
>> > so that Prometheus can pick up the Task Manager's location dynamically ? 
>> > 
>> > Best regards, 
>> > 
>> > Kien 
>> > 
>> 
>> 
> 



Re: [ANNOUNCE] Apache Flink 1.5.2 released

2018-08-01 Thread Till Rohrmann
Great work! Thanks to everyone who helped with the release.

Cheers,
Till

On Tue, Jul 31, 2018 at 7:43 PM Bowen Li  wrote:

> Congratulations, community!
>
> On Tue, Jul 31, 2018 at 1:44 AM Chesnay Schepler 
> wrote:
>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.5.2, which is the second bugfix release for the Apache Flink
>> 1.5 series.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>> https://flink.apache.org/news/2018/07/31/release-1.5.2.html
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12343588
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Regards,
>> Chesnay
>>
>


Service discovery for Prometheus on YARN

2018-08-01 Thread Dongwon Kim
Hi community,

Prior to my presentation [1], I'd like to share a Python script [2] to discover 
Flink clusters on YARN and let Prometheus know via its file-based service 
discovery mechanism [3].

Prometheus needs to pull metrics from Prometheus exporters running inside 
TaskManagers.
The problem is that, as also discussed in [4], we cannot know in advance hosts 
and ports to which Prometheus exporters are bound.
As Chesnay mentioned in [4], unless the information about where the exporters
are bound is exposed by Flink's REST APIs, the only way is to manually scrape
the logs of the JM and TMs.
Then you need to let Prometheus know where to pull metrics from.

The script is intended to automate the manual process.
Please visit the git repository for the detailed information.

Best,

- Dongwon

[1] https://berlin-2018.flink-forward.org/conference-program/#real-time-driving-score-service-using-flink
[2] https://github.com/eastcirclek/flink-service-discovery
[3] https://prometheus.io/docs/prometheus/latest/configuration/configuration/#%3Cfile_sd_config%3E
[4] http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3cb9b8868a-3026-689a-941f-ac20b42e6...@apache.org%3E


Re: Converting a DataStream into a Table throws error

2018-08-01 Thread Timo Walther
If these two imports are the only imports that you added, then you did 
not follow Hequn's advice or the link that I sent you.


You need to add the underscore imports to let Scala do its magic.
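For reference (paraphrasing the linked docs rather than the original mail), the
underscore imports being referred to are most likely these two:

import org.apache.flink.streaming.api.scala._   // Scala DataStream API and TypeInformation implicits
import org.apache.flink.table.api.scala._       // Table API expression DSL ('key, 'ticker, ... as field expressions)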

Timo


Am 01.08.18 um 10:28 schrieb Mich Talebzadeh:

Hi Timo,

These are my two flink table related imports

import org.apache.flink.table.api.Table
import org.apache.flink.table.api.TableEnvironment

And these are my dependencies building with SBT

libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
libraryDependencies += "org.apache.flink" %% 
"flink-connector-kafka-0.11" % "1.5.0"
libraryDependencies += "org.apache.flink" %% 
"flink-connector-kafka-base" % "1.5.0"

libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
libraryDependencies += "org.apache.flink" %% "flink-streaming-java" % 
"1.5.0" % "provided"
libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % "provided"
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"

There appears to be conflict somewhere that cause this error

[info] Compiling 1 Scala source to 
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error] 
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152: 
overloaded method value fromDataStream with alternatives:
[error]   [T](dataStream: 
org.apache.flink.streaming.api.datastream.DataStream[T], fields: 
String)org.apache.flink.table.api.Table 
[error]   [T](dataStream: 
org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
[error]  cannot be applied to 
(org.apache.flink.streaming.api.datastream.DataStreamSource[String], 
Symbol, Symbol, Symbol, Symbol)
[error]   val table1: Table = tableEnv.fromDataStream(dataStream, 
'key, 'ticker, 'timeissued, 'price)

[error]    ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

Thanks


Dr Mich Talebzadeh

LinkedIn 
/https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw/


http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk.Any and all responsibility for 
any loss, damage or destruction of data or any other property which 
may arise from relying on this email's technical content is explicitly 
disclaimed. The author will in no case be liable for any monetary 
damages arising from such loss, damage or destruction.




On Wed, 1 Aug 2018 at 09:17, Timo Walther > wrote:


Hi Mich,

I would check your imports again [1]. This is a pure compiler issue
that is unrelated to your actual data stream. Also check your
project dependencies.

Regards,
Timo

[1]

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#implicit-conversion-for-scala

Am 01.08.18 um 09:30 schrieb Mich Talebzadeh:


Hi both,

I added the import as Hequn suggested.

My stream is very simple and consists of 4 values separated by
"," as below

05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48

So this is what I have been trying to do

Code

val dataStream =  streamExecEnv
   .addSource(new FlinkKafkaConsumer011[String](topicsValue,
new SimpleStringSchema(), properties))
 //
 //
  val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
  val table1: Table = tableEnv.fromDataStream(dataStream, 'key,
'ticker, 'timeissued, 'price)

note those four columns in Table1 definition

And this is the error being thrown

[info] Compiling 1 Scala source to
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error]

/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152:
overloaded method value fromDataStream with alternatives:
[error]   [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T], fields:
String)org.apache.flink.table.api.Table 
[error]   [T](dataStream:

org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
[error]  cannot be applied to
(org.apache.flink.streaming.api.datastream.DataStreamSource[String],
Symbol, Symbol, Symbol, Symbol)
[error]   val table1: Table = tableEnv.fromDataStream(dataStream,
'key, 'ticker, 'timeissued, 'price)
[error]    ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

I suspect dataStream may not be compatible with this operation?

Regards,

Dr Mich Talebzadeh

LinkedIn


Re: Converting a DataStream into a Table throws error

2018-08-01 Thread Mich Talebzadeh
Hi Timo,

These are my two flink table related imports

import org.apache.flink.table.api.Table
import org.apache.flink.table.api.TableEnvironment

And these are my dependencies building with SBT

libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" %
"1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" %
"1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
libraryDependencies += "org.apache.flink" %% "flink-streaming-java" %
"1.5.0" % "provided"

libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % "provided"
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"

There appears to be conflict somewhere that cause this error

[info] Compiling 1 Scala source to
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152:
overloaded method value fromDataStream with alternatives:
[error]   [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T], fields:
String)org.apache.flink.table.api.Table 
[error]   [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
[error]  cannot be applied to
(org.apache.flink.streaming.api.datastream.DataStreamSource[String],
Symbol, Symbol, Symbol, Symbol)
[error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key,
'ticker, 'timeissued, 'price)
[error]^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

Thanks


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 1 Aug 2018 at 09:17, Timo Walther  wrote:

> Hi Mich,
>
> I would check your imports again [1]. This is a pure compiler issue that is
> unrelated to your actual data stream. Also check your project dependencies.
>
> Regards,
> Timo
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#implicit-conversion-for-scala
>
> Am 01.08.18 um 09:30 schrieb Mich Talebzadeh:
>
>
> Hi both,
>
> I added the import as Hequn suggested.
>
> My stream is very simple and consists of 4 values separated by "," as
> below
>
> 05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48
>
> So this is what I have been trying to do
>
> Code
>
> val dataStream =  streamExecEnv
>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new
> SimpleStringSchema(), properties))
>  //
>  //
>   val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>   val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker,
> 'timeissued, 'price)
>
> note those four columns in Table1 definition
>
> And this is the error being thrown
>
> [info] Compiling 1 Scala source to
> /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
> [error]
> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152:
> overloaded method value fromDataStream with alternatives:
> [error]   [T](dataStream:
> org.apache.flink.streaming.api.datastream.DataStream[T], fields:
> String)org.apache.flink.table.api.Table 
> [error]   [T](dataStream:
> org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
> [error]  cannot be applied to
> (org.apache.flink.streaming.api.datastream.DataStreamSource[String],
> Symbol, Symbol, Symbol, Symbol)
> [error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key,
> 'ticker, 'timeissued, 'price)
> [error]^
> [error] one error found
> [error] (compile:compileIncremental) Compilation failed
>
> I suspect dataStream may not be compatible with this operation?
>
> Regards,
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> 

Re: Converting a DataStream into a Table throws error

2018-08-01 Thread Timo Walther

Hi Mich,

I would check your imports again [1]. This is a pure compiler issue that 
is unrelated to your actual data stream. Also check your project 
dependencies.


Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#implicit-conversion-for-scala


Am 01.08.18 um 09:30 schrieb Mich Talebzadeh:


Hi both,

I added the import as Hequn suggested.

My stream is very simple and consists of 4 values separated by "," as 
below


05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48

So this is what I have been trying to do

Code

    val dataStream =  streamExecEnv
   .addSource(new FlinkKafkaConsumer011[String](topicsValue, new 
SimpleStringSchema(), properties))

 //
 //
  val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
  val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 
'ticker, 'timeissued, 'price)


note those four columns in Table1 definition

And this is the error being thrown

[info] Compiling 1 Scala source to 
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error] 
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152: 
overloaded method value fromDataStream with alternatives:
[error]   [T](dataStream: 
org.apache.flink.streaming.api.datastream.DataStream[T], fields: 
String)org.apache.flink.table.api.Table 
[error]   [T](dataStream: 
org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
[error]  cannot be applied to 
(org.apache.flink.streaming.api.datastream.DataStreamSource[String], 
Symbol, Symbol, Symbol, Symbol)
[error]   val table1: Table = tableEnv.fromDataStream(dataStream, 
'key, 'ticker, 'timeissued, 'price)

[error]    ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

I suspect dataStream may not be compatible with this operation?

Regards,

Dr Mich Talebzadeh

LinkedIn 
/https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw/


http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk.Any and all responsibility for 
any loss, damage or destruction of data or any other property which 
may arise from relying on this email's technical content is explicitly 
disclaimed. The author will in no case be liable for any monetary 
damages arising from such loss, damage or destruction.




On Wed, 1 Aug 2018 at 04:51, Hequn Cheng > wrote:


Hi, Mich

You can try adding "import org.apache.flink.table.api.scala._", so
that the Symbol can be recognized as an Expression.

Best, Hequn

On Wed, Aug 1, 2018 at 6:16 AM, Mich Talebzadeh
mailto:mich.talebza...@gmail.com>> wrote:

Hi,

I am following this example


https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#integration-with-datastream-and-dataset-api

This is my dataStream which is built on a Kafka topic

//
    //Create a Kafka consumer
    //
    val dataStream =  streamExecEnv
   .addSource(new
FlinkKafkaConsumer011[String](topicsValue, new
SimpleStringSchema(), properties))
 //
 //
  val tableEnv =
TableEnvironment.getTableEnvironment(streamExecEnv)
  val table1: Table = tableEnv.fromDataStream(dataStream,
'key, 'ticker, 'timeissued, 'price)

While compiling it throws this error

[error]

/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:169:
overloaded method value fromDataStream with alternatives:
[error]   [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T],
fields: String)org.apache.flink.table.api.Table 
[error]   [T](dataStream:

org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
[error]  cannot be applied to
(org.apache.flink.streaming.api.datastream.DataStreamSource[String],
Symbol, Symbol, Symbol, Symbol)
[error]   val table1: Table =
tableEnv.fromDataStream(dataStream, 'key, 'ticker,
'timeissued, 'price)
[error]    ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

The topic is very simple, it is comma separated prices. I
tried mapFunction and flatMap but neither worked!

Thanks,


Dr Mich Talebzadeh

LinkedIn

/https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw/

http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk.Any and all
responsibility for any loss, damage or destruction of data or
any other property which may arise from relying on this
email's technical content is explicitly disclaimed. The author
 

Re: Converting a DataStream into a Table throws error

2018-08-01 Thread Mich Talebzadeh
Hi both,

I added the import as Hequn suggested.

My stream is very simple and consists of 4 values separated by "," as below

05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48

So this is what I have been trying to do

Code

val dataStream =  streamExecEnv
   .addSource(new FlinkKafkaConsumer011[String](topicsValue, new
SimpleStringSchema(), properties))
 //
 //
  val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
  val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker,
'timeissued, 'price)

note those four columns in Table1 definition

And this is the error being thrown

[info] Compiling 1 Scala source to
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152:
overloaded method value fromDataStream with alternatives:
[error]   [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T], fields:
String)org.apache.flink.table.api.Table 
[error]   [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
[error]  cannot be applied to
(org.apache.flink.streaming.api.datastream.DataStreamSource[String],
Symbol, Symbol, Symbol, Symbol)
[error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key,
'ticker, 'timeissued, 'price)
[error]^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

I suspect dataStream may not be compatible with this operation?

Regards,

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 1 Aug 2018 at 04:51, Hequn Cheng  wrote:

> Hi, Mich
>
> You can try adding "import org.apache.flink.table.api.scala._", so that
> the Symbol can be recognized as an Expression.
>
> Best, Hequn
>
> On Wed, Aug 1, 2018 at 6:16 AM, Mich Talebzadeh  > wrote:
>
>> Hi,
>>
>> I am following this example
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#integration-with-datastream-and-dataset-api
>>
>> This is my dataStream which is built on a Kafka topic
>>
>>//
>> //Create a Kafka consumer
>> //
>> val dataStream =  streamExecEnv
>>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new
>> SimpleStringSchema(), properties))
>>  //
>>  //
>>   val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>>   val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker,
>> 'timeissued, 'price)
>>
>> While compiling it throws this error
>>
>> [error]
>> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:169:
>> overloaded method value fromDataStream with alternatives:
>> [error]   [T](dataStream:
>> org.apache.flink.streaming.api.datastream.DataStream[T], fields:
>> String)org.apache.flink.table.api.Table 
>> [error]   [T](dataStream:
>> org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
>> [error]  cannot be applied to
>> (org.apache.flink.streaming.api.datastream.DataStreamSource[String],
>> Symbol, Symbol, Symbol, Symbol)
>> [error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key,
>> 'ticker, 'timeissued, 'price)
>> [error]^
>> [error] one error found
>> [error] (compile:compileIncremental) Compilation failed
>>
>> The topic is very simple, it is comma separated prices. I tried
>> mapFunction and flatMap but neither worked!
>>
>> Thanks,
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>
>


Re: AM Delegation Token Regeneration

2018-08-01 Thread Paul Lam
Hi Chen,
Thanks for the quick reply! I’ve read the design document and it is very much 
what I’m looking for. And I think the design was absorbed in FLIP-26, right? I 
will keep watching this FLIP. Thanks again.

Best regards, 
Paul Lam