Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

2020-09-30 Thread Yang Wang
2. Yes. This is exactly what I mean: storing the HA information relevant to
a specific component in a single ConfigMap and ensuring that “Get (check the
leader)-and-Update (write back to the ConfigMap)” is a transactional
operation. Since we only store the job graph state handle (not the real
data) in the ConfigMap, I think 1 MB is big enough for the dispatcher-leader
ConfigMap (the biggest one, with multiple jobs). I roughly calculated that
we could have more than 1000 Flink jobs in a Flink session cluster.
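
For illustration, a minimal sketch of that “Get (check the leader)-and-Update”
pattern with the fabric8 client could look as follows. The annotation key,
ConfigMap name and identity check are simplified assumptions rather than the
exact FLIP-144 implementation, and the conflict handling relies on the
Kubernetes resource version carried by the ConfigMap returned from the GET:

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;

import java.util.HashMap;

public class LeaderGuardedConfigMapWriter {

    // Annotation holding the leader record (a JSON blob containing the holder identity).
    // The exact key and format depend on the leader-election library in use.
    private static final String LEADER_ANNOTATION = "control-plane.alpha.kubernetes.io/leader";

    /**
     * Writes a state-handle pointer into the HA ConfigMap only if this instance still
     * appears as the leader. The replace() sends back the resourceVersion obtained by
     * the get(), so the API server rejects the update if anyone modified the ConfigMap
     * in between (optimistic compare-and-swap). Depending on the fabric8 version,
     * replace() may retry internally; a strict CAS would pin the resource version explicitly.
     */
    public static boolean writeIfLeader(
            KubernetesClient client,
            String namespace,
            String configMapName,
            String ownIdentity,
            String key,
            String stateHandlePointer) {

        ConfigMap cm = client.configMaps().inNamespace(namespace).withName(configMapName).get();
        if (cm == null || cm.getMetadata().getAnnotations() == null) {
            return false;
        }

        String leaderRecord = cm.getMetadata().getAnnotations().get(LEADER_ANNOTATION);
        if (leaderRecord == null || !leaderRecord.contains(ownIdentity)) {
            return false; // not the leader anymore, do not write
        }

        if (cm.getData() == null) {
            cm.setData(new HashMap<>());
        }
        cm.getData().put(key, stateHandlePointer);

        try {
            client.configMaps().inNamespace(namespace).withName(configMapName).replace(cm);
            return true;
        } catch (KubernetesClientException e) {
            // Someone updated the ConfigMap concurrently; the caller should re-read and retry.
            return false;
        }
    }
}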

3. Actually, K8s has a stronger guarantee than YARN here, and a StatefulSet
could provide at-most-one semantics as long as no manual force-deletion
happens[1]. But based on the previous discussion, we have successfully avoided
the "lock-and-release" in the implementation, so I would still stick with the
current Deployment.


[1].
https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#force-deletion


Best,
Yang

Till Rohrmann wrote on Wed, Sep 30, 2020 at 11:57 PM:

> Thanks for the clarifications Yang Wang.
>
> 2. Keeping the HA information relevant for a component (Dispatcher,
> JobManager, ResourceManager) in a single ConfigMap sounds good. We should
> check that we don't exceed the 1 MB size limit with this approach though.
> The Dispatcher's ConfigMap would then contain the current leader, the
> running jobs and the pointers to the persisted JobGraphs. The JobManager's
> ConfigMap would then contain the current leader, the pointers to the
> checkpoints and the checkpoint ID counter, for example.
>
> 3. Ah ok, I somehow thought that K8s would give us stronger
> guarantees than Yarn in this regard. That's a pity.
>
> Cheers,
> Till
>
> On Wed, Sep 30, 2020 at 10:03 AM tison  wrote:
>
>> Thanks for your explanation. It would be fine as long as checking leadership
>> and actually writing the information is done atomically.
>>
>> Best,
>> tison.
>>
>>
>> Yang Wang wrote on Wed, Sep 30, 2020 at 3:57 PM:
>>
>>> Thanks Till and tison for your comments.
>>>
>>> @Till Rohrmann
>>> 1. I am afraid we could not do this if we are going to use the fabric8
>>> Kubernetes client SDK for the leader election. The official Kubernetes Java
>>> client[1] does not support it either. We could implement a new LeaderElector
>>> in Flink based on the very basic Kubernetes API, but it seems that we would
>>> not gain much from this.
>>>
>>> 2. Yes, the implementation will be a little complicated if we want to
>>> completely eliminate residual job graphs or checkpoints. Inspired by your
>>> suggestion, a different solution has come to my mind. We could use the same
>>> ConfigMap to store the JobManager leader, job graph, checkpoint counter, and
>>> checkpoints. Each job will have a specific ConfigMap for the HA meta storage.
>>> Then it will be easier to guarantee that only the leader could write the
>>> ConfigMap, since “Get (check the leader)-and-Update (write back to the
>>> ConfigMap)” is a transactional operation.
>>>
>>> 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution. However, we
>>> still have the chance that two JobManagers are running and trying to
>>> get/delete a key in the same ConfigMap concurrently. Imagine that the kubelet
>>> (like the NodeManager in YARN) is down; then the JobManager pod could not be
>>> deleted and a new JobManager pod will be launched. We are then in a similar
>>> situation as Deployment(1) + ConfigMap + HDFS/S3. The only benefit is that we
>>> do not need to implement a leader election/retrieval service.
>>>
>>> @tison
>>> Actually, I do not think we will have such an issue in the Kubernetes HA
>>> service. In the Kubernetes LeaderElector[2], the leader information is stored
>>> in an annotation of the leader ConfigMap, so it cannot happen that the old
>>> leader wrongly overrides the leader information. Once a JobManager wants to
>>> write its leader information to the ConfigMap, it will check whether it is
>>> still the leader. If not, nothing will happen. Moreover, the Kubernetes
>>> Resource Version[3] ensures that no one else has snuck in and written a
>>> different update while the client was in the process of performing its
>>> update.
>>>
>>>
>>> [1].
>>> https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
>>> [2].
>>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java
>>> 
>>> [3].
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> tison wrote on Wed, Sep 30, 2020 at 3:21 PM:
>>>
 Hi,

 Generally +1 for a native k8s HA service.


Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-09-30 Thread Austin Cawley-Edwards
Hey all,

Thanks for your patience. I've got a small repo that reproduces the issue
here: https://github.com/austince/flink-1.10-sql-windowing-error

Not sure what I'm doing wrong but it feels silly.

Thanks so much!
Austin

On Tue, Sep 29, 2020 at 3:48 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey Till,
>
> Thanks for the reply -- I'll try to see if I can reproduce this in a small
> repo and share it with you.
>
> Best,
> Austin
>
> On Tue, Sep 29, 2020 at 3:58 AM Till Rohrmann 
> wrote:
>
>> Hi Austin,
>>
>> could you share with us the exact job you are running (including the
>> custom window trigger)? This would help us to better understand your
>> problem.
>>
>> I am also pulling in Klou and Timo who might help with the windowing
>> logic and the Table to DataStream conversion.
>>
>> Cheers,
>> Till
>>
>> On Mon, Sep 28, 2020 at 11:49 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hey all,
>>>
>>> I'm not sure if I've missed something in the docs, but I'm having a bit
>>> of trouble with a streaming SQL job that starts w/ raw SQL queries and then
>>> transitions to a more traditional streaming job. I'm on Flink 1.10 using
>>> the Blink planner, running locally with no checkpointing.
>>>
>>> The job looks roughly like:
>>>
>>> CSV 1 -->
>>> CSV 2 -->  SQL Query to Join --> toRetractStream --> keyed time window
>>> w/ process func & custom trigger --> some other ops
>>> CSV 3 -->
>>>
>>>
>>> When I remove the windowing directly after the `toRetractStream`, the
>>> records make it to the "some other ops" stage, but with the windowing,
>>> those operations are sometimes not sent any data. I can also get data sent
>>> to the downstream operators by putting in a no-op map before the window and
>>> placing some breakpoints in there to manually slow down processing.
>>>
>>>
>>> The logs don't seem to indicate anything went wrong and generally look
>>> like:
>>>
>>> 4819 [Source: Custom File source (1/1)] INFO
>>>  org.apache.flink.runtime.taskmanager.Task  - Source: Custom File source
>>> (1/1) (3578629787c777320d9ab030c004abd4) switched from RUNNING to
>>> FINISHED.
>>> 4819 [Source: Custom File source (1/1)] INFO
>>>  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for
>>> Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4).
>>> 4819 [Source: Custom File source (1/1)] INFO
>>>  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem
>>> streams are closed for task Source: Custom File source (1/1)
>>> (3578629787c777320d9ab030c004abd4) [FINISHED]
>>> 4820 [flink-akka.actor.default-dispatcher-5] INFO
>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Un-registering task
>>> and sending final execution state FINISHED to JobManager for task Source:
>>> Custom File source (1/1) 3578629787c777320d9ab030c004abd4.
>>> ...
>>> 4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
>>> ProcessWindowFunction$1) (1/1)] INFO
>>>  org.apache.flink.runtime.taskmanager.Task  -
>>> Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
>>> ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f) switched
>>> from RUNNING to FINISHED.
>>> 4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
>>> ProcessWindowFunction$1) (1/1)] INFO
>>>  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for
>>> Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
>>> ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f).
>>> 4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
>>> ProcessWindowFunction$1) (1/1)] INFO
>>>  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem
>>> streams are closed for task Window(TumblingProcessingTimeWindows(6),
>>> TimedCountTrigger, ProcessWindowFunction$1) (1/1)
>>> (907acf9bfa2f4a9bbd13997b8b34d91f) [FINISHED]
>>> ...
>>> rest of the shutdown
>>> ...
>>> Program execution finished
>>> Job with JobID 889b161e432c0e69a8d760bbed205d5d has finished.
>>> Job Runtime: 783 ms
>>>
>>>
>>> Is there something I'm missing in my setup? Could it be my custom window
>>> trigger? Bug? I'm stumped.
>>>
>>>
>>> Thanks,
>>> Austin
>>>
>>>
>>>
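
For context, the pipeline described in the quoted message corresponds roughly to
the sketch below (Flink 1.10, Blink planner). The CSV sources and the custom
TimedCountTrigger are replaced by a tiny bounded in-memory table and the default
trigger, so every name here is a placeholder. With such a bounded input the
sources switch to FINISHED almost immediately, and a pending processing-time
window timer may never fire before the job shuts down, which may be what the
report describes:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class RetractStreamWindowSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Placeholder for the three CSV-backed tables and the SQL join from the report.
        DataStream<Tuple2<Integer, String>> input =
                env.fromElements(Tuple2.of(1, "a"), Tuple2.of(2, "b"), Tuple2.of(1, "c"));
        tEnv.createTemporaryView("events", input, "id, v");
        Table aggregated = tEnv.sqlQuery("SELECT id, COUNT(v) AS cnt FROM events GROUP BY id");

        DataStream<Tuple2<Boolean, Row>> retracted = tEnv.toRetractStream(aggregated, Row.class);

        retracted
                .keyBy(t -> String.valueOf(t.f1.getField(0)))                 // placeholder key
                .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))   // processing time
                // .trigger(new TimedCountTrigger(...))                       // custom trigger omitted
                .process(new ProcessWindowFunction<Tuple2<Boolean, Row>, Long, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context ctx,
                                        Iterable<Tuple2<Boolean, Row>> elements, Collector<Long> out) {
                        long count = 0;
                        for (Tuple2<Boolean, Row> ignored : elements) {
                            count++;
                        }
                        out.collect(count);
                    }
                })
                .print();  // stand-in for the "some other ops" stage

        env.execute("retract-stream-window-sketch");
    }
}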


Blobserver dying mid-application

2020-09-30 Thread Hailu, Andreas
Hello folks, I'm seeing application failures where our Blobserver is refusing
connections mid-application:

2020-09-30 13:56:06,227 INFO  [flink-akka.actor.default-dispatcher-18] 
org.apache.flink.runtime.taskexecutor.TaskExecutor- Un-registering 
task and sending final execution state FINISHED to JobManager for task DataSink 
(TextOutputFormat 
(hdfs:/user/p2epda/lake/delp_prod/PROD/APPROVED/staging/datastore/MandateTradingLine/tmp_pipeline_collapse)
 - UTF-8) 3d1890b47f4398d805cf0c1b54286f71.
2020-09-30 13:56:06,423 INFO  [flink-akka.actor.default-dispatcher-18] 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable  - Free slot 
TaskSlot(index:0, state:ACTIVE, resource profile: 
ResourceProfile{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147483647, 
directMemoryInMB=2147483647, nativeMemoryInMB=2147483647, 
networkMemoryInMB=2147483647, managedMemoryInMB=3046}, allocationId: 
e8be16ec74f16c795d95b89cd08f5c37, jobId: e808de0373bd515224434b7ec1efe249).
2020-09-30 13:56:06,424 INFO  [flink-akka.actor.default-dispatcher-18] 
org.apache.flink.runtime.taskexecutor.JobLeaderService- Remove job 
e808de0373bd515224434b7ec1efe249 from job leader monitoring.
2020-09-30 13:56:06,424 INFO  [flink-akka.actor.default-dispatcher-18] 
org.apache.flink.runtime.taskexecutor.TaskExecutor- Close 
JobManager connection for job e808de0373bd515224434b7ec1efe249.
2020-09-30 13:56:06,426 INFO  [flink-akka.actor.default-dispatcher-18] 
org.apache.flink.runtime.taskexecutor.TaskExecutor- Close 
JobManager connection for job e808de0373bd515224434b7ec1efe249.
2020-09-30 13:56:06,426 INFO  [flink-akka.actor.default-dispatcher-18] 
org.apache.flink.runtime.taskexecutor.JobLeaderService- Cannot 
reconnect to job e808de0373bd515224434b7ec1efe249 because it is not registered.
2020-09-30 13:56:09,918 INFO  [CHAIN DataSource (dataset | Read Staging From 
File System | AVRO) -> Map (Map at 
readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at 
validateData(DAXTask.java:97)) -> FlatMap (FlatMap at 
handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at 
collapsePipelineIfRequired(Task.java:160)) (1/1)] 
org.apache.flink.runtime.blob.BlobClient  - Downloading 
48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
 from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 3)
2020-09-30 13:56:09,920 ERROR [CHAIN DataSource (dataset | Read Staging From 
File System | AVRO) -> Map (Map at 
readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at 
validateData(DAXTask.java:97)) -> FlatMap (FlatMap at 
handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at 
collapsePipelineIfRequired(Task.java:160)) (1/1)] 
org.apache.flink.runtime.blob.BlobClient  - Failed to fetch 
BLOB 
48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
 from d43723-430.dc.gs.com/10.48.128.14:46473 and store it under 
/fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-0004
 Retrying...
2020-09-30 13:56:09,920 INFO  [CHAIN DataSource (dataset | Read Staging From 
File System | AVRO) -> Map (Map at 
readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at 
validateData(DAXTask.java:97)) -> FlatMap (FlatMap at 
handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at 
collapsePipelineIfRequired(Task.java:160)) (1/1)] 
org.apache.flink.runtime.blob.BlobClient  - Downloading 
48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
 from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 4)
2020-09-30 13:56:09,922 ERROR [CHAIN DataSource (dataset | Read Staging From 
File System | AVRO) -> Map (Map at 
readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at 
validateData(DAXTask.java:97)) -> FlatMap (FlatMap at 
handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at 
collapsePipelineIfRequired(Task.java:160)) (1/1)] 
org.apache.flink.runtime.blob.BlobClient  - Failed to fetch 
BLOB 
48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
 from d43723-430.dc.gs.com/10.48.128.14:46473 and store it under 
/fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-0004
 Retrying...
2020-09-30 13:56:09,922 INFO  [CHAIN DataSource (dataset | Read Staging From 
File System | AVRO) -> Map (Map at 
readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at 
validateData(DAXTask.java:97)) -> FlatMap (FlatMap at 
handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at 
collapsePipelineIfRequired(Task.java:160)) (1/1)] 
org.apache.flink.runtime.blob.BlobCli
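
In case the blob server is simply being saturated by many concurrent artifact
downloads, the blob-fetch settings in flink-conf.yaml are worth checking. A
hedged sketch with the usual defaults (verify them against your Flink version
before tuning anything):

blob.fetch.num-concurrent: 50   # max concurrent fetch requests served by the blob server
blob.fetch.backlog: 1000        # connection backlog; refused connections can point at an exhausted backlog
blob.fetch.retries: 5           # client-side retries, visible as "(retry N)" in the log above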

Help with Python Stateful Functions Types

2020-09-30 Thread Clements, Danial C
Hi,

I’m trying to work through an example with Flink Stateful Functions in Python.  
I have a series of custom protobuf messages that I’ve defined but I’m 
struggling with how they should be provided to the runtime so the messages in 
Kafka can be deserialized.  I see,

type: statefun.kafka.io/routable-protobuf-ingress
id: example/names
in the example, but how can I change that to my.namespace.com/IngressMessage?  
Do I need to provide the protobuf compiled JAR in my Python app?

Thanks
Dan
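
For what it's worth, in StateFun 2.x the ingress id is just the namespace/name
you pick for the ingress, while the Protobuf message type is bound per topic via
typeUrl in module.yaml (the fragment below goes under module.spec). A rough
sketch with made-up broker, topic and function names; please verify the exact
format against the docs of your StateFun version:

ingresses:
  - ingress:
      meta:
        type: statefun.kafka.io/routable-protobuf-ingress
        id: my.namespace.com/IngressMessage      # any namespace/name you choose for the ingress
      spec:
        address: kafka-broker:9092
        consumerGroupId: my-group
        topics:
          - topic: input-topic
            typeUrl: com.googleapis/my.namespace.IngressMessage   # full Protobuf message name
            targets:
              - example/greeter                                    # function type to route to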



Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

2020-09-30 Thread Till Rohrmann
Thanks for the clarifications Yang Wang.

2. Keeping the HA information relevant for a component (Dispatcher,
JobManager, ResourceManager) in a single ConfigMap sounds good. We should
check that we don't exceed the 1 MB size limit with this approach though.
The Dispatcher's ConfigMap would then contain the current leader, the
running jobs and the pointers to the persisted JobGraphs. The JobManager's
ConfigMap would then contain the current leader, the pointers to the
checkpoints and the checkpoint ID counter, for example.
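
Purely as an illustration of that layout, a Dispatcher ConfigMap could end up
looking roughly like the sketch below; the name, data keys and annotation format
are invented for the example and are not what the FLIP finally specifies:

apiVersion: v1
kind: ConfigMap
metadata:
  name: my-flink-cluster-dispatcher-leader            # hypothetical name
  annotations:
    # leader record maintained by the leader elector (JSON containing the holder identity)
    control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"jobmanager-pod-a"}'
data:
  # pointers to the persisted JobGraphs, not the serialized graphs themselves
  jobGraph-2b8df52e6e1da25f0f4a8e97c3e92f1d: hdfs:///flink/ha/submittedJobGraph-aa61ba
  jobGraph-7d1f8e33c0a94a6c8f1ab2a1a2b3c4d5: hdfs:///flink/ha/submittedJobGraph-b07c11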

3. Ah ok, I somehow thought that K8s would give us stronger guarantees than
Yarn in this regard. That's a pity.

Cheers,
Till

On Wed, Sep 30, 2020 at 10:03 AM tison  wrote:

> Thanks for your explanation. It would be fine as long as checking leadership
> and actually writing the information is done atomically.
>
> Best,
> tison.
>
>
> Yang Wang wrote on Wed, Sep 30, 2020 at 3:57 PM:
>
>> Thanks Till and tison for your comments.
>>
>> @Till Rohrmann
>> 1. I am afraid we could not do this if we are going to use the fabric8
>> Kubernetes client SDK for the leader election. The official Kubernetes Java
>> client[1] does not support it either. We could implement a new LeaderElector
>> in Flink based on the very basic Kubernetes API, but it seems that we would
>> not gain much from this.
>>
>> 2. Yes, the implementation will be a little complicated if we want to
>> completely eliminate residual job graphs or checkpoints. Inspired by your
>> suggestion, a different solution has come to my mind. We could use the same
>> ConfigMap to store the JobManager leader, job graph, checkpoint counter, and
>> checkpoints. Each job will have a specific ConfigMap for the HA meta storage.
>> Then it will be easier to guarantee that only the leader could write the
>> ConfigMap, since “Get (check the leader)-and-Update (write back to the
>> ConfigMap)” is a transactional operation.
>>
>> 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution. However, we
>> still have the chance that two JobManagers are running and trying to
>> get/delete a key in the same ConfigMap concurrently. Imagine that the kubelet
>> (like the NodeManager in YARN) is down; then the JobManager pod could not be
>> deleted and a new JobManager pod will be launched. We are then in a similar
>> situation as Deployment(1) + ConfigMap + HDFS/S3. The only benefit is that we
>> do not need to implement a leader election/retrieval service.
>>
>> @tison
>> Actually, I do not think we will have such an issue in the Kubernetes HA
>> service. In the Kubernetes LeaderElector[2], the leader information is stored
>> in an annotation of the leader ConfigMap, so it cannot happen that the old
>> leader wrongly overrides the leader information. Once a JobManager wants to
>> write its leader information to the ConfigMap, it will check whether it is
>> still the leader. If not, nothing will happen. Moreover, the Kubernetes
>> Resource Version[3] ensures that no one else has snuck in and written a
>> different update while the client was in the process of performing its
>> update.
>>
>>
>> [1].
>> https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
>> [2].
>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java
>> 
>> [3].
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion
>>
>>
>> Best,
>> Yang
>>
>> tison wrote on Wed, Sep 30, 2020 at 3:21 PM:
>>
>>> Hi,
>>>
>>> Generally +1 for a native k8s HA service.
>>>
>>> For leader election & publishing leader information, there was a
>>> discussion[1] which pointed out that since these two actions are NOT atomic,
>>> there will always be edge cases where a previous leader overwrites the leader
>>> information, even with versioned writes. A versioned write helps by reading
>>> again when the version mismatches, so for versioned writes to work, the
>>> information in the kv pair should help the contender tell whether it is still
>>> the current leader.
>>>
>>> The idea of writing the leader information on the contender node, or
>>> something equivalent, makes sense, but the details depend on how it is
>>> implemented. The general problems are that:
>>>
>>> 1. The TM might be a bit late to pick up the correct leader information, but
>>> as long as the leader election process is short and leadership is stable most
>>> of the time, it won't be a serious issue.
>>> 2. The process by which the TM extracts the leader information might be a bit
>>> more complex than directly watching a fixed key.
>>>
>>> The atomicity issue can be addressed if one leverages low-level APIs such as
>>> lease & txn
>>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

2020-09-30 Thread tison
Thanks for your explanation. It would be fine as long as checking leadership and
actually writing the information is done atomically.

Best,
tison.


Yang Wang wrote on Wed, Sep 30, 2020 at 3:57 PM:

> Thanks Till and tison for your comments.
>
> @Till Rohrmann
> 1. I am afraid we could not do this if we are going to use the fabric8
> Kubernetes client SDK for the leader election. The official Kubernetes Java
> client[1] does not support it either. We could implement a new LeaderElector
> in Flink based on the very basic Kubernetes API, but it seems that we would
> not gain much from this.
>
> 2. Yes, the implementation will be a little complicated if we want to
> completely eliminate residual job graphs or checkpoints. Inspired by your
> suggestion, a different solution has come to my mind. We could use the same
> ConfigMap to store the JobManager leader, job graph, checkpoint counter, and
> checkpoints. Each job will have a specific ConfigMap for the HA meta storage.
> Then it will be easier to guarantee that only the leader could write the
> ConfigMap, since “Get (check the leader)-and-Update (write back to the
> ConfigMap)” is a transactional operation.
>
> 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution. However, we
> still have the chance that two JobManagers are running and trying to
> get/delete a key in the same ConfigMap concurrently. Imagine that the kubelet
> (like the NodeManager in YARN) is down; then the JobManager pod could not be
> deleted and a new JobManager pod will be launched. We are then in a similar
> situation as Deployment(1) + ConfigMap + HDFS/S3. The only benefit is that we
> do not need to implement a leader election/retrieval service.
>
> @tison
> Actually, I do not think we will have such an issue in the Kubernetes HA
> service. In the Kubernetes LeaderElector[2], the leader information is stored
> in an annotation of the leader ConfigMap, so it cannot happen that the old
> leader wrongly overrides the leader information. Once a JobManager wants to
> write its leader information to the ConfigMap, it will check whether it is
> still the leader. If not, nothing will happen. Moreover, the Kubernetes
> Resource Version[3] ensures that no one else has snuck in and written a
> different update while the client was in the process of performing its
> update.
>
>
> [1].
> https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
> [2].
> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java
> 
> [3].
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion
>
>
> Best,
> Yang
>
> tison wrote on Wed, Sep 30, 2020 at 3:21 PM:
>
>> Hi,
>>
>> Generally +1 for a native k8s HA service.
>>
>> For leader election & publishing leader information, there was a
>> discussion[1] which pointed out that since these two actions are NOT atomic,
>> there will always be edge cases where a previous leader overwrites the leader
>> information, even with versioned writes. A versioned write helps by reading
>> again when the version mismatches, so for versioned writes to work, the
>> information in the kv pair should help the contender tell whether it is still
>> the current leader.
>>
>> The idea of writing the leader information on the contender node, or
>> something equivalent, makes sense, but the details depend on how it is
>> implemented. The general problems are that:
>>
>> 1. The TM might be a bit late to pick up the correct leader information, but
>> as long as the leader election process is short and leadership is stable most
>> of the time, it won't be a serious issue.
>> 2. The process by which the TM extracts the leader information might be a bit
>> more complex than directly watching a fixed key.
>>
>> The atomicity issue can be addressed if one leverages low-level APIs such as
>> lease & txn, but that causes more development effort. A ConfigMap and an
>> encapsulated interface, though, provide only a self-consistent mechanism
>> which doesn't promise more consistency for extension.
>>
>> Best,
>> tison.
>>
>> [1]
>> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
>>
>>
>>
>> Till Rohrmann wrote on Tue, Sep 29, 2020 at 9:25 PM:
>>
>>> For 1. I was wondering whether we can't write the leader connection
>>> information directly when trying to obtain the leadership (trying to
>>> update
>>> the leader key with one's own value)? This might be a little detail,
>>> though.
>>>
>>> 2. Alright, so we are having a similar mechanism as we have in ZooKeeper
>>> with the ephemeral lock nodes. I guess that this complicates the
>>>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

2020-09-30 Thread Yang Wang
Thanks Till and tison for your comments.

@Till Rohrmann
1. I am afraid we could not do this if we are going to use the fabric8
Kubernetes client SDK for the leader election. The official Kubernetes Java
client[1] does not support it either. We could implement a new LeaderElector
in Flink based on the very basic Kubernetes API, but it seems that we would
not gain much from this.

2. Yes, the implementation will be a little complicated if we want to
completely eliminate residual job graphs or checkpoints. Inspired by your
suggestion, a different solution has come to my mind. We could use the same
ConfigMap to store the JobManager leader, job graph, checkpoint counter, and
checkpoints. Each job will have a specific ConfigMap for the HA meta storage.
Then it will be easier to guarantee that only the leader could write the
ConfigMap, since “Get (check the leader)-and-Update (write back to the
ConfigMap)” is a transactional operation.

3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution. However, we
still have the chance that two JobManagers are running and trying to
get/delete a key in the same ConfigMap concurrently. Imagine that the kubelet
(like the NodeManager in YARN) is down; then the JobManager pod could not be
deleted and a new JobManager pod will be launched. We are then in a similar
situation as Deployment(1) + ConfigMap + HDFS/S3. The only benefit is that we
do not need to implement a leader election/retrieval service.

@tison
Actually, I do not think we will have such an issue in the Kubernetes HA
service. In the Kubernetes LeaderElector[2], the leader information is stored
in an annotation of the leader ConfigMap, so it cannot happen that the old
leader wrongly overrides the leader information. Once a JobManager wants to
write its leader information to the ConfigMap, it will check whether it is
still the leader. If not, nothing will happen. Moreover, the Kubernetes
Resource Version[3] ensures that no one else has snuck in and written a
different update while the client was in the process of performing its update.


[1].
https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
[2].
https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java

[3].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion


Best,
Yang

tison wrote on Wed, Sep 30, 2020 at 3:21 PM:

> Hi,
>
> Generally +1 for a native k8s HA service.
>
> For leader election & publishing leader information, there was a discussion[1]
> which pointed out that since these two actions are NOT atomic, there will
> always be edge cases where a previous leader overwrites the leader
> information, even with versioned writes. A versioned write helps by reading
> again when the version mismatches, so for versioned writes to work, the
> information in the kv pair should help the contender tell whether it is still
> the current leader.
>
> The idea of writing the leader information on the contender node, or something
> equivalent, makes sense, but the details depend on how it is implemented. The
> general problems are that:
>
> 1. The TM might be a bit late to pick up the correct leader information, but
> as long as the leader election process is short and leadership is stable most
> of the time, it won't be a serious issue.
> 2. The process by which the TM extracts the leader information might be a bit
> more complex than directly watching a fixed key.
>
> The atomicity issue can be addressed if one leverages low-level APIs such as
> lease & txn, but that causes more development effort. A ConfigMap and an
> encapsulated interface, though, provide only a self-consistent mechanism which
> doesn't promise more consistency for extension.
>
> Best,
> tison.
>
> [1]
> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
>
>
>
> Till Rohrmann wrote on Tue, Sep 29, 2020 at 9:25 PM:
>
>> For 1. I was wondering whether we can't write the leader connection
>> information directly when trying to obtain the leadership (trying to
>> update
>> the leader key with one's own value)? This might be a little detail,
>> though.
>>
>> 2. Alright, so we are having a similar mechanism as we have in ZooKeeper
>> with the ephemeral lock nodes. I guess that this complicates the
>> implementation a bit, unfortunately.
>>
>> 3. Wouldn't the StatefulSet solution also work without a PV? One could
>> configure a different persistent storage like HDFS or S3 for storing the
>> checkpoints and job blobs like in the ZooKeeper case. The current benefit
>> I
>> see is that we avoid having to implement thi

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

2020-09-30 Thread tison
Hi,

Generally +1 for a native k8s HA service.

For leader election & publishing leader information, there was a discussion[1]
which pointed out that since these two actions are NOT atomic, there will
always be edge cases where a previous leader overwrites the leader information,
even with versioned writes. A versioned write helps by reading again when the
version mismatches, so for versioned writes to work, the information in the kv
pair should help the contender tell whether it is still the current leader.

The idea of writing the leader information on the contender node, or something
equivalent, makes sense, but the details depend on how it is implemented. The
general problems are that:

1. The TM might be a bit late to pick up the correct leader information, but as
long as the leader election process is short and leadership is stable most of
the time, it won't be a serious issue.
2. The process by which the TM extracts the leader information might be a bit
more complex than directly watching a fixed key.

The atomicity issue can be addressed if one leverages low-level APIs such as
lease & txn, but that causes more development effort. A ConfigMap and an
encapsulated interface, though, provide only a self-consistent mechanism which
doesn't promise more consistency for extension.

Best,
tison.

[1]
https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E



Till Rohrmann wrote on Tue, Sep 29, 2020 at 9:25 PM:

> For 1. I was wondering whether we can't write the leader connection
> information directly when trying to obtain the leadership (trying to update
> the leader key with one's own value)? This might be a little detail,
> though.
>
> 2. Alright, so we are having a similar mechanism as we have in ZooKeeper
> with the ephemeral lock nodes. I guess that this complicates the
> implementation a bit, unfortunately.
>
> 3. Wouldn't the StatefulSet solution also work without a PV? One could
> configure a different persistent storage like HDFS or S3 for storing the
> checkpoints and job blobs like in the ZooKeeper case. The current benefit I
> see is that we avoid having to implement this multi locking mechanism in
> the ConfigMaps using the annotations because we can be sure that there is
> only a single leader at a time if I understood the guarantees of K8s
> correctly.
>
> Cheers,
> Till
>
> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang  wrote:
>
> > Hi Till, thanks for your valuable feedback.
> >
> > 1. Yes, leader election and storing the leader information will use the same
> > ConfigMap. When a contender successfully performs a versioned annotation
> > update on the ConfigMap, it means that it has been elected as the leader, and
> > it will write the leader information in the callback of the leader
> > elector[1]. The Kubernetes resource version will help us avoid the leader
> > ConfigMap being wrongly updated.
> >
> > 2. The lock-and-release is really a valid concern. Actually, in the current
> > design we could not guarantee that the node which tries to write its
> > ownership is the real leader; whoever writes last becomes the owner. To
> > address this issue, we need to store all the owners of a key. Only when the
> > owner list is empty could the specific key (meaning a checkpoint or job
> > graph) be deleted. However, we may have a residual checkpoint or job graph
> > when the old JobManager crashes exceptionally and does not release the lock.
> > To solve this problem completely, we would need a timestamp renewal mechanism
> > for CompletedCheckpointStore and JobGraphStore, which could help us check for
> > a JobManager timeout and then clean up the residual keys.
> >
> > 3. Frankly speaking, I am not against this solution. However, in my opinion,
> > it is more like a temporary proposal. We could use a StatefulSet to avoid
> > leader election and leader retrieval. But I am not sure whether the
> > TaskManager could properly handle the situation of the same hostname with
> > different IPs after the JobManager fails and is relaunched. Also, we may
> > still have two JobManagers running in some corner cases (e.g. the kubelet is
> > down but the pod is still running). Another concern is that we have a strong
> > dependency on a PersistentVolume (aka PV) in the FileSystemHAService, which
> > is not always available, especially in self-built Kubernetes clusters.
> > Moreover, the PV provider should guarantee that each PV can only be mounted
> > once. Since the native HA proposal covers all the functionality of the
> > StatefulSet proposal, I prefer the former.
> >
> >
> > [1].
> >
> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
> >
> > Best,
> > Yang
> >
> > Till Rohrmann wrote on Mon, Sep 28, 2020 at 9:29 PM:
> >
> >> Thanks for creating this FLIP Yang Wang. I believe that many of our
> users
> >> will like a ZooKeeper-less HA setup.
> >>
> >> +1 for not separating the leader information and the leader election if
> >> possibl

Re: Flink on k8s

2020-09-30 Thread superainbower
And I got this error log:


Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: 
Hadoop is not in the classpath/dependencies.
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could 
not find a file system implementation for scheme 'hdfs'. The scheme is not 
directly supported by Flink and no Hadoop file system to support this scheme 
could be loaded. For a full list of supported file systems, please see
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.




superainbower
superainbo...@163.com


On 09/30/2020 14:48, superainbower wrote:
Hi,
How do I configure the state backend when I deploy Flink on K8s? I just added
the following to flink-conf.yaml, but it doesn't work:


state.backend: rocksdb
state.checkpoints.dir: hdfs://slave2:8020/flink/checkpoints
state.savepoints.dir: hdfs://slave2:8020/flink/savepoints
state.backend.incremental: true


superainbower
superainbo...@163.com
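
Regarding the "Hadoop is not in the classpath/dependencies" error earlier in
this thread: the state backend settings themselves look fine; the image simply
has no HDFS file system support, so the hdfs:// checkpoint paths cannot be
resolved. One common fix is to bake a shaded Hadoop uber jar into the image's
lib/ directory. A sketch; pick the Flink base image and shaded-hadoop version
matching your environment, or alternatively provide HADOOP_CLASSPATH inside the
container:

FROM flink:1.10.2-scala_2.11
# Provide an HDFS FileSystem implementation by putting the pre-built shaded Hadoop
# uber jar on Flink's classpath; download it next to this Dockerfile beforehand.
COPY flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /opt/flink/lib/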