Need help debugging back pressure job

2017-05-22 Thread Fritz Budiyanto
Hi All,

Any tips on debugging back pressure ? I have a workload where it get stuck 
after it ran for a couple of hours.
I assume the cause of the back pressure is the block next to the one showing as 
having the back pressure, is this right ?

Any idea on how to get the backtrace ? (I’m using standalone combined jm/tm 
with parallelism of 1, and the suspected block is doing ProcessFunction with 
event timers)

—
Fritz




Re: Excessive stdout is causing java heap out of mem

2017-05-22 Thread Fritz Budiyanto
Hi Robert,

Thanks Robert, I’ll start using the logger. 

I didn’t pay attention whether the error occur when I accessed the log from job 
manager.
I will do that in my next test.

Anyone has any suggestion on how to debug out of memory exception on flink 
jm/tm ?

—
Fritz


> On May 22, 2017, at 12:04 PM, Robert Metzger  wrote:
> 
> Hi Fritz,
> 
> The TaskManagers are not buffering all stdout for the webinterface (at least 
> I'm not aware of that). Did the error occur when accessing the log from the 
> JobManager?
> Flinks web front end lazily loads the logs from the taskmanagers.
> 
> The suggested method for logging is to use slf4j for logging, so the 
> following code snippets :
> 
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> private static final Logger LOG = LoggerFactory.getLogger(MyJob.class);
> Then you can do stuff like:
> LOG.info("My log statement");
> Also, using a logging Framework will allow you to redirect the log contents 
> of your job to a separate file.
> 
> But I'm not sure if the logging is really causing the TaskManager JVMs to die 
> ...
> 
> 
> On Sat, May 20, 2017 at 3:12 AM, Fritz Budiyanto  > wrote:
> Hi,
> 
> I notice that when I enabled DataStreamSink’s print() for debugging, (kinda 
> excessive printing), its causing java Heap out of memory.
> Possibly the Task Manager is buffering all stdout for the WebInterface? I 
> haven’t spent time debugging it, but I wonder if this is expected where 
> massive print will exhaust java heap, and I’m using standalone mode.
> 
> Is there a way to disable this memory logging for web interface, and just 
> redirect stdout to file instead with file rotation?
> What is the suggested method of logging ?
> 
> —
> Fritz
> 



Re: trying to externalize checkpoint to s3

2017-05-22 Thread Ted Yu
Adding back user@

Please check the hadoop-common jar in the classpath.

On Mon, May 22, 2017 at 7:12 PM, Sathi Chowdhury <
sathi.chowdh...@elliemae.com> wrote:

> Tried it ,
>
> It does not fail like before but a new error popped up..looks like a jar
> problem(clash ) to me
>
> thanks
>
> java.lang.Exception: Error while triggering checkpoint 2 for Source:
> Custom Source (1/1)
>
> at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1120)
>
> at java.util.concurrent.Executors$RunnableAdapter.
> call(Executors.java:511)
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
>
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.conf.
> Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
>
> at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(
> EmrFileSystem.java:95)
>
> at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.
> initialize(HadoopFileSystem.java:320)
>
> at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(
> FileSystem.java:271)
>
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
>
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
>
> at org.apache.flink.runtime.state.filesystem.
> FsCheckpointStreamFactory.(FsCheckpointStreamFactory.java:105)
>
> at org.apache.flink.runtime.state.filesystem.FsStateBackend.
> createStreamFactory(FsStateBackend.java:172)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask$
> CheckpointingOperation.createStreamFactory(StreamTask.java:1155)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask$
> CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1137)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask$
> CheckpointingOperation.executeCheckpointing(StreamTask.java:1076)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> checkpointState(StreamTask.java:641)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> performCheckpoint(StreamTask.java:586)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> triggerCheckpoint(StreamTask.java:529)
>
> at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:)
>
> ... 5 common frames omitted
>
>
>
>
>
> *From: *Ted Yu 
> *Date: *Monday, May 22, 2017 at 6:52 PM
> *To: *Sathi Chowdhury 
> *Subject: *Re: trying to externalize checkpoint to s3
>
>
>
> Have you tried specifying subdirectory such as the following ?
>
>
>
> s3://abc-checkpoint/subdir
>
>
>
> On Mon, May 22, 2017 at 6:34 PM, Sathi Chowdhury <
> sathi.chowdh...@elliemae.com> wrote:
>
> We are  running flink 1.2 in pre production
>
> I am trying to test checkpoint stored in external location in s3
>
>
>
> I have set these below in flink-conf.yaml
>
>
>
> state.backend: filesystem
>
> state.checkpoints.dir: s3://abc-checkpoint
>
> state.backend.fs.checkpointdir: s3://abc-checkpoint
>
>
>
> I get this failure in job manager log
>
> java.lang.Exception: Cannot initialize File System State Backend with URI
> 's3://abc-checkpoint.
>
> at org.apache.flink.runtime.state.filesystem.
> FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:57)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> createStateBackend(StreamTask.java:719)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:223)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.IllegalArgumentException: Cannot use the root
> directory for checkpoints.
>
> at org.apache.flink.runtime.state.filesystem.FsStateBackend.
> validateAndNormalizeUri(FsStateBackend.java:225)
>
> at org.apache.flink.runtime.state.filesystem.
> FsStateBackend.(FsStateBackend.java:153)
>
> at org.apache.flink.runtime.state.filesystem.
> FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:5
>
> Any clue? I thought as I am using EMR Hadoop to s3 integration is already
> working.
>
> Thanks
> Sathi
>
> =Notice to Recipient: This e-mail transmission, and any
> documents, files or previous e-mail messages attached to it may contain
> information that is confidential or legally privileged, and intended for
> the use of the individual or entity named above. If you are not the
> intended recipient, or a person responsible for delivering it to the
> intended recipient, you are hereby notified that you must not read this
> transmission and that any disclosure, copying, printing, distribution or
> use of any of the information contained in 

