Re: Flink on Yarn, restart job will not destroy original task manager
Hi James, Local recovery is disabled by default. You do not need to configure anything in addition. Did you run into problems again or does it work now? If you are still experiencing task spread-out, can you configure logging on DEBUG level, and share the jobmanager logs with us? Best, Gary On Tue, Sep 4, 2018 at 5:42 AM, James (Jian Wu) [FDS Data Platform] < james...@coupang.com> wrote: > Hi Gary: > > > > From the 1.5/1.6 documentation: > > > > Configuring task-local recovery > > Task-local recovery is *deactivated by default* and can be activated > through Flink’s configuration with the key state.backend.local-recovery as > specified in CheckpointingOptions.LOCAL_RECOVERY. The value for this > setting can either be *true* to enable or *false* (default) to disable > local recovery. > > > > By default, local recovery is deactivated. In 1.5.0, I have not enabled local > recovery. > > > > So do I need to manually disable local recovery via flink-conf.yaml? > > > > Regards > > > > James > > > > *From: *"James (Jian Wu) [FDS Data Platform]" > *Date: *Monday, September 3, 2018 at 4:13 PM > *To: *Gary Yao > > *Cc: *"user@flink.apache.org" > *Subject: *Re: Flink on Yarn, restart job will not destroy original task > manager > > > > My Flink version is 1.5, I will rebuild with a newer version of Flink > > > > Regards > > > > James > > > > *From: *Gary Yao > *Date: *Monday, September 3, 2018 at 3:57 PM > *To: *"James (Jian Wu) [FDS Data Platform]" > *Cc: *"user@flink.apache.org" > *Subject: *Re: Flink on Yarn, restart job will not destroy original task > manager > > > > Hi James, > > What version of Flink are you running? In 1.5.0, tasks can spread out due > to changes that were introduced to support "local recovery". There is a > mitigation in 1.5.1 that prevents task spread-out but local recovery must > be > disabled [2]. 
> > Best, > Gary > > [1] https://issues.apache.org/jira/browse/FLINK-9635 > [2] https://issues.apache.org/jira/browse/FLINK-9634 > > > > On Mon, Sep 3, 2018 at 9:20 AM, James (Jian Wu) [FDS Data Platform] < > james...@coupang.com> wrote: > > Hi: > > > > I launch a flink application on yarn with 5 task managers; every task > manager has 5 slots. I use this script: > > > > #!/bin/sh > > CLASSNAME=$1 > > JARNAME=$2 > > ARGUMENTS=$3 > > > > export JVM_ARGS="${JVM_ARGS} -Dmill.env.active=aws" > > /usr/bin/flink run -m yarn-cluster --parallelism 15 -yn 5 -ys 3 -yjm 8192 > -ytm 8192 -ynm flink-order-detection -yD > env.java.opts.jobmanager='-Dmill.env.active=aws' > -yD env.java.opts.taskmanager='-Dmill.env.active=aws' -c $CLASSNAME \ > > $JARNAME $ARGUMENTS > > > > > > The original flink app occupied 5 containers and 15 vcores and ran for 3+ days; > one of the task managers was killed by yarn because of a memory leak and the job manager > started new task managers. Currently my flink app is running normally on yarn, > but occupies 10 containers, 28 vcores. (The Application Master shows my flink > job running for 75 hours; clicking into the running job in the flink web ui, it shows > my job running for 28 hours because of the restart.) > > > > In my opinion, the job manager should attempt to restart the failed task manager, > and the final app should still use 5 containers and 15 vcores; why does the job > occupy double the resources after being restarted by yarn? > > > > Can anyone give me some suggestions? > > > > Regards > > > > James > > >
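The mitigation Gary references in [2] (FLINK-9634) relies on local recovery staying disabled. A minimal flink-conf.yaml sketch, assuming the 1.5.x option name specified by CheckpointingOptions.LOCAL_RECOVERY:

```yaml
# flink-conf.yaml -- keep task-local recovery off (its default) so the
# 1.5.1 mitigation against task spread-out (FLINK-9634) takes effect.
state.backend.local-recovery: false
```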
Re: This server is not the leader for that topic-partition
Flink: 1.4.2 flink-connector-kafka-0.11_2.11 (1.4.2) Kafka: 0.10.1.0 Jayant Ameta On Tue, Sep 4, 2018 at 10:16 AM vino yang wrote: > Hi Jayant, > > Can you provide more specific information? For example, the version of > your Flink, the version of kafka on which the Flink-Kafka-Connector depends, > and the version of the kafka server. > > Thanks, vino. > > Jayant Ameta wrote on Tue, Sep 4, 2018 at 12:32 PM: > >> I am getting the same error. Is there a way to retry/ignore instead of >> killing the job? >> >> Jayant Ameta >> >> >> On Tue, May 22, 2018 at 7:58 PM gerardg wrote: >> >>> I've seen the same error while upgrading Kafka. We are using >>> FlinkKafkaProducer011 and Kafka version 1.0.0. While upgrading to Kafka >>> 1.1.0, each time a server was restarted, an already running Flink job >>> failed >>> with the same message. >>> >>> Gerard >>> >>> >>> >>> -- >>> Sent from: >>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >>> >>
Re: This server is not the leader for that topic-partition
Hi Jayant, Can you provide more specific information? For example, the version of your Flink, the version of kafka on which the Flink-Kafka-Connector depends, and the version of the kafka server. Thanks, vino. Jayant Ameta wrote on Tue, Sep 4, 2018 at 12:32 PM: > I am getting the same error. Is there a way to retry/ignore instead of > killing the job? > > Jayant Ameta > > > On Tue, May 22, 2018 at 7:58 PM gerardg wrote: > >> I've seen the same error while upgrading Kafka. We are using >> FlinkKafkaProducer011 and Kafka version 1.0.0. While upgrading to Kafka >> 1.1.0, each time a server was restarted, an already running Flink job >> failed >> with the same message. >> >> Gerard >> >> >> >> -- >> Sent from: >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >> >
Re: This server is not the leader for that topic-partition
I am getting the same error. Is there a way to retry/ignore instead of killing the job? Jayant Ameta On Tue, May 22, 2018 at 7:58 PM gerardg wrote: > I've seen the same error while upgrading Kafka. We are using > FlinkKafkaProducer011 and Kafka version 1.0.0. While upgrading to Kafka > 1.1.0, each time a server was restarted, an already running Flink job > failed > with the same message. > > Gerard > > > > -- > Sent from: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >
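On retrying instead of killing the job: whether a connector exception terminates the job is governed by the job's restart strategy rather than the Kafka connector itself. A hedged flink-conf.yaml sketch using the fixed-delay strategy documented for Flink 1.4; the attempt count and delay are placeholder values:

```yaml
# flink-conf.yaml -- retry the job a few times before failing it
# terminally, so a transient "not the leader for that topic-partition"
# error can self-heal once a Kafka leader election settles.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```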
Re: RocksDB Number of Keys Metric
Hi Ahmed, If you feel that this metric is necessary, you can create an issue in JIRA; then the problem may be more easily seen by the relevant people. If you need an answer to this question, it may be more effective to ping @Andrey. Thanks, vino. Ahmed wrote on Sun, Sep 2, 2018 at 2:31 AM: > Is there a clean way of exposing a metric regarding the number of keys > (even if it is an estimate using 'rocksdb.estimate-num-keys') in a rocksdb > state store? RocksDBValueState is not public to user code. > > Best, > Ahmed > >
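Until such a metric exists in Flink itself, a user-level workaround is to register a gauge in the operator. This is only an approximation (it counts keys seen by one subtask, not RocksDB's 'rocksdb.estimate-num-keys'), and the class and metric names below are made up for illustration:

```java
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;

// Hypothetical sketch: expose an approximate distinct-key count as a
// user gauge. This is not RocksDB's estimate -- just the keys this
// subtask has observed, held on the heap.
public class KeyCountingMap extends RichMapFunction<String, String> {
    private transient Set<String> seenKeys;

    @Override
    public void open(Configuration parameters) {
        seenKeys = new HashSet<>();
        getRuntimeContext().getMetricGroup()
            .gauge("approxNumKeys", (Gauge<Integer>) () -> seenKeys.size());
    }

    @Override
    public String map(String value) {
        seenKeys.add(value); // the record doubles as the key in this sketch
        return value;
    }
}
```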
Re: Why FlinkKafkaConsumer doesn't subscribe to topics?
Hi Julio, 1. Flink doesn't use subscribe because it needs to control partition assignment itself, which is important for implementing exactly-once. 2. Can you share the versions you are using, including kafka, kafka client, and flink? We also use the flink kafka consumer and we can monitor it correctly. On Tue, Sep 4, 2018 at 3:09 AM Julio Biason wrote: > Hey guys, > > We are trying to add external monitoring to our system, but we can only > get the lag in kafka topics while the Flink job is running -- if, for some > reason, the Flink job fails, we get no visibility on how big the lag is. > > (Besides that, the way Flink reports is not accurate and produces a lot of > -Inf, which I already discussed before.) > > While looking at the problem, we noticed that the FlinkKafkaConsumer never > uses `subscribe` to subscribe to the topics and that's why the values are > never stored back into Kafka, even when the driver itself does > `commitAsync`. > > Is there any reason for not subscribing to topics that I may have missed? > > -- > *Julio Biason*, Software Engineer > *AZION* | Deliver. Accelerate. Protect. > Office: +55 51 3083 8101 | Mobile: +55 51 > *99907 0554* -- Liu, Renjie Software Engineer, MVAD
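Even though Flink assigns partitions itself instead of calling subscribe, the consumer can still commit offsets back to Kafka under its group.id so external tools can compute lag. A hedged sketch against the 1.4/1.5-era FlinkKafkaConsumer API; the broker address, group id, and topic name are placeholders:

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

// Sketch: with checkpointing enabled, offsets are committed back to
// Kafka on checkpoint completion, so group-based lag monitoring keeps
// working even if the job later dies.
public class LagVisibleSource {
    public static FlinkKafkaConsumer011<String> create() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "my-flink-job");         // placeholder

        FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
            "my-topic", new SimpleStringSchema(), props);      // placeholder topic
        consumer.setCommitOffsetsOnCheckpoints(true); // default when checkpointing is on
        return consumer;
    }
}
```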
Re: Get stream of rejected data from Elasticsearch6 sink
Hi Nick, When you get the failed data, you can extend the ActionRequestFailureHandler#onFailure method to output the data to other places; the logic is similar to implementing a side output. Thanks, vino. Nick Triller wrote on Fri, Aug 31, 2018 at 9:08 PM: > Hi all, > > > > is it possible to further process data that could not be persisted by the > Elasticsearch6 sink without breaking checkpointing? > > As I understand, the onFailure callback can’t be used to forward rejected > data into a separate stream. > > > > I would like to extend the sink if this use case is not covered yet. > > What would be a reasonable approach that matches Flink’s overall > architecture? > > > > As a new Flink user, my first intuition was to check if it is possible to > create a side output from the Elasticsearch6 sink. > > Does it make sense to support sink side outputs? > > > > Thank you for your advice. > > > > Regards, > > Nick >
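A sketch of the ActionRequestFailureHandler approach vino describes, against the elasticsearch6 connector API. A true side output is not available from inside a sink, so this sketch re-queues retryable bulk rejections and rethrows everything else; where the comment says dead-letter topic, any external handoff of your choosing would go (that part is an assumption, not connector API):

```java
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

// Sketch: retry queue-full rejections, surface everything else.
// Rejected documents could instead be written to a dead-letter topic
// from here before dropping or rethrowing.
public class RetryRejectedHandler implements ActionRequestFailureHandler {
    @Override
    public void onFailure(ActionRequest action, Throwable failure,
                          int restStatusCode, RequestIndexer indexer) throws Throwable {
        if (failure instanceof EsRejectedExecutionException) {
            indexer.add(action); // bulk queue was full: re-add for the next bulk
        } else {
            throw failure;       // fail the sink (and the checkpoint) otherwise
        }
    }
}
```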
Re: Flink on Yarn, restart job will not destroy original task manager
Hi Gary: From the 1.5/1.6 documentation: Configuring task-local recovery Task-local recovery is deactivated by default and can be activated through Flink’s configuration with the key state.backend.local-recovery as specified in CheckpointingOptions.LOCAL_RECOVERY. The value for this setting can either be true to enable or false (default) to disable local recovery. By default, local recovery is deactivated. In 1.5.0, I have not enabled local recovery. So do I need to manually disable local recovery via flink-conf.yaml? Regards James From: "James (Jian Wu) [FDS Data Platform]" Date: Monday, September 3, 2018 at 4:13 PM To: Gary Yao Cc: "user@flink.apache.org" Subject: Re: Flink on Yarn, restart job will not destroy original task manager My Flink version is 1.5, I will rebuild with a newer version of Flink Regards James From: Gary Yao Date: Monday, September 3, 2018 at 3:57 PM To: "James (Jian Wu) [FDS Data Platform]" Cc: "user@flink.apache.org" Subject: Re: Flink on Yarn, restart job will not destroy original task manager Hi James, What version of Flink are you running? In 1.5.0, tasks can spread out due to changes that were introduced to support "local recovery". There is a mitigation in 1.5.1 that prevents task spread-out but local recovery must be disabled [2]. 
Best, Gary [1] https://issues.apache.org/jira/browse/FLINK-9635 [2] https://issues.apache.org/jira/browse/FLINK-9634 On Mon, Sep 3, 2018 at 9:20 AM, James (Jian Wu) [FDS Data Platform] <james...@coupang.com> wrote: Hi: I launch a flink application on yarn with 5 task managers; every task manager has 5 slots. I use this script: #!/bin/sh CLASSNAME=$1 JARNAME=$2 ARGUMENTS=$3 export JVM_ARGS="${JVM_ARGS} -Dmill.env.active=aws" /usr/bin/flink run -m yarn-cluster --parallelism 15 -yn 5 -ys 3 -yjm 8192 -ytm 8192 -ynm flink-order-detection -yD env.java.opts.jobmanager='-Dmill.env.active=aws' -yD env.java.opts.taskmanager='-Dmill.env.active=aws' -c $CLASSNAME \ $JARNAME $ARGUMENTS The original flink app occupied 5 containers and 15 vcores and ran for 3+ days; one of the task managers was killed by yarn because of a memory leak and the job manager started new task managers. Currently my flink app is running normally on yarn, but occupies 10 containers, 28 vcores. (The Application Master shows my flink job running for 75 hours; clicking into the running job in the flink web ui, it shows my job running for 28 hours because of the restart.) In my opinion, the job manager should attempt to restart the failed task manager, and the final app should still use 5 containers and 15 vcores; why does the job occupy double the resources after being restarted by yarn? Can anyone give me some suggestions? Regards James
JAXB Classloading errors when using PMML Library (AWS EMR Flink 1.4.2)
Hi, I am using the PMML dependency below to execute ML models at prediction time within a Flink Map operator: org.jpmml:pmml-evaluator:1.4.3, together with javax.xml.bind:jaxb-api, org.glassfish.jaxb:jaxb-runtime, and com.google.guava:guava. The environment is EMR, OpenJDK 1.8 and Flink 1.4.2. My programs run fine in my Eclipse development environment. However, when we deploy on the cluster, we get classloading exceptions, which are primarily due to the PMML classes being loaded via the Flink classloader while the JAXB classes are loaded by the boot classloader. Also, it seems the version of the JAXB classes referenced within the PMML library is different from the ones loaded by the OpenJDK. For example, I keep getting this type of error. I have also listed another error after this, which is linked to not being able to use reflection and the Unsafe library to set private instances within the PMML class instance using the JAXB Unmarshaller. - java.lang.LinkageError: loader constraint violation: when resolving interface method "javax.xml.bind.Unmarshaller.unmarshal(Ljavax/xml/transform/Source;)Ljava/lang/Object;" the class loader (instance of org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader) of the current class, com/comcast/mlarche/featurecreationflows/xreerrors/MyJAXBUtil, and the class loader (instance of <bootloader>) for the method's defining class, javax/xml/bind/Unmarshaller, have different Class objects for the type javax/xml/transform/Source used in the signature at com.comcast.mlarche.featurecreationflows.xreerrors.MyJAXBUtil.unmarshal(MyJAXBUtil.java:52) at com.comcast.mlarche.featurecreationflows.xreerrors.MyJAXBUtil.unmarshalPMML(MyJAXBUtil.java:38) at com.comcast.mlarche.featurecreationflows.xreerrors.PMMLModelExecution.getMiningModelEvaluator(PMMLModelExecution.java:67) at com.comcast.mlarche.featurecreationflows.xreerrors.PMMLModelExecution.predict(PMMLModelExecution.java:126) at 
com.comcast.mlarche.featurecreationflows.xreerrors.XreErrorModelsPredictionServiceService.predict(XreErrorModelsPredictionServiceService.java:61) at com.comcast.mlarche.featurecreationflows.xreerrors.XreErrorModelsPredictionServiceService.predictSystemRefresh(XreErrorModelsPredictionServiceService.java:44) at com.comcast.mlarche.featurecreationflows.xreerrors.XREErrorsModelExecutionMap.flatMap(XREErrorsModelExecutionMap.java:46) at com.comcast.mlarche.featurecreationflows.xreerrors.XREErrorsModelExecutionMap.flatMap(XREErrorsModelExecutionMap.java:17) at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at com.comcast.mlarche.featurecreationflows.xreerrors.XreErrorsModelsApplyFunction.apply(XreErrorsModelsApplyFunction.java:65) at com.comcast.mlarche.featurecreationflows.xreerrors.XreErrorsModelsApplyFunction.apply(XreErrorsModelsApplyFunction.java:20) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32) at 
org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.emitWindowContents(EvictingWindowOperator.java:357) at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.processElement(EvictingWindowOperator.java:218) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Thread.java:748) Finally, when I started using the InputStream-based constructor (so as not to use the SAXSource class), below is the last error I got when I finally got as deep within the library as possible without using any util classes. java.lang.RuntimeException: java.lang.IllegalArgumentException: Can not set org.xml.sax.Locator field org.dmg.pmml.PMMLObject.locator to org.xml.sax.helpers.LocatorImpl
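One commonly suggested direction for this kind of javax.xml.bind linkage error is to resolve the conflicting packages parent-first, so the user jar and the JDK agree on the javax.xml.* class identities. A hedged flink-conf.yaml sketch; the option names are from the Flink 1.4 classloading configuration, and the pattern list is illustrative (it must retain Flink's defaults such as java., scala., and org.apache.flink., so check your version's documented default before overriding):

```yaml
# flink-conf.yaml -- keep child-first loading but force the XML/JAXB
# packages onto the parent classloader to avoid split class identities.
classloader.resolve-order: child-first
classloader.parent-first-patterns: java.;scala.;org.apache.flink.;javax.annotation.;org.slf4j;org.apache.log4j;javax.xml.;org.xml.;org.w3c.
```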
Why FlinkKafkaConsumer doesn't subscribe to topics?
Hey guys, We are trying to add external monitoring to our system, but we can only get the lag in kafka topics while the Flink job is running -- if, for some reason, the Flink job fails, we get no visibility on how big the lag is. (Besides that, the way Flink reports is not accurate and produces a lot of -Inf, which I already discussed before.) While looking at the problem, we noticed that the FlinkKafkaConsumer never uses `subscribe` to subscribe to the topics and that's why the values are never stored back into Kafka, even when the driver itself does `commitAsync`. Is there any reason for not subscribing to topics that I may have missed? -- *Julio Biason*, Software Engineer *AZION* | Deliver. Accelerate. Protect. Office: +55 51 3083 8101 | Mobile: +55 51 *99907 0554*
RE: Flink 1.5.2 query
The message is at WARN level and not at INFO or DEBUG level, so our log analysis tool considers this an issue. Can the CLI print this message at INFO/DEBUG level? -Original Message- From: Chesnay Schepler [mailto:ches...@apache.org] Sent: Monday, September 3, 2018 6:32 PM To: Parth Sarathy ; user@flink.apache.org Subject: Re: Flink 1.5.2 query Cannot be avoided. The CLI eagerly loads client classes for yarn, which, as you can see, fails since the hadoop classes aren't available. If you don't use YARN you can safely ignore this. On 03.09.2018 14:37, Parth Sarathy wrote: > Hi, >    When using flink 1.5.2, the “Apache Flink Only” binary > (flink-1.5.2-bin-scala_2.11), the following error is seen in the client log: > > 2018-08-30 10:56:59.129 [main] WARN > org.apache.flink.client.cli.CliFrontend > - Could not load CLI class org.apache.flink.yarn.cli.FlinkYarnSessionCli. > java.lang.NoClassDefFoundError: > org/apache/hadoop/yarn/exceptions/YarnException > at java.lang.Class.forName0(Native Method) ~[na:1.8.0_171] > at java.lang.Class.forName(Class.java:264) ~[na:1.8.0_171] > at > org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1208) > [flink-dist_2.11-1.5.2.jar:1.5.2] > at > org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1164) > [flink-dist_2.11-1.5.2.jar:1.5.2] > at > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1090) > [flink-dist_2.11-1.5.2.jar:1.5.2] > Caused by: java.lang.ClassNotFoundException: > org.apache.hadoop.yarn.exceptions.YarnException > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > ~[na:1.8.0_171] > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > ~[na:1.8.0_171] > at > sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) > ~[na:1.8.0_171] > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > ~[na:1.8.0_171] > ... 5 common frames omitted > > There are no functional failures, but how to avoid this exception in the log? 
> > Thanks, > Parth Sarathy > > > > > -- > Sent from: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >
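Until the log level is changed upstream, the warning can also be filtered in the client's log configuration. A sketch for conf/log4j-cli.properties, using the logger name visible in the message above; note this silences all WARNs from that class, so use it with care:

```properties
# Suppress the expected "Could not load CLI class ...FlinkYarnSessionCli"
# WARN that the CLI emits when the Hadoop classes are absent.
log4j.logger.org.apache.flink.client.cli.CliFrontend=ERROR
```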
Re: Flink 1.5.2 query
Cannot be avoided. The CLI eagerly loads client classes for yarn, which, as you can see, fails since the hadoop classes aren't available. If you don't use YARN you can safely ignore this. On 03.09.2018 14:37, Parth Sarathy wrote: Hi, When using flink 1.5.2, the “Apache Flink Only” binary (flink-1.5.2-bin-scala_2.11), the following error is seen in the client log: 2018-08-30 10:56:59.129 [main] WARN org.apache.flink.client.cli.CliFrontend - Could not load CLI class org.apache.flink.yarn.cli.FlinkYarnSessionCli. java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException at java.lang.Class.forName0(Native Method) ~[na:1.8.0_171] at java.lang.Class.forName(Class.java:264) ~[na:1.8.0_171] at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1208) [flink-dist_2.11-1.5.2.jar:1.5.2] at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1164) [flink-dist_2.11-1.5.2.jar:1.5.2] at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1090) [flink-dist_2.11-1.5.2.jar:1.5.2] Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_171] at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_171] at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) ~[na:1.8.0_171] at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_171] ... 5 common frames omitted There are no functional failures, but how to avoid this exception in the log? Thanks, Parth Sarathy -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Flink 1.5.2 query
Hi, When using flink 1.5.2, “Apache Flink Only” binary (flink-1.5.2-bin-scala_2.11), following error is seen in client log: 2018-08-30 10:56:59.129 [main] WARN org.apache.flink.client.cli.CliFrontend - Could not load CLI class org.apache.flink.yarn.cli.FlinkYarnSessionCli. java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException at java.lang.Class.forName0(Native Method) ~[na:1.8.0_171] at java.lang.Class.forName(Class.java:264) ~[na:1.8.0_171] at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1208) [flink-dist_2.11-1.5.2.jar:1.5.2] at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1164) [flink-dist_2.11-1.5.2.jar:1.5.2] at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1090) [flink-dist_2.11-1.5.2.jar:1.5.2] Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_171] at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_171] at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) ~[na:1.8.0_171] at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_171] ... 5 common frames omitted There are no functional failures, but how to avoid this exception in log? Thanks, Parth Sarathy -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Promethus - custom metrics at job level
Thank you Reza. I will try your repo first :) Regards, Averell -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Can't access Flink Dashboard at 8081, running Flink program using Eclipse
you can set up a specific port using https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#rest-port. On 03.09.2018 12:12, Mar_zieh wrote: Hello I added these dependencies to "pom.xml"; also, I added this configuration to my code: Configuration config = new Configuration(); config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(getP, config); At first, I could connect to the Dashboard, but now I get this error: "Exception in thread "main" java.net.BindException: Address already in use: bind" I know it is related to the port number (8081), which is shared with other programs, but I cannot solve it. Would you please guide me? I really appreciate it. Sincerely -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
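Following Chesnay's pointer, a sketch of moving the local web UI off 8081. This assumes a Flink version where the REST port key is rest.port (as on the linked config page); on older 1.4.x versions the relevant key may be web.port instead, and the parallelism value is a placeholder:

```java
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: start the local web UI on 8082 instead of the default 8081,
// avoiding "java.net.BindException: Address already in use".
public class LocalUiOnFreePort {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        config.setString("rest.port", "8082"); // key name per the linked docs page
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment(4, config);
        env.fromElements(1, 2, 3).print();
        env.execute("local-ui-port-test");
    }
}
```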
Re: Can't access Flink Dashboard at 8081, running Flink program using Eclipse
Hello I added these dependencies to "pom.xml"; also, I added this configuration to my code: Configuration config = new Configuration(); config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(getP, config); At first, I could connect to the Dashboard, but now I get this error: "Exception in thread "main" java.net.BindException: Address already in use: bind" I know it is related to the port number (8081), which is shared with other programs, but I cannot solve it. Would you please guide me? I really appreciate it. Sincerely -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Flink on kubernetes
Please try to use the FsStateBackend as a test to see if the problems disappear. Med venlig hilsen / Best regards Lasse Nedergaard > On 3 Sep 2018, at 11.46, 祁明良 wrote: > > Hi Lasse, > > Is there a JIRA ticket I can follow? > > Best, > Mingliang > >> On 3 Sep 2018, at 5:42 PM, Lasse Nedergaard >> wrote: >> >> Hi. >> >> We have documented the same on Flink 1.4.2/1.6 running on Yarn and Mesos. >> If you correlate the non-heap memory with job restarts you will see >> non-heap memory increase for every restart until you get an OOM. >> >> I will let you know if/when I know how to handle the problem. >> >> Med venlig hilsen / Best regards >> Lasse Nedergaard >> >> >>> On 3 Sep 2018, at 10.08, 祁明良 wrote: >>> >>> Hi All, >>> >>> We are running flink (version 1.5.2) on k8s with the rocksdb backend. >>> Each time the job is cancelled and restarted, we face an OOMKilled >>> problem from the container. >>> In our case, we only assign 15% of the container memory to the JVM and leave the rest >>> to rocksdb. >>> To us, it looks like memory used by rocksdb is not released after job >>> cancelling. Can anyone give some suggestions? >>> Currently our tmp fix is to restart the TM pod for each job cancelling, but >>> it has to be done manually. >>> >>> Regards, >>> Mingliang >>> >>> This communication may contain privileged or other confidential information >>> of Red. If you have received it in error, please advise the sender by reply >>> e-mail and immediately delete the message and any attachments without >>> copying or disclosing the contents. Thank you.
Re: Flink on kubernetes
Hi Lasse, Is there a JIRA ticket I can follow? Best, Mingliang > On 3 Sep 2018, at 5:42 PM, Lasse Nedergaard wrote: > > Hi. > > We have documented the same on Flink 1.4.2/1.6 running on Yarn and Mesos. > If you correlate the non-heap memory with job restarts you will see > non-heap memory increase for every restart until you get an OOM. > > I will let you know if/when I know how to handle the problem. > > Med venlig hilsen / Best regards > Lasse Nedergaard > > >> On 3 Sep 2018, at 10.08, 祁明良 wrote: >> >> Hi All, >> >> We are running flink (version 1.5.2) on k8s with the rocksdb backend. >> Each time the job is cancelled and restarted, we face an OOMKilled problem >> from the container. >> In our case, we only assign 15% of the container memory to the JVM and leave the rest >> to rocksdb. >> To us, it looks like memory used by rocksdb is not released after job >> cancelling. Can anyone give some suggestions? >> Currently our tmp fix is to restart the TM pod for each job cancelling, but >> it has to be done manually. >> >> Regards, >> Mingliang >> >> This communication may contain privileged or other confidential information >> of Red. If you have received it in error, please advise the sender by reply >> e-mail and immediately delete the message and any attachments without >> copying or disclosing the contents. Thank you.
Re: Flink on kubernetes
Hi. We have documented the same on Flink 1.4.2/1.6 running on Yarn and Mesos. If you correlate the non-heap memory with job restarts you will see non-heap memory increase for every restart until you get an OOM. I will let you know if/when I know how to handle the problem. Med venlig hilsen / Best regards Lasse Nedergaard > On 3 Sep 2018, at 10.08, 祁明良 wrote: > > Hi All, > > We are running flink (version 1.5.2) on k8s with the rocksdb backend. > Each time the job is cancelled and restarted, we face an OOMKilled problem > from the container. > In our case, we only assign 15% of the container memory to the JVM and leave the rest > to rocksdb. > To us, it looks like memory used by rocksdb is not released after job > cancelling. Can anyone give some suggestions? > Currently our tmp fix is to restart the TM pod for each job cancelling, but > it has to be done manually. > > Regards, > Mingliang > > This communication may contain privileged or other confidential information > of Red. If you have received it in error, please advise the sender by reply > e-mail and immediately delete the message and any attachments without copying > or disclosing the contents. Thank you.
Re: CompletedCheckpoints are getting Stale ( Flink 1.4.2 )
One final thought: How do you stop the unbounded streaming application? If you just kill the Yarn/Mesos/K8s cluster, Flink will not know that this is a shutdown, and will interpret it as a failure. Because of that, checkpoints will remain (in DFS and in ZooKeeper). On Fri, Aug 31, 2018 at 2:18 PM, vino yang wrote: > Hi Laura: > > Perhaps this is possible because the path to the completed checkpoint on > HDFS does not have a hierarchical relationship to identify which job it > belongs to; it is just a fixed prefix plus a randomly generated name. > My personal advice: > > 1) Verify it with a clean cluster (clean up the metadata that > Flink/Zookeeper/HDFS might confuse); > 2) Verify the node and metadata information (/checkpoints/${jobID}/) on > ZooKeeper; > 3) Observe whether there is relevant abnormal information in the log; > > Thanks, vino. > > Laura Uzcátegui wrote on Fri, Aug 31, 2018 at 3:51 PM: > >> Hi Stephan and Vino, >> >> Thanks for the quick reply and hints. >> >> The configuration for the checkpoints that should remain is set to 1. >> >> Since this is an unbounded job run and I can't see it finishing, I suspect >> that as we tear down the cluster every time we finish the integration test >> run, the completedCheckpoint doesn't get deleted; then, when the integration >> test runs again, it picks up from the latest completedCheckpoint, >> but there are cases where that job doesn't run again, so the >> completedCheckpoint gets stale. Is this something that could happen? >> >> Is there any way to check by logging whether the job gets to the Global Final >> State before we tear down the cluster? >> >> Cheers, >> >> Laura >> >> On Fri, 31 Aug 2018, 8:37 am Stephan Ewen, wrote: >> >>> Hi Laura! >>> >>> Vino had good pointers. There really should be no case in which this is >>> not cleaned up. >>> >>> Is this a bounded job that ends? Is it always the last of the bounded >>> job's checkpoints that remains? 
>>> >>> Best, >>> Stephan >>> >>> >>> On Fri, Aug 31, 2018 at 5:02 AM, vino yang >>> wrote: Hi Laura, First of all, Flink only keeps one completed checkpoint by default [1]. You need to confirm whether your configuration is consistent with the number of files. If they are consistent, it is for other reasons: 1) The cleaning of the completed checkpoint is done by the JM. You need to confirm whether it can access your file. [2] 2) The JM will asynchronously clean up the metadata of the old completed checkpoint on ZK with a background thread. After the cleanup is successful, it will clean the checkpoint data. If the above reasons are excluded, then maybe you can provide the JM's log to help us confirm whether this is the reason. I think it is more appropriate to ping Till. [3] [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/checkpointing.html#state-checkpoints-num-retained [2]: https://stackoverflow.com/questions/44928624/apache-flink-not-deleting-old-checkpoints [3]: https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java#L437 Thanks, vino. Laura Uzcátegui wrote on Thu, Aug 30, 2018 at 10:52 PM: > Hello, > > At work, we are currently standing up a cluster with the following > configuration: > > >- Flink version: 1.4.2 >- HA Enabled with Zookeeper >- State backend : rocksDB >- state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints >- state.backend.rocksdb.checkpointdir: hdfs://namenode:9000/flink/checkpoints >- *high-availability.storageDir*: hdfs://namenode:9000/flink/recovery > > We also have a job running with checkpointing enabled and without > externalized checkpoints. 
>
> We run this job multiple times a day, since it's run from our
> integration-test pipeline, and we started noticing that the folder
> *high-availability.storageDir* storing the completedCheckpoint files
> is constantly accumulating files, which makes us wonder whether there
> is no cleanup policy for the filesystem when HA is enabled.
>
> Under what circumstances would there be an ever-increasing number of
> completedCheckpoint files in the HA storage dir when there is only a
> single job running over and over again?
>
> Here is a list of what we are seeing accumulate over time, actually
> reaching the maximum number of files allowed on the filesystem:
>
> completedCheckpoint00d86c01d8b9
> completedCheckpoint00d86e9030a9
> completedCheckpoint00d877b74355
> completedCheckpoint00d87b3dd9ad
> completedCheckpoint00d8815d9afd
> completedCheckpoint00d88973195c
> completedCheckpoint00d88b4792f2
> completedCheckpoint00d890d499dc
> completedCheckpoint00d91b00ada2
>
> Cheers,
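For reference, the retention count Laura mentions ("set to 1") is the flink-conf.yaml key from vino's checkpointing-docs link; a minimal sketch using the paths quoted in the thread (paths are the thread's own, not a recommendation):

```yaml
# Number of completed checkpoints to retain (default: 1).
state.checkpoints.num-retained: 1

# Paths from the thread; the completedCheckpoint metadata files accumulate
# under the HA storage dir, separately from the checkpoint data itself.
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
high-availability.storageDir: hdfs://namenode:9000/flink/recovery
```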
Re: Flink on Yarn, restart job will not destroy original task manager
My Flink version is 1.5; I will rebuild with the new version of Flink.

Regards

James

From: Gary Yao
Date: Monday, September 3, 2018 at 3:57 PM
To: "James (Jian Wu) [FDS Data Platform]"
Cc: "user@flink.apache.org"
Subject: Re: Flink on Yarn, restart job will not destroy original task manager

Hi James,

What version of Flink are you running? In 1.5.0, tasks can spread out due to changes that were introduced to support "local recovery" [1]. There is a mitigation in 1.5.1 that prevents task spread-out, but local recovery must be disabled [2].

Best,
Gary

[1] https://issues.apache.org/jira/browse/FLINK-9635
[2] https://issues.apache.org/jira/browse/FLINK-9634

On Mon, Sep 3, 2018 at 9:20 AM, James (Jian Wu) [FDS Data Platform] < james...@coupang.com> wrote:

Hi:

I launch a flink application on yarn with 5 task managers; every task manager has 5 slots. I use this script:

#!/bin/sh
CLASSNAME=$1
JARNAME=$2
ARUGMENTS=$3

export JVM_ARGS="${JVM_ARGS} -Dmill.env.active=aws"
/usr/bin/flink run -m yarn-cluster --parallelism 15 -yn 5 -ys 3 -yjm 8192 -ytm 8192 \
  -ynm flink-order-detection \
  -yD env.java.opts.jobmanager='-Dmill.env.active=aws' \
  -yD env.java.opts.taskmanager='-Dmill.env.active=aws' \
  -c $CLASSNAME $JARNAME $ARUGMENTS

The original flink app occupied 5 containers and 15 vcores and ran for 3+ days. Then one of the task managers was killed by yarn because of a memory leak, and the job manager started new task managers. Currently my flink app is running normally on yarn, but it occupies 10 containers and 28 vcores. (The Application Master shows my flink job running for 75 hours; clicking into the running job in the flink web UI shows it running for 28 hours because of the restart.)

In my opinion, the job manager should attempt to restart the failed task manager, so the app should still use 5 containers and 15 vcores. Why does the job occupy double the resources after being restarted by yarn?

Can anyone give me some suggestions?

Regards

James
Re: Rolling File Sink Exception
You're closing the stream and then calling super.close(), which calls flush(), which fails since you already closed the stream. If you don't close the stream there, the problem should disappear.

On 03.09.2018 09:30, clay wrote:

When I want to write compressed string data to hdfs, I found that flink only provides StringWriter, so I used a custom writer, as follows:

public class StringCompressWriter<T> extends StreamWriterBase<T> {

    private static final long serialVersionUID = 1L;

    private String charsetName;
    private transient Charset charset;
    private transient CompressionOutputStream outStream;

    public StringCompressWriter() {
        this("UTF-8");
    }

    public StringCompressWriter(String charsetName) {
        this.charsetName = charsetName;
    }

    protected StringCompressWriter(StringCompressWriter<T> other) {
        super(other);
        this.charsetName = other.charsetName;
    }

    /** open & write */
    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        this.charset = Charset.forName(charsetName);
        Configuration conf = fs.getConf();
        CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
        CompressionCodec codec = codecFactory.getCodecByName("GzipCodec");
        FSDataOutputStream dataOutputStream = getStream();
        Compressor compressor = CodecPool.getCompressor(codec, fs.getConf());
        outStream = codec.createOutputStream(dataOutputStream, compressor);
    }

    @Override
    public void write(T element) throws IOException {
        getStream(); // Throws if the stream is not open
        outStream.write(element.toString().getBytes(charset));
        outStream.write('\n');
    }

    @Override
    public void close() throws IOException {
        if (outStream != null) {
            outStream.close();
            //outStream = null;
        }
        super.close();
    }

    @Override
    public Writer<T> duplicate() {
        return new StringCompressWriter<>(this);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), charsetName);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (other == null) {
            return false;
        }
        if (getClass() != other.getClass()) {
            return false;
        }
        StringCompressWriter<T> writer = (StringCompressWriter<T>) other;
        // field comparison
        return Objects.equals(charsetName, writer.charsetName)
                && super.equals(other);
    }
}

But when I run my app on yarn, the taskmanager always reports the following error:

2018-09-03 15:25:54,187 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: bucketSink (10/15) (67b40f43fc72371f19e61a6ac3f60819) switched from RUNNING to FAILED.
java.nio.channels.ClosedChannelException
    at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1618)
    at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1982)
    at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1942)
    at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
    at org.apache.flink.streaming.connectors.fs.StreamWriterBase.flush(StreamWriterBase.java:83)
    at org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:99)
    at com.vipkid.bigdata.sink.StringCompressWriter.close(StringCompressWriter.java:73)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:570)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
    at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
2018-09-03 15:25:54,191 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job TestBucketing (01e7088de6b4f9cbe22d9f6a3fdbd2fe) switched from state RUNNING to FAILING.

Moreover, when I looked at the data, I found that the data stream did not seem to be closed properly.

hdfs dfs -text /tmp/Test/2018-08-27/part-8-96 |
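To make the fix concrete with plain JDK streams: java.util.zip.GZIPOutputStream, like Hadoop's CompressionOutputStream used in the writer above, has a finish() method that completes the compressed stream without closing the underlying one, so the base class can still flush and close it afterwards. A self-contained sketch of that ordering; mapping it back to calling outStream.finish() in close() is an assumption, not a tested patch:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Demonstrates why finish() avoids the ClosedChannelException: it writes the
// gzip trailer but leaves the underlying stream open, so a later
// flush()/close() by the base class (StreamWriterBase.close() in the thread)
// does not hit an already-closed stream.
public class FinishVsClose {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream base = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(base);
        gzip.write("hello\n".getBytes("UTF-8"));

        gzip.finish();   // complete the compressed stream, keep `base` open
        base.flush();    // safe here; this is the call that failed on HDFS
        base.close();

        // Verify the bytes form a complete gzip stream (no truncated trailer,
        // unlike the "Unexpected end of ZLIB input stream" in the thread).
        GZIPInputStream in = new GZIPInputStream(
                new ByteArrayInputStream(base.toByteArray()));
        byte[] buf = new byte[16];
        int n = in.read(buf);
        System.out.println("hello\n".equals(new String(buf, 0, n, "UTF-8"))
                ? "roundtrip-ok" : "roundtrip-failed");
    }
}
```

Applied to the writer above, close() would call outStream.finish() instead of outStream.close(), and then let super.close() flush and close the FSDataOutputStream.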
Flink on kubernetes
Hi All, We are running flink (version 1.5.2) on k8s with the rocksdb backend. Each time the job is cancelled and restarted, we hit an OOMKilled problem on the container. In our case, we only assign 15% of the container memory to the JVM and leave the rest to rocksdb. To us, it looks like the memory used by rocksdb is not released after job cancellation. Can anyone give some suggestions? Currently our temporary fix is to restart the TM pod on each job cancellation, but that has to be done manually. Regards, Mingliang This communication may contain privileged or other confidential information of Red. If you have received it in error, please advise the sender by reply e-mail and immediately delete the message and any attachments without copying or disclosing the contents. Thank you.
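For reference while reasoning about sizing like the above: the container limit has to cover the JVM heap plus RocksDB's native allocations, which live outside the heap. A hypothetical sketch (all values illustrative, not from the thread):

```yaml
# Pod spec: the memory limit is what triggers OOMKilled.
resources:
  requests:
    memory: "8Gi"
  limits:
    memory: "8Gi"   # must fit JVM heap + metaspace/threads + RocksDB native memory
# flink-conf.yaml (Flink 1.5 era): keep the heap well below the limit,
# e.g. ~15% of the container as described above, leaving the rest to RocksDB:
#   taskmanager.heap.mb: 1228
```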
Re: Flink on Yarn, restart job will not destroy original task manager
Hi James,

What version of Flink are you running? In 1.5.0, tasks can spread out due to changes that were introduced to support "local recovery" [1]. There is a mitigation in 1.5.1 that prevents task spread-out, but local recovery must be disabled [2].

Best,
Gary

[1] https://issues.apache.org/jira/browse/FLINK-9635
[2] https://issues.apache.org/jira/browse/FLINK-9634

On Mon, Sep 3, 2018 at 9:20 AM, James (Jian Wu) [FDS Data Platform] < james...@coupang.com> wrote:

> Hi:
>
> I launch a flink application on yarn with 5 task managers; every task
> manager has 5 slots. I use this script:
>
> #!/bin/sh
> CLASSNAME=$1
> JARNAME=$2
> ARUGMENTS=$3
>
> export JVM_ARGS="${JVM_ARGS} -Dmill.env.active=aws"
> /usr/bin/flink run -m yarn-cluster --parallelism 15 -yn 5 -ys 3 -yjm 8192 -ytm 8192 \
>   -ynm flink-order-detection \
>   -yD env.java.opts.jobmanager='-Dmill.env.active=aws' \
>   -yD env.java.opts.taskmanager='-Dmill.env.active=aws' \
>   -c $CLASSNAME $JARNAME $ARUGMENTS
>
> The original flink app occupied 5 containers and 15 vcores and ran for 3+
> days. Then one of the task managers was killed by yarn because of a memory
> leak, and the job manager started new task managers. Currently my flink app
> is running normally on yarn, but it occupies 10 containers and 28 vcores.
> (The Application Master shows my flink job running for 75 hours; clicking
> into the running job in the flink web UI shows it running for 28 hours
> because of the restart.)
>
> In my opinion, the job manager should attempt to restart the failed task
> manager, so the app should still use 5 containers and 15 vcores. Why does
> the job occupy double the resources after being restarted by yarn?
>
> Can anyone give me some suggestions?
>
> Regards
>
> James
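For completeness, the flag behind the mitigation in [2] is the one documented in the 1.5/1.6 docs (state.backend.local-recovery, CheckpointingOptions.LOCAL_RECOVERY); a one-line flink-conf.yaml sketch:

```yaml
# false is already the default; setting it explicitly documents intent when
# relying on the 1.5.1 mitigation against task spread-out.
state.backend.local-recovery: false
```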
Re: Prometheus - custom metrics at job level
Hello Averell, Based on my experience, using the out-of-the-box reporters & collectors needs a little more effort! Of course I haven't experienced all of them, but after reviewing some of them I tried my own way: writing custom reporters to push metrics into ElasticSearch (the available component in our project, and very flexible). The custom reporters are able to group metrics with some configurable/dynamic parameters (in my case, based on a defined metric-name and jobs): https://github.com/reza-sameei/elasticreporter I think that maybe writing a simple custom reporter will help you :) On Mon, Sep 3, 2018 at 10:52 AM Averell wrote: > Hi everyone, > > I am trying to publish some counters and meters from my Flink job, to be > scraped by a Prometheus server. It seems to me that all the metrics that I > am publishing are done at the task level, so my Prometheus server needs > to be configured to scrape from many targets (the number equivalent to my > max parallelism). And after that, I need to do aggregation at the Prometheus > server to get the numbers for my whole job. > > My question is: is it possible to have metrics at the job level? And can I > have one single Prometheus target to scrape the data from? > > I found this > > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Service-discovery-for-Prometheus-on-YARN-td21988.html > , > which looks like a "no" answer to my question. However, I still hope for > some easy-to-use solution. > > Thanks and best regards, > Averell > > > > -- > Sent from: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ > -- رضا سامعی | Reza Sameei | Software Developer | 09126662695
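As an alternative to a custom reporter: Flink also ships a Prometheus reporter, and the per-task series can be rolled up on the Prometheus side. A minimal sketch, assuming the flink-metrics-prometheus jar is on the classpath (keys per the Flink 1.4+ metrics docs; the counter name is hypothetical):

```yaml
# flink-conf.yaml: expose a scrape endpoint on each JM/TM; the port range
# lets several TaskManagers on one host bind distinct ports.
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249-9260

# Prometheus side: aggregate the task-level series into one job-level number,
# e.g. as a recording rule (the metric name "myCounter" is illustrative):
#   sum by (job_name) (flink_taskmanager_job_task_operator_myCounter)
```

This still means scraping multiple targets, but the aggregation becomes a one-line rule rather than manual post-processing.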
Rolling File Sink Exception
When I want to write compressed string data to hdfs, I found that flink only provides StringWriter, so I used a custom writer, as follows:

public class StringCompressWriter<T> extends StreamWriterBase<T> {

    private static final long serialVersionUID = 1L;

    private String charsetName;
    private transient Charset charset;
    private transient CompressionOutputStream outStream;

    public StringCompressWriter() {
        this("UTF-8");
    }

    public StringCompressWriter(String charsetName) {
        this.charsetName = charsetName;
    }

    protected StringCompressWriter(StringCompressWriter<T> other) {
        super(other);
        this.charsetName = other.charsetName;
    }

    /** open & write */
    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        this.charset = Charset.forName(charsetName);
        Configuration conf = fs.getConf();
        CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
        CompressionCodec codec = codecFactory.getCodecByName("GzipCodec");
        FSDataOutputStream dataOutputStream = getStream();
        Compressor compressor = CodecPool.getCompressor(codec, fs.getConf());
        outStream = codec.createOutputStream(dataOutputStream, compressor);
    }

    @Override
    public void write(T element) throws IOException {
        getStream(); // Throws if the stream is not open
        outStream.write(element.toString().getBytes(charset));
        outStream.write('\n');
    }

    @Override
    public void close() throws IOException {
        if (outStream != null) {
            outStream.close();
            //outStream = null;
        }
        super.close();
    }

    @Override
    public Writer<T> duplicate() {
        return new StringCompressWriter<>(this);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), charsetName);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (other == null) {
            return false;
        }
        if (getClass() != other.getClass()) {
            return false;
        }
        StringCompressWriter<T> writer = (StringCompressWriter<T>) other;
        // field comparison
        return Objects.equals(charsetName, writer.charsetName)
                && super.equals(other);
    }
}

But when I run my app on yarn, the taskmanager always reports the following error:

2018-09-03 15:25:54,187 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: bucketSink (10/15) (67b40f43fc72371f19e61a6ac3f60819) switched from RUNNING to FAILED.
java.nio.channels.ClosedChannelException
    at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1618)
    at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1982)
    at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1942)
    at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
    at org.apache.flink.streaming.connectors.fs.StreamWriterBase.flush(StreamWriterBase.java:83)
    at org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:99)
    at com.vipkid.bigdata.sink.StringCompressWriter.close(StringCompressWriter.java:73)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:570)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
    at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
2018-09-03 15:25:54,191 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job TestBucketing (01e7088de6b4f9cbe22d9f6a3fdbd2fe) switched from state RUNNING to FAILING.

Moreover, when I looked at the data, I found that the data stream did not seem to be closed properly.

hdfs dfs -text /tmp/Test/2018-08-27/part-8-96 | wc -l
text: Unexpected end of ZLIB input stream
3268

Can someone tell me what happened?

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Flink on Yarn, restart job will not destroy original task manager
Hi:

I launch a flink application on yarn with 5 task managers; every task manager has 5 slots. I use this script:

#!/bin/sh
CLASSNAME=$1
JARNAME=$2
ARUGMENTS=$3

export JVM_ARGS="${JVM_ARGS} -Dmill.env.active=aws"
/usr/bin/flink run -m yarn-cluster --parallelism 15 -yn 5 -ys 3 -yjm 8192 -ytm 8192 \
  -ynm flink-order-detection \
  -yD env.java.opts.jobmanager='-Dmill.env.active=aws' \
  -yD env.java.opts.taskmanager='-Dmill.env.active=aws' \
  -c $CLASSNAME $JARNAME $ARUGMENTS

The original flink app occupied 5 containers and 15 vcores and ran for 3+ days. Then one of the task managers was killed by yarn because of a memory leak, and the job manager started new task managers. Currently my flink app is running normally on yarn, but it occupies 10 containers and 28 vcores. (The Application Master shows my flink job running for 75 hours; clicking into the running job in the flink web UI shows it running for 28 hours because of the restart.)

In my opinion, the job manager should attempt to restart the failed task manager, so the app should still use 5 containers and 15 vcores. Why does the job occupy double the resources after being restarted by yarn?

Can anyone give me some suggestions?

Regards

James
Prometheus - custom metrics at job level
Hi everyone, I am trying to publish some counters and meters from my Flink job, to be scraped by a Prometheus server. It seems to me that all the metrics that I am publishing are done at the task level, so that my Prometheus server needs to be configured to scrape from many targets (the number equivalent to my max parallelism). And after that, I need to do aggregation at the Prometheus server to get the numbers for my whole job. My question is: is it possible to have metrics at the job level? And can I have one single Prometheus target to scrape the data from? I found this http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Service-discovery-for-Prometheus-on-YARN-td21988.html, which looks like a "no" answer to my question. However, I still hope for some easy to use solution. Thanks and best regards, Averell -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/