回复: flink 缓存本地文件被删除疑问

2019-10-10 Thread 戴嘉诚
你好,我的任务是用RocksDB存储的Checkpoint, 是运行了一段时间后报的这个错误

发件人: pengchenglin
发送时间: 2019年10月11日 11:59
收件人: user-zh@flink.apache.org
主题: Re: flink 缓存本地文件被删除疑问

你好,你的任务是用RocksDB存储的Checkpoint吗?任务是每次启动时就报这个错误,还是运行一段时间报这个错误》
 
发件人: 戴嘉诚
发送时间: 2019-10-11 11:00
收件人: user-zh@flink.apache.org
主题: flink 缓存本地文件被删除疑问
大家好:
最近我的程序迁移到了flink1.9 on yarn 
session后,原本有个程序是读取kafka消息,然后key分组,window聚集一秒后,批量异步写入hbase中经常发生异常,其中异常为:
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for WindowOperator_3572e2ca02acab3056013a7c1c4fa645_(6/6) from any of 
the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
... 6 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
unexpected exception.
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:326)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 8 more
Caused by: java.nio.file.NoSuchFileException: 
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1569065892199_0031/flink-io-6bcd299a-00e7-4f6a-8466-09d8d13d831d/job_add9718b9fe3e5054577d8b9e5799829_op_WindowOperator_3572e2ca02acab3056013a7c1c4fa645__6_6__uuid_53528904-c829-431a-ba0a-b76041cd2d2b/db/000835.sst
 -> 
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1569065892199_0031/flink-io-6bcd299a-00e7-4f6a-8466-09d8d13d831d/job_add9718b9fe3e5054577d8b9e5799829_op_WindowOperator_3572e2ca02acab3056013a7c1c4fa645__6_6__uuid_53528904-c829-431a-ba0a-b76041cd2d2b/14634105-83be-429e-9cdc-36dbff14a7fa/000835.sst
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at 
sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)
at java.nio.file.Files.createLink(Files.java:1086)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:473)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:212)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:188)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:162)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:148)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
... 12 more
 
 
我这里的session中是有多个job在session上运行,但是这里job都是写入hbase中的。
这里看上去是本地机器上的目录文件被删除了?但是这个文件我是没有删除过的…
 
 



Re: Where are uploaded Job jars stored?

2019-10-10 Thread Steven Nelson
John,

I think you are referring to the web upload directory. There is a setting for 
that folder ‘web.upload.dir’. If you set that to a folder writeable to both 
masters it will work as desired. I used an NFS mount (AWS EFS).

-Steve


Sent from my iPhone

> On Oct 10, 2019, at 10:11 PM, Zhu Zhu  wrote:
> 
> Hi John,
> 
> Not sure why you need to know the location of uploaded job jars?
> 
> The job jar will be automatically localized to a taskmanager via BlobService 
> when a task belonging to the job is running on that taskmanager.
> The localization dir is blob.storage.directory. If it is not specified, it 
> will be java.io.tmpdir in standalone mode.
> 
> Thanks,
> Zhu Zhu 
> 
> John Smith  于2019年10月11日周五 上午2:41写道:
>> And can that folder be shared so that all nodes see it?
>> 
>>> On Thu, 10 Oct 2019 at 14:36, Yun Tang  wrote:
>>> Hi John
>>> 
>>> The jar is not stored in HA path, I think the answer [1] could help you.
>>> 
>>> [1] 
>>> https://stackoverflow.com/questions/51936608/where-can-i-find-my-jar-on-apache-flink-server-which-i-submitted-using-apache-fl
>>> 
>>> Best
>>> Yun Tang
>>> From: John Smith 
>>> Sent: Friday, October 11, 2019 2:06
>>> To: user 
>>> Subject: Where are uploaded Job jars stored?
>>>  
>>> Hi using 1.8.0 running on standalone cluster with Zookeeper HA.
>>> 
>>> Are job JARs stored at: high-availability.storageDir ???
>>> 
>>> The thing is when you browse the individual nodes at port 8080 to go submit 
>>> the job only the node where you uploaded the JAR has it.
>>> 
>>> - Go to any given node
>>> - Upload a jar
>>> - Browse another node
>>> - Jar is not there.
>>> 
>>> 


Re: Fencing token exceptions from Job Manager High Availability mode

2019-10-10 Thread Joshua Fan
Sorry to forget the version, we run flink 1.7 on yarn in a ha mode.

On Fri, Oct 11, 2019 at 12:02 PM Joshua Fan  wrote:

> Hi Till
>
> After got your advice, I checked the log again. It seems not wholely the
> same as the condition you mentioned.
>
> I would like to summarize the story in the belowed log.
>
> Once a time, the zk connection  was not stable, so there happened 3 times
> suspended-reconnected.
>
> After the first suspended-reconnected, the Minidispatcher tried to recover
> all jobs.
>
> Then the second suspended-reconnected came, after this reconnected, there
> happened a 'The heartbeat of JobManager with id
> dbad79e0173c5658b029fba4d70e8084 timed out', and in this turn, the
> Minidispatcher didn't try to recover the job.
>
> Due to the zk connection did not recover, the third suspended-reconnected
> came, after the zk reconnected, the Minidispatcher did not try to recover
> job ,but just repeated throw FencingTokenException, the AM was hanging, our
> alarm-system just
> found the job was gone, but can not get a final state of the job. And the
> FencingTokenException was ongoing for nearly one day long before we killed
> the AM.
>
> the whole log is attached.
>
> Thanks
>
> Joshua
>
> On Fri, Oct 11, 2019 at 10:35 AM Hanson, Bruce 
> wrote:
>
>> Hi Till and Fabian,
>>
>>
>>
>> My apologies for taking a week to reply; it took some time to reproduce
>> the issue with debug logging. I’ve attached logs from a two minute period
>> when the problem happened. I’m just sending this to you two to avoid
>> sending the log file all over the place. If you’d like to have our
>> conversation in the user group mailing list, that’s fine.
>>
>>
>>
>> The job was submitted by using the job manager REST api starting at
>> 20:33:46.262 and finishing at 20:34:01.547. This worked normally, and the
>> job started running. We then run a monitor that polls the /overview
>> endpoint of the JM REST api. This started polling at 20:34:31.380 and
>> resulted in the JM throwing the FencingTokenException at 20:34:31:393, and
>> the JM returned a 500 to our monitor. This will happen every time we poll
>> until the monitor times out and then we tear down the cluster, even though
>> the job is running, we can’t tell that it is. This is somewhat rare,
>> happening maybe 5% of the time.
>>
>>
>>
>> We’re running Flink 1.7.1. This issue only happens when we run in Job
>> Manager High Availability mode. We provision two Job Managers, a 3-node
>> zookeeper cluster, task managers and our monitor all in their own
>> Kubernetes namespace. I can send you Zookeeper logs too if that would be
>> helpful.
>>
>>
>>
>> Thanks in advance for any help you can provide!
>>
>>
>>
>> -Bruce
>>
>> --
>>
>>
>>
>>
>>
>> *From: *Till Rohrmann 
>> *Date: *Wednesday, October 2, 2019 at 6:10 AM
>> *To: *Fabian Hueske 
>> *Cc: *"Hanson, Bruce" , "user@flink.apache.org" <
>> user@flink.apache.org>
>> *Subject: *Re: Fencing token exceptions from Job Manager High
>> Availability mode
>>
>>
>>
>> Hi Bruce, are you able to provide us with the full debug logs? From the
>> excerpt itself it is hard to tell what is going on.
>>
>>
>>
>> Cheers,
>>
>> Till
>>
>>
>>
>> On Wed, Oct 2, 2019 at 2:24 PM Fabian Hueske  wrote:
>>
>> Hi Bruce,
>>
>>
>>
>> I haven't seen such an exception yet, but maybe Till (in CC) can help.
>>
>>
>>
>> Best,
>>
>> Fabian
>>
>>
>>
>> Am Di., 1. Okt. 2019 um 05:51 Uhr schrieb Hanson, Bruce <
>> bruce.han...@here.com>:
>>
>> Hi all,
>>
>>
>>
>> We are running some of our Flink jobs with Job Manager High Availability.
>> Occasionally we get a cluster that comes up improperly and doesn’t respond.
>> Attempts to submit the job seem to hang and when we hit the /overview REST
>> endpoint in the Job Manager we get a 500 error and a fencing token
>> exception like this:
>>
>>
>>
>> *2019-09-21 05:04:07.785 [flink-akka.actor.default-dispatcher-4428]
>> level=ERROR o.a.f.runtime.rest.handler.cluster.ClusterOverviewHandler  -
>> Implementation error: Unhandled exception.*
>>
>> *org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing
>> token not set: Ignoring message LocalFencedMessage(null,
>> LocalRpcInvocation(requestResourceOverview(Time))) sent to
>> akka.tcp://fl...@job-ef80a156-3350-4e85-8761-b0e42edc346f-jm-0.job-ef80a156-3350-4e85-8761-b0e42edc346f-jm-svc.olp-here-test-j-ef80a156-3350-4e85-8761-b0e42edc346f.svc.cluster.local:6126/user/resourcemanager
>> because the fencing token is null.*
>>
>> *at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:59)*
>>
>> *at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)*
>>
>> *at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)*
>>
>> *at
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)*
>>
>> *at akka.actor.Actor$class.aroundReceive(Actor.scala:502)*
>>
>> *at 

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-10-10 Thread Yun Tang
Hi Hao

It seems that I misunderstood the background of usage for your cases. High 
availability configuration targets for fault tolerance not for general 
development evolution. If you want to change your job topology, just follow the 
general rule to restore from savepoint/checkpoint, do not rely on HA to do job 
migration things.

Best
Yun Tang

From: Hao Sun 
Sent: Friday, October 11, 2019 8:33
To: Yun Tang 
Cc: Vijay Bhaskar ; Yang Wang 
; Sean Hester ; Aleksandar 
Mastilovic ; Yuval Itzchakov ; 
user 
Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes

Yep I know that option. That's where get me confused as well. In a HA setup, 
where do I supply this option (allowNonRestoredState)?
This option requires a savepoint path when I start a flink job I remember. And 
HA does not require the path

Hao Sun


On Thu, Oct 10, 2019 at 11:16 AM Yun Tang 
mailto:myas...@live.com>> wrote:
Just a minor supplement @Hao Sun, if you decided to 
drop a operator, don't forget to add --allowNonRestoredState (short: -n) option 
[1]


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state

Best
Yun Tang


From: Vijay Bhaskar mailto:bhaskar.eba...@gmail.com>>
Sent: Thursday, October 10, 2019 19:24
To: Yang Wang mailto:danrtsey...@gmail.com>>
Cc: Sean Hester 
mailto:sean.hes...@bettercloud.com>>; Aleksandar 
Mastilovic mailto:amastilo...@sightmachine.com>>; 
Yun Tang mailto:myas...@live.com>>; Hao Sun 
mailto:ha...@zendesk.com>>; Yuval Itzchakov 
mailto:yuva...@gmail.com>>; user 
mailto:user@flink.apache.org>>
Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes

Thanks Yang. We will try and let you know if any issues arise

Regards
Bhaskar

On Thu, Oct 10, 2019 at 1:53 PM Yang Wang 
mailto:danrtsey...@gmail.com>> wrote:
@ Hao Sun,
I have made a confirmation that even we change parallelism and/or modify 
operators, add new operators,
the flink cluster could also recover from latest checkpoint.

@ Vijay
a) Some individual jobmanager/taskmanager crashed exceptionally(someother 
jobmanagers
and taskmanagers are alive), it could recover from the latest checkpoint.
b) All jobmanagers and taskmanagers fails, it could still recover from the 
latest checkpoint if the cluster-id
is not changed.

When we enable the HA, The meta of jobgraph and checkpoint is saved on 
zookeeper and the real files are save
on high-availability storage(HDFS). So when the flink application is submitted 
again with same cluster-id, it could
recover jobs and checkpoint from zookeeper. I think it has been supported for a 
long time. Maybe you could have a
try with flink-1.8 or 1.9.

Best,
Yang


Vijay Bhaskar mailto:bhaskar.eba...@gmail.com>> 
于2019年10月10日周四 下午2:26写道:
Thanks Yang and Sean. I have couple of questions:

1) Suppose the scenario of , bringing back entire cluster,
 a) In that case, at least one job manager out of HA group should be up and 
running right? or
 b) All the job managers fails, then also this works? In that case please 
let me know the procedure/share the documentation?
 How to start from previous check point?
 What Flink version onwards this feature is stable?

Regards
Bhaskar


On Wed, Oct 9, 2019 at 8:51 AM Yang Wang 
mailto:danrtsey...@gmail.com>> wrote:
Hi Vijay,

If you are using HA solution, i think you do not need to specify the savepoint. 
Instead the checkpoint is used.
The checkpoint is done automatically and periodically based on your 
configuration.When the
jobmanager/taskmanager fails or the whole cluster crashes, it could always 
recover from the latest
checkpoint. Does this meed your requirement?

Best,
Yang

Sean Hester mailto:sean.hes...@bettercloud.com>> 
于2019年10月1日周二 上午1:47写道:
Vijay,

That is my understanding as well: the HA solution only solves the problem up to 
the point all job managers fail/restart at the same time. That's where my 
original concern was.

But to Aleksandar and Yun's point, running in HA with 2 or 3 Job Managers per 
cluster--as long as they are all deployed to separate GKE nodes--would provide 
a very high uptime/low failure rate, at least on paper. It's a promising enough 
option that we're going to run in HA for a month or two and monitor results 
before we put in any extra work to customize the savepoint start-up behavior.

On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar 
mailto:bhaskar.eba...@gmail.com>> wrote:
I don't think HA will help to recover from cluster crash, for that we should 
take periodic savepoint right? Please correct me in case i am wrong

Regards
Bhaskar

On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar 
mailto:bhaskar.eba...@gmail.com>> wrote:
Suppose my cluster got crashed and need to bring up the entire cluster back? 
Does HA still helps to run the cluster from latest save point?

Regards
Bhaskar

On Thu, Sep 26, 2019 at 7:44 PM Sean Hester 

Re: Batch Job in a Flink 1.9 Standalone Cluster

2019-10-10 Thread Xintong Song
I think it depends on your configurations.
- Are you using on-heap/off-heap managed memory? (configured by
'taskmanager.memory.off-heap', by default is false)

- Is managed memory pre-allocated? (configured by
'taskmanager.memory.preallocate', by default is ffalse)


If managed memory is pre-allocated, then the allocated memory segments will
never be released. If it's not pre-allocated, memory segments should be
released when the task is finished, but the actual memory will not be
de-allocated until next GC. Since the job is finished, there may not be
enough heap activities to trigger the GC. If on-heap memory is used, you
may not be able to observe the decreasing of TM memory usage, because JVM
heap size does not scale down. Only if off-heap memory is used, you might
be able to observe the decreasing of TM memory usage after a GC, but not
from a jmap dump because jmap dumps heap memory usage only.


Besides, I don't think you need to worry about whether memory is released
after one job is finished. Sometimes flink/jvm do not release memory after
jobs/tasks finished, so that it can be reused directly by other jobs/tasks,
for the purpose of reducing allocate/deallocated overheads and optimizing
performance.


Thank you~

Xintong Song



On Thu, Oct 10, 2019 at 7:55 PM Timothy Victor  wrote:

> After a batch job finishes in a flink standalone cluster, I notice that
> the memory isn't freed up.   I understand Flink uses it's own memory
> manager and just allocates a large tenured byte array that is not GC'ed.
>  But does the memory used in this byte array get released when the batch
> job is done?
>
> The scenario I am facing is that I am running a series of scheduled batch
> jobs on a standalone cluster with 1 TM and 1 Slot.  I notice that after a
> job is complete the memory used in the TM isn't freed up.  I can confirm
> this by running  jmap dump.
>
> Has anyone else run into this issue?   This is on 1.9.
>
> Thanks
>
> Tim
>


Re: Fencing token exceptions from Job Manager High Availability mode

2019-10-10 Thread Joshua Fan
Hi Till

After got your advice, I checked the log again. It seems not wholely the
same as the condition you mentioned.

I would like to summarize the story in the belowed log.

Once a time, the zk connection  was not stable, so there happened 3 times
suspended-reconnected.

After the first suspended-reconnected, the Minidispatcher tried to recover
all jobs.

Then the second suspended-reconnected came, after this reconnected, there
happened a 'The heartbeat of JobManager with id
dbad79e0173c5658b029fba4d70e8084 timed out', and in this turn, the
Minidispatcher didn't try to recover the job.