Re: trying to externalize checkpoint to s3

2017-05-22 Thread SHI Xiaogang
Hi Sathi,

According to the format specification of URI, "abc-checkpoint" is the host
name in the given uri and the path is null. Therefore, FsStateBackend are
complaining about the usage of the root directory.

Maybe "s3:///abc-checkpoint" ("///" instead of "//") is the uri that you
want to use. It will put all checkpoints under the path "/abc-checkpoint".

Regards,
Xiaogang


2017-05-23 9:34 GMT+08:00 Sathi Chowdhury :

> We are  running flink 1.2 in pre production
>
> I am trying to test checkpoint stored in external location in s3
>
>
>
> I have set these below in flink-conf.yaml
>
>
>
> state.backend: filesystem
>
> state.checkpoints.dir: s3://abc-checkpoint
>
> state.backend.fs.checkpointdir: s3://abc-checkpoint
>
>
>
> I get this failure in job manager log
>
> java.lang.Exception: Cannot initialize File System State Backend with URI
> 's3://abc-checkpoint.
>
> at org.apache.flink.runtime.state.filesystem.
> FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:57)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> createStateBackend(StreamTask.java:719)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:223)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.IllegalArgumentException: Cannot use the root
> directory for checkpoints.
>
> at org.apache.flink.runtime.state.filesystem.FsStateBackend.
> validateAndNormalizeUri(FsStateBackend.java:225)
>
> at org.apache.flink.runtime.state.filesystem.
> FsStateBackend.(FsStateBackend.java:153)
>
> at org.apache.flink.runtime.state.filesystem.
> FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:5
>
> Any clue? I thought as I am using EMR Hadoop to s3 integration is already
> working.
>
> Thanks
> Sathi
> =Notice to Recipient: This e-mail transmission, and any
> documents, files or previous e-mail messages attached to it may contain
> information that is confidential or legally privileged, and intended for
> the use of the individual or entity named above. If you are not the
> intended recipient, or a person responsible for delivering it to the
> intended recipient, you are hereby notified that you must not read this
> transmission and that any disclosure, copying, printing, distribution or
> use of any of the information contained in or attached to this transmission
> is STRICTLY PROHIBITED. If you have received this transmission in error,
> please immediately notify the sender by telephone or return e-mail and
> delete the original transmission and its attachments without reading or
> saving in any manner. Thank you. =
>


trying to externalize checkpoint to s3

2017-05-22 Thread Sathi Chowdhury
We are  running flink 1.2 in pre production
I am trying to test checkpoint stored in external location in s3

I have set these below in flink-conf.yaml

state.backend: filesystem
state.checkpoints.dir: s3://abc-checkpoint
state.backend.fs.checkpointdir: s3://abc-checkpoint

I get this failure in job manager log
java.lang.Exception: Cannot initialize File System State Backend with URI 
's3://abc-checkpoint.
at 
org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:57)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(StreamTask.java:719)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:223)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Cannot use the root directory 
for checkpoints.
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.validateAndNormalizeUri(FsStateBackend.java:225)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.(FsStateBackend.java:153)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:5
Any clue? I thought as I am using EMR Hadoop to s3 integration is already 
working.
Thanks
Sathi
=Notice to Recipient: This e-mail transmission, and any documents, 
files or previous e-mail messages attached to it may contain information that 
is confidential or legally privileged, and intended for the use of the 
individual or entity named above. If you are not the intended recipient, or a 
person responsible for delivering it to the intended recipient, you are hereby 
notified that you must not read this transmission and that any disclosure, 
copying, printing, distribution or use of any of the information contained in 
or attached to this transmission is STRICTLY PROHIBITED. If you have received 
this transmission in error, please immediately notify the sender by telephone 
or return e-mail and delete the original transmission and its attachments 
without reading or saving in any manner. Thank you. =


Re: ERROR while creating save points..

2017-05-22 Thread Sathi Chowdhury
I was able to bypass that one ..by running it from bin/flink …
Now encountering
by: java.lang.NullPointerException: Checkpoint properties say that the 
checkpoint should have been persisted, but missing external path.

From: Sathi Chowdhury 
Date: Monday, May 22, 2017 at 12:55 PM
To: "user@flink.apache.org" 
Subject: ERROR while creating save points..

Hi Flink Dev,
I am running flink on yarn from EMR and I was running this command to test an 
external savepoint
flink savepoint 8c4c885c5899544de556c5caa984502d  /mnt


The program finished with the following exception:

org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not 
retrieve the leader gateway
at 
org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:127)
at 
org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:645)
at 
org.apache.flink.client.CliFrontend.getJobManagerGateway(CliFrontend.java:868)
at 
org.apache.flink.client.CliFrontend.triggerSavepoint(CliFrontend.java:653)
at 
org.apache.flink.client.CliFrontend.savepoint(CliFrontend.java:643)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1016)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after 
[1 milliseconds]
at 
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at scala.concurrent.Await.result(package.scala)
at 
org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:125)
... 6 more
2017-05-22 19:50:10,330 INFO  org.apache.flink.yarn.YarnClusterClient   
- Shutting down YarnClusterClient from the client shutdown hook
2017-05-22 19:50:10,330 INFO  org.apache.flink.yarn.YarnClusterClient   
- Disconnecting YarnClusterClient from ApplicationMaster
[hadoop@ip-10-202-4-24 flink]$ flink savepoint 8c4c885c5899544de556c5caa984502d 
 /mnt

