Regarding implementation of aggregate function using a ProcessFunction

2018-09-27 Thread Gaurav Luthra
Hi,

As we are aware, we currently cannot use a RichAggregateFunction in the
aggregate() method on a windowed stream. So, to access state in a custom
AggregateFunction, one can implement it using a ProcessFunction instead.
Many developers face this issue, so someone must have implemented it or
tried to. Kindly share your feedback on this, as I need to implement it
myself.

Thanks & Regards
Gaurav Luthra





Re: FlinkCEP, circular references and checkpointing failures

2018-09-27 Thread Shailesh Jain
Hi Dawid,

Thanks for your time on this. The diff should have pointed out only the top
3 commits, but since it did not, it is possible I did not rebase my branch
against 1.4.2 correctly. I'll check this out and get back to you if I hit
the same issue again.

Thanks again,
Shailesh

On Thu, Sep 27, 2018 at 1:00 PM Dawid Wysakowicz 
wrote:

> Hi Shailesh,
>
> I am afraid it is going to be hard to help you, as this branch differs
> significantly from the 1.4.2 release (I've diffed your branch against
> tag/release-1.4.2). Moreover, the code in the branch you've provided still
> does not correspond to the lines in the exception you've posted previously.
> Could you check if the problem occurs on vanilla Flink as well?
>
> Best,
>
> Dawid
>
> On 27/09/18 08:22, Shailesh Jain wrote:
>
> Hi Dawid,
>
> Yes, it is version 1.4.2. We are running vanilla flink, but have added a
> couple of changes in the CEP operator specifically (top 3 commits here:
> https://github.com/jainshailesh/flink/commits/poc_on_1.4.2). Changes I've
> made to CEP operators do not touch the checkpointing path, just overloading
> the operator for a specific way of handling event time.
>
> We are hitting this in production, so I'm not sure it'll be feasible to
> move to 1.6.0 immediately, but eventually yes.
>
> Thanks,
> Shailesh
>
> On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz 
> wrote:
>
>> Hi Shailesh,
>>
>> Are you sure you are using version 1.4.2? Do you run vanilla Flink, or
>> have you introduced some changes? I am asking because the lines in the
>> stack trace do not align with the source code for 1.4.2.
>>
>> Also, it is a different exception from the one in the issue you've linked,
>> so if it is a problem, then it is definitely a different one. Lastly, I
>> would recommend upgrading to the newest version, as we have rewritten the
>> SharedBuffer implementation in 1.6.0.
>>
>> Best,
>>
>> Dawid
>>
>> On 26/09/18 13:50, Shailesh Jain wrote:
>>
>> Hi,
>>
>> I think I've hit this same issue on a 3 node standalone cluster (1.4.2)
>> using HDFS (2.8.4) as state backend.
>>
>> 2018-09-26 17:07:39,370 INFO
>> org.apache.flink.runtime.taskmanager.Task - Attempting
>> to fail task externally SelectCepOperator (1/1)
>> (3bec4aa1ef2226c4e0c5ff7b3860d340).
>> 2018-09-26 17:07:39,370 INFO
>> org.apache.flink.runtime.taskmanager.Task -
>> SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from
>> RUNNING to FAILED.
>> AsynchronousException{java.lang.Exception: Could not materialize
>> checkpoint 6 for operator SelectCepOperator (1/1).}
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>> at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.Exception: Could not materialize checkpoint 6 for
>> operator SelectCepOperator (1/1).
>> ... 6 more
>> Caused by: java.util.concurrent.ExecutionException:
>> java.lang.NullPointerException
>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>> at
>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>> ... 5 more
>> Suppressed: java.lang.Exception: Could not properly cancel managed
>> keyed state future.
>> at
>> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>> ... 5 more
>> Caused by: java.util.concurrent.ExecutionException:
>> java.lang.NullPointerException
>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>> at
>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>> at
>> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>> at
>> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>> ... 7 more
>> Caused by: java.lang.NullPointerException
>> at
>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
>> at
>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
>> at
>> 

Re: In-Memory Lookup in Flink Operators

2018-09-27 Thread Lasse Nedergaard
Hi. 

We have created our own database source that polls the data at a configured
interval. We then use a CoProcessFunction. It takes two inputs: one from our
database and one from our data input. It requires that you keyBy the
attributes you use for the lookup in your map function.
Delaying your data input until the first database lookup is done is not
simple, but a simple solution could be to implement a delay operation or to keep
the data in your process function until data arrives from your database stream.
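
The "keep the data until the database stream delivers" idea can be sketched in plain Java, without any Flink dependency. This is only a model of the logic, under assumptions: the class name `BufferingEnricher`, the string-typed keys and values, and the `event|value` output format are all hypothetical. In a real job the two maps would presumably be keyed state inside a CoProcessFunction rather than ordinary heap maps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a keyed, two-input enrichment step (hypothetical names). */
class BufferingEnricher {
    // Latest reference value per key, fed by the database input.
    private final Map<String, String> reference = new HashMap<>();
    // Events that arrived before any reference value existed for their key.
    private final Map<String, List<String>> pending = new HashMap<>();
    // Enriched results in emission order.
    private final List<String> output = new ArrayList<>();

    /** Input 1: a row polled from the database. */
    void onReferenceRow(String key, String value) {
        reference.put(key, value);
        // Flush every event that was waiting for this key.
        List<String> waiting = pending.remove(key);
        if (waiting != null) {
            for (String event : waiting) {
                output.add(event + "|" + value);
            }
        }
    }

    /** Input 2: a data event; buffered if its key has no reference value yet. */
    void onEvent(String key, String event) {
        String value = reference.get(key);
        if (value == null) {
            pending.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
        } else {
            output.add(event + "|" + value);
        }
    }

    List<String> output() {
        return output;
    }
}
```

Note that events for a key that never appears in the database stay buffered forever in this sketch; a real implementation would need some timeout or cleanup policy.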

Best regards
Lasse Nedergaard


> On 28 Sep 2018, at 06:28, Chirag Dewan wrote:
> 
> Hi,
> 
> I saw Apache Flink User Mailing List archive. - static/dynamic lookups in 
> flink streaming being discussed, and then I saw this FLIP 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API.
>  
> 
> I know we havent made much progress on this topic. I still wanted to put 
> forward my problem statement around this. 
> 
> I am also looking for a dynamic lookup in Flink operators. I actually want to 
> pre-fetch various Data Sources, like DB, Filesystem, Cassandra etc. into 
> memory. Along with that, I have to ensure a refresh of in-memory lookup table 
> periodically. The period being a configurable parameter. 
> 
> This is what a map operator would look like with lookup: 
> 
> -> Load in-memory lookup - Refresh timer start
> -> Stream processing start
> -> Call lookup
> -> Use lookup result in Stream processing
> -> Timer elapsed -> Reload lookup data source into in-memory table
> -> Continue processing
> 
> 
>  My concern around these are : 
> 
> 1) Possibly storing the same copy of data in every Task slots memory or state 
> backend(RocksDB in my case).
> 2) Having a dedicated refresh thread for each subtask instance(possibly, 
> every Task Manager having multiple refresh thread)
> 
> Am i thinking in the right direction? Or missing something very obvious? It 
> confusing.
> 
> Any leads are much appreciated. Thanks in advance.
> 
> Cheers, 
> Chirag


In-Memory Lookup in Flink Operators

2018-09-27 Thread Chirag Dewan
Hi,
I saw static/dynamic lookups in Flink streaming being discussed in the Apache
Flink User Mailing List archive, and then I saw this FLIP:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API.

I know we haven't made much progress on this topic. I still wanted to put
forward my problem statement around this.

I am also looking for a dynamic lookup in Flink operators. I want to
pre-fetch various data sources (DB, filesystem, Cassandra, etc.) into
memory. Along with that, I have to ensure that the in-memory lookup table is
refreshed periodically, with the period being a configurable parameter.

This is what a map operator would look like with lookup:

-> Load in-memory lookup - Refresh timer start
-> Stream processing start
-> Call lookup
-> Use lookup result in Stream processing
-> Timer elapsed -> Reload lookup data source into in-memory table
-> Continue processing

My concerns around this are:

1) Possibly storing the same copy of the data in every task slot's memory or
state backend (RocksDB in my case).
2) Having a dedicated refresh thread for each subtask instance (possibly,
every Task Manager having multiple refresh threads).

Am I thinking in the right direction, or missing something very obvious? It is
confusing.

Any leads are much appreciated. Thanks in advance.
Cheers, Chirag
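
The load / lookup / timed-reload cycle described in this message can be sketched in plain Java as below. It is a minimal sketch under assumptions, not an established Flink pattern: `RefreshingLookup` and its `loader` supplier are hypothetical names, and the loader stands in for whatever reads the DB, filesystem, or Cassandra. If one instance were created per subtask (for example in an operator's open() method), both concerns raised above would apply: one copy of the data and one refresh thread per subtask.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** In-memory lookup table that reloads itself on a fixed period (hypothetical class). */
class RefreshingLookup<K, V> implements AutoCloseable {
    private final Supplier<Map<K, V>> loader;
    // Immutable snapshot, replaced as a whole on every reload.
    private volatile Map<K, V> table;
    private final ScheduledExecutorService timer;

    RefreshingLookup(Supplier<Map<K, V>> loader, long period, TimeUnit unit) {
        this.loader = loader;
        // Initial load happens before any lookup is served.
        this.table = Collections.unmodifiableMap(new HashMap<>(loader.get()));
        this.timer = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "lookup-refresh");
            t.setDaemon(true);
            return t;
        });
        timer.scheduleAtFixedRate(this::reload, period, period, unit);
    }

    /** Reloads the whole table; if the loader fails, the previous snapshot stays in use. */
    void reload() {
        try {
            table = Collections.unmodifiableMap(new HashMap<>(loader.get()));
        } catch (RuntimeException e) {
            // Swallowed on purpose so the scheduled task keeps running.
        }
    }

    V get(K key) {
        return table.get(key);
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```

Swapping a complete immutable snapshot means readers never see a half-loaded table, at the cost of briefly holding two copies in memory during a reload.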

Re: New received containers silently lost during job auto restarts

2018-09-27 Thread Paul Lam
Hi everyone,

Today I got this error again in another job, and I found some logs indicating
it’s probably related to the delegation token.

```
2018-09-28 09:49:09,668 INFO  org.apache.flink.yarn.YarnResourceManager 
- Closing TaskExecutor connection 
container_1536852167599_1262075_03_10 because: token (HDFS_DELEGATION_TOKEN 
token 3647547 for gdc_sa) can't be found in cache
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
 token (HDFS_DELEGATION_TOKEN token 3647547 for gdc_sa) can't be found in cache
at org.apache.hadoop.ipc.Client.call(Client.java:1466)
at org.apache.hadoop.ipc.Client.call(Client.java:1403)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
at com.sun.proxy.$Proxy15.getFileInfo(Unknown Source)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:757)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:256)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
at com.sun.proxy.$Proxy16.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2102)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1214)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1210)
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1210)
at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:251)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1707)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:356)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:60)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
```

In addition, the failed subtask is always the first one (subtask 1). I guess
that’s because it’s the first one to run and the first one to hit this
problem.
But as I’m submitting the job with my keytab, it should not be affected by the
delegation token.

Now I have three questions:
1. Why would a delegation token missing from the cache make a subtask fail?
2. Why can’t it recover after a job auto restart? The subtask gets stuck
in scheduled status and never seems to be deployed.
3. What happens if a subtask can’t be deployed on a newly received
container? Would the JobManager grab more and more containers and consider the
situation normal until it hits NoResourceAvailableException?

I will keep working on it because it’s critical to me, and any help is highly 
appreciated! Thank you!

Best,
Paul Lam


> On Sep 27, 2018, at 8:22 PM, Kostas Kloudas wrote:
> 
> Hi Paul,
> 
> I am also cc’ing Till and Gary who may be able to help, but to give them more 
> information,
> it would help if you told us which Flink version you are using.
> 
> Cheers,
> Kostas
> 
>> On Sep 27, 2018, at 1:24 PM, Paul Lam  wrote:
>> 
>> Hi,
>> 
>> One of my Flink on YARN jobs got into a weird situation after a fail-fast 
>> restart. The restart was triggered by loss of a TaskManager, but when the 
>> job was recovering, one of its subtask (1/24) couldn’t be deployed, and 
>> finally failed with NoResourceAvailableException. 
>> 
>> Looking into the logs, I find the JobManager was requesting containers for 
>> the subtask again and again, but indeed it got a container in each round of 
>> requests, which seemed to be lost silently (with no error logs) or never 
>> registered. 
>> 
>> At last, I canceled the job and resubmitted it, and the problem was gone. 
>> 
>> I thought it might 

Re: Question about sharing resource among slots with a TM

2018-09-27 Thread Vishal Santoshi
Aah got it.
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html


On Thu, Sep 27, 2018 at 11:04 AM Vishal Santoshi 
wrote:

> Makes sense. An additional query: how does Flink handle class loading?
> Is there a separate class loader per job? In essence, if I have a static
> member in a class in a job, it would be highly inappropriate for that
> static member to be available to another job.
>
> On Thu, Sep 27, 2018, 8:15 AM Kostas Kloudas 
> wrote:
>
>> Hi Vishal,
>>
>> Currently there is no way to share (user-defined) resources between tasks
>> on the same TM.
>> So I suppose that a singleton is the best way to go for now.
>>
>> Cheers,
>> Kostas
>>
>> On Sep 27, 2018, at 3:43 AM, Hequn Cheng  wrote:
>>
>> Hi vishal,
>>
>> Yes, we can define a static connection to reuse it or implement a
>> connection pool. Maybe we can also ask the problem in hbase community and
>> see if there are other better ways.
>>
>> Best, Hequn
>>
>>
>> On Thu, Sep 27, 2018 at 12:40 AM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> any one?
>>>
>>> On Wed, Sep 26, 2018 at 9:15 AM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> We use HBase extensively and the general pattern we follow is acquiring
>>>> a Connection in the open() method of a RichFunction and closing it in the
>>>> close() method. Of course that implies that if we have a parallelism of n,
>>>> there will be n HBase Connections. We want to use the fact that an HBase
>>>> Connection is inherently thread safe
>>>> http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Connection.html
>>>> and is a pretty heavy object to begin with, so it makes sense to share
>>>> a Connection across slots in a single TM. We could do it through a static
>>>> singleton pattern but was wondering if and whether there is an established
>>>> paradigm for sharing a resource




>>
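
The static-singleton approach discussed in this thread can be sketched as a small reference-counted holder. This is a minimal sketch under assumptions: `SharedResource` is a hypothetical helper, not a Flink or HBase API. The intent is that a function keeps one instance in a static field, calls acquire() in open() and release() in close(), so all subtasks in the same JVM that see the same class share one connection. As the class loading page linked above explains, user code is normally loaded per job, so a static field in job code would typically not be shared across jobs.

```java
import java.util.function.Supplier;

/** Reference-counted holder for one heavyweight, thread-safe resource (hypothetical helper). */
final class SharedResource<T extends AutoCloseable> {
    private final Supplier<T> factory;
    private T instance;
    private int users;

    SharedResource(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Intended for open(): creates the resource on first use, otherwise reuses it. */
    synchronized T acquire() {
        if (users == 0) {
            instance = factory.get();
        }
        users++;
        return instance;
    }

    /** Intended for close(): closes the resource once the last user has released it. */
    synchronized void release() {
        if (users == 0) {
            throw new IllegalStateException("release() called without a matching acquire()");
        }
        users--;
        if (users == 0) {
            T toClose = instance;
            instance = null;
            try {
                toClose.close();
            } catch (Exception e) {
                throw new RuntimeException("Failed to close shared resource", e);
            }
        }
    }
}
```

The counting matters because subtasks open and close independently: without it, the first subtask to finish would close a connection the others are still using.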


Re: JobManager in HA with a single node loses leadership

2018-09-27 Thread 朱翥
Hi Julio,

If you are using an HA mode that depends on other services like ZooKeeper, you
can also check whether that service was OK when the JM lost leadership.
In our experience, a network partition between the JM and ZK, or a ZK object
exceeding the max size limit, can also cause the JM to lose leadership.

On Fri, Sep 28, 2018 at 9:24 AM vino yang wrote:

> Hi Julio,
>
> Which version of Flink are you using? If it is 1.5+, then you can try to
> increase the heartbeat timeout by configuring it[1].
> In addition, the possible cause is that the load of tm is too heavy, for
> example, because the Full GC causes JVM stalls,
> or deadlocks and other issues may cause heartbeat timeout. Please closely
> monitor the relevant indicators of tm.
>
> In addition, it is better to have two or more JM instances in Standalone
> HA mode.
>
> Thanks, vino.
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html#heartbeat-manager
>
> On Fri, Sep 28, 2018 at 8:56 AM Tzu-Li Chen wrote:
>
>> Hi Julio,
>>
>> If the single JobManager lost temporarily and reconnected later, it could
>> be regranted leadership. And if you use Flink on Yarn, the Yarn RM
>> (according to configuration) would start a new ApplicationMaster to act as
>> a take-over JobManager.
>>
>> Best,
>> tison.
>>
>>
>> On Fri, Sep 28, 2018 at 3:56 AM Julio Biason wrote:
>>
>>> Hey guys,
>>>
>>> I'm seeing a weird error happening here: We have our JobManager
>>> configured in HA mode, but with a single JobManager in the cluster (the
>>> second one was in another machine that start showing flaky network, so we
>>> removed it). Everything is running in Standalone mode.
>>>
>>> Sometimes, the jobs are restarting and the JobManager logs shows this:
>>>
>>> org.apache.flink.util.FlinkException: JobManager responsible for
>>> bbbae593c175e0c17c32718a56527ab9 lost the
>>> leadership.
>>> at
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor.closeJobManagerConnection(TaskExecutor.java:1167)
>>>
>>> at
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1200(TaskExecutor.java:137)
>>> at
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$notifyHeartbeatTimeout$0(TaskExecutor.java:1608)
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>>>
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>> at
>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>>
>>> at
>>> akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>> at
>>> akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>
>>> at
>>> akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>> at
>>> akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>
>>> at
>>> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>> at
>>> akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>
>>> at
>>> akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>
>>> at
>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>
>>> at
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> Caused by: java.util.concurrent.TimeoutException: The heartbeat of
>>> JobManager with id d4ca7942b20bdf87ccf9335f698a5029 timed
>>> out.
>>> at
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$notifyHeartbeatTimeout$0(TaskExecutor.java:1609)
>>>
>>> ... 15
>>> more
>>>
>>>
>>> If there is a single JobManager in the cluster... who is taking the
>>> leadership? Is that even possible?
>>>
>>> --
>>> *Julio Biason*, Sofware Engineer
>>> *AZION*  |  Deliver. Accelerate. Protect.
>>> Office: +55 51 3083 8101   |  Mobile: +55 51
>>> *99907 0554*
>>>
>>


Re: LIMIT and ORDER BY in hop window is not supported?

2018-09-27 Thread 徐涛
Hi Hequn,
If LIMIT n is not supported in streaming, how can I solve the top-n problem in a
streaming scenario?

Best
Henry
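
One possible direction, which is an assumption and not something confirmed in this thread, is to keep the windowed count in SQL and do the top-n selection afterwards in a per-window function of the DataStream API. The selection step itself is plain Java; `TopN.topN` below is a hypothetical helper that keeps the n largest counts with a bounded min-heap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/** Selects the n entries with the largest counts (hypothetical helper). */
final class TopN {
    static List<Map.Entry<String, Long>> topN(Map<String, Long> counts, int n) {
        if (n <= 0) {
            return new ArrayList<>();
        }
        // Min-heap holding at most n entries: the root is the smallest count kept so far.
        PriorityQueue<Map.Entry<String, Long>> heap =
                new PriorityQueue<>(Map.Entry.<String, Long>comparingByValue());
        for (Map.Entry<String, Long> entry : counts.entrySet()) {
            heap.add(entry);
            if (heap.size() > n) {
                heap.poll(); // drop the current smallest
            }
        }
        // Order the survivors by count, highest first.
        List<Map.Entry<String, Long>> result = new ArrayList<>(heap);
        result.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
        return result;
    }
}
```

For the query quoted below, the map would hold one window's `cnt` per `trackId`, and n would be 10. Entries with equal counts come back in no particular order.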

> On Sep 28, 2018, at 12:03 AM, Hequn Cheng wrote:
> 
> Hi Henry,
> 
> Currently, Order By is supported in Streaming while Limit is only 
> supported in Batch. Another thing to be noted is, for Order by, the result of 
> streaming queries must be primarily sorted on an ascending time attribute[1]. 
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#orderby--limit
>  
> 
> 
> 
> 
> On Thu, Sep 27, 2018 at 9:05 PM 徐涛  > wrote:
> Hi,
>   I want a top n result on each hop window result, but some error throws 
> out when I add the order by sentence or the limit sentence, so how do I 
> implement such case ?
>   Thanks a lot.
> 
> SELECT
> trackId as id,track_title as description, count(*) as cnt
> FROM
> play
> WHERE
> appName='play.statistics.trace'
> GROUP BY
> HOP(started_at_ts, INTERVAL '1' SECOND, INTERVAL '5' 
> MINUTE),trackId,track_title
> ORDER BY
> cnt desc
> LIMIT 10
> 
> FlinkLogicalSort(sort0=[$2], dir0=[DESC])
>   FlinkLogicalWindowAggregate(group=[{1, 2}], cnt=[COUNT()])
> FlinkLogicalCalc(expr#0..4=[{inputs}], 
> expr#5=[_UTF-16LE'play.statistics.trace'], expr#6=[=($t0, $t5)], 
> started_at_ts=[$t4], trackId=[$t1], track_title=[$t2], $condition=[$t6])
>   FlinkLogicalNativeTableScan(table=[[play]])
> 
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
>   at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:275)
>   at 
> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:845)
>   at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:892)
>   at 
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
>   at 
> org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:786)
>   at 
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:723)
>   at 
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683)
> 
> Best
> Henry



Re: JobManager in HA with a single node loses leadership

2018-09-27 Thread vino yang
Hi Julio,

Which version of Flink are you using? If it is 1.5+, then you can try to
increase the heartbeat timeout by configuring it[1].
In addition, a possible cause is that the load on the TM is too heavy: for
example, a full GC may cause JVM stalls,
or deadlocks and other issues may cause the heartbeat to time out. Please closely
monitor the relevant metrics of the TM.

In addition, it is better to have two or more JM instances in Standalone HA
mode.

Thanks, vino.

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html#heartbeat-manager

On Fri, Sep 28, 2018 at 8:56 AM Tzu-Li Chen wrote:

> Hi Julio,
>
> If the single JobManager lost temporarily and reconnected later, it could
> be regranted leadership. And if you use Flink on Yarn, the Yarn RM
> (according to configuration) would start a new ApplicationMaster to act as
> a take-over JobManager.
>
> Best,
> tison.
>
>
> On Fri, Sep 28, 2018 at 3:56 AM Julio Biason wrote:
>
>> Hey guys,
>>
>> I'm seeing a weird error happening here: We have our JobManager
>> configured in HA mode, but with a single JobManager in the cluster (the
>> second one was in another machine that start showing flaky network, so we
>> removed it). Everything is running in Standalone mode.
>>
>> Sometimes, the jobs are restarting and the JobManager logs shows this:
>>
>> org.apache.flink.util.FlinkException: JobManager responsible for
>> bbbae593c175e0c17c32718a56527ab9 lost the
>> leadership.
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.closeJobManagerConnection(TaskExecutor.java:1167)
>>
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1200(TaskExecutor.java:137)
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$notifyHeartbeatTimeout$0(TaskExecutor.java:1608)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>>
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>> at
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>
>> at
>> akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> at
>> akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>
>> at
>> akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at
>> akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>
>> at
>> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at
>> akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>
>> at
>> akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>
>> at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> Caused by: java.util.concurrent.TimeoutException: The heartbeat of
>> JobManager with id d4ca7942b20bdf87ccf9335f698a5029 timed
>> out.
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$notifyHeartbeatTimeout$0(TaskExecutor.java:1609)
>>
>> ... 15
>> more
>>
>>
>> If there is a single JobManager in the cluster... who is taking the
>> leadership? Is that even possible?
>>
>> --
>> *Julio Biason*, Sofware Engineer
>> *AZION*  |  Deliver. Accelerate. Protect.
>> Office: +55 51 3083 8101   |  Mobile: +55 51
>> *99907 0554*
>>
>


Re: JobManager in HA with a single node loses leadership

2018-09-27 Thread Tzu-Li Chen
Hi Julio,

If the single JobManager was lost temporarily and reconnected later, it could
be regranted leadership. And if you use Flink on YARN, the YARN RM
(depending on configuration) would start a new ApplicationMaster to act as
a take-over JobManager.

Best,
tison.


On Fri, Sep 28, 2018 at 3:56 AM Julio Biason wrote:

> Hey guys,
>
> I'm seeing a weird error happening here: We have our JobManager configured
> in HA mode, but with a single JobManager in the cluster (the second one was
> in another machine that start showing flaky network, so we removed it).
> Everything is running in Standalone mode.
>
> Sometimes, the jobs are restarting and the JobManager logs shows this:
>
> org.apache.flink.util.FlinkException: JobManager responsible for
> bbbae593c175e0c17c32718a56527ab9 lost the
> leadership.
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.closeJobManagerConnection(TaskExecutor.java:1167)
>
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1200(TaskExecutor.java:137)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$notifyHeartbeatTimeout$0(TaskExecutor.java:1608)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>
> at
> akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at
> akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>
> at
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at
> akka.actor.ActorCell.invoke(ActorCell.scala:495)
>
> at
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at
> akka.dispatch.Mailbox.run(Mailbox.scala:224)
>
> at
> akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Caused by: java.util.concurrent.TimeoutException: The heartbeat of
> JobManager with id d4ca7942b20bdf87ccf9335f698a5029 timed
> out.
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$notifyHeartbeatTimeout$0(TaskExecutor.java:1609)
>
> ... 15
> more
>
>
> If there is a single JobManager in the cluster... who is taking the
> leadership? Is that even possible?
>
> --
> *Julio Biason*, Sofware Engineer
> *AZION*  |  Deliver. Accelerate. Protect.
> Office: +55 51 3083 8101   |  Mobile: +55 51
> *99907 0554*
>


Flink design questions - parallel processing and state management

2018-09-27 Thread lg...@yahoo.com
Hi,
We are new to Flink and would like to get some ideas on achieving parallelism
and maintaining state in Flink for a few interesting problems.
We receive files that contain multiple types of events. We need to process them 
in the following way:

1. Multiple event sources send event files to our system.
2. Files from different event sources should be processed in parallel.
3. Each file contains events with one or more event types. We want to process 
each event file as follows:
3.1. Group all events of the same event type (e.g. EvTypeA, EvTypeB, EvTypeC, 
etc.).
3.2. Sequence the event groups in a predefined order and process them in that 
order as follows (EvTypeA-->EvTypeB-->EvTypeC) .
3.3. Based on an external configuration, the events in each event type group
should be processed either sequentially (based on the event time specified in
each event) or in parallel. Each event type will get processed using a
different Java class.
3.4. After all events for each event type are processed, process the events for 
the next event type group and so on. It is important to note that all events in 
each event type group need to be processed before the events in the next event 
group can be processed. Also as mentioned above, events in each event type 
group can be processed sequentially or in parallel based on a configuration.
3.5 Different files may contain different types of events and different number 
of event types. E.g. one file may contain EvTypeA+EvTypeB and another file may 
contain EvTypeA+EvTypeB+EvTypeC. So the execution plan for each file will be 
different.
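
Steps 3.1 to 3.5 for a single file can be modeled in plain Java as below. This is a single-JVM sketch of the ordering rules only, under assumptions: `FileProcessor`, `Event`, and the handler map are hypothetical names, and the sketch says nothing about how the work would be distributed across Flink operators or how progress would be checkpointed.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/** Single-JVM model of per-file processing (hypothetical names throughout). */
final class FileProcessor {
    /** One event: its type name, event time, and payload. */
    static final class Event {
        final String type;
        final long time;
        final String payload;

        Event(String type, long time, String payload) {
            this.type = type;
            this.time = time;
            this.payload = payload;
        }
    }

    static void process(List<Event> file,
                        List<String> typeOrder,
                        Set<String> parallelTypes,
                        Map<String, Consumer<Event>> handlers) {
        // 3.1: group all events of the same type.
        Map<String, List<Event>> groups =
                file.stream().collect(Collectors.groupingBy(e -> e.type));
        // 3.2 and 3.4: visit groups in the predefined order; each group completes before the next starts.
        for (String type : typeOrder) {
            List<Event> group = groups.get(type);
            if (group == null) {
                continue; // 3.5: this file contains no events of this type
            }
            Consumer<Event> handler =
                    Objects.requireNonNull(handlers.get(type), "no handler for " + type);
            if (parallelTypes.contains(type)) {
                // 3.3, parallel mode: forEach returns only after every event is handled.
                group.parallelStream().forEach(handler);
            } else {
                // 3.3, sequential mode: ascending event time.
                group.stream()
                        .sorted(Comparator.comparingLong((Event e) -> e.time))
                        .forEach(handler);
            }
        }
    }
}
```

Handlers used in parallel mode must be thread safe, since several events of the same type may be handled at once.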

Questions:
1. The parallelism should be achieved across multiple event sources and also 
within each event type group. How can we achieve this type of parallelism using 
Flink?
2. We want to record the progress of each file's processing, so that if a system
failure occurs while processing a file, we can resume from where we
left off. For example, if a file contains 1000 events (100 of EvTypeA,
700 of EvTypeB, 200 of EvTypeC) and the system failure occurs
after processing all 100 events of EvTypeA and 150 events of
EvTypeB, the system should resume processing from the 151st event of EvTypeB.
3. We also want to show the processing status of each file on a
dashboard that lists all the files being processed and the progress of
each file (e.g. 250 out of 1000 events processed). Should we use Flink state
management to keep the status of each processed event or should we use our own
state management, and why?
4. If the Flink task node that was processing an event crashes, will Flink
route that task to another node automatically?

A few lines of Flink code that show how to solve the above problems would be
really helpful. Thank you for any help.

Search phrase: EventParallelism

- lgfmt


Re: RocksDB Read IOPs

2018-09-27 Thread Ning Shi
Yun,

> Then I would share some experience about tuning RocksDB performance. Since
> you did not cache index and filter blocks in the block cache, there is no need
> to worry about competition between data blocks and index blocks[1]. To improve
> read performance, you should increase your block cache size to 256MB or
> even 512MB. What's more, the write buffer in RocksDB also plays a role in
> reads. From our experience, we use 4 max write buffers of 32MB each, e.g.
> setMaxWriteBufferNumber(4) and setWriteBufferSize(32*1024*1024)

Thank you very much for the hints. I read that tuning guide and added
some settings. Now it's doing much better. The IOPS stay under
300 except when checkpoints are taken; then they spike to about
1.5k, which is totally expected.

For reference, the following are the settings I'm using right now. I
didn't bump the block cache size because we have a limited
amount of memory per instance (30GB).

@Override
public DBOptions createDBOptions(DBOptions currentOptions) {
    return currentOptions
        .setIncreaseParallelism(4)
        .setMaxBackgroundFlushes(1)
        .setMaxBackgroundCompactions(1)
        .setUseFsync(false)
        .setMaxOpenFiles(-1);
}

@Override
public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
    final long blockCacheSize = 64 * 1024 * 1024;
    final long writeBufferSize = 64 * 1024 * 1024;

    return currentOptions
        .setCompressionType(CompressionType.LZ4_COMPRESSION)

        .setCompactionStyle(CompactionStyle.LEVEL)
        .setLevel0FileNumCompactionTrigger(10)
        .setLevel0SlowdownWritesTrigger(20)
        .setLevel0StopWritesTrigger(40)

        // In-memory memtable size
        .setWriteBufferSize(writeBufferSize)
        // Max number of memtables before stalling writes
        .setMaxWriteBufferNumber(5)
        // Merge two memtables together to reduce duplicate keys
        .setMinWriteBufferNumberToMerge(2)

        // L0 file size, same as memtable size
        .setTargetFileSizeBase(writeBufferSize)
        .setMaxBytesForLevelBase(writeBufferSize * 8)

        .setTableFormatConfig(
            new BlockBasedTableConfig()
                .setFilter(new BloomFilter())
                .setBlockCacheSize(blockCacheSize)
        );
}
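
As a rough cross-check against the memory limit mentioned above: the write
buffers of one column family can grow to at most the write buffer size times
the max number of write buffers, and the block cache comes on top of that. The
sketch below is plain Java arithmetic, not RocksDB API. The class and method
names are made up for illustration, and it assumes that every column family
(one per registered state) gets its own block cache.

```java
// Hypothetical helper, not part of RocksDB or Flink: estimates an upper bound
// on the memory one column family can use for memtables plus block cache.
final class RocksDbMemoryEstimate {

    // Worst case: all memtables are full at the same time.
    static long maxMemtableBytes(long writeBufferSize, int maxWriteBufferNumber) {
        return writeBufferSize * maxWriteBufferNumber;
    }

    // Memtables plus block cache (assumption: one block cache per column family).
    static long perColumnFamilyBytes(long writeBufferSize,
                                     int maxWriteBufferNumber,
                                     long blockCacheSize) {
        return maxMemtableBytes(writeBufferSize, maxWriteBufferNumber) + blockCacheSize;
    }

    public static void main(String[] args) {
        final long mb = 1024L * 1024L;
        // Values taken from the settings above: 64MB buffers, 5 memtables, 64MB cache.
        long perColumnFamily = perColumnFamilyBytes(64 * mb, 5, 64 * mb);
        System.out.println("Upper bound per column family: " + perColumnFamily / mb + " MB");
    }
}
```

With the values above this gives 384 MB per column family, so the total
depends on how many states and parallel subtasks share one instance.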

Ning


JobManager in HA with a single node loses leadership

2018-09-27 Thread Julio Biason
Hey guys,

I'm seeing a weird error here: we have our JobManager configured
in HA mode, but with a single JobManager in the cluster (the second one was
on another machine that started showing a flaky network, so we removed it).
Everything is running in Standalone mode.

Sometimes the jobs restart and the JobManager log shows this:

org.apache.flink.util.FlinkException: JobManager responsible for bbbae593c175e0c17c32718a56527ab9 lost the leadership.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.closeJobManagerConnection(TaskExecutor.java:1167)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1200(TaskExecutor.java:137)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$notifyHeartbeatTimeout$0(TaskExecutor.java:1608)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.TimeoutException: The heartbeat of JobManager with id d4ca7942b20bdf87ccf9335f698a5029 timed out.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$notifyHeartbeatTimeout$0(TaskExecutor.java:1609)
    ... 15 more


If there is a single JobManager in the cluster... who is taking the
leadership? Is that even possible?

-- 
*Julio Biason*, Software Engineer
*AZION*  |  Deliver. Accelerate. Protect.
Office: +55 51 3083 8101   |  Mobile: +55 51
*99907 0554*


Re: Can't get the FlinkKinesisProducer to work against Kinesalite for tests

2018-09-27 Thread Bruno Aranda
Hi again,

In the end we managed to get data into Kinesalite using the
FlinkKinesisProducer, but to do so we had to use a different configuration:
we ignore the 'aws.endpoint' setting and use the settings that the
Kinesis producer configuration expects. So, to our FlinkKinesisProducer we pass
configuration such as:

producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
producerConfig.put("KinesisEndpoint", "localhost")
producerConfig.put("KinesisPort", "4567")
producerConfig.put("VerifyCertificate", "false")

We had to make sure that Kinesalite itself was started with the
`--ssl` parameter, so that it uses TLS and is available through HTTPS.

Also, very importantly, our tests run in Docker, and we found
out just before throwing in the towel that you cannot use an
Alpine-based image for this. If you want the Amazon KPL to work, it needs
to run in one of the Debian-based images.
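
For anyone who wants to reuse this in a test, here is a minimal sketch that
only builds the java.util.Properties object with the keys listed above. The
class and method names are made up. The literal "aws.region" is my assumption
for the value of AWSConfigConstants.AWS_REGION; use the constant itself when
the connector is on the classpath. Credentials and the FlinkKinesisProducer
wiring are left out.

```java
import java.util.Properties;

// Hypothetical helper for tests: collects the producer settings described above.
final class KinesaliteProducerConfig {

    static Properties build(String region, String host, int port) {
        Properties producerConfig = new Properties();
        // Assumption: "aws.region" is the value of AWSConfigConstants.AWS_REGION.
        producerConfig.setProperty("aws.region", region);
        // Settings passed through to the Kinesis producer instead of 'aws.endpoint'.
        producerConfig.setProperty("KinesisEndpoint", host);
        producerConfig.setProperty("KinesisPort", Integer.toString(port));
        // Kinesalite started with --ssl uses a certificate that is not trusted by default.
        producerConfig.setProperty("VerifyCertificate", "false");
        return producerConfig;
    }

    public static void main(String[] args) {
        Properties producerConfig = build("us-east-1", "localhost", 4567);
        producerConfig.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```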

Hope this saves someone all the days we have spent looking at it :)

Cheers,

Bruno

On Wed, 26 Sep 2018 at 14:59 Bruno Aranda  wrote:

> Hi,
>
> We have started to use Kinesis with Flink and we need to be able to test
> when a Flink jobs writes to Kinesis. For that, we use a docker image with
> Kinesalite.
>
> To configure the producer, we do like it is explained in the docs [1].
>
> However, if we use this code, the job submission is going to fail, because
> the Flink Kinesis connector expect the configuration to have either the
> endpoint or the region, but not both, or none. (there is a typo in the
> error message as well where 'aws.region' is metioned twice) [2].
>
> However, if we only specify the endpoint, then the KPL will fail
> complaining that there is no Region configured. It does look like Kinesis
> may not be trying to set up the endpoint? We are confused.
>
> On the other hand, the Flink consumer works as expected and the endpoint
> pointing to Kinesalite works fine. The consumer follows a different path
> and creates the AWS client through a call to AWSUtil [3], which will take
> the endpoint into account.
>
> Are we missing something? We have tried this in Flink versions from 1.3.2
> to 1.6.1, building the kinesis connector against the latests KPLs.
>
> Any help is appreciated,
>
> Thanks!
>
> Bruno
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#using-non-aws-kinesis-endpoints-for-testing
> [2]
> https://github.com/apache/flink/blob/a36d1999f743d00c4fac8fdf61ad85a4b5d5f3bc/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java#L272
> [3]
> https://github.com/apache/flink/blob/a36d1999f743d00c4fac8fdf61ad85a4b5d5f3bc/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java#L66
>
>


Re: LIMIT and ORDER BY in hop window is not supported?

2018-09-27 Thread Hequn Cheng
Hi Henry,

Currently, ORDER BY is supported in streaming while LIMIT is only
supported in batch. Also note that, for ORDER BY, the result
of a streaming query must be primarily sorted on an ascending time
attribute [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#orderby--limit



On Thu, Sep 27, 2018 at 9:05 PM 徐涛  wrote:

> Hi,
> I want a top n result on each hop window result, but some error throws out
> when I add the order by sentence or the limit sentence, so how do I
> implement such case ?
> Thanks a lot.
>
> SELECT
>
> trackId as id,track_title as description, count(*) as cnt
>
> FROM
>
> play
>
> WHERE
>
> appName='play.statistics.trace'
>
> GROUP BY
>
> HOP(started_at_ts, INTERVAL '1' SECOND, INTERVAL '5' 
> MINUTE),trackId,track_title
>
> ORDER BY
>
> cnt desc
>
> LIMIT 10
>
>
> FlinkLogicalSort(sort0=[$2], dir0=[DESC])
>   FlinkLogicalWindowAggregate(group=[{1, 2}], cnt=[COUNT()])
> FlinkLogicalCalc(expr#0..4=[{inputs}],
> expr#5=[_UTF-16LE'play.statistics.trace'], expr#6=[=($t0, $t5)],
> started_at_ts=[$t4], trackId=[$t1], track_title=[$t2], $condition=[$t6])
>   FlinkLogicalNativeTableScan(table=[[play]])
>
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL
> features.
> at
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:275)
> at
> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:845)
> at
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:892)
> at
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
> at
> org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:786)
> at
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:723)
> at
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683)
>
>
> Best
> Henry
>


Re: Question about sharing resource among slots with a TM

2018-09-27 Thread Vishal Santoshi
Makes sense. An additional question: how does Flink handle class loading? Is
there a separate class loader per job? In essence, if I have a static
member in a class in one job, it would be highly inappropriate for that
static member to be available to another job.

On Thu, Sep 27, 2018, 8:15 AM Kostas Kloudas 
wrote:

> Hi Vishal,
>
> Currently there is no way to share (user-defined) resources between tasks
> on the same TM.
> So I suppose that a singleton is the best way to go for now.
>
> Cheers,
> Kostas
>
> On Sep 27, 2018, at 3:43 AM, Hequn Cheng  wrote:
>
> Hi vishal,
>
> Yes, we can define a static connection to reuse it or implement a
> connection pool. Maybe we can also ask the problem in hbase community and
> see if there are other better ways.
>
> Best, Hequn
>
>
> On Thu, Sep 27, 2018 at 12:40 AM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> any one?
>>
>> On Wed, Sep 26, 2018 at 9:15 AM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> We use Hbase extensively and the general pattern we follow is acquiring
>>> a Connection in the open() method of a RichFunction and closing in the
>>> close() method. Of course that implies that if we have a parallelism of n,
>>> there will be n Hbase Connections. We want to use the fact that Hbase
>>> connection is inherently thread safe
>>> http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Connection.html
>>> and it is a pretty heavy object to begin with and thus makes sense to share
>>> a Connection across slots in a single TM. We could do it through a a static
>>> singleton pattern but was wondering if and whether there is an established
>>> paradigm for sharing a resource 
>>>
>>>
>>>
>>>
>


Re: Metrics: (1) Histogram in prometheus and (2) meter by event_time

2018-09-27 Thread Kostas Kloudas
Hi Averell,

> On Sep 27, 2018, at 3:09 PM, Averell  wrote:
> 
> Hi Kostas,
> 
> Yes, I want them as metrics, as they are purely for monitoring purpose.
> There's no need of fault tolerance.
> 
> If I use side-output, for example for that metric no.1, I would need a
> tumbling AllWindowFunction, which, as I understand, would introduce some
> delay to both the normal processing flow, and to the checkpoint process. 
> 

Side-output may introduce all that but you can always do something like:

myMainStream = …

myMainStream.myMainComputation…

myMainStream.windowAll().myMonitoringComputation…

This does not affect the main path of your computation (if this is your only 
concern).

> I already tried to follow the referencing web page that you sent. However, I
> could not know how to have what I want.
> For example, with metrics no.1 - meter: org.apache.flink.metrics.Meter only
> provides markEvent(), which marks an event to that Meter. There is no option
> to provide the event_time, and processing_time is always used. So my graph
> is spread over time like the one below.
> 
>  
> 

Event time is a notion of Flink and a property of your data (timestamp).

Metric systems like Prometheus take whatever you expose as metric and attach a 
timestamp based on 
the current wall-clock time, as for them, the time an event occurred is the 
time that they got that metric.

So, if you want event-time computations, then those should be done in Flink.

> For metrics no.2 - histogram: What I can see at Prometheus is the calculated
> percentile values (0.5, 0.75, 0.9, 0.99, 0.999), which tells me, for
> example: 99% the total number of records had ts1-ts2 <= 350s (which looks
> more like a rolling average). But it doesn't tell me roughly how many % of
> record have diff of 250ms, how many of 260ms, etc...
> 
>  
> 

For this case, the two notions of histogram (yours and Prometheus’) are not 
aligned. So what you could do is keep each “bucket” of your histogram as a 
separate metric and expose it as such to Prometheus. Essentially, you are the 
one creating the histogram.

Metric systems do not perform any (complex) computation for you.
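
To make the “bucket” idea concrete, here is a minimal sketch in plain Java
with no Flink dependency. The class name, the fixed bucket width and the
method names are made up for illustration. In a Flink function, each bucket
count could then be exposed as its own metric through the metric group of the
runtime context; that wiring is not shown here.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: counts values (e.g. ts2 - ts1 in ms) in fixed-width buckets.
// Not thread-safe; it is meant to be used from a single operator instance.
final class BucketedHistogram {

    private final long bucketWidthMs;
    // Key: lower bound of the bucket, value: number of records in that bucket.
    private final TreeMap<Long, Long> counts = new TreeMap<>();

    BucketedHistogram(long bucketWidthMs) {
        if (bucketWidthMs <= 0) {
            throw new IllegalArgumentException("bucketWidthMs must be positive");
        }
        this.bucketWidthMs = bucketWidthMs;
    }

    // Adds one observation to the bucket [bucketStart, bucketStart + width).
    void record(long diffMs) {
        long bucketStart = Math.floorDiv(diffMs, bucketWidthMs) * bucketWidthMs;
        counts.merge(bucketStart, 1L, Long::sum);
    }

    // Current count of the bucket starting at bucketStart (0 if it is empty).
    long count(long bucketStart) {
        return counts.getOrDefault(bucketStart, 0L);
    }

    // Copy of all non-empty buckets, ordered by lower bound.
    Map<Long, Long> snapshot() {
        return new TreeMap<>(counts);
    }
}
```

With a width of 10 ms, the values 250 and 255 land in the bucket starting at
250 and the value 260 lands in the next one, which is the per-bucket view the
percentile output does not give you.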

Once again, I would say that these exploratory metrics about your stream fall 
more under the category of
analytics about your input data, rather than “metrics”, but of course feel free 
to disagree :)

Cheers,
Kostas

> Thanks and regards,
> Averell
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Metrics: (1) Histogram in prometheus and (2) meter by event_time

2018-09-27 Thread Averell
Hi Kostas,

Yes, I want them as metrics, as they are purely for monitoring purpose.
There's no need of fault tolerance.

If I use side-output, for example for that metric no.1, I would need a
tumbling AllWindowFunction, which, as I understand, would introduce some
delay to both the normal processing flow, and to the checkpoint process. 

I already tried to follow the reference page that you sent. However, I
could not figure out how to get what I want.
For example, with metric no.1 - meter: org.apache.flink.metrics.Meter only
provides markEvent(), which marks an event on that Meter. There is no option
to provide the event_time, and processing_time is always used. So my graph
is spread over time like the one below.

 

For metric no.2 - histogram: What I can see in Prometheus is the calculated
percentile values (0.5, 0.75, 0.9, 0.99, 0.999), which tell me, for
example, that 99% of the total number of records had ts1-ts2 <= 350s (which looks
more like a rolling average). But it doesn't tell me roughly what percentage of
records have a diff of 250ms, how many have 260ms, etc.

 

Thanks and regards,
Averell




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


LIMIT and ORDER BY in hop window is not supported?

2018-09-27 Thread 徐涛
Hi,
I want a top-N result for each hop window, but an error is thrown 
when I add the ORDER BY clause or the LIMIT clause. How can I 
implement this case?
Thanks a lot.

SELECT
    trackId as id, track_title as description, count(*) as cnt
FROM
    play
WHERE
    appName='play.statistics.trace'
GROUP BY
    HOP(started_at_ts, INTERVAL '1' SECOND, INTERVAL '5' MINUTE), trackId, track_title
ORDER BY
    cnt desc
LIMIT 10

FlinkLogicalSort(sort0=[$2], dir0=[DESC])
  FlinkLogicalWindowAggregate(group=[{1, 2}], cnt=[COUNT()])
    FlinkLogicalCalc(expr#0..4=[{inputs}], expr#5=[_UTF-16LE'play.statistics.trace'], expr#6=[=($t0, $t5)], started_at_ts=[$t4], trackId=[$t1], track_title=[$t2], $condition=[$t6])
      FlinkLogicalNativeTableScan(table=[[play]])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
    at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:275)
    at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:845)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:892)
    at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
    at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:786)
    at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:723)
    at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683)

Best
Henry

Re: New received containers silently lost during job auto restarts

2018-09-27 Thread Paul Lam
Hi Kostas,

Sorry, I forgot that. I'm using Flink 1.5.3.

Best,
Paul Lam

On Thu, Sep 27, 2018 at 8:22 PM Kostas Kloudas wrote:

> Hi Paul,
>
> I am also cc’ing Till and Gary who may be able to help, but to give them
> more information,
> it would help if you told us which Flink version you are using.
>
> Cheers,
> Kostas
>
> > On Sep 27, 2018, at 1:24 PM, Paul Lam  wrote:
> >
> > Hi,
> >
> > One of my Flink on YARN jobs got into a weird situation after a
> fail-fast restart. The restart was triggered by loss of a TaskManager, but
> when the job was recovering, one of its subtask (1/24) couldn’t be
> deployed, and finally failed with NoResourceAvailableException.
> >
> > Looking into the logs, I find the JobManager was requesting containers
> for the subtask again and again, but indeed it got a container in each
> round of requests, which seemed to be lost silently (with no error logs) or
> never registered.
> >
> > At last, I canceled the job and resubmitted it, and the problem was
> gone.
> >
> > I thought it might be something wrong with the TaskManager
> initialization, but if it is the case there would be some errors in
> JobManagar’s log, right?
> >
> > Sorry for not being able to reproduce the problem, but does anyone have
> any idea on this? Thanks a lot!
> >
> > This is the cause of the job restart:
> > ```
> > 2018-09-27 12:33:11,639 INFO  org.apache.flink.yarn.YarnResourceManager
>- Closing TaskExecutor connection
> container_1536852167599_809133_02_16 because: Container released on a
> *lost* node
> > 2018-09-27 12:33:11,639 INFO
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  -
> Unregister TaskManager 554e53380a836229ecbc4ccae58f9b0f from the
> SlotManager.
> > 2018-09-27 12:33:11,639 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph-
> LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink:
> LogKafkaSink (11/24) (51fedf378844bc7e5d2855f59baafcd0) switched from
> RUNNING to FAILED.
> > org.apache.flink.util.FlinkException: The assigned slot
> container_1536852167599_809133_02_16_0 was removed.
> >   at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:786)
> >   at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:756)
> >   at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:948)
> >   at
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:372)
> >   at
> org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:802)
> >   at
> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:339)
> >   at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
> >   at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
> >   at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
> >   at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> >   at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> >   at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> >   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> >   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> >   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> >   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> >   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> >   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> >   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> >   at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >   at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >   at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >   at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > ```
> >
> > And then JobManager kept requesting and returning containers before
> timeout:
> > ```
> > 2018-09-27 12:38:11,072 INFO  org.apache.flink.yarn.YarnResourceManager
>- Received new container:
> container_1536852167599_809133_02_000832 - Remaining pending container
> requests: 1
> > 2018-09-27 12:38:11,072 INFO  org.apache.flink.yarn.YarnResourceManager
>- Adding keytab
> hdfs://horton2/user/gdc_sa/.flink/application_1536852167599_809133/gdc_sa.keytab
> to the AM container local resource bucket
> > 2018-09-27 12:38:11,338 INFO  org.apache.flink.yarn.YarnResourceManager
>- Creating container launch context for TaskManagers
> > 2018-09-27 

Re: New received containers silently lost during job auto restarts

2018-09-27 Thread Kostas Kloudas
Hi Paul,

I am also cc’ing Till and Gary who may be able to help, but to give them more 
information,
it would help if you told us which Flink version you are using.

Cheers,
Kostas

> On Sep 27, 2018, at 1:24 PM, Paul Lam  wrote:
> 
> Hi,
> 
> One of my Flink on YARN jobs got into a weird situation after a fail-fast 
> restart. The restart was triggered by loss of a TaskManager, but when the job 
> was recovering, one of its subtask (1/24) couldn’t be deployed, and finally 
> failed with NoResourceAvailableException. 
> 
> Looking into the logs, I find the JobManager was requesting containers for 
> the subtask again and again, but indeed it got a container in each round of 
> requests, which seemed to be lost silently (with no error logs) or never 
> registered. 
> 
> At last, I canceled the job and resubmitted it, and the problem was gone. 
> 
> I thought it might be something wrong with the TaskManager initialization, 
> but if it is the case there would be some errors in JobManagar’s log, right? 
> 
> Sorry for not being able to reproduce the problem, but does anyone have any 
> idea on this? Thanks a lot!
> 
> This is the cause of the job restart:
> ```
> 2018-09-27 12:33:11,639 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Closing TaskExecutor connection 
> container_1536852167599_809133_02_16 because: Container released on a 
> *lost* node
> 2018-09-27 12:33:11,639 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - 
> Unregister TaskManager 554e53380a836229ecbc4ccae58f9b0f from the SlotManager.
> 2018-09-27 12:33:11,639 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- 
> LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: 
> LogKafkaSink (11/24) (51fedf378844bc7e5d2855f59baafcd0) switched from RUNNING 
> to FAILED.
> org.apache.flink.util.FlinkException: The assigned slot 
> container_1536852167599_809133_02_16_0 was removed.
>   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:786)
>   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:756)
>   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:948)
>   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:372)
>   at 
> org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:802)
>   at 
> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:339)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>   at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> ```
> 
> And then JobManager kept requesting and returning containers before timeout:
> ```
> 2018-09-27 12:38:11,072 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Received new container: 
> container_1536852167599_809133_02_000832 - Remaining pending container 
> requests: 1
> 2018-09-27 12:38:11,072 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Adding keytab 
> hdfs://horton2/user/gdc_sa/.flink/application_1536852167599_809133/gdc_sa.keytab
>  to the AM container local resource bucket
> 2018-09-27 12:38:11,338 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Creating container launch context for TaskManagers
> 2018-09-27 12:38:11,338 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Starting TaskManagers
> 2018-09-27 12:38:11,340 INFO  
> 

Re: Question about sharing resource among slots with a TM

2018-09-27 Thread Kostas Kloudas
Hi Vishal,

Currently there is no way to share (user-defined) resources between tasks on 
the same TM.
So I suppose that a singleton is the best way to go for now.
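
A minimal sketch of such a singleton, in plain Java with no Flink or HBase
dependency. The class SharedResource and its methods are made up for
illustration; the idea is to keep one instance in a static field, call
acquire() in open() and release() in close(), so that the last task to close
also closes the shared object. It assumes the shared object is thread-safe, as
described for the HBase Connection in the original question.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

// Hypothetical reference-counted holder for one shared, thread-safe resource.
final class SharedResource<T extends Closeable> {

    private final Supplier<T> factory;
    private T instance;
    private int refCount;

    SharedResource(Supplier<T> factory) {
        this.factory = factory;
    }

    // Creates the resource on first use and hands out the same instance afterwards.
    synchronized T acquire() {
        if (refCount == 0) {
            instance = factory.get();
        }
        refCount++;
        return instance;
    }

    // Closes the resource once the last user has released it.
    synchronized void release() {
        if (refCount == 0) {
            throw new IllegalStateException("release() called without acquire()");
        }
        refCount--;
        if (refCount == 0) {
            T toClose = instance;
            instance = null;
            try {
                toClose.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
}
```

Note that a static field is only shared among classes loaded by the same
class loader, so how far the sharing reaches depends on how the user code is
loaded.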

Cheers,
Kostas

> On Sep 27, 2018, at 3:43 AM, Hequn Cheng  wrote:
> 
> Hi vishal,
> 
> Yes, we can define a static connection to reuse it or implement a connection 
> pool. Maybe we can also ask the problem in hbase community and see if there 
> are other better ways.
> 
> Best, Hequn
> 
> 
> On Thu, Sep 27, 2018 at 12:40 AM Vishal Santoshi  > wrote:
> any one? 
> 
> On Wed, Sep 26, 2018 at 9:15 AM Vishal Santoshi  > wrote:
> We use Hbase extensively and the general pattern we follow is acquiring a 
> Connection in the open() method of a RichFunction and closing in the close() 
> method. Of course that implies that if we have a parallelism of n, there will 
> be n Hbase Connections. We want to use the fact that Hbase connection is 
> inherently thread safe 
> http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Connection.html
>  
> 
>   and it is a pretty heavy object to begin with and thus makes sense to share 
> a Connection across slots in a single TM. We could do it through a a static 
> singleton pattern but was wondering if and whether there is an established 
> paradigm for sharing a resource 
> 
> 
> 



Re: Metrics: (1) Histogram in prometheus and (2) meter by event_time

2018-09-27 Thread Kostas Kloudas
Hi Averell,

From what I understand for your use case, it is possible to do what you want 
with Flink.

If you are implementing a function, then you have access to the metric system 
through
the runtime context (see [1] for more information).

Some things to take into consideration:

1) Metrics are not fault-tolerant, so if you need fault-tolerance then you have 
to take care of that 
(e.g. keep them in Flink’s state).

2) Are you sure you want them as metrics and not something like a side output? 
Metrics are meant more for monitoring the health of your cluster and other 
“system” characteristics than for business logic or data properties.

Cheers,
Kostas

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html 


> On Sep 27, 2018, at 8:45 AM, Averell  wrote:
> 
> Good day everyone,
> 
> I have a stream with two timestamps (ts1 and ts2) inside each record. My
> event time is ts1. This ts1 has value truncated to a quarter (like 23:30,
> 23:45, 00:00,...) 
> I want to report two metrics:
> 1. A meter which counts number of records per value of ts1. (fig.1)
> 
> 
>  
> 
> 2. A histogram which shows the distribution of the difference between ts1
> and ts2 within each record (fig.2)
> 
>  
> 
> I'm using Prometheus with Grafana. Is that possible to do what I mentioned?
> 
> Thanks and best regards,
> Averell
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Information required regarding SSL algorithms for Flink 1.5.x

2018-09-27 Thread Chesnay Schepler
Please see 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#security-ssl-algorithms
for the SSL algorithms that are available by default for 1.5.

On 27.09.2018 13:24, V N, Suchithra (Nokia - IN/Bangalore) wrote:


Gentle reminder on this question.

*From:*V N, Suchithra (Nokia - IN/Bangalore)
*Sent:* Monday, September 24, 2018 3:56 PM
*To:* user@flink.apache.org
*Subject:* Information required regarding SSL algorithms for Flink 1.5.x

Hello,

We have a query regarding SSL algorithms available for Flink versions. 
From the documents of Flink 1.6.0 we could see following SSL 
algorithms options are supported.


security.ssl.algorithms: 
TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384


Please let us know if all these options are supported in Flink 1.5.x 
releases as well.


Thanks,

Suchithra





Re: Help: how to get latency value comfortable in Flink1.5?

2018-09-27 Thread Chesnay Schepler

So there are two issues here:
1) Your reporter configuration is wrong: configuration values for a 
specific reporter are prefixed with "metrics.reporter", not 
"metrics.reporters" (note the "s"). See below for a correct config.


metrics.reporters: varuy
metrics.reporter.varuy.host: tracing115
metrics.reporter.varuy.port: 2003
metrics.reporter.varuy.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.varuy.interval: 1 SECONDS
metrics.reporter.varuy.protocol: TCP

2) The reporter only exports data to an external graphite server, so yes 
you have to start that one separately.


On 27.09.2018 08:13, varuy322 wrote:

Hi Till,

I have copied flink-metrics-graphite-1.5.2.jar to lib/ and restarted
Flink. It seems to make no difference.
When I checked the JobManager log, the metrics settings configured in
flink-conf.yaml were loaded, since it printed the following:

"2018-09-27 09:50:23,953 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: metrics.reporters, varuy
2018-09-27 09:50:23,953 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: metrics.reporters.varuy.host, tracing115
2018-09-27 09:50:23,953 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: metrics.reporters.varuy.port, 2003
2018-09-27 09:50:23,953 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: metrics.reporters.varuy.class,
org.apache.flink.metrics.graphite.GraphiteReporter
2018-09-27 09:50:23,954 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: metrics.reporters.varuy.interval, 1 SECONDS
2018-09-27 09:50:23,954 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: metrics.reporters.varuy.protocol, TCP"

However, the GraphiteReporter does not seem to run correctly, judging by
this log entry:

"2018-09-27 09:50:24,938 INFO
org.apache.flink.runtime.metrics.MetricRegistryImpl   - No metrics
reporter configured, no metrics will be exposed/reported."

Should I install Graphite Server independently?

Best Regards & Thanks

Rui,Wang
  




-
stay hungry, stay foolish.





New received containers silently lost during job auto restarts

2018-09-27 Thread Paul Lam
Hi,

One of my Flink on YARN jobs got into a weird state after a fail-fast 
restart. The restart was triggered by the loss of a TaskManager, but while the job 
was recovering, one of its subtasks (1/24) couldn't be deployed and finally 
failed with NoResourceAvailableException. 

Looking into the logs, I found that the JobManager was requesting containers for the 
subtask again and again. It did receive a container in each round of 
requests, but that container seemed to be lost silently (with no error logs) or was never 
registered. 
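
To put numbers on "again and again", a rough sketch like the following could list every container the ResourceManager reported receiving. It is plain Python, not a Flink tool. It assumes each log entry sits on a single line in the actual JobManager log file (unlike the wrapped excerpts in this mail) and matches only the "Received new container" message wording seen in those excerpts; the names `RECEIVED` and `received_containers` are made up for illustration.

```python
import re

# Matches the "Received new container" entries as they appear in the log
# excerpts of this thread, assuming one entry per line.
RECEIVED = re.compile(
    r"^(\S+ \S+)\s+INFO\s+\S*YarnResourceManager\s+-\s+"
    r"Received new container: (\S+) - Remaining pending container requests: (\d+)",
    re.MULTILINE,
)


def received_containers(log_text):
    """Return (timestamp, container_id, pending_requests) for each received container."""
    return [(ts, cid, int(pending)) for ts, cid, pending in RECEIVED.findall(log_text)]


# Two entries copied from the excerpt, joined back into single lines.
sample_log = (
    "2018-09-27 12:38:11,072 INFO  org.apache.flink.yarn.YarnResourceManager - "
    "Received new container: container_1536852167599_809133_02_000832 - "
    "Remaining pending container requests: 1\n"
    "2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager - "
    "Received new container: container_1536852167599_809133_02_000833 - "
    "Remaining pending container requests: 0\n"
)

for entry in received_containers(sample_log):
    print(entry)
```

Comparing the returned container IDs with the TaskManagers that actually registered would show which containers went missing.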

At last, I canceled the job and resubmitted it, and the problem was gone. 

I thought something might be wrong with the TaskManager initialization, but 
if that were the case there would be some errors in the JobManager's log, right? 

Sorry for not being able to reproduce the problem, but does anyone have any 
idea on this? Thanks a lot!

This is the cause of the job restart:
```
2018-09-27 12:33:11,639 INFO  org.apache.flink.yarn.YarnResourceManager 
- Closing TaskExecutor connection 
container_1536852167599_809133_02_16 because: Container released on a 
*lost* node
2018-09-27 12:33:11,639 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Unregister 
TaskManager 554e53380a836229ecbc4ccae58f9b0f from the SlotManager.
2018-09-27 12:33:11,639 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- 
LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: 
LogKafkaSink (11/24) (51fedf378844bc7e5d2855f59baafcd0) switched from RUNNING 
to FAILED.
org.apache.flink.util.FlinkException: The assigned slot 
container_1536852167599_809133_02_16_0 was removed.
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:786)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:756)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:948)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:372)
at 
org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:802)
at 
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:339)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

After that, the JobManager kept requesting and returning containers until the timeout:
```
2018-09-27 12:38:11,072 INFO  org.apache.flink.yarn.YarnResourceManager 
- Received new container: container_1536852167599_809133_02_000832 
- Remaining pending container requests: 1
2018-09-27 12:38:11,072 INFO  org.apache.flink.yarn.YarnResourceManager 
- Adding keytab 
hdfs://horton2/user/gdc_sa/.flink/application_1536852167599_809133/gdc_sa.keytab
 to the AM container local resource bucket
2018-09-27 12:38:11,338 INFO  org.apache.flink.yarn.YarnResourceManager 
- Creating container launch context for TaskManagers
2018-09-27 12:38:11,338 INFO  org.apache.flink.yarn.YarnResourceManager 
- Starting TaskManagers
2018-09-27 12:38:11,340 INFO  
org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - 
Opening proxy : gdc-dn254:8041
2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager 
- Received new container: container_1536852167599_809133_02_000833 
- Remaining pending container requests: 0
2018-09-27 12:38:11,346 INFO  org.apache.flink.yarn.YarnResourceManager 
- Returning 

RE: Information required regarding SSL algorithms for Flink 1.5.x

2018-09-27 Thread V N, Suchithra (Nokia - IN/Bangalore)
Gentle reminder on this question.

From: V N, Suchithra (Nokia - IN/Bangalore)
Sent: Monday, September 24, 2018 3:56 PM
To: user@flink.apache.org
Subject: Information required regarding SSL algorithms for Flink 1.5.x

Hello,

We have a question about the SSL algorithms available in different Flink versions. 
According to the Flink 1.6.0 documentation, the following SSL algorithm options are 
supported.

security.ssl.algorithms: TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
Please let us know if all these options are supported in Flink 1.5.x releases 
as well.

Thanks,
Suchithra




Avro serialization problem after updating to flink 1.6.0

2018-09-27 Thread Mark Harris
Hi,

I recently tried to update a flink job from 1.3.2 to 1.6.0. It deploys
successfully as usual, but logs the following exception shortly after
starting:

Caused by: org.apache.avro.AvroRuntimeException:
avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException:
org.apache.avro.AvroRuntimeException: Not a Specific class: class
uk.co.test.serde.AlertEvent
at
org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:227)
at
org.apache.flink.formats.avro.typeutils.AvroSerializer.initializeAvro(AvroSerializer.java:367)
at
org.apache.flink.formats.avro.typeutils.AvroSerializer.checkAvroInitialized(AvroSerializer.java:357)
at
org.apache.flink.formats.avro.typeutils.AvroSerializer.snapshotConfiguration(AvroSerializer.java:269)
at
org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.computeSnapshot(RegisteredKeyValueStateBackendMetaInfo.java:241)
at
org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.snapshot(RegisteredKeyValueStateBackendMetaInfo.java:226)
at
org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.getMetaInfoSnapshot(CopyOnWriteStateTableSnapshot.java:173)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy.processSnapshotMetaInfoForAllStates(HeapKeyedStateBackend.java:880)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy.performSnapshot(HeapKeyedStateBackend.java:719)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:355)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:383)
... 13 more
Caused by:
avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException:
org.apache.avro.AvroRuntimeException: Not a Specific class: class
uk.co.test.serde.AlertEvent
at
avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234)
at
avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965)
at
avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969)
at
avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829)
at
org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225)
... 23 more
Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class:
class uk.co.AlertEvent
at
org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
at
avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
at
avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
at
avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
at
avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
... 27 more


AlertEvent is a Scala case class generated using sbt-avrohugger (
https://github.com/julianpeeters/sbt-avrohugger) that definitely extends
SpecificRecordBase.

There has been an Avro version jump between Flink 1.3.2 and 1.6.0, from 1.7.7 to
1.8.2, but I've rebuilt the Avro model against Avro 1.8.2 and had a brief
look at the code in SpecificData.createSchema - it seems like it would still have
tried the getDeclaredField("SCHEMA$") check that's throwing.

Any advice on how to figure out what's causing the problem, or work around
it would be gratefully received.

Best regards,

Mark Harris





Re: FlinkCEP, circular references and checkpointing failures

2018-09-27 Thread Dawid Wysakowicz
Hi Shailesh,

I am afraid it is going to be hard to help you, as this branch differs
significantly from the 1.4.2 release (I've done a diff between your branch and
tag/release-1.4.2). Moreover, the code in the branch you've provided
still does not correspond to the lines in the exception you've posted
previously. Could you check if the problem occurs on vanilla Flink as well?

Best,

Dawid


On 27/09/18 08:22, Shailesh Jain wrote:
> Hi Dawid,
>
> Yes, it is version 1.4.2. We are running vanilla flink, but have added
> a couple of changes in the CEP operator specifically (top 3 commits
> here: https://github.com/jainshailesh/flink/commits/poc_on_1.4.2).
> Changes I've made to CEP operators do not touch the checkpointing
> path, just overloading the operator for a specific way of handling
> event time.
>
> We are hitting this in production, so I'm not sure it'll be feasible
> to move to 1.6.0 immediately, but eventually yes.
>
> Thanks,
> Shailesh
>
> On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz wrote:
>
> Hi Shailesh,
>
> Are you sure you are using version 1.4.2? Do you run vanilla
> Flink, or have you introduced some changes? I am asking because the
> lines in the stack trace do not align with the source code for 1.4.2.
>
> Also, it is a different exception than the one in the issue you've
> linked, so if it is a problem then it is definitely a different
> one. Lastly, I would recommend upgrading to the newest version,
> as we have rewritten the SharedBuffer implementation in 1.6.0.
>
> Best,
>
> Dawid
>
>
> On 26/09/18 13:50, Shailesh Jain wrote:
>> Hi,
>>
>> I think I've hit this same issue on a 3 node standalone cluster
>> (1.4.2) using HDFS (2.8.4) as state backend.
>>
>> 2018-09-26 17:07:39,370 INFO 
>> org.apache.flink.runtime.taskmanager.Task -
>> Attempting to fail task externally SelectCepOperator (1/1)
>> (3bec4aa1ef2226c4e0c5ff7b3860d340).
>> 2018-09-26 17:07:39,370 INFO 
>> org.apache.flink.runtime.taskmanager.Task -
>> SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340)
>> switched from RUNNING to FAILED.
>> AsynchronousException{java.lang.Exception: Could not materialize
>> checkpoint 6 for operator SelectCepOperator (1/1).}
>>     at
>> 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>>     at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at
>> 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at
>> 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.Exception: Could not materialize checkpoint
>> 6 for operator SelectCepOperator (1/1).
>>     ... 6 more
>> Caused by: java.util.concurrent.ExecutionException:
>> java.lang.NullPointerException
>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>     at
>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>     at
>> 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>     ... 5 more
>>     Suppressed: java.lang.Exception: Could not properly cancel
>> managed keyed state future.
>>         at
>> 
>> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>>         at
>> 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>>         at
>> 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>>         ... 5 more
>>     Caused by: java.util.concurrent.ExecutionException:
>> java.lang.NullPointerException
>>         at
>> java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>         at
>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>         at
>> 
>> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>>         at
>> 
>> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>>         ... 7 more
>>     Caused by: java.lang.NullPointerException
>>         at
>> 
>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
>>         at
>> 
>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
>>         at
>> 

Metrics: (1) Histogram in prometheus and (2) meter by event_time

2018-09-27 Thread Averell
Good day everyone,

I have a stream in which each record carries two timestamps (ts1 and ts2). My
event time is ts1, whose value is truncated to a quarter of an hour (such as
23:30, 23:45, 00:00, ...).
I want to report two metrics:
 1. A meter that counts the number of records per value of ts1 (fig. 1).
 2. A histogram that shows the distribution of the difference between ts1
and ts2 within each record (fig. 2).

I'm using Prometheus with Grafana. Is it possible to do what I described?
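
To pin down what the two metrics mean, here is a small sketch of the computation itself. It is plain Python and uses neither the Flink metrics API nor a Prometheus client, so it does not answer whether the Flink Prometheus reporter can expose them this way. The function name `summarize`, the bucket bounds and the sample records are assumptions made up for illustration; the buckets are cumulative ("less than or equal to"), which is how Prometheus histograms count.

```python
from collections import Counter


def summarize(records, upper_bounds):
    """records: iterable of (ts1, ts2) pairs, both in epoch seconds."""
    records = list(records)
    # Metric 1: number of records per distinct ts1 value.
    per_ts1 = Counter(ts1 for ts1, _ in records)
    # Metric 2: cumulative bucket counts of the difference ts2 - ts1.
    deltas = [ts2 - ts1 for ts1, ts2 in records]
    buckets = {bound: sum(1 for d in deltas if d <= bound) for bound in upper_bounds}
    buckets[float("inf")] = len(deltas)
    return per_ts1, buckets


# Hypothetical records: ts1 is already truncated to 15-minute steps (900 s).
records = [(0, 30), (0, 200), (900, 1000), (900, 2000)]
per_ts1, buckets = summarize(records, [60, 300, 900])
print(dict(per_ts1))
print(buckets)
```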

Thanks and best regards,
Averell





Re: FlinkCEP, circular references and checkpointing failures

2018-09-27 Thread Shailesh Jain
Hi Dawid,

Yes, it is version 1.4.2. We are running vanilla flink, but have added a
couple of changes in the CEP operator specifically (top 3 commits here:
https://github.com/jainshailesh/flink/commits/poc_on_1.4.2). Changes I've
made to CEP operators do not touch the checkpointing path, just overloading
the operator for a specific way of handling event time.

We are hitting this in production, so I'm not sure it'll be feasible to
move to 1.6.0 immediately, but eventually yes.

Thanks,
Shailesh

On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz wrote:

> Hi Shailesh,
>
> Are you sure you are using version 1.4.2? Do you run vanilla Flink, or
> have you introduced some changes? I am asking because the lines in the stack
> trace do not align with the source code for 1.4.2.
>
> Also, it is a different exception than the one in the issue you've linked,
> so if it is a problem then it is definitely a different one. Lastly, I
> would recommend upgrading to the newest version, as we have rewritten the
> SharedBuffer implementation in 1.6.0.
>
> Best,
>
> Dawid
>
> On 26/09/18 13:50, Shailesh Jain wrote:
>
> Hi,
>
> I think I've hit this same issue on a 3 node standalone cluster (1.4.2)
> using HDFS (2.8.4) as state backend.
>
> 2018-09-26 17:07:39,370 INFO
> org.apache.flink.runtime.taskmanager.Task - Attempting
> to fail task externally SelectCepOperator (1/1)
> (3bec4aa1ef2226c4e0c5ff7b3860d340).
> 2018-09-26 17:07:39,370 INFO
> org.apache.flink.runtime.taskmanager.Task -
> SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from
> RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize
> checkpoint 6 for operator SelectCepOperator (1/1).}
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 6 for
> operator SelectCepOperator (1/1).
> ... 6 more
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.NullPointerException
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
> ... 5 more
> Suppressed: java.lang.Exception: Could not properly cancel managed
> keyed state future.
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
> ... 5 more
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.NullPointerException
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
> at
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
> ... 7 more
> Caused by: java.lang.NullPointerException
> at
> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
> at
> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
> at
> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
> at
> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
> at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
> at
> org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
> at
> 

Re: Help: how to get latency value comfortable in Flink1.5?

2018-09-27 Thread varuy322
Hi Till,

I have copied flink-metrics-graphite-1.5.2.jar to lib/ and restarted
Flink. It seems to make no difference.
When I checked the jobmanager log, the metrics settings from
flink-conf.yaml appear to be loaded, since it printed the following:

"2018-09-27 09:50:23,953 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: metrics.reporters, varuy
2018-09-27 09:50:23,953 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: metrics.reporters.varuy.host, tracing115
2018-09-27 09:50:23,953 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: metrics.reporters.varuy.port, 2003
2018-09-27 09:50:23,953 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: metrics.reporters.varuy.class,
org.apache.flink.metrics.graphite.GraphiteReporter
2018-09-27 09:50:23,954 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: metrics.reporters.varuy.interval, 1 SECONDS
2018-09-27 09:50:23,954 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: metrics.reporters.varuy.protocol, TCP"

However, the GraphiteReporter does not seem to run correctly, judging by
this log entry:

"2018-09-27 09:50:24,938 INFO 
org.apache.flink.runtime.metrics.MetricRegistryImpl   - No metrics
reporter configured, no metrics will be exposed/reported."

Should I install Graphite Server independently?

Best Regards & Thanks

Rui,Wang
 



-
stay hungry, stay foolish.