Due to the zk connection did not recover, the third suspended-reconnected
came, after the zk reconnected, the Minidispatcher did not try to recover
job ,but just repeated throw FencingTokenException, the AM was hanging, our
alarm-system just
found the job was gone, but can not get a final state of the job. And the
FencingTokenException was ongoing for nearly one day long before we killed
the AM.

the whole log is attached.

Thanks

Joshua

On Fri, Oct 11, 2019 at 10:35 AM Hanson, Bruce 
wrote:

> Hi Till and Fabian,
>
>
>
> My apologies for taking a week to reply; it took some time to reproduce
> the issue with debug logging. I’ve attached logs from a two minute period
> when the problem happened. I’m just sending this to you two to avoid
> sending the log file all over the place. If you’d like to have our
> conversation in the user group mailing list, that’s fine.
>
>
>
> The job was submitted by using the job manager REST api starting at
> 20:33:46.262 and finishing at 20:34:01.547. This worked normally, and the
> job started running. We then run a monitor that polls the /overview
> endpoint of the JM REST api. This started polling at 20:34:31.380 and
> resulted in the JM throwing the FencingTokenException at 20:34:31:393, and
> the JM returned a 500 to our monitor. This will happen every time we poll
> until the monitor times out and then we tear down the cluster, even though
> the job is running, we can’t tell that it is. This is somewhat rare,
> happening maybe 5% of the time.
>
>
>
> We’re running Flink 1.7.1. This issue only happens when we run in Job
> Manager High Availability mode. We provision two Job Managers, a 3-node
> zookeeper cluster, task managers and our monitor all in their own
> Kubernetes namespace. I can send you Zookeeper logs too if that would be
> helpful.
>
>
>
> Thanks in advance for any help you can provide!
>
>
>
> -Bruce
>
> --
>
>
>
>
>
> *From: *Till Rohrmann 
> *Date: *Wednesday, October 2, 2019 at 6:10 AM
> *To: *Fabian Hueske 
> *Cc: *"Hanson, Bruce" , "user@flink.apache.org" <
> user@flink.apache.org>
> *Subject: *Re: Fencing token exceptions from Job Manager High
> Availability mode
>
>
>
> Hi Bruce, are you able to provide us with the full debug logs? From the
> excerpt itself it is hard to tell what is going on.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Wed, Oct 2, 2019 at 2:24 PM Fabian Hueske  wrote:
>
> Hi Bruce,
>
>
>
> I haven't seen such an exception yet, but maybe Till (in CC) can help.
>
>
>
> Best,
>
> Fabian
>
>
>
> Am Di., 1. Okt. 2019 um 05:51 Uhr schrieb Hanson, Bruce <
> bruce.han...@here.com>:
>
> Hi all,
>
>
>
> We are running some of our Flink jobs with Job Manager High Availability.
> Occasionally we get a cluster that comes up improperly and doesn’t respond.
> Attempts to submit the job seem to hang and when we hit the /overview REST
> endpoint in the Job Manager we get a 500 error and a fencing token
> exception like this:
>
>
>
> *2019-09-21 05:04:07.785 [flink-akka.actor.default-dispatcher-4428]
> level=ERROR o.a.f.runtime.rest.handler.cluster.ClusterOverviewHandler  -
> Implementation error: Unhandled exception.*
>
> *org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing
> token not set: Ignoring message LocalFencedMessage(null,
> LocalRpcInvocation(requestResourceOverview(Time))) sent to
> akka.tcp://fl...@job-ef80a156-3350-4e85-8761-b0e42edc346f-jm-0.job-ef80a156-3350-4e85-8761-b0e42edc346f-jm-svc.olp-here-test-j-ef80a156-3350-4e85-8761-b0e42edc346f.svc.cluster.local:6126/user/resourcemanager
> because the fencing token is null.*
>
> *at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:59)*
>
> *at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)*
>
> *at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)*
>
> *at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)*
>
> *at akka.actor.Actor$class.aroundReceive(Actor.scala:502)*
>
> *at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)*
>
> *at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)*
>
> *at akka.actor.ActorCell.invoke(ActorCell.scala:495)*
>
> *at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)*
>
> *at 

Re: flink 缓存本地文件被删除疑问

2019-10-10 Thread pengchenglin
你好,你的任务是用RocksDB存储的Checkpoint吗?任务是每次启动时就报这个错误,还是运行一段时间报这个错误》
 
发件人: 戴嘉诚
发送时间: 2019-10-11 11:00
收件人: user-zh@flink.apache.org
主题: flink 缓存本地文件被删除疑问
大家好:
最近我的程序迁移到了flink1.9 on yarn 
session后,原本有个程序是读取kafka消息,然后key分组,window聚集一秒后,批量异步写入hbase中经常发生异常,其中异常为:
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for WindowOperator_3572e2ca02acab3056013a7c1c4fa645_(6/6) from any of 
the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
... 6 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
unexpected exception.
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:326)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 8 more
Caused by: java.nio.file.NoSuchFileException: 
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1569065892199_0031/flink-io-6bcd299a-00e7-4f6a-8466-09d8d13d831d/job_add9718b9fe3e5054577d8b9e5799829_op_WindowOperator_3572e2ca02acab3056013a7c1c4fa645__6_6__uuid_53528904-c829-431a-ba0a-b76041cd2d2b/db/000835.sst
 -> 
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1569065892199_0031/flink-io-6bcd299a-00e7-4f6a-8466-09d8d13d831d/job_add9718b9fe3e5054577d8b9e5799829_op_WindowOperator_3572e2ca02acab3056013a7c1c4fa645__6_6__uuid_53528904-c829-431a-ba0a-b76041cd2d2b/14634105-83be-429e-9cdc-36dbff14a7fa/000835.sst
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at 
sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)
at java.nio.file.Files.createLink(Files.java:1086)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:473)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:212)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:188)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:162)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:148)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
... 12 more
 
 
我这里的session中是有多个job在session上运行,但是这里job都是写入hbase中的。
这里看上去是本地机器上的目录文件被删除了?但是这个文件我是没有删除过的…
 
 


Re: Batch Job in a Flink 1.9 Standalone Cluster

2019-10-10 Thread Yang Wang
Hi Tim,

Do you mean the user heap memory used by the tasks of finished jobs is not
freed up? If this is the case,
the memory usage of taskmanger will increase as more and more jobs
finished. However this does not
happen, the memory will be freed up by jvm gc.

BTW, flink has its own memory management strategy, including task
heap/off-heap, framework heap, jvm
overhead and so on. Why do you care about when the memory is freed? I think
it will be done automatically
by flink and jvm.

Best,
Yang

Timothy Victor  于2019年10月10日周四 下午7:55写道:

> After a batch job finishes in a flink standalone cluster, I notice that
> the memory isn't freed up.   I understand Flink uses it's own memory
> manager and just allocates a large tenured byte array that is not GC'ed.
>  But does the memory used in this byte array get released when the batch
> job is done?
>
> The scenario I am facing is that I am running a series of scheduled batch
> jobs on a standalone cluster with 1 TM and 1 Slot.  I notice that after a
> job is complete the memory used in the TM isn't freed up.  I can confirm
> this by running  jmap dump.
>
> Has anyone else run into this issue?   This is on 1.9.
>
> Thanks
>
> Tim
>


Re: Where are uploaded Job jars stored?

2019-10-10 Thread Zhu Zhu
Hi John,

Not sure why you need to know the location of uploaded job jars?

The job jar will be automatically localized to a taskmanager via
BlobService when a task belonging to the job is running on that taskmanager.
The localization dir is blob.storage.directory. If it is not specified, it
will be java.io.tmpdir in standalone mode.

Thanks,
Zhu Zhu

John Smith  于2019年10月11日周五 上午2:41写道:

> And can that folder be shared so that all nodes see it?
>
> On Thu, 10 Oct 2019 at 14:36, Yun Tang  wrote:
>
>> Hi John
>>
>> The jar is not stored in HA path, I think the answer [1] could help you.
>>
>> [1]
>> https://stackoverflow.com/questions/51936608/where-can-i-find-my-jar-on-apache-flink-server-which-i-submitted-using-apache-fl
>>
>> Best
>> Yun Tang
>> --
>> *From:* John Smith 
>> *Sent:* Friday, October 11, 2019 2:06
>> *To:* user 
>> *Subject:* Where are uploaded Job jars stored?
>>
>> Hi using 1.8.0 running on standalone cluster with Zookeeper HA.
>>
>> Are job JARs stored at: high-availability.storageDir ???
>>
>> The thing is when you browse the individual nodes at port 8080 to go
>> submit the job only the node where you uploaded the JAR has it.
>>
>> - Go to any given node
>> - Upload a jar
>> - Browse another node
>> - Jar is not there.
>>
>>
>>


Re: Flink restoring a job from a checkpoint

2019-10-10 Thread Yun Tang
Any checkpoint could only completed if your job not failed. Since checkpoint 
barrier is injected with messages together, if the problematic message would 
cause your job to fail. You cannot complete any checkpoint after that 
problematic message processed. In other words, you could always resume your job 
from kafka offset before that problematic message.

Best
Yun Tang

From: Flavio Pompermaier 
Sent: Friday, October 11, 2019 5:50
To: Yun Tang 
Cc: Congxian Qiu ; theo.diefent...@scoop-software.de 
; user 
Subject: Re: Flink restoring a job from a checkpoint

Sorry for the dumb question but let's suppose to not use retained checkpoint 
and my job processed billions of messages from Kafka. Then a problematic 
message causes my job to fail..am I able to complete a savepoint to fic the job 
and restart from the problematic message (i.e. last "valid" kafka offset)?

Il Gio 10 Ott 2019, 20:01 Yun Tang mailto:myas...@live.com>> 
ha scritto:
Hi Vishwas

Image this scenario, if your last committed offset is A with a savepoint-A and 
then you just stop this job and try a new program logical such as print your 
output instead of writing to previous sink to do some experiments. The new 
experimental job might commit offset-B to kafka. Once verified, and then you 
still need to resume from kafka offset-A to ensure all data has been written to 
target sink. This would be easier If you just restore the job from savepoint-A.

In other words, Flink has already provided a more strong and flexible mechanism 
to resume kafka offsets, why not use this?

Best
Yun Tang

From: Congxian Qiu mailto:qcx978132...@gmail.com>>
Sent: Thursday, October 10, 2019 11:52
To: theo.diefent...@scoop-software.de 
mailto:theo.diefent...@scoop-software.de>>
Cc: user mailto:user@flink.apache.org>>
Subject: Re: Flink restoring a job from a checkpoint

Hi Vishwas

Sorry for the confusing, what Theo said previous is the meaning I want to say.  
Previously, what I said is from Flink's side, if we do not restore from 
checkpoint/savepoint, all the TMs will have no state, so the Job starts from 
scratch.

Best,
Congxian


theo.diefent...@scoop-software.de 
mailto:theo.diefent...@scoop-software.de>> 
于2019年10月10日周四 上午1:15写道:
Hi Vishaws,

With "from scratch", Congxian means that Flink won't load any state 
automatically and starts as if there was no state. Of course if the kafka 
consumer group already exists and you have configured Flink to start from group 
offsets if there is no state yet, it will start from the group offsets.

I think your approach is totally fine. Ignoring savepoints and don't retaining 
checkpoints saves overhead and configuration burdens and works nicely as long 
as you don't have any state in your pipeline.

You should however be certain that nobody in your team will add something with 
state later on and forgets to think about the missing state...

Best regards
Theo




 Ursprüngliche Nachricht 
Betreff: Re: Flink restoring a job from a checkpoint
Von: Vishwas Siravara
An: Congxian Qiu
Cc: Yun Tang ,user

Hi Congxian,
Thanks for getting back. Why would the streaming start from scratch if my 
consumer group does not change ? I start from the group offsets : 
env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka 
source")
So when I restart the job it should consume from the last committed offset to 
kafka isn't it ? Let me know what you think .

Best,
Vishwas
On Tue, Oct 8, 2019 at 9:06 PM Congxian Qiu 
mailto:qcx978132...@gmail.com>> wrote:
Hi Vishwas

Currently, Flink can only restore retained checkpoint or savepoint with 
parameter `-s`[1][2], otherwise, it will start from scratch.