Any clue what’s wrong?
=Notice to Recipient: This e-mail transmission, and any documents, 
files or previous e-mail messages attached to it may contain information that 
is confidential or legally privileged, and intended for the use of the 
individual or entity named above. If you are not the intended recipient, or a 
person responsible for delivering it to the intended recipient, you are hereby 
notified that you must not read this transmission and that any disclosure, 
copying, printing, distribution or use of any of the information contained in 
or attached to this transmission is STRICTLY PROHIBITED. If you have received 
this transmission in error, please immediately notify the sender by telephone 
or return e-mail and delete the original transmission and its attachments 
without reading or saving in any manner. Thank you. =
=Notice to Recipient: This e-mail transmission, and any documents, 
files or previous e-mail messages attached to it may contain information that 
is confidential or legally privileged, and intended for the use of the 
individual or entity named above. If you are not the intended recipient, or a 
person responsible for delivering it to the intended recipient, you are hereby 
notified that you must not read this transmission and that any disclosure, 
copying, printing, distribution or use of any of the information contained in 
or attached to this transmission is STRICTLY PROHIBITED. If you have received 
this transmission in error, please immediately notify the sender by telephone 
or return e-mail and delete the original transmission and its attachments 
without reading or saving in any manner. Thank you. =


Re: yarnship option

2017-05-22 Thread Mikhail Pryakhin
Hi Robert!Thanks a lot for your reply!>Can you double check if the job-libs/flink-connector-kafka-0.9_2.11-1.2.1.jar contains org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09 ?The jar does contain the class org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.class (The contents of the jar file is listed below)> Also, were there any previous Kafka09 related exceptions in the log??    No, it was the very first exception… You can find log file attached. the flink-connector-kafka-0.9_2.11-1.2.1.jar contains the following:root@host:~# unzip -l job-libs/flink-connector-kafka-0.9_2.11-1.2.1.jarArchive:  job-libs/flink-connector-kafka-0.9_2.11-1.2.1.jar  Length      Date    Time    Name-  -- -           0  2017-04-11 01:59   META-INF/      443  2017-04-11 01:59   META-INF/MANIFEST.MF     1451  2017-04-11 01:59   META-INF/DEPENDENCIES    11358  2017-04-11 01:59   META-INF/LICENSE      182  2017-04-11 01:59   META-INF/NOTICE        0  2017-04-11 01:59   org/        0  2017-04-11 01:59   org/apache/        0  2017-04-11 01:59   org/apache/flink/        0  2017-04-11 01:59   org/apache/flink/streaming/        0  2017-04-11 01:59   org/apache/flink/streaming/connectors/        0  2017-04-11 01:59   org/apache/flink/streaming/connectors/kafka/    11554  2017-04-11 01:59   org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.class     2603  2017-04-11 01:59   org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.class     4463  2017-04-11 01:59   org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.class     2693  2017-04-11 01:59   org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.class        0  2017-04-11 01:59   org/apache/flink/streaming/connectors/kafka/internal/     1125  2017-04-11 01:59   org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.class      570  2017-04-11 01:59   org/apache/flink/streaming/connectors/kafka/internal/Handover$WakeupException.class    10268  2017-04-11 01:59   org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.class      570  2017-04-11 01:59   org/apache/flink/streaming/connectors/kafka/internal/Handover$ClosedException.class     3018  2017-04-11 01:59   org/apache/flink/streaming/connectors/kafka/internal/Handover.class    11854  2017-04-11 01:59   org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.class     2182  2017-04-11 01:59   org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread$CommitCallback.class      311  2017-04-11 01:59   org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread$1.class     2329  2017-04-11 01:59   org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.class        0  2017-04-11 01:59   META-INF/maven/        0  2017-04-11 01:59   META-INF/maven/org.apache.flink/        0  2017-04-11 01:59   META-INF/maven/org.apache.flink/flink-connector-kafka-0.9_2.11/     6039  2017-04-11 01:59   META-INF/maven/org.apache.flink/flink-connector-kafka-0.9_2.11/pom.xml      131  2017-04-11 01:59   META-INF/maven/org.apache.flink/flink-connector-kafka-0.9_2.11/pom.properties        0  2017-04-11 01:59   META-INF/maven/org.apache.flink/force-shading/     3285  2017-04-11 01:59   META-INF/maven/org.apache.flink/force-shading/pom.xml      114  2017-04-11 01:59   META-INF/maven/org.apache.flink/force-shading/pom.properties-                     ---    76543                     33 files

flink-root-client-dmpkit-dev-dn1.log
Description: Binary data
--Mike PryakhinOn 22 May 2017, at 22:14, Robert Metzger  wrote:Hi,this issue is unexpected :) Can you double check if the job-libs/flink-connector-kafka-0.9_2.11-1.2.1.jar contains org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09 ?Also, were there any previous Kafka09 related exceptions in the log??From this SO answer, it seems that this is not really the classical classNotFoundException, but a bit differenT: https://stackoverflow.com/a/5756989/568695On Mon, May 22, 2017 at 5:12 PM, Mikhail Pryakhin  wrote:Hi all!

