Re: Failed job restart - flink on yarn

2016-07-01 Thread vpra...@gmail.com
Hi Jamie,

Thanks for the reply.

Yeah, I looked at savepoints. I want to start my job only from the last
checkpoint, which means I would have to keep track of when each checkpoint was
taken and then trigger a savepoint. I am not sure this is the way to go. My
state backend is HDFS, and I can see that the checkpoint path has the data
that has been buffered in the window.

I want to restart the job in a way that it reads the data checkpointed
before the failure and continues processing.

I realise that the checkpoints are used whenever there is a container
failure and a new container is obtained. In my case the job failed because
a container failed the maximum allowed number of times.
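
For reference, the resubmission I have in mind looks roughly like the
following (the memory value is just the 20G we tested with, and I still need
to double-check the exact CLI flags and the yarn.maximum-failed-containers
setting against the docs):

    # flink-conf.yaml: tolerate more container failures before the job is killed
    yarn.maximum-failed-containers: 10

    # resubmit with larger TaskManager containers (value in MB)
    flink run -m yarn-cluster -yn <num-containers> -ytm 20480 path/to/my-job.jar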

Thanks,
Prabhu

On Fri, Jul 1, 2016 at 3:54 PM, Jamie Grier wrote:

> Hi Prabhu,
>
> Have you taken a look at Flink's savepoints feature?  This allows you to
> make snapshots of your job's state on demand and then at any time restart
> your job from that point:
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/savepoints.html
>
> Also know that you can use Flink's disk-backed state backend as well if
> your job's state is larger than fits in memory.  See
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state_backends.html#the-rocksdbstatebackend
>
>
> -Jamie
>
>
> On Fri, Jul 1, 2016 at 1:34 PM, vpra...@gmail.com wrote:
>
>> Hi,
>>
>> I have a Flink streaming job that reads from Kafka and performs an
>> aggregation in a window. It ran fine for a while; however, when the number
>> of events in a window crossed a certain limit, the YARN containers failed
>> with Out Of Memory. The job was running with 10G containers.
>>
>> We have about 64G memory on the machine, and now I want to restart the job
>> with a 20G container (we ran some tests and 20G should be good enough to
>> accommodate all the elements from the window).
>>
>> Is there a way to restart the job from the last checkpoint ?
>>
>> When I resubmit the job, it starts from the last committed offsets; however,
>> the events that were held in the window at the time of checkpointing seem to
>> get lost. Is there a way to recover the events that were buffered within the
>> window and checkpointed before the failure?
>>
>> Thanks,
>> Prabhu
>>
>>
>>
>
>
>
> --
>
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier
> ja...@data-artisans.com
>
>
>





Re: Using standalone single node without HA in production, crazy?

2016-07-01 Thread Ryan Crumley
Jamie,

Thank you for your insight. To answer your questions: I am running on AWS
and have access to S3. Further, I already have ZooKeeper in the mix (it's
used by Mesos as well as Kafka). I was hoping to avoid the complexities of
an automated HA setup by running a single JVM and then migrating to HA down
the road. It sounds like I can't have my cake and eat it too (yet). =)

Ryan

On Fri, Jul 1, 2016 at 7:22 PM Jamie Grier  wrote:

> I started to answer these questions and then realized I was making an
> assumption about your environment.  Do you have a reliable persistent file
> system such as HDFS or S3 at your disposal or do you truly mean to run on a
> single node?
>
> If you are truly thinking of running on a single node only, there's no way
> to make this guaranteed to be reliable.  You would be open to machine and
> disk failures, etc.
>
> I think the minimal reasonable production setup must use at least 3
> physical nodes with the following services running:
>
> 1) HDFS or some other reliable filesystem (for persistent state storage)
> 2) Zookeeper for the Flink HA JobManager setup
>
> The rest is configuration.
>
> With regard to scaling up after your initial deployment:  right now in the
> latest Flink release (1.0.3) you cannot stop and restart a job with a
> different parallelism without losing your computed state.  What this means
> is that if you know you will likely scale up and you don't want to lose
> that state you can provision many, many slots on the TaskManagers you do
> run, essentially over-provisioning them, and run your job now with the max
> parallelism you expect to need to scale to.  This will all be much simpler
> to do in future Flink versions (though not in 1.1) but for now this would
> be a decent approach.
>
> In Flink versions after 1.1 Flink will be able to scale parallelism up and
> down while preserving all of the previously computed state.
>
> -Jamie
>
>
> On Fri, Jul 1, 2016 at 6:41 AM, Ryan Crumley  wrote:
>
>> Hi,
>>
>> I am evaluating Flink for use in a stateful streaming application. Some
>> information about the intended use:
>>
>>  - Will run in a Mesos cluster, deployed via Marathon in a Docker
>> container
>>  - Initial throughput ~ 100 messages per second (from Kafka)
>>  - Will need to scale to 10x that soon after launch
>>  - State will be much larger than the memory available
>>
>> In order to quickly get this out the door I am considering postponing the
>> YARN / HA setup of a cluster, with the idea that the current application can
>> easily fit within a single JVM and handle the throughput. Hopefully by the
>> time I need more scale, Flink support for Mesos will be available and I can
>> use that to distribute the job to the cluster with minimal code rewrite.
>>
>> Questions:
>> 1. Is this a viable approach? Any pitfalls to be aware of?
>>
>> 2. What is the correct term for this deployment mode? Single node
>> standalone? Local?
>>
>> 3. Will the RocksDB state backend work in single-JVM mode?
>>
>> 4. When the single JVM process becomes unhealthy and is restarted by
>> Marathon, will Flink recover appropriately, or is failure recovery a function
>> of HA?
>>
>> 5. How would I migrate the RocksDB state once I move to HA mode? Is there
>> a straightforward path?
>>
>> Thanks for your time,
>>
>> Ryan
>>
>
>
>
> --
>
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier 
> ja...@data-artisans.com
>
>


Re: Parameters to Control Intra-node Parallelism

2016-07-01 Thread Saliya Ekanayake
Hi Ufuk,

Looking at the document you sent, it seems only one TaskManager per node
exists, and within it you have multiple slots. Is it possible to run more
than one TaskManager per node? Also, within a TaskManager, is the
parallelism achieved through threads or processes?

Thank you,
Saliya

On Thu, Jun 30, 2016 at 5:27 PM, Saliya Ekanayake  wrote:

> Thank you, I'll check these.
>
> In 2) you said they are likely to exchange data through memory. Is there a
> case where they wouldn't?
>
> On Thu, Jun 30, 2016 at 5:03 AM, Ufuk Celebi  wrote:
>
>> On Thu, Jun 30, 2016 at 1:44 AM, Saliya Ekanayake 
>> wrote:
>> > 1. What parameters are available to control parallelism within a node?
>>
>> Task Manager processing slots:
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#configuring-taskmanager-processing-slots
>>
>> > 2. Does Flink support shared memory-based messaging within a node
>> (without
>> > doing TCP calls)?
>>
>> Yes, local exchanges happen via memory and not TCP. For example, if you
>> have a map-reduce job, map subtask 1 and reduce subtask 1 are likely to
>> exchange data locally.
>>
>> > 3. Is there support for Infiniband interconnect?
>>
>> No, not that I'm aware of.
>>
>> – Ufuk
>>
>
>
>
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
>
>


-- 
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington


Re: Using standalone single node without HA in production, crazy?

2016-07-01 Thread Jamie Grier
I started to answer these questions and then realized I was making an
assumption about your environment.  Do you have a reliable persistent file
system such as HDFS or S3 at your disposal or do you truly mean to run on a
single node?

If you are truly thinking of running on a single node only, there's no way
to make this guaranteed to be reliable.  You would be open to machine and
disk failures, etc.

I think the minimal reasonable production setup must use at least 3
physical nodes with the following services running:

1) HDFS or some other reliable filesystem (for persistent state storage)
2) Zookeeper for the Flink HA JobManager setup

The rest is configuration.
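
As a very rough sketch (these are the configuration key names as they appear
in the Flink 1.0 docs; the hostnames and paths are placeholders you would
have to adapt), the flink-conf.yaml for such a setup would contain something
like:

    recovery.mode: zookeeper
    recovery.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
    recovery.zookeeper.storageDir: hdfs:///flink/recovery
    state.backend: filesystem
    state.backend.fs.checkpointdir: hdfs:///flink/checkpoints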

With regard to scaling up after your initial deployment:  right now in the
latest Flink release (1.0.3) you cannot stop and restart a job with a
different parallelism without losing your computed state.  What this means
is that if you know you will likely scale up and you don't want to lose
that state you can provision many, many slots on the TaskManagers you do
run, essentially over-provisioning them, and run your job now with the max
parallelism you expect to need to scale to.  This will all be much simpler
to do in future Flink versions (though not in 1.1) but for now this would
be a decent approach.
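
Concretely (the slot count and parallelism below are only illustrative), you
would raise taskmanager.numberOfTaskSlots in flink-conf.yaml well above
today's needs and submit the job with the parallelism you expect to scale
to, roughly like this:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class OverProvisionedJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
            // Run with the parallelism you expect to need later, even though
            // current throughput doesn't require it; the extra subtasks simply
            // occupy the over-provisioned slots.
            env.setParallelism(16);
            // ... build the actual topology here (Kafka source, keyed state, ...)
            env.fromElements("placeholder").print();
            env.execute("over-provisioned job");
        }
    }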

In Flink versions after 1.1 Flink will be able to scale parallelism up and
down while preserving all of the previously computed state.

-Jamie


On Fri, Jul 1, 2016 at 6:41 AM, Ryan Crumley  wrote:

> Hi,
>
> I am evaluating Flink for use in a stateful streaming application. Some
> information about the intended use:
>
>  - Will run in a Mesos cluster, deployed via Marathon in a Docker
> container
>  - Initial throughput ~ 100 messages per second (from Kafka)
>  - Will need to scale to 10x that soon after launch
>  - State will be much larger than the memory available
>
> In order to quickly get this out the door I am considering postponing the
> YARN / HA setup of a cluster, with the idea that the current application can
> easily fit within a single JVM and handle the throughput. Hopefully by the
> time I need more scale, Flink support for Mesos will be available and I can
> use that to distribute the job to the cluster with minimal code rewrite.
>
> Questions:
> 1. Is this a viable approach? Any pitfalls to be aware of?
>
> 2. What is the correct term for this deployment mode? Single node
> standalone? Local?
>
> 3. Will the RocksDB state backend work in single-JVM mode?
>
> 4. When the single JVM process becomes unhealthy and is restarted by
> Marathon, will Flink recover appropriately, or is failure recovery a function
> of HA?
>
> 5. How would I migrate the RocksDB state once I move to HA mode? Is there
> a straightforward path?
>
> Thanks for your time,
>
> Ryan
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier 
ja...@data-artisans.com


Re: Failed job restart - flink on yarn

2016-07-01 Thread Jamie Grier
Hi Prabhu,

Have you taken a look at Flink's savepoints feature?  This allows you to
make snapshots of your job's state on demand and then at any time restart
your job from that point:
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/savepoints.html
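
The basic workflow is two CLI calls, roughly like this (the job ID and
savepoint path below are placeholders for the values the commands print):

    # trigger a savepoint for the running job
    flink savepoint <jobID>

    # later, resume a (re)submitted job from that savepoint
    flink run -s <savepointPath> path/to/your-job.jar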

Also know that you can use Flink's disk-backed state backend as well if
your job's state is larger than fits in memory.  See
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state_backends.html#the-rocksdbstatebackend
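
Switching to it is a one-line change on the environment. A minimal sketch,
assuming the flink-statebackend-rocksdb dependency is on your classpath and
with the checkpoint URI as a placeholder:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RocksDbBackendSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
            // State lives in RocksDB on local disk and is checkpointed to a
            // durable filesystem, so it can grow well beyond the JVM heap.
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
            // ... define sources, keyed state, windows, etc. here ...
        }
    }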


-Jamie


On Fri, Jul 1, 2016 at 1:34 PM, vpra...@gmail.com  wrote:

> Hi,
>
> I have a Flink streaming job that reads from Kafka and performs an
> aggregation in a window. It ran fine for a while; however, when the number of
> events in a window crossed a certain limit, the YARN containers failed with
> Out Of Memory. The job was running with 10G containers.
>
> We have about 64G memory on the machine, and now I want to restart the job
> with a 20G container (we ran some tests and 20G should be good enough to
> accommodate all the elements from the window).
>
> Is there a way to restart the job from the last checkpoint ?
>
> When I resubmit the job, it starts from the last committed offsets; however,
> the events that were held in the window at the time of checkpointing seem to
> get lost. Is there a way to recover the events that were buffered within the
> window and checkpointed before the failure?
>
> Thanks,
> Prabhu
>
>
>
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier 
ja...@data-artisans.com


Re: Error submitting stand-alone Flink job to EMR YARN cluster

2016-07-01 Thread Jamie Grier
I know this is really basic, but have you verified that your Flink lib
folder contains log4j-1.2.17.jar?  I imagine that's fine given that the
yarn-session.sh approach is working.  What version of EMR are you
running?  What version of Flink?
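
A quick way to check, with the path adjusted to wherever your client-side
Flink distribution lives:

    ls ./flink-1.0.3/lib | grep -i log4j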

-Jamie


On Thu, Jun 30, 2016 at 11:28 AM, Hanson, Bruce 
wrote:

> I’m trying to submit a stand-alone Flink job to a YARN cluster running on
> EMR (Elastic MapReduce) nodes in AWS. When it tries to start a container
> for the Job Manager, it fails. The error message from the container is
> below. The command I’m using is:
>
> $ flink run -m yarn-cluster -yn 1 -ynm test1:WordCount
> ./flink-1.0.3/examples/streaming/WordCount.jar
>
> I have tried adding log4j and slf4j libraries to the classpath using -C
> and that doesn’t help.
>
> This does not happen on other YARN clusters I have that are not EMR nodes.
> And it doesn’t happen on my EMR cluster if I use "yarn-session.sh" to
> create a Flink cluster in the YARN cluster and then use “flink run …” to
> submit the job to the Flink cluster.
>
> Does anyone out there know how I could fix this?
>
> Thanks in advance for any help you can give.
>
> Error message in the jobmanager.err file:
>
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>
> SLF4J: Defaulting to no-operation (NOP) logger implementation
>
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
> details.
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/log4j/Level
>
> at org.apache.hadoop.mapred.JobConf.<clinit>(JobConf.java:357)
>
> at java.lang.Class.forName0(Native Method)
>
> at java.lang.Class.forName(Class.java:278)
>
> at
> org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2134)
>
> at
> org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:95)
>
> at
> org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:78)
>
> at
> org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)
>
> at org.apache.hadoop.security.Groups.<init>(Groups.java:79)
>
> at org.apache.hadoop.security.Groups.<init>(Groups.java:74)
>
> at
> org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:303)
>
> at
> org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:283)
>
> at
> org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:260)
>
> at
> org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:790)
>
> at
> org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:760)
>
> at
> org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:633)
>
> at
> org.apache.flink.yarn.ApplicationMasterBase.run(ApplicationMasterBase.scala:64)
>
> at
> org.apache.flink.yarn.ApplicationMaster$.main(ApplicationMaster.scala:36)
>
> at
> org.apache.flink.yarn.ApplicationMaster.main(ApplicationMaster.scala)
>
> Caused by: java.lang.ClassNotFoundException: org.apache.log4j.Level
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>
> ... 18 more
>
>
> *Bruce Hanson*
>
> Software Engineer
>
> HERE Predictive Analytics
>
>
>
> *HERE Seattle*
>
> 701 Pike St, Suite 2000, Seattle, WA 98101
>
> *47° 36' 41" N. 122° 19' 57" W*
>
>
>
>
>
>
>


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier 
ja...@data-artisans.com


Re: Different results on local and on cluster

2016-07-01 Thread Flavio Pompermaier
What do you mean exactly?
On 1 Jul 2016 18:58, "Aljoscha Krettek"  wrote:

> Hi,
> do you have any data in the coGroup/groupBy operators that you use,
> besides the input data?
>
> Cheers,
> Aljoscha
>
> On Fri, 1 Jul 2016 at 14:17 Flavio Pompermaier 
> wrote:
>
>> Hi to all,
>> I have a Flink job that computes data correctly when launched locally
>> from my IDE, but not when launched on the cluster.
>>
>> Is there any suggestion/example for identifying the problematic operators
>> in a case like this?
>> I think the root cause is that some operator (e.g.
>> coGroup/groupBy, etc.), which I assume has all the data for a key, may not
>> actually have it (because the data is partitioned among nodes).
>>
>> Any help is appreciated,
>> Flavio
>>
>


Re: How to avoid breaking states when upgrading Flink job?

2016-07-01 Thread Aljoscha Krettek
Ah, this might be in code that runs at a different layer from the
StateBackend. Can you maybe pinpoint which of your user classes this
anonymous class is and where it is used? Maybe by replacing them with
non-anonymous classes and checking which replacement fixes the problem.
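
For example (in Java, with made-up class and operator names), instead of an
inline anonymous function you would use a named class, whose name stays
stable across recompiles, and give the operator an explicit uid as the
savepoints docs suggest:

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class NamedFunctionsExample {

        // A named class instead of "MyJob$$anon$1$$anon$4$$anon$3": its name no
        // longer depends on how many other anonymous classes the compiler sees.
        public static class LineSplitter implements FlatMapFunction<String, String> {
            @Override
            public void flatMap(String line, Collector<String> out) {
                for (String word : line.split(" ")) {
                    out.collect(word);
                }
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> words = env.fromElements("a b", "c d")
                .flatMap(new LineSplitter())
                .uid("line-splitter");   // stable operator id for savepoints
            words.print();
            env.execute("named functions example");
        }
    }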

-
Aljoscha

On Fri, 1 Jul 2016 at 16:27 Josh  wrote:

> I've just double checked and I do still get the ClassNotFound error for an
> anonymous class, on a job which uses the RocksDBStateBackend.
>
> In case it helps, this was the full stack trace:
>
> java.lang.RuntimeException: Failed to deserialize state handle and setup 
> initial operator state.
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException: 
> com.me.flink.job.MyJob$$anon$1$$anon$4$$anon$3
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:348)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>   at 
> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:503)
>   at 
> org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:268)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>   at java.util.ArrayList.readObject(ArrayList.java:791)
>   at sun.reflect.GeneratedMethodAccessor51.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>   at java.util.HashMap.readObject(HashMap.java:1396)
>   at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1714)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>   at 
> 

Re: Flink streaming connect and split streams

2016-07-01 Thread Aljoscha Krettek
Hi,
I'm afraid the only way to do it right now is using the wrapper that can
contain both, as you suggested.
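
A rough sketch of what that could look like (all class, field, and method
names here are made up, and the payload types are just placeholders):

    import java.util.Collections;

    import org.apache.flink.streaming.api.collector.selector.OutputSelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SplitStream;

    public class WrapperSplitSketch {

        /** Wrapper emitted by the CoMap: exactly one of the two payloads is set. */
        public static class ModelOrPrediction {
            public boolean isModelUpdate;
            public String modelUpdate;   // placeholder payload type
            public String prediction;    // placeholder payload type
        }

        /** Splits the CoMap output back into a "model" side and a "prediction" side. */
        public static SplitStream<ModelOrPrediction> separate(DataStream<ModelOrPrediction> coMapped) {
            return coMapped.split(new OutputSelector<ModelOrPrediction>() {
                @Override
                public Iterable<String> select(ModelOrPrediction value) {
                    return Collections.singletonList(value.isModelUpdate ? "model" : "prediction");
                }
            });
        }
    }

Downstream you would call select("model") on the result for the updates to
broadcast and select("prediction") for the prediction results.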

Cheers,
Aljoscha

On Thu, 30 Jun 2016 at 16:50 Martin Neumann  wrote:

> Hej,
>
> I'm currently playing around with some machine learning algorithms in
> Flink streaming.
>
> I have an input stream that I partition by key and then do a map on each
> of the keys, feeding a model and producing a prediction output.
> Periodically each operator needs to send model updates to all other
> operators.
>
> What is the best way to implement the structure?
>
> My current idea is to use the CoMap function as the operator. The first
> stream is the raw data and the second stream the model updates, which I could
> just broadcast from the iterative stream. My problem right now is that I need
> the CoMap to have two streams as output: the model updates and the
> prediction results.
>
> I could write a wrapper class containing both output types, but that would
> require me to separate them afterwards. This feels very clunky; is there a
> better way of dealing with this?
>
> cheers Martin
>


Re: How to avoid breaking states when upgrading Flink job?

2016-07-01 Thread Josh
I've just double checked and I do still get the ClassNotFound error for an
anonymous class, on a job which uses the RocksDBStateBackend.

In case it helps, this was the full stack trace:

java.lang.RuntimeException: Failed to deserialize state handle and
setup initial operator state.
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException:
com.me.flink.job.MyJob$$anon$1$$anon$4$$anon$3
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at 
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:503)
at 
org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:268)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at java.util.ArrayList.readObject(ArrayList.java:791)
at sun.reflect.GeneratedMethodAccessor51.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at java.util.HashMap.readObject(HashMap.java:1396)
at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1714)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
at 
org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
at 

Using standalone single node without HA in production, crazy?

2016-07-01 Thread Ryan Crumley
Hi,

I am evaluating Flink for use in a stateful streaming application. Some
information about the intended use:

 - Will run in a Mesos cluster, deployed via Marathon in a Docker
container
 - Initial throughput ~ 100 messages per second (from Kafka)
 - Will need to scale to 10x that soon after launch
 - State will be much larger than the memory available

In order to quickly get this out the door I am considering postponing the
YARN / HA setup of a cluster, with the idea that the current application can
easily fit within a single JVM and handle the throughput. Hopefully by the
time I need more scale, Flink support for Mesos will be available and I can
use that to distribute the job to the cluster with minimal code rewrite.

Questions:
1. Is this a viable approach? Any pitfalls to be aware of?

2. What is the correct term for this deployment mode? Single node
standalone? Local?

3. Will the RocksDB state backend work in single-JVM mode?

4. When the single JVM process becomes unhealthy and is restarted by
Marathon, will Flink recover appropriately, or is failure recovery a function
of HA?

5. How would I migrate the RocksDB state once I move to HA mode? Is there a
straightforward path?

Thanks for your time,

Ryan


Different results on local and on cluster

2016-07-01 Thread Flavio Pompermaier
Hi to all,
I have a Flink job that computes data correctly when launched locally from
my IDE, but not when launched on the cluster.

Is there any suggestion/example for identifying the problematic operators in
a case like this?
I think the root cause is that some operator (e.g.
coGroup/groupBy, etc.), which I assume has all the data for a key, may not
actually have it (because the data is partitioned among nodes).

Any help is appreciated,
Flavio


Re: How to avoid breaking states when upgrading Flink job?

2016-07-01 Thread Josh
Thanks guys, that's very helpful info!

@Aljoscha I thought I saw this exception on a job that was using the
RocksDB state backend, but I'm not sure. I will do some more tests today to
double check. If it's still a problem I'll try the explicit class
definitions solution.

Josh

On Thu, Jun 30, 2016 at 1:27 PM, Aljoscha Krettek 
wrote:

> Also, you're using the FsStateBackend, correct?
>
> Reason I'm asking is that the problem should not occur for the RocksDB
> state backend. There, we don't serialize any user code, only binary data. A
> while back I wanted to change the FsStateBackend to also work like this.
> Now might be a good time to actually do this. :-)
>
> On Thu, 30 Jun 2016 at 14:10 Till Rohrmann  wrote:
>
>> Hi Josh,
>>
>> you could also try to replace your anonymous classes by explicit class
>> definitions. This should assign these classes a fixed name independent of
>> the other anonymous classes. Then the class loader should be able to
>> deserialize your serialized data.
>>
>> Cheers,
>> Till
>>
>> On Thu, Jun 30, 2016 at 1:55 PM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi Josh,
>>> I think in your case the problem is that Scala might choose different
>>> names for synthetic/generated classes. This will trip up the code that is
>>> trying to restore from a snapshot that was done with an earlier version of
>>> the code where classes were named differently.
>>>
>>> I'm afraid I don't know how to solve this one right now, except by
>>> switching to Java.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Thu, 30 Jun 2016 at 13:38 Maximilian Michels  wrote:
>>>
 Hi Josh,

 You have to assign UIDs to all operators to change the topology. Plus,
 you have to add dummy operators for all UIDs which you removed; this
 is a limitation currently because Flink will attempt to find all UIDs
 of the old job.

 Cheers,
 Max

 On Wed, Jun 29, 2016 at 9:00 PM, Josh  wrote:
 > Hi all,
 > Is there any information out there on how to avoid breaking saved
 > states/savepoints when making changes to a Flink job and redeploying
 it?
 >
 > I want to know how to avoid exceptions like this:
 >
 > java.lang.RuntimeException: Failed to deserialize state handle and
 setup
 > initial operator state.
 >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
 >   at java.lang.Thread.run(Thread.java:745)
 > Caused by: java.lang.ClassNotFoundException:
 > com.me.flink.MyJob$$anon$1$$anon$7$$anon$4
 >
 >
 > The best information I could find in the docs is here:
 >
 >
 https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
 >
 >
 > Having made the suggested changes to my job (i.e. giving a uid to
 every
 > stateful sink and map function), what changes to the job/topology are
 then
 > allowed/not allowed?
 >
 >
 > If I'm 'naming' my states by providing uids, why does Flink need to
 look for
 > a specific class, like com.me.flink.MyJob$$anon$1$$anon$7$$anon$4 ?
 >
 >
 > Thanks for any advice,
 >
 > Josh

>>>
>>


Re: Is it possible to restart only the function that fails instead of entire job?

2016-07-01 Thread Ufuk Celebi
Unfortunately, this is not possible at the moment. This optimization
definitely makes sense in certain situations. How large is your state
and how long does it take to recover?

On Fri, Jul 1, 2016 at 9:18 AM, Chia-Hung Lin  wrote:
> After reading the documentation and configuring a test of the failure
> strategy, it seems to me that Flink restarts the whole job once any failure
> (e.g. an exception being thrown) occurs.
>
> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
>
> My question:
>
> Is it possible to configure Flink to allow only the function that fails to
> recover, instead of restarting the entire job (like Erlang's one-for-one
> supervision)? For instance, within a job the parallelism is configured
> to 100, so at runtime 100 map instances are executed. Now one of the map
> functions fails; we want to recover only the failed map function because the
> other map functions are functioning normally. Is it possible to
> achieve such an effect?
>
> Thanks


Is it possible to restart only the function that fails instead of entire job?

2016-07-01 Thread Chia-Hung Lin
After reading the documentation and configuring a test of the failure
strategy, it seems to me that Flink restarts the whole job once any failure
(e.g. an exception being thrown) occurs.

https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html

My question:

Is it possible to configure Flink to allow only the function that fails to
recover, instead of restarting the entire job (like Erlang's one-for-one
supervision)? For instance, within a job the parallelism is configured
to 100, so at runtime 100 map instances are executed. Now one of the map
functions fails; we want to recover only the failed map function because the
other map functions are functioning normally. Is it possible to
achieve such an effect?

Thanks