```
checkpoint---> bin/flink run -s :checkpointMetaDataPath [:runArgs]
savepoint --> bin/flink run -s :savepointPath [:runArgs]

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#resuming-from-savepoints

Best,
Congxian


Vishwas Siravara mailto:vsirav...@gmail.com>> 
于2019年10月9日周三 上午5:07写道:
Hi Yun,
Thanks for your reply. I do start from GROUP_OFFSET . Here is the code snippet :

env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka 
source")

I have also enabled and externalized checkpointing to S3 .
Why is it not recommended to just restart the job once I cancel it, as long as 
the topology does not change? What is the advantage of explicitly restoring 
from last checkpoint by passing the -s option to the flink command line if it 
does the same thing? For instance if 
s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/
 is my last successful checkpoint, what is the difference between 1 and 2.

1. /usr/mware/flink/bin/flink run -d -C 

Re: Flink 1.8.0 S3 Checkpoints fail with java.security.ProviderException: Could not initialize NSS

2019-10-10 Thread Hao Sun
I saw similar issue when using alpine linux.
https://pkgs.alpinelinux.org/package/v3.3/main/x86/nss

Installing this package fixed my problem

Hao Sun


On Thu, Oct 10, 2019 at 3:46 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi there,
>
> I'm getting the following error message on a Flink 1.8 cluster deployed on
> Kubernetes. I've already confirmed that the pod has access to S3 and write
> permissions to the bucket, but I can't understand what the SSL issue is and
> if it is related to S3 or not. I have tried both with the default state
> backend and with rocksdb. It happens immediately upon triggering a
> savepoint. Has anyone seen errors like this?
>
> Thank you!
> Austin
>
>
> 2019-10-10T22:21:36.496009042Z 2019-10-10 22:21:36,495 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Triggering checkpoint 1 @ 1570746096485 for job
> 32a9a430038d440cbfee808101dcccd1.
> 2019-10-10T22:21:36.871364673Z 2019-10-10 22:21:36,858 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
> Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator,
> PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1)
> (6cb94a0782212895ac1e062b4124e425) switched from RUNNING to FAILED.
> 2019-10-10T22:21:36.87141613Z java.lang.ExceptionInInitializerError: null
> 2019-10-10T22:21:36.871422053Z  at sun.security.ssl.SSLSessionImpl.(
> http://SSLSessionImpl.java:183
> )
> ~[?:1.8.0_181]
> 2019-10-10T22:21:36.87142623Z   at sun.security.ssl.SSLSessionImpl.(
> http://SSLSessionImpl.java:148
> )
> ~[?:1.8.0_181]
> 2019-10-10T22:21:36.871429991Z  at
> sun.security.ssl.SSLSessionImpl.(http://SSLSessionImpl.java:79
> )
> ~[?:1.8.0_181]
> 2019-10-10T22:21:36.871434248Z  at
> sun.security.ssl.SSLSocketImpl.init(SSLSocketImpl.java:604) ~[?:1.8.0_181]
> 2019-10-10T22:21:36.871450145Z  at sun.security.ssl.SSLSocketImpl.(
> http://SSLSocketImpl.java:572
> )
> ~[?:1.8.0_181]
> 2019-10-10T22:21:36.871454174Z  at
> sun.security.ssl.SSLSocketFactoryImpl.createSocket(SSLSocketFactoryImpl.java:110)
> ~[?:1.8.0_181]
> 2019-10-10T22:21:36.871457724Z  at
> org.apache.flink.fs.s3base.shaded.org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:365)
> ~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
> 2019-10-10T22:21:36.871461103Z  at
> org.apache.flink.fs.s3base.shaded.org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:355)
> ~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
> 2019-10-10T22:21:36.871465705Z  at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:132)
> ~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
> 2019-10-10T22:21:36.87146981Z   at
> org.apache.flink.fs.s3base.shaded.org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
> ~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
> 2019-10-10T22:21:36.871474533Z  at
> org.apache.flink.fs.s3base.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:359)
> ~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
> 2019-10-10T22:21:36.871494299Z  at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_181]
> 2019-10-10T22:21:36.87149796Z   at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:1.8.0_181]
> 2019-10-10T22:21:36.871502637Z  at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_181]
> 2019-10-10T22:21:36.871506464Z  at
> java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_181]
> 2019-10-10T22:21:36.871510239Z  at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
> ~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
> 2019-10-10T22:21:36.871513871Z  at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.conn.$Proxy19.connect(Unknown
> Source) ~[?:1.8.0-stream1]
> 2019-10-10T22:21:36.871516965Z  at
> org.apache.flink.fs.s3base.shaded.org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:381)
> ~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
> 2019-10-10T22:21:36.871520624Z  at
> org.apache.flink.fs.s3base.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:237)
> ~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
> 2019-10-10T22:21:36.87152418Z   at
> org.apache.flink.fs.s3base.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
> ~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
> 2019-10-10T22:21:36.871527877Z  at
> 

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-10-10 Thread Hao Sun
Yep I know that option. That's where get me confused as well. In a HA
setup, where do I supply this option (allowNonRestoredState)?
This option requires a savepoint path when I start a flink job I remember.
And HA does not require the path

Hao Sun


On Thu, Oct 10, 2019 at 11:16 AM Yun Tang  wrote:

> Just a minor supplement @Hao Sun , if you decided to
> drop a operator, don't forget to add --allowNonRestoredState (short: -n)
> option [1]
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state
> 
>
> Best
> Yun Tang
>
> --
> *From:* Vijay Bhaskar 
> *Sent:* Thursday, October 10, 2019 19:24
> *To:* Yang Wang 
> *Cc:* Sean Hester ; Aleksandar Mastilovic <
> amastilo...@sightmachine.com>; Yun Tang ; Hao Sun <
> ha...@zendesk.com>; Yuval Itzchakov ; user <
> user@flink.apache.org>
> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>
> Thanks Yang. We will try and let you know if any issues arise
>
> Regards
> Bhaskar
>
> On Thu, Oct 10, 2019 at 1:53 PM Yang Wang  wrote:
>
> @ Hao Sun,
> I have made a confirmation that even we change parallelism and/or modify
> operators, add new operators,
> the flink cluster could also recover from latest checkpoint.
>
> @ Vijay
> a) Some individual jobmanager/taskmanager crashed exceptionally(someother
> jobmanagers
> and taskmanagers are alive), it could recover from the latest checkpoint.
> b) All jobmanagers and taskmanagers fails, it could still recover from the
> latest checkpoint if the cluster-id
> is not changed.
>
> When we enable the HA, The meta of jobgraph and checkpoint is saved on
> zookeeper and the real files are save
> on high-availability storage(HDFS). So when the flink application is
> submitted again with same cluster-id, it could
> recover jobs and checkpoint from zookeeper. I think it has been supported
> for a long time. Maybe you could have a
> try with flink-1.8 or 1.9.
>
> Best,
> Yang
>
>
> Vijay Bhaskar  于2019年10月10日周四 下午2:26写道:
>
> Thanks Yang and Sean. I have couple of questions:
>
> 1) Suppose the scenario of , bringing back entire cluster,
>  a) In that case, at least one job manager out of HA group should be
> up and running right? or
>  b) All the job managers fails, then also this works? In that case
> please let me know the procedure/share the documentation?
>  How to start from previous check point?
>  What Flink version onwards this feature is stable?
>
> Regards
> Bhaskar
>
>
> On Wed, Oct 9, 2019 at 8:51 AM Yang Wang  wrote:
>
> Hi Vijay,
>
> If you are using HA solution, i think you do not need to specify the
> savepoint. Instead the checkpoint is used.
> The checkpoint is done automatically and periodically based on your
> configuration.When the
> jobmanager/taskmanager fails or the whole cluster crashes, it could always
> recover from the latest
> checkpoint. Does this meed your requirement?
>
> Best,
> Yang
>
> Sean Hester  于2019年10月1日周二 上午1:47写道:
>
> Vijay,
>
> That is my understanding as well: the HA solution only solves the problem
> up to the point all job managers fail/restart at the same time. That's
> where my original concern was.
>
> But to Aleksandar and Yun's point, running in HA with 2 or 3 Job Managers
> per cluster--as long as they are all deployed to separate GKE nodes--would
> provide a very high uptime/low failure rate, at least on paper. It's a
> promising enough option that we're going to run in HA for a month or two
> and monitor results before we put in any extra work to customize the
> savepoint start-up behavior.
>
> On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar 
> wrote:
>
> I don't think HA will help to recover from cluster crash, for that we
> should take periodic savepoint right? Please correct me in case i am wrong
>
> Regards
> Bhaskar
>
> On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar 
> wrote:
>
> Suppose my cluster got crashed and need to bring up the entire cluster
> back? Does HA still helps to run the cluster from latest save point?
>
> Regards
> Bhaskar
>
> On Thu, Sep 26, 2019 at 7:44 PM Sean Hester 
> wrote:
>
> thanks to everyone for all the replies.
>
> i think the original concern here with "just" relying on the HA option is
> that there are some disaster recovery and data center migration use cases
> where the continuity of the job managers is difficult to preserve. but
> those are admittedly very edgy use cases. i think it's definitely worth
> reviewing the SLAs with our site reliability engineers to see how likely it
> would be to completely lose all job managers under an HA configuration.
> that small a risk might be acceptable/preferable to a one-off solution.
>
> @Aleksander, would love to learn more about Zookeeper-less HA. i think i
> spotted a thread somewhere between Till and someone (perhaps you) about
> that. feel free to DM me.
>

Flink 1.8.0 S3 Checkpoints fail with java.security.ProviderException: Could not initialize NSS

2019-10-10 Thread Austin Cawley-Edwards
Hi there,

I'm getting the following error message on a Flink 1.8 cluster deployed on
Kubernetes. I've already confirmed that the pod has access to S3 and write
permissions to the bucket, but I can't understand what the SSL issue is and
if it is related to S3 or not. I have tried both with the default state
backend and with rocksdb. It happens immediately upon triggering a
savepoint. Has anyone seen errors like this?

Thank you!
Austin


2019-10-10T22:21:36.496009042Z 2019-10-10 22:21:36,495 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
Triggering checkpoint 1 @ 1570746096485 for job
32a9a430038d440cbfee808101dcccd1.
2019-10-10T22:21:36.871364673Z 2019-10-10 22:21:36,858 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator,
PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1)
(6cb94a0782212895ac1e062b4124e425) switched from RUNNING to FAILED.
2019-10-10T22:21:36.87141613Z java.lang.ExceptionInInitializerError: null
2019-10-10T22:21:36.871422053Z  at
sun.security.ssl.SSLSessionImpl.(SSLSessionImpl.java:183)
~[?:1.8.0_181]
2019-10-10T22:21:36.87142623Z   at
sun.security.ssl.SSLSessionImpl.(SSLSessionImpl.java:148)
~[?:1.8.0_181]
2019-10-10T22:21:36.871429991Z  at
sun.security.ssl.SSLSessionImpl.(SSLSessionImpl.java:79)
~[?:1.8.0_181]
2019-10-10T22:21:36.871434248Z  at
sun.security.ssl.SSLSocketImpl.init(SSLSocketImpl.java:604) ~[?:1.8.0_181]
2019-10-10T22:21:36.871450145Z  at
sun.security.ssl.SSLSocketImpl.(SSLSocketImpl.java:572) ~[?:1.8.0_181]
2019-10-10T22:21:36.871454174Z  at
sun.security.ssl.SSLSocketFactoryImpl.createSocket(SSLSocketFactoryImpl.java:110)
~[?:1.8.0_181]
2019-10-10T22:21:36.871457724Z  at
org.apache.flink.fs.s3base.shaded.org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:365)
~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
2019-10-10T22:21:36.871461103Z  at
org.apache.flink.fs.s3base.shaded.org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:355)
~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
2019-10-10T22:21:36.871465705Z  at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:132)
~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
2019-10-10T22:21:36.87146981Z   at
org.apache.flink.fs.s3base.shaded.org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
2019-10-10T22:21:36.871474533Z  at
org.apache.flink.fs.s3base.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:359)
~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
2019-10-10T22:21:36.871494299Z  at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_181]
2019-10-10T22:21:36.87149796Z   at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_181]
2019-10-10T22:21:36.871502637Z  at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_181]
2019-10-10T22:21:36.871506464Z  at
java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_181]
2019-10-10T22:21:36.871510239Z  at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
2019-10-10T22:21:36.871513871Z  at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.conn.$Proxy19.connect(Unknown
Source) ~[?:1.8.0-stream1]
2019-10-10T22:21:36.871516965Z  at
org.apache.flink.fs.s3base.shaded.org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:381)
~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
2019-10-10T22:21:36.871520624Z  at
org.apache.flink.fs.s3base.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:237)
~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
2019-10-10T22:21:36.87152418Z   at
org.apache.flink.fs.s3base.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
2019-10-10T22:21:36.871527877Z  at
org.apache.flink.fs.s3base.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
2019-10-10T22:21:36.871531559Z  at
org.apache.flink.fs.s3base.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
2019-10-10T22:21:36.871537442Z  at
org.apache.flink.fs.s3base.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
2019-10-10T22:21:36.871540992Z  at

Re: Flink restoring a job from a checkpoint

2019-10-10 Thread Flavio Pompermaier
Sorry for the dumb question but let's suppose to not use retained
checkpoint and my job processed billions of messages from Kafka. Then a
problematic message causes my job to fail..am I able to complete a
savepoint to fic the job and restart from the problematic message (i.e.
last "valid" kafka offset)?

Il Gio 10 Ott 2019, 20:01 Yun Tang  ha scritto:

> Hi Vishwas
>
> Image this scenario, if your last committed offset is A with a savepoint-A
> and then you just stop this job and try a new program logical such as print
> your output instead of writing to previous sink to do some experiments. The
> new experimental job might commit offset-B to kafka. Once verified, and
> then you still need to resume from kafka offset-A to ensure all data has
> been written to target sink. This would be easier If you just restore the
> job from savepoint-A.
>
> In other words, Flink has already provided a more strong and flexible
> mechanism to resume kafka offsets, why not use this?
>
> Best
> Yun Tang
> --
> *From:* Congxian Qiu 
> *Sent:* Thursday, October 10, 2019 11:52
> *To:* theo.diefent...@scoop-software.de  >
> *Cc:* user 
> *Subject:* Re: Flink restoring a job from a checkpoint
>
> Hi Vishwas
>
> Sorry for the confusing, what Theo said previous is the meaning I want to
> say.  Previously, what I said is from Flink's side, if we do not restore
> from checkpoint/savepoint, all the TMs will have no state, so the Job
> starts from scratch.
>
> Best,
> Congxian
>
>
> theo.diefent...@scoop-software.de 
> 于2019年10月10日周四 上午1:15写道:
>
> Hi Vishaws,
>
> With "from scratch", Congxian means that Flink won't load any state
> automatically and starts as if there was no state. Of course if the kafka
> consumer group already exists and you have configured Flink to start from
> group offsets if there is no state yet, it will start from the group
> offsets.
>
> I think your approach is totally fine. Ignoring savepoints and don't
> retaining checkpoints saves overhead and configuration burdens and works
> nicely as long as you don't have any state in your pipeline.
>
> You should however be certain that nobody in your team will add something
> with state later on and forgets to think about the missing state...
>
> Best regards
> Theo
>
>
>
>
>  Ursprüngliche Nachricht 
> Betreff: Re: Flink restoring a job from a checkpoint
> Von: Vishwas Siravara
> An: Congxian Qiu
> Cc: Yun Tang ,user
>
> Hi Congxian,
> Thanks for getting back. Why would the streaming start from scratch if my
> consumer group does not change ? I start from the group offsets :
> env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka
> source")
> So when I restart the job it should consume from the last committed offset
> to kafka isn't it ? Let me know what you think .
>
> Best,
> Vishwas
> On Tue, Oct 8, 2019 at 9:06 PM Congxian Qiu 
> wrote:
>
> Hi Vishwas
>
> Currently, Flink can only restore retained checkpoint or savepoint with
> parameter `-s`[1][2], otherwise, it will start from scratch.
>
> ```
> checkpoint---> bin/flink run -s :checkpointMetaDataPath [:runArgs]
> savepoint --> bin/flink run -s :savepointPath [:runArgs]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#resuming-from-savepoints
>
> Best,
> Congxian
>
>
> Vishwas Siravara  于2019年10月9日周三 上午5:07写道:
>
> Hi Yun,
> Thanks for your reply. I do start from GROUP_OFFSET . Here is the code
> snippet :
>
> env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka 
> source")
>
> I have also enabled and externalized checkpointing to S3 .
> Why is it not recommended to just restart the job once I cancel it, as
> long as the topology does not change? What is the advantage of
> explicitly restoring from last checkpoint by passing the -s option to the
> flink command line if it does the same thing? For instance if 
> s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/
> is my last successful checkpoint, what is the difference between 1 and 2.
>
> 1. /usr/mware/flink/bin/flink run -d -C 
> file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main 
> flink-job-assembly.jar flink druid -p 8 -cp qa_streaming
> 2. /usr/mware/flink/bin/flink run -s 
> s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/
>  -d -C file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main 
> flink-job-assembly.jar flink druid -p 4 -cp qa_streaming
>
> Thanks,
> Vishwas
>
> On Tue, Oct 8, 2019 at 1:51 PM Yun Tang  wrote:
>
> Hi Vishwas
>
> If you did not configure your
> org.apache.flink.streaming.connectors.kafka.config.StartupMode, it is
> GROUP_OFFSET by default, which means "Start from committed offsets in ZK /
> Kafka brokers of a specific consumer group". And you need  to enable
> 

Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

2019-10-10 Thread Filip Niksic
Hi Theo,

Isn't the solution I proposed exactly the solution you talk about? Read the
stream sequentially, assign punctuated watermarks, keyBy to achieve
parallelism.

Perhaps you're reading too much into my question. When I sent the first
email, I didn't even know about punctuated watermarks. Dealing with a
sequential stream that cannot be read sequentially was way beyond what I
had in mind. :)

Filip



On Thu, Oct 10, 2019 at 7:55 AM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi Filip,
>
> My point was not about the computation of the "maximum". My point was: You
> could hopefully read the stream sequentially and just assign punctuated
> watermarks to it. Once you have assigned the watermarks properly (And
> before you do your expensive computatation, like in this case parsing the
> entire event and building the sum), you could tell flink to repartition /
> key the data and shuffle it to the worker tasks in the network, so that the
> downstream operations are performed in parallel. Flink will afaik then take
> care of dealing with the watmark internally and everything is fine.
> I think it is a rare usecase that you have a sequential stream which can
> not be simply read sequentally. If its such a large stream, that you can't
> do on a single host: "Read, extract special event, shuffle to the network
> to other tasks", you probably have a larger issue and need to rethink on
> the source level already, e.g. change the method serialization to something
> which has a really lightweight parsing for finding the special events or
> such.
>
> Best regards
> Theo
>
> --
> *Von: *"Filip Niksic" 
> *An: *"Theo Diefenthal" 
> *CC: *"user" 
> *Gesendet: *Donnerstag, 10. Oktober 2019 00:08:38
> *Betreff: *Re: [QUESTION] How to parallelize with explicit punctuation in
> Flink?
>
> Hi Theo,
>
> It is a single sequential stream.
>
> If I read your response correctly, you are arguing that summing a bunch of
> numbers is not much more computationally intensive than assigning
> timestamps to those numbers, so if the latter has to be done sequentially
> anyway, then why should the former be done in parallel? To that I can only
> say that the example I gave is intentionally simple in order to make the
> problem conceptually clean. By understanding the conceptually clean version
> of the problem, we also gain insight into messier realistic versions where
> the operations we want to parallelize may be much more computationally
> intensive.
>
> Filip
>
>
>
> On Wed, Oct 9, 2019 at 1:28 PM theo.diefent...@scoop-software.de <
> theo.diefent...@scoop-software.de> wrote:
>
>> Hi Filip, I don't really understand your problem here.
>> Do you have a source with a single sequential stream, where from time to
>> time, there is a barrier element? Or do you have a source like Kafka with
>> multiple partitions?
>> If you have case 2 with multiple partitions, what exactly do you mean by
>> "order matters"? Will each partition have its own barrier? Or do you have
>> just one barrier for all partitions? In that case, you will naturally have
>> an ordering problem if your events itself contain no time data.
>> If you have a "sequential source" why do you need parallelism? Won't it
>> work out to read that partition data in one task (possibly skipping
>> deserialization as much as possible to only recognize barrier events) and
>> then add a downstream task with higher parallelism doing the full
>> deserialization and other work?
>> Best regardsTheo
>>  Ursprüngliche Nachricht 
>> Betreff: Re: [QUESTION] How to parallelize with explicit punctuation in
>> Flink?
>> Von: Yun Gao
>> An: Filip Niksic ,user
>> Cc: Chesnay Schepler
>>
>>
>>   Hi Filip,
>>
>>  As a whole, I also think to increase the parallelism of the
>> reduce to more than 1, we should use a parallel window to compute the
>> partial sum and then sum the partial sum with WindowAll.
>>
>> For the assignTimestampAndWatermarks, From my side I think the
>> current usage should be OK and it works the same to the other operators.
>> Besides, for the keyBy Partitioner, I think "% PARALLELISM" is not
>> necessary and Flink will take care of the parallelism. In other words, I
>> think you can use .keyBy(x -> x.getId()) directly.
>>
>> Best,
>> Yun
>>
>>
>> --
>> From:Filip Niksic 
>> Send Time:2019 Oct. 9 (Wed.) 12:21
>> To:user 
>> Cc:Yun Gao ; Chesnay Schepler 
>> Subject:Re: [QUESTION] How to parallelize with explicit punctuation in
>> Flink?
>>
>> Here is the solution I currently have. It turned out to be more
>> complicated than I expected. It would be great if a more experienced Flink
>> user could comment and point out the shortcomings. And if you have other
>> ideas for achieving the same thing, let me know!
>>
>> Let's start like in the original email, except now we set the time
>> characteristic to EventTime and parallelism to a 