I'm playing with flink streaming job on yarn cluster. The job consumes events from kafka and prints them to the standard out.
The job uses flink-connector-kafka-0.9_2.11-1.2.1.jar library that is passed via the --yarnship option.
Here is the way I run the job:

export HADOOP_USER_NAME=hdfs; \
export HADOOP_CONF_DIR=/etc/hadoop/conf/; \
/opt/flink-1.2.1/bin/flink run \
        -yst \
        -yt /home/user/job-libs  \
        -m yarn-cluster \
        -yn 3 \
        -c com.flink.Test \
        flink-test_2.11-1.0.0-SNAPSHOT.jar

Finally the job fails complaing that it can't find the class:
java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09

I looked through the jobmanager.log and found that the flink-connector-kafka-0.9_2.11-1.2.1.jar library is added to the classpath:

2017-05-22 17:41:59,637 

Re: State in Custom Tumble Window Class

2017-05-22 Thread rhashmi
Could you elaborate this more? If i assume if i set window time to max ..
does it mean my window will stay for infinite time framework, 
Wouldn't this may hit memory overflow with time?




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/State-in-Custom-Tumble-Window-Class-tp13177p13255.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


ERROR while creating save points..

2017-05-22 Thread Sathi Chowdhury
Hi Flink Dev,
I am running flink on yarn from EMR and I was running this command to test an 
external savepoint
flink savepoint 8c4c885c5899544de556c5caa984502d  /mnt


The program finished with the following exception:

org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not 
retrieve the leader gateway
at 
org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:127)
at 
org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:645)
at 
org.apache.flink.client.CliFrontend.getJobManagerGateway(CliFrontend.java:868)
at 
org.apache.flink.client.CliFrontend.triggerSavepoint(CliFrontend.java:653)
at 
org.apache.flink.client.CliFrontend.savepoint(CliFrontend.java:643)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1016)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after 
[1 milliseconds]
at 
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at scala.concurrent.Await.result(package.scala)
at 
org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:125)
... 6 more
2017-05-22 19:50:10,330 INFO  org.apache.flink.yarn.YarnClusterClient   
- Shutting down YarnClusterClient from the client shutdown hook
2017-05-22 19:50:10,330 INFO  org.apache.flink.yarn.YarnClusterClient   
- Disconnecting YarnClusterClient from ApplicationMaster
[hadoop@ip-10-202-4-24 flink]$ flink savepoint 8c4c885c5899544de556c5caa984502d 
 /mnt

Any clue what’s wrong?
=Notice to Recipient: This e-mail transmission, and any documents, 
files or previous e-mail messages attached to it may contain information that 
is confidential or legally privileged, and intended for the use of the 
individual or entity named above. If you are not the intended recipient, or a 
person responsible for delivering it to the intended recipient, you are hereby 
notified that you must not read this transmission and that any disclosure, 
copying, printing, distribution or use of any of the information contained in 
or attached to this transmission is STRICTLY PROHIBITED. If you have received 
this transmission in error, please immediately notify the sender by telephone 
or return e-mail and delete the original transmission and its attachments 
without reading or saving in any manner. Thank you. =


Async IO Question

2017-05-22 Thread Frank Xue
Hi,

I have a question related to async io for Flink. I found that when running
unordered (AsyncDataStream.unorderedWait) failures within each individual
asyncInvoke is added back to be retried, but when I run it ordered
(AsyncDataStream.orderedWait) and an exception is thrown within
asyncInvoke, it just stops the whole process. Is this expected behavior?

Thanks,
Frank

-- 
*Frank Xue* | Software Engineer | w.
3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305

*The First Multi-SaaS Management Platform
*


Re: yarnship option

2017-05-22 Thread Robert Metzger
Hi,

this issue is unexpected :) Can you double check if the
job-libs/flink-connector-kafka-0.9_2.11-1.2.1.jar contains
org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09 ?
Also, were there any previous Kafka09 related exceptions in the log??

>From this SO answer, it seems that this is not really the classical
classNotFoundException, but a bit differenT:
https://stackoverflow.com/a/5756989/568695

On Mon, May 22, 2017 at 5:12 PM, Mikhail Pryakhin 
wrote:

> Hi all!
>
> I'm playing with flink streaming job on yarn cluster. The job consumes
> events from kafka and prints them to the standard out.
> The job uses flink-connector-kafka-0.9_2.11-1.2.1.jar library that is
> passed via the --yarnship option.
> Here is the way I run the job:
>
> export HADOOP_USER_NAME=hdfs; \
> export HADOOP_CONF_DIR=/etc/hadoop/conf/; \
> /opt/flink-1.2.1/bin/flink run \
> -yst \
> -yt /home/user/job-libs  \
> -m yarn-cluster \
> -yn 3 \
> -c com.flink.Test \
> flink-test_2.11-1.0.0-SNAPSHOT.jar
>
> Finally the job fails complaing that it can't find the class:
> java.lang.NoClassDefFoundError: org/apache/flink/streaming/
> connectors/kafka/FlinkKafkaConsumer09
>
> I looked through the jobmanager.log and found that the
> flink-connector-kafka-0.9_2.11-1.2.1.jar library is added to the
> classpath:
>
> 2017-05-22 17:41:59,637 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner
>-  Classpath: job-libs/flink-connector-
> kafka-0.9_2.11-1.2.1.jar:
>
> Could please help to figure out why the class org/apache/flink/streaming/
> connectors/kafka/FlinkKafkaConsumer09 can't be loaded inspite of the jar
> containing this class was added to the classpath
>
> 
> Mike Pryakhin


