Re: [DISCUSS] Feature freeze date for 1.13

2021-04-06 Thread Kurt Young
Hi Yuval,

I think you are good to go, since there is no objection from the PMC.

Best,
Kurt


On Wed, Apr 7, 2021 at 12:48 AM Yuval Itzchakov  wrote:

> Hi Guowei,
>
> Who should I speak to regarding this? I am at the final stages of the PR I
> believe (Shengkai is kindly helping me make things work) and I would like
> to push this into 1.13.
>
> On Fri, Apr 2, 2021 at 5:43 AM Guowei Ma  wrote:
>
>> Hi, Yuval
>>
>> Thanks for your contribution. I am not a SQL expert, but it seems to be
>> beneficial to users, the amount of code is small, and only the tests are
>> left. Therefore, I am open to this entry going into rc1.
>> But according to the rules, you still have to see whether there are
>> objections from other PMC members within 48 hours.
>>
>> Best,
>> Guowei
>>
>>
>> On Thu, Apr 1, 2021 at 10:33 PM Yuval Itzchakov 
>> wrote:
>>
>>> Hi All,
>>>
>>> I would really love to merge https://github.com/apache/flink/pull/15307
>>> prior to 1.13 release cutoff, it just needs some more tests which I can
>>> hopefully get to today / tomorrow morning.
>>>
>>> This is a critical fix: currently, predicate pushdown won't work for any
>>> stream which generates a watermark and wants to push down predicates.
>>>
>>> On Thu, Apr 1, 2021, 10:56 Kurt Young  wrote:
>>>
 Thanks Dawid, I have merged FLINK-20320.

 Best,
 Kurt


 On Thu, Apr 1, 2021 at 2:49 PM Dawid Wysakowicz 
 wrote:

> Hi all,
>
> @Kurt @Arvid I think it's fine to merge those two, as they are pretty
> much finished. We can wait for those two before creating the RC0.
>
> @Leonard Personally I'd be ok with 3 more days for that single PR. I
> find the request reasonable, and I second that it's better to have a proper
> review than to rush an unfinished feature and try to fix it later.
> Moreover, it got broader support. Unless somebody else objects, I think we
> can merge this PR later and include it in RC1.
>
> Best,
>
> Dawid
> On 01/04/2021 08:39, Arvid Heise wrote:
>
> Hi Dawid and Guowei,
>
> I'd like to merge [FLINK-13550][rest][ui] Vertex Flame Graph [1]. We
> are pretty much just waiting for AZP to turn green, it's separate from
> other components, and it's a super useful feature for Flink users.
>
> Best,
>
> Arvid
>
> [1] https://github.com/apache/flink/pull/15054
>
> On Thu, Apr 1, 2021 at 6:21 AM Kurt Young  wrote:
>
>> Hi Guowei and Dawid,
>>
>> I want to request permission to merge this feature [1]; it's a
>> useful improvement to the sql client and won't affect
>> other components too much. We were planning to merge it yesterday but hit
>> a tricky multi-process issue which
>> had a very high likelihood of hanging the tests. It took us a while to
>> find the root cause and fix it.
>>
>> Since it's not too far past the feature freeze and RC0 is also not
>> created yet, I would like to include this
>> in 1.13.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-20320
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Mar 31, 2021 at 5:55 PM Guowei Ma 
>> wrote:
>>
>>> Hi, community:
>>>
>>> Friendly reminder that today (3.31) is the last day of feature
>>> development. Under normal circumstances, you will not be able to merge
>>> new features from tomorrow (4.1). Tomorrow we will create 1.13.0-rc0 for
>>> testing; everyone is welcome to help test.
>>> Once testing shows it is relatively stable, we will cut the release-1.13
>>> branch.
>>>
>>> Best,
>>> Dawid & Guowei
>>>
>>>
>>> On Mon, Mar 29, 2021 at 5:17 PM Till Rohrmann 
>>> wrote:
>>>
 +1 for the 31st of March for the feature freeze.

 Cheers,
 Till

 On Mon, Mar 29, 2021 at 10:12 AM Robert Metzger <
 rmetz...@apache.org> wrote:

 > +1 for March 31st for the feature freeze.
 >
 >
 >
 > On Fri, Mar 26, 2021 at 3:39 PM Dawid Wysakowicz <
 dwysakow...@apache.org>
 > wrote:
 >
 > > Thank you Thomas! I'll definitely check the issue you linked.
 > >
 > > Best,
 > >
 > > Dawid
 > >
 > > On 23/03/2021 20:35, Thomas Weise wrote:
 > > > Hi Dawid,
 > > >
 > > > Thanks for the heads up.
 > > >
 > > > Regarding the "Rebase and merge" button. I find that merge
 option
 > useful,
 > > > especially for small simple changes and for backports. The
 following
 > > should
 > > > help to safeguard from the issue encountered previously:
 > > > https://github.com/jazzband/pip-tools/issues/1085
 > > >
 > > > Thanks,
 > > > Thomas
 > > >
 > > >
 > > > On Tue, Mar 23, 2021 at 4:58 AM Dawid Wysakowicz <
 > dwysakow...@ap

Re: Application cluster - Job execution and cluster creation timeouts

2021-04-06 Thread Yang Wang
Hi Tamir,

Maybe I did not make myself clear. Here the "deployer" means our internal
Flink application deployer (actually it is Ververica Platform),
not the *ApplicationDeployer* interface in Flink. It helps with managing
the lifecycle of every Flink application. And it has the same native
K8s integration mechanism as the one you have mentioned.

In my opinion, cleaning up a Flink application stuck in an infinite
failover loop (e.g. a wrong image) is the responsibility of your own
deployer, not the Flink client. In such a case, the JobManager usually
could not run normally.

However, if the JobManager could be started successfully, then it will
clean up all the K8s resources once all the jobs reach a
terminal status (e.g. FAILED, CANCELED, FINISHED). Even if the JobManager
crashed, it could recover the jobs from the latest checkpoint
successfully if HA[1] is enabled.

[1].
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/overview/
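As a rough sketch of the periodic check our deployer does (illustrative only: the probe hook below is hypothetical, and in practice it would be an HTTP GET against the JobManager rest endpoint, with a budget such as 120s):

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Sketch of a deployer-side readiness gate: poll the Flink rest endpoint
 *  until it answers, or give up after a budget so the caller can delete
 *  the JobManager deployment and create a new one. */
public class ReadinessGate {

    /** Returns true if the probe succeeded within the budget. */
    public static boolean awaitReady(BooleanSupplier probe, Duration budget,
                                     Duration interval) {
        long deadline = System.nanoTime() + budget.toNanos();
        while (System.nanoTime() < deadline) {
            if (probe.getAsBoolean()) {
                return true;       // rest endpoint answered: JobManager is up
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;      // treat interruption as "gave up"
            }
        }
        return false;              // caller recreates the JobManager deployment
    }

    public static void main(String[] args) {
        // Fake probe that succeeds on the third attempt.
        int[] calls = {0};
        boolean ready = awaitReady(() -> ++calls[0] >= 3,
                Duration.ofSeconds(2), Duration.ofMillis(10));
        System.out.println("ready=" + ready + " after " + calls[0] + " probes");
    }
}
```

The probe itself is pluggable, so the same loop works for a curl-style HTTP check or a Kubernetes API query.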


Best,
Yang


Tamir Sagi  于2021年4月6日周二 下午6:43写道:

> Hey Yang
>
> Thank you for your response.
>
> We run the application cluster programmatically.
>
> I discussed it here, with an example of how to run it from Java rather
> than the CLI.
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Application-cluster-Best-Practice-td42011.html
>
> following your comment
>
> accessibility of Flink rest endpoint. When it is not ready in the
> timeout(e.g. 120s), the deployer will delete the Flink JobManager
> deployment and try to create a new one.
>
> Actually, I have not seen it in action. I gave a non-existing image: the
> deployer started the k8s deployment, but the pods failed to start
> (expected), and the k8s deployment kept running indefinitely.
>
> *What configuration is that? Is it possible to override it?*
>
> I delved into the flink-core and flink-kubernetes jars. Since Flink
> depends on Kubernetes, we both need to leverage the Kubernetes
> client (which Flink uses internally) to manage and inspect the resources.
>
> I am curious why you have "infinite job execution" in your Flink
> application cluster. If all the jobs in the application finished, Flink will
> deregister the application and all the K8s resources should be cleaned up.
>
> My thought was about what happens if there is a bug and the job runs
> indefinitely, or the job manager crashes over and over again.
> What happens if resources don't get cleaned up properly? We don't want to
> keep the cluster up and running in that case and would like to get
> feedback. Since Flink does not support that, we have to inspect it
> externally (which makes it more complex).
> We could also poll the job status using the Flink client, but that becomes
> useless if the job runs indefinitely.
>
> What do you think?
>
> Best,
> Tamir.
>
>
> --
> *From:* Yang Wang 
> *Sent:* Tuesday, April 6, 2021 10:36 AM
> *To:* Tamir Sagi 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Application cluster - Job execution and cluster creation
> timeouts
>
>
>
>
> Hi Tamir,
>
> Thanks for trying the native K8s integration.
>
> 1. We do not have a timeout for creating the Flink application cluster.
> The reason is that the job submission happens on the JobManager side.
> So the Flink client does not need to wait for the JobManager running and
> then exit.
>
> I think even though the Flink client internally has the timeout, we still
> have the same problem when the Flink client crashes and then the timeout is
> gone.
>
> I want to share another solution for the timeout. In our deployer,
> when a new Flink application is created, the deployer will periodically
> check the
> accessibility of the Flink rest endpoint. When it is not ready within the
> timeout (e.g. 120s), the deployer will delete the Flink JobManager
> deployment and try to
> create a new one.
>
> 2. Actually, the current "flink run-application" does not support a real
> attached mode (waiting for all the jobs in the application to finish).
> I am curious why you have "infinite job execution" in your Flink
> application cluster. If all the jobs in the application finished, Flink will
> deregister the application and all the K8s resources should be cleaned up.
>
>
> Best,
> Yang
>
>
> Tamir Sagi  于2021年4月5日周一 下午11:24写道:
>
> Hey all,
>
> We deploy application cluster natively on Kubernetes.
>
> Are there any timeouts for job execution and cluster creation?
>
> I went over the configuration page
> but did not find anything relevant.
>
> In order to get an indication about the cluster, we leverage the k8s
> client to watch the deployment
> in a namespace with a specific cluster name and respond accordingly.
>
> we define two timeouts
>
>1. Crea

Flink: Exception from container-launch exitCode=2

2021-04-06 Thread Yik San Chan
*The question is cross-posted on Stack Overflow:
https://stackoverflow.com/questions/66968180/flink-exception-from-container-launch-exitcode-2
Viewing the question on Stack Overflow is preferred, as I include a few
images for better description.*

Hi community,

## Flink (Scala) exitCode=2

I have a simple Flink job that reads from 2 columns of a Hive table
`mysource`, adds up the columns, then writes the result to another Hive
table `mysink`, where `mysource` has 2 columns `a bigint` and `b bigint`,
and `mysink` has only 1 column `c bigint`.
The job submits successfully, however, I observe it keeps retrying.

[screenshot of the retrying attempts; see the Stack Overflow post]

I click into each attempt, they simply show this.

```
AM Container for appattempt_1607399514900_2511_001267 exited with exitCode:
2
For more detailed output, check application tracking page:
http://cn-hz-h-test-data-flink00:8088/cluster/app/application_1607399514900_2511
Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_e13_1607399514900_2511_1267_01
Exit code: 2
Stack trace: ExitCodeException exitCode=2:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
at org.apache.hadoop.util.Shell.run(Shell.java:479)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Container exited with a non-zero exit code 2
Failing this attempt
```

However, the "Logs" section has no useful info. It complains about the
logging libs, but I believe these are really warnings, not errors.
```
LogType:jobmanager.err
Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
LogLength:1010
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/usercache/zhongtai/appcache/application_1607399514900_2509/filecache/10/featurepipelines-0.1.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/filecache/302/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/data/apache/hadoop/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
log4j:WARN No appenders could be found for logger
(org.apache.flink.runtime.entrypoint.ClusterEntrypoint).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.
End of LogType:jobmanager.err

LogType:jobmanager.out
Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
LogLength:0
Log Contents:
End of LogType:jobmanager.out
```
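If I wanted to silence the duplicate-binding warnings, I believe the usual remedy is excluding the extra binding from whichever dependency drags it in (the coordinates below are placeholders, since I haven't traced the dependency tree):

```xml
<!-- Hypothetical: exclude a transitive slf4j-log4j12 binding so that only
     the cluster-provided log4j-slf4j-impl binding stays on the classpath. -->
<dependency>
    <groupId>some.group</groupId>
    <artifactId>some-artifact</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```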

This is the job written in Scala.

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

object HiveToyExample {
  def main(args: Array[String]): Unit = {
val settings = EnvironmentSettings.newInstance.build
val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(execEnv, settings)

val hiveCatalog = new HiveCatalog(
  "myhive",
  "aiinfra",
  "/data/apache/hive/apache-hive-2.1.0-bin/conf/"
)
tableEnv.registerCatalog("myhive", hiveCatalog)
tableEnv.useCatalog("myhive")

tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)

tableEnv
  .executeSql("""
  |INSERT INTO mysink
  |SELECT a + b
  |FROM mysource
  |""".stripMargin)
  }
}
```

Here's the pom.xml.

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>exmple</groupId>
    <artifactId>featurepipelines</artifactId>
    <version>0.1.1</version>
    <packaging>jar</packaging>

    <name>Feature Pipelines</name>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.12.0</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.12</scala.version>
        <log4j.version>2.12.1</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.
```

Re: Dynamic configuration via broadcast state

2021-04-06 Thread vishalovercome
I researched a bit more, and another suggested solution is to build a custom
source function that somehow waits for each operator to load its
configuration, which is in fact set in the open method of the source itself.
I'm not sure if that's a good idea, as it just exposes the entire job
configuration to an operator.

Can we leverage watermarks/idle sources somehow? Basically, set the
timestamps of the configuration stream to a very low number at the start and
then force it to be read before data from other sources starts flowing in.
As configurations aren't going to change frequently, we can idle these
sources.

1. Is the above approach even possible?
2. Can an idle source resume once the configuration changes?

A rough sketch of timestamp assignment, re-activating an idle source would
help!
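To illustrate the ordering I'm after in plain Java (not the Flink API; the class and names below are made up): give configuration records an artificially low timestamp, and any timestamp-ordered merge is then forced to drain them before the first data record.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy timestamp-ordered merge: config records carry Long.MIN_VALUE
 *  timestamps, so they are always emitted before any real data record. */
public class ConfigFirstMerge {

    public static final class Rec {
        public final long ts;
        public final String payload;
        public Rec(long ts, String payload) { this.ts = ts; this.payload = payload; }
    }

    /** Emits payloads in timestamp order, always advancing the stream
     *  whose head record has the smaller timestamp. */
    public static List<String> merge(Queue<Rec> config, Queue<Rec> data) {
        List<String> out = new ArrayList<>();
        while (!config.isEmpty() || !data.isEmpty()) {
            long cTs = config.isEmpty() ? Long.MAX_VALUE : config.peek().ts;
            long dTs = data.isEmpty() ? Long.MAX_VALUE : data.peek().ts;
            out.add((cTs <= dTs ? config : data).poll().payload);
        }
        return out;
    }

    public static void main(String[] args) {
        Queue<Rec> config = new ArrayDeque<>();
        config.add(new Rec(Long.MIN_VALUE, "conf-v1")); // artificially low ts
        Queue<Rec> data = new ArrayDeque<>();
        data.add(new Rec(100, "a"));
        data.add(new Rec(200, "b"));
        System.out.println(merge(config, data)); // [conf-v1, a, b]
    }
}
```

In an actual job, the "idle" part would come from the watermarking of the config source, not from a merge loop like this; the sketch only shows why a low timestamp forces config to be consumed first.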



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


questions regarding stateful functions

2021-04-06 Thread Marco Villalobos
Upon reading about stateful functions, it seems as though, first, a data
stream has to flow to an event ingress. Then, the stateful functions
perform computations via whatever functionality they provide. Finally, the
results of those computations flow to the event egress, which is
yet another datastream within the data stream job.

Is my understanding correct?

I was hoping that a stateful function could be called by a process
function, is that possible? (I am guessing no).

Please let me know.

Thank you.  Sincerely,

Marco A. Villalobos


Dynamic configuration via broadcast state

2021-04-06 Thread vishalovercome
I have to make my Flink job dynamically configurable, and I'm thinking about
using broadcast state. My current static job configuration file consists of
the configuration of the entire set of operators, which I load into a case
class; I then explicitly pass the relevant configuration of each operator as
its constructor parameters. Will I have to create individual broadcast
streams for each operator? I.e.:

val o1conf: BroadcastStream[O1Conf] = ...
someStream.connect(o1conf).map(...)

someOtherStream.connect(o2conf).flatMap(...) and so on?

1. Is there a way to load the configuration as a whole but only pick
the right subset in the connect method, like so:

someStream.connect(jobConfig.o1Conf).map(...)

My job has several operators, and it seems rather clumsy to have to
instantiate one broadcast stream for each dynamically configurable operator.

2. Is there a way to guarantee that processElement isn't called before the
first processBroadcastElement? How else can we ensure that
each operator always starts with a valid configuration? Passing the same
configuration as constructor parameters is one way to deal with it, but it's
clumsy because that's just repetition of code. Loading the configuration in
the open method is even worse, because each operator would then have access
to the entire job configuration.

3. What can we do to make source and sink connectors dynamically
configurable?
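One workaround I'm considering for (2) is buffering data elements until the first configuration arrives. A framework-free sketch of that gate (in a real job this logic would sit inside a (Keyed)BroadcastProcessFunction with the backlog kept in state; all names below are mine):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.BiFunction;

/** Buffers data elements until the first config value arrives, then
 *  processes the backlog and everything after it with the current config. */
public class ConfigGate<C, T, R> {
    private C config;                       // latest broadcast configuration
    private final Queue<T> backlog = new ArrayDeque<>();
    private final BiFunction<C, T, R> process;

    public ConfigGate(BiFunction<C, T, R> process) { this.process = process; }

    /** Called for broadcast/config records (cf. processBroadcastElement). */
    public List<R> onConfig(C newConfig) {
        config = newConfig;
        List<R> out = new ArrayList<>();
        while (!backlog.isEmpty()) {        // drain elements seen before config
            out.add(process.apply(config, backlog.poll()));
        }
        return out;
    }

    /** Called for data records (cf. processElement). */
    public List<R> onElement(T element) {
        if (config == null) {               // no config yet: park the element
            backlog.add(element);
            return List.of();
        }
        return List.of(process.apply(config, element));
    }
}
```

For example, with `(c, x) -> c * x` as the processing function, an element arriving before any config is parked and only emitted once the first config value shows up.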



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink - Pod Identity

2021-04-06 Thread Austin Cawley-Edwards
Great, glad to hear it Swagat!

Did you end up using Flink 1.6, or were you able to upgrade to Flink 1.12?
Could you also link the ticket back here once you've made it, and make
sure it is not a duplicate of FLINK-18676?

Best,
Austin

On Tue, Apr 6, 2021 at 12:29 PM Swagat Mishra  wrote:

> I was able to solve the issue by providing a custom version of the presto
> jar. I will create a ticket and raise a pull request so that others can
> benefit from it. I will share the details here shortly.
>
> Thanks everyone for your help and support. Especially Austin, who stands
> out due to his interest in the issue and his help finding ways to resolve
> it.
>
> Regards,
> Swagat
>
> On Tue, Apr 6, 2021 at 2:35 AM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> And actually, I've found that the correct version of the AWS SDK *is*
>> included in Flink 1.12, which was reported and fixed in FLINK-18676
>> (see[1]). Since you said you saw this also occur in 1.12, can you share
>> more details about what you saw there?
>>
>> Best,
>> Austin
>>
>> [1]: https://issues.apache.org/jira/browse/FLINK-18676
>>
>> On Mon, Apr 5, 2021 at 4:53 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> That looks interesting! I've also found the full list of S3
>>> properties[1] for the version of presto-hive bundled with Flink 1.12 (see
>>> [2]), which includes an option for a KMS key (hive.s3.kms-key-id).
>>>
>>> (also, adding back the user list)
>>>
>>> [1]:
>>> https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration
>>> [2]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/filesystems/s3.html#hadooppresto-s3-file-systems-plugins
>>>
>>> On Mon, Apr 5, 2021 at 4:21 PM Swagat Mishra  wrote:
>>>
 Btw, there is also an option to provide a custom credential provider,
 what are your thoughts on this?

 presto.s3.credentials-provider


 On Tue, Apr 6, 2021 at 12:43 AM Austin Cawley-Edwards <
 austin.caw...@gmail.com> wrote:

> I've confirmed that for the bundled + shaded aws dependency, the only
> way to upgrade it is to build a flink-s3-fs-presto jar with the updated
> dependency. Let me know if this is feasible for you, if the KMS key
> solution doesn't work.
>
> Best,
> Austin
>
> On Mon, Apr 5, 2021 at 2:18 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hi Swagat,
>>
>> I don't believe there is an explicit configuration option for the KMS
>> key – please let me know if you're able to make that work!
>>
>> Best,
>> Austin
>>
>> On Mon, Apr 5, 2021 at 1:45 PM Swagat Mishra 
>> wrote:
>>
>>> Hi Austin,
>>>
>>> Let me know what you think on my latest email, if the approach might
>>> work, or if it is already supported and I am not using the 
>>> configurations
>>> properly.
>>>
>>> Thanks for your interest and support.
>>>
>>> Regards,
>>> Swagat
>>>
>>> On Mon, Apr 5, 2021 at 10:39 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
 Hi Swagat,

 It looks like Flink 1.6 bundles the 1.11.165 version of the
 aws-java-sdk-core with the Presto implementation (transitively from
 Presto 0.185 [1]).
 The minimum supported version for the ServiceAccount authentication
 approach is 1.11.704 (see [2]), which was released on Jan 9th, 2020 [3],
 long after Flink 1.6 was released. It looks like even the most recent
 Presto is on a version below that, concretely 1.11.697 in the master
 branch [4], so I don't think even upgrading Flink past 1.6 will solve
 this, though it looks to me like the AWS dependency is managed better in
 more recent Flink versions. I'll have more for you on that front
 tomorrow, after the Easter break.

 I think what you would have to do to make this authentication
 approach work for Flink 1.6 is building a custom version of the
 flink-s3-fs-presto jar, replacing the bundled AWS dependency with the
 1.11.704 version, and then shading it the same way.

 In the meantime, would you mind creating a JIRA ticket with this
 use case? That'll give you the best insight into the status of fixing
 this :)

 Let me know if that makes sense,
 Austin

 [1]:
 https://github.com/prestodb/presto/blob/1d4ee196df4327568c0982811d8459a44f1792b9/pom.xml#L53
 [2]:
 https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts-minimum-sdk.html
 [3]: https://github.com/aws/aws-sdk-java/releases/tag/1.11.704
 [4]: https://github.com/prestodb/presto/blob/master/pom.xml#L52

 On Sun

Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

2021-04-06 Thread Fuyao Li
Hi Yang,

Thanks for the reply, that information is very helpful.

Best,
Fuyao

From: Yang Wang 
Date: Tuesday, April 6, 2021 at 01:11
To: Fuyao Li 
Cc: user 
Subject: Re: [External] : Re: Need help with executing Flink CLI for native 
Kubernetes deployment
Hi Fuyao,

Sorry for the late reply.

It is not very hard to develop your own deployer. Actually, it took me 3
days to develop the PoC version of flink-native-k8s-operator. So
if you want to have a fully functional K8s operator, maybe two weeks is
enough. But if you want to put it into production, you may need
some more time to polish it for easier use.

The Flink native K8s integration is not going to replace the standalone
mode. First, not all Flink standalone clusters are running on K8s.
And standalone mode could work really well with reactive mode[1].


The Flink native K8s integration is also not going to replace the K8s
operator. Actually, a Flink K8s operator is not on the same level as the
native integration: the operator is responsible for managing the lifecycle
of a Flink application, and for making submission more K8s-style.
The Google and Lyft Flink K8s operators could support native mode; they
just do not have that support right now.


Kubernetes HA works for both standalone mode and native mode. You can
find the configuration here[2]. However, you might
need some changes in the Flink K8s operator to make it work, because we
need to add more args (e.g. --host) to the JobManager start command.


[1]. 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode
[2]. 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#high-availability-with-standalone-kubernetes
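For reference, the Kubernetes HA setup in [2] boils down to a few config keys (the cluster id and storage path below are just placeholders):

```yaml
# flink-conf.yaml (illustrative values)
kubernetes.cluster-id: my-flink-cluster          # placeholder name
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://my-bucket/flink-ha   # placeholder path
```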

Best,
Yang


Fuyao Li mailto:fuyao...@oracle.com>> 于2021年4月5日周一 
下午1:33写道:
Hello Yang,

I am just following up on the previous email to see if you have had time to
reply. I also took a deeper look into the Lyft K8s operator recently. It
seems it doesn’t support HA natively; it still needs the help of ZooKeeper.
In this regard, native K8s is better. Any other ideas? Thanks for your help.

Best,
Fuyao

From: Fuyao Li mailto:fuyao...@oracle.com>>
Date: Thursday, April 1, 2021 at 12:22
To: Yang Wang mailto:danrtsey...@gmail.com>>
Cc: user mailto:user@flink.apache.org>>
Subject: Re: [External] : Re: Need help with executing Flink CLI for native 
Kubernetes deployment
Hi Yang,

Thanks for sharing the insights.

For problem 1:
I think I can’t do telnet in the container. I tried to use curl
144.25.13.78:8081 and I could see the HTML of the Flink dashboard UI. This
proves such a public IP is reachable inside the cluster. Just as you
mentioned, there might still be some network issues with the cluster. I
will do some further checking.

For problem 2:
I created a new K8s cluster with a bastion server with a public IP assigned
to it. Finally, I can see something valid from my browser. (There still
exist some problems with connecting to some databases, but I think these
network problems are not directly related to Flink; I can investigate them
later.)

For problem 3:
Thanks for sharing the repo you created. I am not sure how much work it
could take to develop a deployer. I understand it depends on proficiency,
but could you give a rough estimate? If it is too complicated and the other
options are not significantly inferior to native Kubernetes, I might prefer
to choose one of them. I am currently comparing different options for
deploying on Kubernetes.

  1.  Standalone K8S
  2.  Native Kubernetes
  3.  Flink operator (Google Cloud Platform/ Lyft) [1][2]

I also watched the demo video you presented. [3] I noticed you mentioned
that native K8s is not going to replace the other two options. I still
don't fully get your idea, given the limited explanation in the demo. Could
you compare the trade-offs a little? Thanks!
[1] 
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
[2]  
https://github.com/lyft/flinkk8soperator
[3] 
https://youtu.be/pdFPr_VOWTU

Re: Flink Taskmanager failure recovery and large state

2021-04-06 Thread Robert Metzger
Hey Yaroslav,

GCS is a somewhat popular filesystem that should work fine with Flink.

It seems that the initial scale of a bucket is 5000 read requests per
second (https://cloud.google.com/storage/docs/request-rate), your job
should be at roughly the same rate (depending on how fast your job restarts
in the restart loop).

You could try to tweak the GCS configuration parameters, such as increasing
"fs.gs.http.read-timeout" and "fs.gs.http.max.retry". (see
https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md
for all available options)
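For illustration, that tuning could look like this in the connector's Hadoop configuration (values are made up; see the CONFIGURATION.md above for defaults and exact semantics):

```xml
<!-- core-site.xml: illustrative values only -->
<property>
  <name>fs.gs.http.read-timeout</name>
  <value>60000</value><!-- socket read timeout in ms -->
</property>
<property>
  <name>fs.gs.http.max.retry</name>
  <value>20</value><!-- max retries for low-level HTTP requests -->
</property>
```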


The "ExecutionGraphException: The execution attempt
6f3651a49344754a1e7d1fb20cf2cba3 was not found." error looks weird, but it
should not cause the restarts.


On Fri, Apr 2, 2021 at 2:15 AM Guowei Ma  wrote:

> Hi, Yaroslav
>
> AFAIK Flink does not retry if downloading the checkpoint from storage
> fails. On the other hand, the FileSystem already has this retry mechanism,
> so I think there is no need for Flink to retry.
> I am not very sure, but from the log it seems that the GCS retry is
> interrupted for some reason. So I think we could get more insight if we
> could find the first failure cause.
>
> Best,
> Guowei
>
>
> On Fri, Apr 2, 2021 at 12:07 AM Yaroslav Tkachenko <
> yaroslav.tkache...@shopify.com> wrote:
>
>> Hi Guowei,
>>
>> I thought Flink could support any HDFS-compatible object store, like the
>> majority of Big Data frameworks. So we just added the
>> "flink-shaded-hadoop-2-uber" and "gcs-connector-latest-hadoop2"
>> dependencies to the classpath; after that, using the "gs" prefix seems to
>> be possible:
>>
>> state.checkpoints.dir: gs:///flink-checkpoints
>> state.savepoints.dir: gs:///flink-savepoints
>>
>> And yes, I noticed that retry logging too, but I'm not sure whether it's
>> implemented on the Flink side or the GCS connector side. I probably need
>> to dive deeper into the source code. And if it's implemented on the GCS
>> connector side, will Flink wait for all the retries? That's why I asked
>> about a potential timeout on the Flink side.
>>
>> The JM log doesn't have much besides what I already posted. It's
>> hard for me to share the whole log, but the RocksDB initialization part
>> may be relevant:
>>
>> 16:03:41.987 [cluster-io-thread-3] INFO
>>  org.apache.flink.runtime.jobmaster.JobMaster - Using job/cluster config to
>> configure application-defined state backend:
>> RocksDBStateBackend{checkpointStreamBackend=File State Backend
>> (checkpoints: 'gs:///flink-checkpoints', savepoints:
>> 'gs:///flink-savepoints', asynchronous: TRUE, fileStateThreshold:
>> 1048576), localRocksDbDirectories=[/rocksdb],
>> enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4,
>> writeBatchSize=2097152}
>> 16:03:41.988 [cluster-io-thread-3] INFO
>>  org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using
>> predefined options: FLASH_SSD_OPTIMIZED.
>> 16:03:41.988 [cluster-io-thread-3] INFO
>>  org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using
>> application-defined options factory:
>> DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=4,
>> state.backend.rocksdb.block.blocksize=16 kb,
>> state.backend.rocksdb.block.cache-size=64 mb}}.
>> 16:03:41.988 [cluster-io-thread-3] INFO
>>  org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined
>> state backend: RocksDBStateBackend{checkpointStreamBackend=File State
>> Backend (checkpoints: 'gs:///flink-checkpoints', savepoints:
>> 'gs:///flink-savepoints', asynchronous: TRUE, fileStateThreshold:
>> 1048576), localRocksDbDirectories=[/rocksdb],
>> enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4,
>> writeBatchSize=2097152}
>>
>> Thanks!
>>
>> On Thu, Apr 1, 2021 at 2:30 AM Guowei Ma  wrote:
>>
>>> Hi, Yaroslav
>>>
>>> AFAIK there is no official GCS FileSystem support in Flink. Is the
>>> GCS support implemented by yourself?
>>> Would you like to share the whole JM log?
>>>
>>> BTW: From the following log I think the implementation already has some
>>> retry mechanism:
>>> >>> Interrupted while sleeping before retry. Giving up after 1/10
>>> retries for
>>> 'gs:///flink-checkpoints/150a406a50d20e1ee77422d25ef28d
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Thu, Apr 1, 2021 at 12:50 PM Yaroslav Tkachenko <
>>> yaroslav.tkache...@shopify.com> wrote:
>>>
 Hi everyone,

 I'm wondering if people have experienced issues with Taskmanager
 failure recovery when dealing with a lot of state.

 I'm using 1.12.0 on Kubernetes, RocksDB backend with GCS for savepoints
 and checkpoints. ~150 task managers with 4 slots each.

 When I run a pipeline without much state and kill one of the
 taskmanagers, it takes a few minutes to recover (I see a few restarts), but
 eventually when a new replacement taskmanager is registered with the
 jobmanager things go back to healthy.

 But when I run a pipeline with a lot of state (1TB+) and kill one of
 th

Re: Source Operators Stuck in the requestBufferBuilderBlocking

2021-04-06 Thread Roman Khachatryan
Hi Sihan,

Unfortunately, we are unable to reproduce the issue so far. Could you
please describe in more detail the job graph, in particular what are
the downstream operators and whether there is any chaining?

Do I understand correctly that Flink returned to normal at
around 8:00, worked fine for ~3 hours, got stuck again, and then
was restarted?

I'm also wondering whether requestBufferBuilderBlocking is just a
frequent operation popping up in the thread dump, or whether you actually
see that the legacy source threads are *stuck* there.

Could you please explain how the other metrics are calculated?
(PURCHASE KAFKA NUM-SEC, PURCHASE OUTPOOL, PLI PURCHASE JOIN INPOOL).
Or do you have rate metrics per source?

Regards,
Roman



On Wed, Mar 31, 2021 at 1:44 AM Sihan You  wrote:
>
> Awesome. Let me know if you need any other information. Our application
> makes heavy use of event timers and keyed state, and the load is very
> heavy, if that matters.
> On Mar 29, 2021, 05:50 -0700, Piotr Nowojski , wrote:
>
> Hi Sihan,
>
> Thanks for the information. Previously I was not able to reproduce this 
> issue, but after adding a union I think I can see it happening.
>
> Best,
> Piotrek
>
> pt., 26 mar 2021 o 22:59 Sihan You  napisał(a):
>>
>> This issue is not always reproducible; it happened 2~3 times in our development 
>> period of 3 months.
>>
>> On Fri, Mar 26, 2021 at 2:57 PM Sihan You  wrote:
>>>
>>> Hi,
>>>
>>> Thanks for responding. I'm working in a commercial organization so I cannot 
>>> share the detailed stack trace with you. I will try to describe the issue as 
>>> specifically as I can.
>>> 
>>> Above are more detailed stats of our job.
>>> 1. How long did the job run until it got stuck?
>>> about 9 hours.
>>> 2. How often do you checkpoint or how many checkpoints succeeded?
>>> I don't remember the exact number of successful checkpoints, but there 
>>> should be around 2. Then the checkpoints started to fail because of the 
>>> timeout.
>>> 3. What were the typical checkpoint sizes? How much in-flight data was 
>>> checkpointed? (A screenshot of the checkpoint tab in the Flink UI would 
>>> suffice)
>>> the first checkpoint was 5 TB and the second was 578 GB.
>>> 4. Was the parallelism of the whole job 5? How is the topology roughly 
>>> looking? (e.g., Source -> Map -> Sink?)
>>> the source is a union of two source streams. one has a parallelism of 5 and 
>>> the other has 80.
>>> the job graph is like this.
>>> source 1.1 (parallelism 5)  ->
>>>                                union ->
>>> source 1.2 (parallelism 80) ->
>>>                                          connect -> sink
>>> source 2.1 (parallelism 5)  ->
>>>                                union ->
>>> source 2.2 (parallelism 80) ->
>>> 5. Did you see any warns/errors in the logs related to checkpointing and 
>>> I/O?
>>> no error is thrown.
>>> 6. What was your checkpoint storage (e.g. S3)? Is the application running 
>>> in the same data-center (e.g. AWS)?
>>> we are using HDFS as the state backend and the checkpoint dir.
>>> the application is running in our own data center and in Kubernetes as a 
>>> standalone job.
>>>
>>> On Fri, Mar 26, 2021 at 7:31 AM Piotr Nowojski  wrote:

 Hi Sihan,

 More importantly, could you create some example job that can reproduce 
 that problem? It can have some fake sources and no business logic, but if 
 you could provide us with something like that, it would allow us to 
 analyse the problem without going back and forth with tens of questions.

 Best, Piotrek

 pt., 26 mar 2021 o 11:40 Arvid Heise  napisał(a):
>
> Hi Sihan,
>
> thanks for reporting. This looks like a bug to me. I have opened an 
> investigation ticket with the highest priority [1].
>
> Could you please provide some more context, so we have a chance to 
> reproduce?
> 1. How long did the job run until it got stuck?
> 2. How often do you checkpoint or how many checkpoints succeeded?
> 3. What were the typical checkpoint sizes? How much in-flight data was 
> checkpointed? (A screenshot of the checkpoint tab in the Flink UI would 
> suffice)
> 4. Was the parallelism of the whole job 5? How is the topology roughly 
> looking? (e.g., Source -> Map -> Sink?)
> 5. Did you see any warns/errors in the logs related to checkpointing and 
> I/O?
> 6. What was your checkpoint storage (e.g. S3)? Is the application running 
> in the same data-center (e.g. AWS)?
>
> [1] https://issues.apache.org/jira/browse/FLINK-21992
>
> On Thu, Mar 25, 2021 at 3:00 AM Sihan You  wrote:
>>
>> Hi,
>>
>> I keep seeing the following situation where a task is blocked getting a 
>> MemorySegment from the pool but the operator is still reporting.
>>
>> I'm completely stumped as to how to debug or what to look at next so any 
>> hints/help/advice would be gre

Re: [DISCUSS] Feature freeze date for 1.13

2021-04-06 Thread Yuval Itzchakov
Hi Guowei,

Who should I speak to regarding this? I am at the final stages of the PR I
believe (Shengkai is kindly helping me make things work) and I would like
to push this into 1.13.

On Fri, Apr 2, 2021 at 5:43 AM Guowei Ma  wrote:

> Hi, Yuval
>
> Thanks for your contribution. I am not a SQL expert, but it seems to be
> beneficial to users, and the amount of code is not much; only the tests are
> left. Therefore, I am open to this entry into rc1.
> But according to the rules, you still have to see if there are any objections
> from other PMC members within 48 hours.
>
> Best,
> Guowei
>
>
> On Thu, Apr 1, 2021 at 10:33 PM Yuval Itzchakov  wrote:
>
>> Hi All,
>>
>> I would really love to merge https://github.com/apache/flink/pull/15307
>> prior to 1.13 release cutoff, it just needs some more tests which I can
>> hopefully get to today / tomorrow morning.
>>
>> This is a critical fix as now predicate pushdown won't work for any
>> stream which generates a watermark and wants to push down predicates.
>>
>> On Thu, Apr 1, 2021, 10:56 Kurt Young  wrote:
>>
>>> Thanks Dawid, I have merged FLINK-20320.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Thu, Apr 1, 2021 at 2:49 PM Dawid Wysakowicz 
>>> wrote:
>>>
 Hi all,

 @Kurt @Arvid I think it's fine to merge those two, as they are pretty
 much finished. We can wait for those two before creating the RC0.

 @Leonard Personally I'd be ok with 3 more days for that single PR. I
 find the request reasonable and I second that it's better to have a proper
 review rather than rush unfinished feature and try to fix it later.
 Moreover it got broader support. Unless somebody else objects, I think we
 can merge this PR later and include it in RC1.

 Best,

 Dawid
 On 01/04/2021 08:39, Arvid Heise wrote:

 Hi Dawid and Guowei,

 I'd like to merge [FLINK-13550][rest][ui] Vertex Flame Graph [1]. We
 are pretty much just waiting for AZP to turn green, it's separate from
 other components, and it's a super useful feature for Flink users.

 Best,

 Arvid

 [1] https://github.com/apache/flink/pull/15054

 On Thu, Apr 1, 2021 at 6:21 AM Kurt Young  wrote:

> Hi Guowei and Dawid,
>
> I want to request the permission to merge this feature [1], it's a
> useful improvement to sql client and won't affect
> other components too much. We were planning to merge it yesterday but met
> a tricky multi-process issue which
> had a very high chance of hanging the tests. It took us a while to
> find out the root cause and fix it.
>
> Since it's not too far past the feature freeze and RC0 is also not
> created yet, I would like to include this
> in 1.13.
>
> [1] https://issues.apache.org/jira/browse/FLINK-20320
>
> Best,
> Kurt
>
>
> On Wed, Mar 31, 2021 at 5:55 PM Guowei Ma 
> wrote:
>
>> Hi, community:
>>
>> Friendly reminder that today (3.31) is the last day of feature
>> development. Under normal circumstances, you will not be able to submit 
>> new
>> features from tomorrow (4.1). Tomorrow we will create 1.13.0-rc0 for
>> testing; everyone is welcome to help test it.
>> After the test is relatively stable, we will cut the release-1.13
>> branch.
>>
>> Best,
>> Dawid & Guowei
>>
>>
>> On Mon, Mar 29, 2021 at 5:17 PM Till Rohrmann 
>> wrote:
>>
>>> +1 for the 31st of March for the feature freeze.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Mar 29, 2021 at 10:12 AM Robert Metzger 
>>> wrote:
>>>
>>> > +1 for March 31st for the feature freeze.
>>> >
>>> >
>>> >
>>> > On Fri, Mar 26, 2021 at 3:39 PM Dawid Wysakowicz <
>>> dwysakow...@apache.org>
>>> > wrote:
>>> >
>>> > > Thank you Thomas! I'll definitely check the issue you linked.
>>> > >
>>> > > Best,
>>> > >
>>> > > Dawid
>>> > >
>>> > > On 23/03/2021 20:35, Thomas Weise wrote:
>>> > > > Hi Dawid,
>>> > > >
>>> > > > Thanks for the heads up.
>>> > > >
>>> > > > Regarding the "Rebase and merge" button. I find that merge
>>> option
>>> > useful,
>>> > > > especially for small simple changes and for backports. The
>>> following
>>> > > should
>>> > > > help to safeguard from the issue encountered previously:
>>> > > > https://github.com/jazzband/pip-tools/issues/1085
>>> > > >
>>> > > > Thanks,
>>> > > > Thomas
>>> > > >
>>> > > >
>>> > > > On Tue, Mar 23, 2021 at 4:58 AM Dawid Wysakowicz <
>>> > dwysakow...@apache.org
>>> > > >
>>> > > > wrote:
>>> > > >
>>> > > >> Hi devs, users!
>>> > > >>
>>> > > >> 1. *Feature freeze date*
>>> > > >>
>>> > > >> We are approaching the end of March which we agreed would be
>>> the time
>>> > > for
>>> > > >> a Feature Freeze. From the knowledge I've gather

Re: Flink - Pod Identity

2021-04-06 Thread Swagat Mishra
I was able to solve the issue by providing a custom version of the presto
jar. I will create a ticket and raise a pull request so that others can
benefit from it. I will share the details here shortly.

Thanks everyone for your help and support. Austin especially stands out
for his interest in the issue and his help in finding ways to resolve it.

Regards,
Swagat

On Tue, Apr 6, 2021 at 2:35 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> And actually, I've found that the correct version of the AWS SDK *is*
> included in Flink 1.12, which was reported and fixed in FLINK-18676
> (see[1]). Since you said you saw this also occur in 1.12, can you share
> more details about what you saw there?
>
> Best,
> Austin
>
> [1]: https://issues.apache.org/jira/browse/FLINK-18676
>
> On Mon, Apr 5, 2021 at 4:53 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> That looks interesting! I've also found the full list of S3 properties[1]
>> for the version of presto-hive bundled with Flink 1.12 (see [2]), which
>> includes an option for a KMS key (hive.s3.kms-key-id).
>>
>> (also, adding back the user list)
>>
>> [1]:
>> https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration
>> [2]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/filesystems/s3.html#hadooppresto-s3-file-systems-plugins
>>
>> On Mon, Apr 5, 2021 at 4:21 PM Swagat Mishra  wrote:
>>
>>> Btw, there is also an option to provide a custom credential provider,
>>> what are your thoughts on this?
>>>
>>> presto.s3.credentials-provider
>>>
>>>
>>> On Tue, Apr 6, 2021 at 12:43 AM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
 I've confirmed that for the bundled + shaded aws dependency, the only
 way to upgrade it is to build a flink-s3-fs-presto jar with the updated
 dependency. Let me know if this is feasible for you, if the KMS key
 solution doesn't work.

 Best,
 Austin

 On Mon, Apr 5, 2021 at 2:18 PM Austin Cawley-Edwards <
 austin.caw...@gmail.com> wrote:

> Hi Swagat,
>
> I don't believe there is an explicit configuration option for the KMS
> key – please let me know if you're able to make that work!
>
> Best,
> Austin
>
> On Mon, Apr 5, 2021 at 1:45 PM Swagat Mishra 
> wrote:
>
>> Hi Austin,
>>
>> Let me know what you think on my latest email, if the approach might
>> work, or if it is already supported and I am not using the configurations
>> properly.
>>
>> Thanks for your interest and support.
>>
>> Regards,
>> Swagat
>>
>> On Mon, Apr 5, 2021 at 10:39 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hi Swagat,
>>>
>>> It looks like Flink 1.6 bundles the 1.11.165 version of the
>>> aws-java-sdk-core with the Presto implementation (transitively from 
>>> Presto
>>> 0.185[1]).
>>> The minimum support version for the ServiceAccount authentication
>>> approach is 1.11.704 (see [2]) which was released on Jan 9th, 2020[3], 
>>> long
>>> after Flink 1.6 was released. It looks like even the most recent Presto 
>>> is
>>> on a version below that, concretely 1.11.697 in the master branch[4], 
>>> so I
>>> don't think even upgrading Flink to 1.6+ will solve this though it 
>>> looks to
>>> me like the AWS dependency is managed better in more recent Flink 
>>> versions.
>>> I'll have more for you on that front tomorrow, after the Easter break.
>>>
>>> I think what you would have to do to make this authentication
>>> approach work for Flink 1.6 is building a custom version of the
>>> flink-s3-fs-presto jar, replacing the bundled AWS dependency with the
>>> 1.11.704 version, and then shading it the same way.
>>>
>>> In the meantime, would you mind creating a JIRA ticket with this use
>>> case? That'll give you the best insight into the status of fixing this 
>>> :)
>>>
>>> Let me know if that makes sense,
>>> Austin
>>>
>>> [1]:
>>> https://github.com/prestodb/presto/blob/1d4ee196df4327568c0982811d8459a44f1792b9/pom.xml#L53
>>> [2]:
>>> https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts-minimum-sdk.html
>>> [3]: https://github.com/aws/aws-sdk-java/releases/tag/1.11.704
>>> [4]: https://github.com/prestodb/presto/blob/master/pom.xml#L52
>>>
>>> On Sun, Apr 4, 2021 at 3:32 AM Swagat Mishra 
>>> wrote:
>>>
 Austin -

 In my case the set up is such that services are deployed on
 Kubernetes with Docker, running on EKS. There is also an istio service
 mesh. So all the services communicate and access AWS resources like S3
 using the service account. Service account is associated with IAM 
 roles. I
 have verified that the service account has access to S3, by running a
>>

Re: How to know if task-local recovery kicked in for some nodes?

2021-04-06 Thread dhanesh arole
Hi Sonam,

We have a similar setup. What I have observed is that when the task manager pod
gets killed and restarts again (i.e. the entire task manager process
restarts), local recovery doesn't happen. The task manager restore process
actually downloads the latest completed checkpoint from the remote state
handle even when the older localState data is available. This happens
because the allocation IDs for tasks running on the task manager change with
every run, as a task manager restart causes a global job failure and restart.

Local recovery, i.e. the task restore process using locally stored checkpoint
data, kicks in when the task manager process is alive but, due to some other
reason (like a timeout from a sink or an external dependency), one of the tasks
fails and the Flink job gets restarted by the job manager.

Please CMIIW


-
Dhanesh Arole

On Tue, Apr 6, 2021 at 11:35 AM Till Rohrmann  wrote:

> Hi Sonam,
>
> The easiest way to see whether local state has been used for recovery is
> the recovery time. Apart from that you can also look for "Found registered
> local state for checkpoint {} in subtask ({} - {} - {}" in the logs which
> is logged on debug. This indicates that the local state is available.
> However, it does not say whether it is actually used. E.g. when doing a
> rescaling operation we change the assignment of key group ranges which
> prevents local state from being used. However in case of a recovery the
> above-mentioned log message should indicate that we use local state
> recovery.
>
> Cheers,
> Till
>
> On Tue, Apr 6, 2021 at 11:31 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Sonam,
>>
>> Pulling in Till (cc'ed), I believe he would likely be able to help you
>> here.
>>
>> Cheers,
>> Gordon
>>
>> On Fri, Apr 2, 2021 at 8:18 AM Sonam Mandal 
>> wrote:
>>
>>> Hello,
>>>
>>> We are experimenting with task local recovery and I wanted to know
>>> whether there is a way to validate that some tasks of the job recovered
>>> from the local state rather than the remote state.
>>>
>>> We've currently set this up to have 2 Task Managers with 2 slots each,
>>> and we run a job with parallelism 4. To simulate failure, we kill one of
>>> the Task Manager pods (we run on Kubernetes). I want to see if the local
>>> state of the other Task Manager was used or not. I do understand that the
>>> state for the killed Task Manager will need to be fetched from the
>>> checkpoint.
>>>
>>> Also, do you have any suggestions on how to test such failure scenarios
>>> in a better way?
>>>
>>> Thanks,
>>> Sonam
>>>
>>


Task manager local state data after crash / recovery

2021-04-06 Thread dhanesh arole
Hey all,

We are running a stateful stream processing job on k8s using per-job
standalone deployment entrypoint. Flink version: 1.12.1

*Problem*: We have observed that whenever a task manager is either
gracefully shut down or killed (due to OOM, a k8s worker node drain-out, etc.),
it doesn't clean up the RocksDB state directories from the local disk.
But when the task manager restarts and receives new task allocations from
the resource manager, it rebuilds its local state for those tasks from the
previously completed checkpoint. Over time, after multiple
restarts, the task manager's local disk ends up accumulating lots of such
orphan RocksDB directories.

*Questions*: This isn't causing any functional issues for us, but it adds
lots of repeated ops overhead of cleaning these disks periodically. As a
workaround, we are thinking of cleaning the local RocksDB directories,
except for *taskmanager.state.local.root-dirs*, before starting the task
manager Java process. Since the keyed state backends for allocated tasks are
restored anyway during every task manager restart, we feel it is the safest
option atm and will solve our problem of the ever-growing disk on task manager
pods. Is it safe to do so, or are there any other consequences? Is
there any config or restart policy that takes care of cleaning up such
stale RocksDB directories during the state backend restore process?

A similar sort of cleanup is required when local task recovery is enabled.
Whenever the task manager is not shut down gracefully, the old localState
doesn't get cleaned up on the next restart. This also wastes a lot of disk
space. It's easier to delete RocksDB working directories from a
previous run, but not so straightforward for the localState, as one has to
figure out which of them are actually stale allocation IDs and clean
only those. Or check the latest completed checkpoint and delete all
localState directories for older checkpoints and allocation IDs. Is there
any other solution to this problem? I would also like to learn from other
users how you are handling these operational tasks currently.

configurations:

state.backend.local-recovery: true
taskmanager.state.local.root-dirs: /data/flink/

RocksDb backend DB storage path:  /data/flink ( set programmatically )
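
A minimal shell sketch of the cleanup workaround described above, meant to run before starting the task manager process. The directory layout (a `localState` directory directly under the data dir) is an assumption for illustration; verify it against your actual on-disk layout before deleting anything:

```shell
# Remove leftover per-run RocksDB working directories under the data dir,
# while preserving the localState root used for task-local recovery.
# ASSUMPTION: the directory to keep sits directly under the data dir.
clean_stale_rocksdb_dirs() {
  data_dir=$1
  keep_dir=$2
  # Delete every top-level directory except the one we want to keep.
  find "$data_dir" -mindepth 1 -maxdepth 1 -type d \
    ! -name "$(basename "$keep_dir")" -exec rm -rf {} +
}

# Example invocation (paths follow the configuration above):
# clean_stale_rocksdb_dirs /data/flink /data/flink/localState
```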


-
Dhanesh Arole


Re: Application cluster - Job execution and cluster creation timeouts

2021-04-06 Thread Tamir Sagi
Hey Yang

Thank you for your respond

We run the application cluster programmatically.

I discussed it here with an example of how to run it from Java rather than the CLI.
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Application-cluster-Best-Practice-td42011.html

Following your comment:
accessibility of Flink rest endpoint. When it is not ready in the timeout (e.g. 
120s), the deployer will delete the Flink JobManager deployment and try to 
create a new one.
I have not actually seen it in action; I gave a non-existing image. The 
deployer started the k8s deployment but the pods failed to start (expected), 
and the k8s deployment kept running indefinitely.

What configuration is that? Is it possible to override it?

I delved into the flink-core and flink-kubernetes jars. Since Flink is 
dependent on Kubernetes, we too need to leverage the Kubernetes client (which 
Flink does internally) to manage and inspect the resources.
I am curious why you have "infinite job execution" in your Flink application 
cluster. If all the jobs in the application finished, Flink will
deregister the application and all the K8s resources should be cleaned up.
My thought was about what happens if there is a bug and the job runs 
indefinitely, or the job manager crashes over and over again.
What happens if resources don't get cleaned up properly? We don't want to keep 
the cluster up and running in that case and would like to get feedback. Since 
Flink does not support that, we have to inspect it externally (which makes it 
more complex).
We could also poll the job status using the Flink client, but that becomes 
useless if the job runs indefinitely.

What do you think?

Best,
Tamir.



From: Yang Wang 
Sent: Tuesday, April 6, 2021 10:36 AM
To: Tamir Sagi 
Cc: user@flink.apache.org 
Subject: Re: Application cluster - Job execution and cluster creation timeouts


EXTERNAL EMAIL


Hi Tamir,

Thanks for trying the native K8s integration.

1. We do not have a timeout for creating the Flink application cluster. The 
reason is that the job submission happens on the JobManager side.
So the Flink client does not need to wait for the JobManager to be running 
before exiting.

I think even though the Flink client internally has the timeout, we still have 
the same problem when the Flink client crashes and then the timeout is
gone.

I want to share some other solution about the timeout. In our deployer, when a 
new Flink application is created, the deployer will periodically check the
accessibility of Flink rest endpoint. When it is not ready in the timeout(e.g. 
120s), the deployer will delete the Flink JobManager deployment and try to
create a new one.
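
A rough sketch of such a readiness probe (the REST URL, poll interval, and the 120s budget are assumptions; `/overview` is a standard Flink REST endpoint):

```shell
# Poll the Flink REST endpoint until it responds, or give up after a deadline.
# Returns 0 when the endpoint is ready, 1 on timeout.
wait_for_rest() {
  url=$1
  deadline=$2   # seconds
  interval=5
  elapsed=0
  while [ "$elapsed" -lt "$deadline" ]; do
    if curl -sf "$url/overview" >/dev/null; then
      return 0
    fi
    sleep "$interval"
    elapsed=$((elapsed + interval))
  done
  return 1
}

# Example: recreate the JobManager deployment on timeout (names are hypothetical):
# wait_for_rest http://flink-rest:8081 120 || kubectl delete deployment flink-jobmanager
```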

2. Actually, the current "flink run-application" does not support a real 
attached mode (waiting for all the jobs in the application to finish).
I am curious why you have "infinite job execution" in your Flink  application 
cluster. If all the jobs in the application finished, Flink will
deregister the application and all the K8s resources should be cleaned up.


Best,
Yang


Tamir Sagi <tamir.s...@niceactimize.com> wrote on Mon, Apr 5, 2021 at 11:24 PM:
Hey all,

We deploy application cluster natively on Kubernetes.

are there any timeouts for Job execution and cluster creation?

I went over the configuration page but did not find anything relevant.

In order to get an indication about the cluster, we leverage the k8s client 
to watch the deployment in a namespace with a specific cluster name and 
respond accordingly.

we define two timeouts

  1.  Creating the application cluster (i.e. to date if there are errors in 
pods, the k8s deployment is up but the application cluster is not running.)
  2.  Until the application cluster resources get cleaned up (upon completion), 
which prevents infinite job execution or k8s glitches

However,  this solution is not ideal because in case this client lib crashes, 
the timeouts are gone.
We don't want to manage these timeouts states ourselves.

Any suggestion or better way?

Thanks,
Tamir.





Confidentiality: This communication and any attachments are intended for the 
above-named persons only and may be confidential and/or legally privileged. Any 
opinions expressed in this communication are not necessarily those of NICE 
Actimize. If this communication has come to you in error you must take no 
action based on it, nor must you copy or show it to anyone; pleas

Re: How to know if task-local recovery kicked in for some nodes?

2021-04-06 Thread Till Rohrmann
Hi Sonam,

The easiest way to see whether local state has been used for recovery is
the recovery time. Apart from that you can also look for "Found registered
local state for checkpoint {} in subtask ({} - {} - {}" in the logs which
is logged on debug. This indicates that the local state is available.
However, it does not say whether it is actually used. E.g. when doing a
rescaling operation we change the assignment of key group ranges which
prevents local state from being used. However in case of a recovery the
above-mentioned log message should indicate that we use local state
recovery.
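
To make the check concrete, here is a minimal shell sketch for grepping that message out of the TaskManager logs (the log path is an assumption about your deployment, and DEBUG logging must be enabled for the message to appear at all):

```shell
# Count occurrences of the task-local recovery message in the given log files.
count_local_recovery() {
  grep -h "Found registered local state for checkpoint" "$@" 2>/dev/null | wc -l
}

# Example (log path is an assumption -- on Kubernetes, use `kubectl logs` instead):
# count_local_recovery /opt/flink/log/flink-*-taskexecutor-*.log
```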

Cheers,
Till

On Tue, Apr 6, 2021 at 11:31 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi Sonam,
>
> Pulling in Till (cc'ed), I believe he would likely be able to help you
> here.
>
> Cheers,
> Gordon
>
> On Fri, Apr 2, 2021 at 8:18 AM Sonam Mandal  wrote:
>
>> Hello,
>>
>> We are experimenting with task local recovery and I wanted to know
>> whether there is a way to validate that some tasks of the job recovered
>> from the local state rather than the remote state.
>>
>> We've currently set this up to have 2 Task Managers with 2 slots each,
>> and we run a job with parallelism 4. To simulate failure, we kill one of
>> the Task Manager pods (we run on Kubernetes). I want to see if the local
>> state of the other Task Manager was used or not. I do understand that the
>> state for the killed Task Manager will need to be fetched from the
>> checkpoint.
>>
>> Also, do you have any suggestions on how to test such failure scenarios
>> in a better way?
>>
>> Thanks,
>> Sonam
>>
>


Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-06 Thread Yangze Guo
> I have tried this method, but the problem still exists.
How much memory do you configure for it?

> is 21 instances of "org.apache.flink.util.ChildFirstClassLoader" normal
Not quite sure about it. AFAIK, each job will have a classloader.
Multiple tasks of the same job in the same TM will share the same
classloader. The classloader will be removed if there are no more tasks
running on the TM. A classloader without references will eventually be
cleaned up by GC. Could you share the JM and TM logs for further analysis?
I'll also involve @Guowei Ma in this thread.
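
For reference, raising the metaspace is a one-line change in flink-conf.yaml. The value below is only an illustrative example (the Flink 1.12 default is 256m); size it based on the heap-dump findings:

```yaml
# flink-conf.yaml -- example value only; tune to your workload
taskmanager.memory.jvm-metaspace.size: 512m
```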


Best,
Yangze Guo

On Tue, Apr 6, 2021 at 6:05 PM 太平洋 <495635...@qq.com> wrote:
>
> I have tried this method, but the problem still exist.
> by heap dump analysis, is 21 instances of 
> "org.apache.flink.util.ChildFirstClassLoader" normal?
>
>
> -- Original Message --
> From: "Yangze Guo" ;
> Sent: Tuesday, April 6, 2021, 4:32 PM
> To: "太平洋" <495635...@qq.com>;
> Cc: "user";
> Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
>
> I think you can try to increase the JVM metaspace option for
> TaskManagers through taskmanager.memory.jvm-metaspace.size. [1]
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#outofmemoryerror-metaspace
>
> Best,
> Yangze Guo
>
> Best,
> Yangze Guo
>
>
> On Tue, Apr 6, 2021 at 4:22 PM 太平洋 <495635...@qq.com> wrote:
> >
> > batch job:
> > read data from s3 by sql, then pass it through some operators and write the data to 
> > clickhouse and kafka.
> > after some time, the task-manager quit with OutOfMemoryError: Metaspace.
> >
> > env:
> > flink version:1.12.2
> > task-manager slot count: 5
> > deployment: standalone kubernetes session mode
> > dependencies:
> >
> > <dependency>
> >   <groupId>org.apache.flink</groupId>
> >   <artifactId>flink-connector-kafka_2.11</artifactId>
> >   <version>${flink.version}</version>
> > </dependency>
> >
> > <dependency>
> >   <groupId>com.google.code.gson</groupId>
> >   <artifactId>gson</artifactId>
> >   <version>2.8.5</version>
> > </dependency>
> >
> > <dependency>
> >   <groupId>org.apache.flink</groupId>
> >   <artifactId>flink-connector-jdbc_2.11</artifactId>
> >   <version>${flink.version}</version>
> > </dependency>
> >
> > <dependency>
> >   <groupId>ru.yandex.clickhouse</groupId>
> >   <artifactId>clickhouse-jdbc</artifactId>
> >   <version>0.3.0</version>
> > </dependency>
> >
> > <dependency>
> >   <groupId>org.apache.flink</groupId>
> >   <artifactId>flink-parquet_2.11</artifactId>
> >   <version>${flink.version}</version>
> > </dependency>
> >
> > <dependency>
> >   <groupId>org.apache.flink</groupId>
> >   <artifactId>flink-json</artifactId>
> >   <version>${flink.version}</version>
> > </dependency>
> >
> >
> > heap dump1:
> >
> > Leak Suspects
> >
> >
> >   Problem Suspect 1
> >
> > 21 instances of "org.apache.flink.util.ChildFirstClassLoader", loaded by 
> > "sun.misc.Launcher$AppClassLoader @ 0x73b2d42e0" occupy 29,656,880 (41.16%) 
> > bytes.
> >
> > Biggest instances:
> >
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73ca2a1e8 - 1,474,760 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73d2af820 - 1,474,168 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73cdcaa10 - 1,474,160 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73cf6aab0 - 1,474,160 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73dd8 - 1,474,160 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73d2bb108 - 1,474,128 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73de202e0 - 1,474,120 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73dadc778 - 1,474,112 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73d5f70e8 - 1,474,064 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73d93aa38 - 1,474,064 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73e179638 - 1,474,064 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73dc80418 - 1,474,056 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73dfcda60 - 1,474,056 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73e4bcd38 - 1,474,056 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73d6006e8 - 1,474,032 
> > (2.05%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73c7d2ad8 - 1,461,944 
> > (2.03%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73ca1bb98 - 1,460,752 
> > (2.03%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73bf203f0 - 1,460,744 
> > (2.03%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73e3284a8 - 1,445,232 
> > (2.01%) bytes.
> > org.apache.flink.util.ChildFirstClassLoader @ 0x73e65de00 - 1,445,232 
> > (2.01%) bytes.
> >
> >
> >
> > Keywords
> > org.apache.flink.util.ChildFirstClassLoader
> > sun.misc.Launcher$AppClassLoader @ 0x73b2d42e0
> >
> >   Problem Suspect 2
> >
> > 34,407 instances of "org.apache.flink.core.memory.HybridMemorySegment", 
> > loaded by "sun.misc.Launcher$AppClassLoader @ 0x73b2d42e0" occupy 7,707,168 
> > (10.70%) bytes.
> >
>

Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-06 Thread 太平洋
I have tried this method, but the problem still exists.
From the heap dump analysis: are 21 instances of 
"org.apache.flink.util.ChildFirstClassLoader" normal?




-- Original Message --
From: "Yangze Guo"

https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#outofmemoryerror-metaspace

Best,
Yangze Guo

Best,
Yangze Guo


On Tue, Apr 6, 2021 at 4:22 PM 太平洋 <495635...@qq.com> wrote:
>
> batch job:
> read data from s3 by sql,then by some operators and write data to 
clickhouse and kafka.
> after some times, task-manager quit with OutOfMemoryError: Metaspace.
>
> env:
> flink version:1.12.2
> task-manager slot count: 5
> deployment: standalone kubernetes session mode
> dependencies:
>
> 

Re: How to know if task-local recovery kicked in for some nodes?

2021-04-06 Thread Tzu-Li (Gordon) Tai
Hi Sonam,

Pulling in Till (cc'ed), I believe he would likely be able to help you here.

Cheers,
Gordon

On Fri, Apr 2, 2021 at 8:18 AM Sonam Mandal  wrote:

> Hello,
>
> We are experimenting with task local recovery and I wanted to know whether
> there is a way to validate that some tasks of the job recovered from the
> local state rather than the remote state.
>
> We've currently set this up to have 2 Task Managers with 2 slots each, and
> we run a job with parallelism 4. To simulate failure, we kill one of the
> Task Manager pods (we run on Kubernetes). I want to see if the local state
> of the other Task Manager was used or not. I do understand that the state
> for the killed Task Manager will need to be fetched from the checkpoint.
>
> Also, do you have any suggestions on how to test such failure scenarios in
> a better way?
>
> Thanks,
> Sonam
>


Re: Checkpoint timeouts at times of high load

2021-04-06 Thread Robert Metzger
It could very well be that your job gets stuck in a restart loop for some
reason. Can you either post the full TaskManager logs here, or try to
figure out yourself why the first checkpoint that timed out did so?
Backpressure or blocked operators are a common cause for this. In your
case, it could very well be that the Kafka producer is not confirming the
checkpoint due to the Kafka transactions. If backpressure is causing this,
consider enabling unaligned checkpoints. It could also be the case that
the Kafka transactions are too slow, causing backpressure and checkpoint
timeouts.
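
For reference, the unaligned-checkpoints suggestion above is a single switch in flink-conf.yaml (available since Flink 1.11; the same can be done in code via `env.getCheckpointConfig().enableUnalignedCheckpoints()`):

```yaml
# flink-conf.yaml -- lets checkpoint barriers overtake buffered records so
# checkpoints can complete under backpressure, at the cost of persisting
# in-flight data with the checkpoint.
execution.checkpointing.unaligned: true
```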



On Mon, Apr 5, 2021 at 9:57 AM Geldenhuys, Morgan Karl <
morgan.geldenh...@tu-berlin.de> wrote:

> Thank you for the information. I have a feeling this is more to do with
> EXACTLY_ONCE kafka producers and transactions not playing nice with
> checkpoints and a timeout happens. The jobs seem to fail and hit this
> restart and fail loop. Looking in the logs, the taskmanager logs grow very
> large with the same messages repeating over and over again. I've attached a
> file for this. The two lines that give me pause are:
>
>
>
> *Closing the Kafka producer with timeoutMillis = 0 ms. *
>
> *Proceeding to force close the producer since pending requests could not
> be completed within timeout 0 ms.*
>
>
> I'm not really sure which timeout this is but it looks like there is a
> timeout loop happening here.
>
>
> The Kafka producer has been configured as such (the transaction timeout
> has been set on the kafka server to match the producer):
>
>
> Properties kafkaProducerProps = new Properties();
> kafkaProducerProps.setProperty("bootstrap.servers", brokerList);
> kafkaProducerProps.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
> "360");
> kafkaProducerProps.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
>  "5");
> kafkaProducerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
> UUID.randomUUID().toString());
> kafkaProducerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
> "true");
> kafkaProducerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "500");
> kafkaProducerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
> kafkaProducerProps.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, 
> "33554432");
> kafkaProducerProps.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
> "3");
> kafkaProducerProps.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
> "12");
>
> FlinkKafkaProducer myProducer =
> new FlinkKafkaProducer<>(
> producerTopic,
> (KafkaSerializationSchema) (value, aLong) -> {
> return new ProducerRecord<>(producerTopic, value.getBytes());
> },
> kafkaProducerProps,
> Semantic.EXACTLY_ONCE,
> 10);
>
>
> And checkpoints have been configured as such:
>
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // configuring RocksDB state backend to use HDFS
> String backupFolder = props.getProperty("hdfs.backupFolder");
> StateBackend backend = new RocksDBStateBackend(backupFolder, true);
> env.setStateBackend(backend);
> // start a checkpoint based on supplied interval
> env.enableCheckpointing(checkpointInterval);
> // set mode to exactly-once (this is the default)
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> // make sure progress happen between checkpoints
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(checkpointInterval);
> // checkpoints have to complete within two minutes, or they are discarded
> env.getCheckpointConfig().setCheckpointTimeout(38);
> //env.getCheckpointConfig().setTolerableCheckpointFailureNumber();
> // no external services which could take some time to respond, therefore 1
> // allow only one checkpoint to be in progress at the same time
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> // enable externalized checkpoints which are deleted after job cancellation
> env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
>
>
> Additionally, each taskmanager has been configured with 4GB of memory,
> there is a sliding window of 10 seconds with a slide of 1 second, and the
> cluster setup is using flink native.
>
>
> Any hints would be much appreciated!
>
>
> Regards,
>
> M.
>
>
> --
> *From:* Guowei Ma 
> *Sent:* 01 April 2021 14:19
> *To:* Geldenhuys, Morgan Karl
> *Cc:* user
> *Subject:* Re: Checkpoint timeouts at times of high load
>
> Hi,
> I think there are many reasons that could lead to the checkpoint timeout.
> Would you like to share some detailed information of checkpoint? For
> example, the detailed checkpoint information from the web.[1]  And which
> Flink version do you use?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/checkpoint_monitoring.html
>
> Best,
> Guowei
>
>
> On Thu, Apr 1, 2021 at 4:33 PM Geldenhuys, Morgan Karl <
> 

Re: Why is Hive dependency flink-sql-connector-hive not available on Maven Central?

2021-04-06 Thread Yik San Chan
Thanks for the tip!

On Tue, Apr 6, 2021 at 4:25 PM Rui Li  wrote:

> Hi Yik San,
>
> Glad to know you've found the jar. Another option to locate the jar is to
> just use maven dependency plugin like this:
>
> *mvn dependency:get
> -Dartifact=org.apache.flink:flink-sql-connector-hive-2.3.6_2.12:1.12.2*
>
> On Tue, Apr 6, 2021 at 4:10 PM Yik San Chan 
> wrote:
>
>> Hi,
>>
>> I am able to find the jar from Maven central. See updates in the
>> StackOverflow post.
>>
>> Thank you!
>>
>> Best,
>> Yik San
>>
>> On Tue, Apr 6, 2021 at 4:05 PM Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> Hi,
>>>
>>> I'm pulling in Rui Li (cc'ed) who might be able to help you here as he
>>> actively maintains the hive connectors.
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On Fri, Apr 2, 2021 at 11:36 AM Yik San Chan 
>>> wrote:
>>>
 The question is cross-posted in StackOverflow
 https://stackoverflow.com/questions/66914119/flink-why-is-hive-dependency-flink-sql-connector-hive-not-available-on-maven-ce

 According to [Flink SQL Hive: Using bundled hive jar](
 https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#using-bundled-hive-jar
 ):

 > The following tables list all available bundled hive jars. You can
 pick one to the /lib/ directory in Flink distribution.
 > - flink-sql-connector-hive-1.2.2 (download link)
 > - flink-sql-connector-hive-2.2.0 (download link)
 > ...

 However, these dependencies are not available from Maven central. As a
 work around, I use [user defined dependencies](
 https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#user-defined-dependencies),
 but this is not recommended:

 > the recommended way to add dependency is to use a bundled jar.
 Separate jars should be used only if bundled jars don’t meet your needs.

 I wonder why the bundle jars are not available in Maven central?

 Follow-up: Since they are not available from Maven central, I wonder
 how to include them in pom.xml in order to run `mvn package`?

 Thanks!

>>>
>
> --
> Best regards!
> Rui Li
>
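Regarding the follow-up about pom.xml: once the bundled jar is confirmed on Maven Central, a plausible dependency entry (coordinates assumed from the `mvn dependency:get` command above) would be:

```xml
<!-- Coordinates taken from the dependency:get command above; 'provided' scope
     because the bundled jar is normally dropped into Flink's /lib/ directory
     rather than shaded into the user jar -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-connector-hive-2.3.6_2.12</artifactId>
  <version>1.12.2</version>
  <scope>provided</scope>
</dependency>
```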


Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-06 Thread Yangze Guo
I think you can try to increase the JVM metaspace option for
TaskManagers through taskmanager.memory.jvm-metaspace.size. [1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#outofmemoryerror-metaspace
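A minimal flink-conf.yaml sketch for this suggestion (the 512m value is an illustrative bump over the 1.12 default of 256m, not a recommendation for every workload):

```yaml
# Raise the TaskManager metaspace limit to leave headroom for the
# per-job classloaders created by repeated batch submissions
taskmanager.memory.jvm-metaspace.size: 512m
```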

Best,
Yangze Guo


On Tue, Apr 6, 2021 at 4:22 PM 太平洋 <495635...@qq.com> wrote:
>
> batch job:
> read data from s3 by sql,then by some operators and write data to clickhouse 
> and kafka.
> after some time, task-manager quit with OutOfMemoryError: Metaspace.
>
> env:
> flink version:1.12.2
> task-manager slot count: 5
> deployment: standalone kubernetes session mode
> dependencies:
>
> 
>
>   org.apache.flink
>
>   flink-connector-kafka_2.11
>
>   ${flink.version}
>
> 
>
> 
>
>   com.google.code.gson
>
>   gson
>
>   2.8.5
>
> 
>
> 
>
>   org.apache.flink
>
>   flink-connector-jdbc_2.11
>
>   ${flink.version}
>
> 
>
> 
>
>   ru.yandex.clickhouse
>
>   clickhouse-jdbc
>
>   0.3.0
>
> 
>
> 
>
>   org.apache.flink
>
> flink-parquet_2.11
>
> ${flink.version}
>
> 
>
> 
>
>  org.apache.flink
>
>  flink-json
>
>  ${flink.version}
>
> 
>
>
> heap dump1:
>
> Leak Suspects
>
> System Overview
>
>  Leaks
>
>  Overview
>
>
>   Problem Suspect 1
>
> 21 instances of "org.apache.flink.util.ChildFirstClassLoader", loaded by 
> "sun.misc.Launcher$AppClassLoader @ 0x73b2d42e0" occupy 29,656,880 (41.16%) 
> bytes.
>
> Biggest instances:
>
> org.apache.flink.util.ChildFirstClassLoader @ 0x73ca2a1e8 - 1,474,760 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73d2af820 - 1,474,168 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73cdcaa10 - 1,474,160 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73cf6aab0 - 1,474,160 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73dd8 - 1,474,160 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73d2bb108 - 1,474,128 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73de202e0 - 1,474,120 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73dadc778 - 1,474,112 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73d5f70e8 - 1,474,064 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73d93aa38 - 1,474,064 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73e179638 - 1,474,064 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73dc80418 - 1,474,056 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73dfcda60 - 1,474,056 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73e4bcd38 - 1,474,056 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73d6006e8 - 1,474,032 (2.05%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73c7d2ad8 - 1,461,944 (2.03%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73ca1bb98 - 1,460,752 (2.03%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73bf203f0 - 1,460,744 (2.03%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73e3284a8 - 1,445,232 (2.01%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73e65de00 - 1,445,232 (2.01%) 
> bytes.
>
>
>
> Keywords
> org.apache.flink.util.ChildFirstClassLoader
> sun.misc.Launcher$AppClassLoader @ 0x73b2d42e0
> Details »
>
>   Problem Suspect 2
>
> 34,407 instances of "org.apache.flink.core.memory.HybridMemorySegment", 
> loaded by "sun.misc.Launcher$AppClassLoader @ 0x73b2d42e0" occupy 7,707,168 
> (10.70%) bytes.
>
> Keywords
> org.apache.flink.core.memory.HybridMemorySegment
> sun.misc.Launcher$AppClassLoader @ 0x73b2d42e0
>
> Details »
>
>
>
> heap dump2:
>
> Leak Suspects
>
> System Overview
>
>  Leaks
>
>  Overview
>
>   Problem Suspect 1
>
> 21 instances of "org.apache.flink.util.ChildFirstClassLoader", loaded by 
> "sun.misc.Launcher$AppClassLoader @ 0x73b2d42e0" occupy 26,061,408 (30.68%) 
> bytes.
>
> Biggest instances:
>
> org.apache.flink.util.ChildFirstClassLoader @ 0x73e9e9930 - 1,474,224 (1.74%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73edce0b8 - 1,474,224 (1.74%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73f1ad7d0 - 1,474,168 (1.74%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73f3e5118 - 1,474,168 (1.74%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73f5d3fe0 - 1,474,168 (1.74%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73ebd8d28 - 1,474,160 (1.74%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73efc00c0 - 1,474,160 (1.74%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73e2251a8 - 1,474,136 (1.74%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73cc24af0 - 1,474,064 (1.74%) 
> bytes.
> org.apache.flink.util.ChildFirstClassLoader @ 0x73cdca3e0 - 1,474,064 (1.74%) 
> bytes.
> org.a

Re: Why is Hive dependency flink-sql-connector-hive not available on Maven Central?

2021-04-06 Thread Rui Li
Hi Yik San,

Glad to know you've found the jar. Another option to locate the jar is to
just use maven dependency plugin like this:

*mvn dependency:get
-Dartifact=org.apache.flink:flink-sql-connector-hive-2.3.6_2.12:1.12.2*

On Tue, Apr 6, 2021 at 4:10 PM Yik San Chan 
wrote:

> Hi,
>
> I am able to find the jar from Maven central. See updates in the
> StackOverflow post.
>
> Thank you!
>
> Best,
> Yik San
>
> On Tue, Apr 6, 2021 at 4:05 PM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi,
>>
>> I'm pulling in Rui Li (cc'ed) who might be able to help you here as he
>> actively maintains the hive connectors.
>>
>> Cheers,
>> Gordon
>>
>>
>> On Fri, Apr 2, 2021 at 11:36 AM Yik San Chan 
>> wrote:
>>
>>> The question is cross-posted in StackOverflow
>>> https://stackoverflow.com/questions/66914119/flink-why-is-hive-dependency-flink-sql-connector-hive-not-available-on-maven-ce
>>>
>>> According to [Flink SQL Hive: Using bundled hive jar](
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#using-bundled-hive-jar
>>> ):
>>>
>>> > The following tables list all available bundled hive jars. You can
>>> pick one to the /lib/ directory in Flink distribution.
>>> > - flink-sql-connector-hive-1.2.2 (download link)
>>> > - flink-sql-connector-hive-2.2.0 (download link)
>>> > ...
>>>
>>> However, these dependencies are not available from Maven central. As a
>>> work around, I use [user defined dependencies](
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#user-defined-dependencies),
>>> but this is not recommended:
>>>
>>> > the recommended way to add dependency is to use a bundled jar.
>>> Separate jars should be used only if bundled jars don’t meet your needs.
>>>
>>> I wonder why the bundle jars are not available in Maven central?
>>>
>>> Follow-up: Since they are not available from Maven central, I wonder how
>>> to include them in pom.xml in order to run `mvn package`?
>>>
>>> Thanks!
>>>
>>

-- 
Best regards!
Rui Li


Flink 1.12.2 sql api use parquet format error

2021-04-06 Thread ??????
ref: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/parquet.html


env and error:


Flink version: 1.12.2
deployment: standalone kubernetes session
dependency:
        

period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-06 Thread 太平洋
batch job:
read data from s3 by sql, then by some operators and write data to clickhouse 
and kafka.
after some time, task-manager quit with OutOfMemoryError: Metaspace.


env:
flink version: 1.12.2
task-manager slot count: 5
deployment: standalone kubernetes session mode
dependencies:

    

Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

2021-04-06 Thread Yang Wang
Hi Fuyao,

Sorry for the late reply.

It is not very hard to develop your own deployer. Actually, it took me 3
days to develop the PoC version of flink-native-k8s-operator. So if you
want to have a fully functional K8s operator, maybe two weeks is enough.
But if you want to put it into production, you may need some more time to
polish it for easier use.

Flink native K8s integration is not going to replace the standalone mode.
First, not all the Flink standalone clusters are running on the K8s.
And standalone mode could work really well with reactive mode[1].


Flink native K8s integration is not going to replace the K8s operator.
Actually, the Flink K8s operator is not on the same level as the native
integration. The Flink K8s operator is responsible for managing the
lifecycle of a Flink application, and it makes the submission more K8s
style. The Google and Lyft Flink K8s operators could support native mode;
they just do not have that support right now.


Kubernetes HA could work both for standalone mode and native mode. You
could find the configuration here[2]. However, you might need some changes
to the Flink K8s operator to make it work, because we need to add more
args (e.g. --host) to the JobManager start command.


[1].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode
[2].
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#high-availability-with-standalone-kubernetes

Best,
Yang


Fuyao Li  wrote on Mon, Apr 5, 2021 at 1:33 PM:

> Hello Yang,
>
>
>
> I am just following up the previous email to see if you got some time to
> reply.
>
> I also took a deeper look into lyft k8s operator recently. It seems it
> doesn’t support HA natively. It still needs the help of ZooKeeper. In terms
> of this, native k8s is better. Any other ideas? Thanks for your help.
>
>
>
> Best,
>
> Fuyao
>
>
>
> *From: *Fuyao Li 
> *Date: *Thursday, April 1, 2021 at 12:22
> *To: *Yang Wang 
> *Cc: *user 
> *Subject: *Re: [External] : Re: Need help with executing Flink CLI for
> native Kubernetes deployment
>
> Hi Yang,
>
>
>
> Thanks for sharing the insights.
>
>
>
> For problem 1:
>
> I think I can’t do telnet in the container. I tried to use curl
> 144.25.13.78:8081 and I could see the HTML of the Flink dashboard UI. This
> proves such a public IP is reachable inside the cluster. Just as you
> mentioned, there might still be some network issues with the cluster. I
> will do some further checking.
>
>
>
> For problem 2:
>
> I created a new K8S cluster with bastion server with some public IP
> assigned to it. Finally, I can see something valid from my browser. (There
> still exist some problems with connecting to some databases, but I think
> these network problems are not directly related to Flink, I can investigate
> into it later.)
>
>
>
> For problem 3:
>
> Thanks for sharing the repo you created. I am not sure how much work it
> could take to develop a deployer. I understand it depends on
> proficiency; could you give a rough estimate? If it is too complicated
> and the other options are not significantly inferior to native Kubernetes,
> I might prefer to choose another option. I am currently comparing
> different options to deploy on Kubernetes.
>
>1. Standalone K8S
>2. Native Kubernetes
>3. Flink operator (Google Cloud Platform/ Lyft) [1][2]
>
>
>
> I also watched the demo video you presented. [3] I noticed you mentioned
> that native K8S is not going to replace the other two options. I still
> don't fully get your idea with the limited explanation in the demo. Could
> you compare the tradeoffs a little bit? Thanks!
>
> [1] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
> 
>
> [2]  https://github.com/lyft/flinkk8soperator
> 
>
> [3] https://youtu.be/pdFPr_VOWTU
> 
>
>
>
> Best,
>
> Fuyao
>
>
>
>
>
> *From: *Yang Wang 
> *Date: *Tuesday, March 30, 2021 at 19:15
> *To: *Fuyao Li 
> *Cc: *user 
> *Subject: *Re: [External] : Re: Need help with executing Flink CLI for
> native Kubernetes deployment
>
> Hi Fuyao,
>
>
>
> Thanks for sharing the progress.
>
>
>
> 1. The flink client is able to list/cancel jobs. Based on the logs shared
> above, I should be able to ping 144.25.13.78, so why can I still NOT ping
> that address?
>
>
>
> I think this is an environment problem. Actually, not every IP address
> can be tested with the "ping" command. I suggest you use "telnet
> 144.25.13.78:8081
> 

Re: Why is Hive dependency flink-sql-connector-hive not available on Maven Central?

2021-04-06 Thread Yik San Chan
Hi,

I am able to find the jar from Maven central. See updates in the
StackOverflow post.

Thank you!

Best,
Yik San

On Tue, Apr 6, 2021 at 4:05 PM Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> I'm pulling in Rui Li (cc'ed) who might be able to help you here as he
> actively maintains the hive connectors.
>
> Cheers,
> Gordon
>
>
> On Fri, Apr 2, 2021 at 11:36 AM Yik San Chan 
> wrote:
>
>> The question is cross-posted in StackOverflow
>> https://stackoverflow.com/questions/66914119/flink-why-is-hive-dependency-flink-sql-connector-hive-not-available-on-maven-ce
>>
>> According to [Flink SQL Hive: Using bundled hive jar](
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#using-bundled-hive-jar
>> ):
>>
>> > The following tables list all available bundled hive jars. You can pick
>> one to the /lib/ directory in Flink distribution.
>> > - flink-sql-connector-hive-1.2.2 (download link)
>> > - flink-sql-connector-hive-2.2.0 (download link)
>> > ...
>>
>> However, these dependencies are not available from Maven central. As a
>> work around, I use [user defined dependencies](
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#user-defined-dependencies),
>> but this is not recommended:
>>
>> > the recommended way to add dependency is to use a bundled jar. Separate
>> jars should be used only if bundled jars don’t meet your needs.
>>
>> I wonder why the bundle jars are not available in Maven central?
>>
>> Follow-up: Since they are not available from Maven central, I wonder how
>> to include them in pom.xml in order to run `mvn package`?
>>
>> Thanks!
>>
>


Re: Why is Hive dependency flink-sql-connector-hive not available on Maven Central?

2021-04-06 Thread Tzu-Li (Gordon) Tai
Hi,

I'm pulling in Rui Li (cc'ed) who might be able to help you here as he
actively maintains the hive connectors.

Cheers,
Gordon


On Fri, Apr 2, 2021 at 11:36 AM Yik San Chan 
wrote:

> The question is cross-posted in StackOverflow
> https://stackoverflow.com/questions/66914119/flink-why-is-hive-dependency-flink-sql-connector-hive-not-available-on-maven-ce
>
> According to [Flink SQL Hive: Using bundled hive jar](
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#using-bundled-hive-jar
> ):
>
> > The following tables list all available bundled hive jars. You can pick
> one to the /lib/ directory in Flink distribution.
> > - flink-sql-connector-hive-1.2.2 (download link)
> > - flink-sql-connector-hive-2.2.0 (download link)
> > ...
>
> However, these dependencies are not available from Maven central. As a
> work around, I use [user defined dependencies](
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#user-defined-dependencies),
> but this is not recommended:
>
> > the recommended way to add dependency is to use a bundled jar. Separate
> jars should be used only if bundled jars don’t meet your needs.
>
> I wonder why the bundle jars are not available in Maven central?
>
> Follow-up: Since they are not available from Maven central, I wonder how
> to include them in pom.xml in order to run `mvn package`?
>
> Thanks!
>


Re: Application cluster - Job execution and cluster creation timeouts

2021-04-06 Thread Yang Wang
Hi Tamir,

Thanks for trying the native K8s integration.

1. We do not have a timeout for creating the Flink application cluster. The
reason is that the job submission happens on the JobManager side, so the
Flink client does not need to wait for the JobManager to be running before
it exits.

I think even if the Flink client internally had the timeout, we would still
have the same problem: when the Flink client crashes, the timeout is gone.

I want to share another solution for the timeout. In our deployer, when a
new Flink application is created, the deployer will periodically check the
accessibility of the Flink REST endpoint. When it is not ready within the
timeout (e.g. 120s), the deployer will delete the Flink JobManager
deployment and try to create a new one.
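The periodic readiness check described above can be sketched outside of any particular deployer. This is a minimal, self-contained illustration (the class and method names, the `/overview` path, and the timeout values are assumptions for the example, not part of any Flink API):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.time.Instant;

public class RestReadinessCheck {

    // Polls the JobManager REST endpoint until it answers, or the deadline passes.
    // Returns true if the endpoint responded with HTTP 200 before the timeout.
    static boolean waitForRestEndpoint(String baseUrl, Duration timeout, Duration pollInterval) {
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(2))
                .build();
        Instant deadline = Instant.now().plus(timeout);
        while (Instant.now().isBefore(deadline)) {
            try {
                HttpRequest request = HttpRequest.newBuilder()
                        .uri(URI.create(baseUrl + "/overview"))
                        .timeout(Duration.ofSeconds(2))
                        .GET()
                        .build();
                HttpResponse<String> response =
                        client.send(request, HttpResponse.BodyHandlers.ofString());
                if (response.statusCode() == 200) {
                    return true;
                }
            } catch (Exception ignored) {
                // Endpoint not reachable yet; fall through to sleep and retry.
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Hypothetical address; replace with your JobManager REST URL.
        boolean ready = waitForRestEndpoint(
                "http://localhost:1", Duration.ofSeconds(3), Duration.ofSeconds(1));
        System.out.println("ready=" + ready);
    }
}
```

A deployer would run this loop after creating the JobManager deployment and, on a `false` result, delete and recreate the deployment as Yang describes.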

2. Actually, the current "flink run-application" does not support a real
attached mode (waiting for all the jobs in the application to finish).
I am curious why you have "infinite job execution" in your Flink
application cluster. If all the jobs in the application finish, Flink will
deregister the application and all the K8s resources should be cleaned up.


Best,
Yang


Tamir Sagi  wrote on Mon, Apr 5, 2021 at 11:24 PM:

> Hey all,
>
> We deploy application cluster natively on Kubernetes.
>
> are there any timeouts for Job execution and cluster creation?
>
> I went over the configuration page here
> 
> but did not find anything relevant.
>
> In order to get an indication about the cluster , we leverage the k8s
> client
> 
>  to watch the deployment
> 
>  in a namespace with specific cluster name and respond accordingly.
>
> we define two timeouts
>
>    1. Creating the application cluster (i.e. to date, if there are errors
>    in pods, the k8s deployment is up but the application cluster is not
>    running.)
>    2. Until the application cluster resources get cleaned up (upon
>    completion) - which prevents infinite job execution or k8s glitches
>
> However,  this solution is not ideal because in case this client lib
> crashes, the timeouts are gone.
> We don't want to manage these timeouts states ourselves.
>
> Any suggestion or better way?
>
> Thanks,
> Tamir.
>
>
>
>
>
> Confidentiality: This communication and any attachments are intended for
> the above-named persons only and may be confidential and/or legally
> privileged. Any opinions expressed in this communication are not
> necessarily those of NICE Actimize. If this communication has come to you
> in error you must take no action based on it, nor must you copy or show it
> to anyone; please delete/destroy and inform the sender by e-mail
> immediately.
> Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
> Viruses: Although we have taken steps toward ensuring that this e-mail and
> attachments are free from any virus, we advise that in keeping with good
> computing practice the recipient should ensure they are actually virus free.
>


Re: Zigzag shape in TM JVM used memory

2021-04-06 Thread Piotr Nowojski
Hi,

this should be posted on the user mailing list, not the dev list.

Apart from that, this looks like normal/standard behaviour of JVM, and has
very little to do with Flink. Garbage Collector (GC) is kicking in when
memory usage is approaching some threshold:
https://www.google.com/search?q=jvm+heap+memory+usage&tbm=isch

Piotrek


On Mon, Apr 5, 2021 at 22:54 Lu Niu  wrote:

> Hi,
>
> we need to update our email system then :) . Here are the links:
>
>
> https://drive.google.com/file/d/1lZ5_P8_NqsN1JeLzutGj4DxkyWJN75mR/view?usp=sharing
>
>
> https://drive.google.com/file/d/1J6c6rQJwtDp1moAGlvQyLQXTqcuG4HjL/view?usp=sharing
>
>
> https://drive.google.com/file/d/1-R2KzsABC471AEjkF5qTm5O3V47cpbBV/view?usp=sharing
>
> All are DataStream job.
>
> Best
> Lu
>
> On Sun, Apr 4, 2021 at 9:17 PM Yun Gao  wrote:
>
> >
> > Hi Lu,
> >
> > The images seem unable to be shown due to the mail server limitation;
> > could you upload them somewhere and paste the links here?
> >
> > Logically, I think the zigzag is usually due to some small objects being
> > created and collected soon after in the heap. Are you running a SQL job
> > or a DataStream job?
> >
> > Best,
> > Yun
> >
> > --
> > Sender:Lu Niu
> > Date:2021/04/05 12:06:24
> > Recipient:d...@flink.apache.org
> > Theme:Zigzag shape in TM JVM used memory
> >
> > Hi, Flink dev
> >
> > We observed that the TM JVM used memory metric shows a zigzag shape among
> > lots of our applications, although these applications are quite different
> > in business logic. The upper bound is close to the max heap size. Is this
> > expected in a flink application? Or does flink internally
> > aggressively pre-allocate memory?
> >
> > app1
> > [image: Screen Shot 2021-04-04 at 8.46.45 PM.png]
> > app2
> > [image: Screen Shot 2021-04-04 at 8.45.35 PM.png]
> > app3
> > [image: Screen Shot 2021-04-04 at 8.43.53 PM.png]
> >
> > Best
> > Lu
> >
> >
>