Re: Async and checkpointing

2019-10-10 Thread anurag
Hi Yun,
Thanks for your reply. I am sorry if I was not clear . What I meant was
that as records are processed, the checkpoint and the corresponding stream
position will move.
But in case of async with unordered, will the checkpoint and the
corresponding stream position move in the above scenario where Tasks
T1T100 are currently running in parallel and all tasks T1...T100 pass
except for T50.
Many thanks.
Anurag

On Thu, Oct 10, 2019 at 11:13 AM Yun Tang  wrote:

> Hi  Anurag
>
> What do you mean "will the checkpoint pointer move at all or not"?
>
> If one of your thread failed to send record, and if it would cause that
> sub-task to fail, it would lead to the job failover. When job failover, any
> on-going checkpoint would be aborted and job would then just restore from
> last latest checkpoint.
> If failing to send record would not cause that sub-task to fail, nothing
> would happen and job continues to run but this might be not what you want.
>
> Best
> Yun Tang
> --
> *From:* anurag 
> *Sent:* Friday, October 11, 2019 2:03
> *To:* user@flink.apache.org 
> *Subject:* Async and checkpointing
>
> Hi All,
> Thanks for your help in advance. I am using async I/O with  flink 1.5.5, I
> am using
>
> AsyncDataStream.unorderedWait method  also my capacity is set to 100.My 
> question is since my capacity is 100, each thread will be processing one 
> record.Say sequence number of my records is S1,S2S100 and say at a 
> particular point in time , thread T1 is processing record S1, T2 is 
> processing record S2  and T100 is processing record S100 and say all the 
> threads T1..T100 except T50 failed when sending the record to the Sink. In 
> this will the checkpoint pointer move at all or not.
>
> Apologies in advance if my question is not clear.
>
> Thanks,
> Anurag
>
>


Re: Where are uploaded Job jars stored?

2019-10-10 Thread John Smith
And can that folder be shared so that all nodes see it?

On Thu, 10 Oct 2019 at 14:36, Yun Tang  wrote:

> Hi John
>
> The jar is not stored in HA path, I think the answer [1] could help you.
>
> [1]
> https://stackoverflow.com/questions/51936608/where-can-i-find-my-jar-on-apache-flink-server-which-i-submitted-using-apache-fl
>
> Best
> Yun Tang
> --
> *From:* John Smith 
> *Sent:* Friday, October 11, 2019 2:06
> *To:* user 
> *Subject:* Where are uploaded Job jars stored?
>
> Hi using 1.8.0 running on standalone cluster with Zookeeper HA.
>
> Are job JARs stored at: high-availability.storageDir ???
>
> The thing is when you browse the individual nodes at port 8080 to go
> submit the job only the node where you uploaded the JAR has it.
>
> - Go to any given node
> - Upload a jar
> - Browse another node
> - Jar is not there.
>
>
>


Re: Where are uploaded Job jars stored?

2019-10-10 Thread Yun Tang
Hi John

The jar is not stored in HA path, I think the answer [1] could help you.

[1] 
https://stackoverflow.com/questions/51936608/where-can-i-find-my-jar-on-apache-flink-server-which-i-submitted-using-apache-fl

Best
Yun Tang

From: John Smith 
Sent: Friday, October 11, 2019 2:06
To: user 
Subject: Where are uploaded Job jars stored?

Hi using 1.8.0 running on standalone cluster with Zookeeper HA.

Are job JARs stored at: high-availability.storageDir ???

The thing is when you browse the individual nodes at port 8080 to go submit the 
job only the node where you uploaded the JAR has it.

- Go to any given node
- Upload a jar
- Browse another node
- Jar is not there.




Re: Best coding practises guide while programming using flink apis

2019-10-10 Thread Yun Tang
Hi Terry

Flink has a code-style and quality guide when contributes code[1], this might 
not be directly what you want but hope could help a bit.

As more and more big data system recommend high level and declarative API such 
as SQL and Table API [2], I think GOF design patterns might not play an 
important role.

[1] https://flink.apache.org/contributing/code-style-and-quality-preamble.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/concepts/programming-model.html#levels-of-abstraction

Best
Yun Tang



From: Deepak Sharma 
Sent: Monday, September 23, 2019 10:25
To: Terry Wang ; user 
Cc: dev 
Subject: Re: Best coding practises guide while programming using flink apis

Thanks Terry.
I would need some volunteers to speak about their use cases and the best 
practised they have been following around flink.

―DK

On Sun, 22 Sep 2019 at 5:36 PM, Terry Wang 
mailto:zjuwa...@gmail.com>> wrote:
Hi, Deepak~

I appreciate your idea and cc to dev mail too.

Best,
Terry Wang



在 2019年9月22日,下午2:12,Deepak Sharma 
mailto:deepakmc...@gmail.com>> 写道:

Hi All
I guess we need to put some examples in the documentation around best coding 
practises , concurrency , non blocking IO and design patterns while writing 
Apache Flink pipelines.
Is there any such guide available ?
E.g. when and how to use the GOF design patterns . Any code snippet can be put 
as well explaining it.

This guide can come from people already running beam in production and written 
it with all best practices in mind.
It will help in greater and wider adoption.

Just a thought.
Please let me know if anyone wants to contribute and i can lead this initiative 
by documenting in flink wiki.

Thanks
--
Thanks
Deepak
www.bigdatabig.com
www.keosha.net

--
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: Async and checkpointing

2019-10-10 Thread Yun Tang
Hi  Anurag

What do you mean "will the checkpoint pointer move at all or not"?

If one of your thread failed to send record, and if it would cause that 
sub-task to fail, it would lead to the job failover. When job failover, any 
on-going checkpoint would be aborted and job would then just restore from last 
latest checkpoint.
If failing to send record would not cause that sub-task to fail, nothing would 
happen and job continues to run but this might be not what you want.

Best
Yun Tang

From: anurag 
Sent: Friday, October 11, 2019 2:03
To: user@flink.apache.org 
Subject: Async and checkpointing

Hi All,
Thanks for your help in advance. I am using async I/O with  flink 1.5.5, I am 
using

AsyncDataStream.unorderedWait method  also my capacity is set to 100.My 
question is since my capacity is 100, each thread will be processing one 
record.Say sequence number of my records is S1,S2S100 and say at a 
particular point in time , thread T1 is processing record S1, T2 is processing 
record S2  and T100 is processing record S100 and say all the threads T1..T100 
except T50 failed when sending the record to the Sink. In this will the 
checkpoint pointer move at all or not.


Apologies in advance if my question is not clear.

Thanks,
Anurag


Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-10-10 Thread Yun Tang
Just a minor supplement @Hao Sun, if you decided to 
drop a operator, don't forget to add --allowNonRestoredState (short: -n) option 
[1]


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state

Best
Yun Tang


From: Vijay Bhaskar 
Sent: Thursday, October 10, 2019 19:24
To: Yang Wang 
Cc: Sean Hester ; Aleksandar Mastilovic 
; Yun Tang ; Hao Sun 
; Yuval Itzchakov ; user 

Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes

Thanks Yang. We will try and let you know if any issues arise

Regards
Bhaskar

On Thu, Oct 10, 2019 at 1:53 PM Yang Wang 
mailto:danrtsey...@gmail.com>> wrote:
@ Hao Sun,
I have made a confirmation that even we change parallelism and/or modify 
operators, add new operators,
the flink cluster could also recover from latest checkpoint.

@ Vijay
a) Some individual jobmanager/taskmanager crashed exceptionally(someother 
jobmanagers
and taskmanagers are alive), it could recover from the latest checkpoint.
b) All jobmanagers and taskmanagers fails, it could still recover from the 
latest checkpoint if the cluster-id
is not changed.

When we enable the HA, The meta of jobgraph and checkpoint is saved on 
zookeeper and the real files are save
on high-availability storage(HDFS). So when the flink application is submitted 
again with same cluster-id, it could
recover jobs and checkpoint from zookeeper. I think it has been supported for a 
long time. Maybe you could have a
try with flink-1.8 or 1.9.

Best,
Yang


Vijay Bhaskar mailto:bhaskar.eba...@gmail.com>> 
于2019年10月10日周四 下午2:26写道:
Thanks Yang and Sean. I have couple of questions:

1) Suppose the scenario of , bringing back entire cluster,
 a) In that case, at least one job manager out of HA group should be up and 
running right? or
 b) All the job managers fails, then also this works? In that case please 
let me know the procedure/share the documentation?
 How to start from previous check point?
 What Flink version onwards this feature is stable?

Regards
Bhaskar


On Wed, Oct 9, 2019 at 8:51 AM Yang Wang 
mailto:danrtsey...@gmail.com>> wrote:
Hi Vijay,

If you are using HA solution, i think you do not need to specify the savepoint. 
Instead the checkpoint is used.
The checkpoint is done automatically and periodically based on your 
configuration.When the
jobmanager/taskmanager fails or the whole cluster crashes, it could always 
recover from the latest
checkpoint. Does this meed your requirement?

Best,
Yang

Sean Hester mailto:sean.hes...@bettercloud.com>> 
于2019年10月1日周二 上午1:47写道:
Vijay,

That is my understanding as well: the HA solution only solves the problem up to 
the point all job managers fail/restart at the same time. That's where my 
original concern was.

But to Aleksandar and Yun's point, running in HA with 2 or 3 Job Managers per 
cluster--as long as they are all deployed to separate GKE nodes--would provide 
a very high uptime/low failure rate, at least on paper. It's a promising enough 
option that we're going to run in HA for a month or two and monitor results 
before we put in any extra work to customize the savepoint start-up behavior.

On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar 
mailto:bhaskar.eba...@gmail.com>> wrote:
I don't think HA will help to recover from cluster crash, for that we should 
take periodic savepoint right? Please correct me in case i am wrong

Regards
Bhaskar

On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar 
mailto:bhaskar.eba...@gmail.com>> wrote:
Suppose my cluster got crashed and need to bring up the entire cluster back? 
Does HA still helps to run the cluster from latest save point?

Regards
Bhaskar

On Thu, Sep 26, 2019 at 7:44 PM Sean Hester 
mailto:sean.hes...@bettercloud.com>> wrote:
thanks to everyone for all the replies.

i think the original concern here with "just" relying on the HA option is that 
there are some disaster recovery and data center migration use cases where the 
continuity of the job managers is difficult to preserve. but those are 
admittedly very edgy use cases. i think it's definitely worth reviewing the 
SLAs with our site reliability engineers to see how likely it would be to 
completely lose all job managers under an HA configuration. that small a risk 
might be acceptable/preferable to a one-off solution.

@Aleksander, would love to learn more about Zookeeper-less HA. i think i 
spotted a thread somewhere between Till and someone (perhaps you) about that. 
feel free to DM me.

thanks again to everyone!

On Thu, Sep 26, 2019 at 7:32 AM Yang Wang 
mailto:danrtsey...@gmail.com>> wrote:
Hi, Aleksandar

Savepoint option in standalone job cluster is optional. If you want to always 
recover
from the latest checkpoint, just as Aleksandar and Yun Tang said you could use 
the
high-availability configuration. Make sure the cluster-id is not changed, i 
think the job
could recover both at exceptionally crash and restart 

Where are uploaded Job jars stored?

2019-10-10 Thread John Smith
Hi using 1.8.0 running on standalone cluster with Zookeeper HA.

Are job JARs stored at: high-availability.storageDir ???

The thing is when you browse the individual nodes at port 8080 to go submit
the job only the node where you uploaded the JAR has it.

- Go to any given node
- Upload a jar
- Browse another node
- Jar is not there.


Async and checkpointing

2019-10-10 Thread anurag
Hi All,
Thanks for your help in advance. I am using async I/O with  flink 1.5.5, I
am using

AsyncDataStream.unorderedWait method  also my capacity is set to
100.My question is since my capacity is 100, each thread will be
processing one record.Say sequence number of my records is
S1,S2S100 and say at a particular point in time , thread T1 is
processing record S1, T2 is processing record S2  and T100 is
processing record S100 and say all the threads T1..T100 except T50
failed when sending the record to the Sink. In this will the
checkpoint pointer move at all or not.

Apologies in advance if my question is not clear.

Thanks,
Anurag


Re: Flink restoring a job from a checkpoint

2019-10-10 Thread Yun Tang
Hi Vishwas

Image this scenario, if your last committed offset is A with a savepoint-A and 
then you just stop this job and try a new program logical such as print your 
output instead of writing to previous sink to do some experiments. The new 
experimental job might commit offset-B to kafka. Once verified, and then you 
still need to resume from kafka offset-A to ensure all data has been written to 
target sink. This would be easier If you just restore the job from savepoint-A.

In other words, Flink has already provided a more strong and flexible mechanism 
to resume kafka offsets, why not use this?

Best
Yun Tang

From: Congxian Qiu 
Sent: Thursday, October 10, 2019 11:52
To: theo.diefent...@scoop-software.de 
Cc: user 
Subject: Re: Flink restoring a job from a checkpoint

Hi Vishwas

Sorry for the confusing, what Theo said previous is the meaning I want to say.  
Previously, what I said is from Flink's side, if we do not restore from 
checkpoint/savepoint, all the TMs will have no state, so the Job starts from 
scratch.

Best,
Congxian


theo.diefent...@scoop-software.de 
mailto:theo.diefent...@scoop-software.de>> 
于2019年10月10日周四 上午1:15写道:
Hi Vishaws,

With "from scratch", Congxian means that Flink won't load any state 
automatically and starts as if there was no state. Of course if the kafka 
consumer group already exists and you have configured Flink to start from group 
offsets if there is no state yet, it will start from the group offsets.

I think your approach is totally fine. Ignoring savepoints and don't retaining 
checkpoints saves overhead and configuration burdens and works nicely as long 
as you don't have any state in your pipeline.

You should however be certain that nobody in your team will add something with 
state later on and forgets to think about the missing state...

Best regards
Theo




 Ursprüngliche Nachricht 
Betreff: Re: Flink restoring a job from a checkpoint
Von: Vishwas Siravara
An: Congxian Qiu
Cc: Yun Tang ,user

Hi Congxian,
Thanks for getting back. Why would the streaming start from scratch if my 
consumer group does not change ? I start from the group offsets : 
env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka 
source")
So when I restart the job it should consume from the last committed offset to 
kafka isn't it ? Let me know what you think .

Best,
Vishwas
On Tue, Oct 8, 2019 at 9:06 PM Congxian Qiu 
mailto:qcx978132...@gmail.com>> wrote:
Hi Vishwas

Currently, Flink can only restore retained checkpoint or savepoint with 
parameter `-s`[1][2], otherwise, it will start from scratch.