Re: Job submission: Fail using command line. Success using web (flink1.2.0)

2017-05-22 Thread Robert Metzger
Hi Rami,

I think the problem is that when submitting your job through the web
interface, Akka will not use remoting (= it will not send your message to a
remote machine). When you submit your message from the client, it'll go
through akka's network stack (=remoting).
Akka rejects messages above a certain size. It looks like your job exceeds
that size.

Here, you'll find the config keys to increase the size:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#distributed-coordination-via-akka

On Mon, May 22, 2017 at 2:56 PM, Rami Al-Isawi 
wrote:

> Hi Robert,
>
> Yes there is an OversizedPayloadException in the job manager log:
> ---
> 2017-05-22 15:39:18,942 INFO  
> org.apache.flink.runtime.client.JobSubmissionClientActor
> - Upload jar files to job manager akka.tcp://flink@localhost:
> 6123/user/jobmanager.
> 2017-05-22 15:39:18,957 INFO  org.apache.flink.runtime.blob.BlobClient
>   - Blob client connecting to akka.tcp://flink@localhost:
> 6123/user/jobmanager
> 2017-05-22 15:39:19,451 INFO  
> org.apache.flink.runtime.client.JobSubmissionClientActor
> - Submit job to the job manager akka.tcp://flink@localhost:
> 6123/user/jobmanager.
> 2017-05-22 15:39:19,632 ERROR akka.remote.EndpointWriter
>   - Transient association error (association remains live)
> akka.remote.OversizedPayloadException: Discarding oversized payload sent
> to Actor[akka.tcp://flink@localhost:6123/user/jobmanager#1736150911]: max
> allowed size 10485760 bytes, actual size of encoded class
> org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage
> was 24555177 bytes.
> 2017-05-22 15:41:19,472 ERROR org.apache.flink.client.CliFrontend
>   - Error while running the command.
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Couldn't retrieve the JobExecutionResult from the
> JobManager.
> at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:427)
> at org.apache.flink.client.program.StandaloneClusterClient.submitJob(
> StandaloneClusterClient.java:101)
> at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:400)
> at org.apache.flink.streaming.api.environment.StreamContextEnvironment.
> execute(StreamContextEnvironment.java:66)
> at fwdnxt.Sonar.main(Sonar.java:162)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(
> PackagedProgram.java:528)
> at org.apache.flink.client.program.PackagedProgram.
> invokeInteractiveModeForExecution(PackagedProgram.java:419)
> at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:339)
> at org.apache.flink.client.CliFrontend.executeProgram(
> CliFrontend.java:831)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
> at org.apache.flink.client.CliFrontend.parseParameters(
> CliFrontend.java:1073)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
> at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(
> HadoopSecurityContext.java:43)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1548)
> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(
> HadoopSecurityContext.java:40)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
> Caused by: org.apache.flink.runtime.client.JobExecutionException:
> Couldn't retrieve the JobExecutionResult from the JobManager.
> at org.apache.flink.runtime.client.JobClient.
> awaitJobResult(JobClient.java:294)
> at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.
> java:382)
> at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:423)
> ... 22 more
> Caused by: 
> org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException:
> Job submission to the JobManager timed out. You may increase
> 'akka.client.timeout' in case the JobManager needs more time to configure
> and confirm the job submission.
> -
> Does this help in explaining why it fails only in the command line client?
> How can I fix it, what is "JobManagerMessages$LeaderSessionMessage was
> 24555177 bytes”?
>
>
> Regards,
> -Rami
>
> On 12 May 2017, at 19:21, Robert Metzger  wrote:
>
> Hi,
> did you check the jobmanager log for any incoming messages? I would be
> interesting to see if the JM failed after the initial akka message, or if
> there's any kind of hiccup ?
>
> On 

yarnship option

2017-05-22 Thread Mikhail Pryakhin
Hi all!

I'm playing with flink streaming job on yarn cluster. The job consumes events 
from kafka and prints them to the standard out.
The job uses flink-connector-kafka-0.9_2.11-1.2.1.jar library that is passed 
via the --yarnship option.
Here is the way I run the job:

export HADOOP_USER_NAME=hdfs; \
export HADOOP_CONF_DIR=/etc/hadoop/conf/; \
/opt/flink-1.2.1/bin/flink run \
-yst \
-yt /home/user/job-libs  \
-m yarn-cluster \
-yn 3 \
-c com.flink.Test \
flink-test_2.11-1.0.0-SNAPSHOT.jar

Finally the job fails complaing that it can't find the class:
java.lang.NoClassDefFoundError: 
org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09

I looked through the jobmanager.log and found that the 
flink-connector-kafka-0.9_2.11-1.2.1.jar library is added to the classpath:

2017-05-22 17:41:59,637 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner 
-  Classpath: 
job-libs/flink-connector-kafka-0.9_2.11-1.2.1.jar:

Could please help to figure out why the class 
org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09 can't be 
loaded inspite of the jar containing this class was added to the classpath


Mike Pryakhin

Re: Flink metrics related problems/questions

2017-05-22 Thread Aljoscha Krettek
Ah ok, the onTimer() and processElement() methods are all protected by 
synchronized blocks on the same lock. So that shouldn’t be a problem.

