Re: Flink job on secure Yarn fails after many hours

2015-12-02 Thread Maximilian Michels
I mentioned that the exception gets thrown when requesting container
status information. We need this to send a heartbeat to YARN but it is
not very crucial if this fails once for the running job. Possibly, we
could work around this problem by retrying N times in case of an
exception.
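
For illustration, the kind of retry wrapper meant here could look roughly like
the sketch below (this is only a sketch of the idea, not the actual Flink
change; the helper name, the attempt count and the allocate() call site are
made up):

import java.util.concurrent.Callable;

public final class Retry {

    // Re-attempt a call a fixed number of times and rethrow the last failure.
    public static <T> T withRetries(Callable<T> call, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e; // e.g. the InvalidToken RemoteException seen in the logs below
            }
        }
        throw last;
    }
}

// Hypothetical call site inside the YARN heartbeat:
// Retry.withRetries(() -> resourceManagerClient.allocate(progress), 3);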

Would it be possible for you to deploy a custom Flink 0.10.1 version
we provide and test again?

On Wed, Dec 2, 2015 at 4:03 PM, Niels Basjes  wrote:
> No, I was just asking.
> No upgrade is possible for the next month or two.
>
> This week is our busiest day of the year ...
> Our shop is doing about 10 orders per second these days ...
>
> So they won't upgrade until next January/February
>
> Niels
>
> On Wed, Dec 2, 2015 at 3:59 PM, Maximilian Michels  wrote:
>>
>> Hi Niels,
>>
>> You mentioned you have the option to update Hadoop and redeploy the
>> job. Would be great if you could do that and let us know how it turns
>> out.
>>
>> Cheers,
>> Max
>>
>> On Wed, Dec 2, 2015 at 3:45 PM, Niels Basjes  wrote:
>> > Hi,
>> >
>> > I posted the entire log from the first log line at the moment of failure
>> > to
>> > the very end of the logfile.
>> > This is all I have.
>> >
>> > As far as I understand the Kerberos and Keytab mechanism in Hadoop Yarn
>> > is
>> > that it catches the "Invalid Token" and then (if keytab) gets a new
>> > Kerberos
>> > ticket (or tgt?).
>> > When the new ticket has been obtained it retries the call that
>> > previously
>> > failed.
>> > To me it seemed that this call can fail over the invalid Token yet it
>> > cannot
>> > be retried.
>> >
>> > At this moment I'm thinking a bug in Hadoop.
>> >
>> > Niels
>> >
>> > On Wed, Dec 2, 2015 at 2:51 PM, Maximilian Michels 
>> > wrote:
>> >>
>> >> Hi Niels,
>> >>
>> >> Sorry to hear you experienced this exception. At first glance, it
>> >> looks like a bug in Hadoop to me.
>> >>
>> >> > "Not retrying because the invoked method is not idempotent, and
>> >> > unable
>> >> > to determine whether it was invoked"
>> >>
>> >> That is nothing to worry about. This is Hadoop's internal retry
>> >> mechanism that re-attempts to do actions which previously failed if
>> >> that's possible. Since the action is not idempotent (it cannot be
>> >> executed again without risking to change the state of the execution)
>> >> and it also doesn't track its execution states, it won't be retried
>> >> again.
>> >>
>> >> The main issue is this exception:
>> >>
>> >> >org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid
>> >> > AMRMToken from >appattempt_1443166961758_163901_01
>> >>
>> >> From the stack trace it is clear that this exception occurs upon
>> >> requesting container status information from the Resource Manager:
>> >>
>> >> >at
>> >> >
>> >> > org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:259)
>> >>
>> >> Are there any more exceptions in the log? Do you have the complete
>> >> logs available and could you share them?
>> >>
>> >>
>> >> Best regards,
>> >> Max
>> >>
>> >> On Wed, Dec 2, 2015 at 11:47 AM, Niels Basjes  wrote:
>> >> > Hi,
>> >> >
>> >> >
>> >> > We have a Kerberos secured Yarn cluster here and I'm experimenting
>> >> > with
>> >> > Apache Flink on top of that.
>> >> >
>> >> > A few days ago I started a very simple Flink application (just stream
>> >> > the
>> >> > time as a String into HBase 10 times per second).
>> >> >
>> >> > I (deliberately) asked our IT-ops guys to make my account have a max
>> >> > ticket
>> >> > time of 5 minutes and a max renew time of 10 minutes (yes,
>> >> > ridiculously
>> >> > low
>> >> > timeout values because I needed to validate this
>> >> > https://issues.apache.org/jira/browse/FLINK-2977).
>> >> >
>> >> > This job is started with a keytab file and after running for 31 hours
>> >> > it
>> >> > suddenly failed with the exception you see below.
>> >> >
>> >> > I had the same job running for almost 400 hours until that failed too
>> >> > (I
>> >> > was
>> >> > too late to check the logfiles but I suspect the same problem).
>> >> >
>> >> >
>> >> > So in that time span my tickets have expired and new tickets have
>> >> > been
>> >> > obtained several hundred times.
>> >> >
>> >> >
>> >> > The main error I see is that in the process of a ticket expiring and
>> >> > being
>> >> > renewed I see this message:
>> >> >
>> >> >  Not retrying because the invoked method is not idempotent, and
>> >> > unable
>> >> > to determine whether it was invoked
>> >> >
>> >> >
>> >> > Yarn on the cluster is 2.6.0 ( HDP 2.6.0.2.2.4.2-2 )
>> >> >
>> >> > Flink is version 0.10.1
>> >> >
>> >> >
>> >> > How do I fix this?
>> >> > Is this a bug (in either Hadoop or Flink) or am I doing something
>> >> > wrong?
>> >> > Would upgrading Yarn to 2.7.1  (i.e. HDP 2.3) fix this?
>> >> >
>> >> >
>> >> > Niels Basjes
>> >> >
>> >> >
>> >> >
>> >> > 21:30:27,821 WARN  

Re: Flink job on secure Yarn fails after many hours

2015-12-02 Thread Maximilian Michels
Hi Niels,

You mentioned you have the option to update Hadoop and redeploy the
job. Would be great if you could do that and let us know how it turns
out.

Cheers,
Max

On Wed, Dec 2, 2015 at 3:45 PM, Niels Basjes  wrote:
> Hi,
>
> I posted the entire log from the first log line at the moment of failure to
> the very end of the logfile.
> This is all I have.
>
> As far as I understand the Kerberos and Keytab mechanism in Hadoop Yarn is
> that it catches the "Invalid Token" and then (if keytab) gets a new Kerberos
> ticket (or tgt?).
> When the new ticket has been obtained it retries the call that previously
> failed.
> To me it seemed that this call can fail over the invalid Token yet it cannot
> be retried.
>
> At this moment I'm thinking a bug in Hadoop.
>
> Niels
>
> On Wed, Dec 2, 2015 at 2:51 PM, Maximilian Michels  wrote:
>>
>> Hi Niels,
>>
>> Sorry to hear you experienced this exception. At first glance, it
>> looks like a bug in Hadoop to me.
>>
>> > "Not retrying because the invoked method is not idempotent, and unable
>> > to determine whether it was invoked"
>>
>> That is nothing to worry about. This is Hadoop's internal retry
>> mechanism that re-attempts to do actions which previously failed if
>> that's possible. Since the action is not idempotent (it cannot be
>> executed again without risking to change the state of the execution)
>> and it also doesn't track its execution states, it won't be retried
>> again.
>>
>> The main issue is this exception:
>>
>> >org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid
>> > AMRMToken from >appattempt_1443166961758_163901_01
>>
>> From the stack trace it is clear that this exception occurs upon
>> requesting container status information from the Resource Manager:
>>
>> >at
>> > org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:259)
>>
>> Are there any more exceptions in the log? Do you have the complete
>> logs available and could you share them?
>>
>>
>> Best regards,
>> Max
>>
>> On Wed, Dec 2, 2015 at 11:47 AM, Niels Basjes  wrote:
>> > Hi,
>> >
>> >
>> > We have a Kerberos secured Yarn cluster here and I'm experimenting with
>> > Apache Flink on top of that.
>> >
>> > A few days ago I started a very simple Flink application (just stream
>> > the
>> > time as a String into HBase 10 times per second).
>> >
>> > I (deliberately) asked our IT-ops guys to make my account have a max
>> > ticket
>> > time of 5 minutes and a max renew time of 10 minutes (yes, ridiculously
>> > low
>> > timeout values because I needed to validate this
>> > https://issues.apache.org/jira/browse/FLINK-2977).
>> >
>> > This job is started with a keytab file and after running for 31 hours it
>> > suddenly failed with the exception you see below.
>> >
>> > I had the same job running for almost 400 hours until that failed too (I
>> > was
>> > too late to check the logfiles but I suspect the same problem).
>> >
>> >
>> > So in that time span my tickets have expired and new tickets have been
>> > obtained several hundred times.
>> >
>> >
>> > The main error I see is that in the process of a ticket expiring and
>> > being
>> > renewed I see this message:
>> >
>> >  Not retrying because the invoked method is not idempotent, and
>> > unable
>> > to determine whether it was invoked
>> >
>> >
>> > Yarn on the cluster is 2.6.0 ( HDP 2.6.0.2.2.4.2-2 )
>> >
>> > Flink is version 0.10.1
>> >
>> >
>> > How do I fix this?
>> > Is this a bug (in either Hadoop or Flink) or am I doing something wrong?
>> > Would upgrading Yarn to 2.7.1  (i.e. HDP 2.3) fix this?
>> >
>> >
>> > Niels Basjes
>> >
>> >
>> >
>> > 21:30:27,821 WARN  org.apache.hadoop.security.UserGroupInformation
>> > - PriviledgedActionException as:nbasjes (auth:SIMPLE)
>> >
>> > cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
>> > Invalid AMRMToken from appattempt_1443166961758_163901_01
>> > 21:30:27,861 WARN  org.apache.hadoop.ipc.Client
>> > - Exception encountered while connecting to the server :
>> >
>> > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
>> > Invalid AMRMToken from appattempt_1443166961758_163901_01
>> > 21:30:27,861 WARN  org.apache.hadoop.security.UserGroupInformation
>> > - PriviledgedActionException as:nbasjes (auth:SIMPLE)
>> >
>> > cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
>> > Invalid AMRMToken from appattempt_1443166961758_163901_01
>> > 21:30:27,891 WARN  org.apache.hadoop.io.retry.RetryInvocationHandler
>> > - Exception while invoking class
>> >
>> > org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate.
>> > Not retrying because the invoked method is not idempotent, and unable to
>> > determine whether it was invoked
>> > 

Re: Iterative queries on Flink

2015-12-02 Thread Flavio Pompermaier
Do you think it is possible to push this forward? I need to implement this
interactive DataSet feature. Do you think it is possible to implement a
persist() method in Flink (similar to Spark's)? If you want, I can work on it
with some guidance.
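
Until something like persist() exists, the closest workaround is the one
discussed further down in this thread: materialize the intermediate DataSet to
a shared store and start the next interactive step from that path. A rough
sketch under those assumptions (the paths, the Tachyon URI and the filter
criteria are made up, and the Tachyon file system has to be configured for
Hadoop/Flink):

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class InteractiveExploration {

    public static void main(String[] args) throws Exception {
        // Step 1: read the raw data, apply the first criterion, materialize the result.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> raw = env.readTextFile("hdfs:///data/orders.csv");
        raw.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String line) {
                return line.contains("2015-12");
            }
        }).writeAsText("tachyon://host:19998/cache/orders-step1");
        env.execute("materialize step 1");

        // Step 2 (later): start from the cached result instead of re-reading the raw data.
        ExecutionEnvironment env2 = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> step1 = env2.readTextFile("tachyon://host:19998/cache/orders-step1");
        step1.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String line) {
                return line.contains("shipped");
            }
        }).print();
    }
}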

On Wed, Dec 2, 2015 at 3:05 PM, Maximilian Michels  wrote:

> Hi Flavio,
>
> I was working on this some time ago but it didn't make it in yet and
> priorities shifted a bit. The pull request is here:
> https://github.com/apache/flink/pull/640
>
> The basic idea is to remove Flink's ResultPartition buffers in memory
> lazily, i.e. keep them as long as enough memory is available. When a
> new job is resumed, it picks up the old results again. The pull
> request needs some overhaul now and the API integration is not there
> yet.
>
> Cheers,
> Max
>
> On Mon, Nov 30, 2015 at 5:35 PM, Flavio Pompermaier
>  wrote:
> > I think that with some support I could try to implement it...actually I
> just
> > need to add a persist(StorageLevel.OFF_HEAP) method to the Dataset APIs
> > (similar to what Spark does..) and output it to a tachyon directory
> > configured in the flink-conf.yml and then re-read that dataset using its
> > generated name on tachyon. Do you have other suggestions?
> >
> >
> > On Mon, Nov 30, 2015 at 4:58 PM, Fabian Hueske 
> wrote:
> >>
> >> The basic building blocks are there but I am not aware of any efforts to
> >> implement caching and add it to the API.
> >>
> >> 2015-11-30 16:55 GMT+01:00 Flavio Pompermaier :
> >>>
> >>> Is there any effort in this direction? maybe I could achieve something
> >>> like that using Tachyon in some way...?
> >>>
> >>> On Mon, Nov 30, 2015 at 4:52 PM, Fabian Hueske 
> wrote:
> 
>  Hi Flavio,
> 
>  Flink does not support caching of data sets in memory yet.
> 
>  Best, Fabian
> 
>  2015-11-30 16:45 GMT+01:00 Flavio Pompermaier :
> >
> > Hi to all,
> > I was wondering if Flink could fit a use case where a user loads a
> > dataset in memory and then he/she wants to explore it interactively.
> Let's
> > say I want to load a csv, then filter out the rows where the column
> value
> > match some criteria, then apply another criteria after seeing the
> results of
> > the first filter.
> > Is there a way to keep the dataset in memory and modify it
> > interactively without re-reading all the dataset every time I want
> to chain
> > another operation to my dataset?
> >
> > Best,
> > Flavio
> 
> 
> >>>
> >>>
> >>
> >
> >
>


Way to get accumulators values *during* job execution ?

2015-12-02 Thread LINZ, Arnaud
Hello,

I use Grafana/Graphite to monitor my applications. The Flink GUI is really 
nice, but it disappears after the job completes and consequently is not 
suitable for long-term monitoring.

For batch applications, I simply send the accumulator’s values at the end of 
the job to my Graphite base.
For streaming applications, it’s more complex as the job never ends. It would 
be nice to have a way of getting the current accumulator values (like in the GUI) 
to push them periodically to Graphite from a monitoring thread. Is there any API to 
get the values during execution?

Best regards,
Arnaud





Re: Flink job on secure Yarn fails after many hours

2015-12-02 Thread Niels Basjes
Hi,

I posted the entire log from the first log line at the moment of failure to
the very end of the logfile.
This is all I have.

As far as I understand the Kerberos and Keytab mechanism in Hadoop Yarn is
that it catches the "Invalid Token" and then (if keytab) gets a new
Kerberos ticket (or tgt?).
When the new ticket has been obtained it retries the call that previously
failed.
To me it seemed that this call can fail over the invalid Token yet it
cannot be retried.

At this moment I'm thinking a bug in Hadoop.

Niels

On Wed, Dec 2, 2015 at 2:51 PM, Maximilian Michels  wrote:

> Hi Niels,
>
> Sorry to hear you experienced this exception. At first glance, it
> looks like a bug in Hadoop to me.
>
> > "Not retrying because the invoked method is not idempotent, and unable
> to determine whether it was invoked"
>
> That is nothing to worry about. This is Hadoop's internal retry
> mechanism that re-attempts to do actions which previously failed if
> that's possible. Since the action is not idempotent (it cannot be
> executed again without risking to change the state of the execution)
> and it also doesn't track its execution states, it won't be retried
> again.
>
> The main issue is this exception:
>
> >org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid
> AMRMToken from >appattempt_1443166961758_163901_01
>
> From the stack trace it is clear that this exception occurs upon
> requesting container status information from the Resource Manager:
>
> >at
> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:259)
>
> Are there any more exceptions in the log? Do you have the complete
> logs available and could you share them?
>
>
> Best regards,
> Max
>
> On Wed, Dec 2, 2015 at 11:47 AM, Niels Basjes  wrote:
> > Hi,
> >
> >
> > We have a Kerberos secured Yarn cluster here and I'm experimenting with
> > Apache Flink on top of that.
> >
> > A few days ago I started a very simple Flink application (just stream the
> > time as a String into HBase 10 times per second).
> >
> > I (deliberately) asked our IT-ops guys to make my account have a max
> ticket
> > time of 5 minutes and a max renew time of 10 minutes (yes, ridiculously
> low
> > timeout values because I needed to validate this
> > https://issues.apache.org/jira/browse/FLINK-2977).
> >
> > This job is started with a keytab file and after running for 31 hours it
> > suddenly failed with the exception you see below.
> >
> > I had the same job running for almost 400 hours until that failed too (I
> was
> > too late to check the logfiles but I suspect the same problem).
> >
> >
> > So in that time span my tickets have expired and new tickets have been
> > obtained several hundred times.
> >
> >
> > The main error I see is that in the process of a ticket expiring and
> being
> > renewed I see this message:
> >
> >  Not retrying because the invoked method is not idempotent, and
> unable
> > to determine whether it was invoked
> >
> >
> > Yarn on the cluster is 2.6.0 ( HDP 2.6.0.2.2.4.2-2 )
> >
> > Flink is version 0.10.1
> >
> >
> > How do I fix this?
> > Is this a bug (in either Hadoop or Flink) or am I doing something wrong?
> > Would upgrading Yarn to 2.7.1  (i.e. HDP 2.3) fix this?
> >
> >
> > Niels Basjes
> >
> >
> >
> > 21:30:27,821 WARN  org.apache.hadoop.security.UserGroupInformation
> > - PriviledgedActionException as:nbasjes (auth:SIMPLE)
> >
> cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
> > Invalid AMRMToken from appattempt_1443166961758_163901_01
> > 21:30:27,861 WARN  org.apache.hadoop.ipc.Client
> > - Exception encountered while connecting to the server :
> >
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
> > Invalid AMRMToken from appattempt_1443166961758_163901_01
> > 21:30:27,861 WARN  org.apache.hadoop.security.UserGroupInformation
> > - PriviledgedActionException as:nbasjes (auth:SIMPLE)
> >
> cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
> > Invalid AMRMToken from appattempt_1443166961758_163901_01
> > 21:30:27,891 WARN  org.apache.hadoop.io.retry.RetryInvocationHandler
> > - Exception while invoking class
> >
> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate.
> > Not retrying because the invoked method is not idempotent, and unable to
> > determine whether it was invoked
> > org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid
> > AMRMToken from appattempt_1443166961758_163901_01
> >   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> >   at
> >
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> >   at
> >
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> > 

Re: Flink job on secure Yarn fails after many hours

2015-12-02 Thread Maximilian Michels
Great. Here is the commit to try out:
https://github.com/mxm/flink/commit/f49b9635bec703541f19cb8c615f302a07ea88b3

If you already have the Flink repository, check it out using

git fetch https://github.com/mxm/flink/
f49b9635bec703541f19cb8c615f302a07ea88b3 && git checkout FETCH_HEAD

Alternatively, here's a direct download link to the sources with the
fix included:
https://github.com/mxm/flink/archive/f49b9635bec703541f19cb8c615f302a07ea88b3.zip

Thanks a lot,
Max

On Wed, Dec 2, 2015 at 5:44 PM, Niels Basjes  wrote:
> Sure, just give me the git repo url to build and I'll give it a try.
>
> Niels
>
> On Wed, Dec 2, 2015 at 4:28 PM, Maximilian Michels  wrote:
>>
>> I mentioned that the exception gets thrown when requesting container
>> status information. We need this to send a heartbeat to YARN but it is
>> not very crucial if this fails once for the running job. Possibly, we
>> could work around this problem by retrying N times in case of an
>> exception.
>>
>> Would it be possible for you to deploy a custom Flink 0.10.1 version
>> we provide and test again?
>>
>> On Wed, Dec 2, 2015 at 4:03 PM, Niels Basjes  wrote:
>> > No, I was just asking.
>> > No upgrade is possible for the next month or two.
>> >
>> > This week is our busiest day of the year ...
>> > Our shop is doing about 10 orders per second these days ...
>> >
>> > So they won't upgrade until next January/February
>> >
>> > Niels
>> >
>> > On Wed, Dec 2, 2015 at 3:59 PM, Maximilian Michels 
>> > wrote:
>> >>
>> >> Hi Niels,
>> >>
>> >> You mentioned you have the option to update Hadoop and redeploy the
>> >> job. Would be great if you could do that and let us know how it turns
>> >> out.
>> >>
>> >> Cheers,
>> >> Max
>> >>
>> >> On Wed, Dec 2, 2015 at 3:45 PM, Niels Basjes  wrote:
>> >> > Hi,
>> >> >
>> >> > I posted the entire log from the first log line at the moment of
>> >> > failure
>> >> > to
>> >> > the very end of the logfile.
>> >> > This is all I have.
>> >> >
>> >> > As far as I understand the Kerberos and Keytab mechanism in Hadoop
>> >> > Yarn
>> >> > is
>> >> > that it catches the "Invalid Token" and then (if keytab) gets a new
>> >> > Kerberos
>> >> > ticket (or tgt?).
>> >> > When the new ticket has been obtained it retries the call that
>> >> > previously
>> >> > failed.
>> >> > To me it seemed that this call can fail over the invalid Token yet it
>> >> > cannot
>> >> > be retried.
>> >> >
>> >> > At this moment I'm thinking a bug in Hadoop.
>> >> >
>> >> > Niels
>> >> >
>> >> > On Wed, Dec 2, 2015 at 2:51 PM, Maximilian Michels 
>> >> > wrote:
>> >> >>
>> >> >> Hi Niels,
>> >> >>
>> >> >> Sorry to hear you experienced this exception. At first glance,
>> >> >> it
>> >> >> looks like a bug in Hadoop to me.
>> >> >>
>> >> >> > "Not retrying because the invoked method is not idempotent, and
>> >> >> > unable
>> >> >> > to determine whether it was invoked"
>> >> >>
>> >> >> That is nothing to worry about. This is Hadoop's internal retry
>> >> >> mechanism that re-attempts to do actions which previously failed if
>> >> >> that's possible. Since the action is not idempotent (it cannot be
>> >> >> executed again without risking to change the state of the execution)
>> >> >> and it also doesn't track its execution states, it won't be retried
>> >> >> again.
>> >> >>
>> >> >> The main issue is this exception:
>> >> >>
>> >> >> >org.apache.hadoop.security.token.SecretManager$InvalidToken:
>> >> >> > Invalid
>> >> >> > AMRMToken from >appattempt_1443166961758_163901_01
>> >> >>
>> >> >> From the stack trace it is clear that this exception occurs upon
>> >> >> requesting container status information from the Resource Manager:
>> >> >>
>> >> >> >at
>> >> >> >
>> >> >> >
>> >> >> > org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:259)
>> >> >>
>> >> >> Are there any more exceptions in the log? Do you have the complete
>> >> >> logs available and could you share them?
>> >> >>
>> >> >>
>> >> >> Best regards,
>> >> >> Max
>> >> >>
>> >> >> On Wed, Dec 2, 2015 at 11:47 AM, Niels Basjes 
>> >> >> wrote:
>> >> >> > Hi,
>> >> >> >
>> >> >> >
>> >> >> > We have a Kerberos secured Yarn cluster here and I'm experimenting
>> >> >> > with
>> >> >> > Apache Flink on top of that.
>> >> >> >
>> >> >> > A few days ago I started a very simple Flink application (just
>> >> >> > stream
>> >> >> > the
>> >> >> > time as a String into HBase 10 times per second).
>> >> >> >
>> >> >> > I (deliberately) asked our IT-ops guys to make my account have a
>> >> >> > max
>> >> >> > ticket
>> >> >> > time of 5 minutes and a max renew time of 10 minutes (yes,
>> >> >> > ridiculously
>> >> >> > low
>> >> >> > timeout values because I needed to validate this
>> >> >> > https://issues.apache.org/jira/browse/FLINK-2977).
>> >> >> >
>> >> >> > This job is started with a keytab file and after running 

Re: Flink job on secure Yarn fails after many hours

2015-12-02 Thread Niels Basjes
Sure, just give me the git repo url to build and I'll give it a try.

Niels

On Wed, Dec 2, 2015 at 4:28 PM, Maximilian Michels  wrote:

> I mentioned that the exception gets thrown when requesting container
> status information. We need this to send a heartbeat to YARN but it is
> not very crucial if this fails once for the running job. Possibly, we
> could work around this problem by retrying N times in case of an
> exception.
>
> Would it be possible for you to deploy a custom Flink 0.10.1 version
> we provide and test again?
>
> On Wed, Dec 2, 2015 at 4:03 PM, Niels Basjes  wrote:
> > No, I was just asking.
> > No upgrade is possible for the next month or two.
> >
> > This week is our busiest day of the year ...
> > Our shop is doing about 10 orders per second these days ...
> >
> > So they won't upgrade until next January/February
> >
> > Niels
> >
> > On Wed, Dec 2, 2015 at 3:59 PM, Maximilian Michels 
> wrote:
> >>
> >> Hi Niels,
> >>
> >> You mentioned you have the option to update Hadoop and redeploy the
> >> job. Would be great if you could do that and let us know how it turns
> >> out.
> >>
> >> Cheers,
> >> Max
> >>
> >> On Wed, Dec 2, 2015 at 3:45 PM, Niels Basjes  wrote:
> >> > Hi,
> >> >
> >> > I posted the entire log from the first log line at the moment of
> failure
> >> > to
> >> > the very end of the logfile.
> >> > This is all I have.
> >> >
> >> > As far as I understand the Kerberos and Keytab mechanism in Hadoop
> Yarn
> >> > is
> >> > that it catches the "Invalid Token" and then (if keytab) gets a new
> >> > Kerberos
> >> > ticket (or tgt?).
> >> > When the new ticket has been obtained it retries the call that
> >> > previously
> >> > failed.
> >> > To me it seemed that this call can fail over the invalid Token yet it
> >> > cannot
> >> > be retried.
> >> >
> >> > At this moment I'm thinking a bug in Hadoop.
> >> >
> >> > Niels
> >> >
> >> > On Wed, Dec 2, 2015 at 2:51 PM, Maximilian Michels 
> >> > wrote:
> >> >>
> >> >> Hi Niels,
> >> >>
> >> >> Sorry to hear you experienced this exception. At first glance,
> it
> >> >> looks like a bug in Hadoop to me.
> >> >>
> >> >> > "Not retrying because the invoked method is not idempotent, and
> >> >> > unable
> >> >> > to determine whether it was invoked"
> >> >>
> >> >> That is nothing to worry about. This is Hadoop's internal retry
> >> >> mechanism that re-attempts to do actions which previously failed if
> >> >> that's possible. Since the action is not idempotent (it cannot be
> >> >> executed again without risking to change the state of the execution)
> >> >> and it also doesn't track its execution states, it won't be retried
> >> >> again.
> >> >>
> >> >> The main issue is this exception:
> >> >>
> >> >> >org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid
> >> >> > AMRMToken from >appattempt_1443166961758_163901_01
> >> >>
> >> >> From the stack trace it is clear that this exception occurs upon
> >> >> requesting container status information from the Resource Manager:
> >> >>
> >> >> >at
> >> >> >
> >> >> >
> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:259)
> >> >>
> >> >> Are there any more exceptions in the log? Do you have the complete
> >> >> logs available and could you share them?
> >> >>
> >> >>
> >> >> Best regards,
> >> >> Max
> >> >>
> >> >> On Wed, Dec 2, 2015 at 11:47 AM, Niels Basjes 
> wrote:
> >> >> > Hi,
> >> >> >
> >> >> >
> >> >> > We have a Kerberos secured Yarn cluster here and I'm experimenting
> >> >> > with
> >> >> > Apache Flink on top of that.
> >> >> >
> >> >> > A few days ago I started a very simple Flink application (just
> stream
> >> >> > the
> >> >> > time as a String into HBase 10 times per second).
> >> >> >
> >> >> > I (deliberately) asked our IT-ops guys to make my account have a
> max
> >> >> > ticket
> >> >> > time of 5 minutes and a max renew time of 10 minutes (yes,
> >> >> > ridiculously
> >> >> > low
> >> >> > timeout values because I needed to validate this
> >> >> > https://issues.apache.org/jira/browse/FLINK-2977).
> >> >> >
> >> >> > This job is started with a keytab file and after running for 31
> hours
> >> >> > it
> >> >> > suddenly failed with the exception you see below.
> >> >> >
> >> >> > I had the same job running for almost 400 hours until that failed
> too
> >> >> > (I
> >> >> > was
> >> >> > too late to check the logfiles but I suspect the same problem).
> >> >> >
> >> >> >
> >> >> > So in that time span my tickets have expired and new tickets have
> >> >> > been
> >> >> > obtained several hundred times.
> >> >> >
> >> >> >
> >> >> > The main error I see is that in the process of a ticket expiring
> and
> >> >> > being
> >> >> > renewed I see this message:
> >> >> >
> >> >> >  Not retrying because the invoked method is not idempotent, and
> >> >> > unable
> >> >> > to determine whether it was 

Re: Flink job on secure Yarn fails after many hours

2015-12-02 Thread Maximilian Michels
I forgot you're using Flink 0.10.1. The above was for the master.

So here's the commit for Flink 0.10.1:
https://github.com/mxm/flink/commit/a41f3866f4097586a7b2262093088861b62930cd

git fetch https://github.com/mxm/flink/ \
a41f3866f4097586a7b2262093088861b62930cd && git checkout FETCH_HEAD

https://github.com/mxm/flink/archive/a41f3866f4097586a7b2262093088861b62930cd.zip

Thanks,
Max

On Wed, Dec 2, 2015 at 6:39 PM, Maximilian Michels  wrote:
> Great. Here is the commit to try out:
> https://github.com/mxm/flink/commit/f49b9635bec703541f19cb8c615f302a07ea88b3
>
> If you already have the Flink repository, check it out using
>
> git fetch https://github.com/mxm/flink/
> f49b9635bec703541f19cb8c615f302a07ea88b3 && git checkout FETCH_HEAD
>
> Alternatively, here's a direct download link to the sources with the
> fix included:
> https://github.com/mxm/flink/archive/f49b9635bec703541f19cb8c615f302a07ea88b3.zip
>
> Thanks a lot,
> Max
>
> On Wed, Dec 2, 2015 at 5:44 PM, Niels Basjes  wrote:
>> Sure, just give me the git repo url to build and I'll give it a try.
>>
>> Niels
>>
>> On Wed, Dec 2, 2015 at 4:28 PM, Maximilian Michels  wrote:
>>>
>>> I mentioned that the exception gets thrown when requesting container
>>> status information. We need this to send a heartbeat to YARN but it is
>>> not very crucial if this fails once for the running job. Possibly, we
>>> could work around this problem by retrying N times in case of an
>>> exception.
>>>
>>> Would it be possible for you to deploy a custom Flink 0.10.1 version
>>> we provide and test again?
>>>
>>> On Wed, Dec 2, 2015 at 4:03 PM, Niels Basjes  wrote:
>>> > No, I was just asking.
>>> > No upgrade is possible for the next month or two.
>>> >
>>> > This week is our busiest day of the year ...
>>> > Our shop is doing about 10 orders per second these days ...
>>> >
>>> > So they won't upgrade until next January/February
>>> >
>>> > Niels
>>> >
>>> > On Wed, Dec 2, 2015 at 3:59 PM, Maximilian Michels 
>>> > wrote:
>>> >>
>>> >> Hi Niels,
>>> >>
>>> >> You mentioned you have the option to update Hadoop and redeploy the
>>> >> job. Would be great if you could do that and let us know how it turns
>>> >> out.
>>> >>
>>> >> Cheers,
>>> >> Max
>>> >>
>>> >> On Wed, Dec 2, 2015 at 3:45 PM, Niels Basjes  wrote:
>>> >> > Hi,
>>> >> >
>>> >> > I posted the entire log from the first log line at the moment of
>>> >> > failure
>>> >> > to
>>> >> > the very end of the logfile.
>>> >> > This is all I have.
>>> >> >
>>> >> > As far as I understand the Kerberos and Keytab mechanism in Hadoop
>>> >> > Yarn
>>> >> > is
>>> >> > that it catches the "Invalid Token" and then (if keytab) gets a new
>>> >> > Kerberos
>>> >> > ticket (or tgt?).
>>> >> > When the new ticket has been obtained it retries the call that
>>> >> > previously
>>> >> > failed.
>>> >> > To me it seemed that this call can fail over the invalid Token yet it
>>> >> > cannot
>>> >> > be retried.
>>> >> >
>>> >> > At this moment I'm thinking a bug in Hadoop.
>>> >> >
>>> >> > Niels
>>> >> >
>>> >> > On Wed, Dec 2, 2015 at 2:51 PM, Maximilian Michels 
>>> >> > wrote:
>>> >> >>
>>> >> >> Hi Niels,
>>> >> >>
>>> >> >> Sorry to hear you experienced this exception. At first glance,
>>> >> >> it
>>> >> >> looks like a bug in Hadoop to me.
>>> >> >>
>>> >> >> > "Not retrying because the invoked method is not idempotent, and
>>> >> >> > unable
>>> >> >> > to determine whether it was invoked"
>>> >> >>
>>> >> >> That is nothing to worry about. This is Hadoop's internal retry
>>> >> >> mechanism that re-attempts to do actions which previously failed if
>>> >> >> that's possible. Since the action is not idempotent (it cannot be
>>> >> >> executed again without risking to change the state of the execution)
>>> >> >> and it also doesn't track its execution states, it won't be retried
>>> >> >> again.
>>> >> >>
>>> >> >> The main issue is this exception:
>>> >> >>
>>> >> >> >org.apache.hadoop.security.token.SecretManager$InvalidToken:
>>> >> >> > Invalid
>>> >> >> > AMRMToken from >appattempt_1443166961758_163901_01
>>> >> >>
>>> >> >> From the stack trace it is clear that this exception occurs upon
>>> >> >> requesting container status information from the Resource Manager:
>>> >> >>
>>> >> >> >at
>>> >> >> >
>>> >> >> >
>>> >> >> > org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:259)
>>> >> >>
>>> >> >> Are there any more exceptions in the log? Do you have the complete
>>> >> >> logs available and could you share them?
>>> >> >>
>>> >> >>
>>> >> >> Best regards,
>>> >> >> Max
>>> >> >>
>>> >> >> On Wed, Dec 2, 2015 at 11:47 AM, Niels Basjes 
>>> >> >> wrote:
>>> >> >> > Hi,
>>> >> >> >
>>> >> >> >
>>> >> >> > We have a Kerberos secured Yarn cluster here and I'm experimenting
>>> >> >> > with
>>> >> >> > Apache Flink on top of that.

Re: NPE with Flink Streaming from Kafka

2015-12-02 Thread Stephan Ewen
Hi Mihail!

Do I understand you correctly that the use case is to raise an alarm if an
order has not been processed within a certain time period (certain number
of days)?

If that is the case, the use case is actually perfect for a special form of
session windows that monitor such timeouts. I have prototyped a sample
application for a different use case, but it should fit your use case as
well:
https://github.com/StephanEwen/flink-demos/blob/master/timeout-monitoring/src/main/java/com/mythingy/streaming/EventStreamAnalysis.java

In that example, the timeout is 5 seconds, but there is no reason why the
timeout could not be multiple days. Windows may be very long - no problem.

Unlike in many other streaming systems, each key has an individual window, so
one key's session window may start at one point in time and another key's
session window at a very different point. One window may finish within a few
hours (a quickly processed order), while another only sees the timeout after
three days (an order that was not processed in time).
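
For readers on newer Flink versions: the same "alarm if the follow-up event
never arrives within N days" logic can also be sketched with a keyed timer
instead of the session-window trigger used in the prototype above. This uses
the KeyedProcessFunction API, which does not exist in 0.10, and the OrderEvent
type with isOrderPlaced()/isOrderSent() is made up for illustration:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Emits an alarm string if no "order sent" event arrives for a key within the timeout.
public class OrderTimeoutAlarm extends KeyedProcessFunction<String, OrderEvent, String> {

    private static final long TIMEOUT_MS = 3L * 24 * 60 * 60 * 1000; // three days

    private transient ValueState<Long> deadline;

    @Override
    public void open(Configuration parameters) {
        deadline = getRuntimeContext().getState(
                new ValueStateDescriptor<>("deadline", Long.class));
    }

    @Override
    public void processElement(OrderEvent event, Context ctx, Collector<String> out) throws Exception {
        if (event.isOrderPlaced()) {
            // Start the clock for this order key.
            long due = ctx.timerService().currentProcessingTime() + TIMEOUT_MS;
            deadline.update(due);
            ctx.timerService().registerProcessingTimeTimer(due);
        } else if (event.isOrderSent() && deadline.value() != null) {
            // Order completed in time: cancel the pending alarm and release the state.
            ctx.timerService().deleteProcessingTimeTimer(deadline.value());
            deadline.clear();
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        if (deadline.value() != null && deadline.value() == timestamp) {
            out.collect("ALARM: order " + ctx.getCurrentKey() + " was not processed in time");
            deadline.clear();
        }
    }
}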

Greetings,
Stephan


On Wed, Dec 2, 2015 at 6:11 PM, Vieru, Mihail 
wrote:

> Hi Gyula, Hi Stephan,
>
> thank you for your replies.
>
> We need a state which grows indefinitely for the following use case. An
> event is created when a customer places an order. Another event is created
> when the order is sent. These events typically occur within days. We need
> to catch the cases when the said events occur over a specified time period
> to raise an alarm.
>
> So having a window of a couple of days is not feasible. Thus we need the
> state.
>
> I believe having a different state backend would circumvent the OOM issue.
> We were thinking of Redis for performance reasons. MySQL might do as well,
> if it doesn't slow down the processing too much.
>
> Are there limitations for SqlStateBackend when working with state only?
> When would the window state limitation occur?
>
> Cheers,
> Mihail
>
>
> 2015-12-02 13:38 GMT+01:00 Stephan Ewen :
>
>> Mihail!
>>
>> The Flink windows are currently in-memory only. There are plans to relax
>> that, but for the time being, having enough memory in the cluster is
>> important.
>>
>> @Gyula: I think window state is currently also limited when using the
>> SqlStateBackend, by the size of a row in the database (because windows are
>> not key/value state currently)
>>
>>
>> Here are some simple rules-of-thumb to work with:
>>
>> 1) For windows, the number of expected keys can be without bound. It is
>> important to have a rough upper bound for the number of "active keys at a
>> certain time". For example, if you have your time windows (let's say by 10
>> minutes or so), it only matters how many keys you have within each 10
>> minute interval. Those define how much memory you need.
>>
>> 2) If you work with the "OperatorState" abstraction, then you need to
>> think about cleanup a bit. The OperatorState keeps state currently for as
>> long until you set the state for the key to "null". This manual state is
>> explicitly designed to allow you to keep state across windows and across
>> very long time. On the flip side, you need to manage the amount of state
>> you store, by releasing state for keys.
>>
>> 3) If a certain key space grows infinite, you should "scope the state by
>> time". A pragmatic solution for that is to define a session window:
>>   - The session length defines after what inactivity the state is cleaned
>> (let's say 1h session length or so)
>>   - The trigger implements this session (there are a few mails on this
>> list already that explain how to do this) and take care of evaluating on
>> every element.
>>   - A count(1) evictor makes sure only one element is ever stored
>>
>> Greetings,
>> Stephan
>>
>>
>> On Wed, Dec 2, 2015 at 11:37 AM, Gyula Fóra  wrote:
>>
>>> Hi,
>>>
>>> I am working on a use case that involves storing state for billions of
>>> keys. For this we use a MySql state backend that will write each key-value
>>> state to MySql server so it will only hold a limited set of key-value pairs
>>> on heap while maintaining the processing guarantees.
>>>
>>> This will keep our streaming job from running out of memory as most of
>>> the state is off heap. I am not sure if this is relevant to your use case
>>> but if the state size grows indefinitely you might want to give it a try.
>>>
>>> I will write a detailed guide in some days but if you want to get
>>> started check this one out:
>>>
>>> https://docs.google.com/document/d/1xx3J88ZR_kuYYHAil3HD3YSBLqBTKV4Pu1SLwhiRiGs/edit?usp=sharing
>>>
>>> There are some pending improvements that I will commit in the next days
>>> that will increase the performance of the MySql adapter
>>>
>>> Let me know if you are interested in this!
>>>
>>> Cheers,
>>> Gyula
>>>
>>>
>>> Vieru, Mihail  wrote (on Wed, Dec 2, 2015, 11:26):
>>>
 Hi Aljoscha,

 we have no upper bound for the number of expected 

Re: Way to get accumulators values *during* job execution ?

2015-12-02 Thread Stephan Ewen
Hi Arnaud!

One thing you can do is to periodically retrieve them by querying the
monitoring API:
https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/monitoring_rest_api.html
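
For example, a small monitoring thread could poll that REST API and forward
the values to Graphite. A minimal sketch (the port 8081 and the
/jobs/<jobid>/accumulators path are assumptions based on the docs linked above
and may differ between Flink versions; the Graphite part is left as a comment):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class AccumulatorPoller {

    public static void main(String[] args) throws Exception {
        String jobId = args[0];
        String endpoint = "http://jobmanager-host:8081/jobs/" + jobId + "/accumulators";

        while (true) {
            HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
            StringBuilder json = new StringBuilder();
            try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
                String line;
                while ((line = in.readLine()) != null) {
                    json.append(line);
                }
            }
            // TODO: parse the JSON and push the accumulator values to Graphite here.
            System.out.println(json);

            Thread.sleep(60_000); // poll once per minute
        }
    }
}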

A nice approach would be to let the JobManager eagerly publish the
metrics.  I think that Christian Kreizfeld was working on an extension that
periodically publishes the accumulators to some metrics store. Maybe he can
comment on that.

Greetings,
Stephan


On Wed, Dec 2, 2015 at 5:21 PM, LINZ, Arnaud 
wrote:

> Hello,
>
>
>
> I use Grafana/Graphite to monitor my applications. The Flink GUI is really
> nice, but it disappears after the job completes and consequently is not
> suitable for long-term monitoring.
>
>
>
> For batch applications, I simply send the accumulator’s values at the end
> of the job to my Graphite base.
>
> For streaming applications, it’s more complex as the job never ends. It
> would be nice to have a way of getting the current accumulator values (like in
> the GUI) to push them periodically to Graphite from a monitoring thread. Is
> there any API to get the values during execution?
>
>
>
> Best regards,
>
> Arnaud
>
>


unsubscribe

2015-12-02 Thread 范昂


Sent from my iPhone

> On Dec 3, 2015, at 1:41 AM, Maximilian Michels  wrote:
> 
> Great. Here is the commit to try out:
> https://github.com/mxm/flink/commit/f49b9635bec703541f19cb8c615f302a07ea88b3
> 
> If you already have the Flink repository, check it out using
> 
> git fetch https://github.com/mxm/flink/
> f49b9635bec703541f19cb8c615f302a07ea88b3 && git checkout FETCH_HEAD
> 
> Alternatively, here's a direct download link to the sources with the
> fix included:
> https://github.com/mxm/flink/archive/f49b9635bec703541f19cb8c615f302a07ea88b3.zip
> 
> Thanks a lot,
> Max
> 
>> On Wed, Dec 2, 2015 at 5:44 PM, Niels Basjes  wrote:
>> Sure, just give me the git repo url to build and I'll give it a try.
>> 
>> Niels
>> 
>>> On Wed, Dec 2, 2015 at 4:28 PM, Maximilian Michels  wrote:
>>> 
>>> I mentioned that the exception gets thrown when requesting container
>>> status information. We need this to send a heartbeat to YARN but it is
>>> not very crucial if this fails once for the running job. Possibly, we
>>> could work around this problem by retrying N times in case of an
>>> exception.
>>> 
>>> Would it be possible for you to deploy a custom Flink 0.10.1 version
>>> we provide and test again?
>>> 
 On Wed, Dec 2, 2015 at 4:03 PM, Niels Basjes  wrote:
 No, I was just asking.
 No upgrade is possible for the next month or two.
 
 This week is our busiest day of the year ...
 Our shop is doing about 10 orders per second these days ...
 
 So they won't upgrade until next January/February
 
 Niels
 
 On Wed, Dec 2, 2015 at 3:59 PM, Maximilian Michels 
 wrote:
> 
> Hi Niels,
> 
> You mentioned you have the option to update Hadoop and redeploy the
> job. Would be great if you could do that and let us know how it turns
> out.
> 
> Cheers,
> Max
> 
>> On Wed, Dec 2, 2015 at 3:45 PM, Niels Basjes  wrote:
>> Hi,
>> 
>> I posted the entire log from the first log line at the moment of
>> failure
>> to
>> the very end of the logfile.
>> This is all I have.
>> 
>> As far as I understand the Kerberos and Keytab mechanism in Hadoop
>> Yarn
>> is
>> that it catches the "Invalid Token" and then (if keytab) gets a new
>> Kerberos
>> ticket (or tgt?).
>> When the new ticket has been obtained it retries the call that
>> previously
>> failed.
>> To me it seemed that this call can fail over the invalid Token yet it
>> cannot
>> be retried.
>> 
>> At this moment I'm thinking a bug in Hadoop.
>> 
>> Niels
>> 
>> On Wed, Dec 2, 2015 at 2:51 PM, Maximilian Michels 
>> wrote:
>>> 
>>> Hi Niels,
>>> 
>>> Sorry to hear you experienced this exception. At first glance,
>>> it
>>> looks like a bug in Hadoop to me.
>>> 
 "Not retrying because the invoked method is not idempotent, and
 unable
 to determine whether it was invoked"
>>> 
>>> That is nothing to worry about. This is Hadoop's internal retry
>>> mechanism that re-attempts to do actions which previously failed if
>>> that's possible. Since the action is not idempotent (it cannot be
>>> executed again without risking to change the state of the execution)
>>> and it also doesn't track its execution states, it won't be retried
>>> again.
>>> 
>>> The main issue is this exception:
>>> 
 org.apache.hadoop.security.token.SecretManager$InvalidToken:
 Invalid
 AMRMToken from >appattempt_1443166961758_163901_01
>>> 
>>> From the stack trace it is clear that this exception occurs upon
>>> requesting container status information from the Resource Manager:
>>> 
 at
 
 
 org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:259)
>>> 
>>> Are there any more exceptions in the log? Do you have the complete
>>> logs available and could you share them?
>>> 
>>> 
>>> Best regards,
>>> Max
>>> 
>>> On Wed, Dec 2, 2015 at 11:47 AM, Niels Basjes 
>>> wrote:
 Hi,
 
 
 We have a Kerberos secured Yarn cluster here and I'm experimenting
 with
 Apache Flink on top of that.
 
 A few days ago I started a very simple Flink application (just
 stream
 the
 time as a String into HBase 10 times per second).
 
 I (deliberately) asked our IT-ops guys to make my account have a
 max
 ticket
 time of 5 minutes and a max renew time of 10 minutes (yes,
 ridiculously
 low
 timeout values because I needed to validate this
 https://issues.apache.org/jira/browse/FLINK-2977).
 
 This 

Re: NPE with Flink Streaming from Kafka

2015-12-02 Thread Vieru, Mihail
Hi Gyula, Hi Stephan,

thank you for your replies.

We need a state which grows indefinitely for the following use case. An
event is created when a customer places an order. Another event is created
when the order is sent. These events typically occur within days. We need
to catch the cases when the said events occur over a specified time period
to raise an alarm.

So having a window of a couple of days is not feasible. Thus we need the
state.

I believe having a different state backend would circumvent the OOM issue.
We were thinking of Redis for performance reasons. MySQL might do as well,
if it doesn't slow down the processing too much.

Are there limitations for SqlStateBackend when working with state only?
When would the window state limitation occur?

Cheers,
Mihail


2015-12-02 13:38 GMT+01:00 Stephan Ewen :

> Mihail!
>
> The Flink windows are currently in-memory only. There are plans to relax
> that, but for the time being, having enough memory in the cluster is
> important.
>
> @Gyula: I think window state is currently also limited when using the
> SqlStateBackend, by the size of a row in the database (because windows are
> not key/value state currently)
>
>
> Here are some simple rules-of-thumb to work with:
>
> 1) For windows, the number of expected keys can be without bound. It is
> important to have a rough upper bound for the number of "active keys at a
> certain time". For example, if you have your time windows (let's say by 10
> minutes or so), it only matters how many keys you have within each 10
> minute interval. Those define how much memory you need.
>
> 2) If you work with the "OperatorState" abstraction, then you need to
> think about cleanup a bit. The OperatorState keeps state currently for as
> long until you set the state for the key to "null". This manual state is
> explicitly designed to allow you to keep state across windows and across
> very long time. On the flip side, you need to manage the amount of state
> you store, by releasing state for keys.
>
> 3) If a certain key space grows infinite, you should "scope the state by
> time". A pragmatic solution for that is to define a session window:
>   - The session length defines after what inactivity the state is cleaned
> (let's say 1h session length or so)
>   - The trigger implements this session (there are a few mails on this
> list already that explain how to do this) and take care of evaluating on
> every element.
>   - A count(1) evictor makes sure only one element is ever stored
>
> Greetings,
> Stephan
>
>
> On Wed, Dec 2, 2015 at 11:37 AM, Gyula Fóra  wrote:
>
>> Hi,
>>
>> I am working on a use case that involves storing state for billions of
>> keys. For this we use a MySql state backend that will write each key-value
>> state to MySql server so it will only hold a limited set of key-value pairs
>> on heap while maintaining the processing guarantees.
>>
>> This will keep our streaming job from running out of memory as most of
>> the state is off heap. I am not sure if this is relevant to your use case
>> but if the state size grows indefinitely you might want to give it a try.
>>
>> I will write a detailed guide in some days but if you want to get started
>> check this one out:
>>
>> https://docs.google.com/document/d/1xx3J88ZR_kuYYHAil3HD3YSBLqBTKV4Pu1SLwhiRiGs/edit?usp=sharing
>>
>> There are some pending improvements that I will commit in the next days
>> that will increase the performance of the MySql adapter
>>
>> Let me know if you are interested in this!
>>
>> Cheers,
>> Gyula
>>
>>
>> Vieru, Mihail  wrote (on Wed, Dec 2, 2015, 11:26):
>>
>>> Hi Aljoscha,
>>>
>>> we have no upper bound for the number of expected keys. The max size for
>>> an element is 1 KB.
>>>
>>> There are 2 Maps, a KeyBy, a timeWindow, a Reduce and a writeAsText
>>> operator in the job. In the first Map we parse the contained JSON object
>>> in each element and forward it as a Flink Tuple. In the Reduce we update
>>> the state for each key. That's about it.
>>>
>>> Best,
>>> Mihail
>>>
>>>
>>> 2015-12-02 11:09 GMT+01:00 Aljoscha Krettek :
>>>
 Hi Mihail,
 could you please give some information about the number of keys that
 you are expecting in the data and how big the elements are that you are
 processing in the window.

 Also, are there any other operations that could be taxing on Memory. I
 think the different exception you see for 500MB mem size is just because
 Java notices that it ran out of memory at a different part in the program.

 Cheers,
 Aljoscha
 > On 02 Dec 2015, at 10:57, Vieru, Mihail 
 wrote:
 >
 > Yes, with the "start-cluster-streaming.sh" script.
 > If the TaskManager gets 5GB of heap it manages to process ~100
 million messages and then throws the above OOM.
 > If it gets only 500MB it manages to process ~8 million and a 

Re: Including option for starting job and task managers in the foreground

2015-12-02 Thread Brian Chhun
Yep, I think this makes sense. I'm currently patching the flink-daemon.sh
script to remove the `&`, but I don't think it's a very robust solution,
particularly when this script changes across versions of Flink. I'm very
new to Docker, but the resources I've found indicates that the process must
run in the foreground, though people seem to get around it with some hacks.

When I have some time, I can look into refactoring some parts of the
scripts so that it can be started in the foreground.

Thanks,
Brian

On Wed, Dec 2, 2015 at 3:22 AM, Maximilian Michels  wrote:

> Hi Brian,
>
> I don't recall Docker requires commands to run in the foreground. Still,
> if that is your requirement, simply remove the "&" at the end of this line
> in flink-daemon.sh:
>
> $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath
> "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`"
> ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
>
> Cheers,
> Max
>
> On Wed, Dec 2, 2015 at 9:26 AM, Till Rohrmann 
> wrote:
>
>> Hi Brian,
>>
>> as far as I know this is at the moment not possible with our scripts.
>> However it should be relatively easy to add by simply executing the Java
>> command in flink-daemon.sh in the foreground. Do you want to add this?
>>
>> Cheers,
>> Till
>> On Dec 1, 2015 9:40 PM, "Brian Chhun" 
>> wrote:
>>
>>> Hi All,
>>>
>>> Is it possible to include a command line flag for starting job and task
>>> managers in the foreground? Currently, `bin/jobmanager.sh` and
>>> `bin/taskmanager.sh` rely on `bin/flink-daemon.sh`, which starts these
>>> things in the background. I'd like to execute these commands inside a
>>> docker container, but it's expected that the process is running in the
>>> foreground. I think it might be useful to have it run in the foreground so
>>> that it can be hooked into some process supervisors. Any suggestions are
>>> appreciated.
>>>
>>>
>>> Thanks,
>>> Brian
>>>
>>
>


Re: Including option for starting job and task managers in the foreground

2015-12-02 Thread Till Rohrmann
Hi Brian,

as far as I know this is at the moment not possible with our scripts.
However it should be relatively easy to add by simply executing the Java
command in flink-daemon.sh in the foreground. Do you want to add this?

Cheers,
Till
On Dec 1, 2015 9:40 PM, "Brian Chhun"  wrote:

> Hi All,
>
> Is it possible to include a command line flag for starting job and task
> managers in the foreground? Currently, `bin/jobmanager.sh` and
> `bin/taskmanager.sh` rely on `bin/flink-daemon.sh`, which starts these
> things in the background. I'd like to execute these commands inside a
> docker container, but it's expected that the process is running in the
> foreground. I think it might be useful to have it run in the foreground so
> that it can be hooked into some process supervisors. Any suggestions are
> appreciated.
>
>
> Thanks,
> Brian
>


Re: Question about flink message processing guarantee

2015-12-02 Thread Till Rohrmann
Just a small addition. Your sources have to be replayable to some extent.
With replayable I mean that they can continue from some kind of offset.
Otherwise the checkpointing won't help you. The Kafka source supports that,
for example.
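
A minimal sketch of how that looks in code (the connector class names below
match the 0.10-era Kafka connector as far as I remember and may differ in
other versions; hosts, topic and group id are placeholders):

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class CheckpointedKafkaJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Without this the job runs with at-most-once semantics; with it, the Kafka
        // source stores its offsets in each checkpoint and rewinds to them on recovery.
        env.enableCheckpointing(5000);

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "zk-host:2181");
        props.setProperty("bootstrap.servers", "kafka-host:9092");
        props.setProperty("group.id", "my-flink-consumer");

        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer082<>("my-topic", new SimpleStringSchema(), props));

        stream.print();
        env.execute("checkpointed kafka job");
    }
}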

Cheers,
Till
On Dec 1, 2015 11:55 PM, "Márton Balassi"  wrote:

> Dear Jerry,
>
> If you do not enable checkpointing you get the at most once processing
> guarantee (some might call that no guarantee at all). When you enable
> checkpointing you can choose between exactly and at least once semantics.
> The latter provides better latency.
>
> Best,
>
> Marton
>
> On Tue, Dec 1, 2015 at 11:04 PM, Jerry Peng 
> wrote:
>
>> Hello,
>>
>> I have a question regarding Flink streaming. I know that if you enable
>> checkpointing you can have an exactly-once processing guarantee. If I do
>> not enable checkpointing, what are the semantics of the processing? At
>> least once?
>>
>> Best,
>>
>> Jerry
>>
>
>


Re: Data Stream union error after upgrading from 0.9 to 0.10.1

2015-12-02 Thread Maximilian Michels
Hi Welly,

We still have to decide on the next release date but I would expect
Flink 0.10.2 within the next few weeks. If you can't work around the union
limitation, you may build your own Flink either from the master or the
release-0.10 branch which will eventually be Flink 0.10.2.

Cheers,
Max

On Tue, Dec 1, 2015 at 12:04 PM, Welly Tambunan  wrote:
> Thanks a lot Aljoscha.
>
> When it will be released ?
>
> Cheers
>
> On Tue, Dec 1, 2015 at 5:48 PM, Aljoscha Krettek 
> wrote:
>>
>> Hi,
>> I relaxed the restrictions on union. This should make it into an upcoming
>> 0.10.2 bugfix release.
>>
>> Cheers,
>> Aljoscha
>> > On 01 Dec 2015, at 11:23, Welly Tambunan  wrote:
>> >
>> > Hi All,
>> >
>> > After upgrading our system to the latest version from 0.9 to 0.10.1 we
>> > get the following error.
>> >
>> > Exception in thread "main" java.lang.UnsupportedOperationException: A
>> > DataStream cannot be unioned with itself
>> >
>> > Then i find the relevant JIRA for this one.
>> > https://issues.apache.org/jira/browse/FLINK-3080
>> >
>> > Is there any plan which release this will be ?
>> >
>> >
>> > Another issue I have after upgrading is that I can't union streams with
>> > different levels of parallelism.
>> >
>> > I think we will need to fall back to 0.9 again for the time being.
>> >
>> > Cheers
>> >
>> > --
>> > Welly Tambunan
>> > Triplelands
>> >
>> > http://weltam.wordpress.com
>> > http://www.triplelands.com
>>
>
>
>
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com


Re: NPE with Flink Streaming from Kafka

2015-12-02 Thread Vieru, Mihail
Thank you, Robert! The issue with Kafka is now solved with the
0.10-SNAPSHOT dependency.

We have run into an OutOfMemory exception though, which appears to be
related to the state. As my colleague, Javier Lopez, mentioned in a
previous thread, state handling is crucial for our use case. And as the
jobs are intended to run for months, stability plays an important role in
choosing a stream processing framework.

12/02/2015 10:03:53Fast TumblingTimeWindows(5000) of Reduce at
main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
FAILED
java.lang.OutOfMemoryError: Java heap space
at java.util.HashMap.resize(HashMap.java:703)
at java.util.HashMap.putVal(HashMap.java:662)
at java.util.HashMap.put(HashMap.java:611)
at
org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
at
de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:121)
at
de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:108)
at
org.apache.flink.streaming.runtime.operators.windowing.KeyMap.putOrAggregate(KeyMap.java:196)
at
org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.addElementToLatestPane(AggregatingKeyedTimePanes.java:50)
at
org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.processElement(AbstractAlignedProcessingTimeWindowOperator.java:210)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)




2015-12-01 17:42 GMT+01:00 Maximilian Michels :

> Thanks! I've linked the issue in JIRA.
>
> On Tue, Dec 1, 2015 at 5:39 PM, Robert Metzger 
> wrote:
> > I think its this one https://issues.apache.org/jira/browse/KAFKA-824
> >
> > On Tue, Dec 1, 2015 at 5:37 PM, Maximilian Michels 
> wrote:
> >>
> >> I know this has been fixed already but, out of curiosity, could you
> >> point me to the Kafka JIRA issue for this
> >> bug? From the Flink issue it looks like this is a Zookeeper version
> >> mismatch.
> >>
> >> On Tue, Dec 1, 2015 at 5:16 PM, Robert Metzger 
> >> wrote:
> >> > Hi Gyula,
> >> >
> >> > no, I didn't ;) We still deploy 0.10-SNAPSHOT versions from the
> >> > "release-0.10" branch to Apache's maven snapshot repository.
> >> >
> >> >
> >> > I don't think Mihail's code will run when he's compiling it against
> >> > 1.0-SNAPSHOT.
> >> >
> >> >
> >> > On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra 
> wrote:
> >> >>
> >> >> Hi,
> >> >>
> >> >> I think Robert meant to write setting the connector dependency to
> >> >> 1.0-SNAPSHOT.
> >> >>
> >> >> Cheers,
> >> >> Gyula
> >> >>
> >> >> Robert Metzger wrote (on Tue, 1 Dec 2015, 17:10):
> >> >>>
> >> >>> Hi Mihail,
> >> >>>
> >> >>> the issue is actually a bug in Kafka. We have a JIRA in Flink for
> this
> >> >>> as
> >> >>> well: https://issues.apache.org/jira/browse/FLINK-3067
> >> >>>
> >> >>> Sadly, we haven't released a fix for it yet. Flink 0.10.2 will
> contain
> >> >>> a
> >> >>> fix.
> >> >>>
> >> >>> Since the kafka connector is not contained in the flink binary, you
> >> >>> can
> >> >>> just set the version in your maven pom file to 0.10-SNAPSHOT. Maven
> >> >>> will
> >> >>> then download the code planned for the 0.10-SNAPSHOT release.
> >> >>>
> >> >>> On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail
> >> >>> 
> >> >>> wrote:
> >> 
> >>  Hi,
> >> 
> >>  we get the following NullPointerException after ~50 minutes when
> >>  running
> >>  a streaming job with windowing and state that reads data from Kafka
> >>  and
> >>  writes the result to local FS.
> >>  There are around 170 million messages to be processed, Flink 0.10.1
> >>  stops at ~8 million.
> >>  Flink runs locally, started with the "start-cluster-streaming.sh"
> >>  script.
> >> 
> >>  12/01/2015 15:06:24Job execution switched to status RUNNING.
> >>  12/01/2015 15:06:24Source: Custom Source -> Map -> Map(1/1)
> >>  switched
> >>  to SCHEDULED
> >>  12/01/2015 15:06:24Source: Custom Source -> Map -> Map(1/1)
> >>  switched
> >>  to DEPLOYING
> >>  12/01/2015 15:06:24Fast TumblingTimeWindows(5000) of Reduce at
> >>  main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched
> to
> >>  SCHEDULED
> >>  12/01/2015 15:06:24Fast TumblingTimeWindows(5000) of Reduce at
> >>  main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched
> to
> >>  

Re: Including option for starting job and task managers in the foreground

2015-12-02 Thread Maximilian Michels
Hi Brian,

I don't recall that Docker requires commands to run in the foreground. Still, if
that is your requirement, simply remove the "&" at the end of this line in
flink-daemon.sh:

$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath
"`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`"
${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &

Cheers,
Max

On Wed, Dec 2, 2015 at 9:26 AM, Till Rohrmann  wrote:

> Hi Brian,
>
> as far as I know this is at the moment not possible with our scripts.
> However it should be relatively easy to add by simply executing the Java
> command in flink-daemon.sh in the foreground. Do you want to add this?
>
> Cheers,
> Till
> On Dec 1, 2015 9:40 PM, "Brian Chhun" 
> wrote:
>
>> Hi All,
>>
>> Is it possible to include a command line flag for starting job and task
>> managers in the foreground? Currently, `bin/jobmanager.sh` and
>> `bin/taskmanager.sh` rely on `bin/flink-daemon.sh`, which starts these
>> things in the background. I'd like to execute these commands inside a
>> docker container, but it's expected that the process is running in the
>> foreground. I think it might be useful to have it run in the foreground so
>> that it can be hooked into some process supervisors. Any suggestions are
>> appreciated.
>>
>>
>> Thanks,
>> Brian
>>
>


Re: Working with protobuf wrappers

2015-12-02 Thread Krzysztof Zarzycki
Thanks guys for your answers, that is exactly the information I was looking for.

Krzysztof

2015-12-01 19:22 GMT+01:00 Robert Metzger :

> Hi Flavio,
>
> 1. You don't have to register serializers if it's working for you. I would
> add a custom serializer if it's not working or if the performance is poor.
> 2. I don't think that there is such a performance comparison. Tuples are a
> little faster than POJOs; other types (serialized with Kryo's standard
> serializer) are usually slower.
> 3. There are some plans for the Table API to do various optimizations
> (projection/filter push-down), which also make some assumptions about the
> serializers. So yes, this might change for the Table API.
>
>
>
> On Tue, Dec 1, 2015 at 11:26 AM, Flavio Pompermaier 
> wrote:
>
>> Sorry for the long question, but I'll take advantage of this discussion to
>> ask about something I've never fully understood. Let's say I have, for
>> example, a Thrift/Protobuf/Avro object Person.
>>
>>1. Do I really have to register a custom serializer? In my code I
>>create a dataset from parquet-thrift but I never had to register
>>anything... Does this change anything if I
>>call registerTypeWithKryoSerializer?
>>2. How is Flink's performance affected by using one serialization format
>>rather than another? For example, is there a simple snippet of a Flink
>>program that shows when it is better to use the original Person, its POJO
>>version, or its Tuple version (assuming it is a flat object)?
>>3. Does this change further when I use the Table API?
>>
>>
>> Best,
>> Flavio
>>
>> On Tue, Dec 1, 2015 at 10:25 AM, Robert Metzger 
>> wrote:
>>
>>> Also, we don't add serializers automatically for DataStream programs.
>>> I've opened a JIRA for this a while ago.
>>>
>>> On Tue, Dec 1, 2015 at 10:20 AM, Till Rohrmann 
>>> wrote:
>>>
 Hi Krzysztof,

 it's true that we once added the Protobuf serializer automatically.
 However, due to versioning conflicts (see
 https://issues.apache.org/jira/browse/FLINK-1635), we removed it
 again. Now you have to register the ProtobufSerializer manually:
 https://ci.apache.org/projects/flink/flink-docs-master/apis/best_practices.html#register-a-custom-serializer-for-your-flink-program
 .
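
For reference, a minimal registration sketch (assuming Flink 0.10 and the
chill-protobuf ProtobufSerializer on the classpath; MyProtoMessage is only a
placeholder for the class generated by protoc):

import com.twitter.chill.protobuf.ProtobufSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RegisterProtobufSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Tell Kryo to handle the generated message class with the Protobuf-aware serializer.
        env.getConfig().registerTypeWithKryoSerializer(MyProtoMessage.class, ProtobufSerializer.class);
    }
}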

 Cheers,
 Till

 On Mon, Nov 30, 2015 at 8:48 PM, Krzysztof Zarzycki <
 k.zarzy...@gmail.com> wrote:

> Hi!
> I'm trying to use generated Protobuf wrappers compiled with protoc and
> pass them as objects between functions of Flink. I'm using Flink 0.10.0.
> Unfortunately, I get an exception on runtime:
>
> [...]
> Caused by: com.esotericsoftware.kryo.KryoException:
> java.lang.UnsupportedOperationException
> Serialization trace:
> enrichments_ (com.company$MyObject)
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:162)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:313)
> ... 11 more
> Caused by: java.lang.UnsupportedOperationException
> at
> java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
> at
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
> at
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> ... 15 more
>
>
> I believed that Protobuf objects are now serializable with the default Flink
> configuration after this issue was fixed in 0.9/0.8.1:
> https://issues.apache.org/jira/browse/FLINK-1392
>
> Maybe it really is, but Flink just requires some configuration?
> I'll be grateful for your help with this issue.
> Cheers,
> Krzysztof
>
>

>>>
>>
>


Re: NPE with Flink Streaming from Kafka

2015-12-02 Thread Aljoscha Krettek
Hi Mihail,
could you please give some information about the number of keys that you are
expecting in the data and how big the elements are that you are processing in
the window?

Also, are there any other operations that could be taxing on memory? I think
the different exception you see for the 500MB memory size is just because Java
notices that it ran out of memory at a different point in the program.

Cheers,
Aljoscha
> On 02 Dec 2015, at 10:57, Vieru, Mihail  wrote:
> 
> Yes, with the "start-cluster-streaming.sh" script.
> If the TaskManager gets 5GB of heap it manages to process ~100 million 
> messages and then throws the above OOM.
> If it gets only 500MB it manages to process ~8 million and a somewhat 
> misleading exception is thrown:
> 
> 12/01/2015 19:14:07Source: Custom Source -> Map -> Map(1/1) switched to 
> FAILED 
> java.lang.Exception: Java heap space
> at 
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:399)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.OutOfMemoryError: Java heap space
> at org.json.simple.parser.Yylex.<init>(Yylex.java:231)
> at org.json.simple.parser.JSONParser.<init>(JSONParser.java:34)
> at 
> de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:70)
> at 
> de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:65)
> at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:37)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:316)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:300)
> at 
> org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:48)
> at 
> org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:29)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$SourceOutput.collect(SourceStreamTask.java:97)
> at 
> org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:92)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:450)
> 
> 
> 
> 
> 2015-12-02 10:45 GMT+01:00 Robert Metzger :
> Its good news that the issue has been resolved.
> 
> Regarding the OOM, did you start Flink in the streaming mode?
> 
> On Wed, Dec 2, 2015 at 10:18 AM, Vieru, Mihail  
> wrote:
> Thank you, Robert! The issue with Kafka is now solved with the 0.10-SNAPSHOT 
> dependency.
> 
> We have run into an OutOfMemory exception though, which appears to be related 
> to the state. As my colleague, Javier Lopez, mentioned in a previous thread, 
> state handling is crucial for our use case. And as the jobs are intended to 
> run for months, stability plays an important role in choosing a stream 
> processing framework.
> 
> 12/02/2015 10:03:53Fast TumblingTimeWindows(5000) of Reduce at 
> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to FAILED 
> java.lang.OutOfMemoryError: Java heap space
> at java.util.HashMap.resize(HashMap.java:703)
> at java.util.HashMap.putVal(HashMap.java:662)
> at java.util.HashMap.put(HashMap.java:611)
> at 
> org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
> at 
> de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:121)
> at 
> de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:108)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.KeyMap.putOrAggregate(KeyMap.java:196)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.addElementToLatestPane(AggregatingKeyedTimePanes.java:50)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.processElement(AbstractAlignedProcessingTimeWindowOperator.java:210)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> 

Re: NPE with Flink Streaming from Kafka

2015-12-02 Thread Robert Metzger
It's good news that the issue has been resolved.

Regarding the OOM, did you start Flink in streaming mode?

On Wed, Dec 2, 2015 at 10:18 AM, Vieru, Mihail 
wrote:

> Thank you, Robert! The issue with Kafka is now solved with the
> 0.10-SNAPSHOT dependency.
>
> We have run into an OutOfMemory exception though, which appears to be
> related to the state. As my colleague, Javier Lopez, mentioned in a
> previous thread, state handling is crucial for our use case. And as the
> jobs are intended to run for months, stability plays an important role in
> choosing a stream processing framework.
>
> 12/02/2015 10:03:53Fast TumblingTimeWindows(5000) of Reduce at
> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
> FAILED
> java.lang.OutOfMemoryError: Java heap space
> at java.util.HashMap.resize(HashMap.java:703)
> at java.util.HashMap.putVal(HashMap.java:662)
> at java.util.HashMap.put(HashMap.java:611)
> at
> org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
> at
> de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:121)
> at
> de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:108)
> at
> org.apache.flink.streaming.runtime.operators.windowing.KeyMap.putOrAggregate(KeyMap.java:196)
> at
> org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.addElementToLatestPane(AggregatingKeyedTimePanes.java:50)
> at
> org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.processElement(AbstractAlignedProcessingTimeWindowOperator.java:210)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
> 2015-12-01 17:42 GMT+01:00 Maximilian Michels :
>
>> Thanks! I've linked the issue in JIRA.
>>
>> On Tue, Dec 1, 2015 at 5:39 PM, Robert Metzger 
>> wrote:
>> > I think its this one https://issues.apache.org/jira/browse/KAFKA-824
>> >
>> > On Tue, Dec 1, 2015 at 5:37 PM, Maximilian Michels 
>> wrote:
>> >>
>> >> I know this has been fixed already but, out of curiosity, could you
>> >> point me to the Kafka JIRA issue for this
>> >> bug? From the Flink issue it looks like this is a Zookeeper version
>> >> mismatch.
>> >>
>> >> On Tue, Dec 1, 2015 at 5:16 PM, Robert Metzger 
>> >> wrote:
>> >> > Hi Gyula,
>> >> >
>> >> > no, I didn't ;) We still deploy 0.10-SNAPSHOT versions from the
>> >> > "release-0.10" branch to Apache's maven snapshot repository.
>> >> >
>> >> >
>> >> > I don't think Mihail's code will run when he's compiling it against
>> >> > 1.0-SNAPSHOT.
>> >> >
>> >> >
>> >> > On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra 
>> wrote:
>> >> >>
>> >> >> Hi,
>> >> >>
>> >> >> I think Robert meant to write setting the connector dependency to
>> >> >> 1.0-SNAPSHOT.
>> >> >>
>> >> >> Cheers,
>> >> >> Gyula
>> >> >>
>> >> >> Robert Metzger wrote (on Tue, 1 Dec 2015, 17:10):
>> >> >>>
>> >> >>> Hi Mihail,
>> >> >>>
>> >> >>> the issue is actually a bug in Kafka. We have a JIRA in Flink for
>> this
>> >> >>> as
>> >> >>> well: https://issues.apache.org/jira/browse/FLINK-3067
>> >> >>>
>> >> >>> Sadly, we haven't released a fix for it yet. Flink 0.10.2 will
>> contain
>> >> >>> a
>> >> >>> fix.
>> >> >>>
>> >> >>> Since the kafka connector is not contained in the flink binary, you
>> >> >>> can
>> >> >>> just set the version in your maven pom file to 0.10-SNAPSHOT. Maven
>> >> >>> will
>> >> >>> then download the code planned for the 0.10-SNAPSHOT release.
>> >> >>>
>> >> >>> On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail
>> >> >>> 
>> >> >>> wrote:
>> >> 
>> >>  Hi,
>> >> 
>> >>  we get the following NullPointerException after ~50 minutes when
>> >>  running
>> >>  a streaming job with windowing and state that reads data from
>> Kafka
>> >>  and
>> >>  writes the result to local FS.
>> >>  There are around 170 million messages to be processed, Flink
>> 0.10.1
>> >>  stops at ~8 million.
>> >>  Flink runs locally, started with the "start-cluster-streaming.sh"
>> >>  script.
>> >> 
>> >>  12/01/2015 15:06:24Job execution switched to status RUNNING.
>> >>  12/01/2015 15:06:24Source: Custom Source -> Map -> Map(1/1)
>> >>  switched
>> >>  to SCHEDULED
>> >>  12/01/2015 15:06:24Source: Custom Source -> Map -> Map(1/1)
>> >>  switched
>> >>  to 

Re: NPE with Flink Streaming from Kafka

2015-12-02 Thread Vieru, Mihail
Yes, with the "start-cluster-streaming.sh" script.
If the TaskManager gets 5GB of heap it manages to process ~100 million
messages and then throws the above OOM.
If it gets only 500MB it manages to process ~8 million and a somewhat
misleading exception is thrown:

12/01/2015 19:14:07    Source: Custom Source -> Map -> Map(1/1) switched to
FAILED
java.lang.Exception: Java heap space
at
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:399)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Java heap space
at org.json.simple.parser.Yylex.<init>(Yylex.java:231)
at org.json.simple.parser.JSONParser.<init>(JSONParser.java:34)
at
de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:70)
at
de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:65)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:37)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:316)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:300)
at
org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:48)
at
org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:29)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$SourceOutput.collect(SourceStreamTask.java:97)
at
org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:92)
at
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:450)




2015-12-02 10:45 GMT+01:00 Robert Metzger :

> Its good news that the issue has been resolved.
>
> Regarding the OOM, did you start Flink in the streaming mode?
>
> On Wed, Dec 2, 2015 at 10:18 AM, Vieru, Mihail 
> wrote:
>
>> Thank you, Robert! The issue with Kafka is now solved with the
>> 0.10-SNAPSHOT dependency.
>>
>> We have run into an OutOfMemory exception though, which appears to be
>> related to the state. As my colleague, Javier Lopez, mentioned in a
>> previous thread, state handling is crucial for our use case. And as the
>> jobs are intended to run for months, stability plays an important role in
>> choosing a stream processing framework.
>>
>> 12/02/2015 10:03:53Fast TumblingTimeWindows(5000) of Reduce at
>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
>> FAILED
>> java.lang.OutOfMemoryError: Java heap space
>> at java.util.HashMap.resize(HashMap.java:703)
>> at java.util.HashMap.putVal(HashMap.java:662)
>> at java.util.HashMap.put(HashMap.java:611)
>> at
>> org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
>> at
>> de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:121)
>> at
>> de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:108)
>> at
>> org.apache.flink.streaming.runtime.operators.windowing.KeyMap.putOrAggregate(KeyMap.java:196)
>> at
>> org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.addElementToLatestPane(AggregatingKeyedTimePanes.java:50)
>> at
>> org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.processElement(AbstractAlignedProcessingTimeWindowOperator.java:210)
>> at
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
>> at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>>
>> 2015-12-01 17:42 GMT+01:00 Maximilian Michels :
>>
>>> Thanks! I've linked the issue in JIRA.
>>>
>>> On Tue, Dec 1, 2015 at 5:39 PM, Robert Metzger 
>>> wrote:
>>> > I think its this one https://issues.apache.org/jira/browse/KAFKA-824
>>> >
>>> > On Tue, Dec 1, 2015 at 5:37 PM, Maximilian Michels 
>>> wrote:
>>> >>
>>> >> I know this has been fixed already but, out of curiosity, could you
>>> >> point me to the Kafka JIRA issue for this
>>> >> bug? From the Flink issue it looks 

Re: Question about flink message processing guarantee

2015-12-02 Thread Stephan Ewen
There is an overview of the guarantees that the different data sources and sinks can give you:
https://ci.apache.org/projects/flink/flink-docs-master/apis/fault_tolerance.html#fault-tolerance-guarantees-of-data-sources-and-sinks

On Wed, Dec 2, 2015 at 9:19 AM, Till Rohrmann 
wrote:

> Just a small addition: your sources have to be replayable to some extent.
> By replayable I mean that they can continue from some kind of offset;
> otherwise the checkpointing won't help you. The Kafka source supports that,
> for example.
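
A small sketch of such a replayable source (not from Till's mail; it assumes the
0.10-era flink-connector-kafka classes FlinkKafkaConsumer082 and
SimpleStringSchema, and the broker/ZooKeeper endpoints are hypothetical):

import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ReplayableKafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);   // Kafka offsets are checkpointed and replayed on failure

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("zookeeper.connect", "localhost:2181");
        props.setProperty("group.id", "example-group");

        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer082<>("example-topic", new SimpleStringSchema(), props));
        stream.print();
        env.execute("replayable Kafka source sketch");
    }
}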
>
> Cheers,
> Till
> On Dec 1, 2015 11:55 PM, "Márton Balassi" 
> wrote:
>
>> Dear Jerry,
>>
>> If you do not enable checkpointing you get the at most once processing
>> guarantee (some might call that no guarantee at all). When you enable
>> checkpointing you can choose between exactly and at least once semantics.
>> The latter provides better latency.
>>
>> Best,
>>
>> Marton
>>
>> On Tue, Dec 1, 2015 at 11:04 PM, Jerry Peng 
>> wrote:
>>
>>> Hello,
>>>
>>> I have a question regarding link streaming.  I now if you enable
>>> checkpointing you can have exactly once processing guarantee. If I do
>>> not enable checkpointing what are the semantics of the processing? At
>>> least once?
>>>
>>> Best,
>>>
>>> Jerry
>>>
>>
>>


Re: Running WebClient from Windows

2015-12-02 Thread Welly Tambunan
Hi Fabian,

I have already created JIRA for this one.
https://issues.apache.org/jira/browse/FLINK-3099

Thanks a lot for this.

Cheers

On Wed, Dec 2, 2015 at 6:02 PM, Fabian Hueske  wrote:

> Hi Welly,
>
> at the moment we only provide native Windows .bat scripts for start-local
> and the CLI client.
> However, we check that the Unix scripts (including start-webclient.sh)
> work in a Windows Cygwin environment.
> I have to admit, I am not familiar with MinGW, so not sure what is
> happening there.
>
> It would be nice to have a Windows start script for the webclient though.
> Would you mind opening a JIRA for that?
>
> Thanks,
> Fabian
>
> 2015-12-02 3:00 GMT+01:00 Welly Tambunan :
>
>> Hi All,
>>
>> Is there any way to run the WebClient for uploading jobs from Windows?
>>
>> I tried to run it from MinGW but got this error:
>>
>>
>> $ bin/start-webclient.sh
>> /c/flink-0.10.0/bin/config.sh: line 261: conditional binary operator
>> expected
>> /c/flink-0.10.0/bin/config.sh: line 261: syntax error near `=~'
>> /c/flink-0.10.0/bin/config.sh: line 261: `if [[ "$SLAVE" =~
>> ^.*/([0-9a-zA-Z.
>> -]+)$ ]]; then'
>> /c/flink-0.10.0/bin/config.sh: line 261: conditional binary operator
>> expected
>> /c/flink-0.10.0/bin/config.sh: line 261: syntax error near `=~'
>> /c/flink-0.10.0/bin/config.sh: line 261: `if [[ "$SLAVE" =~
>> ^.*/([0-9a-zA-Z.
>> -]+)$ ]]; then'
>> Starting Flink webclient
>>
>> [Terminate]
>>
>> Cheers
>>
>> --
>> Welly Tambunan
>> Triplelands
>>
>> http://weltam.wordpress.com
>> http://www.triplelands.com 
>>
>
>


-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Re: NPE with Flink Streaming from Kafka

2015-12-02 Thread Stephan Ewen
Mihail!

The Flink windows are currently in-memory only. There are plans to relax
that, but for the time being, having enough memory in the cluster is
important.

@Gyula: I think window state is currently also limited when using the
SqlStateBackend, by the size of a row in the database (because windows are
not key/value state currently)


Here are some simple rules-of-thumb to work with:

1) For windows, the number of expected keys can be without bound. It is
important to have a rough upper bound for the number of "active keys at a
certain time". For example, if you have your time windows (let's say by 10
minutes or so), it only matters how many keys you have within each 10
minute interval. Those define how much memory you need.

2) If you work with the "OperatorState" abstraction, then you need to think
about cleanup a bit. The OperatorState currently keeps the state for a key
until you set that key's state to "null". This manual state is explicitly
designed to allow you to keep state across windows and across very long time
spans. On the flip side, you need to manage the amount of state you store by
releasing the state for keys you no longer need (see the sketch after this list).

3) If a certain key space grows without bound, you should "scope the state by
time". A pragmatic solution for that is to define a session window:
  - The session length defines after how much inactivity the state is cleaned
up (let's say a 1h session length or so).
  - The trigger implements this session (there are a few mails on this list
already that explain how to do this) and takes care of evaluating on every
element.
  - A count(1) evictor makes sure only one element is ever stored.
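
A minimal sketch of the manual cleanup described in point 2 (not Stephan's code;
it assumes the 0.10-era OperatorState/getKeyValueState API, and the field name
and the isKeyFinished() condition are hypothetical):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Per-key running sum that releases its state once a key is finished,
// so the state does not grow without bound. Must be applied after keyBy().
public class BoundedKeyStateSketch
        extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient OperatorState<Long> sum;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 0L is the default value returned the first time a key is seen.
        sum = getRuntimeContext().getKeyValueState("runningSum", Long.class, 0L);
    }

    @Override
    public void flatMap(Tuple2<String, Long> in, Collector<Tuple2<String, Long>> out) throws Exception {
        long updated = sum.value() + in.f1;
        if (isKeyFinished(in)) {
            out.collect(new Tuple2<>(in.f0, updated));
            sum.update(null);   // release this key's state, as described in point 2
        } else {
            sum.update(updated);
        }
    }

    // Hypothetical, application-specific condition for when a key's state can be dropped.
    private boolean isKeyFinished(Tuple2<String, Long> in) {
        return false;
    }
}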

Greetings,
Stephan


On Wed, Dec 2, 2015 at 11:37 AM, Gyula Fóra  wrote:

> Hi,
>
> I am working on a use case that involves storing state for billions of
> keys. For this we use a MySql state backend that will write each key-value
> state to MySql server so it will only hold a limited set of key-value pairs
> on heap while maintaining the processing guarantees.
>
> This will keep our streaming job from running out of memory as most of the
> state is off heap. I am not sure if this is relevant to your use case but
> if the state size grows indefinitely you might want to give it a try.
>
> I will write a detailed guide in some days but if you want to get started
> check this one out:
>
> https://docs.google.com/document/d/1xx3J88ZR_kuYYHAil3HD3YSBLqBTKV4Pu1SLwhiRiGs/edit?usp=sharing
>
> There are some pending improvements that I will commit in the next days
> that will increase the performance of the MySql adapter
>
> Let me know if you are interested in this!
>
> Cheers,
> Gyula
>
>
> Vieru, Mihail wrote (on Wed, 2 Dec 2015, 11:26):
>
>> Hi Aljoscha,
>>
>> we have no upper bound for the number of expected keys. The max size for
>> an element is 1 KB.
>>
>> There are 2 Maps, a KeyBy, a timeWindow, a Reduce and a writeAsText
>> operators in the job. In the first Map we parse the contained JSON object
>> in each element and forward it as a Flink Tuple. In the Reduce we update
>> the state for each key. That's about it.
>>
>> Best,
>> Mihail
>>
>>
>> 2015-12-02 11:09 GMT+01:00 Aljoscha Krettek :
>>
>>> Hi Mihail,
>>> could you please give some information about the number of keys that you
>>> are expecting in the data and how big the elements are that you are
>>> processing in the window.
>>>
>>> Also, are there any other operations that could be taxing on Memory. I
>>> think the different exception you see for 500MB mem size is just because
>>> Java notices that it ran out of memory at a different part in the program.
>>>
>>> Cheers,
>>> Aljoscha
>>> > On 02 Dec 2015, at 10:57, Vieru, Mihail 
>>> wrote:
>>> >
>>> > Yes, with the "start-cluster-streaming.sh" script.
>>> > If the TaskManager gets 5GB of heap it manages to process ~100 million
>>> messages and then throws the above OOM.
>>> > If it gets only 500MB it manages to process ~8 million and a somewhat
>>> misleading exception is thrown:
>>> >
>>> > 12/01/2015 19:14:07Source: Custom Source -> Map -> Map(1/1)
>>> switched to FAILED
>>> > java.lang.Exception: Java heap space
>>> > at
>>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>> > at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:399)
>>> > at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>> > at
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>> > at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>> > at java.lang.Thread.run(Thread.java:745)
>>> > Caused by: java.lang.OutOfMemoryError: Java heap space
>>> > at org.json.simple.parser.Yylex.<init>(Yylex.java:231)
>>> > at 

Re: Flink job on secure Yarn fails after many hours

2015-12-02 Thread Maximilian Michels
Hi Niels,

Sorry to hear you experienced this exception. From a first glance, it
looks like a bug in Hadoop to me.

> "Not retrying because the invoked method is not idempotent, and unable to 
> determine whether it was invoked"

That is nothing to worry about. This is Hadoop's internal retry
mechanism that re-attempts to do actions which previously failed if
that's possible. Since the action is not idempotent (it cannot be
executed again without risking to change the state of the execution)
and it also doesn't track its execution states, it won't be retried
again.

The main issue is this exception:

>org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid AMRMToken 
>from >appattempt_1443166961758_163901_01

From the stack trace it is clear that this exception occurs upon
requesting container status information from the Resource Manager:

>at 
>org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:259)

Are there any more exceptions in the log? Do you have the complete
logs available and could you share them?


Best regards,
Max

On Wed, Dec 2, 2015 at 11:47 AM, Niels Basjes  wrote:
> Hi,
>
>
> We have a Kerberos secured Yarn cluster here and I'm experimenting with
> Apache Flink on top of that.
>
> A few days ago I started a very simple Flink application (just stream the
> time as a String into HBase 10 times per second).
>
> I (deliberately) asked our IT-ops guys to make my account have a max ticket
> time of 5 minutes and a max renew time of 10 minutes (yes, ridiculously low
> timeout values because I needed to validate this
> https://issues.apache.org/jira/browse/FLINK-2977).
>
> This job is started with a keytab file and after running for 31 hours it
> suddenly failed with the exception you see below.
>
> I had the same job running for almost 400 hours until that failed too (I was
> too late to check the logfiles but I suspect the same problem).
>
>
> So in that time span my tickets have expired and new tickets have been
> obtained several hundred times.
>
>
> The main error I see is that in the process of a ticket expiring and being
> renewed I see this message:
>
>  Not retrying because the invoked method is not idempotent, and unable
> to determine whether it was invoked
>
>
> Yarn on the cluster is 2.6.0 ( HDP 2.6.0.2.2.4.2-2 )
>
> Flink is version 0.10.1
>
>
> How do I fix this?
> Is this a bug (in either Hadoop or Flink) or am I doing something wrong?
> Would upgrading Yarn to 2.7.1  (i.e. HDP 2.3) fix this?
>
>
> Niels Basjes
>
>
>
> 21:30:27,821 WARN  org.apache.hadoop.security.UserGroupInformation
> - PriviledgedActionException as:nbasjes (auth:SIMPLE)
> cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
> Invalid AMRMToken from appattempt_1443166961758_163901_01
> 21:30:27,861 WARN  org.apache.hadoop.ipc.Client
> - Exception encountered while connecting to the server :
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
> Invalid AMRMToken from appattempt_1443166961758_163901_01
> 21:30:27,861 WARN  org.apache.hadoop.security.UserGroupInformation
> - PriviledgedActionException as:nbasjes (auth:SIMPLE)
> cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
> Invalid AMRMToken from appattempt_1443166961758_163901_01
> 21:30:27,891 WARN  org.apache.hadoop.io.retry.RetryInvocationHandler
> - Exception while invoking class
> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate.
> Not retrying because the invoked method is not idempotent, and unable to
> determine whether it was invoked
> org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid
> AMRMToken from appattempt_1443166961758_163901_01
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>   at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>   at 
> org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53)
>   at
> org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:104)
>   at
> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:79)
>   at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
>   at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>   at
> 

Re: Iterative queries on Flink

2015-12-02 Thread Maximilian Michels
Hi Flavio,

I was working on this some time ago but it didn't make it in yet and
priorities shifted a bit. The pull request is here:
https://github.com/apache/flink/pull/640

The basic idea is to release Flink's in-memory ResultPartition buffers
lazily, i.e. keep them around as long as enough memory is available. When a
new job is resumed, it picks up the old results again. The pull
request needs some overhaul now and the API integration is not there
yet.

Cheers,
Max

On Mon, Nov 30, 2015 at 5:35 PM, Flavio Pompermaier
 wrote:
> I think that with some support I could try to implement it... Actually I just
> need to add a persist(StorageLevel.OFF_HEAP) method to the DataSet API
> (similar to what Spark does), output the data to a Tachyon directory
> configured in flink-conf.yaml, and then re-read that dataset using its
> generated name on Tachyon. Do you have other suggestions?
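
For illustration, a rough sketch of that write-then-re-read workaround against the
existing DataSet API, with no API changes (the Tachyon URI and paths are
hypothetical and assume a Tachyon filesystem reachable from Flink):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;

public class ManualCacheSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // First job: read the raw input once, apply the expensive filter, materialize the result.
        DataSet<String> csv = env.readTextFile("hdfs:///data/input.csv");
        DataSet<String> filtered = csv.filter(line -> line.contains("ERROR"));
        filtered.writeAsText("tachyon://host:19998/cache/filtered", FileSystem.WriteMode.OVERWRITE);
        env.execute("materialize filtered dataset");

        // Follow-up exploration: re-read the materialized result instead of the raw input.
        DataSet<String> cached = env.readTextFile("tachyon://host:19998/cache/filtered");
        cached.filter(line -> line.contains("timeout")).first(10).print();
    }
}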
>
>
> On Mon, Nov 30, 2015 at 4:58 PM, Fabian Hueske  wrote:
>>
>> The basic building blocks are there but I am not aware of any efforts to
>> implement caching and add it to the API.
>>
>> 2015-11-30 16:55 GMT+01:00 Flavio Pompermaier :
>>>
>>> Is there any effort in this direction? maybe I could achieve something
>>> like that using Tachyon in some way...?
>>>
>>> On Mon, Nov 30, 2015 at 4:52 PM, Fabian Hueske  wrote:

 Hi Flavio,

 Flink does not support caching of data sets in memory yet.

 Best, Fabian

 2015-11-30 16:45 GMT+01:00 Flavio Pompermaier :
>
> Hi to all,
> I was wondering if Flink could fit a use case where a user load a
> dataset in memory and then he/she wants to explore it interactively. Let's
> say I want to load a csv, then filter out the rows where the column value
> match some criteria, then apply another criteria after seeing the results 
> of
> the first filter.
> Is there a way to keep the dataset in memory and modify it
> interactively without re-reading all the dataset every time I want to 
> chain
> another operation to my dataset?
>
> Best,
> Flavio


>>>
>>>
>>
>
>