Re: After OutOfMemoryError State cannot be read

2018-09-07 Thread Edward Alexander Rojas Clavijo
Hi Stefan, Vino,
Thanks for your answers.

We are using full checkpointing, not incremental. We use custom serializers
for the operators' state classes; the serializers encrypt before writing and
decrypt when reading. The serializers are stateless.
We register the serializers using:

env.getConfig()
   .registerTypeWithKryoSerializer(ProcessState.class, ProcessStateSerializer.class);
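
A minimal sketch of what such a stateless, encrypting Kryo serializer could look like (the toBytes/fromBytes methods on ProcessState and the Crypto helper are assumptions standing in for the actual code in this thread):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class ProcessStateSerializer extends Serializer<ProcessState> {

    @Override
    public void write(Kryo kryo, Output output, ProcessState state) {
        byte[] plain = state.toBytes();            // assumed helper on ProcessState
        byte[] encrypted = Crypto.encrypt(plain);  // assumed stateless encryption helper
        output.writeInt(encrypted.length);
        output.writeBytes(encrypted);
    }

    @Override
    public ProcessState read(Kryo kryo, Input input, Class<ProcessState> type) {
        int length = input.readInt();
        byte[] encrypted = input.readBytes(length);
        byte[] plain = Crypto.decrypt(encrypted);  // assumed stateless decryption helper
        return ProcessState.fromBytes(plain);      // assumed helper on ProcessState
    }
}

Writing the encrypted length explicitly keeps the framing self-describing, so truncated or corrupted bytes surface as an immediate read error rather than silently decoding garbage.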

In normal cases the serialization works correctly, even after recovering from
a failure. We get this error only when the TaskManager fails due to memory
problems.

Thanks again for your help,
Edward

On Fri, Sep 7, 2018 at 11:51, Stefan Richter (<s.rich...@data-artisans.com>) wrote:

> Hi,
>
> what I can say is that any failures like OOMs should not corrupt
> checkpoint files, because only successfully completed checkpoints are used
> for recovery by the job manager. Just to get a bit more info, are you using
> full or incremental checkpoints? Unfortunately, it is a bit hard to say
> from the given information what the cause of the problem is. Typically,
> these problems have been observed when something was wrong with a
> serializer or a stateful serializer was used from multiple threads.
>
> Best,
> Stefan
>
> On 07.09.2018 at 05:04, vino yang wrote:
>
> Hi Edward,
>
> From this log line, Caused by: java.io.EOFException, it seems that the state
> metadata file has been corrupted.
> But I can't confirm it; maybe Stefan knows more details, so I am pinging him for you.
>
> Thanks, vino.
>
> Edward Rojas wrote on Friday, September 7, 2018 at 1:22 AM:
>
>> Hello all,
>>
>> We are running Flink 1.5.3 on Kubernetes with RocksDB as the state backend.
>> When performing some load testing we got an OutOfMemoryError: native memory
>> exhausted, causing the job to fail and be restarted.
>>
>> After the TaskManager is restarted, the job is recovered from a checkpoint,
>> but it seems that there is a problem when trying to access the state. We got
>> the error from the onTimer function of an onProcessingTime callback.
>>
>> Could the OOM error have caused a corrupted state to be checkpointed?
>>
>> We get Exceptions like:
>>
>> TimerException{java.lang.RuntimeException: Error while retrieving data
>> from
>> RocksDB.}
>> at
>>
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
>> at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:277)
>> at
>>
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:191)
>> at
>>
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1160)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>> at java.lang.Thread.run(Thread.java:811)
>> Caused by: java.lang.RuntimeException: Error while retrieving data from
>> RocksDB.
>> at
>>
>> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)
>> at com.xxx.ProcessFunction.*onTimer*(ProcessFunction.java:279)
>> at
>>
>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94)
>> at
>>
>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.*onProcessingTime*(KeyedProcessOperator.java:78)
>> at
>>
>> org.apache.flink.streaming.api.operators.HeapInternalTimerService.*onProcessingTime*(HeapInternalTimerService.java:266)
>> at
>>
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>> ... 7 more
>> Caused by: java.io.EOFException
>> at java.io.DataInputStream.readFully(DataInputStream.java:208)
>> at java.io.DataInputStream.readUTF(DataInputStream.java:618)
>> at java.io.DataInputStream.readUTF(DataInputStream.java:573)
>> at
>>
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
>> at
>>
>> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:87)
>> ... 12 more
>>
>>
>> Thanks in advance for any help
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
>

-- 
Edward Alexander Rojas Clavijo

Software Engineer, Hybrid Cloud
IBM France


RE: Flink Failing to Connect to Kafka org.apache.kafka.common.protocol.types.SchemaException: Error computing size for field 'topics': java.lang.NullPointerException

2018-09-07 Thread Oliver Buckley-Salmon
Sorry, this was a code issue on my side: I was creating a Kafka 0.10 consumer.
Problem solved.
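
For reference, a minimal sketch of the corrected setup implied here, pairing the flink-connector-kafka-0.11_2.11 dependency quoted below with the matching 0.11 consumer class; the broker address, group id and topic are placeholders:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class Kafka011SourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder broker address
        props.setProperty("group.id", "my-flink-job");        // placeholder group id

        // Consumer class version matches the flink-connector-kafka-0.11 dependency.
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer011<>("input-topic", new SimpleStringSchema(), props));

        stream.print();
        env.execute("Kafka 0.11 consumer example");
    }
}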

From: Oliver Buckley-Salmon
Sent: 07 September 2018 15:04
To: user@flink.apache.org
Subject: Flink Failing to Connect to Kafka 
org.apache.kafka.common.protocol.types.SchemaException: Error computing size 
for field 'topics': java.lang.NullPointerException

Hi,
I have a Flink 1.4.0 cluster running on OpenShift with a job that connects to a 
Kafka 0.11.0.1 cluster in the same OpenShift project. The job reads from one 
topic and writes to two others.
The job deploys OK, but when it starts up it immediately crashes with the 
following exception:
org.apache.kafka.common.protocol.types.SchemaException: Error computing size 
for field 'topics': java.lang.NullPointerException
 at org.apache.kafka.common.protocol.types.Schema.sizeOf(Schema.java:93)
 at 
org.apache.kafka.common.protocol.types.Struct.sizeOf(Struct.java:258)
 at 
org.apache.kafka.common.requests.AbstractRequestResponse.serialize(AbstractRequestResponse.java:28)
 at 
org.apache.kafka.common.requests.AbstractRequest.serialize(AbstractRequest.java:81)
 at 
org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:74)
 at 
org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:396)
 at 
org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:370)
 at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:332)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:409)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:252)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:184)
 at 
org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:314)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1386)
 at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java:75)
 at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:128)
 at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:415)
 at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
 at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
 at java.lang.Thread.run(Thread.java:748)

The version of the Flink Kafka Connector I'm using is


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.4.0</version>
</dependency>


I can write to and consume from the Kafka cluster and can see the brokers in 
ZooKeeper. Can anyone tell me what the exception means and what I can do to 
resolve it?
Thanks very much in advance for your help.

Kind regards,
Oliver Buckley-Salmon







Re: maxOutOfOrderness

2018-09-07 Thread Hequn Cheng
Hi Nicos,

Setting it to Long.MAX_VALUE makes the watermark always smaller than the
element timestamps. In this case, event-time windows will never be triggered,
so such a setting is meaningless.
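
To make that concrete, a minimal sketch of where maxOutOfOrderness is usually set; events here are hypothetical (key, epoch-millis) pairs:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class MyTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>> {

    public MyTimestampExtractor() {
        // The emitted watermark is (largest timestamp seen so far) - maxOutOfOrderness.
        // With Long.MAX_VALUE the watermark can never catch up with the data, so
        // event-time windows never fire; with a sane bound like 10 seconds they do.
        super(Time.seconds(10));
    }

    @Override
    public long extractTimestamp(Tuple2<String, Long> element) {
        return element.f1;
    }
}

The bound only shifts the emitted watermark; it adds no per-record work, which would explain why changing it does not show up as a throughput difference.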

Best, Hequn

On Fri, Sep 7, 2018 at 11:17 PM Nicos Maris  wrote:

> Hello,
>
>
> Does maxOutOfOrderness affect performance?
>
> Setting it to Long.MAX_VALUE doesn't affect performance, so either Flink
> is really fast in my simple pipeline or my understanding is terribly wrong
> :p
>


maxOutOfOrderness

2018-09-07 Thread Nicos Maris
Hello,


Does maxOutOfOrderness affect performance?

Setting it to Long.MAX_VALUE doesn't affect performance, so either Flink is
really fast in my simple pipeline or my understanding is terribly wrong :p


Flink Failing to Connect to Kafka org.apache.kafka.common.protocol.types.SchemaException: Error computing size for field 'topics': java.lang.NullPointerException

2018-09-07 Thread Oliver Buckley-Salmon
Hi,
I have a Flink 1.4.0 cluster running on OpenShift with a job that connects to a 
Kafka 0.11.0.1 cluster in the same OpenShift project. The job reads from one 
topic and writes to two others.
The job deploys OK, but when it starts up it immediately crashes with the 
following exception:
org.apache.kafka.common.protocol.types.SchemaException: Error computing size 
for field 'topics': java.lang.NullPointerException
 at org.apache.kafka.common.protocol.types.Schema.sizeOf(Schema.java:93)
 at 
org.apache.kafka.common.protocol.types.Struct.sizeOf(Struct.java:258)
 at 
org.apache.kafka.common.requests.AbstractRequestResponse.serialize(AbstractRequestResponse.java:28)
 at 
org.apache.kafka.common.requests.AbstractRequest.serialize(AbstractRequest.java:81)
 at 
org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:74)
 at 
org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:396)
 at 
org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:370)
 at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:332)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:409)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:252)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:184)
 at 
org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:314)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1386)
 at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java:75)
 at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:128)
 at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:415)
 at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
 at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
 at java.lang.Thread.run(Thread.java:748)

The version of the Flink Kafka Connector I'm using is


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.4.0</version>
</dependency>


I can write to and consume from the Kafka cluster and can see the brokers in 
ZooKeeper. Can anyone tell me what the exception means and what I can do to 
resolve it?
Thanks very much in advance for your help.

Kind regards,
Oliver Buckley-Salmon








Re: Behaviour of Process Window Function

2018-09-07 Thread Hequn Cheng
Hi Harshvardhan,

> 1) Does the state in the process window function qualify as KeyedState
> or OperatorState?
KeyedState.

> We want to be able to rehydrate the guava cache at the beginning of each
> window by making an external rest call and clear the cache at the end of
> that respective window. How can we enforce this behaviour in Flink?
Why do you want to clear the cache after the window if the cache is shared
across all keys? Or do you want to load the cache per key?
If you aggregate elements incrementally, I think it is hard to detect the
window start and end inside the `ProcessWindowFunction` or the
`IncrementalAggregation` function. However, we can detect the start and end in
a custom trigger, i.e., load and clear the cache in the trigger (see the sketch below).
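
A rough sketch of that idea, not a drop-in solution: hook the cache lifecycle into a custom trigger. CacheLoader below is a hypothetical helper that must tolerate repeated calls (for example via reference counting), since triggers run once per key and window:

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class CacheAwareTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp,
                                   TimeWindow window, TriggerContext ctx) throws Exception {
        CacheLoader.ensureLoaded();                         // load at window start (idempotent)
        ctx.registerEventTimeTimer(window.maxTimestamp());  // fire at window end
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        CacheLoader.clear();                                // release after the window is purged
    }
}

With reference counting inside the helper, the first window to open loads the cache and the last one to be cleaned up releases it.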

Best, Hequn


On Fri, Sep 7, 2018 at 11:28 AM vino yang  wrote:

> Hi Harshvardhan,
>
> 1) Yes. ProcessWindowFunction extends AbstractRichFunction, so through
> getRuntimeContext you can access the keyed state API.
> 2) ProcessWindowFunction gives you considerable flexibility: you can build
> on processing time / event time / timers / its clear method / a customized
> implementation. The specific design depends on your business logic and how
> long you need to keep the cache.
>
> Thanks, vino.
>
> Harshvardhan Agrawal  于2018年9月6日周四
> 下午10:10写道:
>
>> Hello,
>>
>> We have a Flink pipeline where we are windowing our data after a keyBy.
>> i.e.
>> myStream.keyBy().window().process(MyIncrementalAggregation(),
>> MyProcessFunction()).
>>
>> I have two questions about the above line of code:
>> 1) Does the state in the process window function qualify as KeyedState or
>> OperatorState? If operator state, can we access KeyedState from the Process
>> Window function?
>> 2) We also have certain reference information that we want to share
>> across all keys in the process window function. We are currently storing
>> all that info in a Guava cache. We want to be able to rehydrate the guava
>> cache at the beginning of each window by making an external rest call and
>> clear the cache at the end of that respective window. How can we enforce
>> this behaviour in Flink? Do I need to use a timerservice for this where the
>> callback will be a window.maxtimestamp() or just clearing the cache in the
>> clear method will do the trick?
>>
>> --
>>
>> Regards,
>> Harshvardhan Agrawal
>>
>


Re: Setting Flink Monitoring API Port on YARN Cluster

2018-09-07 Thread Austin Cawley-Edwards
Hi Gary,

Thank you so much for the detailed explanation and links. Extremely
helpful. For all others interested, this is also available through the YARN
CLI command `yarn application -status {appId}`.

Once again, thanks for your help!
Austin


On Fri, Sep 7, 2018, 2:24 AM Gary Yao  wrote:

> Hi Austin,
>
> The config options rest.port, jobmanager.web.port, etc. are intentionally
> ignored on YARN. The port should be chosen randomly to avoid conflicts with
> other containers [1]. I do not see a way how you can set a fixed port at
> the
> moment but there is a related ticket for that [2]. The Flink CLI determines
> the hostname and port from the YARN ApplicationReport [3][4] – you can do the
> same.
>
> Best,
> Gary
>
> [1]
> https://github.com/apache/flink/blob/d036417985d3e2b1ca63909007db9710e842abf4/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java#L103
>
> [2] https://issues.apache.org/jira/browse/FLINK-5758
>
> [3]
> https://github.com/apache/flink/blob/d036417985d3e2b1ca63909007db9710e842abf4/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L387
>
> [4]
> https://hadoop.apache.org/docs/r2.8.3/api/org/apache/hadoop/yarn/api/records/ApplicationReport.html#getRpcPort()
>
> On Fri, Sep 7, 2018 at 12:33 AM, Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hi everyone,
>>
>> I'm running a YARN session on a cluster with one master and one core and
>> would like to use the Monitoring API programmatically to submit jobs. I
>> have found that the configuration variables are read but ignored when
>> starting the session - it seems to choose a random port each run.
>>
>> Here's a snippet from the startup logs:
>>
>> 2018-09-06 21:44:38,763 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: env.yarn.conf.dir, /etc/hadoop/conf
>> 2018-09-06 21:44:38,764 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: env.hadoop.conf.dir, /etc/hadoop/conf
>> 2018-09-06 21:44:38,765 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: rest.port, 44477
>> 2018-09-06 21:44:38,765 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: jobmanager.web.port, 44477
>> 2018-09-06 21:44:38,765 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: high-availability.jobmanager.port, 44477
>> 2018-09-06 21:44:38,775 INFO
>> org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn
>> properties file under /tmp/.yarn-properties-hadoop.
>> 2018-09-06 21:44:39,615 WARN  org.apache.hadoop.util.NativeCodeLoader
>>- Unable to load native-hadoop library for your
>> platform... using builtin-java classes where applicable
>> 2018-09-06 21:44:39,799 INFO
>> org.apache.flink.runtime.security.modules.HadoopModule- Hadoop user
>> set to hadoop (auth:SIMPLE)
>> 2018-09-06 21:44:40,045 INFO  org.apache.hadoop.yarn.client.RMProxy
>>- Connecting to ResourceManager at
>> ip-10-2-3-71.ec2.internal/10.2.3.71:8032
>> 2018-09-06 21:44:40,312 INFO
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Cluster
>> specification: ClusterSpecification{masterMemoryMB=1024,
>> taskManagerMemoryMB=4096, numberTaskManagers=1, slotsPerTaskManager=1}
>> 2018-09-06 21:44:43,564 INFO
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Submitting
>> application master application_1536250520330_0007
>> 2018-09-06 21:44:43,802 INFO
>> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted
>> application application_1536250520330_0007
>> 2018-09-06 21:44:43,802 INFO
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Waiting for
>> the cluster to be allocated
>> 2018-09-06 21:44:43,804 INFO
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deploying
>> cluster, current state ACCEPTED
>> 2018-09-06 21:44:48,326 INFO
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - YARN
>> application has been deployed successfully.
>> 2018-09-06 21:44:48,326 INFO
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - The Flink
>> YARN client has been started in detached mode. In order to stop Flink on
>> YARN, use the following command or a YARN web interface to stop it:
>> yarn application -kill application_1536250520330_0007
>> Please also note that the temporary files of the YARN session in the home
>> directory will not be removed.
>> 2018-09-06 21:44:48,821 INFO  org.apache.flink.runtime.rest.RestClient
>>   - Rest client endpoint started.
>> Flink JobManager is now running on ip-10-2-3-25.ec2.internal:38683 with
>> leader id ----.
>> JobManager Web Interface: http://ip-10-2-3-25.ec2.internal:38683
>>
>>
>> I'm setting both the rest.port and jobmanager.web.port, but both are
>> ignored. Has anyone seen this before?

Re: Flink 1.6 Job fails with IllegalStateException: Buffer pool is destroyed.

2018-09-07 Thread 杨力
Thank you for your advice. I had not noticed that the log level was set to
WARN.
The INFO logs suggest that the job fails because of an Akka timeout, and the
root cause is long GC pauses.

On Fri, Sep 7, 2018 at 5:43 PM Zhijiang(wangzhijiang999) <
wangzhijiang...@aliyun.com> wrote:

> You may need to configure at least INFO level for the logger in Flink;
> currently the messages are too limited for debugging the problem.
>
> Best,
> Zhijiang
>
> --
> From: 杨力
> Sent: Friday, September 7, 2018 17:21
> To: Zhijiang(wangzhijiang999)
> Subject: Re: Flink 1.6 Job fails with IllegalStateException: Buffer pool is
> destroyed.
>
>
> I have checked the logs from the YARN NodeManagers, and there is no kill
> action recorded. There is no job cancellation record in the jobmanager's log either.
>
> Here are the job logs retrieved from YARN.
>
> https://pastebin.com/raw/1yHLYR65
>
> Zhijiang(wangzhijiang999) wrote on Friday, September 7, 2018 at 3:22 PM:
> Hi,
>
> I think the problem in the attached image is not the root cause of your job
> failure. There must be other task or TaskManager failures; then all the
> related tasks will be cancelled by the job manager, and the problem in the
> attached image is just caused by the task cancellation.
>
> You can review the job manager log to check whether there are any failures
> that cause the whole job to fail.
> FYI, the task manager may be killed by YARN because it exceeds its memory
> limit. You mentioned the job fails about half an hour after it starts, so I
> guess there is a possibility that the task manager was killed by YARN.
>
> Best,
> Zhijiang
> --
> From: 杨力
> Sent: Friday, September 7, 2018 13:09
> To: user
> Subject: Flink 1.6 Job fails with IllegalStateException: Buffer pool is
> destroyed.
>
> Hi all,
> I am encountering a weird problem when running Flink 1.6 in YARN per-job
> clusters.
> The job fails about half an hour after it starts. Related logs are
> attached as an image.
>
> This piece of log comes from one of the taskmanagers. There are not any
> other related log lines.
> No ERROR-level logs. The job just runs for tens of minutes without
> printing any logs
> and suddenly throws this exception.
>
> It is reproducible in my production environment, but not in my test
> environment.
> The 'Buffer pool is destroyed' exception is always thrown while emitting a
> latency marker.
>

>


Re: Operator metrics do not get unregistered after job finishes

2018-09-07 Thread Helmut Zechmann
Hi Vino,

The log shows no problems. The problem can be reproduced easily. I created
https://issues.apache.org/jira/browse/FLINK-10300.

Best,

Helmut

> On 18. Aug 2018, at 04:53, vino yang  wrote:
> 
> Hi Helmut,
> 
> Are the metrics of all subtask instances of a job left registered, or only
> some of them? Is there any exception information available in the logs?
> 
> Please feel free to create a JIRA issue and clearly describe your problem.
> 
> Thanks, vino.
> 
> Helmut Zechmann (hel...@adeven.com) wrote on Friday, August 17, 2018 at 11:14 PM:
> Hi all,
> 
> 
> we are using flink 1.5.2 in batch mode with prometheus monitoring.
> 
> We noticed that a few metrics do not get unregistered after a job is finished:
> 
> flink_taskmanager_job_task_operator_numRecordsIn
> flink_taskmanager_job_task_operator_numRecordsInPerSecond
> flink_taskmanager_job_task_operator_numRecordsOut
> flink_taskmanager_job_task_operator_numRecordsOutPerSecond
> 
> 
> Those metrics stay in the taskmanager metrics list until the task manager gets
> restarted.
> 
> Our metrics config is:
> 
> metrics.reporters: prom
> metrics.reporter.prom.class: 
> org.apache.flink.metrics.prometheus.PrometheusReporter
> metrics.reporter.prom.port: 7000-7001
> 
> metrics.scope.jm: flink.<host>.jobmanager
> metrics.scope.tm: flink.<host>.taskmanager.<tm_id>
> metrics.scope.jm.job: flink.<host>.jobmanager.<job_name>
> metrics.scope.tm.job: flink.<host>.taskmanager.<tm_id>.<job_name>
> metrics.scope.task: flink.<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
> metrics.scope.operator: flink.<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
> 
> 
> Since we run many batch jobs, this makes Prometheus monitoring unusable for
> us. Is this a known issue?
> 
> 
> Best,
> 
> Helmut



Re: How to customize schedule mode and result partition type?

2018-09-07 Thread 陈梓立
Sorry to attach this message here, but if someone sees this email, please
reply with an ack. Without any reply, I wonder whether this email has been
received by anyone on the mailing list.

Best,
tison.


Re: After OutOfMemoryError State cannot be read

2018-09-07 Thread Stefan Richter
Hi,

what I can say is that any failures like OOMs should not corrupt checkpoint 
files, because only successfully completed checkpoints are used for recovery by 
the job manager. Just to get a bit more info, are you using full or incremental 
checkpoints? Unfortunately, it is a bit hard to say from the given information 
what the cause of the problem is. Typically, these problems have been observed 
when something was wrong with a serializer or a stateful serializer was used 
from multiple threads.

Best,
Stefan 

> On 07.09.2018 at 05:04, vino yang wrote:
> 
> Hi Edward,
> 
> From this log line, Caused by: java.io.EOFException, it seems that the state
> metadata file has been corrupted.
> But I can't confirm it; maybe Stefan knows more details, so I am pinging him for you.
> 
> Thanks, vino.
> 
> Edward Rojas (edward.roja...@gmail.com) wrote on Friday, September 7, 2018 at 1:22 AM:
> Hello all,
> 
> We are running Flink 1.5.3 on Kubernetes with RocksDB as the state backend.
> When performing some load testing we got an OutOfMemoryError: native memory
> exhausted, causing the job to fail and be restarted.
> 
> After the TaskManager is restarted, the job is recovered from a checkpoint,
> but it seems that there is a problem when trying to access the state. We got
> the error from the onTimer function of an onProcessingTime callback.
> 
> Could the OOM error have caused a corrupted state to be checkpointed?
> 
> We get Exceptions like:
> 
> TimerException{java.lang.RuntimeException: Error while retrieving data from
> RocksDB.}
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522)
> at java.util.concurrent.FutureTask.run(FutureTask.java:277)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:191)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1160)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> at java.lang.Thread.run(Thread.java:811)
> Caused by: java.lang.RuntimeException: Error while retrieving data from
> RocksDB.
> at
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)
> at com.xxx.ProcessFunction.*onTimer*(ProcessFunction.java:279)
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94)
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.*onProcessingTime*(KeyedProcessOperator.java:78)
> at
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.*onProcessingTime*(HeapInternalTimerService.java:266)
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
> ... 7 more
> Caused by: java.io.EOFException
> at java.io.DataInputStream.readFully(DataInputStream.java:208)
> at java.io.DataInputStream.readUTF(DataInputStream.java:618)
> at java.io.DataInputStream.readUTF(DataInputStream.java:573)
> at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
> at
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:87)
> ... 12 more
> 
> 
> Thanks in advance for any help
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
> 



Re: Flink 1.6 Job fails with IllegalStateException: Buffer pool is destroyed.

2018-09-07 Thread vino yang
Hi Bill,

Can you provide more information, such as whether checkpointing is enabled,
whether exactly-once is specified, and whether the Flink web UI shows back
pressure?
Here is a ticket that also gives feedback on this question [1].
The same question has also been asked on Stack Overflow, but I don't know if
the answer is valid [2].

[1]: https://issues.apache.org/jira/browse/FLINK-9054
[2]:
https://stackoverflow.com/questions/48276484/flink-throwing-java-lang-runtimeexception-buffer-pool-is-destroyed

Thanks, vino.

杨力 wrote on Friday, September 7, 2018 at 1:09 PM:

> Hi all,
> I am encountering a weird problem when running Flink 1.6 in YARN per-job
> clusters.
> The job fails about half an hour after it starts. Related logs are
> attached as an image.
>
> This piece of log comes from one of the taskmanagers. There are not any
> other related log lines.
> No ERROR-level logs. The job just runs for tens of minutes without
> printing any logs
> and suddenly throws this exception.
>
> It is reproducible in my production environment, but not in my test
> environment.
> The 'Buffer pool is destroyed' exception is always thrown while emitting a
> latency marker.
>


Re: Flink 1.6 Job fails with IllegalStateException: Buffer pool is destroyed.

2018-09-07 Thread Zhijiang(wangzhijiang999)
Hi,

I think the problem in the attached image is not the root cause of your job
failure. There must be other task or TaskManager failures; then all the related
tasks will be cancelled by the job manager, and the problem in the attached
image is just caused by the task cancellation.

You can review the job manager log to check whether there are any failures
that cause the whole job to fail.
FYI, the task manager may be killed by YARN because it exceeds its memory
limit. You mentioned the job fails about half an hour after it starts, so I
guess there is a possibility that the task manager was killed by YARN.

Best,
Zhijiang
--
From: 杨力
Sent: Friday, September 7, 2018 13:09
To: user
Subject: Flink 1.6 Job fails with IllegalStateException: Buffer pool is destroyed.

Hi all,
I am encountering a weird problem when running Flink 1.6 in YARN per-job
clusters.
The job fails about half an hour after it starts. Related logs are attached
as an image.

This piece of log comes from one of the taskmanagers. There are not any other 
related log lines.
No ERROR-level logs. The job just runs for tens of minutes without printing any 
logs
and suddenly throws this exception.

It is reproducible in my production environment, but not in my test environment.
The 'Buffer pool is destroyed' exception is always thrown while emitting a
latency marker.



Re: Setting Flink Monitoring API Port on YARN Cluster

2018-09-07 Thread Gary Yao
Hi Austin,

The config options rest.port, jobmanager.web.port, etc. are intentionally
ignored on YARN. The port should be chosen randomly to avoid conflicts with
other containers [1]. I do not see a way how you can set a fixed port at the
moment but there is a related ticket for that [2]. The Flink CLI determines
the hostname and port from the YARN ApplicationReport [3][4] – you can do the
same.

Best,
Gary

[1]
https://github.com/apache/flink/blob/d036417985d3e2b1ca63909007db9710e842abf4/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java#L103

[2] https://issues.apache.org/jira/browse/FLINK-5758

[3]
https://github.com/apache/flink/blob/d036417985d3e2b1ca63909007db9710e842abf4/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L387

[4]
https://hadoop.apache.org/docs/r2.8.3/api/org/apache/hadoop/yarn/api/records/ApplicationReport.html#getRpcPort()
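
As an illustration of [3][4], a minimal sketch of doing the same lookup with the YARN client API (assuming Hadoop 2.8+; the application id below is just the one from the logs in this thread):

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class FlinkRestEndpointLookup {
    public static void main(String[] args) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration());
        yarnClient.start();

        // Placeholder: the id of your Flink YARN session application.
        ApplicationId appId = ApplicationId.fromString("application_1536250520330_0007");

        ApplicationReport report = yarnClient.getApplicationReport(appId);
        // Host and RPC port of the application master, i.e. the Flink JobManager / web endpoint.
        System.out.println(report.getHost() + ":" + report.getRpcPort());

        yarnClient.stop();
    }
}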

On Fri, Sep 7, 2018 at 12:33 AM, Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi everyone,
>
> I'm running a YARN session on a cluster with one master and one core and
> would like to use the Monitoring API programmatically to submit jobs. I
> have found that the configuration variables are read but ignored when
> starting the session - it seems to choose a random port each run.
>
> Here's a snippet from the startup logs:
>
> 2018-09-06 21:44:38,763 INFO  
> org.apache.flink.configuration.GlobalConfiguration
>   - Loading configuration property: env.yarn.conf.dir,
> /etc/hadoop/conf
> 2018-09-06 21:44:38,764 INFO  
> org.apache.flink.configuration.GlobalConfiguration
>   - Loading configuration property: env.hadoop.conf.dir,
> /etc/hadoop/conf
> 2018-09-06 21:44:38,765 INFO  
> org.apache.flink.configuration.GlobalConfiguration
>   - Loading configuration property: rest.port, 44477
> 2018-09-06 21:44:38,765 INFO  
> org.apache.flink.configuration.GlobalConfiguration
>   - Loading configuration property: jobmanager.web.port, 44477
> 2018-09-06 21:44:38,765 INFO  
> org.apache.flink.configuration.GlobalConfiguration
>   - Loading configuration property: high-availability.jobmanager.port,
> 44477
> 2018-09-06 21:44:38,775 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli
>- Found Yarn properties file under
> /tmp/.yarn-properties-hadoop.
> 2018-09-06 21:44:39,615 WARN  org.apache.hadoop.util.NativeCodeLoader
>- Unable to load native-hadoop library for your
> platform... using builtin-java classes where applicable
> 2018-09-06 21:44:39,799 INFO  
> org.apache.flink.runtime.security.modules.HadoopModule
>   - Hadoop user set to hadoop (auth:SIMPLE)
> 2018-09-06 21:44:40,045 INFO  org.apache.hadoop.yarn.client.RMProxy
>- Connecting to ResourceManager at
> ip-10-2-3-71.ec2.internal/10.2.3.71:8032
> 2018-09-06 21:44:40,312 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor
>  - Cluster specification: ClusterSpecification{masterMemoryMB=1024,
> taskManagerMemoryMB=4096, numberTaskManagers=1, slotsPerTaskManager=1}
> 2018-09-06 21:44:43,564 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor
>  - Submitting application master application_1536250520330_0007
> 2018-09-06 21:44:43,802 INFO  
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl
>- Submitted application application_1536250520330_0007
> 2018-09-06 21:44:43,802 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor
>  - Waiting for the cluster to be allocated
> 2018-09-06 21:44:43,804 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor
>  - Deploying cluster, current state ACCEPTED
> 2018-09-06 21:44:48,326 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor
>  - YARN application has been deployed successfully.
> 2018-09-06 21:44:48,326 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor
>  - The Flink YARN client has been started in detached mode. In
> order to stop Flink on YARN, use the following command or a YARN web
> interface to stop it:
> yarn application -kill application_1536250520330_0007
> Please also note that the temporary files of the YARN session in the home
> directory will not be removed.
> 2018-09-06 21:44:48,821 INFO  org.apache.flink.runtime.rest.RestClient
>   - Rest client endpoint started.
> Flink JobManager is now running on ip-10-2-3-25.ec2.internal:38683 with
> leader id ----.
> JobManager Web Interface: http://ip-10-2-3-25.ec2.internal:38683
>
>
> I'm setting both the rest.port and jobmanager.web.port, but both are
> ignored. Has anyone seen this before?
>
> Thanks!
>