> On 22. May 2017, at 15:08, Chesnay Schepler  wrote:
> 
> Yes, that could cause the observed issue.
> 
> The default implementations are not thread-safe; if you do concurrent writes 
> they may be lost/overwritten.
> You will have to either guard accesses to that metric with a synchronized 
> block or implement your own thread-safe counter.
> 
> On 22.05.2017 14:17, Aljoscha Krettek wrote:
>> @Chesnay With timers it will happen that onTimer() is called from a 
>> different Thread than the Tread that is calling processElement(). If Metrics 
>> updates happen in both, would that be a problem?
>> 
>>> On 19. May 2017, at 11:57, Chesnay Schepler  wrote:
>>> 
>>> 2. isn't quite accurate actually; metrics on the TaskManager are not 
>>> persisted across restarts.
>>> 
>>> On 19.05.2017 11:21, Chesnay Schepler wrote:
 1. This shouldn't happen. Do you access the counter from different threads?
 
 2. Metrics in general are not persisted across restarts, and there is no 
 way to configure flink to do so at the moment.
 
 3. Counters are sent as gauges since as far as I know StatsD counters are 
 not allowed to be decremented.
 
 On 19.05.2017 08:56, jaxbihani wrote:
> Background: We are using a job using ProcessFunction which reads data from
> kafka fires ~5-10K timers per second and sends matched events to 
> KafkaSink.
> We are collecting metrics for collecting no of active timers, no of timers
> scheduled etc. We use statsd reporter and monitor using Grafana dashboard 
> &
> RocksDBStateBackend backed by HDFS as state.
> 
> Observations/Problems:
> 1. *Counter value suddenly got reset:*  While job was running fine, on one
> fine moment, metric of a monotonically increasing counter (Counter where 
> we
> just used inc() operation) suddenly became 0 and then resumed from there
> onwards. Only exception in the logs were related to transient connectivity
> issues to datanodes. Also there was no other indicator of any failure
> observed after inspecting system metrics/checkpoint metrics.  It happened
> just once across multiple runs of a same job.
> 2. *Counters not retained during flink restart with savepoint*: Cancelled
> job with -s option taking savepoint and then restarted the job using the
> savepoint.  After restart metrics started from 0. I was expecting metric
> value of a given operator would also be part of state.
> 3. *Counter metrics getting sent as Gauge*: Using tcpdump I was inspecting
> the format in which metric are sent to statsd. I observed that even the
> metric which in my code were counters, were sent as gauges. I didn't get 
> why
> that was so.
> 
> Can anyone please add more insights into why above mentioned behaviors 
> would
> have happened?
> Also does flink store metric values as a part of state for stateful
> operators? Is there any way to configure that?
> 
> 
> 
> 
> -- 
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-metrics-related-problems-questions-tp13218.html
> Sent from the Apache Flink User Mailing List archive. mailing list 
> archive at Nabble.com.
> 
 
>> 
> 



Re: Flink metrics related problems/questions

2017-05-22 Thread Chesnay Schepler

Yes, that could cause the observed issue.

The default implementations are not thread-safe; if you do concurrent 
writes they may be lost/overwritten.
You will have to either guard accesses to that metric with a 
synchronized block or implement your own thread-safe counter.


On 22.05.2017 14:17, Aljoscha Krettek wrote:

@Chesnay With timers it will happen that onTimer() is called from a different 
Thread than the Tread that is calling processElement(). If Metrics updates 
happen in both, would that be a problem?


On 19. May 2017, at 11:57, Chesnay Schepler  wrote:

2. isn't quite accurate actually; metrics on the TaskManager are not persisted 
across restarts.

On 19.05.2017 11:21, Chesnay Schepler wrote:

1. This shouldn't happen. Do you access the counter from different threads?

2. Metrics in general are not persisted across restarts, and there is no way to 
configure flink to do so at the moment.

3. Counters are sent as gauges since as far as I know StatsD counters are not 
allowed to be decremented.

On 19.05.2017 08:56, jaxbihani wrote:

Background: We are using a job using ProcessFunction which reads data from
kafka fires ~5-10K timers per second and sends matched events to KafkaSink.
We are collecting metrics for collecting no of active timers, no of timers
scheduled etc. We use statsd reporter and monitor using Grafana dashboard &
RocksDBStateBackend backed by HDFS as state.

Observations/Problems:
1. *Counter value suddenly got reset:*  While job was running fine, on one
fine moment, metric of a monotonically increasing counter (Counter where we
just used inc() operation) suddenly became 0 and then resumed from there
onwards. Only exception in the logs were related to transient connectivity
issues to datanodes. Also there was no other indicator of any failure
observed after inspecting system metrics/checkpoint metrics.  It happened
just once across multiple runs of a same job.
2. *Counters not retained during flink restart with savepoint*: Cancelled
job with -s option taking savepoint and then restarted the job using the
savepoint.  After restart metrics started from 0. I was expecting metric
value of a given operator would also be part of state.
3. *Counter metrics getting sent as Gauge*: Using tcpdump I was inspecting
the format in which metric are sent to statsd. I observed that even the
metric which in my code were counters, were sent as gauges. I didn't get why
that was so.

Can anyone please add more insights into why above mentioned behaviors would
have happened?
Also does flink store metric values as a part of state for stateful
operators? Is there any way to configure that?




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-metrics-related-problems-questions-tp13218.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.









Re: Checkpoint ?

2017-05-22 Thread Aljoscha Krettek
Hi Jim,

