Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

2020-09-28 Thread Yang Wang
Hi Till, thanks for your valuable feedback.

1. Yes, leader election and storing the leader information will use the same
ConfigMap. When a contender successfully performs a versioned annotation
update on the ConfigMap, it has been elected as the leader. It then writes
the leader information in the callback of the leader elector[1]. The
Kubernetes resource version helps us prevent the leader ConfigMap from
being wrongly updated.

2. The lock and release is indeed a valid concern. In the current design,
we cannot guarantee that the node which tries to write its ownership is the
real leader: whoever writes last becomes the owner. To address this issue,
we need to store all the owners of a key. Only when the owner list is
empty can the specific key (i.e. a checkpoint or job graph) be deleted.
However, we may be left with a residual checkpoint or job graph when the
old JobManager crashes unexpectedly and does not release the lock. To
solve this problem completely, we need a timestamp renewal mechanism
for CompletedCheckpointStore and JobGraphStore, which would let us detect
a JobManager timeout and then clean up the residual keys.

3. Frankly speaking, I am not against this solution. However, in my
opinion, it is more of a temporary proposal. We could use a StatefulSet to
avoid leader election and leader retrieval. But I am not sure whether the
TaskManager can properly handle the situation where the same hostname
resolves to a different IP because the JobManager failed and was
relaunched. Also, we may still have two JobManagers running in some corner
cases (e.g. the kubelet is down but the pod is still running). Another
concern is that FileSystemHAService has a strong dependency on a
PersistentVolume (aka PV), which is not always available, especially in
self-built Kubernetes clusters. Moreover, the PV provider has to guarantee
that each PV can only be mounted once. Since the native HA proposal covers
all the functionality of the StatefulSet proposal, I prefer the former.
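
To make point 1 concrete, here is a minimal sketch of election through a
versioned write. It is plain Java with an in-memory stand-in for the
ConfigMap, not the fabric8 client API; the class names, the "leader"
annotation key and the "address" data key are all assumptions made up for
this illustration. It writes the leader address in the same versioned update
(the variant Till suggested) and leaves out lease expiry and renewal.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Toy model of leader election through a versioned ConfigMap update (not the fabric8 API). */
class VersionedLeaderElection {

    /** Immutable view of the ConfigMap at one resource version. */
    static final class Snapshot {
        final long resourceVersion;
        final Map<String, String> annotations;
        final Map<String, String> data;

        Snapshot(long resourceVersion, Map<String, String> annotations, Map<String, String> data) {
            this.resourceVersion = resourceVersion;
            this.annotations = Collections.unmodifiableMap(new HashMap<>(annotations));
            this.data = Collections.unmodifiableMap(new HashMap<>(data));
        }
    }

    /** In-memory stand-in for the API server: a write succeeds only against the latest version. */
    static final class FakeConfigMap {
        private Snapshot current = new Snapshot(
                0L, Collections.<String, String>emptyMap(), Collections.<String, String>emptyMap());

        synchronized Snapshot read() {
            return current;
        }

        synchronized boolean replaceIfVersionMatches(
                long expectedVersion, Map<String, String> annotations, Map<String, String> data) {
            if (current.resourceVersion != expectedVersion) {
                return false; // somebody else updated the ConfigMap in the meantime
            }
            current = new Snapshot(expectedVersion + 1, annotations, data);
            return true;
        }
    }

    // Hypothetical keys, chosen for this sketch only.
    static final String LEADER_ANNOTATION = "leader";
    static final String ADDRESS_KEY = "address";

    /** Attempts to take leadership based on a previously read snapshot. */
    static boolean tryAcquire(FakeConfigMap configMap, Snapshot seen, String contenderId, String address) {
        if (seen.annotations.containsKey(LEADER_ANNOTATION)) {
            return false; // lock already held; lease expiry is left out of this sketch
        }
        Map<String, String> annotations = new HashMap<>(seen.annotations);
        annotations.put(LEADER_ANNOTATION, contenderId);
        Map<String, String> data = new HashMap<>(seen.data);
        data.put(ADDRESS_KEY, address);
        return configMap.replaceIfVersionMatches(seen.resourceVersion, annotations, data);
    }

    public static void main(String[] args) {
        FakeConfigMap configMap = new FakeConfigMap();
        // Both contenders read the same (empty) version before either one writes.
        Snapshot seenByA = configMap.read();
        Snapshot seenByB = configMap.read();
        System.out.println("jm-a elected: " + tryAcquire(configMap, seenByA, "jm-a", "jm-a:6123"));
        System.out.println("jm-b elected: " + tryAcquire(configMap, seenByB, "jm-b", "jm-b:6123"));
        System.out.println("leader address: " + configMap.read().data.get(ADDRESS_KEY));
    }
}
```

The second contender loses because its write is based on a stale resource
version, so it can never overwrite the leader information by accident.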


[1].
https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70

Best,
Yang

On Mon, Sep 28, 2020 at 9:29 PM, Till Rohrmann wrote:

> Thanks for creating this FLIP Yang Wang. I believe that many of our users
> will like a ZooKeeper-less HA setup.
>
> +1 for not separating the leader information and the leader election if
> possible. Maybe it is even possible that the contender writes his leader
> information directly when trying to obtain the leadership by performing a
> versioned write operation.
>
> Concerning the lock and release operation I have a question: Can there be
> multiple owners for a given key-value pair in a ConfigMap? If not, how can
> we ensure that the node which writes his ownership is actually the leader
> w/o transactional support from K8s? In ZooKeeper we had the same problem
> (we should probably change it at some point to simply use a
> transaction which checks whether the writer is still the leader) and
> therefore introduced the ephemeral lock nodes. What they allow is that
> there can be multiple owners of a given ZNode at a time. The last owner
> will then be responsible for the cleanup of the node.
>
> I see the benefit of your proposal over the stateful set proposal because
> it can support multiple standby JMs. Given the problem of locking key-value
> pairs it might be simpler to start with this approach where we only have
> a single JM. This might already add a lot of benefits for our users. Was
> there a specific reason why you discarded this proposal (other than
> generality)?
>
> @Uce it would be great to hear your feedback on the proposal since you
> already implemented a K8s based HA service.
>
> Cheers,
> Till
>
> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang  wrote:
>
>> Hi Xintong and Stephan,
>>
>> Thanks a lot for your attention on this FLIP. I will address the comments
>> inline.
>>
>> # Architecture -> One or two ConfigMaps
>>
>> Both of you are right. One ConfigMap will make the design and
>> implementation easier. Actually, in my POC code,
>> I am using just one ConfigMap (e.g. "k8s-ha-app1-restserver" for the rest
>> server component) for both leader election
>> and storage. Once a JobManager wins the election, it updates the
>> ConfigMap with the leader address and periodically
>> renews the lock annotation to remain the active leader. I will update the
>> FLIP document, including the architecture diagram,
>> to avoid misunderstanding.
>>
>>
>> # HA storage > Lock and release
>>
>> This is a valid concern. ZooKeeper ephemeral nodes are
>> deleted by the ZK server automatically when
>> the client times out, which can happen in a bad network environment or
>> when the ZK client crashes unexpectedly. For Kubernetes,
>> we need to implement a similar mechanism. First, when we want to lock a
>> specific key in the ConfigMap, we put the owner identity,
>> lease duration,

Streaming SQL Job Switches to FINISHED before all records processed

2020-09-28 Thread Austin Cawley-Edwards
Hey all,

I'm not sure if I've missed something in the docs, but I'm having a bit of
trouble with a streaming SQL job that starts w/ raw SQL queries and then
transitions to a more traditional streaming job. I'm on Flink 1.10 using
the Blink planner, running locally with no checkpointing.

The job looks roughly like:

CSV 1 -->
CSV 2 -->  SQL Query to Join --> toRetractStream --> keyed time window w/
process func & custom trigger --> some other ops
CSV 3 -->


When I remove the windowing directly after the `toRetractStream`, the
records make it to the "some other ops" stage, but with the windowing,
those operations are sometimes not sent any data. I can also get data sent
to the downstream operators by putting in a no-op map before the window and
placing some breakpoints in there to manually slow down processing.


The logs don't seem to indicate anything went wrong and generally look like:

4819 [Source: Custom File source (1/1)] INFO
 org.apache.flink.runtime.taskmanager.Task  - Source: Custom File source
(1/1) (3578629787c777320d9ab030c004abd4) switched from RUNNING to
FINISHED.
4819 [Source: Custom File source (1/1)] INFO
 org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for
Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4).
4819 [Source: Custom File source (1/1)] INFO
 org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem
streams are closed for task Source: Custom File source (1/1)
(3578629787c777320d9ab030c004abd4) [FINISHED]
4820 [flink-akka.actor.default-dispatcher-5] INFO
 org.apache.flink.runtime.taskexecutor.TaskExecutor  - Un-registering task
and sending final execution state FINISHED to JobManager for task Source:
Custom File source (1/1) 3578629787c777320d9ab030c004abd4.
...
4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
ProcessWindowFunction$1) (1/1)] INFO
 org.apache.flink.runtime.taskmanager.Task  -
Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f) switched
from RUNNING to FINISHED.
4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
ProcessWindowFunction$1) (1/1)] INFO
 org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for
Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f).
4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
ProcessWindowFunction$1) (1/1)] INFO
 org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem
streams are closed for task Window(TumblingProcessingTimeWindows(6),
TimedCountTrigger, ProcessWindowFunction$1) (1/1)
(907acf9bfa2f4a9bbd13997b8b34d91f) [FINISHED]
...
rest of the shutdown
...
Program execution finished
Job with JobID 889b161e432c0e69a8d760bbed205d5d has finished.
Job Runtime: 783 ms


Is there something I'm missing in my setup? Could it be my custom window
trigger? Bug? I'm stumped.


Thanks,
Austin


Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-28 Thread Kostas Kloudas
Hi all,

I will have a look.

Kostas

On Mon, Sep 28, 2020 at 3:56 PM Till Rohrmann  wrote:
>
> Hi Cristian,
>
> thanks for reporting this issue. It looks indeed like a very critical problem.
>
> The problem seems to be that the ApplicationDispatcherBootstrap class
> produces an exception (that the requested job can no longer be found because
> of a lost ZooKeeper connection) which will be interpreted as a job failure.
> Due to this interpretation, the cluster will be shut down with a terminal
> state of FAILED, which will cause the HA data to be cleaned up. The exact
> problem occurs in JobStatusPollingUtils.getJobResult, which is called by
> ApplicationDispatcherBootstrap.getJobResult().
>
> I think there are two problems here: First of all, not every exception
> bubbling up in the future returned by
> ApplicationDispatcherBootstrap.fixJobIdAndRunApplicationAsync() indicates a
> job failure. Some of them can also indicate a framework failure, which
> should not lead to the cleanup of HA data. The other problem is that the
> polling logic cannot properly handle a temporary connection loss to
> ZooKeeper, which is a normal situation.
>
> I am pulling in Aljoscha and Klou who worked on this feature and might be 
> able to propose a solution for these problems. I've also updated the JIRA 
> issue FLINK-19154.
>
> Cheers,
> Till
>
> On Wed, Sep 9, 2020 at 9:00 AM Yang Wang  wrote:
>>
>> > The job sub directory will be cleaned up when the job 
>> > finished/canceled/failed.
>> Since we can submit multiple jobs to a Flink session, what I mean is that
>> when a job reaches a terminal state, its sub node (e.g.
>> /flink/application_/running_job_registry/4d255397c7aeb5327adb567238c983c1)
>> on ZooKeeper will be cleaned up. But the root directory
>> (/flink/application_/) still exists.
>>
>>
>> Your current case is different (per-job cluster). I think we need to
>> figure out why the only running job reached a terminal state, for
>> example because the restart attempts were exhausted. In that case you
>> would find the following log in your JobManager log.
>>
>> "org.apache.flink.runtime.JobException: Recovery is suppressed by 
>> NoRestartBackoffTimeStrategy"
>>
>>
>> Best,
>> Yang
>>
>>
>>
>>
>> On Wed, Sep 9, 2020 at 11:26 AM, Cristian wrote:
>>>
>>> > The job sub directory will be cleaned up when the job 
>>> > finished/canceled/failed.
>>>
>>> What does this mean?
>>>
>>> Also, to clarify: I'm a very sloppy developer. My jobs crash ALL the 
>>> time... and yet, the jobs would ALWAYS resume from the last checkpoint.
>>>
>>> The only cases where I expect Flink to clean up the checkpoint data from ZK 
>>> is when I explicitly stop or cancel the job (in those cases the job manager 
>>> takes a savepoint before cleaning up zk and finishing the cluster).
>>>
>>> Which is not the case here. Flink was on autopilot here and decided to wipe 
>>> my poor, good checkpoint metadata as the logs show.
>>>
>>> On Tue, Sep 8, 2020, at 7:59 PM, Yang Wang wrote:
>>>
>>> AFAIK, the HA data, including the ZooKeeper metadata and the real data on
>>> DFS, will only be cleaned up
>>> when the Flink cluster reaches a terminal state.
>>>
>>> So if you are using a session cluster, the root cluster node on ZK will be
>>> cleaned up after you manually
>>> stop the session cluster. The job sub directory will be cleaned up when the
>>> job finished/canceled/failed.
>>>
>>> If you are using a job/application cluster, once the only running job
>>> finished/failed, all the HA data will
>>> be cleaned up. I think you need to check the job restart strategy you have
>>> set. For example, the following
>>> configuration will make the Flink cluster terminate after 10 attempts.
>>>
>>> restart-strategy: fixed-delay
>>> restart-strategy.fixed-delay.attempts: 10
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> On Wed, Sep 9, 2020 at 12:28 AM, Cristian wrote:
>>>
>>>
>>> I'm using the standalone script to start the cluster.
>>>
>>> As far as I can tell, it's not easy to reproduce. We found that zookeeper 
>>> lost a node around the time this happened, but all of our other 75 Flink 
>>> jobs which use the same setup, version and zookeeper, didn't have any 
>>> issues. They didn't even restart.
>>>
>>> So unfortunately I don't know how to reproduce this. All I know is I can't 
>>> sleep. I have nightmares where my precious state is deleted. I wake up
>>> crying and quickly start manually savepointing all jobs just in case,
>>> because I feel the day of reckoning is near. Flinkpocalypse!
>>>
>>> On Tue, Sep 8, 2020, at 5:54 AM, Robert Metzger wrote:
>>>
>>> Thanks a lot for reporting this problem here Cristian!
>>>
>>> I am not super familiar with the involved components, but the behavior you 
>>> are describing doesn't sound right to me.
>>> Which entrypoint are you using? This is logged at the beginning, like this: 
>>> "2020-09-08 14:45:32,807 INFO  
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Starting 
>>> StandaloneSessionClusterEntrypo

Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-28 Thread Till Rohrmann
Hi Cristian,

thanks for reporting this issue. It looks indeed like a very critical
problem.

The problem seems to be that the ApplicationDispatcherBootstrap class
produces an exception (that the requested job can no longer be found because
of a lost ZooKeeper connection) which will be interpreted as a job failure.
Due to this interpretation, the cluster will be shut down with a terminal
state of FAILED, which will cause the HA data to be cleaned up. The exact
problem occurs in JobStatusPollingUtils.getJobResult, which is called by
ApplicationDispatcherBootstrap.getJobResult().

I think there are two problems here: First of all, not every exception
bubbling up in the future returned by
ApplicationDispatcherBootstrap.fixJobIdAndRunApplicationAsync() indicates a
job failure. Some of them can also indicate a framework failure, which
should not lead to the cleanup of HA data. The other problem is that the
polling logic cannot properly handle a temporary connection loss to
ZooKeeper, which is a normal situation.
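
As an illustration of the two points, here is a minimal sketch in plain
Java. It is not Flink code: the exception types, the Outcome enum and the
method names are hypothetical and only show the idea of keeping job failures
apart from framework failures and of tolerating a temporary connection loss
while polling.

```java
import java.util.concurrent.Callable;

/** Sketch: keep job failures and framework failures apart while polling for a job result. */
class JobResultPolling {

    /** Hypothetical marker: the job itself failed. */
    static final class JobFailedException extends Exception {
        JobFailedException(String message) {
            super(message);
        }
    }

    /** Hypothetical marker: a temporary framework problem, e.g. a lost ZooKeeper connection. */
    static final class TransientFrameworkException extends Exception {
        TransientFrameworkException(String message) {
            super(message);
        }
    }

    enum Outcome { JOB_SUCCEEDED, JOB_FAILED, FRAMEWORK_ERROR }

    /**
     * Polls until the job is done. The poll returns true once the job finished successfully
     * and false while it is still running. A real loop would also wait between polls.
     */
    static Outcome pollUntilDone(Callable<Boolean> poll, int maxConsecutiveTransientErrors) {
        int transientErrors = 0;
        while (true) {
            try {
                if (poll.call()) {
                    return Outcome.JOB_SUCCEEDED;
                }
                transientErrors = 0; // the poll worked, the job is just not done yet
            } catch (JobFailedException e) {
                return Outcome.JOB_FAILED;
            } catch (TransientFrameworkException e) {
                transientErrors++;
                if (transientErrors > maxConsecutiveTransientErrors) {
                    return Outcome.FRAMEWORK_ERROR;
                }
            } catch (Exception e) {
                return Outcome.FRAMEWORK_ERROR; // unknown cause: be conservative
            }
        }
    }

    /** Only a terminal job state justifies removing HA data. */
    static boolean mayCleanUpHaData(Outcome outcome) {
        return outcome == Outcome.JOB_SUCCEEDED || outcome == Outcome.JOB_FAILED;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Two lost connections, one "still running" answer, then success.
        Callable<Boolean> flaky = () -> {
            int n = calls[0]++;
            if (n < 2) {
                throw new TransientFrameworkException("connection lost");
            }
            return n >= 3;
        };
        Outcome outcome = pollUntilDone(flaky, 3);
        System.out.println(outcome + ", cleanup allowed: " + mayCleanUpHaData(outcome));
    }
}
```

With this split, a framework error never reaches the code path that removes
the HA data; only a genuine terminal job state does.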

I am pulling in Aljoscha and Klou who worked on this feature and might be
able to propose a solution for these problems. I've also updated the JIRA
issue FLINK-19154.

Cheers,
Till

On Wed, Sep 9, 2020 at 9:00 AM Yang Wang  wrote:

> > The job sub directory will be cleaned up when the job
> > finished/canceled/failed.
> Since we can submit multiple jobs to a Flink session, what I mean is that
> when a job reaches a terminal state, its sub node (e.g.
> /flink/application_/running_job_registry/4d255397c7aeb5327adb567238c983c1)
> on ZooKeeper will be cleaned up. But the root directory
> (/flink/application_/) still exists.
>
>
> Your current case is different (per-job cluster). I think we need to
> figure out why the only running job reached a terminal state, for
> example because the restart attempts were exhausted. In that case you
> would find the following log in your JobManager log.
>
> "org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy"
>
>
> Best,
> Yang
>
>
>
>
> On Wed, Sep 9, 2020 at 11:26 AM, Cristian wrote:
>
>> > The job sub directory will be cleaned up when the job
>> > finished/canceled/failed.
>>
>> What does this mean?
>>
>> Also, to clarify: I'm a very sloppy developer. My jobs crash ALL the
>> time... and yet, the jobs would ALWAYS resume from the last checkpoint.
>>
>> The only cases where I expect Flink to clean up the checkpoint data from
>> ZK is when I explicitly stop or cancel the job (in those cases the job
>> manager takes a savepoint before cleaning up zk and finishing the cluster).
>>
>> Which is not the case here. Flink was on autopilot here and decided to
>> wipe my poor, good checkpoint metadata as the logs show.
>>
>> On Tue, Sep 8, 2020, at 7:59 PM, Yang Wang wrote:
>>
>> AFAIK, the HA data, including the ZooKeeper metadata and the real data on
>> DFS, will only be cleaned up
>> when the Flink cluster reaches a terminal state.
>>
>> So if you are using a session cluster, the root cluster node on ZK will
>> be cleaned up after you manually
>> stop the session cluster. The job sub directory will be cleaned up when
>> the job finished/canceled/failed.
>>
>> If you are using a job/application cluster, once the only running job
>> finished/failed, all the HA data will
>> be cleaned up. I think you need to check the job restart strategy you
>> have set. For example, the following
>> configuration will make the Flink cluster terminate after 10 attempts.
>>
>> restart-strategy: fixed-delay
>> restart-strategy.fixed-delay.attempts: 10
>>
>>
>> Best,
>> Yang
>>
>> On Wed, Sep 9, 2020 at 12:28 AM, Cristian wrote:
>>
>>
>> I'm using the standalone script to start the cluster.
>>
>> As far as I can tell, it's not easy to reproduce. We found that zookeeper
>> lost a node around the time this happened, but all of our other 75 Flink
>> jobs which use the same setup, version and zookeeper, didn't have any
>> issues. They didn't even restart.
>>
>> So unfortunately I don't know how to reproduce this. All I know is I
>> can't sleep. I have nightmares where my precious state is deleted. I wake up
>> crying and quickly start manually savepointing all jobs just in case,
>> because I feel the day of reckoning is near. Flinkpocalypse!
>>
>> On Tue, Sep 8, 2020, at 5:54 AM, Robert Metzger wrote:
>>
>> Thanks a lot for reporting this problem here Cristian!
>>
>> I am not super familiar with the involved components, but the behavior
>> you are describing doesn't sound right to me.
>> Which entrypoint are you using? This is logged at the beginning, like
>> this: "2020-09-08 14:45:32,807 INFO
>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>>  Starting StandaloneSessionClusterEntrypoint (Version: 1.11.1, Scala: 2.12,
>> Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)"
>>
>> Do you know by chance if this problem is reproducible? With
>> the StandaloneSessionClusterEntrypoint I was not able to reproduce the
>> problem.
>>
>>
>>
>>
>> On Tue, Sep 8, 2020 at 4:00 AM Hu

Re: Flink Batch Processing

2020-09-28 Thread Piotr Nowojski
Hi Sunitha,

First and foremost, the DataSet API will be deprecated soon [1], so I would
suggest trying to migrate to the DataStream API. Using the DataStream API
does not mean that you cannot work with bounded inputs; you can.
Flink SQL (Blink planner) in fact uses the DataStream API to execute both
streaming and batch queries. Maybe this path would be easier?

And about answering your question using the DataSet API - sorry, I don't
know it :( I will try to ping someone who could help here.

Piotrek

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741

On Mon, Sep 28, 2020 at 15:14, s_penakalap...@yahoo.com wrote:

> Hi All,
>
> Need your help in Flink Batch processing: scenario described below:
>
> we have multiple vehicles, we get data from each vehicle at a very high
> speed, 1 record per minute.
> thresholds can be set by the owner for each vehicle.
>
> Say: we have 3 vehicles, threshold is set for 2 vehicles.
> Vehicle 1, threshold 20 hours, allowedPetrolConsumption=15
> vehicle 2, threshold 35 hours, allowedPetrolConsumption=28
> vehicle 3  no threshold set by owner.
>
> All the vehicle data is stored in HBase tables. We have a scheduled Batch
> Job every day at 12 pm to check the status of vehicle movement and Petrol
> consumption against threshold and raise an alert (vehicle1 did not move for
> past 20 hours, vehicle 2 consumed more petrol. )
>
> Since it is a Batch Job, I loaded all threshold data in one DataSet and
> HBase Data in another Dataset using HbaseInputFormat.
>
> What I am failing to figure out is:
> 1> vehicle 1 is having threshold of 20 hours where as vehicle 2 has
> threshold of 35 hours, I need to fetch data from Hbase for different
> scenario. Is there any better approach to get all data using one Hbase
> connection.
> 2> how to apply alert on Dataset.  CEP pattern/ Match_recognize is allowed
> only on DataStream. Please help me with a simple example. (alert can be
> raised if count is zero or like petrol consumption is too high)
>
>
> I could not get any example for Dataset on google where an alert is
> raised. Kindly guide me if there is any better approach
>
> Regards,
> Sunitha.
>


Re: Flink SQL - can I force the planner to reuse a temporary view to be reused across multiple queries that access different fields?

2020-09-28 Thread Timo Walther

Hi Dan,

Unfortunately, it is very difficult to read your plan. Maybe you can
share a higher-resolution version and highlight which part of the pipeline
is A, B etc. In general, the planner should be smart enough to reuse subplans
where appropriate. Maybe this is a bug or shortcoming in the optimizer
rules that we can fix.


Piotr's suggestion would work to "materialize" a part of the plan to 
DataStream API such that this part is a black box for the optimizer and 
read only once. Currently, there is no API for performing this in the 
Table API itself.


Regards,
Timo

On 28.09.20 15:13, Piotr Nowojski wrote:

Hi Dan,

Are we talking about Streaming SQL (from the presence of IntervalJoin 
node I presume so)? Are you using blink planner?


I'm not super familiar with the Flink SQL, but my best guess would be 
that if you would "export" the view "A" as a DataStream, then 
re-register it as a new table "A2" and use "A2" in your query, it could 
do the trick. [1]
But I might be wrong or there might be a better way to do it (maybe 
someone else can help here?).


Piotrek

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#integration-with-datastream-and-dataset-api


On Sat, Sep 26, 2020 at 00:02, Dan Hill wrote:


I have two temporary views, A and B, and I want to output a union like
the following:
SELECT * FROM ((SELECT ... FROM A) UNION ALL (SELECT ... FROM B JOIN
A ...))

Since the columns being requested in both parts of the union are
different, the planner appears to be separating these out.  A is
pretty complex so I want to reuse A.  Here's the graph for A.  A
bunch of extra join nodes are introduced.

Just A.
Screen Shot 2020-09-22 at 11.14.07 PM.png

How the planner currently handles the union.  It creates a bunch of
inefficient extra join nodes since the columns are slightly different.
Screen Shot 2020-09-23 at 12.24.59 PM.png





Re: Scala: Static methods in interface require -target:jvm-1.8

2020-09-28 Thread Piotr Nowojski
Hi,

It sounds more like an IntelliJ issue than a Flink issue. But have you
checked the configured target language level for your modules?
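
If the module language level is already 8, one possible cause (an
assumption, since the Scala version is not stated in this thread) is that
scalac itself targets an older JVM, which is the default for Scala 2.11.
In that case the compiler flag from the error message can be passed through
the scala-maven-plugin configuration. A sketch of the fragment, to be merged
into the existing plugin entry:

```xml
<plugin>
  <groupId>net.alchim31.maven</groupId>
  <artifactId>scala-maven-plugin</artifactId>
  <configuration>
    <args>
      <!-- Assumption: scalac currently targets a pre-1.8 JVM; this raises the target. -->
      <arg>-target:jvm-1.8</arg>
    </args>
  </configuration>
</plugin>
```

After changing the POM, the Maven project has to be re-imported so that the
IDE picks up the new compiler option.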

Best regards,
Piotrek

On Mon, Sep 28, 2020 at 10:57, Lu Weizheng wrote:

> Hi all,
>
> I recently upgraded IntelliJ IDEA from 2019 to 2020.2 Community Edition. I
> didn’t do anything to Maven.
> My code could compile correctly before. But now I get the following error:
>
> Static methods in interface require -target:jvm-1.8
>
> Probably because I use new WatermarkStrategy Scala API:
>
> .assignTimestampsAndWatermarks(
>   WatermarkStrategy
>     .forBoundedOutOfOrderness(Duration.ofSeconds(1))
>     .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long, Int)] {
>       override def extractTimestamp(t: (String, Long, Int), l: Long): Long = t._2
>     })
> )
>
> My project has both Java and Scala code. Here’s my pom.xml file:
>
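> <plugin>
>   <groupId>net.alchim31.maven</groupId>
>   <artifactId>scala-maven-plugin</artifactId>
>   <executions>
>     <execution>
>       <id>scala-compile-first</id>
>       <phase>process-resources</phase>
>       <goals>
>         <goal>compile</goal>
>       </goals>
>     </execution>
>   </executions>
> </plugin>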
>
> Is there something I need to add to my POM file?
>
>
>
>
> Best Regards,
> Weizheng Lu
>


Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

2020-09-28 Thread Till Rohrmann
Thanks for creating this FLIP Yang Wang. I believe that many of our users
will like a ZooKeeper-less HA setup.

+1 for not separating the leader information and the leader election if
possible. Maybe it is even possible that the contender writes his leader
information directly when trying to obtain the leadership by performing a
versioned write operation.

Concerning the lock and release operation I have a question: Can there be
multiple owners for a given key-value pair in a ConfigMap? If not, how can
we ensure that the node which writes his ownership is actually the leader
w/o transactional support from K8s? In ZooKeeper we had the same problem
(we should probably change it at some point to simply use a
transaction which checks whether the writer is still the leader) and
therefore introduced the ephemeral lock nodes. What they allow is that
there can be multiple owners of a given ZNode at a time. The last owner
will then be responsible for the cleanup of the node.

I see the benefit of your proposal over the stateful set proposal because
it can support multiple standby JMs. Given the problem of locking key-value
pairs it might be simpler to start with this approach where we only have
a single JM. This might already add a lot of benefits for our users. Was
there a specific reason why you discarded this proposal (other than
generality)?

@Uce it would be great to hear your feedback on the proposal since you
already implemented a K8s based HA service.

Cheers,
Till

On Thu, Sep 17, 2020 at 5:06 AM Yang Wang  wrote:

> Hi Xintong and Stephan,
>
> Thanks a lot for your attention on this FLIP. I will address the comments
> inline.
>
> # Architecture -> One or two ConfigMaps
>
> Both of you are right. One ConfigMap will make the design and
> implementation easier. Actually, in my POC code,
> I am using just one ConfigMap (e.g. "k8s-ha-app1-restserver" for the rest
> server component) for both leader election
> and storage. Once a JobManager wins the election, it updates the
> ConfigMap with the leader address and periodically
> renews the lock annotation to remain the active leader. I will update the
> FLIP document, including the architecture diagram,
> to avoid misunderstanding.
>
>
> # HA storage > Lock and release
>
> This is a valid concern. ZooKeeper ephemeral nodes are
> deleted by the ZK server automatically when
> the client times out, which can happen in a bad network environment or
> when the ZK client crashes unexpectedly. For Kubernetes,
> we need to implement a similar mechanism. First, when we want to lock a
> specific key in the ConfigMap, we put the owner identity,
> lease duration and renew time in the ConfigMap annotation. The annotation
> is cleaned up when the lock is released. When
> we want to remove a job graph or checkpoint, one of the
> following conditions must hold. If not, the delete operation cannot be done.
> * The current instance is the owner of the key.
> * The owner annotation is empty, which means the owner has released the
> lock.
> * The owner annotation has timed out, which usually indicates that the
> owner died.
>
>
> # HA storage > HA data clean up
>
> Sorry that I did not clearly describe how the HA related ConfigMaps are
> retained. Thanks to the Kubernetes OwnerReference[1],
> we set the owner of the flink-conf ConfigMap, service and TaskManager pods
> to the JobManager Deployment. So when we want to
> destroy a Flink cluster, we just need to delete the deployment[2]. For the
> HA related ConfigMaps, we do not set the owner,
> so they are retained even when we delete the whole Flink
> cluster.
>
>
> [1].
> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
> [2].
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>
>
> Best,
> Yang
>
>
> On Wed, Sep 16, 2020 at 8:16 PM, Stephan Ewen wrote:
>
>> This is a very cool feature proposal.
>>
>> One lesson-learned from the ZooKeeper-based HA is that it is overly
>> complicated to have the Leader RPC address in a different node than the
>> LeaderLock. There is extra code needed to make sure these converge and
>> they can be temporarily out of sync.
>>
>> A much easier design would be to have the RPC address as payload in the
>> lock entry (ZNode in ZK), the same way that the leader fencing token is
>> stored as payload of the lock.
>> I think for the design above it would mean having a single ConfigMap for
>> both leader lock and leader RPC address discovery.
>>
>> This probably serves as a good design principle in general - not divide
>> information that is updated together over different resources.
>>
>> Best,
>> Stephan
>>
>>
>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song 
>> wrote:
>>
>>> Thanks for preparing this FLIP, @Yang.
>>>
>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>> built-in ConfigMap for Flink's HA services should significantly reduce the
>>> maintenance overhead compared to deploying a ZK cluster. I th

Re: Dynamic Kafka Source

2020-09-28 Thread Piotr Nowojski
Hi Prasanna,

As Theo has suggested on Stack Overflow, can you use multiple independent
jobs instead?

Piotrek

On Sat, Sep 26, 2020 at 19:17, Prasanna kumar wrote:

> Hi,
>
> My requirement has been captured by the following stack overflow question.
>
>
> https://stackoverflow.com/questions/61876849/custom-kafka-source-on-apache-flink
>
> Could anyone take a shot at it ?
>
> Thanks,
> Prasanna.
>


Flink Batch Processing

2020-09-28 Thread s_penakalap...@yahoo.com
Hi All,

Need your help in Flink Batch processing: scenario described below:

we have multiple vehicles, we get data from each vehicle at a very high
speed, 1 record per minute.
thresholds can be set by the owner for each vehicle.

Say: we have 3 vehicles, threshold is set for 2 vehicles.
Vehicle 1, threshold 20 hours, allowedPetrolConsumption=15
vehicle 2, threshold 35 hours, allowedPetrolConsumption=28
vehicle 3  no threshold set by owner.

All the vehicle data is stored in HBase tables. We have a scheduled Batch
Job every day at 12 pm to check the status of vehicle movement and Petrol
consumption against threshold and raise an alert (vehicle1 did not move for
past 20 hours, vehicle 2 consumed more petrol. )

Since it is a Batch Job, I loaded all threshold data in one DataSet and
HBase Data in another Dataset using HbaseInputFormat.

What I am failing to figure out is:
1> vehicle 1 is having threshold of 20 hours where as vehicle 2 has
threshold of 35 hours, I need to fetch data from Hbase for different
scenario. Is there any better approach to get all data using one Hbase
connection.
2> how to apply alert on Dataset.  CEP pattern/ Match_recognize is allowed
only on DataStream. Please help me with a simple example. (alert can be
raised if count is zero or like petrol consumption is too high)

I could not get any example for Dataset on google where an alert is raised.
Kindly guide me if there is any better approach

Regards,
Sunitha.
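
For illustration only, the alert rule itself can be sketched in plain Java,
independent of any Flink API. The Threshold and Reading classes and their
fields are assumptions about the data, not an actual HBase schema. The
sketch scans all readings once and applies each vehicle's own window length,
so the different thresholds do not need separate fetches.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch of the alert rule; all class and field names are assumptions. */
class VehicleAlerts {

    /** Limits set by the owner for one vehicle. */
    static final class Threshold {
        final String vehicleId;
        final int idleHours;
        final double allowedPetrol;

        Threshold(String vehicleId, int idleHours, double allowedPetrol) {
            this.vehicleId = vehicleId;
            this.idleHours = idleHours;
            this.allowedPetrol = allowedPetrol;
        }
    }

    /** One reading per vehicle and minute (hypothetical schema). */
    static final class Reading {
        final String vehicleId;
        final long epochMinute;
        final boolean moved;
        final double petrolUsed;

        Reading(String vehicleId, long epochMinute, boolean moved, double petrolUsed) {
            this.vehicleId = vehicleId;
            this.epochMinute = epochMinute;
            this.moved = moved;
            this.petrolUsed = petrolUsed;
        }
    }

    /** Single pass over all readings; each vehicle uses its own window length. */
    static List<String> evaluate(List<Threshold> thresholds, List<Reading> readings, long nowMinute) {
        Map<String, Threshold> byVehicle = new HashMap<>();
        for (Threshold t : thresholds) {
            byVehicle.put(t.vehicleId, t);
        }
        Map<String, Integer> movedCount = new HashMap<>();
        Map<String, Double> petrol = new HashMap<>();
        for (Reading r : readings) {
            Threshold t = byVehicle.get(r.vehicleId);
            if (t == null) {
                continue; // no threshold set by the owner
            }
            long windowStart = nowMinute - t.idleHours * 60L;
            if (r.epochMinute < windowStart || r.epochMinute > nowMinute) {
                continue; // outside this vehicle's window
            }
            if (r.moved) {
                movedCount.merge(r.vehicleId, 1, Integer::sum);
            }
            petrol.merge(r.vehicleId, r.petrolUsed, Double::sum);
        }
        List<String> alerts = new ArrayList<>();
        for (Threshold t : thresholds) {
            if (movedCount.getOrDefault(t.vehicleId, 0) == 0) {
                alerts.add(t.vehicleId + ": no movement in the past " + t.idleHours + " hours");
            }
            if (petrol.getOrDefault(t.vehicleId, 0.0) > t.allowedPetrol) {
                alerts.add(t.vehicleId + ": petrol consumption above " + t.allowedPetrol);
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        long now = 100_000L;
        List<Threshold> thresholds = Arrays.asList(
                new Threshold("vehicle1", 20, 15.0), new Threshold("vehicle2", 35, 28.0));
        List<Reading> readings = Arrays.asList(
                new Reading("vehicle1", now - 600, false, 1.0),
                new Reading("vehicle2", now - 60, true, 30.0),
                new Reading("vehicle3", now - 60, true, 99.0));
        for (String alert : evaluate(thresholds, readings, now)) {
            System.out.println(alert);
        }
    }
}
```

In a batch job the same rule could run after grouping both inputs by
vehicle id; where exactly it is placed depends on the API that is used.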

Re: Flink Collector issue when Collection Object

2020-09-28 Thread Arvid Heise
Hi Anuj,

I answered on SO. Let's keep the discussion there.

On Mon, Sep 28, 2020 at 9:14 AM aj  wrote:

>
> Hello All,
>
> Can somebody help me resolve this and understand what I am doing
> wrong?
>
>
> https://stackoverflow.com/questions/64063833/flink-collector-issue-when-collection-object-with-map-of-object-class
>
>
> --
> Thanks & Regards,
> Anuj Jain
>
>
>
> 
>


-- 

Arvid Heise | Senior Java Developer





Scala: Static methods in interface require -target:jvm-1.8

2020-09-28 Thread Lu Weizheng
Hi all,

I recently upgraded IntelliJ IDEA from 2019 to 2020.2 Community Edition. I
didn’t do anything to Maven.
My code could compile correctly before. But now I get the following error:

Static methods in interface require -target:jvm-1.8

Probably because I use new WatermarkStrategy Scala API:

.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forBoundedOutOfOrderness(Duration.ofSeconds(1))
    .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long, Int)] {
      override def extractTimestamp(t: (String, Long, Int), l: Long): Long = t._2
    })
)

My project has both Java and Scala code. Here’s my pom.xml file:


 
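<plugin>
  <groupId>net.alchim31.maven</groupId>
  <artifactId>scala-maven-plugin</artifactId>
  <executions>
    <execution>
      <id>scala-compile-first</id>
      <phase>process-resources</phase>
      <goals>
        <goal>compile</goal>
      </goals>
    </execution>
  </executions>
</plugin>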

Is there something I need to add to my POM file?




Best Regards,
Weizheng Lu


Flink Collector issue when Collection Object

2020-09-28 Thread aj
Hello All,

Can somebody help me resolve this and understand what I am doing
wrong?

https://stackoverflow.com/questions/64063833/flink-collector-issue-when-collection-object-with-map-of-object-class


-- 
Thanks & Regards,
Anuj Jain