Re: Flink on Yarn, restart job will not destroy original task manager

2018-09-03 Thread Gary Yao
Hi James,

Local recovery is disabled by default. You do not need to configure anything
in addition.
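
For reference, this is what the setting looks like when written out explicitly — a
minimal flink-conf.yaml sketch using the key from the documentation you quoted
(only needed if the option was ever switched on somewhere):

    # flink-conf.yaml (this is already the default)
    state.backend.local-recovery: false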

Did you run into problems again or does it work now? If you are still
experiencing the task spread-out behavior, can you configure logging at DEBUG
level and share the jobmanager logs with us?
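
If it helps, a minimal sketch of turning the log level up, assuming the default
log4j setup shipped in conf/ (adjust the appender name to whatever your file uses):

    # conf/log4j.properties on the JobManager
    log4j.rootLogger=DEBUG, file
    # or keep the root at INFO and only raise Flink's own loggers:
    log4j.logger.org.apache.flink=DEBUG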

Best,
Gary

On Tue, Sep 4, 2018 at 5:42 AM, James (Jian Wu) [FDS Data Platform] <
james...@coupang.com> wrote:

> Hi Gary:
>
>
>
> From 1.5/1.6 document:
>
>
>
> Configuring task-local recovery
>
> Task-local recovery is *deactivated by default* and can be activated
> through Flink’s configuration with the key state.backend.local-recovery as
> specified in CheckpointingOptions.LOCAL_RECOVERY. The value for this
> setting can either be *true* to enable or *false*(default) to disable
> local recovery.
>
>
>
> By default, local recovery is deactivated. In 1.5.0, I have not enabled
> local recovery.
>
>
>
> So do I need to manually disable local recovery via flink-conf.yaml?
>
>
>
> Regards
>
>
>
> James
>
>
>
> *From: *"James (Jian Wu) [FDS Data Platform]" 
> *Date: *Monday, September 3, 2018 at 4:13 PM
> *To: *Gary Yao 
>
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Flink on Yarn, restart job will not destroy original task
> manager
>
>
>
> My Flink version is 1.5; I will rebuild with a newer Flink version.
>
>
>
> Regards
>
>
>
> James
>
>
>
> *From: *Gary Yao 
> *Date: *Monday, September 3, 2018 at 3:57 PM
> *To: *"James (Jian Wu) [FDS Data Platform]" 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Flink on Yarn, restart job will not destroy original task
> manager
>
>
>
> Hi James,
>
> What version of Flink are you running? In 1.5.0, tasks can spread out due
> to
> changes that were introduced to support "local recovery". There is a
> mitigation in 1.5.1 that prevents task spread out but local recovery must
> be
> disabled [2].
>
> Best,
> Gary
>
> [1] https://issues.apache.org/jira/browse/FLINK-9635
> [2] https://issues.apache.org/jira/browse/FLINK-9634
>
>
>
> On Mon, Sep 3, 2018 at 9:20 AM, James (Jian Wu) [FDS Data Platform] <
> james...@coupang.com> wrote:
>
> Hi:
>
>
>
>   I launch a Flink application on YARN with 5 task managers, each with 5
> slots, using the following script:
>
>
>
> #!/bin/sh
>
> CLASSNAME=$1
>
> JARNAME=$2
>
> ARUGMENTS=$3
>
>
>
> export JVM_ARGS="${JVM_ARGS} -Dmill.env.active=aws"
>
> /usr/bin/flink run -m yarn-cluster --parallelism 15  -yn 5 -ys 3 -yjm 8192
> -ytm 8192  -ynm flink-order-detection -yD 
> env.java.opts.jobmanager='-Dmill.env.active=aws'
> -yD env.java.opts.taskmanager='-Dmill.env.active=aws'  -c $CLASSNAME   \
>
> $JARNAME $ARUGMENTS
>
>
>
>
>
> The original Flink app occupied 5 containers and 15 vcores and ran for 3+
> days; then one of the task managers was killed by YARN because of a memory
> leak and the job manager started new task managers. Currently my Flink app
> is running normally on YARN, but it occupies 10 containers and 28 vcores.
> (The Application Master shows my Flink job running for 75 hours; clicking
> into the running job in the Flink web UI shows it running for 28 hours
> because of the restart.)
>
>
>
> In my opinion, the job manager should restart the failed task manager so
> that the app still ends up using 5 containers and 15 vcores. Why does the
> job occupy double the resources after it is restarted on YARN?
>
>
>
> Can anyone give me some suggestions?
>
>
>
> Regards
>
>
>
> James
>
>
>


Re: This server is not the leader for that topic-partition

2018-09-03 Thread Jayant Ameta
Flink: 1.4.2
flink-connector-kafka-0.11_2.11 (1.4.2)
Kafka: 0.10.1.0

Jayant Ameta


On Tue, Sep 4, 2018 at 10:16 AM vino yang  wrote:

> Hi Jayant,
>
> Can you provide more specific information? For example, the version of
> your Flink, the version of kafka on which Flink-Kafka-Connector depends,
> and the version of kafka server.
>
> Thanks, vino.
>
> Jayant Ameta  于2018年9月4日周二 下午12:32写道:
>
>> I am getting the same error. Is there a way to retry/ignore instead of
>> killing the job?
>>
>> Jayant Ameta
>>
>>
>> On Tue, May 22, 2018 at 7:58 PM gerardg  wrote:
>>
>>> I've seen the same error while upgrading Kafka. We are using
>>> FlinkKafkaProducer011 and Kafka version 1.0.0. While upgrading to Kafka
>>> 1.1.0, each time a server was restarted, an already running Flink job
>>> failed
>>> with the same message.
>>>
>>> Gerard
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


Re: This server is not the leader for that topic-partition

2018-09-03 Thread vino yang
Hi Jayant,

Can you provide more specific information? For example, the version of your
Flink, the version of kafka on which Flink-Kafka-Connector depends, and the
version of kafka server.

Thanks, vino.

Jayant Ameta  于2018年9月4日周二 下午12:32写道:

> I am getting the same error. Is there a way to retry/ignore instead of
> killing the job?
>
> Jayant Ameta
>
>
> On Tue, May 22, 2018 at 7:58 PM gerardg  wrote:
>
>> I've seen the same error while upgrading Kafka. We are using
>> FlinkKafkaProducer011 and Kafka version 1.0.0. While upgrading to Kafka
>> 1.1.0, each time a server was restarted, an already running Flink job
>> failed
>> with the same message.
>>
>> Gerard
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: This server is not the leader for that topic-partition

2018-09-03 Thread Jayant Ameta
I am getting the same error. Is there a way to retry/ignore instead of
killing the job?

Jayant Ameta


On Tue, May 22, 2018 at 7:58 PM gerardg  wrote:

> I've seen the same error while upgrading Kafka. We are using
> FlinkKafkaProducer011 and Kafka version 1.0.0. While upgrading to Kafka
> 1.1.0, each time a server was restarted, an already running Flink job
> failed
> with the same message.
>
> Gerard
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: RocksDB Number of Keys Metric

2018-09-03 Thread vino yang
Hi Ahmed,

If you feel that this metric is necessary, you can create an issue in JIRA;
then the problem may be more easily seen by the relevant people.
If you need an answer to this question, it may be more effective to ping
@Andrey.

Thanks, vino.

Ahmed  于2018年9月2日周日 上午2:31写道:

> Is there a clean way of exposing a metric for the number of keys (even if
> it is an estimate using 'rocksdb.estimate-num-keys') in a RocksDB state
> store? RocksDBValueState is not accessible from user code.
>
> Best,
> Ahmed
>
>


Re: Why FlinkKafkaConsumer doesn't subscribe to topics?

2018-09-03 Thread Renjie Liu
Hi, Julio:
1. Flink doesn't use subscribe because it needs to control partition
assignment itself, which is important for implementing exactly once.
2. Can you share the versions you are using, including Kafka, the Kafka
client, and Flink? We also use the Flink Kafka consumer and we can monitor it
correctly.
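
To illustrate point 1: even though subscribe() is never called, the connector can
still commit the offsets it tracks back to Kafka once checkpointing is enabled,
which is usually enough for external lag monitoring. A minimal sketch (topic,
servers and group id are placeholders; adjust the consumer class to your
connector version):

import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class LagMonitoringExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // offsets are committed on each completed checkpoint

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("group.id", "lag-monitoring-group"); // the group external tools watch

        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), props);
        consumer.setCommitOffsetsOnCheckpoints(true); // commit Flink's offsets back to Kafka

        env.addSource(consumer).print();
        env.execute("lag-monitoring-example");
    }
}

The committed offsets then show up under that group id for tools that read
consumer-group offsets, even while partition assignment stays under Flink's
control.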

On Tue, Sep 4, 2018 at 3:09 AM Julio Biason  wrote:

> Hey guys,
>
> We are trying to add external monitoring to our system, but we can only
> get the lag in kafka topics while the Flink job is running -- if, for some
> reason, the Flink job fails, we get no visibility on how big the lag is.
>
> (Besides that, the way Flink reports is not accurate and produces a lot of
> -Inf, which I already discussed before.)
>
> While looking at the problem, we noticed that the FlinkKafkaConsumer never
> uses `subscribe` to subscribe to the topics and that's why the values are
> never stored back into Kafka, even when the driver itself does
> `commitAsync`.
>
> Is there any reason for not subscribing to topics that I may have missed?
>
> --
> *Julio Biason*, Software Engineer
> *AZION*  |  Deliver. Accelerate. Protect.
> Office: +55 51 3083 8101   |  Mobile: +55 51
> *99907 0554*
>
-- 
Liu, Renjie
Software Engineer, MVAD


Re: Get stream of rejected data from Elasticsearch6 sink

2018-09-03 Thread vino yang
Hi Nick,

To handle the failed data, you can implement the
ActionRequestFailureHandler#onFailure method and output the data to another
destination from there; the effect is similar to a side output.

Thanks, vino.
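
A minimal sketch of what this looks like (the interface and RequestIndexer are the
connector's API; the logging fallback is only an illustration — in practice you
would forward the document to a Kafka topic, a file, or whatever "other place"
suits you):

import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeadLetterFailureHandler implements ActionRequestFailureHandler {

    private static final Logger LOG = LoggerFactory.getLogger(DeadLetterFailureHandler.class);

    @Override
    public void onFailure(ActionRequest action, Throwable failure,
                          int restStatusCode, RequestIndexer indexer) throws Throwable {
        if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
            // Transient back-pressure from Elasticsearch: simply re-queue the request.
            indexer.add(action);
        } else {
            // Permanently rejected request (e.g. a malformed document): record it elsewhere
            // instead of failing the sink. Replace the log call with your own destination.
            LOG.warn("Dropping rejected request (status {}): {}", restStatusCode, action, failure);
        }
    }
}

The handler is then passed in when the Elasticsearch6 sink is built (via the
builder's setFailureHandler), so checkpointing is not affected.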

Nick Triller  于2018年8月31日周五 下午9:08写道:

> Hi all,
>
>
>
> is it possible to further process data that could not be persisted by the
> Elasticsearch6 sink without breaking checkpointing?
>
> As I understand, the onFailure callback can’t be used to forward rejected
> data into a separate stream.
>
>
>
> I would like to extend the sink if this use case is not covered yet.
>
> What would be a reasonable approach that matches Flink’s overall
> architecture?
>
>
>
> As a new Flink user, my first intuition was to check if it is possible to
> create a side output from the Elasticsearch6 sink.
>
> Does it make sense to support sink side outputs?
>
>
>
> Thank you for your advice.
>
>
>
> Regards,
>
> Nick
>


Re: Flink on Yarn, restart job will not destroy original task manager

2018-09-03 Thread James (Jian Wu) [FDS Data Platform]
Hi Gary:

From 1.5/1.6 document:

Configuring task-local recovery
Task-local recovery is deactivated by default and can be activated through 
Flink’s configuration with the key state.backend.local-recovery as specified in 
CheckpointingOptions.LOCAL_RECOVERY. The value for this setting can either be 
true to enable or false(default) to disable local recovery.

By default, local recovery is deactivated. In 1.5.0, I have not enabled local
recovery.

So do I need to manually disable local recovery via flink-conf.yaml?

Regards

James

From: "James (Jian Wu) [FDS Data Platform]" 
Date: Monday, September 3, 2018 at 4:13 PM
To: Gary Yao 
Cc: "user@flink.apache.org" 
Subject: Re: Flink on Yarn, restart job will not destroy original task manager

My Flink version is 1.5; I will rebuild with a newer Flink version.

Regards

James

From: Gary Yao 
Date: Monday, September 3, 2018 at 3:57 PM
To: "James (Jian Wu) [FDS Data Platform]" 
Cc: "user@flink.apache.org" 
Subject: Re: Flink on Yarn, restart job will not destroy original task manager

Hi James,

What version of Flink are you running? In 1.5.0, tasks can spread out due to
changes that were introduced to support "local recovery". There is a
mitigation in 1.5.1 that prevents task spread out but local recovery must be
disabled [2].

Best,
Gary

[1] https://issues.apache.org/jira/browse/FLINK-9635
[2] https://issues.apache.org/jira/browse/FLINK-9634

On Mon, Sep 3, 2018 at 9:20 AM, James (Jian Wu) [FDS Data Platform] 
mailto:james...@coupang.com>> wrote:
Hi:

  I launch a Flink application on YARN with 5 task managers, each with 5
slots, using the following script:

#!/bin/sh
CLASSNAME=$1
JARNAME=$2
ARUGMENTS=$3

export JVM_ARGS="${JVM_ARGS} -Dmill.env.active=aws"
/usr/bin/flink run -m yarn-cluster --parallelism 15  -yn 5 -ys 3 -yjm 8192 -ytm 
8192  -ynm flink-order-detection -yD 
env.java.opts.jobmanager='-Dmill.env.active=aws'  -yD 
env.java.opts.taskmanager='-Dmill.env.active=aws'  -c $CLASSNAME   \
$JARNAME $ARUGMENTS


The original Flink app occupied 5 containers and 15 vcores and ran for 3+ days;
then one of the task managers was killed by YARN because of a memory leak and the
job manager started new task managers. Currently my Flink app is running normally
on YARN, but it occupies 10 containers and 28 vcores. (The Application Master
shows my Flink job running for 75 hours; clicking into the running job in the
Flink web UI shows it running for 28 hours because of the restart.)

In my opinion, the job manager should restart the failed task manager so that the
app still ends up using 5 containers and 15 vcores. Why does the job occupy double
the resources after it is restarted on YARN?

Can anyone give me some suggestions?

Regards

James



JAXB Classloading errors when using PMML Library (AWS EMR Flink 1.4.2)

2018-09-03 Thread Sameer W
Hi,

I am using PMML dependency as below to execute ML models at prediction time
within a Flink Map operator



<dependency>
    <groupId>org.jpmml</groupId>
    <artifactId>pmml-evaluator</artifactId>
    <version>1.4.3</version>
    <exclusions>
        <exclusion>
            <groupId>javax.xml.bind</groupId>
            <artifactId>jaxb-api</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.glassfish.jaxb</groupId>
            <artifactId>jaxb-runtime</artifactId>
        </exclusion>
        <exclusion>
            <artifactId>guava</artifactId>
            <groupId>com.google.guava</groupId>
        </exclusion>
    </exclusions>
</dependency>

The environment is EMR, OpenJDK 1.8, and Flink 1.4.2. My programs run fine in my
Eclipse development environment. However, when we deploy on the cluster we get
classloading exceptions, which are primarily due to the PMML classes being loaded
via the Flink classloader while the JAXB classes are loaded by the boot
classloader. The problem also seems to be that the version of the JAXB classes
referenced within the PMML library differs from the one loaded by the OpenJDK.

For example, I keep getting this type of error. I have also listed another error
after this, which is linked to not being able to use reflection and the Unsafe
library to set private fields within the PMML class instance using the JAXB
Unmarshaller:
java.lang.LinkageError: loader constraint violation: when resolving
interface method
"javax.xml.bind.Unmarshaller.unmarshal(Ljavax/xml/transform/Source;)Ljava/lang/Object;"
the class loader (instance of
org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader)
of the current class,
com/comcast/mlarche/featurecreationflows/xreerrors/MyJAXBUtil, and the
class loader (instance of ) for the method's defining class,
javax/xml/bind/Unmarshaller, have different Class objects for the type
javax/xml/transform/Source used in the signature
at
com.comcast.mlarche.featurecreationflows.xreerrors.MyJAXBUtil.unmarshal(MyJAXBUtil.java:52)
at
com.comcast.mlarche.featurecreationflows.xreerrors.MyJAXBUtil.unmarshalPMML(MyJAXBUtil.java:38)
at
com.comcast.mlarche.featurecreationflows.xreerrors.PMMLModelExecution.getMiningModelEvaluator(PMMLModelExecution.java:67)
at
com.comcast.mlarche.featurecreationflows.xreerrors.PMMLModelExecution.predict(PMMLModelExecution.java:126)
at
com.comcast.mlarche.featurecreationflows.xreerrors.XreErrorModelsPredictionServiceService.predict(XreErrorModelsPredictionServiceService.java:61)
at
com.comcast.mlarche.featurecreationflows.xreerrors.XreErrorModelsPredictionServiceService.predictSystemRefresh(XreErrorModelsPredictionServiceService.java:44)
at
com.comcast.mlarche.featurecreationflows.xreerrors.XREErrorsModelExecutionMap.flatMap(XREErrorsModelExecutionMap.java:46)
at
com.comcast.mlarche.featurecreationflows.xreerrors.XREErrorsModelExecutionMap.flatMap(XREErrorsModelExecutionMap.java:17)
at
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at
com.comcast.mlarche.featurecreationflows.xreerrors.XreErrorsModelsApplyFunction.apply(XreErrorsModelsApplyFunction.java:65)
at
com.comcast.mlarche.featurecreationflows.xreerrors.XreErrorsModelsApplyFunction.apply(XreErrorsModelsApplyFunction.java:20)
at
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
at
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
at
org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.emitWindowContents(EvictingWindowOperator.java:357)
at
org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.processElement(EvictingWindowOperator.java:218)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)

Finally, when I started using the InputStream-based constructor (so as not to
use the SAXSource class), below is the last error I got, after getting as deep
into the library as possible without using any util classes.

java.lang.RuntimeException: java.lang.IllegalArgumentException: Can not set
org.xml.sax.Locator field org.dmg.pmml.PMMLObject.locator to
org.xml.sax.helpers.LocatorImpl
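
Not something confirmed in this thread, but a mitigation that is commonly tried
for this kind of conflict is to make the child-first user-code classloader
resolve the JAXB/XML API packages parent-first, so user code and the JDK agree on
the javax.xml.* types. A flink-conf.yaml sketch for 1.4.x (the single key below
replaces the default list, so keep your version's default entries; 1.5+ has a
separate classloader.parent-first-patterns.additional key instead):

classloader.parent-first-patterns: "java.;scala.;org.apache.flink.;javax.annotation;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback;javax.xml.;org.xml.;org.w3c."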

Why FlinkKafkaConsumer doesn't subscribe to topics?

2018-09-03 Thread Julio Biason
Hey guys,

We are trying to add external monitoring to our system, but we can only get
the lag in kafka topics while the Flink job is running -- if, for some
reason, the Flink job fails, we get no visibility on how big the lag is.

(Besides that, the way Flink reports is not accurate and produces a lot of
-Inf, which I already discussed before.)

While looking at the problem, we noticed that the FlinkKafkaConsumer never
uses `subscribe` to subscribe to the topics and that's why the values are
never stored back into Kafka, even when the driver itself does
`commitAsync`.

Is there any reason for not subscribing to topics that I may have missed?

-- 
*Julio Biason*, Software Engineer
*AZION*  |  Deliver. Accelerate. Protect.
Office: +55 51 3083 8101   |  Mobile: +55 51
*99907 0554*


RE: Flink 1.5.2 query

2018-09-03 Thread Sarathy, Parth
The message is at WARN level and not at INFO or DEBUG level, so our log
analysis tool flags it as an issue. Can the CLI print this message at
INFO/DEBUG level?
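
If the only goal is to keep the log-analysis tool quiet, one workaround (assuming
the default log4j setup; as noted below, the WARN itself cannot be avoided) is to
raise the threshold for that one logger in conf/log4j-cli.properties, at the cost
of also hiding any other warnings from the CLI frontend:

# conf/log4j-cli.properties
log4j.logger.org.apache.flink.client.cli.CliFrontend=ERROR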

-Original Message-
From: Chesnay Schepler [mailto:ches...@apache.org] 
Sent: Monday, September 3, 2018 6:32 PM
To: Parth Sarathy ; user@flink.apache.org
Subject: Re: Flink 1.5.2 query

Cannot be avoided. The CLI eagerly loads client classes for YARN, which, as you
can see, fails since the Hadoop classes aren't available.
If you don't use YARN you can safely ignore this.

On 03.09.2018 14:37, Parth Sarathy wrote:
> Hi,
>When using flink 1.5.2, “Apache Flink Only” binary 
> (flink-1.5.2-bin-scala_2.11), following  error is seen in client log:
>
> 2018-08-30 10:56:59.129 [main] WARN  
> org.apache.flink.client.cli.CliFrontend
> - Could not load CLI class org.apache.flink.yarn.cli.FlinkYarnSessionCli.
> java.lang.NoClassDefFoundError:
> org/apache/hadoop/yarn/exceptions/YarnException
>  at java.lang.Class.forName0(Native Method) ~[na:1.8.0_171]
>  at java.lang.Class.forName(Class.java:264) ~[na:1.8.0_171]
>  at
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFront
> end.java:1208)
> [flink-dist_2.11-1.5.2.jar:1.5.2]
>  at
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFron
> tend.java:1164)
> [flink-dist_2.11-1.5.2.jar:1.5.2]
>  at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1090)
> [flink-dist_2.11-1.5.2.jar:1.5.2]
> Caused by: java.lang.ClassNotFoundException:
> org.apache.hadoop.yarn.exceptions.YarnException
>  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> ~[na:1.8.0_171]
>  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> ~[na:1.8.0_171]
>  at 
> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> ~[na:1.8.0_171]
>  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ~[na:1.8.0_171]
>  ... 5 common frames omitted
>
> There are no functional failures, but how to avoid this exception in log?
>
> Thanks,
> Parth Sarathy
>
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>



Re: Flink 1.5.2 query

2018-09-03 Thread Chesnay Schepler
Cannot be avoided. The CLI eagerly loads client classes for YARN, which, as you
can see, fails since the Hadoop classes aren't available.

If you don't use YARN you can safely ignore this.

On 03.09.2018 14:37, Parth Sarathy wrote:

Hi,
   When using flink 1.5.2, “Apache Flink Only” binary
(flink-1.5.2-bin-scala_2.11), following  error is seen in client log:

2018-08-30 10:56:59.129 [main] WARN  org.apache.flink.client.cli.CliFrontend
- Could not load CLI class org.apache.flink.yarn.cli.FlinkYarnSessionCli.
java.lang.NoClassDefFoundError:
org/apache/hadoop/yarn/exceptions/YarnException
 at java.lang.Class.forName0(Native Method) ~[na:1.8.0_171]
 at java.lang.Class.forName(Class.java:264) ~[na:1.8.0_171]
 at
org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1208)
[flink-dist_2.11-1.5.2.jar:1.5.2]
 at
org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1164)
[flink-dist_2.11-1.5.2.jar:1.5.2]
 at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1090)
[flink-dist_2.11-1.5.2.jar:1.5.2]
Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.yarn.exceptions.YarnException
 at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
~[na:1.8.0_171]
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
~[na:1.8.0_171]
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
~[na:1.8.0_171]
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
~[na:1.8.0_171]
 ... 5 common frames omitted

There are no functional failures, but how to avoid this exception in log?

Thanks,
Parth Sarathy




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Flink 1.5.2 query

2018-09-03 Thread Parth Sarathy
Hi,
  When using flink 1.5.2, “Apache Flink Only” binary
(flink-1.5.2-bin-scala_2.11), following  error is seen in client log:

2018-08-30 10:56:59.129 [main] WARN  org.apache.flink.client.cli.CliFrontend 
- Could not load CLI class org.apache.flink.yarn.cli.FlinkYarnSessionCli.
java.lang.NoClassDefFoundError:
org/apache/hadoop/yarn/exceptions/YarnException
at java.lang.Class.forName0(Native Method) ~[na:1.8.0_171]
at java.lang.Class.forName(Class.java:264) ~[na:1.8.0_171]
at
org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1208)
[flink-dist_2.11-1.5.2.jar:1.5.2]
at
org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1164)
[flink-dist_2.11-1.5.2.jar:1.5.2]
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1090)
[flink-dist_2.11-1.5.2.jar:1.5.2]
Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.yarn.exceptions.YarnException
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
~[na:1.8.0_171]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
~[na:1.8.0_171]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
~[na:1.8.0_171]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
~[na:1.8.0_171]
... 5 common frames omitted 

There are no functional failures, but how to avoid this exception in log?

Thanks,
Parth Sarathy




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Prometheus - custom metrics at job level

2018-09-03 Thread Averell
Thank you Reza. I will try your repo first :)

Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Can't access Flink Dashboard at 8081, running Flink program using Eclipse

2018-09-03 Thread Chesnay Schepler
you can set up a specific port using
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#rest-port.
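
A minimal sketch of that option applied to the local environment shown below
(8082 and the parallelism are arbitrary placeholder values; the key is the one
from the linked page):

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration config = new Configuration();
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
config.setInteger("rest.port", 8082); // pick a free port instead of the default 8081

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(2, config);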

On 03.09.2018 12:12, Mar_zieh wrote:

Hello

I added these dependencies to "pom.xml"; also, I added configuration to my
code like these:

Configuration config = new Configuration();
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);

StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment(getP, config);

First, I could connect to the Dashboard, but now I get this error:
"Exception in thread "main" java.net.BindException: Address already in use:
bind"

I know it is related to port number (8081) which is shared with other
programs. But I cannot solve it.

Would you please guide me?

I really appreciate it.

Sincerely




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Can't access Flink Dashboard at 8081, running Flink program using Eclipse

2018-09-03 Thread Mar_zieh
Hello 

I added these dependencies to "pom.xml"; also, I added configuration to my
code like these:

Configuration config = new Configuration();
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);

StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment(getP, config);

First, I could connect to the Dashboard, but now I get this error:
"Exception in thread "main" java.net.BindException: Address already in use:
bind"

I know it is related to port number (8081) which is shared with other
programs. But I cannot solve it.

Would you please guide me? 

I really appreciate it.

Sincerely




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink on kubernetes

2018-09-03 Thread Lasse Nedergaard
Please try to use the FsStateBackend as a test to see if the problems disappear.
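
A minimal sketch of that test via flink-conf.yaml (the checkpoint URI is a
placeholder; the backend can also be set in code with
env.setStateBackend(new FsStateBackend(...))):

# flink-conf.yaml — temporarily switch off RocksDB to see if the growth stops
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink/checkpoints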

Med venlig hilsen / Best regards
Lasse Nedergaard


> Den 3. sep. 2018 kl. 11.46 skrev 祁明良 :
> 
> Hi Lasse,
> 
> Is there JIRA ticket I can follow?
> 
> Best,
> Mingliang
> 
>> On 3 Sep 2018, at 5:42 PM, Lasse Nedergaard  
>> wrote:
>> 
>> Hi.
>> 
>> We have documented the same on Flink 1.4.2/1.6 running on Yarn and Mesos.
>> If you correlate the non-heap memory with job restarts you will see the
>> non-heap memory increase for every restart until you get an OOM.
>> 
>> I let you know if/when I know how to handle the problem.
>> 
>> Med venlig hilsen / Best regards
>> Lasse Nedergaard
>> 
>> 
>>> Den 3. sep. 2018 kl. 10.08 skrev 祁明良 :
>>> 
>>> Hi All,
>>> 
>>> We are running flink(version 1.5.2) on k8s with rocksdb backend.
>>> Each time when the job is cancelled and restarted, we face OOMKilled 
>>> problem from the container.
>>> In our case, we only assign 15% of container memory to JVM and leave others 
>>> to rocksdb.
>>> To us, it looks like memory used by rocksdb is not released after job 
>>> cancelling. Anyone can gives some suggestions?
>>> Currently our tmp fix is to restart the TM pod for each job cancelling, but 
>>> it has to be manually.
>>> 
>>> Regards,
>>> Mingliang
>>> 
> 
> 


Re: Flink on kubernetes

2018-09-03 Thread 祁明良
Hi Lasse,

Is there JIRA ticket I can follow?

Best,
Mingliang

> On 3 Sep 2018, at 5:42 PM, Lasse Nedergaard  wrote:
>
> Hi.
>
> We have documented the same on Flink 1.4.2/1.6 running on Yarn and Mesos.
> If you correlate the non-heap memory with job restarts you will see the
> non-heap memory increase for every restart until you get an OOM.
>
> I let you know if/when I know how to handle the problem.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
>> Den 3. sep. 2018 kl. 10.08 skrev 祁明良 :
>>
>> Hi All,
>>
>> We are running flink(version 1.5.2) on k8s with rocksdb backend.
>> Each time when the job is cancelled and restarted, we face OOMKilled problem 
>> from the container.
>> In our case, we only assign 15% of container memory to JVM and leave others 
>> to rocksdb.
>> To us, it looks like memory used by rocksdb is not released after job 
>> cancelling. Anyone can gives some suggestions?
>> Currently our tmp fix is to restart the TM pod for each job cancelling, but 
>> it has to be manually.
>>
>> Regards,
>> Mingliang
>>




Re: Flink on kubernetes

2018-09-03 Thread Lasse Nedergaard
Hi. 

We have documented the same on Flink 1.4.2/1.6 running on Yarn and Mesos. 
If you correlate the non-heap memory with job restarts you will see the
non-heap memory increase for every restart until you get an OOM.

I let you know if/when I know how to handle the problem. 

Med venlig hilsen / Best regards
Lasse Nedergaard


> Den 3. sep. 2018 kl. 10.08 skrev 祁明良 :
> 
> Hi All,
> 
> We are running flink(version 1.5.2) on k8s with rocksdb backend.
> Each time when the job is cancelled and restarted, we face OOMKilled problem 
> from the container.
> In our case, we only assign 15% of container memory to JVM and leave others 
> to rocksdb.
> To us, it looks like memory used by rocksdb is not released after job 
> cancelling. Anyone can gives some suggestions?
> Currently our tmp fix is to restart the TM pod for each job cancelling, but 
> it has to be manually.
> 
> Regards,
> Mingliang
> 


Re: CompletedCheckpoints are getting Stale ( Flink 1.4.2 )

2018-09-03 Thread Stephan Ewen
One final thought: how do you stop the unbounded streaming application?

If you just kill the Yarn/Mesos/K8s cluster, Flink will not know that this
is a shutdown, and interpret it as a failure. Because of that, checkpoints
will remain (in DFS and in ZooKeeper).
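
In other words, if the integration test only tears the cluster down, a cleaner
sequence is to cancel the job first so it reaches a globally terminal state and
the checkpoint entries can be cleaned up. A sketch (the job id is a placeholder):

bin/flink list               # find the id of the running job
bin/flink cancel <jobID>     # job transitions to CANCELED before the cluster goes away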

On Fri, Aug 31, 2018 at 2:18 PM, vino yang  wrote:

> Hi Laura:
>
> Perhaps this is possible because the path to the completed checkpoint on
> HDFS does not have a hierarchical relationship to identify which job it
> belongs to, it is just a fixed prefix plus a random string generated name.
> My personal advice:
>
> 1) Verify it with a clean cluster (clean up the metadata that
> Flink/Zookeeper/HDFS might confuse);
> 2) Verify the node and metadata information (/checkpoints/${jobID}/) on
> ZooKeeper;
> 3) Observe whether there is relevant abnormal information in the log;
>
> Thanks, vino.
>
> Laura Uzcátegui  于2018年8月31日周五 下午3:51写道:
>
>> Hi Stephan and Vino,
>>
>> Thanks for the quick reply and hints.
>>
>> The configuration for the checkpoints that should remain is set to 1.
>>
>> Since this is an unbounded job and I can't see it finishing, I suspect that
>> when we tear down the cluster at the end of each integration-test run, the
>> completedCheckpoint doesn't get deleted. The next time the integration test
>> runs, it picks up from the latest completedCheckpoint, but there are cases
>> where that job doesn't run again, and therefore the completedCheckpoint
>> becomes stale. Is this something that could happen?
>>
>> Is there any way to check, via logging, whether the job reaches the Global
>> Final State before we tear down the cluster?
>>
>> Cheers,
>>
>> Laura
>>
>> On Fri, 31 Aug 2018, 8:37 am Stephan Ewen,  wrote:
>>
>>> Hi Laura!
>>>
>>> Vino had good pointers. There really should be no case in which this is
>>> not cleaned up.
>>>
>>> Is this a bounded job that ends? Is it always the last of the bounded
>>> job's checkpoints that remains?
>>>
>>> Best,
>>> Stephan
>>>
>>>
>>> On Fri, Aug 31, 2018 at 5:02 AM, vino yang 
>>> wrote:
>>>
 Hi Laura,

 First of all, Flink only keeps one completed checkpoint by default[1].
 You need to confirm whether your configuration is consistent with the
 number of files. If they are consistent, it is for other reasons:

 1) The cleaning of the completed checkpoint is done by JM. You need to
 confirm whether it can access your file.[2]
 2) JM will asynchronously clean up the metadata of the old completed
 checkpoint on the ZK with a background thread. After the cleanup is
 successful, it will clean the Checkpoint data. If the above reasons are
 excluded, then maybe you provide JM's log to help us confirm whether this
 is the reason. I think it is more appropriate to ping Till.[3]

 [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/checkpointing.html#state-checkpoints-num-retained
 [2]: https://stackoverflow.com/questions/44928624/apache-flink-not-deleting-old-checkpoints
 [3]: https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java#L437

 Thanks, vino.

 Laura Uzcátegui  于2018年8月30日周四 下午10:52写道:

> Hello,
>
>  At work, we are currently standing up a cluster with the following
> configuration:
>
>
>- Flink version: 1.4.2
>- HA Enabled with Zookeeper
>- State backend : rocksDB
>- state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
>- state.backend.rocksdb.checkpointdir: hdfs://namenode:9000/flink/
>checkpoints
>- *high-availability.storageDir*: hdfs://namenode:9000/flink/
>recovery
>
> We have also a job running with checkpointing enabled and without
> externalized checkpoint.
>
> We run this job multiple times a day since it's run from our
> integration-test pipeline, and we started noticing the folder
> *high-availability.storageDir  *storing the completedCheckpoint files
> is increasing constantly the number of files created, which is making us
> wonder if there is no cleanup policy for the Filesystem when HA is 
> enabled.
>
> Under what  circumstance would there be an ever increasing number of
> completedCheckpoint files on the HA storage dir when there is only a 
> single
> job running over and over again ?
>
> Here is a list of what we are seeing accumulating over time and
> actually reaching the maximum of files allowed on the Filesystem.
>
> completedCheckpoint00d86c01d8b9
> completedCheckpoint00d86e9030a9
> completedCheckpoint00d877b74355
> completedCheckpoint00d87b3dd9ad
> completedCheckpoint00d8815d9afd
> completedCheckpoint00d88973195c
> completedCheckpoint00d88b4792f2
> completedCheckpoint00d890d499dc
> completedCheckpoint00d91b00ada2
>
>
> Cheers,

Re: Flink on Yarn, restart job will not destroy original task manager

2018-09-03 Thread James (Jian Wu) [FDS Data Platform]
My Flink version is 1.5; I will rebuild with a newer Flink version.

Regards

James

From: Gary Yao 
Date: Monday, September 3, 2018 at 3:57 PM
To: "James (Jian Wu) [FDS Data Platform]" 
Cc: "user@flink.apache.org" 
Subject: Re: Flink on Yarn, restart job will not destroy original task manager

Hi James,

What version of Flink are you running? In 1.5.0, tasks can spread out due to
changes that were introduced to support "local recovery". There is a
mitigation in 1.5.1 that prevents task spread out but local recovery must be
disabled [2].

Best,
Gary

[1] https://issues.apache.org/jira/browse/FLINK-9635
[2] https://issues.apache.org/jira/browse/FLINK-9634

On Mon, Sep 3, 2018 at 9:20 AM, James (Jian Wu) [FDS Data Platform] 
mailto:james...@coupang.com>> wrote:
Hi:

  I launch a Flink application on YARN with 5 task managers, each with 5
slots, using the following script:

#!/bin/sh
CLASSNAME=$1
JARNAME=$2
ARUGMENTS=$3

export JVM_ARGS="${JVM_ARGS} -Dmill.env.active=aws"
/usr/bin/flink run -m yarn-cluster --parallelism 15  -yn 5 -ys 3 -yjm 8192 -ytm 
8192  -ynm flink-order-detection -yD 
env.java.opts.jobmanager='-Dmill.env.active=aws'  -yD 
env.java.opts.taskmanager='-Dmill.env.active=aws'  -c $CLASSNAME   \
$JARNAME $ARUGMENTS


The original Flink app occupied 5 containers and 15 vcores and ran for 3+ days;
then one of the task managers was killed by YARN because of a memory leak and the
job manager started new task managers. Currently my Flink app is running normally
on YARN, but it occupies 10 containers and 28 vcores. (The Application Master
shows my Flink job running for 75 hours; clicking into the running job in the
Flink web UI shows it running for 28 hours because of the restart.)

In my opinion, the job manager should restart the failed task manager so that the
app still ends up using 5 containers and 15 vcores. Why does the job occupy double
the resources after it is restarted on YARN?

Can anyone give me some suggestions?

Regards

James



Re: Rolling File Sink Exception

2018-09-03 Thread Chesnay Schepler
You're closing the stream and then calling super.close(), which calls flush,
which fails since you already closed the stream.


If you don't close the stream the problem should disappear.
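
Concretely, a sketch of the adjusted close() following that advice — finish the
compression stream so the gzip trailer is written, but leave flushing and closing
of the underlying FSDataOutputStream to the base class (this assumes Hadoop's
CompressionOutputStream#finish(), which finalizes the compressed output without
closing the wrapped stream):

    @Override
    public void close() throws IOException {
        if (outStream != null) {
            outStream.finish(); // write the gzip trailer, keep the underlying stream open
            outStream = null;
        }
        super.close();          // StreamWriterBase flushes and closes the HDFS stream
    }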

On 03.09.2018 09:30, clay wrote:

When I want to write compressed string data to HDFS, I found that Flink only
provides a StringWriter, so I used a custom writer, as follows:

public class StringCompressWriter<T> extends StreamWriterBase<T> {

 private static final long serialVersionUID = 1L;

 private String charsetName;

 private transient Charset charset;

 private transient CompressionOutputStream outStream;


 public StringCompressWriter() {
 this("UTF-8");
 }

 public StringCompressWriter(String charsetName) {
 this.charsetName = charsetName;
 }

 protected StringCompressWriter(StringCompressWriter<T> other) {
 super(other);
 this.charsetName = other.charsetName;
 }


 /**
  * open & write
  * @return
  */
 @Override
 public void open(FileSystem fs, Path path) throws IOException {
 super.open(fs, path);

 this.charset = Charset.forName(charsetName);

 Configuration conf = fs.getConf();

 CompressionCodecFactory codecFactory = new
CompressionCodecFactory(conf);
 CompressionCodec codec = codecFactory.getCodecByName("GzipCodec");

 FSDataOutputStream dataOutputStream = getStream();
 Compressor compressor = CodecPool.getCompressor(codec,
fs.getConf());
 outStream = codec.createOutputStream(dataOutputStream, compressor);
 }

 @Override
 public void write(T element) throws IOException {
 getStream(); // Throws if the stream is not open
 outStream.write(element.toString().getBytes(charset));
 outStream.write('\n');
 }

 @Override
 public void close() throws IOException {
 if (outStream != null) {
 outStream.close();
//outStream = null;
 }
 super.close();
 }

 @Override
 public Writer<T> duplicate() {
 return new StringCompressWriter<>(this);
 }

 @Override
 public int hashCode() {
 return Objects.hash(super.hashCode(), charsetName);
 }

 @Override
 public boolean equals(Object other) {
 if (this == other) {
 return true;
 }
 if (other == null) {
 return false;
 }
 if (getClass() != other.getClass()) {
 return false;
 }
 StringCompressWriter writer = (StringCompressWriter) other;
 // field comparison
 return Objects.equals(charsetName, writer.charsetName)
 && super.equals(other);
 }
}

But when I run my app on yarn, the taskmanager always reports the following
error:
2018-09-03 15:25:54,187 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink:
bucketSink (10/15) (67b40f43fc72371f19e61a6ac3f60819) switched from RUNNING
to FAILED.
java.nio.channels.ClosedChannelException
at
org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1618)
at
org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1982)
at 
org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1942)
at
org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
at
org.apache.flink.streaming.connectors.fs.StreamWriterBase.flush(StreamWriterBase.java:83)
at
org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:99)
at
com.vipkid.bigdata.sink.StringCompressWriter.close(StringCompressWriter.java:73)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:570)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
at
org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
2018-09-03 15:25:54,191 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
TestBucketing (01e7088de6b4f9cbe22d9f6a3fdbd2fe) switched from state RUNNING
to FAILING.

Moreover, when I looked at the data, I found that the data stream did not
seem to be closed properly.

hdfs dfs -text /tmp/Test/2018-08-27/part-8-96 | 

Flink on kubernetes

2018-09-03 Thread 祁明良
Hi All,

We are running Flink (version 1.5.2) on k8s with the RocksDB backend.
Each time the job is cancelled and restarted, we face an OOMKilled problem from
the container.
In our case, we only assign 15% of the container memory to the JVM and leave the
rest to RocksDB.
To us, it looks like the memory used by RocksDB is not released after job
cancellation. Can anyone give some suggestions?
Currently our temporary fix is to restart the TM pod after each job cancellation,
but it has to be done manually.

Regards,
Mingliang



Re: Flink on Yarn, restart job will not destroy original task manager

2018-09-03 Thread Gary Yao
Hi James,

What version of Flink are you running? In 1.5.0, tasks can spread out due to
changes that were introduced to support "local recovery". There is a
mitigation in 1.5.1 that prevents task spread out but local recovery must be
disabled [2].

Best,
Gary

[1] https://issues.apache.org/jira/browse/FLINK-9635
[2] https://issues.apache.org/jira/browse/FLINK-9634

On Mon, Sep 3, 2018 at 9:20 AM, James (Jian Wu) [FDS Data Platform] <
james...@coupang.com> wrote:

> Hi:
>
>
>
>   I launch a Flink application on YARN with 5 task managers, each with 5
> slots, using the following script:
>
>
>
> #!/bin/sh
>
> CLASSNAME=$1
>
> JARNAME=$2
>
> ARUGMENTS=$3
>
>
>
> export JVM_ARGS="${JVM_ARGS} -Dmill.env.active=aws"
>
> /usr/bin/flink run -m yarn-cluster --parallelism 15  -yn 5 -ys 3 -yjm 8192
> -ytm 8192  -ynm flink-order-detection -yD 
> env.java.opts.jobmanager='-Dmill.env.active=aws'
> -yD env.java.opts.taskmanager='-Dmill.env.active=aws'  -c $CLASSNAME   \
>
> $JARNAME $ARUGMENTS
>
>
>
>
>
> The original Flink app occupied 5 containers and 15 vcores and ran for 3+
> days; then one of the task managers was killed by YARN because of a memory
> leak and the job manager started new task managers. Currently my Flink app
> is running normally on YARN, but it occupies 10 containers and 28 vcores.
> (The Application Master shows my Flink job running for 75 hours; clicking
> into the running job in the Flink web UI shows it running for 28 hours
> because of the restart.)
>
>
>
> In my opinion, the job manager should restart the failed task manager so
> that the app still ends up using 5 containers and 15 vcores. Why does the
> job occupy double the resources after it is restarted on YARN?
>
>
>
> Can anyone give me some suggestions?
>
>
>
> Regards
>
>
>
> James
>


Re: Prometheus - custom metrics at job level

2018-09-03 Thread Reza Sameei
Hello Averell

Based on my experience, using the out-of-the-box reporters and collectors needs
a little more effort!
Of course I haven't tried all of them, but after reviewing some of them I went
my own way: writing custom reporters to push metrics into Elasticsearch (the
component available in our project, and very flexible). The custom reporters
can group metrics by configurable/dynamic parameters (in my case based on a
defined metric name and the jobs): https://github.com/reza-sameei/elasticreporter
I think that writing a simple custom reporter may help you :)
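
For what it's worth, a minimal sketch of that approach against Flink's reporter
SPI (only the MetricReporter/Scheduled interfaces and getMetricIdentifier are
Flink API here; the class name, the println placeholder and whatever grouping you
apply in report() are up to you). The reporter is registered in flink-conf.yaml
via metrics.reporter.<name>.class plus an interval:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;

public class SimpleGroupingReporter implements MetricReporter, Scheduled {

    private final Map<Counter, String> counters = new ConcurrentHashMap<>();

    @Override
    public void open(MetricConfig config) {
        // read reporter-specific options here, e.g. config.getString("host", "localhost")
    }

    @Override
    public void close() {}

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        if (metric instanceof Counter) {
            // the fully scoped identifier carries the job/task/operator scope you can group by
            counters.put((Counter) metric, group.getMetricIdentifier(metricName));
        }
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        counters.remove(metric);
    }

    @Override
    public void report() {
        // called periodically; push the aggregated values to one place (ES, HTTP, ...)
        counters.forEach((counter, name) -> System.out.println(name + " = " + counter.getCount()));
    }
}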



On Mon, Sep 3, 2018 at 10:52 AM Averell  wrote:

> Hi everyone,
>
> I am trying to publish some counters and meters from my Flink job, to be
> scraped by a Prometheus server. It seems to me that all the metrics that I
> am publishing are done at the task level, so that my Prometheus server
> needs
> to be configured to scrape from many targets (the number equivalent to my
> max parallelism). And after that, I need to do aggregation at the
> Prometheus
> server to get the numbers for my whole job.
>
> My question is: is it possible to have metrics at the job level? And can I
> have one single Prometheus target to scrape the data from?
>
> I found this
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Service-discovery-for-Prometheus-on-YARN-td21988.html
> ,
> which looks like a "no" answer to my question. However, I still hope for
> some easy to use solution.
>
> Thanks and best regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
رضا سامعی  | Reza Sameei | Software Developer | 09126662695


Rolling File Sink Exception

2018-09-03 Thread clay4444
When I want to write compressed string data to HDFS, I found that Flink only
provides a StringWriter, so I used a custom writer, as follows:

public class StringCompressWriter<T> extends StreamWriterBase<T> {

private static final long serialVersionUID = 1L;

private String charsetName;

private transient Charset charset;

private transient CompressionOutputStream outStream;


public StringCompressWriter() {
this("UTF-8");
}

public StringCompressWriter(String charsetName) {
this.charsetName = charsetName;
}

protected StringCompressWriter(StringCompressWriter<T> other) {
super(other);
this.charsetName = other.charsetName;
}


/**
 * open & write
 * @return
 */
@Override
public void open(FileSystem fs, Path path) throws IOException {
super.open(fs, path);

this.charset = Charset.forName(charsetName);

Configuration conf = fs.getConf();

CompressionCodecFactory codecFactory = new
CompressionCodecFactory(conf);
CompressionCodec codec = codecFactory.getCodecByName("GzipCodec");

FSDataOutputStream dataOutputStream = getStream();
Compressor compressor = CodecPool.getCompressor(codec,
fs.getConf());
outStream = codec.createOutputStream(dataOutputStream, compressor);
}

@Override
public void write(T element) throws IOException {
getStream(); // Throws if the stream is not open
outStream.write(element.toString().getBytes(charset));
outStream.write('\n');
}

@Override
public void close() throws IOException {
if (outStream != null) {
outStream.close();
//outStream = null;
}
super.close();
}

@Override
public Writer<T> duplicate() {
return new StringCompressWriter<>(this);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), charsetName);
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null) {
return false;
}
if (getClass() != other.getClass()) {
return false;
}
StringCompressWriter writer = (StringCompressWriter) other;
// field comparison
return Objects.equals(charsetName, writer.charsetName)
&& super.equals(other);
}
}

But when I run my app on yarn, the taskmanager always reports the following
error:
2018-09-03 15:25:54,187 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink:
bucketSink (10/15) (67b40f43fc72371f19e61a6ac3f60819) switched from RUNNING
to FAILED.
java.nio.channels.ClosedChannelException
at
org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1618)
at
org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1982)
at 
org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1942)
at
org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
at
org.apache.flink.streaming.connectors.fs.StreamWriterBase.flush(StreamWriterBase.java:83)
at
org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:99)
at
com.vipkid.bigdata.sink.StringCompressWriter.close(StringCompressWriter.java:73)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:570)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
at
org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
2018-09-03 15:25:54,191 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
TestBucketing (01e7088de6b4f9cbe22d9f6a3fdbd2fe) switched from state RUNNING
to FAILING.

Moreover, when I looked at the data, I found that the data stream did not
seem to be closed properly.

hdfs dfs -text /tmp/Test/2018-08-27/part-8-96 | wc -l 

text: Unexpected end of ZLIB input stream
3268

Can someone tell me what happened?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Flink on Yarn, restart job will not destroy original task manager

2018-09-03 Thread James (Jian Wu) [FDS Data Platform]
Hi:

  I launch a Flink application on YARN with 5 task managers, each with 5
slots, using the following script:

#!/bin/sh
CLASSNAME=$1
JARNAME=$2
ARUGMENTS=$3

export JVM_ARGS="${JVM_ARGS} -Dmill.env.active=aws"
/usr/bin/flink run -m yarn-cluster --parallelism 15  -yn 5 -ys 3 -yjm 8192 -ytm 
8192  -ynm flink-order-detection -yD 
env.java.opts.jobmanager='-Dmill.env.active=aws'  -yD 
env.java.opts.taskmanager='-Dmill.env.active=aws'  -c $CLASSNAME   \
$JARNAME $ARUGMENTS


The original Flink app occupied 5 containers and 15 vcores and ran for 3+ days;
then one of the task managers was killed by YARN because of a memory leak and the
job manager started new task managers. Currently my Flink app is running normally
on YARN, but it occupies 10 containers and 28 vcores. (The Application Master
shows my Flink job running for 75 hours; clicking into the running job in the
Flink web UI shows it running for 28 hours because of the restart.)

In my opinion, the job manager should restart the failed task manager so that the
app still ends up using 5 containers and 15 vcores. Why does the job occupy double
the resources after it is restarted on YARN?

Can anyone give me some suggestions?

Regards

James


Prometheus - custom metrics at job level

2018-09-03 Thread Averell
Hi everyone,

I am trying to publish some counters and meters from my Flink job, to be
scraped by a Prometheus server. It seems to me that all the metrics that I
am publishing are done at the task level, so that my Prometheus server needs
to be configured to scrape from many targets (the number equivalent to my
max parallelism). And after that, I need to do aggregation at the Prometheus
server to get the numbers for my whole job.

My question is: is it possible to have metrics at the job level? And can I
have one single Prometheus target to scrape the data from?

I found this
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Service-discovery-for-Prometheus-on-YARN-td21988.html,
which looks like a "no" answer to my question. However, I still hope for
some easy to use solution.

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/