What are your checkpointing settings? Are you checkpointing to a distributed 
file system, such as HDFS or S3 or the local file system. The latter should not 
be used in a production setting and I would not expect this to work properly. 
(Except if the local filesystem is actually a network mounted file system)

Best,
Aljoscha

> On 15. May 2017, at 17:05, Jim Langston  wrote:
> 
> Hi all,
>  
> I have a long running , streaming app saving checkpoints to
> the file system. 
>  
> What is the layout of the checkpoint directory ? My current
> checkpoint directory has >2000 directories in it , similar to this:
>  
> chk-4645
>  
>  
> Also, the directory has grown to >3GB
>  
> I have a small cluster, and all were started at the same time, nothing
> has been restarted, but this is occurring one of the nodes, the others have
> about the same number of directories in the checkpoint directory, but
> not nearly as large.
>  
>  
> Why are there so many chk- directories ? And why can they become
> so large ? Is there something I should be setting in the yaml file ?
>  
> I was going to just remove them , but it just struck me as odd that there
> are so many …
>  
>  
> Thanks
>  
> Jim



Re: Implementing Flink Jobs :: Java-API or Scala-API

2017-05-22 Thread Aljoscha Krettek
Hi,

New features should be available on both the Java and Scala API at the same 
time so you can pick whatever suits you best. If you ever find something that 
doesn’t work in one API but does work in the other, please file and issue for 
that.

Best,
Aljoscha
> On 21. May 2017, at 17:11, Mikhail Pryakhin  wrote:
> 
> Hi all! 
> 
> I'm currently choosing which API to stick with while implementing Flink jobs 
> (primarily streaming jobs). 
> Could you please shed a light on in which API new features are implemented 
> first? Do all Flink features are available in both APIs?
> 
> Many thanks in advance.
> 
> Best Regards, 
> Mike Pryakhin



Re: Need help on Streaming API | Flink | GlobalWindow and Customized Trigger

2017-05-22 Thread Aljoscha Krettek
Hi,

If your could give us a look at your custom Trigger we might be able to figure 
out what’s going on.

Best,
Aljoscha
> On 22. May 2017, at 09:06, Samim Ahmed  wrote:
> 
> Hello All,
> 
> Hope you are doing well..
>  
> Myself Samim and I am working of POC(proof of concept) for a project. In this 
> project we are using Apache Flink to process the stream data and find the 
> required pattern and finally dump those patterns in DB.
>  
> So to implement this we have used the global window and customized trigger to 
> done our work.
> While testing we observed that output is coming as expected but we are 
> loosing the data for few minutes when the Stream ends at input.
>  
> For example If the data streaming stared at 1pm and it ends at 5pm on the 
> same day and in out put we found the data is missing for the time 4:55pm to 5 
> pm. Also we observed when the input data stream finishes immediately the 
> entire process stops and the last few minutes data are remains inside the 
> window.
>  
> We need your help here to overcome this last minutes data missing issue as I 
> am new to this flink framework. Do we have any API available to solve this 
> problem or it is the Flink limitation?
>  
> It’ll be great if you share your views and do let me know if you need any 
> further information.
>  
> I am waiting for your inputs, Thanks in advance.
>  
> Thanks,
> Samim.



Re: Job submission: Fail using command line. Success using web (flink1.2.0)

2017-05-22 Thread Rami Al-Isawi
Hi Robert,

Yes there is an OversizedPayloadException in the job manager log:
---
2017-05-22 15:39:18,942 INFO  
org.apache.flink.runtime.client.JobSubmissionClientActor  - Upload jar 
files to job manager akka.tcp://flink@localhost:6123/user/jobmanager.
2017-05-22 15:39:18,957 INFO  org.apache.flink.runtime.blob.BlobClient  
- Blob client connecting to 
akka.tcp://flink@localhost:6123/user/jobmanager
2017-05-22 15:39:19,451 INFO  
org.apache.flink.runtime.client.JobSubmissionClientActor  - Submit job to 
the job manager akka.tcp://flink@localhost:6123/user/jobmanager.
2017-05-22 15:39:19,632 ERROR akka.remote.EndpointWriter
- Transient association error (association remains live)
akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
Actor[akka.tcp://flink@localhost:6123/user/jobmanager#1736150911]: max allowed 
size 10485760 bytes, actual size of encoded class 
org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage was 
24555177 bytes.
2017-05-22 15:41:19,472 ERROR org.apache.flink.client.CliFrontend   
- Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at fwdnxt.Sonar.main(Sonar.java:162)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
at 
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't 
retrieve the JobExecutionResult from the JobManager.
at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:294)
at 
org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:382)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
... 22 more
Caused by: 
org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job 
submission to the JobManager timed out. You may increase 'akka.client.timeout' 
in case the JobManager needs more time to configure and confirm the job 
submission.
-
Does this help in explaining why it fails only in the command line client? How 
can I fix it, what is "JobManagerMessages$LeaderSessionMessage was 24555177 
bytes”?


Regards,
-Rami
On 12 May 2017, at 19:21, Robert Metzger 
> wrote:

Hi,
did you check the jobmanager log for any incoming messages? I would be 
interesting to see if the JM failed after the initial akka message, or if 
there's any kind of hiccup ?

On Thu, May 11, 2017 at 5:07 PM, Rami Al-Isawi 
> wrote:
Hi,

The same exact jar on the same machine is being deployed just fine in couple of 
seconds using the web interface. On the other hand, if I used the command line, 
I get:

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
at 

Re: State in Custom Tumble Window Class