```
checkpoint---> bin/flink run -s :checkpointMetaDataPath [:runArgs]
savepoint --> bin/flink run -s :savepointPath [:runArgs]

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#resuming-from-savepoints

Best,
Congxian


Vishwas Siravara mailto:vsirav...@gmail.com>> 
于2019年10月9日周三 上午5:07写道:
Hi Yun,
Thanks for your reply. I do start from GROUP_OFFSET . Here is the code snippet :

env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka 
source")

I have also enabled and externalized checkpointing to S3 .
Why is it not recommended to just restart the job once I cancel it, as long as 
the topology does not change? What is the advantage of explicitly restoring 
from last checkpoint by passing the -s option to the flink command line if it 
does the same thing? For instance if 
s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/
 is my last successful checkpoint, what is the difference between 1 and 2.

1. /usr/mware/flink/bin/flink run -d -C file:///usr/mware/flink/externalconfig/ 
-c com.visa.flink.cli.Main flink-job-assembly.jar flink druid -p 8 -cp 
qa_streaming
2. /usr/mware/flink/bin/flink run -s 
s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/
 -d -C file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main 
flink-job-assembly.jar flink druid -p 4 -cp qa_streaming

Thanks,
Vishwas

On Tue, Oct 8, 2019 at 1:51 PM Yun Tang 
mailto:myas...@live.com>> wrote:
Hi Vishwas

If you did not configure your 
org.apache.flink.streaming.connectors.kafka.config.StartupMode, it is 
GROUP_OFFSET by default, which means "Start from committed offsets in ZK / 
Kafka brokers of a specific consumer group". And you need  to enable checkpoint 
so that kafka offsets are committed when checkpoint completes.

In other words, even if you don't resume from checkpoint, just enable 
checkpoint in previous jobs and set startupMode as GROUP_OFFSET, you could 
restore from last committed offset if previous checkpoint completed [1][2]. 
However, this is not really recommended, better to resume from last checkpoint 
[3]

[1] 

Re: Re:Memory constrains running Flink on Kubernetes

2019-10-10 Thread Yun Tang
Hi Shengjk1

setBlockCacheSize, setWriteBufferSize and setMaxWriteBufferNumber could help 
you to control memory usage. However, Flink would store state per column family 
which would increase the number of column family and each family has its own 
write buffer. FRocksDB [1] already plan to fix this by introducing RocksDB's 
feature of write buffer manager. We would try to fix FLINK-7289 before 
Flink-1.10 release.

If you are really urgent to fix this problem, I have a non-official built 
frocksDB based on rocksDB-5.18.3 which had been verified work well from Gyula 
Fora's experience. You could contact me in private to get this jar package and 
rebuild your Flink runtime to enable write buffer manager future.


[1] https://github.com/dataArtisans/frocksdb/pull/4
[2] 
https://github.com/facebook/rocksdb/wiki/Write-Buffer-Manager#limit-total-memory-of-memtables

Best
Yun Tang


From: shengjk1 
Sent: Thursday, October 10, 2019 20:37
To: wvl 
Cc: user@flink.apache.org 
Subject: Re:Memory constrains running Flink on Kubernetes

+1

I also encountered a similar problem, but I run flink application that uses 
state in RocksDB on yarn. Yarn container was killed because OOM.
I also saw rockdb tuning guide[1], tune some parameters,but it is useless , 
such as:

class MyOptions1 implements OptionsFactory {
@Override
public DBOptions createDBOptions(DBOptions currentOptions) {
return currentOptions.setDbWriteBufferSize(64 * 1024 * 1024)
.setIncreaseParallelism(2)
.setMaxBackgroundFlushes(2)
.setInfoLogLevel(InfoLogLevel.DEBUG_LEVEL)
.setMaxOpenFiles(4)
.setUseFsync(false);
}
@Override
public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
currentOptions) {
return currentOptions.setTableFormatConfig(
new BlockBasedTableConfig()
.setBlockCacheSize(16 * 1024 * 1024)
//increases read amplification but decreases memory useage and space 
amplification
.setBlockSize(16 * 1024 * 1024))
.setWriteBufferSize(16 * 1024 * 1024)
.setMaxWriteBufferNumber(1);
}
}

Additional, this is FLINK-7289, it is similar to us. But I don’t find a good 
way to  fix it.


[1] https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide
[2] https://issues.apache.org/jira/browse/FLINK-7289



Best,
Shengjk1


On 07/24/2019 03:48,wvl wrote:
Hi,

We're running a relatively simply Flink application that uses a bunch of state 
in RocksDB on Kubernetes.
During the course of development and going to production, we found that we were 
often running into memory issues made apparent by Kubernetes OOMKilled and Java 
OOM log events.

In order to tackle these, we're trying to account for all the memory used in 
the container, to allow proper tuning.
Metric-wise we have:
- container_memory_working_set_bytes = 6,5GB
- flink_taskmanager_Status_JVM_Memory_Heap_Max =  4,7GB
- flink_taskmanager_Status_JVM_Memory_NonHeap_Used = 325MB
- flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed = 500MB

This is my understanding based on all the documentation and observations:
container_memory_working_set_bytes will be the total amount of memory in use, 
disregarding OS page & block cache.
Heap will be heap.
NonHeap is mostly the metaspace.
Direct_Memory is mostly network buffers.

Running the numbers I have 1 GB unaccounted for. I'm also uncertain as to 
RocksDB. According to the docs RocksDB has a "Column Family Write Buffer" where 
"You need to budget for 2 x your worst case memory use".
We have 17 ValueStateDescriptors (ignoring state for windows) which I'm 
assuming corresponds to a "Column Family" in RockDB. Meaning our budget should 
be around 2GB.
Is this accounted for in one of the flink_taskmanager metrics above? We've also 
enabled various rocksdb metrics, but it's unclear where this Write Buffer 
memory would be represented.

Finally, we've seen that when our job has issues and is restarted rapidly, 
NonHeap_Used grows from an initial 50Mb to 700MB, before our containers are 
killed. We're assuming this is due
to no form of cleanup in the metaspace as classes get (re)loaded.

These are our taskmanager JVM settings: -XX:+UseG1GC -XX:MaxDirectMemorySize=1G 
-XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap 
-XX:MaxRAMFraction=2
With flink config:
  taskmanager.heap.size: 5000m
  state.backend: rocksdb
  state.backend.incremental: true
  state.backend.rocksdb.timer-service.factory: ROCKSDB

Based on what we've observed we're thinking about setting -XX:MaxMetaspaceSize 
to a reasonable value, so that we at least get an error message which can 
easily be traced back to the behavior we're seeing.

Okay, all that said let's sum up what we're asking here:
- Is there any more insight into how memory is accounted for than our current 
metrics?
- Which metric, if any accounts for RocksDB memory usage?
- What's going on with the Metaspace growth we're seeing during job restarts, 
is there something we can do about this such as setting -XX:MaxMetaspaceSize?
- Any other 

Re: Difference between windows in Spark and Flink

2019-10-10 Thread Yun Tang
Hi Felipe

Generally speaking, the key difference which impacts the performance is where 
they store data within windows.
For Flink, it would store data and its related time-stamp within windows in 
state backend[1]. Once window is triggered, it would pull all the stored timer 
with coupled record-key, and then use the record-key to query state backend for 
next actions.

For Spark, first of all, we would talk about structured streaming [2] as it's 
better than previous spark streaming especially on window scenario. Unlike 
Flink built-in supported rocksDB state backend, Spark has only one 
implementation of state store providers. It's HDFSBackedStateStoreProvider 
which stores all of the data in memory, what is a very memory consuming 
approach and might come across OOM errors[3][4][5].

To avoid this, Databricks Runtime offer a 'RocksDBStateStoreProvider' but not 
open-source. We're lucky that open-source Flink already offers built-in RocksDB 
state backend to avoid OOM problem. Moreover, Flink community recently are 
developing spill-able memory state backend [7].

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html
[2] 
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time
[3] 
https://medium.com/@chandanbaranwal/state-management-in-spark-structured-streaming-aaa87b6c9d31
[4] 
http://apache-spark-user-list.1001560.n3.nabble.com/use-rocksdb-for-spark-structured-streaming-SSS-td34776.html#a34779
[5] https://github.com/chermenin/spark-states
[6] 
https://docs.databricks.com/spark/latest/structured-streaming/production.html#optimize-performance-of-stateful-streaming-queries
[7] https://issues.apache.org/jira/browse/FLINK-12692

Best
Yun Tang




From: Felipe Gutierrez 
Sent: Thursday, October 10, 2019 20:39
To: user 
Subject: Difference between windows in Spark and Flink

Hi all,

I am trying to think about the essential differences between operators in Flink 
and Spark. Especially when I am using Keyed Windows then a reduce operation.
In Flink we develop an application that can logically separate these two 
operators. Thus after a keyed window I can use .reduce/aggregate/fold/apply() 
functions [1].
In Spark we have window/reduceByKeyAndWindow functions which to me appears it 
is less flexible in the options to use with a keyed window operation [2].
Moreover, when these two applications are deployed in a Flink and Spark cluster 
respectively, what are the differences between their physical operators running 
in the cluster?

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#windows
[2] 
https://spark.apache.org/docs/latest/streaming-programming-guide.html#window-operations

Thanks,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


Re: Backpressure tuning/failure

2019-10-10 Thread Owen Rees-Hayward
Hey Piotr,

I think we are broadly in agreement, hopefully.

So out of the three scenarios you describe, the code is simulating scenario
2). The only additional comment I would make to this is that the additional
load on a node could be an independent service or job.

I am guessing we can agree, that in the context of multi-tenant Hadoop,
this is quite common? For instance, assuming Flink is deployed on the
datanodes then I could see the following as a few examples:

   - another tenant runs a heavy batch job that overlaps with our streaming
   datanodes
   - someone runs a juicy adhoc Hive query which overlaps with our datanodes
   - HBase performs compaction or replica movement on some of our datanodes

Now in an ideal world, I might have a dedicated cluster or be deployed in
the cloud. Then I have an easier life. However, there are lots of
data-engineers operating in challenging multi-tenant Hadoop environments,
where life is not so easy : o

You stated that Flink does not support scenario 2. Typically, Spark is
deployed onto the datanodes for data-locality. I had assumed the same would
be true for Flink. Is that assumption incorrect?

Cheers, Owen

On Thu, 10 Oct 2019 at 15:23, Piotr Nowojski  wrote:

> Hi Owen,
>
> Thanks for the quick response. No, I haven’t seen the previous blog post,
> yes it clears the things out a bit.
>
> To clarify, the code is attempting to simulate a straggler node due to
> high load, which therefore processes data at a slower rate - not a failing
> node. Some degree of this is a feature of multi-tenant Hadoop.
>
>
> In your benchmark you are manually slowing down just one TaskManager, so
> you are testing for the failing/slow machine case, where either:
> 1. the machine is slow on it’s own because it’s smaller than the others,
> 2. it’s overloaded by some service independent of the Flink
> 3. it's a failing node.
>
> Out of those three options, first two are not supported by Flink, in a
> sense that Flink assumes more or less equal machines in the cluster. The
> third is, as I wrote in the previous response, pretty uncommon scenario
> (until you reach really huge scale). How often one of your machine fails in
> a way that it is 6.6 times slower than the others? I agree Flink doesn’t
> handle this automatically at the moment (currently you would be expected to
> manually shut down the machine). Nevertheless there are some plans how to
> address this (speculative execution and load based balancing channel
> selection), but with no definite schedule.
>
> Also if the issue is "multi-tenant Hadoop.”, I would first try to better
> assign resources in the cluster, using for example CGroups via
> yarn/lxc/docker, or virtual machines.
>
> Cheers, Piotrek
>
> On 10 Oct 2019, at 16:02, Owen Rees-Hayward  wrote:
>
> Hi Piotr,
>
> Thanks for getting back to me and for the info. I try to describe the
> motivation around the scenarios in the original post in the series - see
> the 'Backpressure - why you might care' section on
> http://owenrh.me.uk/blog/2019/09/30/. Maybe it could have been clearer.
>
> As you note, this will not affect every Flink job. However, one persons
> niche is another persons day job. I definitely agree that keyed network
> exchanges, which is going to the majority of analytics queries, are in a
> different problem space. However, this is not an uncommon scenario in
> ingest pipelines.
>
> I'd be interested to know whether you saw the section in the post I
> referred to above and whether this clears anything up? To clarify, the code
> is attempting to simulate a straggler node due to high load,
> which therefore processes data at a slower rate - not a failing node. Some
> degree of this is a feature of multi-tenant Hadoop.
>
> Cheers, Owen
>
> On Thu, 10 Oct 2019 at 10:27, Piotr Nowojski  wrote:
>
>> Hi,
>>
>> I’m not entirely sure what you are testing. I have looked at your code
>> (only the constant straggler scenario) and please correct me if’m wrong, in
>> your job you are basically measuring throughput of
>> `Thread.sleep(straggler.waitMillis)`.
>>
>> In the first RichMap task (`subTaskId == 0`), per every record you do the
>> sleep(50ms), so after filling in all of the network buffers  your whole job
>> will be bottlenecked by this throughput cap of 20 records / second. Every
>> so often when this struggling task will be able to process and free up some
>> buffer from the backlog. This briefly unblocks other three tasks (which are
>> capped at 133 records / second). Apart from those short stints, those other
>> tasks can not process constant 133 records / seconds, because records are
>> evenly distributed by the source between all of those tasks. Which is I
>> think clearly visible on the charts and every system would behave in
>> exactly the same way.
>>
>> But what scenario are you really trying to simulate?
>>
>> A data skew when one task is 6.65 (133 / 20 ) times more
>> overloaded/processing heavier records than the others? Yes, this is
>> expected 

Re: Backpressure tuning/failure

2019-10-10 Thread Piotr Nowojski
Hi Owen,

Thanks for the quick response. No, I haven’t seen the previous blog post, yes 
it clears the things out a bit. 

> To clarify, the code is attempting to simulate a straggler node due to high 
> load, which therefore processes data at a slower rate - not a failing node. 
> Some degree of this is a feature of multi-tenant Hadoop. 


In your benchmark you are manually slowing down just one TaskManager, so you 
are testing for the failing/slow machine case, where either:
1. the machine is slow on it’s own because it’s smaller than the others,
2. it’s overloaded by some service independent of the Flink 
3. it's a failing node. 

Out of those three options, first two are not supported by Flink, in a sense 
that Flink assumes more or less equal machines in the cluster. The third is, as 
I wrote in the previous response, pretty uncommon scenario (until you reach 
really huge scale). How often one of your machine fails in a way that it is 6.6 
times slower than the others? I agree Flink doesn’t handle this automatically 
at the moment (currently you would be expected to manually shut down the 
machine). Nevertheless there are some plans how to address this (speculative 
execution and load based balancing channel selection), but with no definite 
schedule.

Also if the issue is "multi-tenant Hadoop.”, I would first try to better assign 
resources in the cluster, using for example CGroups via yarn/lxc/docker, or 
virtual machines.

Cheers, Piotrek

> On 10 Oct 2019, at 16:02, Owen Rees-Hayward  wrote:
> 
> Hi Piotr,
> 
> Thanks for getting back to me and for the info. I try to describe the 
> motivation around the scenarios in the original post in the series - see the 
> 'Backpressure - why you might care' section on 
> http://owenrh.me.uk/blog/2019/09/30/ . 
> Maybe it could have been clearer.
> 
> As you note, this will not affect every Flink job. However, one persons niche 
> is another persons day job. I definitely agree that keyed network exchanges, 
> which is going to the majority of analytics queries, are in a different 
> problem space. However, this is not an uncommon scenario in ingest pipelines.
> 
> I'd be interested to know whether you saw the section in the post I referred 
> to above and whether this clears anything up? To clarify, the code is 
> attempting to simulate a straggler node due to high load, which therefore 
> processes data at a slower rate - not a failing node. Some degree of this is 
> a feature of multi-tenant Hadoop. 
> 
> Cheers, Owen
> 
> On Thu, 10 Oct 2019 at 10:27, Piotr Nowojski  > wrote:
> Hi,
> 
> I’m not entirely sure what you are testing. I have looked at your code (only 
> the constant straggler scenario) and please correct me if’m wrong, in your 
> job you are basically measuring throughput of 
> `Thread.sleep(straggler.waitMillis)`.
> 
> In the first RichMap task (`subTaskId == 0`), per every record you do the 
> sleep(50ms), so after filling in all of the network buffers  your whole job 
> will be bottlenecked by this throughput cap of 20 records / second. Every so 
> often when this struggling task will be able to process and free up some 
> buffer from the backlog. This briefly unblocks other three tasks (which are 
> capped at 133 records / second). Apart from those short stints, those other 
> tasks can not process constant 133 records / seconds, because records are 
> evenly distributed by the source between all of those tasks. Which is I think 
> clearly visible on the charts and every system would behave in exactly the 
> same way.
> 
> But what scenario are you really trying to simulate? 
> 
> A data skew when one task is 6.65 (133 / 20 ) times more 
> overloaded/processing heavier records than the others? Yes, this is expected 
> behaviour, but your benchmark is testing this in a bit convoluted way.
> 
> A failing machine which has 6.65 times less performance? With keyed network 
> exchanges there is again very little that you can do (except of the 
> speculative execution). Without keyed network exchanges, OK, I agree. In this 
> case, randomly/evenly distributing the records is not the optimal shuffling 
> strategy and there is some room for the improvement in Flink (we could 
> distribute records not randomly but to the less busy machines). However this 
> is a pretty much niche feature (failing machine + non keyed exchanges) and 
> you are not saying anywhere that this is what you are testing for.
> 
> Piotrek
> 
>> On 8 Oct 2019, at 18:10, Owen Rees-Hayward > > wrote:
>> 
>> Hi,
>> 
>> I am having a few issues with the Flink (v1.8.1) backpressure default 
>> settings, which lead to poor throughput in a comparison I am doing between 
>> Storm, Spark and Flink.
>> 
>> I have a setup that simulates a progressively worse straggling task that 
>> Storm and Spark cope with the relatively well. Flink not so much. Code can 
>> be found 

Re: Backpressure tuning/failure

2019-10-10 Thread Owen Rees-Hayward
Hi Piotr,

Thanks for getting back to me and for the info. I try to describe the
motivation around the scenarios in the original post in the series - see
the 'Backpressure - why you might care' section on
http://owenrh.me.uk/blog/2019/09/30/. Maybe it could have been clearer.

As you note, this will not affect every Flink job. However, one persons
niche is another persons day job. I definitely agree that keyed network
exchanges, which is going to the majority of analytics queries, are in a
different problem space. However, this is not an uncommon scenario in
ingest pipelines.

I'd be interested to know whether you saw the section in the post I
referred to above and whether this clears anything up? To clarify, the code
is attempting to simulate a straggler node due to high load,
which therefore processes data at a slower rate - not a failing node. Some
degree of this is a feature of multi-tenant Hadoop.

Cheers, Owen

On Thu, 10 Oct 2019 at 10:27, Piotr Nowojski  wrote:

> Hi,
>
> I’m not entirely sure what you are testing. I have looked at your code
> (only the constant straggler scenario) and please correct me if’m wrong, in
> your job you are basically measuring throughput of
> `Thread.sleep(straggler.waitMillis)`.
>
> In the first RichMap task (`subTaskId == 0`), per every record you do the
> sleep(50ms), so after filling in all of the network buffers  your whole job
> will be bottlenecked by this throughput cap of 20 records / second. Every
> so often when this struggling task will be able to process and free up some
> buffer from the backlog. This briefly unblocks other three tasks (which are
> capped at 133 records / second). Apart from those short stints, those other
> tasks can not process constant 133 records / seconds, because records are
> evenly distributed by the source between all of those tasks. Which is I
> think clearly visible on the charts and every system would behave in
> exactly the same way.
>
> But what scenario are you really trying to simulate?
>
> A data skew when one task is 6.65 (133 / 20 ) times more
> overloaded/processing heavier records than the others? Yes, this is
> expected behaviour, but your benchmark is testing this in a bit convoluted
> way.
>
> A failing machine which has 6.65 times less performance? With keyed
> network exchanges there is again very little that you can do (except of the
> speculative execution). Without keyed network exchanges, OK, I agree. In
> this case, randomly/evenly distributing the records is not the optimal
> shuffling strategy and there is some room for the improvement in Flink (we
> could distribute records not randomly but to the less busy machines).
> However this is a pretty much niche feature (failing machine + non keyed
> exchanges) and you are not saying anywhere that this is what you are
> testing for.
>
> Piotrek
>
> On 8 Oct 2019, at 18:10, Owen Rees-Hayward  wrote:
>
> Hi,
>
> I am having a few issues with the Flink (v1.8.1) backpressure default
> settings, which lead to poor throughput in a comparison I am doing between
> Storm, Spark and Flink.
>
> I have a setup that simulates a progressively worse straggling task that
> Storm and Spark cope with the relatively well. Flink not so much. Code can
> be found here - https://github.com/owenrh/flink-variance.
>
> See this throughput chart for the an idea of how badly -
> https://owenrh.me.uk/assets/images/blog/smackdown/flink-constant-straggler.png
>
> I do not have any production experience with Flink, but I have had a look
> at the Flink docs and there is nothing in there that jumps out at me to
> explain or address this. I presume I am missing something, as I cannot
> believe Flink is this weak in the face of stragglers. It must be
> configuration right?
>
> Would appreciate any help on this. I've got a draft blog post that I will
> publish in a day or two, and don't want to criticise the Flink backpressure
> implementation for what seems most likely some default configuration issue.
>
> Thanks in advance, Owen
>
> --
> Owen Rees-Hayward
> 07912 876046
> twitter.com/owen4d
>
>
>

-- 
Owen Rees-Hayward
07912 876046
twitter.com/owen4d


Re: Flink集群迁移savepoint还保留原集群地址问题讨论

2019-10-10 Thread Congxian Qiu
你好

现在 Savepoint 中保存的是文件的绝对路径,暂时不支持 savepoint 的的移动,如果真的需要迁移你可以自己写一个程序将
savepoint 中 meta 的地址进行转换,然后重新生成一个 meta 文件。

Best,
Congxian


pengchengl...@163.com  于2019年10月10日周四 下午5:31写道:

> 你好,生成savepoint之后,是否把savepoint移动到了其他地方?官方说不能移动。
>
>
> 发件人: 蒋涛涛
> 发送时间: 2019-10-10 17:13
> 收件人: user-zh
> 主题: Flink集群迁移savepoint还保留原集群地址问题讨论
> HI all,
>
> 本来在Flink集群迁移过程中,使用版本flink
>
> 1.6.2,将savepoint进行保存,并迁移到新的hdfs集群的时候,并从该savepoint进行恢复的时候,出现找不到之前hdfs集群信息的错误信息,错误如下:
>
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at
>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
> at
>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkException: Could not restore operator
> state backend for StreamSource_cbc357ccb763df2852fee8c4fc7d55f2_(1/1) from
> any of the 1 provided restore options.
> at
>
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
> at
>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
> at
>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
> ... 5 more
> Caused by: java.io.IOException: Cannot instantiate file system for URI:
>
> hdfs://xxx/user/flink_1.6/savepoints/savepoint-ab68c7-562a00439efe/2e56b78a-fa36-4cf4-a383-990358fcd539
> at
>
> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
> at
>
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
> at
>
> org.apache.flink.runtime.state.filesystem.FileStateHandle.getFileSystem(FileStateHandle.java:100)
> at
>
> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
> at
>
> org.apache.flink.runtime.state.OperatorStreamStateHandle.openInputStream(OperatorStreamStateHandle.java:66)
> at
>
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:495)
> at
>
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64)
> at
>
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
> at
>
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
> ... 7 more
> Caused by: java.lang.IllegalArgumentException:
> java.net.UnknownHostException: xxx
> at
>
> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:439)
> at
>
> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:321)
> at
>
> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
> at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:690)
> at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:631)
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:160)
> at
>
> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
> ... 16 more
> Caused by: java.net.UnknownHostException: xxx
> ... 23 more
>
> 补充:
>
> 这个的xxx指的是原hdfs的clustername,用这个方式进行任务的迁移会出现这样的问题,checkpoint也有类似的错误,请问下为什么会出现这样的问题,并如何解决,谢谢
>
> 祝好
>


Difference between windows in Spark and Flink

2019-10-10 Thread Felipe Gutierrez
Hi all,

I am trying to think about the essential differences between operators in
Flink and Spark. Especially when I am using Keyed Windows then a reduce
operation.
In Flink we develop an application that can logically separate these two
operators. Thus after a keyed window I can use
.reduce/aggregate/fold/apply() functions [1].
In Spark we have window/reduceByKeyAndWindow functions which to me appears
it is less flexible in the options to use with a keyed window operation [2].
Moreover, when these two applications are deployed in a Flink and Spark
cluster respectively, what are the differences between their physical
operators running in the cluster?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#windows
[2]
https://spark.apache.org/docs/latest/streaming-programming-guide.html#window-operations

Thanks,
Felipe
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
*


Re:Memory constrains running Flink on Kubernetes

2019-10-10 Thread shengjk1
+1


I also encountered a similar problem, but I run flink application that uses 
state in RocksDB on yarn. Yarn container was killed because OOM.
I also saw rockdb tuning guide[1], tune some parameters,but it is useless , 
such as:


class MyOptions1 implements OptionsFactory {
@Override
public DBOptions createDBOptions(DBOptions currentOptions) {
return currentOptions.setDbWriteBufferSize(64 * 1024 * 1024)
.setIncreaseParallelism(2)
.setMaxBackgroundFlushes(2)
.setInfoLogLevel(InfoLogLevel.DEBUG_LEVEL)
.setMaxOpenFiles(4)
.setUseFsync(false);
}
@Override
public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
currentOptions) {
return currentOptions.setTableFormatConfig(
new BlockBasedTableConfig()
.setBlockCacheSize(16 * 1024 * 1024)
//increases read amplification but decreases memory useage and space 
amplification
.setBlockSize(16 * 1024 * 1024))
.setWriteBufferSize(16 * 1024 * 1024)
.setMaxWriteBufferNumber(1);
}
}


Additional, this is FLINK-7289, it is similar to us. But I don’t find a good 
way to  fix it.




[1] https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide
[2] https://issues.apache.org/jira/browse/FLINK-7289






Best,
Shengjk1




On 07/24/2019 03:48,wvl wrote:
Hi,


We're running a relatively simply Flink application that uses a bunch of state 
in RocksDB on Kubernetes.
During the course of development and going to production, we found that we were 
often running into memory issues made apparent by Kubernetes OOMKilled and Java 
OOM log events.


In order to tackle these, we're trying to account for all the memory used in 
the container, to allow proper tuning.
Metric-wise we have:
- container_memory_working_set_bytes = 6,5GB

- flink_taskmanager_Status_JVM_Memory_Heap_Max =  4,7GB

- flink_taskmanager_Status_JVM_Memory_NonHeap_Used = 325MB

- flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed = 500MB



This is my understanding based on all the documentation and observations:
container_memory_working_set_bytes will be the total amount of memory in use, 
disregarding OS page & block cache.
Heap will be heap.
NonHeap is mostly the metaspace.
Direct_Memory is mostly network buffers.


Running the numbers I have 1 GB unaccounted for. I'm also uncertain as to 
RocksDB. According to the docs RocksDB has a "Column Family Write Buffer" where 
"You need to budget for 2 x your worst case memory use".
We have 17 ValueStateDescriptors (ignoring state for windows) which I'm 
assuming corresponds to a "Column Family" in RockDB. Meaning our budget should 
be around 2GB.

Is this accounted for in one of the flink_taskmanager metrics above? We've also 
enabled various rocksdb metrics, but it's unclear where this Write Buffer 
memory would be represented.



Finally, we've seen that when our job has issues and is restarted rapidly, 
NonHeap_Used grows from an initial 50Mb to 700MB, before our containers are 
killed. We're assuming this is due

to no form of cleanup in the metaspace as classes get (re)loaded. 


These are our taskmanager JVM settings: -XX:+UseG1GC -XX:MaxDirectMemorySize=1G 
-XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap 
-XX:MaxRAMFraction=2
With flink config:
  taskmanager.heap.size: 5000m
  state.backend: rocksdb
  state.backend.incremental: true
  state.backend.rocksdb.timer-service.factory: ROCKSDB


Based on what we've observed we're thinking about setting -XX:MaxMetaspaceSize 
to a reasonable value, so that we at least get an error message which can 
easily be traced back to the behavior we're seeing.


Okay, all that said let's sum up what we're asking here:
- Is there any more insight into how memory is accounted for than our current 
metrics?
- Which metric, if any accounts for RocksDB memory usage?
- What's going on with the Metaspace growth we're seeing during job restarts, 
is there something we can do about this such as setting -XX:MaxMetaspaceSize?
- Any other tips to improve reliability running in resource constrained 
environments such as Kubernetes?


Thanks,


William




Re: Passing parameters to filter function (in DataStreams)

2019-10-10 Thread Theo Diefenthal
Hi, 

Your original post looks like "computeThreshold" doesn't require any 
parameters, but is just an expensive to compute operation. 

In that case, you can inherit from "RichFilterFunction" instead of 
"FilterFunction". In case of "RichFilterFunction", you can override the 
"open"-method and perform your operation in there just once and store the 
result e.g. in a transient variable. In that case, nothing gets serialized and 
send over the network. The open method is guaranteed to be called only once per 
operator and is called before the first call to "filter" is made. 

The pattern to pass arguments in general is totally fine. I often pass e.g. a 
connection String as a parameter to my RichFunction and within the open method 
of the function, I establish the connection to some remote system. 

Best regards 
Theo 


Von: "Komal Mariam"  
An: "Chesnay Schepler"  
CC: "user"  
Gesendet: Donnerstag, 10. Oktober 2019 04:00:46 
Betreff: Re: Passing parameters to filter function (in DataStreams) 

Thank you @Chesnay! 

I also managed to pass arguments to a RichFilterFunction: new 
MyFilterFunc(Integer threshold ) by defining its constructor. 
If there's a better way to pass arguments I'd appreciate it if you let me know. 

On Tue, 8 Oct 2019 at 19:58, Chesnay Schepler < [ mailto:ches...@apache.org | 
ches...@apache.org ] > wrote: 



You can compute the threshold ahead of time and reference it directly in the 
filter function. 

(Below are 2 examples, depending on whether you like lambdas or not) 
final int threshold = computeThreshold () ; temperatureStream.filter( new 
FilterFunction() { @Override public boolean filter (Integer 
temperature) { return temperature > threshold ; }
}) ; 
final int threshold = computeThreshold () ; 
temperatureStream.filter(temperature -> temperature > threshold ) ; 

On 08/10/2019 12:46, Komal Mariam wrote: 

BQ_BEGIN

Hi everyone, 

Suppose I have to compute a filter condition 

Integer threshold = compute threshold(); 

If I: 

temperatureStream.filter(new FilterFunction() { 
@Override 
public boolean filter(Integer temperature) throws Exception { 
Integer threshold = compute threshold(); 
return temperature > threshold 
} 

would this mean I have computed threshold over and over again, for every new 
element in the stream? 

my threshold does not changes once it computed. I don't want to recompute it 
every time for new elements? is there way I can pass it as a parameter to the 
filter function? 







BQ_END



Batch Job in a Flink 1.9 Standalone Cluster

2019-10-10 Thread Timothy Victor
After a batch job finishes in a flink standalone cluster, I notice that the
memory isn't freed up.   I understand Flink uses it's own memory manager
and just allocates a large tenured byte array that is not GC'ed.   But does
the memory used in this byte array get released when the batch job is done?

The scenario I am facing is that I am running a series of scheduled batch
jobs on a standalone cluster with 1 TM and 1 Slot.  I notice that after a
job is complete the memory used in the TM isn't freed up.  I can confirm
this by running  jmap dump.

Has anyone else run into this issue?   This is on 1.9.

Thanks

Tim


Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

2019-10-10 Thread Theo Diefenthal
Hi Filip, 

My point was not about the computation of the "maximum". My point was: You 
could hopefully read the stream sequentially and just assign punctuated 
watermarks to it. Once you have assigned the watermarks properly (And before 
you do your expensive computatation, like in this case parsing the entire event 
and building the sum), you could tell flink to repartition / key the data and 
shuffle it to the worker tasks in the network, so that the downstream 
operations are performed in parallel. Flink will afaik then take care of 
dealing with the watmark internally and everything is fine. 
I think it is a rare usecase that you have a sequential stream which can not be 
simply read sequentally. If its such a large stream, that you can't do on a 
single host: "Read, extract special event, shuffle to the network to other 
tasks", you probably have a larger issue and need to rethink on the source 
level already, e.g. change the method serialization to something which has a 
really lightweight parsing for finding the special events or such. 

Best regards 
Theo 


Von: "Filip Niksic"  
An: "Theo Diefenthal"  
CC: "user"  
Gesendet: Donnerstag, 10. Oktober 2019 00:08:38 
Betreff: Re: [QUESTION] How to parallelize with explicit punctuation in Flink? 

Hi Theo, 

It is a single sequential stream. 

If I read your response correctly, you are arguing that summing a bunch of 
numbers is not much more computationally intensive than assigning timestamps to 
those numbers, so if the latter has to be done sequentially anyway, then why 
should the former be done in parallel? To that I can only say that the example 
I gave is intentionally simple in order to make the problem conceptually clean. 
By understanding the conceptually clean version of the problem, we also gain 
insight into messier realistic versions where the operations we want to 
parallelize may be much more computationally intensive. 

Filip 



On Wed, Oct 9, 2019 at 1:28 PM [ mailto:theo.diefent...@scoop-software.de | 
theo.diefent...@scoop-software.de ] < [ 
mailto:theo.diefent...@scoop-software.de | theo.diefent...@scoop-software.de ] 
> wrote: 


Hi Filip, I don't really understand your problem here. 
Do you have a source with a single sequential stream, where from time to time, 
there is a barrier element? Or do you have a source like Kafka with multiple 
partitions? 
If you have case 2 with multiple partitions, what exactly do you mean by "order 
matters"? Will each partition have its own barrier? Or do you have just one 
barrier for all partitions? In that case, you will naturally have an ordering 
problem if your events itself contain no time data. 
If you have a "sequential source" why do you need parallelism? Won't it work 
out to read that partition data in one task (possibly skipping deserialization 
as much as possible to only recognize barrier events) and then add a downstream 
task with higher parallelism doing the full deserialization and other work? 
Best regardsTheo 
 Ursprüngliche Nachricht  
Betreff: Re: [QUESTION] How to parallelize with explicit punctuation in Flink? 
Von: Yun Gao 
An: Filip Niksic ,user 
Cc: Chesnay Schepler 


Hi Filip, 

As a whole, I also think to increase the parallelism of the reduce to more than 
1, we should use a parallel window to compute the partial sum and then sum the 
partial sum with WindowAll. 

For the assignTimestampAndWatermarks, From my side I think the current usage 
should be OK and it works the same to the other operators. Besides, for the 
keyBy Partitioner, I think "% PARALLELISM" is not necessary and Flink will take 
care of the parallelism. In other words, I think you can use .keyBy(x -> 
x.getId()) directly. 

Best, 
Yun 


-- 
From:Filip Niksic < [ mailto:fnik...@seas.upenn.edu | fnik...@seas.upenn.edu ] 
> 
Send Time:2019 Oct. 9 (Wed.) 12:21 
To:user < [ mailto:user@flink.apache.org | user@flink.apache.org ] > 
Cc:Yun Gao < [ mailto:yungao...@aliyun.com | yungao...@aliyun.com ] >; Chesnay 
Schepler < [ mailto:ches...@apache.org | ches...@apache.org ] > 
Subject:Re: [QUESTION] How to parallelize with explicit punctuation in Flink? 

Here is the solution I currently have. It turned out to be more complicated 
than I expected. It would be great if a more experienced Flink user could 
comment and point out the shortcomings. And if you have other ideas for 
achieving the same thing, let me know! 

Let's start like in the original email, except now we set the time 
characteristic to EventTime and parallelism to a constant named PARALLELISM. 

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 

final int PARALLELISM = 2; 
env.setParallelism(PARALLELISM); 

DataStream stream = env.fromElements(DataItem.class, 
new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier()); 

The first step is to use 

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-10-10 Thread Vijay Bhaskar
Thanks Yang. We will try and let you know if any issues arise

Regards
Bhaskar

On Thu, Oct 10, 2019 at 1:53 PM Yang Wang  wrote:

> @ Hao Sun,
> I have made a confirmation that even we change parallelism and/or modify
> operators, add new operators,
> the flink cluster could also recover from latest checkpoint.
>
> @ Vijay
> a) Some individual jobmanager/taskmanager crashed exceptionally(someother
> jobmanagers
> and taskmanagers are alive), it could recover from the latest checkpoint.
> b) All jobmanagers and taskmanagers fails, it could still recover from the
> latest checkpoint if the cluster-id
> is not changed.
>
> When we enable the HA, The meta of jobgraph and checkpoint is saved on
> zookeeper and the real files are save
> on high-availability storage(HDFS). So when the flink application is
> submitted again with same cluster-id, it could
> recover jobs and checkpoint from zookeeper. I think it has been supported
> for a long time. Maybe you could have a
> try with flink-1.8 or 1.9.
>
> Best,
> Yang
>
>
> Vijay Bhaskar  于2019年10月10日周四 下午2:26写道:
>
>> Thanks Yang and Sean. I have couple of questions:
>>
>> 1) Suppose the scenario of , bringing back entire cluster,
>>  a) In that case, at least one job manager out of HA group should be
>> up and running right? or
>>  b) All the job managers fails, then also this works? In that case
>> please let me know the procedure/share the documentation?
>>  How to start from previous check point?
>>  What Flink version onwards this feature is stable?
>>
>> Regards
>> Bhaskar
>>
>>
>> On Wed, Oct 9, 2019 at 8:51 AM Yang Wang  wrote:
>>
>>> Hi Vijay,
>>>
>>> If you are using HA solution, i think you do not need to specify the
>>> savepoint. Instead the checkpoint is used.
>>> The checkpoint is done automatically and periodically based on your
>>> configuration.When the
>>> jobmanager/taskmanager fails or the whole cluster crashes, it could
>>> always recover from the latest
>>> checkpoint. Does this meed your requirement?
>>>
>>> Best,
>>> Yang
>>>
>>> Sean Hester  于2019年10月1日周二 上午1:47写道:
>>>
 Vijay,

 That is my understanding as well: the HA solution only solves the
 problem up to the point all job managers fail/restart at the same time.
 That's where my original concern was.

 But to Aleksandar and Yun's point, running in HA with 2 or 3 Job
 Managers per cluster--as long as they are all deployed to separate GKE
 nodes--would provide a very high uptime/low failure rate, at least on
 paper. It's a promising enough option that we're going to run in HA for a
 month or two and monitor results before we put in any extra work to
 customize the savepoint start-up behavior.

 On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar 
 wrote:

> I don't think HA will help to recover from cluster crash, for that we
> should take periodic savepoint right? Please correct me in case i am wrong
>
> Regards
> Bhaskar
>
> On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar <
> bhaskar.eba...@gmail.com> wrote:
>
>> Suppose my cluster got crashed and need to bring up the entire
>> cluster back? Does HA still helps to run the cluster from latest save
>> point?
>>
>> Regards
>> Bhaskar
>>
>> On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <
>> sean.hes...@bettercloud.com> wrote:
>>
>>> thanks to everyone for all the replies.
>>>
>>> i think the original concern here with "just" relying on the HA
>>> option is that there are some disaster recovery and data center 
>>> migration
>>> use cases where the continuity of the job managers is difficult to
>>> preserve. but those are admittedly very edgy use cases. i think it's
>>> definitely worth reviewing the SLAs with our site reliability engineers 
>>> to
>>> see how likely it would be to completely lose all job managers under an 
>>> HA
>>> configuration. that small a risk might be acceptable/preferable to a
>>> one-off solution.
>>>
>>> @Aleksander, would love to learn more about Zookeeper-less HA. i
>>> think i spotted a thread somewhere between Till and someone (perhaps 
>>> you)
>>> about that. feel free to DM me.
>>>
>>> thanks again to everyone!
>>>
>>> On Thu, Sep 26, 2019 at 7:32 AM Yang Wang 
>>> wrote:
>>>
 Hi, Aleksandar

 Savepoint option in standalone job cluster is optional. If you want
 to always recover
 from the latest checkpoint, just as Aleksandar and Yun Tang said
 you could use the
 high-availability configuration. Make sure the cluster-id is not
 changed, i think the job
 could recover both at exceptionally crash and restart by
 expectation.

 @Aleksandar Mastilovic , we are also
 have an zookeeper-less high-availability implementation[1].
 Maybe we could have some 

Re: Flink metrics reporters documentation

2019-10-10 Thread Aleksey Pak
Hi Flavio,

Below is my explanation to your question, based on anecdotal evidence:

As you may know, Flink distribution package is already scala version
specific and bundles some jar artifacts.
User Flink job is supposed to be compiled against some of those jars (with
maven's `provided` scope). For example, it can be Flink CEP library.
In such cases, jar names are usually preserved as is (so you would
reference the same artifact dependency name in your application build and
when you want to copy it from `/opt` to `/lib` folder).

Some of the jars are not supposed to be used by your application directly,
but rather as "plugins" in your Flink cluster (here I mean "plugins" in a
more broader sense, than plugins mechanism used by file systems introduced
in Flink 1.9).
File systems, metrics reporters are good candidates for this. The reason
that original jar artifacts are scala version specific is rather
"incidental" (imo) - it just happens that they may depend on some core
Flink libraries that still have scala code.
In practice the implementation of those libraries is not scala dependent,
but to be strict (and safe) they are built separately for different scala
versions (what you see in the maven central).

My understanding, that one of the goals to move scala away from core
libraries (to some api level library) - this should make some of the
component builds scala independent.
Removal of scala version for those jars in the distribution is probably
done with the future plan in mind (so that it stays the same user
experience).

Regards,
Aleksey


On Thu, Oct 10, 2019 at 10:59 AM Flavio Pompermaier 
wrote:

> Sorry,
> I just discovered that those jars are actually in the opt folder within
> Flink dist..however the second point still holds: why there's a single
> influxdb jar inside flink's opt jar while on maven there are 2 versions
> (one for scala 2.11 and one for 2.12)?
>
> Best,
> Flavio
>
> On Thu, Oct 10, 2019 at 10:49 AM Flavio Pompermaier 
> wrote:
>
>> Hi to all,
>> I was trying to configure monitoring on my cluster so I went to the
>> metric reporters documentation.
>> There are 2 things that are not clear to me:
>>
>>1. In all reporters the documentation says to take the jars from /opt
>>folder..obviously this is not true. Wouldn't be better to provide a link 
>> to
>>the jar directly (on Maven Central for example)?
>>2. If you look to influxdb dependency the documentation says to use
>>flink-metrics-influxdb-1.9.0.jar but there's no such "unified" jar, on
>>maven central there are two version: 1 for scala 2.11 and one for scala 
>> 2.12
>>
>> Should I open 2 JIRA tickets to improve those 2 aspects (if I'm not
>> wrong..)?
>>
>> Best,
>> Flavio
>>
>
>


Re: Flink集群迁移savepoint还保留原集群地址问题讨论

2019-10-10 Thread pengchengl...@163.com
你好,生成savepoint之后,是否把savepoint移动到了其他地方?官方说不能移动。

 
发件人: 蒋涛涛
发送时间: 2019-10-10 17:13
收件人: user-zh
主题: Flink集群迁移savepoint还保留原集群地址问题讨论
HI all,
 
本来在Flink集群迁移过程中,使用版本flink
1.6.2,将savepoint进行保存,并迁移到新的hdfs集群的时候,并从该savepoint进行恢复的时候,出现找不到之前hdfs集群信息的错误信息,错误如下:
 
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator
state backend for StreamSource_cbc357ccb763df2852fee8c4fc7d55f2_(1/1) from
any of the 1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
... 5 more
Caused by: java.io.IOException: Cannot instantiate file system for URI:
hdfs://xxx/user/flink_1.6/savepoints/savepoint-ab68c7-562a00439efe/2e56b78a-fa36-4cf4-a383-990358fcd539
at
org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at
org.apache.flink.runtime.state.filesystem.FileStateHandle.getFileSystem(FileStateHandle.java:100)
at
org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
at
org.apache.flink.runtime.state.OperatorStreamStateHandle.openInputStream(OperatorStreamStateHandle.java:66)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:495)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
... 7 more
Caused by: java.lang.IllegalArgumentException:
java.net.UnknownHostException: xxx
at
org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:439)
at
org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:321)
at
org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:690)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:631)
at
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:160)
at
org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
... 16 more
Caused by: java.net.UnknownHostException: xxx
... 23 more
 
补充:
这个的xxx指的是原hdfs的clustername,用这个方式进行任务的迁移会出现这样的问题,checkpoint也有类似的错误,请问下为什么会出现这样的问题,并如何解决,谢谢
 
祝好


Re: Flink集群迁移savepoint还保留原集群地址问题讨论

2019-10-10 Thread Qi Kang
Hi,

这是因为savepoint的_metadata文件中存储的是savepoint文件的绝对路径,而非相对路径,所以恢复的时候仍然会找旧的集群。

从现有资料看,savepoint仍然无法迁移。详情可以参考 [1] 和 [2]。

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/savepoints.html#triggering-savepoints
[2] https://issues.apache.org/jira/browse/FLINK-5763

BR,
Qi Kang


> On Oct 10, 2019, at 17:13, 蒋涛涛  wrote:
> 
> HI all,
> 
> 本来在Flink集群迁移过程中,使用版本flink
> 1.6.2,将savepoint进行保存,并迁移到新的hdfs集群的时候,并从该savepoint进行恢复的时候,出现找不到之前hdfs集群信息的错误信息,错误如下:
> 
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
>at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkException: Could not restore operator
> state backend for StreamSource_cbc357ccb763df2852fee8c4fc7d55f2_(1/1) from
> any of the 1 provided restore options.
>at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
>at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
>... 5 more
> Caused by: java.io.IOException: Cannot instantiate file system for URI:
> hdfs://xxx/user/flink_1.6/savepoints/savepoint-ab68c7-562a00439efe/2e56b78a-fa36-4cf4-a383-990358fcd539
>at
> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
>at
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
>at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
>at
> org.apache.flink.runtime.state.filesystem.FileStateHandle.getFileSystem(FileStateHandle.java:100)
>at
> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
>at
> org.apache.flink.runtime.state.OperatorStreamStateHandle.openInputStream(OperatorStreamStateHandle.java:66)
>at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:495)
>at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64)
>at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>... 7 more
> Caused by: java.lang.IllegalArgumentException:
> java.net.UnknownHostException: xxx
>at
> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:439)
>at
> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:321)
>at
> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
>at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:690)
>at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:631)
>at
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:160)
>at
> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
>... 16 more
> Caused by: java.net.UnknownHostException: xxx
>... 23 more
> 
> 补充:
> 这个的xxx指的是原hdfs的clustername,用这个方式进行任务的迁移会出现这样的问题,checkpoint也有类似的错误,请问下为什么会出现这样的问题,并如何解决,谢谢
> 
> 祝好



Re: Backpressure tuning/failure

2019-10-10 Thread Piotr Nowojski
Hi,

I’m not entirely sure what you are testing. I have looked at your code (only 
the constant straggler scenario) and please correct me if’m wrong, in your job 
you are basically measuring throughput of `Thread.sleep(straggler.waitMillis)`.

In the first RichMap task (`subTaskId == 0`), per every record you do the 
sleep(50ms), so after filling in all of the network buffers  your whole job 
will be bottlenecked by this throughput cap of 20 records / second. Every so 
often when this struggling task will be able to process and free up some buffer 
from the backlog. This briefly unblocks other three tasks (which are capped at 
133 records / second). Apart from those short stints, those other tasks can not 
process constant 133 records / seconds, because records are evenly distributed 
by the source between all of those tasks. Which is I think clearly visible on 
the charts and every system would behave in exactly the same way.

But what scenario are you really trying to simulate? 

A data skew when one task is 6.65 (133 / 20 ) times more overloaded/processing 
heavier records than the others? Yes, this is expected behaviour, but your 
benchmark is testing this in a bit convoluted way.

A failing machine which has 6.65 times less performance? With keyed network 
exchanges there is again very little that you can do (except of the speculative 
execution). Without keyed network exchanges, OK, I agree. In this case, 
randomly/evenly distributing the records is not the optimal shuffling strategy 
and there is some room for the improvement in Flink (we could distribute 
records not randomly but to the less busy machines). However this is a pretty 
much niche feature (failing machine + non keyed exchanges) and you are not 
saying anywhere that this is what you are testing for.

Piotrek

> On 8 Oct 2019, at 18:10, Owen Rees-Hayward  wrote:
> 
> Hi,
> 
> I am having a few issues with the Flink (v1.8.1) backpressure default 
> settings, which lead to poor throughput in a comparison I am doing between 
> Storm, Spark and Flink.
> 
> I have a setup that simulates a progressively worse straggling task that 
> Storm and Spark cope with the relatively well. Flink not so much. Code can be 
> found here - https://github.com/owenrh/flink-variance 
> .
> 
> See this throughput chart for the an idea of how badly - 
> https://owenrh.me.uk/assets/images/blog/smackdown/flink-constant-straggler.png
>  
> 
> 
> I do not have any production experience with Flink, but I have had a look at 
> the Flink docs and there is nothing in there that jumps out at me to explain 
> or address this. I presume I am missing something, as I cannot believe Flink 
> is this weak in the face of stragglers. It must be configuration right?
> 
> Would appreciate any help on this. I've got a draft blog post that I will 
> publish in a day or two, and don't want to criticise the Flink backpressure 
> implementation for what seems most likely some default configuration issue.
> 
> Thanks in advance, Owen
> 
> -- 
> Owen Rees-Hayward
> 07912 876046
> twitter.com/owen4d 


Flink集群迁移savepoint还保留原集群地址问题讨论

2019-10-10 Thread 蒋涛涛
HI all,

本来在Flink集群迁移过程中,使用版本flink
1.6.2,将savepoint进行保存,并迁移到新的hdfs集群的时候,并从该savepoint进行恢复的时候,出现找不到之前hdfs集群信息的错误信息,错误如下:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator
state backend for StreamSource_cbc357ccb763df2852fee8c4fc7d55f2_(1/1) from
any of the 1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
... 5 more
Caused by: java.io.IOException: Cannot instantiate file system for URI:
hdfs://xxx/user/flink_1.6/savepoints/savepoint-ab68c7-562a00439efe/2e56b78a-fa36-4cf4-a383-990358fcd539
at
org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at
org.apache.flink.runtime.state.filesystem.FileStateHandle.getFileSystem(FileStateHandle.java:100)
at
org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
at
org.apache.flink.runtime.state.OperatorStreamStateHandle.openInputStream(OperatorStreamStateHandle.java:66)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:495)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
... 7 more
Caused by: java.lang.IllegalArgumentException:
java.net.UnknownHostException: xxx
at
org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:439)
at
org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:321)
at
org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:690)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:631)
at
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:160)
at
org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
... 16 more
Caused by: java.net.UnknownHostException: xxx
... 23 more

补充:
这个的xxx指的是原hdfs的clustername,用这个方式进行任务的迁移会出现这样的问题,checkpoint也有类似的错误,请问下为什么会出现这样的问题,并如何解决,谢谢

祝好


Re: Flink metrics reporters documentation

2019-10-10 Thread Flavio Pompermaier
Sorry,
I just discovered that those jars are actually in the opt folder within
Flink dist..however the second point still holds: why there's a single
influxdb jar inside flink's opt jar while on maven there are 2 versions
(one for scala 2.11 and one for 2.12)?

Best,
Flavio

On Thu, Oct 10, 2019 at 10:49 AM Flavio Pompermaier 
wrote:

> Hi to all,
> I was trying to configure monitoring on my cluster so I went to the metric
> reporters documentation.
> There are 2 things that are not clear to me:
>
>1. In all reporters the documentation says to take the jars from /opt
>folder..obviously this is not true. Wouldn't be better to provide a link to
>the jar directly (on Maven Central for example)?
>2. If you look to influxdb dependency the documentation says to use
>flink-metrics-influxdb-1.9.0.jar but there's no such "unified" jar, on
>maven central there are two version: 1 for scala 2.11 and one for scala 
> 2.12
>
> Should I open 2 JIRA tickets to improve those 2 aspects (if I'm not
> wrong..)?
>
> Best,
> Flavio
>


Flink metrics reporters documentation

2019-10-10 Thread Flavio Pompermaier
Hi to all,
I was trying to configure monitoring on my cluster so I went to the metric
reporters documentation.
There are 2 things that are not clear to me:

   1. In all reporters the documentation says to take the jars from /opt
   folder..obviously this is not true. Wouldn't be better to provide a link to
   the jar directly (on Maven Central for example)?
   2. If you look to influxdb dependency the documentation says to use
   flink-metrics-influxdb-1.9.0.jar but there's no such "unified" jar, on
   maven central there are two version: 1 for scala 2.11 and one for scala 2.12

Should I open 2 JIRA tickets to improve those 2 aspects (if I'm not
wrong..)?

Best,
Flavio


回复: flink1.9 webui exception日志显示问题

2019-10-10 Thread 戴嘉诚
+1 这个我也遇到了这个问题,主要原因是异常了,然后region 重启,重启后,会重新加载,就自动清空了异常日志信息..现在不能再webui上排查异常信息了

发件人: 李杰
发送时间: 2019年10月10日 14:41
收件人: user-zh@flink.apache.org
主题: flink1.9 webui exception日志显示问题

log4j.properties为官方默认。
weib ui exception日志一闪而过,ui上看不到历史异常信息





Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-10-10 Thread Yang Wang
@ Hao Sun,
I have made a confirmation that even we change parallelism and/or modify
operators, add new operators,
the flink cluster could also recover from latest checkpoint.

@ Vijay
a) Some individual jobmanager/taskmanager crashed exceptionally(someother
jobmanagers
and taskmanagers are alive), it could recover from the latest checkpoint.
b) All jobmanagers and taskmanagers fails, it could still recover from the
latest checkpoint if the cluster-id
is not changed.

When we enable the HA, The meta of jobgraph and checkpoint is saved on
zookeeper and the real files are save
on high-availability storage(HDFS). So when the flink application is
submitted again with same cluster-id, it could
recover jobs and checkpoint from zookeeper. I think it has been supported
for a long time. Maybe you could have a
try with flink-1.8 or 1.9.

Best,
Yang


Vijay Bhaskar  于2019年10月10日周四 下午2:26写道:

> Thanks Yang and Sean. I have couple of questions:
>
> 1) Suppose the scenario of , bringing back entire cluster,
>  a) In that case, at least one job manager out of HA group should be
> up and running right? or
>  b) All the job managers fails, then also this works? In that case
> please let me know the procedure/share the documentation?
>  How to start from previous check point?
>  What Flink version onwards this feature is stable?
>
> Regards
> Bhaskar
>
>
> On Wed, Oct 9, 2019 at 8:51 AM Yang Wang  wrote:
>
>> Hi Vijay,
>>
>> If you are using HA solution, i think you do not need to specify the
>> savepoint. Instead the checkpoint is used.
>> The checkpoint is done automatically and periodically based on your
>> configuration.When the
>> jobmanager/taskmanager fails or the whole cluster crashes, it could
>> always recover from the latest
>> checkpoint. Does this meed your requirement?
>>
>> Best,
>> Yang
>>
>> Sean Hester  于2019年10月1日周二 上午1:47写道:
>>
>>> Vijay,
>>>
>>> That is my understanding as well: the HA solution only solves the
>>> problem up to the point all job managers fail/restart at the same time.
>>> That's where my original concern was.
>>>
>>> But to Aleksandar and Yun's point, running in HA with 2 or 3 Job
>>> Managers per cluster--as long as they are all deployed to separate GKE
>>> nodes--would provide a very high uptime/low failure rate, at least on
>>> paper. It's a promising enough option that we're going to run in HA for a
>>> month or two and monitor results before we put in any extra work to
>>> customize the savepoint start-up behavior.
>>>
>>> On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar 
>>> wrote:
>>>
 I don't think HA will help to recover from cluster crash, for that we
 should take periodic savepoint right? Please correct me in case i am wrong

 Regards
 Bhaskar

 On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar <
 bhaskar.eba...@gmail.com> wrote:

> Suppose my cluster got crashed and need to bring up the entire cluster
> back? Does HA still helps to run the cluster from latest save point?
>
> Regards
> Bhaskar
>
> On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <
> sean.hes...@bettercloud.com> wrote:
>
>> thanks to everyone for all the replies.
>>
>> i think the original concern here with "just" relying on the HA
>> option is that there are some disaster recovery and data center migration
>> use cases where the continuity of the job managers is difficult to
>> preserve. but those are admittedly very edgy use cases. i think it's
>> definitely worth reviewing the SLAs with our site reliability engineers 
>> to
>> see how likely it would be to completely lose all job managers under an 
>> HA
>> configuration. that small a risk might be acceptable/preferable to a
>> one-off solution.
>>
>> @Aleksander, would love to learn more about Zookeeper-less HA. i
>> think i spotted a thread somewhere between Till and someone (perhaps you)
>> about that. feel free to DM me.
>>
>> thanks again to everyone!
>>
>> On Thu, Sep 26, 2019 at 7:32 AM Yang Wang 
>> wrote:
>>
>>> Hi, Aleksandar
>>>
>>> Savepoint option in standalone job cluster is optional. If you want
>>> to always recover
>>> from the latest checkpoint, just as Aleksandar and Yun Tang said you
>>> could use the
>>> high-availability configuration. Make sure the cluster-id is not
>>> changed, i think the job
>>> could recover both at exceptionally crash and restart by expectation.
>>>
>>> @Aleksandar Mastilovic , we are also
>>> have an zookeeper-less high-availability implementation[1].
>>> Maybe we could have some discussion and contribute this useful
>>> feature to the community.
>>>
>>> [1].
>>> https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit
>>>
>>> Best,
>>> Yang
>>>
>>> Aleksandar Mastilovic  于2019年9月26日周四
>>> 上午4:11写道:
>>>
 Would 

flink checkpoint超时问题

2019-10-10 Thread lg
各位好:
   我在使用flink的过程中遇到了下面的问题,刚开始接触flink,对排查问题的思路不是很清晰,麻烦大家提供下思路哈,谢谢。


   应用场景:我这里使用Standalone Cluster方式搭建了一flink集群,其中Task Managers=10,Task Slots=54。
flink的stream过程大致为: kafka topic1的数据 --> 异步调用外部资源对数据填充 --> 
存入kafka topic2。
其中topic1的分区有100个,数据是平均分布的。topic2的分区有50个。
异步调用使用的flink 
RichAsyncFunction,无序。外部资源就是一个web服务,但是业务逻辑很复杂,处理时间比较久,平均5秒左右,使用nginx代理了多个节点进行服务。
   提交任务时指定的并发度为50.


遇到的问题:checkpoint始终超时,而且执行时间非常久。使用bin/flink savepoint 
xxx也会超时。取消了之前的任务,将CheckpointTimeout重新设置为3小时目前成功,之前是1小时全部失败。看了其他的资料,推测是背压造成的。


checkpoint配置:
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000 * 60 * 3); //3m
env.getCheckpointConfig().setCheckpointTimeout(1000 * 60 * 180); 
//3h,之前配置的1小时,都失败了,这次改为3小时。
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000  * 10); //间隔10s
StateBackend backend = new RocksDBStateBackend(filebackend, true); 
//使用RocksDBStateBackend增量
env.setStateBackend(backend);


求助:请问遇到这个问题该怎么排查呢?为什么checkpoint时间会消耗这么长,可能的原因是什么呢?查看上游背压一直是high,下游因为要调用外部web服务所以消费速度低,怎么解决这个问题呢?请各位提供下建议,谢谢哈。


web ui截图:
刚刚看了下,运行了1个多小时的checkpoint终于成功了一个。
背压情况: 上游一直是high,不正常




祝各位一切顺利!





Re: State & Fault Tolerance in Table API and SQL Implementations

2019-10-10 Thread Dawid Wysakowicz
Hi Vaibhav,

I am not sure if there are specific documentation parts about state
handling in Table API. There are just a few important facts that you
must be aware of:

* in a failover scenario, everything should work just fine, internally
Table API uses Flink's state and all intermediate results should be
successfully restored after failure.

* in case of query update or Flink's version update there is no
guarantee that the resulting execution plan will remain the same (or
have the same uids), therefore there are no guarantees that a previous
state can be mapped to the new plan

I hope this clarifies a bit how Table API interacts with Flink's state.

Best,

Dawid

On 08/10/2019 12:09, Vaibhav Singh wrote:
> Hi,
>
> We are looking into a production use case of using Flink, to process
> multiple streams of data from Kafka topics.
>
> We plan to perform joins on these streams and then output aggregations
> on that data. We plan to use the Table API and SQL capabilities for this.
> We need to prepare a plan to productionize this flow, and were looking
> into how Flink features like Checkpoints and Savepoints and state
> management are being utilized here (In case of Table API).
>
> Can you point me towards any documentation/articles/tutorials
> regarding how Flink is doing these in case of the Table API and SQL?
>
> */Thanks and regards!
> /*
> Vaibhav


signature.asc
Description: OpenPGP digital signature


flink1.9 webui exception日志显示问题

2019-10-10 Thread 李杰
log4j.properties为官方默认。
weib ui exception日志一闪而过,ui上看不到历史异常信息
[image: 1.jpg]
[image: 2.jpg]


Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-10-10 Thread Vijay Bhaskar
Thanks Yang and Sean. I have couple of questions:

1) Suppose the scenario of , bringing back entire cluster,
 a) In that case, at least one job manager out of HA group should be up
and running right? or
 b) All the job managers fails, then also this works? In that case
please let me know the procedure/share the documentation?
 How to start from previous check point?
 What Flink version onwards this feature is stable?

Regards
Bhaskar


On Wed, Oct 9, 2019 at 8:51 AM Yang Wang  wrote:

> Hi Vijay,
>
> If you are using HA solution, i think you do not need to specify the
> savepoint. Instead the checkpoint is used.
> The checkpoint is done automatically and periodically based on your
> configuration.When the
> jobmanager/taskmanager fails or the whole cluster crashes, it could always
> recover from the latest
> checkpoint. Does this meed your requirement?
>
> Best,
> Yang
>
> Sean Hester  于2019年10月1日周二 上午1:47写道:
>
>> Vijay,
>>
>> That is my understanding as well: the HA solution only solves the problem
>> up to the point all job managers fail/restart at the same time. That's
>> where my original concern was.
>>
>> But to Aleksandar and Yun's point, running in HA with 2 or 3 Job Managers
>> per cluster--as long as they are all deployed to separate GKE nodes--would
>> provide a very high uptime/low failure rate, at least on paper. It's a
>> promising enough option that we're going to run in HA for a month or two
>> and monitor results before we put in any extra work to customize the
>> savepoint start-up behavior.
>>
>> On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar 
>> wrote:
>>
>>> I don't think HA will help to recover from cluster crash, for that we
>>> should take periodic savepoint right? Please correct me in case i am wrong
>>>
>>> Regards
>>> Bhaskar
>>>
>>> On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar 
>>> wrote:
>>>
 Suppose my cluster got crashed and need to bring up the entire cluster
 back? Does HA still helps to run the cluster from latest save point?

 Regards
 Bhaskar

 On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <
 sean.hes...@bettercloud.com> wrote:

> thanks to everyone for all the replies.
>
> i think the original concern here with "just" relying on the HA option
> is that there are some disaster recovery and data center migration use
> cases where the continuity of the job managers is difficult to preserve.
> but those are admittedly very edgy use cases. i think it's definitely 
> worth
> reviewing the SLAs with our site reliability engineers to see how likely 
> it
> would be to completely lose all job managers under an HA configuration.
> that small a risk might be acceptable/preferable to a one-off solution.
>
> @Aleksander, would love to learn more about Zookeeper-less HA. i
> think i spotted a thread somewhere between Till and someone (perhaps you)
> about that. feel free to DM me.
>
> thanks again to everyone!
>
> On Thu, Sep 26, 2019 at 7:32 AM Yang Wang 
> wrote:
>
>> Hi, Aleksandar
>>
>> Savepoint option in standalone job cluster is optional. If you want
>> to always recover
>> from the latest checkpoint, just as Aleksandar and Yun Tang said you
>> could use the
>> high-availability configuration. Make sure the cluster-id is not
>> changed, i think the job
>> could recover both at exceptionally crash and restart by expectation.
>>
>> @Aleksandar Mastilovic , we are also
>> have an zookeeper-less high-availability implementation[1].
>> Maybe we could have some discussion and contribute this useful
>> feature to the community.
>>
>> [1].
>> https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit
>>
>> Best,
>> Yang
>>
>> Aleksandar Mastilovic  于2019年9月26日周四
>> 上午4:11写道:
>>
>>> Would you guys (Flink devs) be interested in our solution for
>>> zookeeper-less HA? I could ask the managers how they feel about
>>> open-sourcing the improvement.
>>>
>>> On Sep 25, 2019, at 11:49 AM, Yun Tang  wrote:
>>>
>>> As Aleksandar said, k8s with HA configuration could solve your
>>> problem. There already have some discussion about how to implement such 
>>> HA
>>> in k8s if we don't have a zookeeper service: FLINK-11105 [1] and
>>> FLINK-12884 [2]. Currently, you might only have to choose zookeeper as
>>> high-availability service.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-11105
>>> [2] https://issues.apache.org/jira/browse/FLINK-12884
>>>
>>> Best
>>> Yun Tang
>>> --
>>> *From:* Aleksandar Mastilovic 
>>> *Sent:* Thursday, September 26, 2019 1:57
>>> *To:* Sean Hester 
>>> *Cc:* Hao Sun ; Yuval Itzchakov <
>>> yuva...@gmail.com>; user 
>>> *Subject:* Re: Challenges Deploying Flink With