RichAsyncFunction for Scala?

2019-05-14 Thread Shannon Carey
I have some awkward code in a few Flink jobs which is converting a Scala stream 
into a Java stream in order to pass it to AsyncDataStream.unorderedWait(), and 
using a Java RichAsyncFunction, due to old versions of Flink not having the 
ability to do async stuff with a Scala stream.

In newer versions of Flink, I see that 
org.apache.flink.streaming.api.scala.AsyncDataStream is available. However, it 
accepts only org.apache.flink.streaming.api.scala.async.AsyncFunction, and 
there does not appear to be an AbstractRichFunction subclass of that trait as I 
expected. Is there a way to use the Scala interfaces but provide a rich 
AsyncFunction to AsyncDataStream.unorderedWait()? If not, I will leave the old 
code as-is.

Thanks,
Shannon


Re: Back-pressure Status shows OK but records are backed up in kafka

2018-01-08 Thread Shannon Carey
Right, backpressure only measures backpressure on the inside of the Flink job, 
i.e. between Flink tasks.

Therefore, it’s up to you to monitor whether your Flink job is “keeping up” 
with the source stream. If you’re using Kafka, there’s a metric that the 
consumer library makes available. For example, for one of our jobs, in Graphite 
we have a metric that matches:

aliasByNode(stats.gauges.myapp.prod.us-east-1.myapp.*.taskmanager.*.job.com.example.$Job.operator.*.*.KafkaConsumer.records-lag-max,
 18, 19)

The “$Job” is a variable which allows you to select the job. You can see that I 
have wildcards on other elements of the path, for example the TaskManager id, 
the operator name, the Task index, etc. Your metric is probably rooted 
somewhere else, but the thing you’re looking for is under 
operator.*.*.KafkaConsumer.records-lag-max.

Flink manages its offsets itself, rather than acting like a “normal” consumer 
which commits offsets to Kafka. However, in the docs I see that 
“setCommitOffsetsOnCheckpoints()” is enabled by default.  So, theoretically you 
can use any sort of tool similar to https://github.com/srotya/kafka-lag-monitor 
or https://github.com/linkedin/Burrow etc. which polls Kafka itself and 
produces metrics about consumer lag. However, for some reason, I don’t see our 
Flink consumer metrics showing up in our lag monitoring tool or in the Kafka 
command-line tools, so I’m not sure what’s going on there. Maybe it’s because 
Flink doesn’t show up as a consumer group? At first I thought that it might be 
because we’re not setting the “group.id” property, but as it turns out we are 
indeed setting it. In any case, we have to use the job’s metrics, and monitor 
that the job is up, rather than monitoring the offset in Kafka itself. 
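
To make the lag metric concrete, here is a minimal Java sketch of the arithmetic behind a max-lag gauge such as records-lag-max: the lag of a partition is its log end offset minus the consumer's current position, and the gauge reports the largest value across partitions. The class name, method names and offsets below are made up for illustration; this is not the Kafka client's or Flink's code.

```java
import java.util.Map;

class ConsumerLagSketch {
    /** Lag of one partition: records written to the log but not yet read by the consumer. */
    static long partitionLag(long logEndOffset, long consumerPosition) {
        return Math.max(0L, logEndOffset - consumerPosition);
    }

    /** Largest lag across all partitions, keyed by partition number. */
    static long maxLag(Map<Integer, Long> logEndOffsets, Map<Integer, Long> positions) {
        long max = 0L;
        for (Map.Entry<Integer, Long> entry : logEndOffsets.entrySet()) {
            // A partition with no recorded position is treated as unread (assumption).
            long position = positions.getOrDefault(entry.getKey(), 0L);
            max = Math.max(max, partitionLag(entry.getValue(), position));
        }
        return max;
    }

    public static void main(String[] args) {
        // Hypothetical offsets: partition 0 is 10 records behind, partition 1 is 1000 behind.
        Map<Integer, Long> logEndOffsets = Map.of(0, 1_000L, 1, 2_500L);
        Map<Integer, Long> positions = Map.of(0, 990L, 1, 1_500L);
        System.out.println("max lag = " + maxLag(logEndOffsets, positions)); // prints "max lag = 1000"
    }
}
```

An alert on this value growing over time, combined with a check that the job is running at all, covers the case where the back pressure status still shows OK.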

-Shannon

On 1/8/18, 1:52 AM, "Ufuk Celebi" wrote:

Hey Jins,

our current back pressure tracking mechanism does not work with Kafka
sources. To gather back pressure indicators we sample the main task
thread of a subtask. For most tasks, this is the thread that emits
records downstream (e.g. if you have a map function) and everything
works as expected. In case of the Kafka source though there is a
separate thread that consumes from Kafka and emits the records.
Therefore we sample the "wrong" thread and don't observe any
indicators for back pressure. :-( Unfortunately, this was not taken
into account when back pressure sampling was implemented.
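
The sampling idea described above can be sketched in plain Java. This is only an illustration under assumptions, not Flink's implementation: the method name requestBuffer is a hypothetical stand-in for "blocked while waiting to emit", and the emitter thread plays the role of the separate Kafka consumer thread. Sampling the emitter's stack finds the blocking frame; sampling any other thread finds nothing, which is why the indicator can stay at OK.

```java
import java.util.concurrent.CountDownLatch;

class BackPressureSamplingSketch {
    /** Hypothetical stand-in for a call that blocks while downstream is slow. */
    static void requestBuffer(CountDownLatch entered, CountDownLatch release) {
        entered.countDown(); // signal that this frame is now on the stack
        try {
            release.await(); // stay blocked until the sampler is done
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Fraction of stack samples of the thread that contain a frame with the given method name. */
    static double blockedRatio(Thread thread, String methodName, int samples) {
        int hits = 0;
        for (int i = 0; i < samples; i++) {
            for (StackTraceElement frame : thread.getStackTrace()) {
                if (frame.getMethodName().equals(methodName)) {
                    hits++;
                    break;
                }
            }
        }
        return (double) hits / samples;
    }

    /** Starts a blocked emitter thread, then samples either it or the calling thread. */
    static double sample(boolean sampleEmitterThread) {
        CountDownLatch entered = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread emitter = new Thread(() -> requestBuffer(entered, release), "emitter");
        emitter.setDaemon(true);
        emitter.start();
        try {
            entered.await(); // wait until the emitter is inside requestBuffer
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Thread target = sampleEmitterThread ? emitter : Thread.currentThread();
        double ratio = blockedRatio(target, "requestBuffer", 20);
        release.countDown(); // let the emitter finish
        return ratio;
    }

    public static void main(String[] args) {
        System.out.println("sampling the emitting thread: " + sample(true));
        System.out.println("sampling another thread:      " + sample(false));
    }
}
```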

There is this old issue to track this:
https://issues.apache.org/jira/browse/FLINK-3456

I'm not aware of any other way to track this situation. Maybe others
can chime in here...

– Ufuk


On Mon, Jan 8, 2018 at 8:16 AM, Jins George wrote:
> I have a Beam Pipeline consuming records from Kafka doing some
> transformations and writing it to Hbase. I faced an issue in which records
> were writing to Hbase at a slower rate than the incoming messages to Kafka
> due to a temporary surge in the incoming traffic.
>
> From the flink UI, if I check the back pressure status, it shows OK. I have
> one task which has all the operators including source.
>
> Any idea why the backpressure indicator would show OK, but messages are backed
> up in Kafka?
>
> Is there any other mechanism/metrics by which I can identify this
> situation?
>
> I'm running Flink 1.2 w/ Beam 2.0.
>
> Thanks,
> Jins George





Re: Flink HA with Kubernetes, without Zookeeper

2017-08-21 Thread Shannon Carey
Zookeeper should still be necessary even in that case, because it is where the 
JobManager stores information which needs to be recovered after the JobManager 
fails.

We're eyeing https://github.com/coreos/zetcd as a way to run Zookeeper on top 
of Kubernetes' etcd cluster so that we don't have to rely on a separate 
Zookeeper cluster. However, we haven't tried it yet.

-Shannon

From: Hao Sun
Date: Sunday, August 20, 2017 at 9:04 PM
To: "user@flink.apache.org"
Subject: Flink HA with Kubernetes, without Zookeeper

Hi, I am new to Flink and trying to bring up a Flink cluster on top of 
Kubernetes.

For HA setup, with kubernetes, I think I just need one job manager and do not 
need Zookeeper? I will store all states to S3 buckets. So in case of failure, 
kubernetes can just bring up a new job manager without losing anything?

I want to confirm my assumptions above make sense. Thanks


JobManager HA behind load balancer

2017-08-16 Thread Shannon Carey
Is anyone running multiple JobManagers (in High Availability mode) behind a 
load balancer such as an AWS ELB or a software proxy such as HAProxy or Nginx?

Right now, it appears that server-side redirects that come from the JobManager 
Web UI use the internal IP address of the JobManager (from Akka). Therefore, if 
you're accessing your JobManager via a DNS name or a load balancing proxy, the 
redirect doesn't work properly. Has anyone created a workaround for this?

If there's no workaround, should we perhaps add a config setting to the 
JobManager to tell it what DNS name or root URL to use when sending redirect 
responses?
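
To illustrate what such a setting (or a proxy-side workaround) would have to do, here is a small Java sketch that rewrites the Location of a redirect so that it points at a public base URL instead of the internal address. Everything in it is an assumption made for illustration: the class and method names, the internal address and the public URL are hypothetical, and this is not an existing Flink option.

```java
import java.net.URI;

class RedirectRewriteSketch {
    /**
     * Replaces scheme, host and port of a redirect target with those of the
     * public base URL, keeping the path and query of the original target.
     * Any path component of the base URL is ignored in this sketch.
     */
    static String rewriteLocation(String location, String publicBaseUrl) {
        URI internal = URI.create(location);
        URI base = URI.create(publicBaseUrl);
        StringBuilder rewritten = new StringBuilder();
        rewritten.append(base.getScheme()).append("://").append(base.getAuthority());
        if (internal.getRawPath() != null) {
            rewritten.append(internal.getRawPath());
        }
        if (internal.getRawQuery() != null) {
            rewritten.append('?').append(internal.getRawQuery());
        }
        return rewritten.toString();
    }

    public static void main(String[] args) {
        // Hypothetical internal JobManager address and public DNS name.
        String internal = "http://10.0.0.12:8081/jobs/abc?x=1";
        System.out.println(rewriteLocation(internal, "https://flink.example.com"));
        // prints "https://flink.example.com/jobs/abc?x=1"
    }
}
```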

Also, it looks like at least some types of requests are not supported by 
non-master JobManagers, and therefore they respond with a redirect to the 
internal address of the master. Is it necessary to integrate the proxy with 
Zookeeper so that requests will only be proxied to the master JobManager? If 
the non-master nodes only send redirects, then including them as upstream 
servers in the proxy would be problematic.

Thanks for the info,
Shannon


Re: blob store defaults to /tmp and files get deleted

2017-08-04 Thread Shannon Carey
Stephan,

Regarding your last reply to 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/blob-store-defaults-to-tmp-and-files-get-deleted-td11720.html

You mention "Flink (via the user code class loader) actually holds a reference 
to the JAR files in "/tmp", so even if "/tmp" get wiped, the JAR file remains 
usable by the class loader". In my understanding, even if that's true, it 
doesn't work over a failure of the JobManager/TaskManager process, because the 
handle would be lost and the file would be gone.

We're still running Flink 1.2.1, so maybe we're missing out on some of the 
improvements that have been made. However, we recently had a problem with a 
batch (DataSet) job not restarting successfully, apparently after a JobManager 
failure. This particular job runs in AWS EMR (on Yarn) which means that only 
one JobManager is run at a time, and when it fails it gets restarted.

Here's what I can see from the logs. When the job restarts, it goes from 
CREATED -> RUNNING state, and then logs:

23:23:56,798 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   
 (flink-akka.actor.default-dispatcher-55): Job com.expedia…MyJob 
(c58185a78dd64cfc9f12374bd1f9a679) switched from state RUNNING to SUSPENDED.
java.lang.Exception: JobManager is no longer the leader.
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:319)

I assume that's normal/expected, because the JobManager was restarted but some 
portion of the job state is still referring to the old one. Next, 
YarnJobManager logs: "Attempting to recover job 
c58185a78dd64cfc9f12374bd1f9a679." However, it subsequently fails:

2017-08-03 00:09:18,991 WARN  org.apache.flink.yarn.YarnJobManager  
(flink-akka.actor.default-dispatcher-96): Failed to recover job 
c58185a78dd64cfc9f12374bd1f9a679.
java.lang.Exception: Failed to retrieve the submitted job graph from state 
handle.
at 
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:180)
…
Caused by: java.lang.RuntimeException: Unable to instantiate the hadoop input 
format
at 
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.readObject(HadoopInputFormatBase.java:319)
…
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:305)
at 
org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58)
at 
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:178)
... 15 more
Caused by: java.lang.ClassNotFoundException: 
org.apache.parquet.avro.AvroParquetInputFormat
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.readObject(HadoopInputFormatBase.java:317)
... 69 more

The missing class comes from our job, so it seems like the job jar isn't 
present on the classpath of the JobManager. When I look at the contents of our 
configured blob storage directory (we're not using /tmp), I see subfolders like:

blobStore-7d40f1b9-7b06-400f-8c05-b5456adcd7f1
blobStore-f2d7974c-7d86-4b11-a7fb-d1936a4593ed

Only one of the two has a JAR in it, so it looks like there's a new directory 
created for each new JobManager. When I look in Zookeeper at nodes such as 
"/flink/main/jobgraphs/c58185a78dd64cfc9f12374bd1f9a679", I don't see those 
directories mentioned. I am wondering if someone can explain how Flink knows 
how to retrieve the job jar for job retry when the JobManager has failed? Are 
we running into a Flink bug here?

Thanks for the info,
Shannon



Re: JobManager Web UI

2017-05-11 Thread Shannon Carey
Since YARN must support people running multiple Flink clusters, the JobManager 
web UI binds to an ephemeral port by default (to prevent port usage conflicts). 
Also, the AM (and web UI) may be placed on any of the Yarn nodes. Therefore, if 
you wanted to access it directly instead of through the Yarn web proxy, you'd 
have to find what machine and port it is running on.
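
As a generic illustration of what binding to an ephemeral port means (plain Java sockets, not Flink or YARN code), asking for port 0 lets the operating system pick any free port, so two servers started on the same machine never collide. The class and method names are made up for this sketch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

class EphemeralPortSketch {
    /** Opens two server sockets on port 0 and returns the ports the OS actually assigned. */
    static int[] bindTwoEphemeralPorts() {
        try (ServerSocket first = new ServerSocket(0);
             ServerSocket second = new ServerSocket(0)) {
            return new int[] {first.getLocalPort(), second.getLocalPort()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ports = bindTwoEphemeralPorts();
        // The concrete numbers differ from run to run, which is why a fixed port cannot be assumed.
        System.out.println("assigned ports: " + ports[0] + " and " + ports[1]);
    }
}
```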

-Shannon

From: Shravan R
Date: Thursday, May 11, 2017 at 12:43 PM
To: >
Subject: JobManager Web UI

I am running flink-1.1.4 on Cloudera distributed Hadoop (Yarn).
I am not able to get to the JobManager web UI through 
http://:8081. I am able to get to it through
Yarn Running applications ---> application master. My flink-conf.yaml has 
jobmanager.web.port: 8081.

Am I missing something here?

- Shravan


Yarn terminating TM for pmem limit cascades causing all jobs to fail

2017-04-18 Thread Shannon Carey
I'm on Flink 1.1.4. We had yet another occurrence of Yarn killing a TM due to 
exceeding pmem limits and all jobs failing as a result. I thought I had 
successfully disabled that check, but apparently the property doesn't work as 
expected in EMR.

From what I can tell in the logs, it looks like after the first TM was killed 
by Yarn, the jobs failed and were retried. However, when they are retried they 
cause increased pmem load on yet another TM, which results in Yarn killing 
another TM. That caused the jobs to fail again. This happened 5 times until our 
job retry policy gave up and allowed the jobs to fail permanently. Obviously, 
this situation is very problematic because it results in the loss of all job 
state, plus it requires manual intervention to start the jobs again.

The job retries eventually fail due to, "Could not restart the job ... The slot 
in which the task was executed has been released. Probably loss of TaskManager" 
or due to "Could not restart the job … Connection unexpectedly closed by remote 
task manager … This might indicate that the remote task manager was lost." 
Those are only the final failure causes: Flink does not appear to log the cause 
of intermediate restart failures.

I assume that the messages logged from the JobManager about "Association with 
remote system … has failed, address is now gated for [5000] ms. Reason is: 
[Disassociated]." are due to the TM failing, and are expected/harmless?

It seems like disabling the pmem check will fix this problem, but I am 
wondering if this is related: 
https://flink.apache.org/faq.html#the-slot-allocated-for-my-task-manager-has-been-released-what-should-i-do
 ? I don't see any log messages about quarantined TMs…

Do you think that increasing the # of job retries so that the jobs don't fail 
until all TMs are replaced with fresh ones would fix this issue? The 
"memory.percent-free" metric from Collectd did go down to 2-3% on the TMs 
before they failed, and shot back up to 30-40% on TM restart (though I'm not 
sure how much of that had to do with the loss of state).  So, memory usage may 
be a real problem, but we don't get an OOM exception so I'm not sure we can 
control this from the JVM perspective. Are there other memory adjustments we 
should make which would allow our TMs to run for long periods of time without 
having this problem? Is there perhaps a memory leak in RocksDB?

Thanks for any help you can provide,
Shannon


Re: Connecting workflows in batch

2017-03-08 Thread Shannon Carey
It may not return for batch jobs, either. See my post 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Job-completion-or-failure-callback-td12123.html

In short, if Flink returned an OptimizerPlanEnvironment from your call to 
ExecutionEnvironment.getExecutionEnvironment, then calling execute() only 
generates the job plan (the job hasn't been submitted and isn't executing yet). 
If no exceptions are thrown during creation of the job plan, then a 
ProgramAbortException is always thrown, so none of your code after execute() 
runs. As a result, you're definitely not able to use any JobExecutionResult in 
your main method, even though the code makes it look like you can.
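
The control flow can be sketched without Flink. In the following Java example every class is a hypothetical stand-in (it is not Flink's OptimizerPlanEnvironment or ProgramAbortException); it only shows why code placed after execute() never runs when the environment aborts right after capturing the plan.

```java
import java.util.ArrayList;
import java.util.List;

class PlanOnlyExecuteSketch {
    /** Hypothetical abort signal, standing in for the exception named above. */
    static class AbortSignal extends RuntimeException {}

    /** Hypothetical environment that records a plan instead of running the job. */
    static class PlanOnlyEnvironment {
        String capturedPlan;

        String execute(String jobName) {
            capturedPlan = "plan for " + jobName; // only the plan is produced
            throw new AbortSignal();              // execute() never returns normally
        }
    }

    /** Plays the role of the user's main() method. */
    static void userMain(PlanOnlyEnvironment env, List<String> trace) {
        trace.add("before execute");
        String result = env.execute("my-job");
        trace.add("after execute: " + result); // never reached with this environment
    }

    /** Plays the role of the caller that runs main() only to extract the plan. */
    static List<String> extractPlan(PlanOnlyEnvironment env) {
        List<String> trace = new ArrayList<>();
        try {
            userMain(env, trace);
        } catch (AbortSignal expected) {
            trace.add("aborted with " + env.capturedPlan);
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(extractPlan(new PlanOnlyEnvironment()));
        // prints "[before execute, aborted with plan for my-job]"
    }
}
```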

-Shannon

From: Aljoscha Krettek
Date: Friday, March 3, 2017 at 9:36 AM
To: >
Subject: Re: Connecting workflows in batch

Yes, right now that call never returns for a long-running streaming job. We 
will (in the future) provide a way for that call to return so that the result 
can be used for checking aggregators and other things.


On Thu, Mar 2, 2017, at 19:14, Mohit Anchlia wrote:
Does it mean that for streaming jobs it never returns?

On Thu, Mar 2, 2017 at 6:21 AM, Till Rohrmann wrote:

Hi Mohit,

StreamExecutionEnvironment.execute() will only return giving you the 
JobExecutionResult after the job has reached a final stage. If that works for 
you to schedule the second job, then it should be ok to combine both jobs in 
one program and execute the second job after the first one has completed.

Cheers,
Till


On Thu, Mar 2, 2017 at 2:33 AM, Mohit Anchlia wrote:
It looks like JobExecutionResult can be used here by using the accumulators?

On Wed, Mar 1, 2017 at 8:37 AM, Aljoscha Krettek wrote:
I think right now the best option is the JobManager REST interface: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html

You would have to know the ID of your job and then you can poll the status of 
your running jobs.

On Mon, 27 Feb 2017 at 18:15 Mohit Anchlia wrote:
What's the best way to track the progress of the job?

On Mon, Feb 27, 2017 at 7:56 AM, Aljoscha Krettek wrote:
Hi Mohit,
I'm afraid there is nothing like this in Flink yet. As you mentioned you 
probably have to manually track the completion of one job and then trigger 
execution of the next one.

Best,
Aljoscha

On Fri, 24 Feb 2017 at 19:16 Mohit Anchlia wrote:
Is there a way to connect 2 workflows such that one triggers the other if a 
certain condition is met? One workaround may be to insert a notification in a 
topic to trigger another workflow. The problem is that addSink ends the flow, 
so if we need to add a trigger after addSink, there doesn't seem to be any good 
way of sending a notification to a queue that the batch processing is complete. 
Any suggestions? One option could be to track the progress of a job and, on 
successful completion, add a notification. Is there such a mechanism available?



Job completion or failure callback?

2017-03-08 Thread Shannon Carey
Hi,

Is there any way we can run a callback on job completion or failure without 
leaving the client running during job execution? For example, when we submit 
the job via the web UI the main() method's call to 
ExecutionEnvironment#execute() appears to be asynchronous with the job 
execution. Therefore, the execute() call returns before the job is completed. 
This is a bit confusing because the behavior is different when run from the IDE 
vs. run in a cluster, and because the signature of the returned class 
JobExecutionResult implies that it can tell you how long execution took (it has 
getNetRuntime()). We would like to be able to detect job completion or failure 
so that we can monitor the success or failure of batch jobs, in particular, so 
that we can react to failures appropriately. It seems like the JobManager 
should be capable of executing callbacks like this. Otherwise, we'll have to 
create an external component that e.g. polls the web UI/API for job status.
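
A minimal Java sketch of such an external poller follows. It is only an outline under assumptions: the status source is a plain Supplier that a real component would back with a request to the JobManager's monitoring API, and the status strings, class name and method name are chosen for this example.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

class JobStatusPollerSketch {
    /** Statuses after which the job will not change state again (assumed names). */
    static final Set<String> TERMINAL = Set.of("FINISHED", "FAILED", "CANCELED");

    /**
     * Polls the status source until a terminal status is seen or maxPolls is exhausted.
     * Returns the terminal status, or empty if the job was still running when we gave up.
     */
    static Optional<String> awaitTerminal(Supplier<String> statusSource, int maxPolls, long sleepMillis) {
        for (int i = 0; i < maxPolls; i++) {
            String status = statusSource.get();
            if (TERMINAL.contains(status)) {
                return Optional.of(status);
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Optional.empty();
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Fake status sequence standing in for repeated calls to the monitoring API.
        Iterator<String> fake = List.of("CREATED", "RUNNING", "RUNNING", "FAILED").iterator();
        Optional<String> result = awaitTerminal(fake::next, 10, 10L);
        System.out.println(result.orElse("still running")); // prints "FAILED"
    }
}
```

The callback on completion or failure would then hang off the returned status, for example by raising an alert when it is anything other than FINISHED.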

Does the web UI run in the same JVM as the JobManager (when deployed in YARN)? 
If so, I would expect logs from the main method to appear in the JobManager 
logs. However, for some reason I can't find log messages or System.out  
messages when they are logged in the main() method after execute() is called. 
Why is that?
Edit: figured it out: OptimizerPlanEnvironment#execute() ends with "throw new 
ProgramAbortException()". Tricky and unexpected. That should definitely be 
mentioned in the javadocs of the execute() method! Even the documentation says, 
"The execute() method is returning a JobExecutionResult, this contains 
execution times and accumulator results." which isn't true, or at least isn't 
always true.

Thanks,
Shannon


Re: AWS exception serialization problem

2017-03-07 Thread Shannon Carey
> is there some shading logic involved in the dependencies, concerning the AWS 
> libraries?

Not that I am aware of. The AWS code is included in the job's fat jar as-is.



Re: Integrate Flink with S3 on EMR cluster

2017-03-07 Thread Shannon Carey
Generally, using S3 filesystem in EMR with Flink has worked pretty well for me 
in Flink < 1.2 (unless you run out of connections in your HTTP pool). When you 
say, "using Hadoop File System class", what do you mean? In my experience, it's 
sufficient to just use the "s3://" filesystem protocol and Flink's Hadoop 
integration (plus S3 filesystem classes provided by EMR) will do the right 
thing.

-Shannon


Re: AWS exception serialization problem

2017-03-06 Thread Shannon Carey
This happened when running Flink with bin/run-local.sh. I notice that there only 
appears to be one Java process. The job manager and the task manager run in the 
same JVM, right? I notice, however, that there are two blob store folders on 
disk. Could the problem be caused by two different FlinkUserCodeClassLoader 
objects pointing to the two different JARs?


From: Shannon Carey <sca...@expedia.com>
Date: Monday, March 6, 2017 at 6:39 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: AWS exception serialization problem

Has anyone encountered this or know what might be causing it?




AWS exception serialization problem

2017-03-06 Thread Shannon Carey
Has anyone encountered this or know what might be causing it?



java.lang.RuntimeException: Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:394)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:366)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:349)
at 
org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:355)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Error during Java 
deserialization.
at 
com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:47)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
at 
org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51)
at 
org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:32)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:389)
... 7 more
Caused by: java.lang.ClassNotFoundException: 
com.amazonaws.services.s3.model.AmazonS3Exception
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at 
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
at java.lang.Throwable.readObject(Throwable.java:914)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at 
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
at java.lang.Throwable.readObject(Throwable.java:914)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at 
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
at java.lang.Throwable.readObject(Throwable.java:914)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at 

blob store defaults to /tmp and files get deleted

2017-02-17 Thread Shannon Carey
A few of my jobs recently failed and showed this exception:


org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user 
class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
ClassLoader info: URL ClassLoader:
file: 
'/tmp/blobStore-5f023409-6af5-4de6-8ed0-e80a2eb9633e/cache/blob_d9a9fb884f3b436030afcf7b8e1bce678acceaf2'
 (invalid JAR: zip file is empty)
Class not resolvable through given classloader.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:208)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
at java.lang.Thread.run(Thread.java:745)


As you can see, Flink is storing things underneath /tmp, which is the 
(undocumented) default for the blob store. As you may know, on Linux, there's 
typically a program such as tmpwatch which is run periodically to clear out 
data from /tmp.


Flink also uses /tmp as the default for jobmanager.web.tmpdir (and 
jobmanager.web.upload.dir in 1.2).


Therefore, assuming that this is indeed the cause of the job failure/the 
exception, it seems highly advisable that when you run a Flink cluster you 
configure blob.storage.directory and jobmanager.web.tmpdir to a specific folder 
that is not beneath /tmp. I don't know if there is any material about setting 
up a production cluster, but this would definitely seem to be a necessary 
configuration to specify if you want to avoid problems. Enabling High 
Availability mode should also be on that list, I think.
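
As a sanity check for that recommendation, here is a small Java sketch that flags the two settings named above when they are unset (and so fall back to /tmp) or point somewhere beneath /tmp. It assumes a Unix-like filesystem and works on a plain map of settings; the class name, method names and the example directory are hypothetical.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class TmpDirCheckSketch {
    /** True if the directory is /tmp or lies beneath it, after resolving ".." segments. */
    static boolean isUnderTmp(String directory) {
        Path normalized = Paths.get(directory).toAbsolutePath().normalize();
        return normalized.startsWith(Paths.get("/tmp"));
    }

    /** Returns the setting names that are unset or that point beneath /tmp. */
    static List<String> keysPointingAtTmp(Map<String, String> config) {
        List<String> offenders = new ArrayList<>();
        for (String key : List.of("blob.storage.directory", "jobmanager.web.tmpdir")) {
            String value = config.get(key);
            // An unset value counts as an offender because the default is under /tmp.
            if (value == null || isUnderTmp(value)) {
                offenders.add(key);
            }
        }
        return offenders;
    }

    public static void main(String[] args) {
        // Hypothetical configuration: only the blob store has been moved out of /tmp.
        Map<String, String> config = Map.of("blob.storage.directory", "/var/lib/flink/blobs");
        System.out.println(keysPointingAtTmp(config)); // prints "[jobmanager.web.tmpdir]"
    }
}
```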


-Shannon


Re: Performance tuning

2017-02-17 Thread Shannon Carey
One network setting is mentioned here:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#controlling-latency


From: Dmitry Golubets
Date: Friday, February 17, 2017 at 6:43 AM
To: >
Subject: Performance tuning

Hi,

My streaming job cannot benefit much from parallelization unfortunately.
So I'm looking for things I can tune in Flink, to make it process sequential 
stream faster.

So far in our current engine based on Akka Streams (non distributed ofc) we 
have 20k msg/sec.
Ported to Flink I'm getting 14k so far.

My observations are the following:

  *   if I chain operations together they execute all in sequence, so I 
basically sum up the time required to process one data item across all my 
stream operators, not good
  *   if I split chains, they execute asynchronously to each other, but there 
is serialization and network overhead

Second approach gives me better results, considering that I have a server with 
more than enough memory and cores to do all side work for serialization. But I 
want to reduce this serialization\data transfer overhead to a minimum.

So what I have now:

// cos it's Scala we don't need unnecessary serialization
environment.getConfig.enableObjectReuse()
// it works faster with it, I'm not sure why
environment.getConfig.disableAutoTypeRegistration()
// custom Message Pack serialization for all message types, gives about 50% boost
environment.addDefaultKryoSerializer(..)

But that's it, I don't know what else to do.
I didn't find any interesting network\buffer settings in docs.

Best regards,
Dmitry


Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

2017-01-26 Thread Shannon Carey
Haha, I see. Thanks.




On 1/26/17, 1:48 PM, "Chen Qin"  wrote:

>We worked around S3 and had a beer with our Hadoop engineers...


Re: Rapidly failing job eventually causes "Not enough free slots"

2017-01-24 Thread Shannon Carey
I am running 1.1.4. It does look like there were problems with the connection 
to Zookeeper due to overworking the network. I'm not sure what I can do about 
it (not sure what happens when a JM loses leadership), but ideally a 
cluster-wide failure would not result in losing all the jobs, checkpoints, etc.

-Shannon

From: Stephan Ewen <se...@apache.org<mailto:se...@apache.org>>
Date: Tuesday, January 24, 2017 at 8:07 AM
To: <user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Re: Rapidly failing job eventually causes "Not enough free slots"

Hi!

I think there were some issues in the HA recovery of 1.1.3 that were fixed in 
1.1.4 and 1.2.0.
What version are you running on?

Stephan


On Sat, Jan 21, 2017 at 4:58 PM, Ufuk Celebi 
<u...@apache.org<mailto:u...@apache.org>> wrote:
Hey Shannon,

the final truth for recovery is in ZooKeeper. Can you check whether
there are also references available in ZooKeeper? Do you have the job
manager logs available from after the failure? On recovery, Flink
checks ZooKeeper for entries. These point to files in the storageDir.
It could have happened that these got out of sync, e.g. entries
deleted from ZK but not from the storageDir.

Maybe the loss of the task managers can also be explained by a
connection loss to ZK or something. When a JM loses leadership, the
TMs cancel all tasks and disconnect from the JM. Here again, we would
need to look into the logs.
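
The out-of-sync case described above can be checked mechanically once both listings are in hand. The following is only a minimal sketch, not Flink code: the function name and the sample paths are hypothetical, and it assumes you have already collected the file paths referenced by the ZooKeeper entries and the files actually present in the storageDir by whatever means you have available.

```python
def compare_recovery_state(zk_referenced, storage_files):
    """Compare paths referenced from ZooKeeper with files found in the storageDir.

    Both arguments are iterables of path strings gathered by the caller.
    """
    zk_referenced = set(zk_referenced)
    storage_files = set(storage_files)
    return {
        # Referenced from ZooKeeper but absent from the storageDir.
        "missing_files": sorted(zk_referenced - storage_files),
        # Present in the storageDir but no longer referenced from ZooKeeper,
        # e.g. entries deleted from ZK but not from the storageDir.
        "orphaned_files": sorted(storage_files - zk_referenced),
    }
```

A non-empty "orphaned_files" list would match the situation where job graph and checkpoint files are visible in the storageDir but nothing gets restored.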

Sorry for the bad experience lately :-(

– Ufuk


On Sat, Jan 21, 2017 at 4:31 AM, Shannon Carey 
<sca...@expedia.com<mailto:sca...@expedia.com>> wrote:
> In fact, I can see all my job jar blobs and some checkpoint & job graph
> files in my configured "recovery.zookeeper.storageDir"… however for some
> reason it didn't get restored when my new Flink cluster started up.
>
>
> From: Shannon Carey <sca...@expedia.com<mailto:sca...@expedia.com>>
> Date: Friday, January 20, 2017 at 9:14 PM
> To: "user@flink.apache.org<mailto:user@flink.apache.org>" 
> <user@flink.apache.org<mailto:user@flink.apache.org>>
>
> Subject: Re: Rapidly failing job eventually causes "Not enough free slots"
>
> I recently added some better visibility into the metrics we're gathering
> from Flink. My Flink cluster died again due to the "Not enough free slots
> available to run the job" problem, and this time I can see that the number
> of registered task managers went down from 11 to 7, then waffled and only
> ever got back up to 10 (one short of the requested amount) before dropping
> to 0 just before the cluster died. This would seem to explain why there
> weren't sufficient slots (given that we were probably using them all or
> nearly all)… The metric of "running jobs" went down from 5 to 3 during this
> time period as well. So the problem seems to be loss of taskmanagers due to
> errors (not yet sure what exactly as I have to delve into logs).
>
> The other thing I have to figure out is restoring the jobs… I thought that
> HA would start the jobs back up again if Flink died & I re-launched it, but
> that doesn't appear to be the case.
>
>
> From: Stephan Ewen <se...@apache.org<mailto:se...@apache.org>>
> Date: Thursday, January 5, 2017 at 7:52 AM
> To: <user@flink.apache.org<mailto:user@flink.apache.org>>
> Subject: Re: Rapidly failing job eventually causes "Not enough free slots"
>
> Another thought on the container failure:
>
> in 1.1, the user code is loaded dynamically whenever a Task is started. That
> means that on every task restart the code is reloaded. For that to work
> properly, class unloading needs to happen, or the permgen will eventually
> overflow.
>
> It can happen that class unloading is prevented if the user functions do
> leave references around as "GC roots", which may be threads, or references
> in registries, etc.
>
> In Flink 1.2, YARN will put the user code into the application classpath, so
> code need not be reloaded on every restart. That should solve that issue.
> To "simulate" that behavior in Flink 1.1, put your application code jars
> into the "lib" folder
>
> Best,
> Stephan
>
>
> On Thu, Jan 5, 2017 at 1:15 PM, Yury Ruchin 
> <yuri.ruc...@gmail.com<mailto:yuri.ruc...@gmail.com>> wrote:
>>
>> Hi,
>>
>> I've faced a similar issue recently. Hope sharing my findings will help.
>> The problem can be split into 2 parts:
>>
>> Source of container failures
>> The logs you provided indicate that YARN kills its containers for
>> exceeding memory limits. Important point here is that memory limit = JVM
>> heap memory + off-heap memory. So if off-heap memory usage is high, Y

Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

2017-01-24 Thread Shannon Carey
I haven't seen it yet, I'll let you know if I do.

My last whole-cluster failure seems to have been caused by placing too much 
load on the cluster. We had a job that got up to 12GB in checkpoint size. 
Current cluster is 6x c3.2xlarge. The logs show a lot of 
"java.net.SocketException: Connection reset" when trying to write checkpoints 
to S3, as well as repeated disconnect/reconnect with Zookeeper "Client session 
timed out, have not heard from server in 28301ms for sessionid 
0x254bb682e214f79, closing socket connection and attempting reconnect", and 
things like "akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka.tcp://flink@10.0.88.37:38768/user/taskmanager#-497097074]] after 
[1 ms]". Generally, it seems as if the network got overwhelmed.

-Shannon

From: Stephan Ewen <se...@apache.org<mailto:se...@apache.org>>
Date: Tuesday, January 24, 2017 at 8:30 AM
To: <user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

Hi Shannon!

I was wondering if you still see this issue in Flink 1.1.4?

Just thinking that another possible cause for the issue could be that there is 
a connection leak somewhere (Flink code or user code or vendor library) and 
thus the S3 connector's connection pool starves.
For Flink 1.2, there is a safetynet that tracks and closes connections that go 
through Flink's filesystem abstraction. So that should not be an issue there 
any more.

Best,
Stephan



On Fri, Jan 13, 2017 at 1:04 AM, Shannon Carey 
<sca...@expedia.com<mailto:sca...@expedia.com>> wrote:
Good to know someone else has had the same problem... What did you do about it? 
Did it resolve on its own?

-Shannon




On 1/12/17, 11:55 AM, "Chen Qin" 
<qinnc...@gmail.com<mailto:qinnc...@gmail.com>> wrote:

>We have seen this issue as far back as Flink 1.0. Our finding back then was traffic 
>congestion to AWS in our internal network. Many teams depend heavily on S3 and 
>bandwidth is shared, which causes traffic congestion from time to time.
>
>Hope it helps!
>
>Thanks
>Chen
>
>> On Jan 12, 2017, at 03:30, Ufuk Celebi 
>> <u...@apache.org<mailto:u...@apache.org>> wrote:
>>
>> Hey Shannon!
>>
>> Is this always reproducible and how long does it take to reproduce it?
>>
>> I've not seen this error before but as you say it indicates that some
>> streams are not closed.
>>
>> Did the jobs do any restarts before this happened? Flink 1.1.4
>> contains fixes for more robust releasing of resources in failure
>> scenarios. Is trying 1.1.4 an option?
>>
>> – Ufuk
>>
>>> On Thu, Jan 12, 2017 at 1:18 AM, Shannon Carey 
>>> <sca...@expedia.com<mailto:sca...@expedia.com>> wrote:
>>> I'm having pretty frequent issues with the exception below. It basically
>>> always ends up killing my cluster after forcing a large number of job
>>> restarts. I just can't keep Flink up & running.
>>>
>>> I am running Flink 1.1.3 on EMR 5.2.0. I already tried updating the
>>> emrfs-site config fs.s3.maxConnections from the default (50) to 75, after
>>> AWS support told me the name of the config option. However, that hasn't
>>> fixed the problem. Assuming that increasing the maxConnections again doesn't
>>> fix the problem, is there anything else I can do? Is anyone else having this
>>> problem? Is it possible that the state backend isn't properly calling
>>> close() on its filesystem objects? Or is there a large number of concurrent
>>> open filesystem objects for some reason? I am using the default
>>> checkpointing settings with one checkpoint at a time, checkpointing every 10
>>> minutes. If I am reading the metrics correctly, the checkpoint duration is
>>> between 12s and 3 minutes on one of the jobs, and 5s or less on the other 3.
>>> Any help is appreciated.
>>>
>>> java.lang.RuntimeException: Could not initialize state backend.
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:121)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:276)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>>> at
>>> org.apache.flink.str

Re: Rapidly failing job eventually causes "Not enough free slots"

2017-01-20 Thread Shannon Carey
In fact, I can see all my job jar blobs and some checkpoint & job graph files 
in my configured "recovery.zookeeper.storageDir"… however for some reason it 
didn't get restored when my new Flink cluster started up.


From: Shannon Carey <sca...@expedia.com<mailto:sca...@expedia.com>>
Date: Friday, January 20, 2017 at 9:14 PM
To: "user@flink.apache.org<mailto:user@flink.apache.org>" 
<user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Re: Rapidly failing job eventually causes "Not enough free slots"

I recently added some better visibility into the metrics we're gathering from 
Flink. My Flink cluster died again due to the "Not enough free slots available 
to run the job" problem, and this time I can see that the number of registered 
task managers went down from 11 to 7, then waffled and only ever got back up to 
10 (one short of the requested amount) before dropping to 0 just before the 
cluster died. This would seem to explain why there weren't sufficient slots 
(given that we were probably using them all or nearly all)… The metric of 
"running jobs" went down from 5 to 3 during this time period as well. So the 
problem seems to be loss of taskmanagers due to errors (not yet sure what 
exactly as I have to delve into logs).

The other thing I have to figure out is restoring the jobs… I thought that HA 
would start the jobs back up again if Flink died & I re-launched it, but that 
doesn't appear to be the case.


From: Stephan Ewen <se...@apache.org<mailto:se...@apache.org>>
Date: Thursday, January 5, 2017 at 7:52 AM
To: <user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Re: Rapidly failing job eventually causes "Not enough free slots"

Another thought on the container failure:

in 1.1, the user code is loaded dynamically whenever a Task is started. That 
means that on every task restart the code is reloaded. For that to work properly, 
class unloading needs to happen, or the permgen will eventually overflow.

It can happen that class unloading is prevented if the user functions do leave 
references around as "GC roots", which may be threads, or references in 
registries, etc.

In Flink 1.2, YARN will put the user code into the application classpath, so 
code need not be reloaded on every restart. That should solve that issue.
To "simulate" that behavior in Flink 1.1, put your application code jars into 
the "lib" folder

Best,
Stephan


On Thu, Jan 5, 2017 at 1:15 PM, Yury Ruchin 
<yuri.ruc...@gmail.com<mailto:yuri.ruc...@gmail.com>> wrote:
Hi,

I've faced a similar issue recently. Hope sharing my findings will help. The 
problem can be split into 2 parts:

Source of container failures
The logs you provided indicate that YARN kills its containers for exceeding 
memory limits. The important point here is that memory limit = JVM heap memory + 
off-heap memory. So if off-heap memory usage is high, YARN may kill containers 
even though JVM heap consumption is fine. To solve this issue, Flink reserves a 
share of container memory for off-heap memory. How much is reserved is 
controlled by the yarn.heap-cutoff-ratio and yarn.heap-cutoff-min settings. By 
default, 25% of the requested container memory is reserved for off-heap use. 
This seems to be a good start, but one should experiment and tune to meet 
their job specifics.

It's also worthwhile to figure out who consumes off-heap memory. Is it Flink 
managed memory moved off heap (taskmanager.memory.off-heap = true)? Is it some 
external library allocating something off heap? Is it your own code?

How Flink handles task manager failures
Whenever a task manager fails, the Flink jobmanager decides whether it should:
- reallocate failed task manager container
- fail application entirely
These decisions can be guided by certain configuration 
(https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn).
 With default settings, job manager does reallocate task manager containers up 
to the point when N failures have been observed, where N is the number of 
requested task managers. After that the application is stopped.

According to the logs, you have a finite number in 
yarn.maximum-failed-containers (11, as I can see from the logs - this may be 
set by Flink if not provided explicitly). On 12th container restart, jobmanager 
gives up and the application stops. I'm not sure why it keeps reporting not 
enough slots after that point. In my experience this may happen when job eats 
up all the available slots, so that after container failure its tasks cannot be 
restarted in other (live) containers. But I believe once the decision to stop 
the application is made, there should not be any further attempts to restart 
the job, hence no logs like those. Hopefully, someone else will explain this to 
us :)

In my case I 

Re: Rapidly failing job eventually causes "Not enough free slots"

2017-01-20 Thread Shannon Carey
I recently added some better visibility into the metrics we're gathering from 
Flink. My Flink cluster died again due to the "Not enough free slots available 
to run the job" problem, and this time I can see that the number of registered 
task managers went down from 11 to 7, then waffled and only ever got back up to 
10 (one short of the requested amount) before dropping to 0 just before the 
cluster died. This would seem to explain why there weren't sufficient slots 
(given that we were probably using them all or nearly all)… The metric of 
"running jobs" went down from 5 to 3 during this time period as well. So the 
problem seems to be loss of taskmanagers due to errors (not yet sure what 
exactly as I have to delve into logs).

The other thing I have to figure out is restoring the jobs… I thought that HA 
would start the jobs back up again if Flink died & I re-launched it, but that 
doesn't appear to be the case.


From: Stephan Ewen <se...@apache.org<mailto:se...@apache.org>>
Date: Thursday, January 5, 2017 at 7:52 AM
To: <user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Re: Rapidly failing job eventually causes "Not enough free slots"

Another thought on the container failure:

in 1.1, the user code is loaded dynamically whenever a Task is started. That 
means that on every task restart the code is reloaded. For that to work properly, 
class unloading needs to happen, or the permgen will eventually overflow.

It can happen that class unloading is prevented if the user functions do leave 
references around as "GC roots", which may be threads, or references in 
registries, etc.

In Flink 1.2, YARN will put the user code into the application classpath, so 
code need not be reloaded on every restart. That should solve that issue.
To "simulate" that behavior in Flink 1.1, put your application code jars into 
the "lib" folder

Best,
Stephan


On Thu, Jan 5, 2017 at 1:15 PM, Yury Ruchin 
<yuri.ruc...@gmail.com<mailto:yuri.ruc...@gmail.com>> wrote:
Hi,

I've faced a similar issue recently. Hope sharing my findings will help. The 
problem can be split into 2 parts:

Source of container failures
The logs you provided indicate that YARN kills its containers for exceeding 
memory limits. The important point here is that memory limit = JVM heap memory + 
off-heap memory. So if off-heap memory usage is high, YARN may kill containers 
even though JVM heap consumption is fine. To solve this issue, Flink reserves a 
share of container memory for off-heap memory. How much is reserved is 
controlled by the yarn.heap-cutoff-ratio and yarn.heap-cutoff-min settings. By 
default, 25% of the requested container memory is reserved for off-heap use. 
This seems to be a good start, but one should experiment and tune to meet 
their job specifics.
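
To make the arithmetic concrete, here is a rough sketch of how the heap size would follow from the container size under the rule described above. It is not Flink's code: the function name is made up, and the default for the minimum cutoff is a placeholder, since the thread only states the 25% ratio and not the default of yarn.heap-cutoff-min.

```python
def heap_after_cutoff(container_mb, cutoff_ratio=0.25, cutoff_min_mb=0):
    """Estimate the JVM heap left after reserving off-heap memory.

    cutoff_ratio mirrors yarn.heap-cutoff-ratio (25% per the thread).
    cutoff_min_mb stands in for yarn.heap-cutoff-min; 0 is a placeholder,
    not the real default.
    """
    # Reserve the larger of the ratio-based share and the configured minimum.
    cutoff = max(int(container_mb * cutoff_ratio), cutoff_min_mb)
    if cutoff >= container_mb:
        raise ValueError("cutoff leaves no memory for the JVM heap")
    return container_mb - cutoff
```

For a 10764 MB container, for example, the 25% ratio would reserve 2691 MB and leave 8073 MB of heap under these assumptions.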

It's also worthwhile to figure out who consumes off-heap memory. Is it Flink 
managed memory moved off heap (taskmanager.memory.off-heap = true)? Is it some 
external library allocating something off heap? Is it your own code?

How Flink handles task manager failures
Whenever a task manager fails, the Flink jobmanager decides whether it should:
- reallocate failed task manager container
- fail application entirely
These decisions can be guided by certain configuration 
(https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn).
 With default settings, job manager does reallocate task manager containers up 
to the point when N failures have been observed, where N is the number of 
requested task managers. After that the application is stopped.

According to the logs, you have a finite number in 
yarn.maximum-failed-containers (11, as I can see from the logs - this may be 
set by Flink if not provided explicitly). On 12th container restart, jobmanager 
gives up and the application stops. I'm not sure why it keeps reporting not 
enough slots after that point. In my experience this may happen when job eats 
up all the available slots, so that after container failure its tasks cannot be 
restarted in other (live) containers. But I believe once the decision to stop 
the application is made, there should not be any further attempts to restart 
the job, hence no logs like those. Hopefully, someone else will explain this to 
us :)

In my case I made the jobmanager restart containers indefinitely by setting 
yarn.maximum-failed-containers = -1, so that a taskmanager failure never results 
in application death. Note that this is unlikely to be a good choice for a batch job.
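
The decision described above can be sketched roughly as follows. This is an illustration of the behaviour as reported in this thread (a limit of 11, with the application stopping on the 12th failure, and -1 meaning no limit), not the actual jobmanager logic; the function name is hypothetical.

```python
def should_reallocate(failed_containers, max_failed_containers):
    """Decide whether a failed task manager container gets replaced.

    max_failed_containers mirrors yarn.maximum-failed-containers;
    -1 is treated as "no limit", as described in the thread.
    """
    if max_failed_containers == -1:
        return True
    # With a limit of 11, the 12th observed failure stops the application.
    return failed_containers <= max_failed_containers
```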

Regards,
Yury

2017-01-05 3:21 GMT+03:00 Shannon Carey 
<sca...@expedia.com<mailto:sca...@expedia.com>>:
In Flink 1.1.3 on emr-5.2.0, I've experienced a particular problem twice and 
I'm wondering if anyone has some insight about it.

In both cases, we deployed a job that fails very frequently (within 15s-1m of 
launch). Eventually, the Flink cluster dies.

Re: 1.1.1: JobManager config endpoint no longer supplies port

2017-01-17 Thread Shannon Carey
A followup (in case anyone is interested): we worked around this by making a 
request to the "/jars" endpoint of the UI. The response has an attribute called 
"address" which includes the DNS name and port where the UI is accessible.

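For anyone wanting to script the workaround above, a minimal sketch of extracting the host and port from the "/jars" response might look like this. It assumes the response body is JSON with a top-level "address" attribute, as described; the function name and the sample payload used to exercise it are made up, and fetching the response is left to the caller.

```python
import json
from urllib.parse import urlsplit


def ui_host_and_port(jars_response_text):
    """Return (hostname, port) taken from the "address" attribute."""
    payload = json.loads(jars_response_text)
    address = payload["address"]
    # Accept both "http://host:port" and a bare "host:port".
    if "://" not in address:
        address = "//" + address
    parts = urlsplit(address)
    return parts.hostname, parts.port
```
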

Re: 1.1.4 on YARN - vcores change?

2017-01-13 Thread Shannon Carey
Ufuk & Robert,

There's a good chance you're right! On the EMR master node, where 
yarn-session.sh is run, /etc/hadoop/conf/yarn-site.xml says that 
"yarn.nodemanager.resource.cpu-vcores" is 4.


Meanwhile, on the core nodes, the value in that file is 8.

Shall I submit a JIRA? This might be pretty easy to fix given that 
"yarn-session.sh -q" already knows how to get the vcore count on the nodes. I 
can try to make a PR for it too. I'm still not sure why the containers are 
showing up as only using one vcore though... or if that is expected.

Meanwhile, it seems like overriding yarn.containers.vcores would be a 
successful workaround. Let me know if you disagree.

The other slightly annoying thing that I have to deal with is leaving enough 
memory for the JobManager. Since all task managers are the same size, I either 
need to reduce the size of every task manager (wasting resources), or I have to 
double the task managers (and halve the memory) & subtract one (basically 
doubling the number of separate JVMs & halving the slot density within the 
JVMs) in order to leave room for the JobManager. What do you guys think of the 
following change in approach?

User specifies:
number of taskmanagers
memory per slot (not per taskmanager)
total number of slots (not slots per taskmanager)

Then, Flink would decide how to organize the task managers & slots in order to 
also leave room for the JobManager. This should be straightforward compared to 
bin packing because all slots are the same size. Maybe I'm oversimplifying... 
might be a little tougher if the nodes are different sizes and we don't know on 
what node the ApplicationMaster/JobManager will run.
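
To illustrate the idea, here is a rough sketch of such a layout calculation. It is purely hypothetical and not an existing Flink feature: the function and parameter names are made up, and it sidesteps the open question above by assuming the JobManager lands on the first node in the list.

```python
def plan_slots(node_memory_mb, jobmanager_mb, memory_per_slot_mb, total_slots):
    """Return the number of slots to place on each node.

    Assumes the JobManager runs on the first node, which is a
    simplification; the real placement is not known in advance.
    """
    if memory_per_slot_mb <= 0:
        raise ValueError("memory_per_slot_mb must be positive")
    capacities = []
    for index, node_mb in enumerate(node_memory_mb):
        # Reserve JobManager memory on the first node only.
        usable = node_mb - jobmanager_mb if index == 0 else node_mb
        capacities.append(max(usable, 0) // memory_per_slot_mb)
    if sum(capacities) < total_slots:
        raise ValueError("not enough memory for the requested slots")
    plan = [0] * len(capacities)
    remaining = total_slots
    # Hand out slots round-robin so they are spread evenly across nodes.
    while remaining > 0:
        for i, cap in enumerate(capacities):
            if remaining > 0 and plan[i] < cap:
                plan[i] += 1
                remaining -= 1
    return plan
```

With three 12288 MB nodes, a 1504 MB JobManager and 1536 MB per slot, the first node has room for 7 slots and the others for 8, so a request for 22 slots fits while a request for 24 does not.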

-Shannon

On 1/13/17, 2:59 AM, "Ufuk Celebi"  wrote:

>On Fri, Jan 13, 2017 at 9:57 AM, Robert Metzger  wrote:
>> Flink is reading the number of available vcores from the local YARN
>> configuration. Is it possible that the YARN / Hadoop config on the machine
>> where you are submitting your job from sets the number of vcores as 4 ?
>
>Shouldn't we retrieve this number from the cluster instead?
>


Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

2017-01-12 Thread Shannon Carey
Good to know someone else has had the same problem... What did you do about it? 
Did it resolve on its own?

-Shannon




On 1/12/17, 11:55 AM, "Chen Qin" <qinnc...@gmail.com> wrote:

>We have seen this issue as far back as Flink 1.0. Our finding back then was traffic 
>congestion to AWS in our internal network. Many teams depend heavily on S3 and 
>bandwidth is shared, which causes traffic congestion from time to time.
>
>Hope it helps!
>
>Thanks
>Chen
>
>> On Jan 12, 2017, at 03:30, Ufuk Celebi <u...@apache.org> wrote:
>> 
>> Hey Shannon!
>> 
>> Is this always reproducible and how long does it take to reproduce it?
>> 
>> I've not seen this error before but as you say it indicates that some
>> streams are not closed.
>> 
>> Did the jobs do any restarts before this happened? Flink 1.1.4
>> contains fixes for more robust releasing of resources in failure
>> scenarios. Is trying 1.1.4 an option?
>> 
>> – Ufuk
>> 
>>> On Thu, Jan 12, 2017 at 1:18 AM, Shannon Carey <sca...@expedia.com> wrote:
>>> I'm having pretty frequent issues with the exception below. It basically
>>> always ends up killing my cluster after forcing a large number of job
>>> restarts. I just can't keep Flink up & running.
>>> 
>>> I am running Flink 1.1.3 on EMR 5.2.0. I already tried updating the
>>> emrfs-site config fs.s3.maxConnections from the default (50) to 75, after
>>> AWS support told me the name of the config option. However, that hasn't
>>> fixed the problem. Assuming that increasing the maxConnections again doesn't
>>> fix the problem, is there anything else I can do? Is anyone else having this
>>> problem? Is it possible that the state backend isn't properly calling
>>> close() on its filesystem objects? Or is there a large number of concurrent
>>> open filesystem objects for some reason? I am using the default
>>> checkpointing settings with one checkpoint at a time, checkpointing every 10
>>> minutes. If I am reading the metrics correctly, the checkpoint duration is
>>> between 12s and 3 minutes on one of the jobs, and 5s or less on the other 3.
>>> Any help is appreciated.
>>> 
>>> java.lang.RuntimeException: Could not initialize state backend.
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:121)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:276)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:105)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by:
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException:
>>> Unable to execute HTTP request: Timeout waiting for connection from pool
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:618)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(Ama

1.1.4 on YARN - vcores change?

2017-01-12 Thread Shannon Carey
Did anything change in 1.1.4 with regard to YARN & vcores?

I'm getting this error when deploying 1.1.4 to my test cluster. Only the Flink 
version changed.

java.lang.RuntimeException: Couldn't deploy Yarn cluster
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:384)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:591)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:465)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: 
The number of virtual cores per node were configured with 8 but Yarn only has 4 
virtual cores available. Please note that the number of virtual cores is set to 
the number of task slots by default unless configured in the Flink config with 
'yarn.containers.vcores.'
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.isReadyForDeployment(AbstractYarnClusterDescriptor.java:273)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:393)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:381)
... 2 more

When I run: ./bin/yarn-session.sh -q
It shows 8 vCores on each machine:


NodeManagers in the ClusterClient 3|Property |Value
+---+
|NodeID   |ip-10-2-…:8041
|Memory   |12288 MB
|vCores   |8
|HealthReport |
|Containers   |0
+---+
|NodeID   |ip-10-2-…:8041
|Memory   |12288 MB
|vCores   |8
|HealthReport |
|Containers   |0
+---+
|NodeID   |ip-10-2-…:8041
|Memory   |12288 MB
|vCores   |8
|HealthReport |
|Containers   |0
+---+

Summary: totalMemory 36864 totalCores 24

Queue: default, Current Capacity: 0.0 Max Capacity: 1.0 Applications: 0

I'm running:
./bin/yarn-session.sh -n 3 --jobManagerMemory 1504 --taskManagerMemory 10764 
--slots 8 --detached

I have not specified any value for "yarn.containers.vcores" in my config.
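
For reference, the check implied by the error message can be sketched like this. It is a reconstruction from the message text only, not Flink's source: the function name is hypothetical, and the "available" value is whatever vcore count Flink reads from YARN.

```python
def check_vcores(slots_per_taskmanager, yarn_vcores_available, configured_vcores=None):
    """Mimic the deployment check suggested by the error message.

    configured_vcores stands for yarn.containers.vcores; when it is not set,
    the number of task slots is used, as the message states.
    """
    requested = configured_vcores if configured_vcores is not None else slots_per_taskmanager
    if requested > yarn_vcores_available:
        raise ValueError(
            f"The number of virtual cores per node were configured with {requested} "
            f"but Yarn only has {yarn_vcores_available} virtual cores available."
        )
    return requested
```

Under this reading, --slots 8 against an available count of 4 fails, while either 4 slots or an explicit yarn.containers.vcores of 4 passes.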

I switched to -n 5 and --slots 4, and halved the taskManagerMemory, which 
allowed the cluster to start.

However, in the YARN "Nodes" UI I see "VCores Used: 2" and "VCores Avail: 6" on 
all three nodes. And if I look at one of the Containers, it says, "Resource: 
5408 Memory, 1 VCores". I don't understand what's happening here.

Thanks…


Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

2017-01-12 Thread Shannon Carey
I can't predict when it will occur, but usually it's after Flink has been 
running for at least a week.

Yes, I do believe we had several job restarts caused by an exception: a 
Cassandra node was down for maintenance, so a query failed to meet 
the QUORUM consistency level requested. I'm fixing the retry consistency logic 
there, but I'm sure we'll run into failing jobs again eventually.

I'm upgrading to 1.1.4 now, hopefully it will help.


-Shannon

On 1/12/17, 5:30 AM, "Ufuk Celebi" <u...@apache.org> wrote:

>Hey Shannon!
>
>Is this always reproducible and how long does it take to reproduce it?
>
>I've not seen this error before but as you say it indicates that some
>streams are not closed.
>
>Did the jobs do any restarts before this happened? Flink 1.1.4
>contains fixes for more robust releasing of resources in failure
>scenarios. Is trying 1.1.4 an option?
>
>– Ufuk
>
>On Thu, Jan 12, 2017 at 1:18 AM, Shannon Carey <sca...@expedia.com> wrote:
>> I'm having pretty frequent issues with the exception below. It basically
>> always ends up killing my cluster after forcing a large number of job
>> restarts. I just can't keep Flink up & running.
>>
>> I am running Flink 1.1.3 on EMR 5.2.0. I already tried updating the
>> emrfs-site config fs.s3.maxConnections from the default (50) to 75, after
>> AWS support told me the name of the config option. However, that hasn't
>> fixed the problem. Assuming that increasing the maxConnections again doesn't
>> fix the problem, is there anything else I can do? Is anyone else having this
>> problem? Is it possible that the state backend isn't properly calling
>> close() on its filesystem objects? Or is there a large number of concurrent
>> open filesystem objects for some reason? I am using the default
>> checkpointing settings with one checkpoint at a time, checkpointing every 10
>> minutes. If I am reading the metrics correctly, the checkpoint duration is
>> between 12s and 3 minutes on one of the jobs, and 5s or less on the other 3.
>> Any help is appreciated.
>>
>> java.lang.RuntimeException: Could not initialize state backend.
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:121)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:276)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:105)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by:
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException:
>> Unable to execute HTTP request: Timeout waiting for connection from pool
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:618)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1015)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
>> at
>> com.amazo

Flink snapshotting to S3 - Timeout waiting for connection from pool

2017-01-11 Thread Shannon Carey
I'm having pretty frequent issues with the exception below. It basically always 
ends up killing my cluster after forcing a large number of job restarts. I just 
can't keep Flink up & running.

I am running Flink 1.1.3 on EMR 5.2.0. I already tried updating the emrfs-site 
config fs.s3.maxConnections from the default (50) to 75, after AWS support told 
me the name of the config option. However, that hasn't fixed the problem. 
Assuming that increasing the maxConnections again doesn't fix the problem, is 
there anything else I can do? Is anyone else having this problem? Is it 
possible that the state backend isn't properly calling close() on its 
filesystem objects? Or is there a large number of concurrent open filesystem 
objects for some reason? I am using the default checkpointing settings with one 
checkpoint at a time, checkpointing every 10 minutes. If I am reading the 
metrics correctly, the checkpoint duration is between 12s and 3 minutes on one 
of the jobs, and 5s or less on the other 3. Any help is appreciated.
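
To illustrate the failure mode being asked about, here is a minimal sketch in plain Java. It is not EMRFS or Flink code, and every class and method name in it is made up for illustration. It models a fixed-size connection pool as a semaphore and shows that when callers never close what they lease, requests start timing out as soon as the pool size is reached. Whether the state backend really leaks handles this way is an open question in this message, not something the sketch proves.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class PoolExhaustionSketch {

    /** Hypothetical stand-in for an HTTP connection pool with a fixed size. */
    static final class Pool {
        private final Semaphore permits;

        Pool(int maxConnections) {
            this.permits = new Semaphore(maxConnections);
        }

        /** Waits up to timeoutMillis for a free connection, then gives up. */
        Lease lease(long timeoutMillis) throws InterruptedException {
            if (!permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("Timeout waiting for connection from pool");
            }
            return new Lease(permits);
        }
    }

    /** A leased connection; the permit only goes back to the pool on close(). */
    static final class Lease implements AutoCloseable {
        private final Semaphore permits;
        private boolean closed;

        Lease(Semaphore permits) {
            this.permits = permits;
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                permits.release();
            }
        }
    }

    /** Issues sequential requests and returns how many obtained a connection. */
    static int successfulRequests(int maxConnections, int requests, boolean closeAfterUse) {
        Pool pool = new Pool(maxConnections);
        int succeeded = 0;
        for (int i = 0; i < requests; i++) {
            try {
                Lease lease = pool.lease(10);
                succeeded++;
                if (closeAfterUse) {
                    lease.close();
                }
            } catch (IllegalStateException e) {
                // Pool exhausted: this request is lost.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return succeeded;
    }

    public static void main(String[] args) {
        System.out.println("closing:  " + successfulRequests(50, 60, true));
        System.out.println("leaking:  " + successfulRequests(50, 60, false));
    }
}
```

In this model, raising the pool size only delays the point at which a leak exhausts the pool, which would be consistent with the bump from 50 to 75 not helping.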

java.lang.RuntimeException: Could not initialize state backend.
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:121)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:276)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:618)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1015)
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:7)
at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:75)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy34.retrieveMetadata(Unknown Source)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:764)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.mkdir(S3NativeFileSystem.java:1169)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.mkdirs(S3NativeFileSystem.java:1162)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1877)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.mkdirs(EmrFileSystem.java:399)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:429)
at 

Re: failure-rate restart strategy not working?

2017-01-06 Thread Shannon Carey
I think I figured it out: the problem is due to Flink's behavior when a job has 
checkpointing enabled.

When the job graph is created, if checkpointing is enabled but a restart 
strategy hasn't been programmatically configured, Flink changes the job graph's 
execution config to use the fixed delay restart strategy. That gets serialized 
with the job graph. Then, when the JobManager deserializes the execution 
config, it sees that there's a restart strategy configured for the job and uses 
that instead of using the restart strategy that's configured on the cluster.
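
The precedence described above can be written down as a small plain-Java model. This is only a sketch of the behavior as reported in this message, not Flink's actual code; the enum and both method names are invented for illustration.

```java
public class RestartStrategyResolution {

    enum Strategy { FIXED_DELAY, FAILURE_RATE }

    /**
     * Models the client side: what ends up serialized with the job graph.
     * Returns null when the job carries no restart strategy of its own.
     */
    static Strategy jobLevel(boolean checkpointingEnabled, Strategy setInCode) {
        if (setInCode != null) {
            return setInCode; // programmatic configuration is kept as-is
        }
        // With checkpointing on and nothing set, fixed delay is filled in.
        return checkpointingEnabled ? Strategy.FIXED_DELAY : null;
    }

    /** Models the JobManager side: a job-level strategy wins over the cluster's. */
    static Strategy effective(Strategy jobLevel, Strategy clusterConfigured) {
        return jobLevel != null ? jobLevel : clusterConfigured;
    }

    public static void main(String[] args) {
        Strategy cluster = Strategy.FAILURE_RATE; // restart-strategy: failure-rate
        System.out.println(effective(jobLevel(true, null), cluster));
        System.out.println(effective(jobLevel(false, null), cluster));
        System.out.println(effective(jobLevel(true, Strategy.FAILURE_RATE), cluster));
    }
}
```

Under this model, the cluster-wide failure-rate setting only takes effect for jobs without checkpointing. A checkpointed job gets failure-rate behavior only if it sets the strategy itself, for example through setRestartStrategy on the execution environment.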

The documentation clearly needs to be adjusted. Maybe I can add 
some changes to https://github.com/apache/flink/pull/3059

However, should we also consider some implementation changes? Is it intentional 
that enabling checkpointing overrides the restart strategy set on the cluster, and 
that the only way to control the restart strategy on a checkpointed job is to 
set it programmatically? If not, then would it be reasonable to only set the 
fixed-delay restart strategy if checkpointing is enabled AND the cluster 
doesn't explicitly configure a restart strategy? Flink would then no longer use 
the execution config to control the strategy, but would instead decide it in 
JobManager#submitJob().

-Shannon

From: Shannon Carey <sca...@expedia.com<mailto:sca...@expedia.com>>
Date: Thursday, January 5, 2017 at 1:50 PM
To: "user@flink.apache.org<mailto:user@flink.apache.org>" 
<user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: failure-rate restart strategy not working?

I recently updated my cluster with the following config:

restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s

I see the settings inside the JobManager web UI, as expected. I am not setting 
the restart-strategy programmatically, but the job does have checkpointing 
enabled.

However, if I launch a job that (intentionally) fails every 10 seconds by 
throwing a RuntimeException, it continues to restart beyond the limit of 3 
failures.

Does anyone know why this might be happening? Any ideas of things I could check?

Thanks!
Shannon


failure-rate restart strategy not working?

2017-01-05 Thread Shannon Carey
I recently updated my cluster with the following config:

restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s

I see the settings inside the JobManager web UI, as expected. I am not setting 
the restart-strategy programmatically, but the job does have checkpointing 
enabled.

However, if I launch a job that (intentionally) fails every 10 seconds by 
throwing a RuntimeException, it continues to restart beyond the limit of 3 
failures.
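
As a rough illustration of what the configuration above is expected to do, the following plain-Java sketch counts failures in a sliding window. It is a simplified model with made-up names, not Flink's implementation. It assumes the job fails about every 20 seconds (10 s of runtime plus the 10 s restart delay), with the limit of 3 failures per 5 minutes from the config.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class FailureRateWindow {
    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    private final Deque<Long> recentFailures = new ArrayDeque<>();

    public FailureRateWindow(int maxFailuresPerInterval, long intervalMillis) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
    }

    /** Records a failure and reports whether a restart is still allowed. */
    public boolean recordFailureAndCheck(long nowMillis) {
        // Forget failures that have fallen out of the window.
        while (!recentFailures.isEmpty()
                && nowMillis - recentFailures.peekFirst() > intervalMillis) {
            recentFailures.pollFirst();
        }
        recentFailures.addLast(nowMillis);
        return recentFailures.size() <= maxFailuresPerInterval;
    }

    /** Counts restarts granted to a job that fails every periodMillis, up to cap. */
    public static int restartsBeforeGivingUp(int maxFailures, long intervalMillis,
                                             long periodMillis, int cap) {
        FailureRateWindow window = new FailureRateWindow(maxFailures, intervalMillis);
        int restarts = 0;
        for (int i = 0; i < cap; i++) {
            if (!window.recordFailureAndCheck(i * periodMillis)) {
                return restarts;
            }
            restarts++;
        }
        return restarts;
    }

    public static void main(String[] args) {
        // 3 failures per 5 minutes, one failure every 20 seconds.
        System.out.println(restartsBeforeGivingUp(3, 300_000L, 20_000L, 100));
    }
}
```

In this model the fourth failure arrives after 60 seconds, well inside the 5 minute window, so the job would be given up on after three restarts. A job that keeps restarting past that point suggests the failure-rate settings are not the ones in effect.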

Does anyone know why this might be happening? Any ideas of things I could check?

Thanks!
Shannon


Rapidly failing job eventually causes "Not enough free slots"

2017-01-04 Thread Shannon Carey
In Flink 1.1.3 on emr-5.2.0, I've experienced a particular problem twice and 
I'm wondering if anyone has some insight about it.

In both cases, we deployed a job that fails very frequently (within 15s-1m of 
launch). Eventually, the Flink cluster dies.

The sequence of events looks something like this:

  *   bad job is launched
  *   bad job fails & is restarted many times (I didn't have the "failure-rate" 
restart strategy configuration right)
  *   Task manager logs: org.apache.flink.yarn.YarnTaskManagerRunner (SIGTERM 
handler): RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
  *   At this point, the YARN resource manager also logs the container failure
  *   More logs: Container 
ResourceID{resourceId='container_1481658997383_0003_01_13'} failed. Exit 
status: Pmem limit exceeded (-104)
  *   Diagnostics for container 
ResourceID{resourceId='container_1481658997383_0003_01_13'} in state 
COMPLETE : exitStatus=Pmem limit exceeded (-104) diagnostics=Container 
[pid=21246,containerID=container_1481658997383_0003_01_13] is running 
beyond physical memory limits. Current usage: 5.6 GB of 5.6 GB physical memory 
used; 9.6 GB of 28.1 GB virtual memory used. Killing container.
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
Total number of failed containers so far: 12
Stopping YARN session because the number of failed containers (12) exceeded the 
maximum failed containers (11). This number is controlled by the 
'yarn.maximum-failed-containers' configuration setting. By default its the 
number of requested containers.
  *   From here onward, the logs repeatedly show that jobs fail to restart due 
to "org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Not enough free slots available to run the job. You can decrease the operator 
parallelism or increase the number of slots per TaskManager in the 
configuration. Task to schedule: < Attempt #68 (Source: …) @ (unassigned) - 
[SCHEDULED] > with groupID < 73191c171abfff61fb5102c161274145 > in sharing 
group < SlotSharingGroup [73191c171abfff61fb5102c161274145, 
19596f7834805c8409c419f0edab1f1b] >. Resources available to scheduler: Number 
of instances=0, total number of slots=0, available slots=0"
  *   Eventually, Flink stops for some reason (with another SIGTERM message), 
presumably because of YARN

Does anyone have an idea why a bad job repeatedly failing would eventually 
result in the Flink cluster dying?

Any idea why I'd get "Pmem limit exceeded" or "Not enough free slots available 
to run the job"? The JVM heap usage and the free memory on the machines both 
look reasonable in my monitoring dashboards. Could it possibly be a memory leak 
due to classloading or something?

Thanks for any help or suggestions you can provide! I am hoping that the 
"failure-rate" restart strategy will help avoid this issue in the future, but 
I'd also like to understand what's making the cluster die so that I can prevent 
it.

-Shannon


Re: Monitoring REST API

2016-12-27 Thread Shannon Carey
Although Flink exposes some metrics in the API/UI, it probably only does that 
because it was easy to do and convenient for users. However, I don't think 
Flink is intended to be a complete monitoring solution for your cluster.

Instead, you should take a look at collectd https://collectd.org/ which is 
meant for monitoring OS-level metrics and has, for example, a Graphite plugin 
which you can use to write to a Graphite server or statsd instance… or you can 
integrate it some other way depending on what you have & what you want.
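
For a sense of what ends up on the wire when something writes to Graphite, here is a minimal Java sketch of Graphite's plaintext format, which is one line per sample: metric path, value, and Unix timestamp. The metric path and class name are invented for illustration, and the load average read through the JVM's management bean is only a stand-in for the much richer OS-level metrics collectd gathers.

```java
import java.lang.management.ManagementFactory;

public class GraphiteLineSketch {

    /** Formats one sample as "<path> <value> <epochSeconds>\n". */
    static String format(String path, double value, long epochSeconds) {
        return path + " " + value + " " + epochSeconds + "\n";
    }

    public static void main(String[] args) {
        // System load average as seen by the JVM; may be negative if unavailable.
        double load = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
        long now = System.currentTimeMillis() / 1000L;
        // Hypothetical metric path; a real setup would follow its own naming scheme.
        System.out.print(format("hosts.worker1.load.shortterm", load, now));
    }
}
```

The sketch only builds the line; sending it to a Graphite or statsd endpoint is left to collectd or whatever integration is already in place.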

-Shannon

From: Lydia Ickler
Date: Wednesday, December 21, 2016 at 12:55 PM
Subject: Monitoring REST API

Hi all,

I have a question regarding the Monitoring REST API;

I want to analyze the behavior of my program with regards to I/O MiB/s, Network 
MiB/s and CPU % as the authors of this paper did. 
(https://hal.inria.fr/hal-01347638v2/document)
From the JSON file at http://master:8081/jobs/jobid/ I get a summary including 
the information of read/write records and read/write bytes.
Unfortunately the entries of Network or CPU are either (unknown) or 0.0. I am 
running my program on a cluster with up to 32 nodes.

Where can I find the values for e.g. CPU or Network?

Thanks in advance!
Lydia



Re: Monitoring Flink on Yarn

2016-12-19 Thread Shannon Carey
Check out Logstash or Splunk. Those can pipe your logs into an external 
database which can be used by a purpose-built UI for examining logs, and as a 
result it doesn't matter if the original files or machines are still around or 
not.


From: Lydia Ickler
Date: Monday, December 19, 2016 at 7:38 AM
Subject: Monitoring Flink on Yarn

Hi all,

I am using Flink 1.1.3 on Yarn and I wanted to ask how I can save the 
monitoring logs, e.g. for I/O or network, to HDFS or local FS?
Since Yarn closes the Flink session after finishing the job I can't access the 
log via REST API.

I am looking forward to your answer!
Best regards,
Lydia


Re: How to retrieve values from yarn.taskmanager.env in a Job?

2016-12-13 Thread Shannon Carey
Till,

Unfortunately, System.getenv() doesn't contain the expected variable even 
within the UDFs, but thanks for the info!

In the Yarn setting, "the client" would be either:

  1.  the bin/flink executable (with configuration based on where it's run 
from… which might not be the same as the destination Flink cluster) OR
  2.  the web UI… the job planning runs in the existing JVM of the web UI? That 
runs as part of the Job Manager, right? This is the primary method by which we 
launch jobs, currently.

Is that right?

I will try out "env.java.opts" to see if that has any effect.

-Shannon

From: Till Rohrmann <till.rohrm...@gmail.com<mailto:till.rohrm...@gmail.com>>
Date: Tuesday, December 13, 2016 at 4:34 AM
To: <user@flink.apache.org<mailto:user@flink.apache.org>>
Cc: Chesnay Schepler <ches...@apache.org<mailto:ches...@apache.org>>
Subject: Re: How to retrieve values from yarn.taskmanager.env in a Job?

Hi Shannon,

the job graph generation does not run in the task manager but on the client. 
The job graph is then submitted to the JobManager which then will deploy the 
individual tasks to the TaskManager. Thus, when generating the job graph the 
task manager environment variables are not accessible.

Thus, you are only able to access these environment variables from within your 
UDFs.

What you could do is to union all configuration objects and then read only 
those entries relevant for a specific environment on the task manager, e.g. in the 
open method of a RichFunction.
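
A minimal sketch of that suggestion in plain Java follows. The variable name APP_ENVIRONMENT, the db.host key, and the host names are all hypothetical. It also assumes the environment variable is actually visible on the task manager, which is exactly what this thread reports as not working in this setup.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class EnvConfigSelector {

    /** Hypothetical name of the variable that identifies the environment. */
    static final String ENV_VAR = "APP_ENVIRONMENT";

    /** Union of per-environment settings, built once and shipped with the job. */
    static Map<String, Map<String, String>> allConfigs() {
        Map<String, String> test = new HashMap<>();
        test.put("db.host", "db.test.example.invalid");
        Map<String, String> prod = new HashMap<>();
        prod.put("db.host", "db.prod.example.invalid");

        Map<String, Map<String, String>> all = new HashMap<>();
        all.put("test", test);
        all.put("prod", prod);
        return all;
    }

    /** Picks the entries for the environment reported by envLookup. */
    static Map<String, String> select(Map<String, Map<String, String>> all,
                                      Function<String, String> envLookup) {
        String envName = envLookup.apply(ENV_VAR);
        Map<String, String> chosen = envName == null ? null : all.get(envName);
        if (chosen == null) {
            throw new IllegalStateException("No configuration for environment: " + envName);
        }
        return chosen;
    }

    /** Convenience for trying the selection with a fixed environment name. */
    static String dbHostFor(String envName) {
        return select(allConfigs(), name -> envName).get("db.host");
    }

    public static void main(String[] args) {
        // Inside an open() method this would be select(allConfigs(), System::getenv).
        System.out.println(dbHostFor("test"));
    }
}
```

Passing the lookup as a function keeps the selection testable without touching the real process environment.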

Cheers,
Till

On Mon, Dec 12, 2016 at 7:12 PM, Shannon Carey 
<sca...@expedia.com<mailto:sca...@expedia.com>> wrote:
Hi Chesnay,

Since that configuration option is supposed to apply the environment variables 
to the task managers, I figured it would definitely be available within the 
stream operators. I'm not sure whether the job plan runs within a task manager 
or not, but hopefully it does?

In my particular code, I want to get the name of the environment in order to 
read the correct configuration file(s) so that properly populated config 
objects can be passed to various operators. Therefore, it would be sufficient 
for the job plan execution to have access to the environment. All the operators 
are capable of persisting any necessary configuration through serialization.

It really can work either way, but I think it'd be easiest if it was available 
everywhere. If it's only available during job planning then you have to make 
sure to serialize it everywhere you need it, and if it's only available during 
operator execution then it's less straightforward to do central configuration 
work. Either way it's lying in wait for a programmer to forget where it's 
accessible vs. not.

-Shannon

From: Chesnay Schepler <ches...@apache.org<mailto:ches...@apache.org>>
Date: Monday, December 12, 2016 at 7:36 AM
To: <user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Re: How to retrieve values from yarn.taskmanager.env in a Job?

Hello,

can you clarify one small thing for me: Do you want to access this parameter 
when you define the plan
(aka when you call methods on the StreamExecutionEnvironment or DataStream 
instances)
or from within your functions/operators?

Regards,
Chesnay Schepler


On 12.12.2016 14:21, Till Rohrmann wrote:

Hi Shannon,

have you tried accessing the environment variables via System.getenv()? This 
should give you a map of string-string key value pairs where the key is the 
environment variable name.

If your values are not set in the returned map, then this indicates a bug in 
Flink and it would be great if you could open a JIRA issue.

Cheers,
Till

On Fri, Dec 9, 2016 at 7:33 PM, Shannon Carey 
<sca...@expedia.com<mailto:sca...@expedia.com>> wrote:
This thread 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/passing-environment-variables-to-flink-program-td3337.html
 describes the impetus for the addition of yarn.taskmanager.env.

I have configured a value within yarn.taskmanager.env, and I see it appearing 
in the Flink web UI in the list underneath Job Manager -> Configuration. 
However, I can't figure out how to retrieve the value from within a Flink job. 
It doesn't appear in the environment, the system properties, or my 
ParameterTool instance, and I can't figure out how I would get to it via the 
StreamExecutionEnvironment. Can anyone point me in the right direction?

All I want to do is inform my Flink jobs which environment they're running on, 
so that programmers don't have to specify the environment as a job parameter 
every time they run it. I also see that there is a "env.java.opts" 
configuration… does that work in YARN apps (would my jobs be able to see it?)

Thanks!
Shannon





Re: How to retrieve values from yarn.taskmanager.env in a Job?

2016-12-12 Thread Shannon Carey
Hi Till,

Yes, System.getenv() was the first thing I tried. It'd be great if someone else 
can reproduce the issue, but for now I'll submit a JIRA with the assumption 
that it really is not working right. 
https://issues.apache.org/jira/browse/FLINK-5322

-Shannon

From: Till Rohrmann <trohrm...@apache.org<mailto:trohrm...@apache.org>>
Date: Monday, December 12, 2016 at 7:21 AM
To: <user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Re: How to retrieve values from yarn.taskmanager.env in a Job?


Hi Shannon,

have you tried accessing the environment variables via System.getenv()? This 
should give you a map of string-string key value pairs where the key is the 
environment variable name.

If your values are not set in the returned map, then this indicates a bug in 
Flink and it would be great if you could open a JIRA issue.

Cheers,
Till

On Fri, Dec 9, 2016 at 7:33 PM, Shannon Carey 
<sca...@expedia.com<mailto:sca...@expedia.com>> wrote:
This thread 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/passing-environment-variables-to-flink-program-td3337.html
 describes the impetus for the addition of yarn.taskmanager.env.

I have configured a value within yarn.taskmanager.env, and I see it appearing 
in the Flink web UI in the list underneath Job Manager -> Configuration. 
However, I can't figure out how to retrieve the value from within a Flink job. 
It doesn't appear in the environment, the system properties, or my 
ParameterTool instance, and I can't figure out how I would get to it via the 
StreamExecutionEnvironment. Can anyone point me in the right direction?

All I want to do is inform my Flink jobs which environment they're running on, 
so that programmers don't have to specify the environment as a job parameter 
every time they run it. I also see that there is a "env.java.opts" 
configuration… does that work in YARN apps (would my jobs be able to see it?)

Thanks!
Shannon



Re: How to retrieve values from yarn.taskmanager.env in a Job?

2016-12-12 Thread Shannon Carey
Hi Chesnay,

Since that configuration option is supposed to apply the environment variables 
to the task managers, I figured it would definitely be available within the 
stream operators. I'm not sure whether the job plan runs within a task manager 
or not, but hopefully it does?

In my particular code, I want to get the name of the environment in order to 
read the correct configuration file(s) so that properly populated config 
objects can be passed to various operators. Therefore, it would be sufficient 
for the job plan execution to have access to the environment. All the operators 
are capable of persisting any necessary configuration through serialization.

It really can work either way, but I think it'd be easiest if it was available 
everywhere. If it's only available during job planning then you have to make 
sure to serialize it everywhere you need it, and if it's only available during 
operator execution then it's less straightforward to do central configuration 
work. Either way it's lying in wait for a programmer to forget where it's 
accessible vs. not.

-Shannon

From: Chesnay Schepler <ches...@apache.org<mailto:ches...@apache.org>>
Date: Monday, December 12, 2016 at 7:36 AM
To: <user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Re: How to retrieve values from yarn.taskmanager.env in a Job?

Hello,

can you clarify one small thing for me: Do you want to access this parameter 
when you define the plan
(aka when you call methods on the StreamExecutionEnvironment or DataStream 
instances)
or from within your functions/operators?

Regards,
Chesnay Schepler


On 12.12.2016 14:21, Till Rohrmann wrote:

Hi Shannon,

have you tried accessing the environment variables via System.getenv()? This 
should give you a map of string-string key value pairs where the key is the 
environment variable name.

If your values are not set in the returned map, then this indicates a bug in 
Flink and it would be great if you could open a JIRA issue.

Cheers,
Till

On Fri, Dec 9, 2016 at 7:33 PM, Shannon Carey 
<sca...@expedia.com<mailto:sca...@expedia.com>> wrote:
This thread 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/passing-environment-variables-to-flink-program-td3337.html
 describes the impetus for the addition of yarn.taskmanager.env.

I have configured a value within yarn.taskmanager.env, and I see it appearing 
in the Flink web UI in the list underneath Job Manager -> Configuration. 
However, I can't figure out how to retrieve the value from within a Flink job. 
It doesn't appear in the environment, the system properties, or my 
ParameterTool instance, and I can't figure out how I would get to it via the 
StreamExecutionEnvironment. Can anyone point me in the right direction?

All I want to do is inform my Flink jobs which environment they're running on, 
so that programmers don't have to specify the environment as a job parameter 
every time they run it. I also see that there is a "env.java.opts" 
configuration… does that work in YARN apps (would my jobs be able to see it?)

Thanks!
Shannon




Re: Reg. custom sinks in Flink

2016-12-09 Thread Shannon Carey
You haven't really added a sink in Flink terminology, you're just performing a 
side effect within a map operator. So while it may work, if you want to add a 
proper sink you need to have an object that extends SinkFunction or 
RichSinkFunction. The method call on the stream should be ".addSink(…)".

Also, the dbSession isn't really Flink state as it will not vary based on the 
position in or content in the stream. It's a necessary helper object, yes, but 
you don't need Flink to checkpoint it.
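
To show why a helper like this is usually created on the worker rather than carried along with the function, here is a small plain-Java sketch. It does not use Flink or the Cassandra driver: FakeSession and SessionSink are invented stand-ins, and the open/invoke/close methods only mirror the lifecycle hooks that RichSinkFunction offers. Whether this explains the NPE on YARN is a guess, not something confirmed in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientSessionSketch {

    /** Stand-in for a non-serializable client such as a database session. */
    static final class FakeSession {
        int executed = 0;

        void execute(String query) {
            executed++;
        }
    }

    /** Hypothetical sink-like object; not Flink's interface. */
    static final class SessionSink implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: the session is never shipped with the object.
        private transient FakeSession session;

        void open() {
            session = new FakeSession();
        }

        void invoke(String query) {
            session.execute(query);
        }

        void close() {
            session = null;
        }

        boolean hasSession() {
            return session != null;
        }
    }

    /** Serializes and deserializes the sink, as happens when it is sent to a worker. */
    static SessionSink roundTrip(SessionSink sink) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(sink);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SessionSink) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** True if the shipped copy has no session until open() runs on it. */
    static boolean sessionOnlyExistsAfterOpen() {
        SessionSink local = new SessionSink();
        local.open();
        SessionSink shipped = roundTrip(local);
        boolean missingBeforeOpen = !shipped.hasSession();
        shipped.open();
        shipped.invoke("QUERY_TO_EXEC");
        shipped.close();
        return missingBeforeOpen;
    }

    public static void main(String[] args) {
        System.out.println(sessionOnlyExistsAfterOpen());
    }
}
```

The same reasoning applies to static fields: anything initialized in the JVM that builds the job is not automatically present in the JVM that runs the operator, so the session is best created in the open hook and released in close.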

You can still use the sinks provided with flink-connector-cassandra and 
customize the cluster building by passing your own ClusterBuilder into the 
constructor.

-Shannon

From: Meghashyam Sandeep V
Date: Friday, December 9, 2016 at 12:26 PM
Subject: Reg. custom sinks in Flink

Hi there,

I have a Flink streaming app where my source is Kafka and my sink is a custom 
sink to Cassandra (I can't use the standard C* sink that comes with Flink as I 
have customized auth to C*). I currently have the following:


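messageStream
    .rebalance()
    // parse each Kafka record into a JSON tree
    .map(s -> mapper.readValue(s, JsonNode.class))
    .filter(/* filter some messages */)
    // side effect inside a map operator: run the query for every message
    .map((MapFunction) message -> {
        getDbSession().execute("QUERY_TO_EXEC");
        return message;
    });

// lazily create the shared Cassandra session on first use
private static Session getDbSession() {
    if (dbSession == null && store != null) {
        dbSession = getSession();
    }
    return dbSession;
}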

1. Is this the right way to add a custom sink? As you can see, I have dbSession 
as a class variable here and I'm storing its state.

2. This setup works fine in a standalone flink (java -jar MyJar.jar). When I 
run using flink with YARN on EMR I get a NPE at the session which is kind of 
weird.


Thanks


How to retrieve values from yarn.taskmanager.env in a Job?

2016-12-09 Thread Shannon Carey
This thread 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/passing-environment-variables-to-flink-program-td3337.html
 describes the impetus for the addition of yarn.taskmanager.env.

I have configured a value within yarn.taskmanager.env, and I see it appearing 
in the Flink web UI in the list underneath Job Manager -> Configuration. 
However, I can't figure out how to retrieve the value from within a Flink job. 
It doesn't appear in the environment, the system properties, or my 
ParameterTool instance, and I can't figure out how I would get to it via the 
StreamExecutionEnvironment. Can anyone point me in the right direction?

All I want to do is inform my Flink jobs which environment they're running on, 
so that programmers don't have to specify the environment as a job parameter 
every time they run it. I also see that there is a "env.java.opts" 
configuration… does that work in YARN apps (would my jobs be able to see it?)

Thanks!
Shannon


Re: Flink survey by data Artisans

2016-11-18 Thread Shannon Carey
There's a newline that disrupts the URL.

http://www.surveygizmo.com/s3/3166399/181bdb611f22

Not:

http://www.surveygizmo.com/s3/
3166399/181bdb611f22



Re: Can we do batch writes on cassandra using flink while leveraging the locality?

2016-10-27 Thread Shannon Carey
It certainly seems possible to write a Partitioner that does what you describe. 
I started implementing one but didn't have time to finish it. I think the main 
difficulty is in properly dealing with partition ownership changes in 
Cassandra… if you are maintaining state in Flink and the partitioning changes, 
your job might produce inaccurate output. If, on the other hand, you are only 
using the partitioner just before the output, dynamic partitioning changes 
might be ok.
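
As a sketch of the routing part only, the following plain Java maps a token to the output channel that owns it on a token ring. Everything here is an assumption for illustration: the class name, the idea of one channel per Cassandra node, and the fixed ring contents. Obtaining real tokens and ownership from the cluster, and reacting when ownership changes, is the hard part and is not shown.

```java
import java.util.Map;
import java.util.TreeMap;

public class TokenRingPartitionerSketch {

    // Maps the end token of each range to the channel that should receive it.
    private final TreeMap<Long, Integer> ring = new TreeMap<>();

    void addRange(long endToken, int channel) {
        ring.put(endToken, channel);
    }

    /** Returns the channel owning the first range whose end token is >= token. */
    int channelFor(long token) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("Token ring is empty");
        }
        Map.Entry<Long, Integer> owner = ring.ceilingEntry(token);
        if (owner == null) {
            owner = ring.firstEntry(); // past the last token: wrap around the ring
        }
        return owner.getValue();
    }

    /** Small fixed ring for trying the lookup. */
    static int demoChannelFor(long token) {
        TokenRingPartitionerSketch partitioner = new TokenRingPartitionerSketch();
        partitioner.addRange(100L, 0);
        partitioner.addRange(200L, 1);
        partitioner.addRange(300L, 2);
        return partitioner.channelFor(token);
    }

    public static void main(String[] args) {
        System.out.println(demoChannelFor(150L));
        System.out.println(demoChannelFor(350L));
    }
}
```

A lookup like this could sit behind a custom partitioner's partition method. If the ring is rebuilt while the job runs, records for the same key may start going to a different channel, which is the state-consistency concern mentioned above.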


From: kant kodali
Date: Thursday, October 27, 2016 at 3:17 AM
Subject: Can we do batch writes on cassandra using flink while leveraging the 
locality?

Can we do batch writes on Cassandra using Flink while leveraging the 
locality? For example, batch writes in Cassandra will put pressure on the 
coordinator, but since the connectors are built to leverage locality, I was 
wondering if we could do a batch of writes on the node where the batch belongs?


Re: [DISCUSS] Deprecate Hadoop source method from (batch) ExecutionEnvironment

2016-10-14 Thread Shannon Carey
Yep!

From: Fabian Hueske <fhue...@gmail.com<mailto:fhue...@gmail.com>>
Date: Friday, October 14, 2016 at 11:00 AM
To: Shannon Carey <sca...@expedia.com<mailto:sca...@expedia.com>>
Cc: "user@flink.apache.org<mailto:user@flink.apache.org>" 
<user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Re: [DISCUSS] Deprecate Hadoop source method from (batch) 
ExecutionEnvironment

Hi Shannon,

the plan is as follows:

We will keep the methods as they are for 1.2 but deprecate them and at the same 
time we will add alternatives in an optional dependency.
In a later release, the deprecated methods will be removed and everybody has to 
switch to the optional dependency.

Does that work for you?

Best, Fabian

2016-10-14 17:30 GMT+02:00 Shannon Carey 
<sca...@expedia.com<mailto:sca...@expedia.com>>:
Speaking as a user, if you are suggesting that you will retain the 
functionality but move the methods to an optional dependency, it makes sense to 
me. We have used the Hadoop integration for AvroParquetInputFormat and 
CqlBulkOutputFormat in Flink (although we won't be using CqlBulkOutputFormat 
any longer because it doesn't seem to be reliable).

-Shannon

From: Fabian Hueske <fhue...@gmail.com<mailto:fhue...@gmail.com>>
Date: Friday, October 14, 2016 at 4:29 AM
To: <user@flink.apache.org<mailto:user@flink.apache.org>>, 
"d...@flink.apache.org<mailto:d...@flink.apache.org>" 
<d...@flink.apache.org<mailto:d...@flink.apache.org>>
Subject: [DISCUSS] Deprecate Hadoop source method from (batch) 
ExecutionEnvironment

Hi everybody,

I would like to propose to deprecate the utility methods to read data with 
Hadoop InputFormats from the (batch) ExecutionEnvironment.

The motivation for deprecating these methods is to reduce Flink's dependency on 
Hadoop and instead have Hadoop as an optional dependency for users that actually 
need it (HDFS, MapRed-Compat, ...). Eventually, we want to have a Flink 
distribution that does not have a hard Hadoop dependency.

One step for this is to remove the Hadoop dependency from flink-java (Flink's 
Java DataSet API) which is currently required due to the above utility methods 
(see FLINK-4315). We recently received a PR that addresses FLINK-4315 and 
removes the Hadoop methods from the ExecutionEnvironment. After some 
discussion, it was decided to defer the PR to Flink 2.0 because it breaks the 
API (these methods are declared @PublicEvolving).

I propose to accept this PR for Flink 1.2, but to deprecate the methods instead 
of removing them.
This would help to migrate old code and prevent new usage of these methods.
For a later Flink release (1.3 or 2.0) we could remove these methods and the 
Hadoop dependency on flink-java.

What do others think?

Best, Fabian



Re: [DISCUSS] Deprecate Hadoop source method from (batch) ExecutionEnvironment

2016-10-14 Thread Shannon Carey
Speaking as a user, if you are suggesting that you will retain the 
functionality but move the methods to an optional dependency, it makes sense to 
me. We have used the Hadoop integration for AvroParquetInputFormat and 
CqlBulkOutputFormat in Flink (although we won't be using CqlBulkOutputFormat 
any longer because it doesn't seem to be reliable).

-Shannon

From: Fabian Hueske <fhue...@gmail.com>
Date: Friday, October 14, 2016 at 4:29 AM
To: <user@flink.apache.org>, "d...@flink.apache.org" 
<d...@flink.apache.org>
Subject: [DISCUSS] Deprecate Hadoop source method from (batch) 
ExecutionEnvironment

Hi everybody,

I would like to propose to deprecate the utility methods to read data with 
Hadoop InputFormats from the (batch) ExecutionEnvironment.

The motivation for deprecating these methods is to reduce Flink's dependency on 
Hadoop and instead make Hadoop an optional dependency for users who actually 
need it (HDFS, MapRed-Compat, ...). Eventually, we want to have a Flink 
distribution that does not have a hard Hadoop dependency.

One step for this is to remove the Hadoop dependency from flink-java (Flink's 
Java DataSet API) which is currently required due to the above utility methods 
(see FLINK-4315). We recently received a PR that addresses FLINK-4315 and 
removes the Hadoop methods from the ExecutionEnvironment. After some 
discussion, it was decided to defer the PR to Flink 2.0 because it breaks the 
API (these methods are declared @PublicEvolving).

I propose to accept this PR for Flink 1.2, but to deprecate the methods instead 
of removing them.
This would help to migrate old code and prevent new usage of these methods.
For a later Flink release (1.3 or 2.0) we could remove these methods and the 
Hadoop dependency on flink-java.

What do others think?

Best, Fabian


mapreduce.HadoopOutputFormat config value issue

2016-10-11 Thread Shannon Carey
In Flink 1.1.1, I am seeing what looks like a serialization issue with 
org.apache.hadoop.conf.Configuration when it is used with 
org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat. When I use the 
mapred.HadoopOutputFormat version, it works just fine.

Specifically, the job fails because "java.lang.UnsupportedOperationException: 
You must set the ColumnFamily schema using setColumnFamilySchema." I am 
definitely setting that property, and it appears to be getting serialized, but 
when the config deserializes the setting is gone. Anybody have any ideas? In 
the meantime, I will continue using the "mapred" package.

Stack trace:
java.lang.UnsupportedOperationException: You must set the ColumnFamily schema 
using setColumnFamilySchema.
at 
org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat.getColumnFamilySchema(CqlBulkOutputFormat.java:184)
at 
org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.setConfigs(CqlBulkRecordWriter.java:94)
at 
org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.<init>(CqlBulkRecordWriter.java:74)
at 
org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat.getRecordWriter(CqlBulkOutputFormat.java:86)
at 
org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat.getRecordWriter(CqlBulkOutputFormat.java:52)
at 
org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.open(HadoopOutputFormatBase.java:146)
at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:176)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)


Code that works:

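import java.nio.ByteBuffer
import org.apache.cassandra.hadoop.ConfigHelper
import org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat
import org.apache.flink.api.scala.hadoop.mapred
import org.apache.hadoop.mapred.JobConf

// keyspace, colFamily, cqlTableSchema, initialOutputAddress and partitionerClass
// are defined elsewhere in the job.
val insertStmt =
  s"INSERT INTO $keyspace.$colFamily (user_id, updated_time, value) VALUES (?, ?, ?)"
val config = new JobConf()

ConfigHelper.setOutputInitialAddress(config, initialOutputAddress.getHostAddress)

CqlBulkOutputFormat.setColumnFamilySchema(config, colFamily, cqlTableSchema)
CqlBulkOutputFormat.setColumnFamilyInsertStatement(config, colFamily, insertStmt)
CqlBulkOutputFormat.setDeleteSourceOnSuccess(config, true)
ConfigHelper.setOutputColumnFamily(config, keyspace, colFamily)
ConfigHelper.setOutputPartitioner(config, partitionerClass)

// The "mapred" wrapper takes the JobConf directly.
val outputFormat = new mapred.HadoopOutputFormat[Object, java.util.List[ByteBuffer]](
  new CqlBulkOutputFormat,
  config)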

Code that doesn't work:

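import java.nio.ByteBuffer
import org.apache.cassandra.hadoop.ConfigHelper
import org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat
import org.apache.flink.api.scala.hadoop.mapreduce
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job

// keyspace, colFamily, cqlTableSchema, initialOutputAddress and partitionerClass
// are defined elsewhere in the job.
val insertStmt =
  s"INSERT INTO $keyspace.$colFamily (user_id, updated_time, value) VALUES (?, ?, ?)"
val config = new Configuration()

ConfigHelper.setOutputInitialAddress(config, initialOutputAddress.getHostAddress)

CqlBulkOutputFormat.setColumnFamilySchema(config, colFamily, cqlTableSchema)
CqlBulkOutputFormat.setColumnFamilyInsertStatement(config, colFamily, insertStmt)
CqlBulkOutputFormat.setDeleteSourceOnSuccess(config, true)
ConfigHelper.setOutputColumnFamily(config, keyspace, colFamily)
ConfigHelper.setOutputPartitioner(config, partitionerClass)

// The "mapreduce" wrapper takes a Job built from the Configuration.
val hadoopJob: Job = Job.getInstance(config)

val outputFormat = new mapreduce.HadoopOutputFormat[Object, java.util.List[ByteBuffer]](
  new CqlBulkOutputFormat,
  hadoopJob)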



Re: Error while adding data to RocksDB: No more bytes left

2016-09-30 Thread Shannon Carey
Implementing a custom serialization approach with Flink's CopyableValue 
(instead of relying on Flink to automatically use Kryo) solved the issue. As a 
side benefit, this also reduced the serialized size of my object by about half.
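
For readers unfamiliar with the approach, here is a rough sketch of what hand-written serialization looks like. Flink's Value types (CopyableValue among them) serialize themselves through write/read methods that take a DataOutputView/DataInputView, which extend java.io.DataOutput/DataInput. To stay free of Flink dependencies, the sketch uses the plain java.io interfaces as stand-ins, and the AirportCount class and its fields are hypothetical. It is not the code from the job discussed here.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream}

object ManualSerialization {
  // Hypothetical value type; the field names are illustrative only.
  final class AirportCount(var timestamp: Long, var count: Int) {
    // Fixed binary layout: 8 bytes for the long, then 4 bytes for the int.
    def write(out: DataOutput): Unit = {
      out.writeLong(timestamp)
      out.writeInt(count)
    }

    // Reads the fields back in exactly the order they were written.
    def read(in: DataInput): Unit = {
      timestamp = in.readLong()
      count = in.readInt()
    }
  }

  // Serializes a value into a byte array using its own write method.
  def toBytes(value: AirportCount): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    value.write(out)
    out.flush()
    buffer.toByteArray
  }

  // Rebuilds a value from bytes using its own read method.
  def fromBytes(bytes: Array[Byte]): AirportCount = {
    val value = new AirportCount(0L, 0)
    value.read(new DataInputStream(new ByteArrayInputStream(bytes)))
    value
  }
}
```

Because the layout is fixed and no class names or field tags are written, the serialized form is exactly as large as the fields themselves (12 bytes in this sketch).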


From: Stephan Ewen <se...@apache.org>
Date: Friday, September 30, 2016 at 3:58 AM
To: <user@flink.apache.org>
Cc: Stephan Ewen <se...@apache.org>
Subject: Re: Error while adding data to RocksDB: No more bytes left

Agree with Stefan, let's see if the fully async snapshot mode helps. It looks 
suspiciously RocksDB related...

On Fri, Sep 30, 2016 at 10:30 AM, Stefan Richter 
<s.rich...@data-artisans.com> wrote:
Hi Shannon,

from your new stack trace and the bogus class names, I agree with Stephan that 
either serialization or the database itself is corrupted in some way. Could you 
please check if this problem only happens if checkpointing is enabled? If yes, 
does switching to fully async snapshots change the behavior?

Best,
Stefan

On 29.09.2016 at 21:12, Shannon Carey 
<sca...@expedia.com> wrote:

Hi Stephan!

The failure appeared to occur every 10 minutes, which is also the interval for 
checkpointing. However, I agree with you that the stack trace appears to be 
independent. Could this perhaps be an issue with multithreading, where the 
checkpoint mechanism is somehow interfering with ongoing operation of the state 
backend? I've never seen this problem until now, so I am a little suspicious 
that it might be due to something in my code, but so far it's been difficult to 
figure out what that might be.

I am using the default, SemiAsync snapshot mode.

The classes of the data flow are a bit too large to put here in their entirety. 
We are using Scala case classes, Java classes generated by Avro, Tuples, Scala 
Option, java.util.UUID and Scala mutable.Map. The majority of these classes 
have been operational in other jobs before. I added a unit test for the class 
which contains a mutable.Map to see whether that was causing a problem. Does 
this look like a reasonable unit test to verify Flink serializability to you?

it("roundtrip serializes in Flink") {
  val millis: Long = TimeUnit.DAYS.toMillis(2)
  val original: PreferredAirportDailySum = new PreferredAirportDailySum(millis)
  original.add("a", TimestampedAirportCount(4, 6))
  original.add("b", TimestampedAirportCount(7, 8))

  val deserialized: PreferredAirportDailySum =
    serializationRoundTrip(original, 100)

  deserialized.timestamp shouldBe millis
  deserialized.immutableItems("a") shouldBe TimestampedAirportCount(4, 6)
  deserialized.immutableItems("b") shouldBe TimestampedAirportCount(7, 8)
}

def serializationRoundTrip[T : ClassTag : TypeInformation](
    original: T,
    expectedMaxBytes: Int): T = {
  val typeInfo = implicitly[TypeInformation[T]]
  val serializer: TypeSerializer[T] = typeInfo.createSerializer(new ExecutionConfig)

  val out: ByteArrayOutputStream = new ByteArrayOutputStream(expectedMaxBytes)
  val outputView: DataOutputView = new DataOutputViewStreamWrapper(out)
  serializer.serialize(original, outputView)

  out.size() should be <= expectedMaxBytes

  val inputView: DataInputViewStreamWrapper =
    new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray))
  val deserialized: T = serializer.deserialize(inputView)

  deserialized
}

I tried running my job in a local one-slot cluster with RocksDB enabled but 
checkpointing to local filesystem. Similar errors occur, but are more sporadic. 
I have not yet been able to capture the error while debugging, but if I do I 
will provide additional information.

I noticed that locally, execution only reaches 
DefaultClassResolver#readName(Input)->Class.forName() when a checkpoint 
completes. Also, the timing of checkpointing is a bit odd: in the example below 
the checkpoint takes 200s to complete after being triggered even though RocksDB 
reports that it only took ~100ms.

2016-09-29 12:56:17,619 INFO  CheckpointCoordinator - Triggering checkpoint 
2 @ 1475171777619
2016-09-29 12:59:38,079 INFO  RocksDBStateBackend  - RocksDB 
(/var/folders/…./WindowOperator_38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/db)
 backup (synchronous part) took 7 ms.
2016-09-29 12:59:38,214 INFO  RocksDBStateBackend  - RocksDB materialization 
from 
/var/folders/…/WindowOperator_38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/local-chk-2
 to file:/var/flinkstate/…/WindowOperator_38_0/dummy_state/chk-2 (asynchronous 
part) took 96 ms.
2016-09-29 12:59:38,333 INFO  CheckpointCoordinator - Completed checkpoint 
2 (in 200621 ms)
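
To make the timing gap concrete, the log timestamps above can be subtracted directly. This is only a small sketch: the CheckpointGap object and millisBetween helper are made-up names, and the timestamp pattern is an assumption based on the log lines quoted here.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

object CheckpointGap {
  // Pattern assumed from the log lines quoted above, e.g. "2016-09-29 12:56:17,619".
  private val format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS")

  // Milliseconds elapsed between two log timestamps.
  def millisBetween(earlier: String, later: String): Long =
    ChronoUnit.MILLIS.between(
      LocalDateTime.parse(earlier, format),
      LocalDateTime.parse(later, format))
}
```

Applied to the "Triggering checkpoint" line (12:56:17,619) and the first RocksDB line (12:59:38,079), this gives 200460 ms. So nearly all of the roughly 200 s passed before the RocksDB backup was logged, and the backup and materialization themselves (7 ms and 96 ms) account for very little of it.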

Do you have any other advice?

Exceptions from local execution:

java.lang.RuntimeException: Error while addi

Re: Exceptions from collector.collect after cancelling job

2016-09-30 Thread Shannon Carey
My flat map function is catching & logging the exception. The try block happens 
to encompass the call to Collector#collect().

I will move the call to collect outside of the try. That should silence the log 
message.
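
A minimal sketch of that change, for illustration only. The Collector trait below is a stand-in for Flink's org.apache.flink.util.Collector so that the sketch has no dependencies, and parse is a hypothetical piece of user logic. It is not the actual job code.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}

object CollectOutsideTry {
  // Stand-in for Flink's Collector; an assumption to keep the sketch self-contained.
  trait Collector[T] { def collect(record: T): Unit }

  // Hypothetical user logic that may fail on bad input.
  def parse(raw: String): Int = raw.trim.toInt

  // Only the user logic is guarded. collect() runs outside the Try, so exceptions
  // thrown by the runtime (for example during cancellation) are not caught or
  // logged by this function.
  def flatMap(raw: String, out: Collector[Int], log: ListBuffer[String]): Unit =
    Try(parse(raw)) match {
      case Success(value) => out.collect(value)
      case Failure(e)     => log += s"Skipping bad record '$raw': ${e.getMessage}"
    }
}
```

With this shape, a "Buffer pool is destroyed" error raised inside collect() propagates to the caller instead of ending up in the function's own log output.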




On 9/30/16, 3:51 AM, "Ufuk Celebi" <u...@apache.org> wrote:

>On Thu, Sep 29, 2016 at 9:29 PM, Shannon Carey <sca...@expedia.com> wrote:
>> It looks like Flink is disabling the objects that the FlatMap collector
>> relies on before disabling the operator itself. Is that expected/normal? Is
>> there anything I should change in my FlatMap function or job code to account
>> for it?
>
>Hey Shannon,
>
>Flink actually does cancel the tasks *before* cleaning up the network
>resources that throw the root Exception here.
>
>We actually don't log any Exceptions that are thrown during
>cancellation, because it is possible that the user code/operator use
>the closed resources concurrently with cancellation (your stack traces
>essentially), but it looks like in some places we don't respect this.
>
>Can you tell which classes actually log this? Would be good to fix
>this if possible as it is very confusing and looks quite bad. I don't
>expect it to be an actual problem though.
>
>– Ufuk
>


Exceptions from collector.collect after cancelling job

2016-09-29 Thread Shannon Carey
When I cancel a job, I get many exceptions that look like this:

java.lang.RuntimeException: Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
... lots of Flink and user code (a flat map function) stack entries here
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:239)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:81)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
at 
org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:66)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
... 43 more
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:144)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:93)
at 
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
... 48 more

It looks like Flink is disabling the objects that the FlatMap collector relies 
on before disabling the operator itself. Is that expected/normal? Is there 
anything I should change in my FlatMap function or job code to account for it?

-Shannon


Re: Error while adding data to RocksDB: No more bytes left

2016-09-29 Thread Shannon Carey
ava.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
... 52 more



From: Stephan Ewen <se...@apache.org>
Date: Wednesday, September 28, 2016 at 1:18 PM
To: <user@flink.apache.org>
Subject: Re: Error while adding data to RocksDB: No more bytes left

Hi Shannon!

The stack trace you pasted is independent of checkpointing - it seems to be 
from the regular processing. Does this only happen when checkpoints are 
activated?

Can you also share which checkpoint method you use?
  - FullyAsynchronous
  - SemiAsynchronous

I think there are two possibilities for what can happen
  - There is a serialization inconsistency in the Serializers. If that is the 
case, this error should occur almost in a deterministic fashion. To debug that, 
it would be good to know which data types you are using.
  - There is a bug in RocksDB (or Flink's wrapping of it) where data gets 
corrupted when using the snapshot feature. That would explain why this only 
occurs when checkpoints are happening.

Greetings,
Stephan


On Wed, Sep 28, 2016 at 7:28 PM, Shannon Carey 
<sca...@expedia.com> wrote:
It appears that when one of my jobs tries to checkpoint, the following 
exception is triggered. I am using Flink 1.1.1 in Scala 2.11. RocksDB 
checkpoints are being saved to S3.

java.lang.RuntimeException: Error while adding data to RocksDB
at 
org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:125)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException: No more bytes left.
at 
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
at 
com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
at 
com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
at 
com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeLongField.read(UnsafeCacheFields.java:160)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
at 
org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:118)
... 6 more

Thanks for any help!

Shannon



Error while adding data to RocksDB: No more bytes left

2016-09-28 Thread Shannon Carey
It appears that when one of my jobs tries to checkpoint, the following 
exception is triggered. I am using Flink 1.1.1 in Scala 2.11. RocksDB 
checkpoints are being saved to S3.

java.lang.RuntimeException: Error while adding data to RocksDB
at 
org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:125)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException: No more bytes left.
at 
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
at 
com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeLongField.read(UnsafeCacheFields.java:160)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
at 
org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:118)
... 6 more

Thanks for any help!

Shannon


Re: Firing windows multiple times

2016-09-02 Thread Shannon Carey
nd not in the direction of 
keeping track of the watermark and manually evicting elements as you go is that 
I think that this approach would be more memory efficient and easier to 
understand. I don't understand yet how a single window computation could keep 
track of aggregates for differently sized time windows and evict the correct 
elements without keeping all the elements in some store. Maybe you could shed 
some light on this? I'd be happy if there was a simple solution for this. :-)

Cheers,
Aljoscha



On Tue, 30 Aug 2016 at 23:49 Shannon Carey 
<sca...@expedia.com> wrote:
I appreciate your suggestion!

However, the main problem with your approach is the amount of time that goes by 
without an updated value from minuteAggregate and hourlyAggregate (lack of a 
continuously updated aggregate).

For example, if we use a tumbling window of 1 month duration, then we only get 
an update for that value once a month! The values from that stream will be on 
average 0.5 months stale. A year-long window is even worse.

-Shannon

From: Aljoscha Krettek <aljos...@apache.org>
Date: Tuesday, August 30, 2016 at 9:08 AM
To: Shannon Carey <sca...@expedia.com>, 
"user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Firing windows multiple times

Hi,
I think this can be neatly expressed by using something like a tree of windowed 
aggregations, i.e. you specify your smallest window computation first and then 
specify larger window computations based on smaller windows. I've written an 
example that showcases this approach: 
https://gist.github.com/aljoscha/728ac69361f75c3ca87053b1a6f91fcd

The basic idea in pseudo code is this:

DataStream input = ...
dailyAggregate = input.keyBy(...).window(Time.days(1)).reduce(new Sum())
weeklyAggregate = dailyAggregate.keyBy(...).window(Time.days(7)).reduce(new Sum())
monthlyAggregate = weeklyAggregate.keyBy(...).window(Time.days(30)).reduce(new Sum())

The benefit of this approach is that you don't duplicate computation and that 
you can have incremental aggregation using a reduce function. When manually 
keeping elements and evicting them based on time the amount of state that would 
have to be kept would be much larger.

Does that make sense and would it help your use case?

Cheers,
Aljoscha

On Mon, 29 Aug 2016 at 23:18 Shannon Carey 
<sca...@expedia.com> wrote:
Yes, let me describe an example use-case that I'm trying to implement 
efficiently within Flink.

We've been asked to aggregate per-user data on a daily level, and from there 
produce aggregates on a variety of time frames. For example, 7 days, 30 days, 
180 days, and 365 days.

We can talk about the hardest one, the 365 day window, with the knowledge that 
adding the other time windows magnifies the problem.

I can easily use tumbling time windows of 1-day size for the first aggregation. 
However, for the longer aggregation, if I take the naive approach and use a 
sliding window, the window size would be 365 days and the slide would be one 
day. If a user comes back every day, I run the risk of magnifying the size of 
the data by up to 365 because each day of data will be included in up to 365 
year-long window panes. Also, if I want to fire the aggregate information more 
rapidly than once a day, then I have to worry about getting 365 different 
windows fired at the same time & trying to figure out which one to pay 
attention to, or coming up with a hare-brained custom firing trigger. We tried 
emitting each day-aggregate into a time series database and doing the final 365 
day aggregation as a query, but that was more complicated than we wanted: in 
particular we'd like to have all the logic in the Flink job not split across 
different technology & infrastructure.
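
The "up to 365" figure follows from how sliding windows are assigned: an element belongs to every window whose range covers its timestamp, which is size / slide windows. The sketch below reproduces that arithmetic in plain Scala. It follows the usual sliding-window assignment logic but is not Flink's code, and the object and method names are made up for illustration.

```scala
object SlidingWindowFanOut {
  val DayMillis: Long = 24L * 60 * 60 * 1000

  // Start times of every sliding window [start, start + size) that contains the
  // given timestamp. Assumes non-negative timestamps and window starts aligned
  // to multiples of the slide.
  def windowStarts(timestamp: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = timestamp - (timestamp % slide)
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > timestamp - size).toList
  }
}
```

For example, windowStarts(ts, 365 * DayMillis, DayMillis) contains 365 entries for any timestamp, whereas a tumbling window (size equal to slide) yields exactly one.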

The work-around I'm thinking of is to use a single window that contains 365 
days of data (relative to the current watermark) on an ongoing basis. The 
windowing function would be responsible for evicting old data based on the 
current watermark.

Does that make sense? Does it seem logical, or am I misunderstanding something 
about how Flink works?

-Shannon


From: Aljoscha Krettek <aljos...@apache.org>
Date: Monday, August 29, 2016 at 3:56 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Firing windows multiple times

Hi,
that would certainly be possible. What do you think could be gained by having 
knowledge of the current watermark in the WindowFunction? Is there a specific 
case you have in mind?

Cheers,
Aljoscha

On Wed, 24 Aug 2016 at 23:21 Shannon Carey 
<sca...@expedia.com> wrote:
What do you think about adding the curre

Re: Firing windows multiple times

2016-08-30 Thread Shannon Carey
I appreciate your suggestion!

However, the main problem with your approach is the amount of time that goes by 
without an updated value from minuteAggregate and hourlyAggregate (lack of a 
continuously updated aggregate).

For example, if we use a tumbling window of 1 month duration, then we only get 
an update for that value once a month! The values from that stream will be on 
average 0.5 months stale. A year-long window is even worse.

-Shannon

From: Aljoscha Krettek <aljos...@apache.org>
Date: Tuesday, August 30, 2016 at 9:08 AM
To: Shannon Carey <sca...@expedia.com>, 
"user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Firing windows multiple times

Hi,
I think this can be neatly expressed by using something like a tree of windowed 
aggregations, i.e. you specify your smallest window computation first and then 
specify larger window computations based on smaller windows. I've written an 
example that showcases this approach: 
https://gist.github.com/aljoscha/728ac69361f75c3ca87053b1a6f91fcd

The basic idea in pseudo code is this:

DataStream input = ...
dailyAggregate = input.keyBy(...).window(Time.days(1)).reduce(new Sum())
weeklyAggregate = dailyAggregate.keyBy(...).window(Time.days(7)).reduce(new Sum())
monthlyAggregate = weeklyAggregate.keyBy(...).window(Time.days(30)).reduce(new Sum())

The benefit of this approach is that you don't duplicate computation and that 
you can have incremental aggregation using a reduce function. When manually 
keeping elements and evicting them based on time the amount of state that would 
have to be kept would be much larger.
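
The idea can be tried out without Flink at all. The following sketch sums (timestamp, value) pairs into tumbling buckets and then feeds the daily results into a weekly aggregation. The names are made up, and each result carries its bucket's start time as its timestamp, which is a simplification and not a statement about how Flink stamps window results.

```scala
object TieredAggregation {
  val DayMillis: Long = 24L * 60 * 60 * 1000

  // Sums values into tumbling buckets of the given size. Each result is keyed by
  // its bucket's start time, so it can be used as input to a larger window.
  // Assumes non-negative timestamps.
  def tumblingSum(events: Seq[(Long, Long)], size: Long): Seq[(Long, Long)] =
    events
      .groupBy { case (ts, _) => ts - (ts % size) }
      .map { case (start, group) => start -> group.map(_._2).sum }
      .toSeq
      .sortBy(_._1)
}
```

Calling tumblingSum(raw, DayMillis) and then tumblingSum(daily, 7 * DayMillis) gives the same weekly totals as aggregating the raw events directly, but the weekly stage only ever sees one pre-aggregated value per day.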

Does that make sense and would it help your use case?

Cheers,
Aljoscha

On Mon, 29 Aug 2016 at 23:18 Shannon Carey 
<sca...@expedia.com> wrote:
Yes, let me describe an example use-case that I'm trying to implement 
efficiently within Flink.

We've been asked to aggregate per-user data on a daily level, and from there 
produce aggregates on a variety of time frames. For example, 7 days, 30 days, 
180 days, and 365 days.

We can talk about the hardest one, the 365 day window, with the knowledge that 
adding the other time windows magnifies the problem.

I can easily use tumbling time windows of 1-day size for the first aggregation. 
However, for the longer aggregation, if I take the naive approach and use a 
sliding window, the window size would be 365 days and the slide would be one 
day. If a user comes back every day, I run the risk of magnifying the size of 
the data by up to 365 because each day of data will be included in up to 365 
year-long window panes. Also, if I want to fire the aggregate information more 
rapidly than once a day, then I have to worry about getting 365 different 
windows fired at the same time & trying to figure out which one to pay 
attention to, or coming up with a hare-brained custom firing trigger. We tried 
emitting each day-aggregate into a time series database and doing the final 365 
day aggregation as a query, but that was more complicated than we wanted: in 
particular we'd like to have all the logic in the Flink job not split across 
different technology & infrastructure.

The work-around I'm thinking of is to use a single window that contains 365 
days of data (relative to the current watermark) on an ongoing basis. The 
windowing function would be responsible for evicting old data based on the 
current watermark.

Does that make sense? Does it seem logical, or am I misunderstanding something 
about how Flink works?

-Shannon


From: Aljoscha Krettek <aljos...@apache.org>
Date: Monday, August 29, 2016 at 3:56 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Firing windows multiple times

Hi,
that would certainly be possible. What do you think could be gained by having 
knowledge of the current watermark in the WindowFunction? Is there a specific 
case you have in mind?

Cheers,
Aljoscha

On Wed, 24 Aug 2016 at 23:21 Shannon Carey 
<sca...@expedia.com> wrote:
What do you think about adding the current watermark to the window function 
metadata in FLIP-2?

From: Shannon Carey <sca...@expedia.com>
Date: Friday, August 12, 2016 at 6:24 PM
To: Aljoscha Krettek <aljos...@apache.org>, 
"user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Firing windows multiple times

Thanks Aljoscha, I didn't know about those. Yes, they look like handy changes, 
especially to enable flexible approaches for eviction. In particular, having 
the current watermark ava

Re: Firing windows multiple times

2016-08-24 Thread Shannon Carey
What do you think about adding the current watermark to the window function 
metadata in FLIP-2?

From: Shannon Carey <sca...@expedia.com>
Date: Friday, August 12, 2016 at 6:24 PM
To: Aljoscha Krettek <aljos...@apache.org>, 
"user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Firing windows multiple times

Thanks Aljoscha, I didn't know about those. Yes, they look like handy changes, 
especially to enable flexible approaches for eviction. In particular, having 
the current watermark available to the evictor via EvictorContext is helpful: 
it will be able to evict the old data more easily without needing to rely on 
Window#maxTimestamp().

However, I think you might still be missing a piece. Specifically, it would 
still not be possible for the window function to choose which items to 
aggregate based on the current watermark. In particular, it is desirable to be 
able to aggregate only the items below the watermark, omitting items which have 
come in with timestamps larger than the watermark. Does that make sense?
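
To illustrate what I mean, here is a sketch of the aggregation I would like to be able to write if the window function were handed the current watermark. The Event type and the sumUpTo helper are hypothetical, and passing the watermark in as a parameter is exactly the part that the current API does not offer.

```scala
object WatermarkBoundedSum {
  // Hypothetical element type: an event timestamp plus a value to aggregate.
  final case class Event(timestamp: Long, value: Long)

  // Sums only the elements whose timestamp is at or below the watermark; elements
  // with timestamps larger than the watermark are left out of this firing.
  def sumUpTo(watermark: Long, elements: Iterable[Event]): Long =
    elements.iterator.filter(_.timestamp <= watermark).map(_.value).sum
}
```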

-Shannon

From: Aljoscha Krettek <aljos...@apache.org>
Date: Friday, August 12, 2016 at 4:25 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Firing windows multiple times

Hi,
there is already this FLIP: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor
 which also links to a mailing list discussion. And this FLIP: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata.
 The former proposes to enhance the Evictor API a bit, among other things we 
propose to give the evictor access to the current watermark. The other FLIP 
proposes to extend the amount of meta-data we give to the window function. The 
first thing we propose to add is a "firing reason" that would tell you 
whether this was an early firing, an on time firing or a late firing. The 
second thing is a firing counter that would tell you how many times the trigger 
has fired so far for the current window.

Would a combination of these help with your use case?

Cheers,
Aljoscha

On Thu, 11 Aug 2016 at 19:19 Shannon Carey 
<sca...@expedia.com> wrote:
"If Window B is a Folding Window and does not have an evictor then it should 
not keep the list of all received elements."

Agreed! Upon closer inspection, the behavior I'm describing is only present 
when using EvictingWindowOperator, not when using WindowOperator. I misread 
line 382 of WindowOperator which calls windowState.add(): in actuality, the 
windowState is a FoldingState which incorporates the user-provided fold 
function in order to eagerly fold the data. In contrast, if you use an evictor, 
EvictingWindowOperator has the behavior I describe.

I am already using a custom Trigger which uses a processing timer to FIRE a 
short time after a new event comes in, and an event timer to FIRE_AND_PURGE.

It seems that I can achieve the desired effect by avoiding use of an evictor so 
that the intermediate events are not retained in an EvictingWindowOperator's 
state, and perform any necessary eviction within my fold function. This has the 
aforementioned drawbacks of the windowed fold function not knowing about 
watermarks, and therefore it is difficult to be precise about choosing which 
items to evict. However, this seems to be the best choice within the current 
framework.

Interestingly, it appears that TimeEvictor doesn't really know about watermarks 
either. When a window emits an event, regardless of how it was fired, it is 
assigned the timestamp given by its window's maxTimestamp(), which might be 
much greater than the processing time that actually fired the event. Then, 
TimeEvictor compares the max timestamp of all items in the window against the 
other ones in order to determine which ones to evict. Basically, it assumes 
that the events were emitted due to the window terminating with FIRE_AND_PURGE. 
What if we gave more information (specifically, the current watermark) to the 
evictor in order to allow it to deal with a mix of intermediate events (fired 
by processing time) and final events (fired by event time when the watermark 
reaches the window)? That value is already available in the WindowOperator & 
could be passed to the Evictor very easily. It would be an API change, of 
course.

Other than that, is it worth considering a change to EvictingWindowOperator to 
allow user-supplied functions to reduce the size of its state when people fire 
upstream windows repeatedly? From what I see when I monitor the state with 
debugger print statements, the EvictingWindowOperator is definitely holding on 
to all the el

Re: 1.1.1: JobManager config endpoint no longer supplies port

2016-08-15 Thread Shannon Carey
Thanks Ufuk. For now, we will use the Yarn AM proxy.

About uploading JARs: the JobManager UI that is exposed via the Yarn AM proxy 
does not allow manually uploading Flink job jars for execution on the cluster 
(look for "Yarn's AM proxy doesn't allow file uploads." in the code). As I 
understand it, this is due to choices Yarn has made about security.



On 8/15/16, 9:25 AM, "Ufuk Celebi" <u...@apache.org> wrote:

>I've verified this. I think this has likely accidentally changed with
>the refactoring of the YARN setup for Flink 1.1. We probably wrote the
>web monitor port explicitly to the config in 1.0 whereas we don't do
>it in 1.1 anymore. I think this should be addressed with the next
>bugfix release 1.1.2.
>
>What comes to mind to find out the port is to parse the logs, but I
>don't think that's feasible, is it? Would also be interested what you
>are referring to with the JARs.
>
>On Mon, Aug 15, 2016 at 4:04 PM, Ufuk Celebi <u...@apache.org> wrote:
>> Hey Shannon! I just took a look at the code and it looks like the
>> Flink REST handler for the config did _not_ change since last year. It
>> could be that somehow the config is loaded differently. Can you verify
>> that using the same config with Flink 1.0 and Flink 1.1 the port is
>> printed correctly and as 0? I will look into it as well.
>>
>> The JAR uploading issue is a separate one, correct?
>>
>> On Fri, Aug 12, 2016 at 6:03 PM, Shannon Carey <sca...@expedia.com> wrote:
>>> It appears that when running Flink 1.1.1 on Yarn, my previous method of
>>> making a request to the yarn AM proxy on the master node at
>>> http://{master_node}:20888/proxy/{app_id}/jobmanager/config doesn't work the
>>> same as it did.
>>>
>>> Previously, the returned JSON value would include an accurate value for
>>> "jobmanager.web.port". Now, however, it appears to return a zero.
>>>
>>> As a result, I cannot expose the JobManager UI itself. I can use the Yarn
>>> proxy but it does not support the useful feature of uploading JARs.
>>>
>>> Is there another way of determining which port the JobManager is running on?
>>> Thanks!
>>>
>>> -Shannon
>


Re: Firing windows multiple times

2016-08-12 Thread Shannon Carey
Thanks Aljoscha, I didn't know about those. Yes, they look like handy changes, 
especially to enable flexible approaches for eviction. In particular, having 
the current watermark available to the evictor via EvictorContext is helpful: 
it will be able to evict the old data more easily without needing to rely on 
Window#maxTimestamp().

However, I think you might still be missing a piece. Specifically, it would 
still not be possible for the window function to choose which items to 
aggregate based on the current watermark. In particular, it is desirable to be 
able to aggregate only the items below the watermark, omitting items which have 
come in with timestamps larger than the watermark. Does that make sense?
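
To make the missing piece concrete, here is a minimal sketch of a window function that is handed the current watermark. It is a standalone model, not Flink API: the class WatermarkBoundedSum, the Element type and the sumUpTo method are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model, not Flink API: a window function that is handed the
// current watermark and aggregates only the elements at or below it.
final class WatermarkBoundedSum {

    static final class Element {
        final long timestamp;
        final long value;
        Element(long timestamp, long value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Sums the values of elements whose timestamp does not exceed the watermark.
    static long sumUpTo(List<Element> elements, long watermark) {
        long sum = 0;
        for (Element e : elements) {
            if (e.timestamp <= watermark) {
                sum += e.value;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Element> window = Arrays.asList(
                new Element(10, 1), new Element(20, 2), new Element(30, 4));
        // The element with timestamp 30 is ahead of the watermark and is left out.
        System.out.println(sumUpTo(window, 20));
    }
}
```

The element that arrived early stays in the window contents but is left out of the aggregate until the watermark catches up with it.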

-Shannon

From: Aljoscha Krettek <aljos...@apache.org<mailto:aljos...@apache.org>>
Date: Friday, August 12, 2016 at 4:25 AM
To: "user@flink.apache.org<mailto:user@flink.apache.org>" 
<user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Re: Firing windows multiple times

Hi,
there is already this FLIP: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor
 which also links to a mailing list discussion. And this FLIP: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata.
 The former proposes to enhance the Evictor API a bit, among other things we 
propose to give the evictor access to the current watermark. The other FLIP 
proposes to extend the amount of meta-data we give to the window function. The 
first thing we propose to add is a "firing reason" that would tell you 
whether this was an early firing, an on time firing or a late firing. The 
second thing is a firing counter that would tell you how many times the trigger 
has fired so far for the current window.

Would a combination of these help with your use case?

Cheers,
Aljoscha

On Thu, 11 Aug 2016 at 19:19 Shannon Carey 
<sca...@expedia.com<mailto:sca...@expedia.com>> wrote:
"If Window B is a Folding Window and does not have an evictor then it should 
not keep the list of all received elements."

Agreed! Upon closer inspection, the behavior I'm describing is only present 
when using EvictingWindowOperator, not when using WindowOperator. I misread 
line 382 of WindowOperator which calls windowState.add(): in actuality, the 
windowState is a FoldingState which incorporates the user-provided fold 
function in order to eagerly fold the data. In contrast, if you use an evictor, 
EvictingWindowOperator has the behavior I describe.

I am already using a custom Trigger which uses a processing timer to FIRE a 
short time after a new event comes in, and an event timer to FIRE_AND_PURGE.

It seems that I can achieve the desired effect by avoiding use of an evictor so 
that the intermediate events are not retained in an EvictingWindowOperator's 
state, and perform any necessary eviction within my fold function. This has the 
aforementioned drawbacks of the windowed fold function not knowing about 
watermarks, and therefore it is difficult to be precise about choosing which 
items to evict. However, this seems to be the best choice within the current 
framework.

Interestingly, it appears that TimeEvictor doesn't really know about watermarks 
either. When a window emits an event, regardless of how it was fired, it is 
assigned the timestamp given by its window's maxTimestamp(), which might be 
much greater than the processing time that actually fired the event. Then, 
TimeEvictor compares the max timestamp of all items in the window against the 
other ones in order to determine which ones to evict. Basically, it assumes 
that the events were emitted due to the window terminating with FIRE_AND_PURGE. 
What if we gave more information (specifically, the current watermark) to the 
evictor in order to allow it to deal with a mix of intermediate events (fired 
by processing time) and final events (fired by event time when the watermark 
reaches the window)? That value is already available in the WindowOperator & 
could be passed to the Evictor very easily. It would be an API change, of 
course.

Other than that, is it worth considering a change to EvictingWindowOperator to 
allow user-supplied functions to reduce the size of its state when people fire 
upstream windows repeatedly? From what I see when I monitor the state with 
debugger print statements, the EvictingWindowOperator is definitely holding on 
to all the elements ever received, not just the aggregated result. You can see 
this clearly because EvictingWindowOperator holds a ListState instead of a 
FoldingState. The user-provided fold function is only applied upon fire().

-Shannon




1.1.1: JobManager config endpoint no longer supplies port

2016-08-12 Thread Shannon Carey
It appears that when running Flink 1.1.1 on Yarn, my previous method of making 
a request to the yarn AM proxy on the master node at 
http://{master_node}:20888/proxy/{app_id}/jobmanager/config doesn't work the 
same as it did.

Previously, the returned JSON value would include an accurate value for 
"jobmanager.web.port". Now, however, it appears to return a zero.

As a result, I cannot expose the JobManager UI itself. I can use the Yarn proxy 
but it does not support the useful feature of uploading JARs.
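
A minimal sketch of the lookup described above. It assumes the config endpoint returns a JSON array of {"key": ..., "value": ...} objects; that response shape is an assumption, not something stated in this thread. The class name, the regex-based extraction and the sample payload are hypothetical, and a real client would use a proper JSON parser.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts one setting from the body returned by
// http://{master_node}:20888/proxy/{app_id}/jobmanager/config.
// Assumes entries of the form {"key":"<name>","value":"<value>"}.
final class JobManagerConfigLookup {

    static Optional<String> findValue(String json, String key) {
        Pattern entry = Pattern.compile(
                "\\{\\s*\"key\"\\s*:\\s*\"" + Pattern.quote(key)
                        + "\"\\s*,\\s*\"value\"\\s*:\\s*\"([^\"]*)\"\\s*\\}");
        Matcher m = entry.matcher(json);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    // The thread reports a returned value of zero; treat zero, missing or
    // non-numeric values as unusable for reaching the UI.
    static boolean isUsablePort(Optional<String> value) {
        if (!value.isPresent()) {
            return false;
        }
        try {
            return Integer.parseInt(value.get().trim()) > 0;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Sample payload invented for illustration.
        String body = "[{\"key\":\"jobmanager.web.port\",\"value\":\"0\"}]";
        Optional<String> port = findValue(body, "jobmanager.web.port");
        System.out.println("port=" + port.orElse("missing") + " usable=" + isUsablePort(port));
    }
}
```

A caller could fall back to the Yarn proxy whenever isUsablePort returns false.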

Is there another way of determining which port the JobManager is running on? 
Thanks!

-Shannon


Re: Firing windows multiple times

2016-08-11 Thread Shannon Carey
"If Window B is a Folding Window and does not have an evictor then it should 
not keep the list of all received elements."

Agreed! Upon closer inspection, the behavior I'm describing is only present 
when using EvictingWindowOperator, not when using WindowOperator. I misread 
line 382 of WindowOperator which calls windowState.add(): in actuality, the 
windowState is a FoldingState which incorporates the user-provided fold 
function in order to eagerly fold the data. In contrast, if you use an evictor, 
EvictingWindowOperator has the behavior I describe.

I am already using a custom Trigger which uses a processing timer to FIRE a 
short time after a new event comes in, and an event timer to FIRE_AND_PURGE.

It seems that I can achieve the desired effect by avoiding use of an evictor so 
that the intermediate events are not retained in an EvictingWindowOperator's 
state, and perform any necessary eviction within my fold function. This has the 
aforementioned drawbacks of the windowed fold function not knowing about 
watermarks, and therefore it is difficult to be precise about choosing which 
items to evict. However, this seems to be the best choice within the current 
framework.

Interestingly, it appears that TimeEvictor doesn't really know about watermarks 
either. When a window emits an event, regardless of how it was fired, it is 
assigned the timestamp given by its window's maxTimestamp(), which might be 
much greater than the processing time that actually fired the event. Then, 
TimeEvictor compares the max timestamp of all items in the window against the 
other ones in order to determine which ones to evict. Basically, it assumes 
that the events were emitted due to the window terminating with FIRE_AND_PURGE. 
What if we gave more information (specifically, the current watermark) to the 
evictor in order to allow it to deal with a mix of intermediate events (fired 
by processing time) and final events (fired by event time when the watermark 
reaches the window)? That value is already available in the WindowOperator & 
could be passed to the Evictor very easily. It would be an API change, of 
course.
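
The difference between the two cutoffs can be sketched in a few lines. This is a standalone model and not Flink's Evictor interface: the class EvictionCutoffs, its method names and the keepMillis parameter are hypothetical, and the max-timestamp variant only approximates the TimeEvictor behavior described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model, not Flink's Evictor interface: contrasts a cutoff derived
// from the largest element timestamp with a cutoff derived from the watermark.
final class EvictionCutoffs {

    // Keeps the timestamps that are within keepMillis of the reference point.
    static List<Long> keepWithin(List<Long> timestamps, long reference, long keepMillis) {
        List<Long> kept = new ArrayList<>();
        for (long ts : timestamps) {
            if (ts > reference - keepMillis) {
                kept.add(ts);
            }
        }
        return kept;
    }

    // Cutoff relative to the newest element in the window, which is how the
    // message above describes TimeEvictor behaving.
    static List<Long> evictByMaxTimestamp(List<Long> timestamps, long keepMillis) {
        long max = Long.MIN_VALUE;
        for (long ts : timestamps) {
            max = Math.max(max, ts);
        }
        return keepWithin(timestamps, max, keepMillis);
    }

    // Cutoff relative to the current watermark, as proposed above.
    static List<Long> evictByWatermark(List<Long> timestamps, long watermark, long keepMillis) {
        return keepWithin(timestamps, watermark, keepMillis);
    }

    public static void main(String[] args) {
        // 1000 stands for the maxTimestamp() of a pane that was fired early.
        List<Long> timestamps = Arrays.asList(100L, 200L, 1000L);
        System.out.println("by max timestamp: " + evictByMaxTimestamp(timestamps, 300L));
        System.out.println("by watermark 250: " + evictByWatermark(timestamps, 250L, 300L));
    }
}
```

With an early-fired pane carrying a large maxTimestamp(), the max-timestamp cutoff drops older items even though the watermark has not passed them yet, while the watermark cutoff keeps them.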

Other than that, is it worth considering a change to EvictingWindowOperator to 
allow user-supplied functions to reduce the size of its state when people fire 
upstream windows repeatedly? From what I see when I monitor the state with 
debugger print statements, the EvictingWindowOperator is definitely holding on 
to all the elements ever received, not just the aggregated result. You can see 
this clearly because EvictingWindowOperator holds a ListState instead of a 
FoldingState. The user-provided fold function is only applied upon fire().

-Shannon




Re: Firing windows multiple times

2016-08-10 Thread Shannon Carey
Hi Aljoscha,

Yes, I am using an Evictor, and I think I have seen the problem you are 
referring to. However, that's not what I'm talking about.

If you re-read my first email, the main point is the following: if users desire 
updates more frequently than window watermarks are reached, then window state 
behaves suboptimally. It doesn't matter if there's an evictor or not. 
Specifically:

  *  If I have a window "A" that I fire multiple times in order to provide 
incremental results as data comes in instead of waiting for the watermark to 
purge the window
  *  And that window's events are gathered into another, bigger window "B"
  *  And I want to keep only the latest event from each upstream window "A" (by 
timestamp, where each window pane has its own timestamp)
  *  Even if I have a fold/reduce method on the bigger window "B" to make sure 
that each updated event from "A" overwrites the previous event (by timestamp)
  *  Window "B" will hold in state all events from windows "A", including all 
the incremental events that were fired by processing-time triggers, even though 
I don't actually need those events because the reducer gets rid of them

An example description of execution flow:

  1.  Event x
  2.  Window A receives event, trigger waits for processing time delay, then 
emits event x(time=1, count=1)
  3.  Window B receives event, trigger waits for processing time delay, then 
executes fold() and emits event(time=1 => count=1), but internal Window state 
looks like [x(time=1, count=1)]
  4.  Event y
  5.  Window A receives event, trigger '', then emits event y(time=1, count=2)
  6.  Window B receives event, trigger '', then executes fold() and emits 
event(time=1 => count=2), but internal Window state looks like [x(time=1, 
count=1), y(time=1, count=2)]
  7.  Watermark z
  8.  Window A receives watermark, trigger's event timer is reached, fires and 
purges and emits current state as event z(time=1, count=2)
  9.  Window B receives event, trigger waits for processing time delay, then 
executes fold() and emits event(time=1 => count=2), but internal Window state 
looks like [x(time=1, count=1), y(time=1, count=2), z(time=1, count=2)]

As you can see, the internal window state continues to grow despite what fold() 
is doing.
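
The flow above can be modeled outside Flink with a small, self-contained sketch. The class and method names are hypothetical and are not Flink API; the model only contrasts a list-backed window state, which appends every incoming pane, with an eagerly folded state that keeps the latest pane per upstream window timestamp.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not Flink API: compares how much state window "B" holds
// when it buffers every pane from window "A" versus when it folds eagerly.
final class WindowStateGrowth {

    // One pane emitted by upstream window "A": its window timestamp and running count.
    static final class Pane {
        final long time;
        final long count;
        Pane(long time, long count) {
            this.time = time;
            this.count = count;
        }
    }

    // List-backed state: every received pane is retained until the window is purged.
    static final class ListBackedState {
        private final List<Pane> panes = new ArrayList<>();
        void add(Pane p) { panes.add(p); }
        int size() { return panes.size(); }
        // The fold runs only at fire time, over everything buffered so far.
        Map<Long, Long> fire() {
            Map<Long, Long> latest = new HashMap<>();
            for (Pane p : panes) {
                latest.put(p.time, p.count);
            }
            return latest;
        }
    }

    // Eagerly folded state: each pane overwrites the previous one for the same timestamp.
    static final class FoldedState {
        private final Map<Long, Long> latest = new HashMap<>();
        void add(Pane p) { latest.put(p.time, p.count); }
        int size() { return latest.size(); }
        Map<Long, Long> fire() { return new HashMap<>(latest); }
    }

    public static void main(String[] args) {
        ListBackedState buffered = new ListBackedState();
        FoldedState folded = new FoldedState();
        // Panes x, y and z from the flow above, all for the upstream window with time=1.
        Pane[] panes = { new Pane(1, 1), new Pane(1, 2), new Pane(1, 2) };
        for (Pane p : panes) {
            buffered.add(p);
            folded.add(p);
            System.out.println("buffered=" + buffered.size() + " folded=" + folded.size()
                    + " result=" + folded.fire());
        }
    }
}
```

Both variants emit the same result on each firing; only the amount of retained state differs.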

Does that explanation help interpret my original email?

-Shannon


From: Aljoscha Krettek <aljos...@apache.org<mailto:aljos...@apache.org>>
Date: Wednesday, August 10, 2016 at 12:18 PM
To: "user@flink.apache.org<mailto:user@flink.apache.org>" 
<user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Re: Firing windows multiple times

Hi,
from your mail I'm gathering that you are in fact using an Evictor, is that 
correct? If not, then the window operator should not keep all the elements ever 
received for a window but only the aggregated result.

Side note, there seems to be a bug in EvictingWindowOperator that causes 
evicted elements to not actually be removed from the state. They are only 
filtered from the Iterable that is given to the WindowFunction. I opened a Jira 
issue for that: https://issues.apache.org/jira/browse/FLINK-4369

Cheers,
Aljoscha

On Wed, 10 Aug 2016 at 18:19 Shannon Carey 
<sca...@expedia.com<mailto:sca...@expedia.com>> wrote:
One unfortunate aspect of using a fold() instead of a window is that the fold 
function has no knowledge of the watermarks. As a result, it is difficult to 
ensure that only items before the current watermark are included in the 
aggregation, and that old items are evicted correctly. This fact lends more 
support to the idea of using a custom operator (though that is more complex) or 
adding support for this use case within Flink.

-Shannon


Re: Firing windows multiple times

2016-08-10 Thread Shannon Carey
One unfortunate aspect of using a fold() instead of a window is that the fold 
function has no knowledge of the watermarks. As a result, it is difficult to 
ensure that only items before the current watermark are included in the 
aggregation, and that old items are evicted correctly. This fact lends more 
support to the idea of using a custom operator (though that is more complex) or 
adding support for this use case within Flink.

-Shannon


Firing windows multiple times

2016-08-10 Thread Shannon Carey
I recently noticed something about windows: they retain (in state) every 
element that they receive regardless of whether the user provides a fold/reduce 
function. I can tell that such an approach is necessary in order for evictors 
to work, but I'm not sure if there are other reasons.

I'll describe a use case where this approach is not optimal, and then maybe we 
can discuss ways to get around it or possible modifications to Flink. My jobs 
include windows that are wider than the frequency at which we want updates. For 
example, I might have a window that is one day long, but I might want an 
updated value to be emitted from that window within (say) one processing-time 
minute of a new event being assigned to it. I can accomplish that with a 
trigger that has processing-time delay FIRE as well as event-time 
FIRE_AND_PURGE. Next, I want to gather those items into a bigger window: 
perhaps a month or a year wide. My fold function can ensure that multiple 
events from an upstream window overwrite each other so that they are not 
counted multiple times. However, as I mentioned, the wide window's state will 
hold all the events: all the processing-time fires as well as the final event 
from the upstream FIRE_AND_PURGE. That will make the state bigger than it needs 
to be.
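
The trigger behavior described above can be sketched as a small state machine. This is a standalone model that borrows the names FIRE and FIRE_AND_PURGE but does not implement Flink's Trigger API; the class EarlyFiringTriggerModel, its fields and its callback names are hypothetical.

```java
// Hypothetical model of the trigger described above; it mirrors the names
// FIRE and FIRE_AND_PURGE but does not use Flink's Trigger API.
final class EarlyFiringTriggerModel {

    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    private final long delayMillis;           // processing-time delay after a new element
    private final long windowMaxTimestamp;    // event time at which the window is complete
    private long pendingProcessingTimer = -1; // -1 means no timer is registered

    EarlyFiringTriggerModel(long delayMillis, long windowMaxTimestamp) {
        this.delayMillis = delayMillis;
        this.windowMaxTimestamp = windowMaxTimestamp;
    }

    // A new element schedules an early firing unless one is already pending.
    Result onElement(long processingTimeNow) {
        if (pendingProcessingTimer < 0) {
            pendingProcessingTimer = processingTimeNow + delayMillis;
        }
        return Result.CONTINUE;
    }

    // Processing time advanced: emit an intermediate result but keep the state.
    Result onProcessingTime(long processingTimeNow) {
        if (pendingProcessingTimer >= 0 && processingTimeNow >= pendingProcessingTimer) {
            pendingProcessingTimer = -1;
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    // Watermark advanced: once it reaches the end of the window, emit the
    // final result and purge the window state.
    Result onWatermark(long watermark) {
        if (watermark >= windowMaxTimestamp) {
            pendingProcessingTimer = -1;
            return Result.FIRE_AND_PURGE;
        }
        return Result.CONTINUE;
    }

    public static void main(String[] args) {
        // A one-day window with a one-minute processing-time delay.
        EarlyFiringTriggerModel t = new EarlyFiringTriggerModel(60_000L, 86_399_999L);
        System.out.println(t.onElement(1_000L));
        System.out.println(t.onProcessingTime(61_000L));
        System.out.println(t.onWatermark(86_399_999L));
    }
}
```

Every FIRE produces one more element for the downstream wide window, which is where the extra state comes from.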

With regard to solutions within the bounds of the existing framework, I am 
considering using a regular fold() operation instead of a long window. The fold 
function would be responsible for performing the eviction that the window was 
previously responsible for. I could implement that as a RichFoldFunction with a 
ReducingState. The main difference is that there would be no triggering 
involved (incoming items would immediately result in reduce() emitting a new 
aggregate). I could also possibly implement my own operator. Are there 
other/better options I have not considered?
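
A rough sketch of what such a fold could look like, written as a standalone model, not as a Flink RichFoldFunction. The class LatestPaneFold, the retentionMillis parameter and the choice of the newest seen timestamp as the eviction clock are assumptions made for illustration.

```java
import java.util.TreeMap;

// Hypothetical accumulator for a plain fold() that replaces the wide window.
// Not Flink API: it only models the overwrite-and-evict logic.
final class LatestPaneFold {

    private final long retentionMillis;
    // Latest count per upstream window timestamp, ordered by timestamp.
    private final TreeMap<Long, Long> latestByWindow = new TreeMap<>();

    LatestPaneFold(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    // Folds one upstream pane in and returns the new aggregate immediately.
    long fold(long windowTimestamp, long count) {
        // A later firing of the same upstream window overwrites the earlier one.
        latestByWindow.put(windowTimestamp, count);
        // Without a watermark, the newest timestamp seen is the only available clock.
        long cutoff = latestByWindow.lastKey() - retentionMillis;
        // Evict panes that are strictly older than the cutoff.
        latestByWindow.headMap(cutoff, false).clear();
        long total = 0;
        for (long c : latestByWindow.values()) {
            total += c;
        }
        return total;
    }

    int retainedPanes() {
        return latestByWindow.size();
    }

    public static void main(String[] args) {
        LatestPaneFold fold = new LatestPaneFold(10);
        System.out.println(fold.fold(1, 1));
        System.out.println(fold.fold(1, 2));
        System.out.println(fold.fold(5, 3));
        System.out.println(fold.fold(20, 1) + " with " + fold.retainedPanes() + " pane(s) retained");
    }
}
```

The state stays bounded by the number of distinct upstream windows inside the retention period, and each incoming item immediately yields a new aggregate because no trigger is involved.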

Is it desirable to improve support for this use case within Flink? I can 
imagine that other people may want to get incremental/ongoing results from 
their windows as data comes in instead of waiting for the watermark to purge 
the window. In general, they might want better control over the window state. 
If so, what would the solution look like? Perhaps we could allow users to 
specify an additional method to the window operator which extracts the identity 
of any new event, and then Flink would ensure that new events overwrite 
existing events within the window state, preventing it from growing 
unnecessarily. Or, perhaps there is a way to do it based on the identity of the 
window that produces the event? Or, more generally, perhaps we could allow user 
provided fold/reduce functions to eagerly reduce the state of the window, 
although that might impact the evictor feature?

Thanks for your thoughts,
Shannon


Re: Flink 1.1.0 : Hadoop 1/2 compatibility mismatch

2016-08-10 Thread Shannon Carey
Works for me, thanks!



-Shannon


Re: Classloader issue using AvroParquetInputFormat via HadoopInputFormat

2016-08-08 Thread Shannon Carey
Correction: I cannot work around the problem. If I exclude hadoop1, I get the 
following exception which appears to be due to flink-java-1.1.0's dependency on 
Hadoop1.

Failed to submit job 4b6366d101877d38ef33454acc6ca500 
(com.expedia.www.flink.jobs.DestinationCountsHistoryJob$)
org.apache.flink.runtime.client.JobExecutionException: Failed to submit job 
4b6366d101877d38ef33454acc6ca500 
(com.expedia.www.flink.jobs.DestinationCountsHistoryJob$)
at 
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1281)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:478)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:121)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits 
caused an error: Found interface org.apache.hadoop.mapreduce.JobContext, but 
class was expected
at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:695)
at 
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1178)
... 19 more
Caused by: java.lang.IncompatibleClassChangeError: Found interface 
org.apache.hadoop.mapreduce.JobContext, but class was expected
at 
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:158)
at 
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:56)
at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
... 21 more

And if I exclude hadoop2, I get the exception from my previous email with 
AvroParquetInputFormat.



From: Shannon Carey <sca...@expedia.com<mailto:sca...@expedia.com>>
Date: Monday, August 8, 2016 at 2:46 PM
To: "user@flink.apache.org<mailto:user@flink.apache.org>" 
<user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Classloader issue using AvroParquetInputFormat via HadoopInputFormat

Hi folks, congrats on 1.1.0!

FYI, after updating to Flink 1.1.0 I get the exception at bottom when 
attempting to run a job that uses AvroParquetInputFormat wrapped in a Flink 
HadoopInputFormat. The ContextUtil.java:71 is trying to execute:

Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl");

I am using Scala 2.11.7. JobContextImpl is coming from 
flink-shaded-hadoop2:1.1.0. However, its parent class (JobContext) is actually 
being loaded (according to output with JVM param "-verbose:class") from the 
flink-shaded-hadoop1_2.10 jar.

After adding an exclusion on flink-shaded-hadoop1_2.10, the problem appears to 
be resolved. Is that the right way to fix the problem?

From what I can tell, the problem is that the JARs that are deployed to Maven 
Central were built with different versions of Hadoop (as controlled by 
hadoop.profile):

flink-runtime_2.11 depends on Hadoop 2
flink-java depends on Hadoop 1 (Scala 2.10)
flink-core depends on Hadoop 1 (Scala 2.10)

This seems like a problem with Flink's build process.

As an aside: would it be possible to change the interface of HadoopInputFormat 
to take a Configuration instead of a Job? That would reduce the dependence on 
the Hadoop API somewhat. It doesn't look like the Job itself is ever actually 
used for anything. I'm glad to see you already have 
https://issues.apache.org/jira/browse/FLINK-4316 and 
https://issues.apache.org/jira/browse/FLINK-4315

Thanks,
Shannon

java.lang.IncompatibleClassChangeError: Implementi

Classloader issue using AvroParquetInputFormat via HadoopInputFormat

2016-08-08 Thread Shannon Carey
Hi folks, congrats on 1.1.0!

FYI, after updating to Flink 1.1.0 I get the exception at bottom when 
attempting to run a job that uses AvroParquetInputFormat wrapped in a Flink 
HadoopInputFormat. The ContextUtil.java:71 is trying to execute:

Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl");

I am using Scala 2.11.7. JobContextImpl is coming from 
flink-shaded-hadoop2:1.1.0. However, its parent class (JobContext) is actually 
being loaded (according to output with JVM param "-verbose:class") from the 
flink-shaded-hadoop1_2.10 jar.
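
As a programmatic complement to "-verbose:class", a small diagnostic can print the jar or directory a class was loaded from. The class ClassOrigin and its method names are hypothetical; the calls used are standard JDK ones (Class#getProtectionDomain and CodeSource#getLocation).

```java
import java.security.CodeSource;

// Reports which jar or directory a class was loaded from, as a programmatic
// complement to the -verbose:class JVM flag mentioned above.
final class ClassOrigin {

    static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        // Classes from the JDK itself usually have no code source location.
        if (source == null || source.getLocation() == null) {
            return "(no code source, likely a JDK class)";
        }
        return source.getLocation().toString();
    }

    // Looks a class up by name without initializing it, so static initializers do not run.
    static String locationOf(String className) throws ClassNotFoundException {
        Class<?> type = Class.forName(className, false, ClassOrigin.class.getClassLoader());
        return locationOf(type);
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Pass e.g. org.apache.hadoop.mapreduce.JobContext when Hadoop is on the classpath.
        String name = args.length > 0 ? args[0] : "java.lang.String";
        System.out.println(name + " -> " + locationOf(name));
    }
}
```

Running it for both JobContext and JobContextImpl from the job's classpath should show whether the two come from different shaded Hadoop jars.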

After adding an exclusion on flink-shaded-hadoop1_2.10, the problem appears to 
be resolved. Is that the right way to fix the problem?

From what I can tell, the problem is that the JARs that are deployed to Maven 
Central were built with different versions of Hadoop (as controlled by 
hadoop.profile):

flink-runtime_2.11 depends on Hadoop 2
flink-java depends on Hadoop 1 (Scala 2.10)
flink-core depends on Hadoop 1 (Scala 2.10)

This seems like a problem with Flink's build process.

As an aside: would it be possible to change the interface of HadoopInputFormat 
to take a Configuration instead of a Job? That would reduce the dependence on 
the Hadoop API somewhat. It doesn't look like the Job itself is ever actually 
used for anything. I'm glad to see you already have 
https://issues.apache.org/jira/browse/FLINK-4316 and 
https://issues.apache.org/jira/browse/FLINK-4315

Thanks,
Shannon

java.lang.IncompatibleClassChangeError: Implementing class
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.apache.parquet.hadoop.util.ContextUtil.<clinit>(ContextUtil.java:71)
at 
org.apache.parquet.avro.AvroParquetInputFormat.setRequestedProjection(AvroParquetInputFormat.java:54)
at 
com.expedia.www.sdk.flink.HistoricalDataIngestionJob.readHistoricalParquetFile(HistoricalDataIngestionJob.scala:63)
at 
com.expedia.www.flink.jobs.DestinationCountsHistoryJob$.main(DestinationCountsHistoryJob.scala:25)
at 
com.expedia.www.flink.jobs.DestinationCountsHistoryTest$$anonfun$1.apply$mcV$sp(DestinationCountsHistoryTest.scala:23)
at 
com.expedia.www.flink.jobs.DestinationCountsHistoryTest$$anonfun$1.apply(DestinationCountsHistoryTest.scala:20)
at 
com.expedia.www.flink.jobs.DestinationCountsHistoryTest$$anonfun$1.apply(DestinationCountsHistoryTest.scala:20)
at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1647)
at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1683)
at org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1644)
at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656)
at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1656)
at org.scalatest.FlatSpec.runTest(FlatSpec.scala:1683)
at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1714)
at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1714)
at 
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
at 
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at 
org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:390)
at 
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:427)
at 
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at 
org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
at 

Re: How to read AVRO data from Kafka using Flink

2016-08-02 Thread Shannon Carey
I can tell you that we are reading Avro data from Kafka on Flink without 
problems. It seems like you have a mistake somewhere in your system. If I were 
you I would try your serialization & deserialization code in a simple program 
within the same JVM, then gradually add the other components in order to narrow 
down where the problem is coming from.

Shannon

From: "Alam, Zeeshan"
Date: Tuesday, August 2, 2016 at 5:28 AM
To: "user@flink.apache.org"
Subject: How to read AVRO data from Kafka using Flink

Hi All,

I am trying to read AVRO data from Kafka using Flink 1.0.3 but I am getting an 
error. I have posted this issue on Stack Overflow: 
http://stackoverflow.com/questions/38715286/how-to-decode-kafka-messages-using-avro-and-flink
 . Is there a mistake we can look into, or is there a better way to read 
AVRO data from Kafka using Flink?

Thanks & Regards
Zeeshan Alam




Re: API request to submit job takes over 1hr

2016-06-13 Thread Shannon Carey
Robert,

Thanks for your thoughtful response.


  1.  I understand your concern. User code is not guaranteed to respond to 
thread interrupts. So no matter what you do, you may end up with a stuck 
thread. But I think we can improve the user experience. First, we can update 
the documentation to make it clear that the main() method will be executed 
during job submission, and that jobs should make an effort to avoid doing 
time-consuming work in that main method. Second, I still think it's in your 
best interest to decouple the job submission thread from the HTTP thread. That 
would un-hide the problem, because the end-user could see that their job 
request has been started but is not making it past a certain point (maybe it's 
in one phase/status before main() executes, and in a different status once 
main() completes). Also, it would be obvious if they have made (and failed or 
aborted) multiple job submission API requests that those requests are still 
occupying threads. Right now, it's impossible to tell what has happened to the 
request or whether it is occupying a thread without relying on log output 
(which took us a while to get right in AWS EMR YARN) or a stack dump. Ideally, 
the UI should be able to list all the threads that are currently working on job 
submission.
  2.  I see, the main method will execute on the Application Master, right? I 
created https://issues.apache.org/jira/browse/FLINK-4069. Unfortunately, I don't 
understand very well how Kafka brokers & clients cooperate to make sure that 
partitions are distributed across consumers that share a group id (is there 
documentation about that somewhere?)… Also, I'm not sure how Flink deals with 
repartitioning.
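
To make the idea in point 1 concrete, here is a minimal sketch of decoupling 
job submission from the HTTP thread while keeping its phase visible. It uses 
only the JDK. Every name in it (SubmissionTracker, the Status values, the pool 
size of 4) is hypothetical and not part of Flink, and the Callable merely 
stands in for running the user's main() method.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: none of these names exist in Flink. */
final class SubmissionTracker {
    enum Status { QUEUED, RUNNING_MAIN, SUBMITTED, FAILED }

    private final Map<String, Status> statuses = new ConcurrentHashMap<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    /** Returns at once with an id the caller can poll; the calling thread is not blocked. */
    String submit(Callable<?> userMain) {
        String id = UUID.randomUUID().toString();
        statuses.put(id, Status.QUEUED);
        pool.execute(() -> {
            statuses.put(id, Status.RUNNING_MAIN);
            try {
                userMain.call();                    // stands in for running the job's main()
                statuses.put(id, Status.SUBMITTED);
            } catch (Exception e) {
                statuses.put(id, Status.FAILED);
            }
        });
        return id;
    }

    /** Current phase of one submission, or null if the id is unknown. */
    Status status(String id) {
        return statuses.get(id);
    }

    /** Copy of all submissions and their phases, e.g. for listing them in a UI. */
    Map<String, Status> snapshot() {
        return new HashMap<>(statuses);
    }

    /** Stops accepting work and waits up to the given time for in-flight submissions. */
    boolean shutdownAndAwait(long seconds) {
        pool.shutdown();
        try {
            return pool.awaitTermination(seconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A main() that hangs would simply stay in RUNNING_MAIN here. That does not 
unstick the thread, but it does make the stuck request visible without a stack 
dump.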

-Shannon

From: Robert Metzger <rmetz...@apache.org>
Date: Thursday, June 2, 2016 at 4:19 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: API request to submit job takes over 1hr

Hi Shannon,

thank you for further investigating the issue.
It's fine to keep the discussion on the user@ list. Most devs are on the user 
list as well and we'll probably file some JIRAs.

Regarding your suggestions:
1. Not sure if making the job submission non-blocking is a good idea. We would 
probably need to interrupt the submitting thread after a while, which does not 
always work (in our experience, Kafka and Hadoop, for example, often ignore 
interrupts or, even worse, get stuck afterwards). This would just hide the 
problems or introduce new issues.

2. As you've identified correctly, the real issue here is that the Kafka 
consumer is querying the brokers for metadata from the constructor (= on the 
client) not from the workers in the cluster (in the open() method).
Changing the behavior is on my todo list. If you want, you can file a JIRA for 
this. If you have also time to work on this, you can of course also open a pull 
request. Otherwise, some contributors from the Flink community can take care of 
the implementation.
The main reasons why we do the querying centrally are: a) to avoid overloading 
the brokers, and b) to send the same list of partitions (in the same order) to 
all parallel consumers so that the partition assignment is fixed (also across 
restarts). When we do the querying in the open() method, we need to make sure 
that all partitions are assigned, without duplicates (also after restarts in 
case of failures).
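
One way to picture the "same list, same order" requirement is the sketch 
below. It is not the connector's actual assignment code: the class name and 
the modulo scheme are assumptions made for illustration. The point is only 
that if every parallel consumer sees an identically ordered partition list, 
each can compute its own share locally and the shares neither overlap nor 
leave gaps.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: not the connector's actual assignment code. */
final class PartitionAssignmentSketch {

    /**
     * Returns the partitions owned by one subtask. Every subtask must pass
     * the same list in the same order for the result to be consistent.
     */
    static List<Integer> assign(List<Integer> orderedPartitions, int parallelism, int subtaskIndex) {
        List<Integer> mine = new ArrayList<>();
        for (int i = 0; i < orderedPartitions.size(); i++) {
            if (i % parallelism == subtaskIndex) {
                mine.add(orderedPartitions.get(i));
            }
        }
        return mine;
    }
}
```

Because the result depends only on the list, the parallelism and the subtask 
index, a restarted subtask that is given the same inputs computes the same 
assignment again.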

Regards,
Robert




On Thu, Jun 2, 2016 at 1:44 AM, Shannon Carey <sca...@expedia.com> wrote:
It looks like the problem is due to the stack trace below.

Simply put, connection failure to Kafka when using the default settings causes 
job submission to take over (flink.get-partitions.retry * tries by 
SimpleConsumer * socket.timeout.ms * # of Kafka brokers) = (3 * 2 * 30 * (# of 
Kafka brokers)) seconds. In my case, since I 
have 36 Kafka brokers, it took over 108 minutes. This is beyond the maximum 
idle connection timeout of an AWS ELB of 60 minutes, and beyond the normal 
length of time most people expect an HTTP request to take. During these 108 
minutes and after, aside from examining logs & stack traces, it is not possible 
to determine what is happening with regard to the run job request. It simply 
appears to hang and then fail, typically with a 504 Gateway Timeout status.
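
The arithmetic above can be double-checked with a few lines of Java. The 
class and method names are made up for this sketch, and the constants are 
simply the values quoted in this message (3 retries, 2 tries by 
SimpleConsumer, a 30 second socket timeout, 36 brokers); nothing is read from 
a real Flink or Kafka configuration.

```java
import java.time.Duration;

/** Back-of-the-envelope estimate only; the names are illustrative, not Flink API. */
final class SubmissionDelayEstimate {

    /** Worst case: every try against every broker runs into the full socket timeout. */
    static Duration worstCase(int partitionRetries, int triesPerConsumer,
                              Duration socketTimeout, int brokerCount) {
        long attempts = (long) partitionRetries * triesPerConsumer * brokerCount;
        return socketTimeout.multipliedBy(attempts);
    }

    public static void main(String[] args) {
        Duration delay = worstCase(3, 2, Duration.ofSeconds(30), 36);
        System.out.println(delay.getSeconds() + " s = " + delay.toMinutes() + " min");
    }
}
```

With the values above this gives 6480 seconds, i.e. the 108 minutes mentioned 
in the message.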

There are a couple problems that are responsible for this situation. Let me 
know if I should move this discussion to the "devs" list: I am not a member 
there yet. I am happy to submit JIRAs and I would be able to submit a Pull 
Request for the change to FlinkKafkaConsumer08 (and 09) initialization as 
suggested below.

  1.  JarRunHandler is provided with a timeout value, but that timeout value is 
ignored when calling getJobGraphAndClassLoader(). This allows HTTP "run" 
requests to take arbitrary amo

Kafka exception "Unable to find a leader for partitions"

2016-06-08 Thread Shannon Carey
Does anyone have a guess what might cause this exception?


java.lang.RuntimeException: Unable to find a leader for partitions: 
[FetchPartition {topic=usersignals, partition=1, offset=2825838}]

at 
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.findLeaderForPartitions(LegacyFetcher.java:323)

at 
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:162)

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316)

at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)

at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)

at java.lang.Thread.run(Thread.java:745)


Re: API request to submit job takes over 1hr

2016-06-01 Thread Shannon Carey
eMonitorHandler.respondAsLeader(RuntimeMonitorHandler.java:135)
at 
org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:112)
at 
org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:60)
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
at 
io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
at 
io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:158)
at 
org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at 
io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)

From: Shannon Carey <shannon.ca...@orbitz.com> on behalf of Shannon Carey 
<sca...@expedia.com>
Date: Wednesday, June 1, 2016 at 12:54 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: API request to submit job takes over 1hr

Hi folks,

I have deployed a Flink cluster on top of YARN in an AWS EMR cluster in my test 
environment, and everything is working fine. However, I am unable to submit 
jobs to the prod cluster.

Uploading the JAR containing a Flink job succeeds. However, the request to run 
the job (UI makes API request to /jars//run?) takes so long to 
complete that the ELB finally returns a 504 GATEWAY_TIMEOUT response. This is 
the case even if the ELB timeout is set to 1hr: the request returns 504 after 
1hr. The request also appears to fail server-side, since no jobs have ever 
shown up in the UI as being in any status (successful/failed/completed or 
otherwise). Shortly after the request is made, it is interesting to note that 
sometimes (but not always), other requests by the UI to the API begin to take 
longer than usual, although they do all eventually complete.

No interesting/suspicious log entries have been found. All YARN nodes appear 
healthy.

Does anyone have ideas about what the problem might be? Or ideas about 
troubleshooting steps I should take?

Also, I was wondering if 1GB is a reasonable amount of memory to use for the 
Flink Job Manager? It appears to be using only ~570MB but I am not sure if the 
Job Manager might be misbehaving due to resource constraints. The prod cluster 
is currently composed of six c3.2xlarge EC2 instances. Task memory is set to 
10496, Job Manager memory is set to 1024, and there are 8 slots set in the 
yarn-session.sh command. Are there any guidelines for memory allocation for the 
Job Manager?

Thanks very much!
Shannon Carey


API request to submit job takes over 1hr

2016-06-01 Thread Shannon Carey
Hi folks,

I have deployed a Flink cluster on top of YARN in an AWS EMR cluster in my test 
environment, and everything is working fine. However, I am unable to submit 
jobs to the prod cluster.

Uploading the JAR containing a Flink job succeeds. However, the request to run 
the job (UI makes API request to /jars//run?) takes so long to 
complete that the ELB finally returns a 504 GATEWAY_TIMEOUT response. This is 
the case even if the ELB timeout is set to 1hr: the request returns 504 after 
1hr. The request also appears to fail server-side, since no jobs have ever 
shown up in the UI as being in any status (successful/failed/completed or 
otherwise). Shortly after the request is made, it is interesting to note that 
sometimes (but not always), other requests by the UI to the API begin to take 
longer than usual, although they do all eventually complete.

No interesting/suspicious log entries have been found. All YARN nodes appear 
healthy.

Does anyone have ideas about what the problem might be? Or ideas about 
troubleshooting steps I should take?

Also, I was wondering if 1GB is a reasonable amount of memory to use for the 
Flink Job Manager? It appears to be using only ~570MB but I am not sure if the 
Job Manager might be misbehaving due to resource constraints. The prod cluster 
is currently composed of six c3.2xlarge EC2 instances. Task memory is set to 
10496, Job Manager memory is set to 1024, and there are 8 slots set in the 
yarn-session.sh command. Are there any guidelines for memory allocation for the 
Job Manager?

Thanks very much!
Shannon Carey


Flink YARN job manager web port

2016-04-21 Thread Shannon Carey
The documentation states: "The ports Flink is using for its services are the 
standard ports configured by the user + the application id as an offset"

When I launch Flink via YARN in an AWS EMR cluster, stdout says:
JobManager Web Interface: 
http://ip-xxx.us-west-2.compute.internal:20888/proxy/application_1461178294210_0010/

I need to be able to create an IAM Security Group that allows access to the 
JobManager web interface so that I can make use of it. However, I am confused 
about how port 20888 is chosen. Based on the code, I would have guessed that it 
would use the same port as given by: "yarn application -status 
application_1461178294210_0010". However, that's not the case (they don't 
match). It gives "Tracking-URL : http://ip-xxx.us-west-2.compute.internal:36495".

On the other hand, I see that YarnApplicationMasterRunner sets the port to 0, 
which InetSocketAddress says results in "A port number of zero will let the 
system pick up an ephemeral port in a bind operation."
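
The InetSocketAddress behaviour quoted above is easy to observe with plain 
JDK classes. The sketch below is not Flink code and the class name is made 
up; it only shows that binding to port 0 yields whatever free port the OS 
hands out, which is why such a port cannot be known in advance for a firewall 
or security group rule.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

/** Plain JDK demo; unrelated to how Flink itself opens its web port. */
final class EphemeralPortDemo {

    /** Binds to port 0 and returns the port the operating system actually picked. */
    static int bindToEphemeralPort() {
        try (ServerSocket socket = new ServerSocket()) {
            socket.bind(new InetSocketAddress(0));
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("OS picked port " + bindToEphemeralPort());
    }
}
```

The printed port typically changes from run to run, so no expected value is 
shown here.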

I couldn't find anything in the code that adds an offset to a port. Changing 
the value of "jobmanager.web.port" appears to have no effect. The documentation 
on "Running Flink on YARN behind Firewalls" only talks about the JobManager and 
BlobServer ports.

Does Flink need logic to allow users to specify a range of ports for 
jobmanager.web.port in the same way as is done in 
BootstrapTools#startActorSystem? If so, I am happy to make that contribution!

-Shannon


Re: RocksDB Statebackend

2016-04-13 Thread Shannon Carey
This is something that my team and I have discussed building, so it's great to 
know that it's already on the radar. If we beat you to it, I'll definitely try 
to make it a contribution.

Shannon

From: Aljoscha Krettek
Date: Wednesday, April 13, 2016 at 1:46 PM
Subject: Re: RocksDB Statebackend

Hi Maxim,
yes the plan is to have a cache of hot values that uses the managed memory 
abstraction of Flink so that we can make sure that we stay within memory bounds 
and don't run into OOM exceptions.

On Tue, 12 Apr 2016 at 23:37 Maxim wrote:
Is it possible to add an option to store the state in the Java HashMap and 
write its content to RocksDB when checkpointing? For "hot" keys that are 
updated very frequently such optimization would help with performance.

I know that you are also working on incremental checkpoints, which would also 
be a big win for jobs with a large number of keys.

Thanks,

Maxim.

On Tue, Apr 12, 2016 at 10:39 AM, Stephan Ewen wrote:
Concerning the size of RocksDB snapshots - I am wondering if RocksDB simply 
does not compact for a long time, thus having a lot of stale data in the 
snapshot.

That would be especially the case, if you have a lot of changing values for the 
same set of keys.

On Tue, Apr 12, 2016 at 6:41 PM, Aljoscha Krettek wrote:
Hi,
I'm going to try and respond to each point:

1. This seems strange, could you give some background on parallelism, number of 
operators with state and so on? Also, I'm assuming you are using the 
partitioned state abstraction, i.e. getState(), correct?

2. Your observations are pretty much correct. The reason why RocksDB is slower 
is that the FsStateBackend basically stores the state in a Java HashMap and 
writes the contents to HDFS when checkpointing. RocksDB stores data in on-disk 
files and goes to them for every state access (of course there are caches, but 
generally it is like this). I'm actually impressed that it is still this fast 
in comparison.

3. See 1. (I think, for now)

4. The checkpointing time is the time from the JobManager deciding to start a 
checkpoint until all tasks have confirmed that checkpoint. I have seen this 
before and I think it results from back pressure. The problem is that the 
checkpoint messages that we send through the topology are sitting at the 
sources because they are also back pressured by the slow processing of normal 
records. You should be able to see the actual checkpointing times (both 
synchronous and asynchronous) in the log files of the task managers; they 
should be much lower.

I can go into details, I'm just writing this quickly before calling it a day. 
:-)

Cheers,
Aljoscha

On Tue, 12 Apr 2016 at 18:21 Konstantin Knauf wrote:
Hi everyone,

my experience with RocksDBStatebackend has left me a little bit
confused. Maybe you guys can confirm that my experience is the expected
behaviour ;):

I have run a "performance test" twice, once with FsStateBackend and once
with RocksDBStatebackend for comparison. In this particular test the state
saved is generally not large (in a production scenario it will be larger).

These are my observations:

1. Minimal Checkpoint Size (no records) with RocksDB was 33MB compared
to <<1MB with the FSStatebackend.

2. Throughput dropped from 28k/s -> 18k/s on a small cluster.

3. Checkpoint sizes as reported in the Dashboard were ca. 1MB for
FsStatebackend but >100MB for RocksDbStatebackend. I hope the difference
gets smaller for very large state. Can you confirm?

4. Checkpointing Times as reported in the Dashboard were 26secs for
RocksDB during the test and <1 second for FsStatebackend. Does the
reported time correspond to the sync. + asynchronous part of the
checkpointing in case of RocksDB? Is there any way to tell how long the
synchronous part takes?

From these first observations, RocksDB does seem to bring a large
overhead for state < 1GB, I guess? Is this expected?

Cheers,

Konstantin




Re: State in external db (dynamodb)

2016-04-07 Thread Shannon Carey
Thanks very kindly for your response, Stephan!

We will definitely use a custom sink for persistence of idempotent mutations 
whenever possible. Exposing state as read-only to external systems is a 
complication we will try to avoid. Also, we will definitely only write to the 
DB upon checkpoint, and the write will be synchronous and transactional (no 
possibility of partial success/failure).

However, we do want Flink state to be durable, we want it to be in memory when 
possible, and we want to avoid running out of memory due to the size of the 
state. For example, if you have a wide window that hasn't gotten an event for a 
long time, we want to evict that window state from memory. We're now thinking 
of using Redis (via AWS ElastiCache), which also conveniently has TTL, instead 
of DynamoDB.

I just wanted to check whether eviction of (inactive/quiet) state from memory 
is something that I should consider implementing, or whether Flink already had 
some built-in way of doing it.

Along the same lines, I am also wondering whether Flink already has a means of 
compacting the state of a window by applying an aggregation function to the 
elements so far (e.g. every time the window is triggered)? For example, if you are 
only executing a sum on the contents of the window, the window state doesn't 
need to store all the individual items in the window, it only needs to store 
the sum. Aggregations other than "sum" might have that characteristic too. I 
don't know if Flink is already that intelligent or whether I should figure out 
how to aggregate window contents myself when possible with something like a 
window fold? Another poster (Aljoscha) was talking about adding incremental 
snapshots, but it sounds like that would only improve the write throughput not 
the memory usage.
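
To illustrate the difference in memory footprint described above, here is a 
conceptual sketch with two ways of holding the state for a "sum" window. 
These classes are invented for the example and are not Flink types; they only 
contrast buffering every element with folding each element into a single 
accumulator as it arrives.

```java
import java.util.ArrayList;
import java.util.List;

/** Conceptual sketch only; these classes are not Flink types. */
final class WindowStateSketch {

    /** Keeps every element until the window fires: state grows with the element count. */
    static final class BufferingSum {
        private final List<Long> elements = new ArrayList<>();

        void add(long value) {
            elements.add(value);
        }

        long fire() {
            long sum = 0;
            for (long v : elements) {
                sum += v;
            }
            return sum;
        }

        int storedValues() {
            return elements.size();
        }
    }

    /** Folds each element into one accumulator on arrival: state stays constant-size. */
    static final class IncrementalSum {
        private long sum;

        void add(long value) {
            sum += value;
        }

        long fire() {
            return sum;
        }

        int storedValues() {
            return 1;
        }
    }
}
```

Both variants produce the same result when the window fires, so the choice 
only affects how much state has to be kept (and checkpointed) in between. As 
far as I know, applying a reduce or fold function to a windowed stream in 
Flink aggregates eagerly in this second style, but please check the windowing 
documentation for your version.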

Thanks again!
Shannon Carey


From: Stephan Ewen <se...@apache.org>
Date: Wednesday, April 6, 2016 at 10:37 PM
To: <user@flink.apache.org>
Subject: Re: State in external db (dynamodb)

Hi Shannon!

Welcome to the Flink community!

You are right, sinks in general need to be idempotent if you want 
"exactly-once" semantics, because there can be a replay of elements that were 
already written.

However, what you describe later, overwriting of a key with a new value (or the 
same value again) is pretty much sufficient. That means that when a duplicate 
write happens during replay, the value for the key is simply overwritten with 
the same value again.
As long as all computation is purely in Flink and you only write to the 
key/value store (rather than read from k/v, modify in Flink, write to k/v), you 
get the consistency that for example counts/aggregates never have duplicates.
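
A small sketch of why overwriting is safe under replay while 
read-modify-write is not. The in-memory map stands in for the external 
key/value store, and the class and method names are invented for this 
example.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustration only: a HashMap stands in for the external key/value store. */
final class ReplaySketch {
    private final Map<String, Long> store = new HashMap<>();

    /** Idempotent: replaying the same write leaves the stored value unchanged. */
    void overwrite(String key, long value) {
        store.put(key, value);
    }

    /** Not idempotent: replaying the same write applies the delta a second time. */
    void addTo(String key, long delta) {
        store.merge(key, delta, Long::sum);
    }

    /** Returns the stored value, or 0 if the key was never written. */
    long get(String key) {
        return store.getOrDefault(key, 0L);
    }
}
```

If the aggregate is computed inside Flink and the sink only calls something 
like overwrite(), a replayed element rewrites the value it wrote before. With 
something like addTo(), the same replay would count the element twice.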

If Flink needs to look up state from the database (because it is no longer in 
Flink), it is a bit more tricky. I assume that is where you are going with 
"Subsequently, when an event is processed, we must be able to quickly load up 
any evicted state".  In that case, there are two things you can do:

(1)  Only write to the DB upon a checkpoint, at which point it is known that no 
replay of that data will occur any more. Values from partially successful 
writes will be overwritten with the correct value. I assume that is what you 
thought of when referring to the State Backend, because in some sense, that is 
what that state backend would do.

I think it is simpler to implement that in a custom sink than to develop a new 
state backend.  Another Flink committer (Chesnay) has developed some nice 
tooling for that, to be merged into Flink soon.

(2) You could attach version numbers to every write, and increment the versions 
upon each checkpoint. That allows you to always refer to a consistent previous 
value, if some writes were made, but a failure occurred before the checkpoint 
completed.
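
Option (2) could look roughly like the sketch below. It is only one possible 
reading of the idea, kept in memory for illustration: the class, its methods 
and the choice of a TreeMap per key are assumptions, not an existing Flink or 
database API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of scheme (2); not an existing Flink or database API. */
final class VersionedStoreSketch {
    private final Map<String, TreeMap<Long, String>> data = new HashMap<>();
    private long committedVersion = 0;   // last version covered by a completed checkpoint

    /** Writes always go to the version after the last completed checkpoint. */
    void write(String key, String value) {
        data.computeIfAbsent(key, k -> new TreeMap<>()).put(committedVersion + 1, value);
    }

    /** Called when a checkpoint completes: the pending version becomes the consistent one. */
    void checkpointCompleted() {
        committedVersion++;
    }

    /** Called on recovery: drops writes made after the last completed checkpoint. */
    void rollbackUncommitted() {
        for (TreeMap<Long, String> versions : data.values()) {
            versions.tailMap(committedVersion, false).clear();
        }
    }

    /** Reads the newest value at or below the last completed checkpoint, or null. */
    String readConsistent(String key) {
        TreeMap<Long, String> versions = data.get(key);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, String> entry = versions.floorEntry(committedVersion);
        return entry == null ? null : entry.getValue();
    }
}
```

A write that was made before a failure but never covered by a completed 
checkpoint stays invisible to readConsistent() and is discarded by 
rollbackUncommitted(), so readers always see the previous consistent value.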

I hope these answers apply to your case. Let us know if some things are still 
unclear, or if I misunderstood your question!


Greetings,
Stephan



On Wed, Apr 6, 2016 at 8:14 AM, Sanne de Roever <sanne.de.roe...@gmail.com> 
wrote:
FYI Cassandra has a TTL on data: 
https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_expire_t.html

On Wed, Apr 6, 2016 at 7:55 AM, Shannon Carey <sca...@expedia.com> wrote:
Hi, new Flink user here!

I found a discussion on user@flink.apache.org about using DynamoDB as a sink. 
However, as noted, sinks have an at-least-once guarantee so your operations 
must be idempotent.

However, another way to go about this (and correct me if I'm wrong) is to write 
the state to the external store via a custom State Backend. Since the state 
participates in checkpointing, you don't have to worry about idempotency: every 
time state is checkpointed, overwrite the value of that key.

We are starting a project with Flink, and we are interested 

Re: Running Flink jobs directly from Eclipse

2016-04-06 Thread Shannon Carey
Thanks for the info! It is a bit difficult to tell based on the documentation 
whether or not you need to put your jar onto the Flink master node and run the 
flink command from there in order to get a job running. The documentation on 
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/yarn_setup.html
 isn't very explicit about where you can run the flink command from, and 
doesn't mention that you can run the job programmatically instead of using the 
CLI.

From: Christophe Salperwyck
Date: Wednesday, April 6, 2016 at 1:24 PM
Subject: Re: Running Flink jobs directly from Eclipse

From my side I was starting the YARN session from the cluster:
flink-0.10.1/bin/yarn-session.sh -n 64 -s 4 -jm 4096 -tm 4096

Then getting the IP/port from the WebUI and then from Eclipse:
ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment("xx.xx.xx.xx", 40631, 
"target/FlinkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar");

The JAR needs to be built beforehand.

Hope it helps!
Christophe

2016-04-06 9:25 GMT+02:00 Serhiy Boychenko:
Cheerz,

I have been working for the last few months on a comparison of different data 
processing engines and recently came across Apache Flink. After reading 
different academic papers comparing Flink with other data processing engines, I 
would definitely give it a shot. The only issue I am currently having is that I 
am unable to submit Flink jobs directly from Eclipse (to a YARN cluster). I am 
wondering if you have any guidelines on how I could do the submission not from 
the client but from Eclipse directly? (I was unable to find anything related, 
with the exception of setting up Eclipse for working on Flink core)

Best regards,
Serhiy.




State in external db (dynamodb)

2016-04-05 Thread Shannon Carey
Hi, new Flink user here!

I found a discussion on user@flink.apache.org about using DynamoDB as a sink. 
However, as noted, sinks have an at-least-once guarantee so your operations 
must be idempotent.

However, another way to go about this (and correct me if I'm wrong) is to write 
the state to the external store via a custom State Backend. Since the state 
participates in checkpointing, you don't have to worry about idempotency: every 
time state is checkpointed, overwrite the value of that key.

We are starting a project with Flink, and we are interested in evicting the 
state from memory once a TTL is reached during which no events have come in for 
that state. Subsequently, when an event is processed, we must be able to 
quickly load up any evicted state. Does this sound reasonable? We are 
considering using DynamoDB for our state backend because it seems like all we 
will need is a key-value store. The only weakness of this is that if state gets 
older than, say, 2 years we would like to get rid of it which might not be easy 
in DynamoDB. I don't suppose Flink has any behind-the-scenes features that deal 
with getting rid of old state (either evicting from memory or TTL/aging out 
entirely)?
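
To make the eviction idea concrete, this is roughly the behaviour I have in 
mind, written as a plain Java sketch. Nothing here is Flink API: the class 
name is made up, a Map stands in for DynamoDB (or any other key/value store), 
and time is passed in explicitly to keep the example deterministic.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical sketch; a Map stands in for the external key/value store. */
final class IdleStateCache<K, V> {

    private static final class Slot<T> {
        final T value;
        long lastAccessMillis;

        Slot(T value, long now) {
            this.value = value;
            this.lastAccessMillis = now;
        }
    }

    private final Map<K, Slot<V>> hot = new HashMap<>();
    private final Map<K, V> external;
    private final long ttlMillis;

    IdleStateCache(Map<K, V> external, long ttlMillis) {
        this.external = external;
        this.ttlMillis = ttlMillis;
    }

    /** Stores state in memory and marks it as just used. */
    void put(K key, V value, long nowMillis) {
        hot.put(key, new Slot<>(value, nowMillis));
    }

    /** Returns state for the key, reloading it from the external store if it was evicted. */
    V get(K key, long nowMillis) {
        Slot<V> slot = hot.get(key);
        if (slot == null) {
            V loaded = external.get(key);
            if (loaded == null) {
                return null;
            }
            slot = new Slot<>(loaded, nowMillis);
            hot.put(key, slot);
        }
        slot.lastAccessMillis = nowMillis;
        return slot.value;
    }

    /** Moves entries idle for at least the TTL out of memory and into the external store. */
    int evictIdle(long nowMillis) {
        int evicted = 0;
        Iterator<Map.Entry<K, Slot<V>>> it = hot.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Slot<V>> entry = it.next();
            if (nowMillis - entry.getValue().lastAccessMillis >= ttlMillis) {
                external.put(entry.getKey(), entry.getValue().value);
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }

    /** Number of entries currently held in memory. */
    int inMemorySize() {
        return hot.size();
    }
}
```

Aging state out of the external store entirely (the "older than 2 years" 
case) is not covered by this sketch.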

Thanks for your time!
Shannon Carey