2017-05-22 Thread Aljoscha Krettek
Hi,

Why don’t you set the allowed lateness to Long.MAX_VALUE? This way no data will 
ever be considered late. If you make the trigger via 
PurgingTrigger.of(EventTimeTrigger.of(…)). You ensure that window state is not 
kept after a window fires.

Best,
Aljoscha

> On 17. May 2017, at 13:39, rizhashmi  wrote:
> 
> Yes .. is there any possibility access flink state variables in
> WindowAssigner.assignWindows method?
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/State-in-Custom-Tumble-Window-Class-tp13177p13196.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: Flink metrics related problems/questions

2017-05-22 Thread Aljoscha Krettek
@Chesnay With timers it will happen that onTimer() is called from a different 
Thread than the Tread that is calling processElement(). If Metrics updates 
happen in both, would that be a problem?

> On 19. May 2017, at 11:57, Chesnay Schepler  wrote:
> 
> 2. isn't quite accurate actually; metrics on the TaskManager are not 
> persisted across restarts.
> 
> On 19.05.2017 11:21, Chesnay Schepler wrote:
>> 1. This shouldn't happen. Do you access the counter from different threads?
>> 
>> 2. Metrics in general are not persisted across restarts, and there is no way 
>> to configure flink to do so at the moment.
>> 
>> 3. Counters are sent as gauges since as far as I know StatsD counters are 
>> not allowed to be decremented.
>> 
>> On 19.05.2017 08:56, jaxbihani wrote:
>>> Background: We are using a job using ProcessFunction which reads data from
>>> kafka fires ~5-10K timers per second and sends matched events to KafkaSink.
>>> We are collecting metrics for collecting no of active timers, no of timers
>>> scheduled etc. We use statsd reporter and monitor using Grafana dashboard &
>>> RocksDBStateBackend backed by HDFS as state.
>>> 
>>> Observations/Problems:
>>> 1. *Counter value suddenly got reset:*  While job was running fine, on one
>>> fine moment, metric of a monotonically increasing counter (Counter where we
>>> just used inc() operation) suddenly became 0 and then resumed from there
>>> onwards. Only exception in the logs were related to transient connectivity
>>> issues to datanodes. Also there was no other indicator of any failure
>>> observed after inspecting system metrics/checkpoint metrics.  It happened
>>> just once across multiple runs of a same job.
>>> 2. *Counters not retained during flink restart with savepoint*: Cancelled
>>> job with -s option taking savepoint and then restarted the job using the
>>> savepoint.  After restart metrics started from 0. I was expecting metric
>>> value of a given operator would also be part of state.
>>> 3. *Counter metrics getting sent as Gauge*: Using tcpdump I was inspecting
>>> the format in which metric are sent to statsd. I observed that even the
>>> metric which in my code were counters, were sent as gauges. I didn't get why
>>> that was so.
>>> 
>>> Can anyone please add more insights into why above mentioned behaviors would
>>> have happened?
>>> Also does flink store metric values as a part of state for stateful
>>> operators? Is there any way to configure that?
>>> 
>>> 
>>> 
>>> 
>>> -- 
>>> View this message in context: 
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-metrics-related-problems-questions-tp13218.html
>>> Sent from the Apache Flink User Mailing List archive. mailing list archive 
>>> at Nabble.com.
>>> 
>> 
>> 
> 



Re: ConnectedStream keyby issues

2017-05-22 Thread Aljoscha Krettek
Hi,

The State will never be automatically GC’ed. You have to do it in the onTimer() 
callback, as mentioned earlier.

Best,
Aljoscha
> On 19. May 2017, at 10:39, gaurav  wrote:
> 
> Hello 
> 
> I am little confused on when the state will be gc. For example,
> 
> Example 1:
> 
> Class abc extends RichProcessFunction,Tuple<>>
> {
>   public void processElement(..)
>   {
>   if(timer never set)
>   {
>ctx.timerService().registerEventTimeTimer(...);
>   }
>   }
>   public void onTimer(.)
>   {
>   // do some work 
>   ctx.timerService().registerEventTimeTimer(...);
>   }
> }
> 
> In example 1, will it ever be garbage collected? Also, in example1 in
> processElement we are only once registering eventTimer. Will it be gc when
> the second event arrives?
> 
> And  if we have:
> Example 2
> public void onTimer(.)
>   {
>   // do some work 
>  // no timer registeration 
>   }
> Will it be gc after completion of onTimer ?  
> 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ConnectedStream-keyby-issues-tp12999p13219.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Need help on Streaming API | Flink | GlobalWindow and Customized Trigger

2017-05-22 Thread Samim Ahmed
Hello All,


Hope you are doing well..



Myself Samim and I am working of POC(proof of concept) for a project. In
this project we are using Apache Flink to process the stream data and find
the required pattern and finally dump those patterns in DB.



So to implement this we have used the global window and customized trigger
to done our work.

While testing we observed that output is coming as expected but we are
loosing the data for few minutes when the Stream ends at input.



For example If the data streaming stared at 1pm and it ends at 5pm on the
same day and in out put we found the data is missing for the time 4:55pm to
5 pm. Also we observed when the input data stream finishes immediately the
entire process stops and the last few minutes data are remains inside the
window.



We need your help here to overcome this last minutes data missing issue as
I am new to this flink framework. Do we have any API available to solve
this problem or it is the Flink limitation?



It’ll be great if you share your views and do let me know if you need any
further information.



I am waiting for your inputs, Thanks in advance.



Thanks,

Samim.