Re: Global State and Scaling

2017-08-21 Thread Elias Levy
Looks like Gerard asked something along similar lines just last month, and that
there is a JIRA for official support for broadcast state.  Looks like the ugly
hack is the way to go for now.


On Mon, Aug 21, 2017 at 1:23 PM, Elias Levy 
wrote:

> I am implementing a control stream.  The stream communicates a global
> configuration value for the whole job.  It uses DataStream.broadcast() to
> communicate this to all parallel operator instances.  I would like to save
> this value in state so that it can be recovered when the job
> restarts/recovers.  The control stream is not keyed, so the only option is
> Operator state.
>
> I could implement this using the ListCheckpointed interface, returning
> Collections.singletonList(configValue) from snapshotState.  It is clear
> what I'd need to do in restoreState in the case of scale in.  If I include
> a serial number in the config, and it receives multiple values on restore,
> it can keep the config value with the largest serial number, indicating the
> latest config.
>
> Alas, it is not clear what should happen on scale out, as some operator
> instances will receive empty lists.
>
> It seems the other alternative is to use CheckpointedFunction, along with
> union redistribution via getUnionListState, and then have each operator
> instance select from the union list the config with the latest serial
> number, of which there should be multiple copies.  But this seems like an
> ugly hack.
>
>
> In addition, the documentation is unclear on the relationship and effect,
> if any, of the maximum parallelism Flink job parameter on operator state,
> whereas it is much clearer in this regard as it relates to keyed state via
> key groups.
>
>
> How are folks handling this use case, i.e. storing and restoring global
> config values via Flink state?
>
>


Re: Deleting files in continuous processing

2017-08-21 Thread Mohit Anchlia
Just checking to see if there is a way to purge files after they've been processed.

On Tue, Aug 15, 2017 at 5:11 PM, Mohit Anchlia 
wrote:

> Is there a way to delete a file once it has been processed?
>
> streamEnv
>     .readFile(format, args[0], FileProcessingMode.PROCESS_CONTINUOUSLY, 2000)
>


Global State and Scaling

2017-08-21 Thread Elias Levy
I am implementing a control stream.  The stream communicates a global
configuration value for the whole job.  It uses DataStream.broadcast() to
communicate this to all parallel operator instances.  I would like to save
this value in state so that it can be recovered when the job
restarts/recovers.  The control stream is not keyed, so the only option is
Operator state.

I could implement this using the ListCheckpointed interface, returning
Collections.singletonList(configValue) from snapshotState.  It is clear
what I'd need to do in restoreState in the case of scale in.  If I include
a serial number in the config, and it receives multiple values on restore,
it can keep the config value with the largest serial number, indicating the
latest config.

Alas, it is not clear what should happen on scale out, as some operator
instances will receive empty lists.

It seems the other alternative is to use CheckpointedFunction, along with
union redistribution via getUnionListState, and then have each operator
instance select from the union list the config with the latest serial
number, of which there should be multiple copies.  But this seems like an
ugly hack.


In addition, the documentation is unclear on the relationship and effect,
if any, of the maximum parallelism Flink job parameter on operator state,
whereas it is much clearer in this regard as it relates to keyed state via
key groups.


How are folks handling this use case, i.e. storing and restoring global
config values via Flink state?
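
For reference, a minimal sketch of the union-redistribution approach described
above, assuming Flink 1.3's CheckpointedFunction API. The modelling of a config
update as a (serial number, config value) tuple and all names here are made up
for the example, not taken from the thread:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;

// config updates are modelled as (serialNumber, configValue) tuples
public class GlobalConfigFunction
        extends RichFlatMapFunction<Tuple2<Long, String>, Tuple2<Long, String>>
        implements CheckpointedFunction {

    private transient ListState<Tuple2<Long, String>> unionState;  // operator state
    private Tuple2<Long, String> currentConfig;  // latest config seen by this instance

    @Override
    public void flatMap(Tuple2<Long, String> update, Collector<Tuple2<Long, String>> out) {
        currentConfig = update;  // broadcast control stream delivers a new global config
        out.collect(update);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        unionState.clear();
        if (currentConfig != null) {
            unionState.add(currentConfig);  // each parallel instance snapshots its copy
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        ListStateDescriptor<Tuple2<Long, String>> descriptor =
            new ListStateDescriptor<Tuple2<Long, String>>("latest-config",
                TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {}));
        // union redistribution: on restore every instance receives the full list
        unionState = ctx.getOperatorStateStore().getUnionListState(descriptor);
        if (ctx.isRestored()) {
            for (Tuple2<Long, String> c : unionState.get()) {
                // keep the entry with the highest serial number, i.e. the latest config
                if (currentConfig == null || c.f0 > currentConfig.f0) {
                    currentConfig = c;
                }
            }
        }
    }
}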


Re: Issue with Scala API when using CEP's "notFollowedBy"

2017-08-21 Thread Ted Yu
Please take a look at FLINK-7306, fixed for 1.4.0 release.

On Mon, Aug 21, 2017 at 1:03 PM, Gehad Elrobey 
wrote:

> Hi there,
>
> I have an issue using the Scala API for the CEP library: the notFollowedBy API
> call doesn't return a Pattern, it returns Unit instead, so I am not able
> to chain additional filters. I tried the Java API and it works fine, so I
> believe there is something wrong with the Scala API, or I might be missing
> something?
>
> Cheers
>
> Scala version: 2.11.8
> Flink Version: 1.3.1
>
>


Issue with Scala API when using CEP's "notFollowedBy"

2017-08-21 Thread Gehad Elrobey
Hi there,

I have an issue using the Scala API for the CEP library: the notFollowedBy API
call doesn't return a Pattern, it returns Unit instead, so I am not able
to chain additional filters. I tried the Java API and it works fine, so I
believe there is something wrong with the Scala API, or I might be missing
something?

Cheers

Scala version: 2.11.8
Flink Version: 1.3.1
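
For illustration, a minimal sketch of the chaining that the poster says works
with the Java CEP API (Flink 1.3 assumed; the String event type and the
conditions are made up, and a pattern cannot end with notFollowedBy, hence the
closing followedBy step). The corresponding Scala call is the one FLINK-7306
addresses:

import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;

public class NotFollowedByJavaExample {
    public static void main(String[] args) {
        // each call returns a Pattern, so the where() after notFollowedBy() chains fine
        Pattern<String, ?> pattern = Pattern.<String>begin("start")
            .where(new SimpleCondition<String>() {
                @Override
                public boolean filter(String value) {
                    return value.startsWith("login");
                }
            })
            .notFollowedBy("forbidden")
            .where(new SimpleCondition<String>() {
                @Override
                public boolean filter(String value) {
                    return value.startsWith("logout");
                }
            })
            // a pattern may not end with notFollowedBy, so close with another step
            .followedBy("end")
            .where(new SimpleCondition<String>() {
                @Override
                public boolean filter(String value) {
                    return value.startsWith("purchase");
                }
            });
    }
}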


Re: [Survey] How many people use Flink with AWS Kinesis sink

2017-08-21 Thread Bowen Li
Hi Stephan,

It's just Kinesis Producer in KPL (Kinesis Producer Library) causing LOTS
of trouble. flink-connector-kinesis uses Kinesis Producer to write output
results to Kinesis. On the other hand, Kinesis Consumer (KCL) is fine.

If there are any successful use cases of Flink + KPL, I'd love to learn 1)
what KPL configuration values (rate limit, record_ttl, etc.) work best
for Flink, and 2) what KPL deployment strategy (parallelism, any
dedicated cores or memory?) works best with Flink. Thanks!

Bowen


On Mon, Aug 21, 2017 at 10:55 AM, Stephan Ewen  wrote:

> Hi!
>
> I cannot speak for the full survey, only from observation on the mailing
> list and some users I have chatted to directly.
> I do not really know about the Kinesis Producer (don't know a specific
> case there), but the Kinesis Consumer seems to be used quite a bit.
>
> Do your observations pertain to Kinesis Consumer as well, or mainly to the
> Kinesis Producer?
>
> Best,
> Stephan
>
>
> On Mon, Aug 21, 2017 at 8:29 AM, Bowen Li  wrote:
>
>> Hi guys,
>> We want to have a more accurate idea of how many people are writing
>> Flink's computation result to AWS Kinesis, and how many people had
>> successful Flink deployment against Kinesis?
>>
>> The reason I ask for the survey is because we have been trying to
>> make our Flink jobs and Kinesis sink work together for a long time but
>> haven't succeeded yet. We discovered quite a few issues with not only
>> Flink's flink-kinesis-connector but, most importantly, KPL (Kinesis
>> Producer Library) itself. Kinesis/KPL is poorly designed, we hate Kinesis,
>> and we are currently evaluating how much further effort it requires to make
>> Flink work with Kinesis.
>>
>> If not many Flink users had good experience with Kinesis, we'll
>> probably need to look for some alternatives.
>>
>> I really appreciate your time and your insight! Thank you very much!
>>
>> Bowen
>>
>>
>>
>


Re: Prioritize DataStream

2017-08-21 Thread Elias Levy
Flink folks,

A response to the question below?

On Sat, Aug 19, 2017 at 11:02 AM, Elias Levy 
wrote:

> I believe the answer to this question is "no", but I figure I might as
> well ask.  Is there a way to prioritize a stream?
>
> The use case is prioritization of a control stream.  This is mostly needed
> on start-up, where a job might start consuming from the data stream before
> consuming from the control stream.
>
>
>


Re: [Survey] How many people use Flink with AWS Kinesis sink

2017-08-21 Thread Stephan Ewen
Hi!

I cannot speak for the full survey, only from observation on the mailing
list and some users I have chatted to directly.
I do not really know about the Kinesis Producer (don't know a specific case
there), but the Kinesis Consumer seems to be used quite a bit.

Do your observations pertain to Kinesis Consumer as well, or mainly to the
Kinesis Producer?

Best,
Stephan


On Mon, Aug 21, 2017 at 8:29 AM, Bowen Li  wrote:

> Hi guys,
> We want to have a more accurate idea of how many people are writing
> Flink's computation result to AWS Kinesis, and how many people had
> successful Flink deployment against Kinesis?
>
> The reason I ask for the survey is because we have been trying to make
> our Flink jobs and Kinesis sink work together for a long time but haven't
> succeeded yet. We discovered quite a few issues with not only Flink's
> flink-kinesis-connector but, most importantly, KPL (Kinesis Producer
> Library) itself. Kinesis/KPL is poorly designed, we hate Kinesis, and we
> are currently evaluating how much further effort it requires to make Flink
> work with Kinesis.
>
> If not many Flink users had good experience with Kinesis, we'll
> probably need to look for some alternatives.
>
> I really appreciate your time and your insight! Thank you very much!
>
> Bowen
>
>
>


Re: Flink HA with Kubernetes, without Zookeeper

2017-08-21 Thread Hao Sun
Thanks Shannon for the https://github.com/coreos/zetcd tips, I will check that
out and share my results if we proceed on that path.
Thanks Stephan for the details, this is very useful, I was about to ask
what exactly is stored into zookeeper, haha.

On Mon, Aug 21, 2017 at 9:31 AM Stephan Ewen  wrote:

> Hi!
>
> That is a very interesting proposition. In cases where you have a single
> master only, you may get away with quite good guarantees without ZK. In
> fact, Flink does not store significant data in ZK at all, it only uses
> locks and counters.
>
> You can have a setup without ZK, provided you have the following:
>
>   - All processes restart (a lost JobManager restarts eventually). Should
> be given in Kubernetes.
>
>   - A way for TaskManagers to discover the restarted JobManager. Should
> work via Kubernetes as well (restarted containers retain the external
> hostname)
>
>   - A way to isolate different "leader sessions" against each other. Flink
> currently uses ZooKeeper to also attach a "leader session ID" to leader
> election, which is a fencing token to avoid that processes talk to each
> other despite having different views on who is the leader, or whether the
> leader lost and re-gained leadership.
>
>   - An atomic marker for what is the latest completed checkpoint.
>
>   - A distributed atomic counter for the checkpoint ID. This is crucial to
> ensure correctness of checkpoints in the presence of JobManager failures
> and re-elections or split-brain situations.
>
> I would assume that etcd can provide all of those services. The best way
> to integrate it would probably be to add an implementation of Flink's
> "HighAvailabilityServices" based on etcd.
>
> Have a look at this class:
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
> 
>
> If you want to contribute an extension of Flink using etcd, that would be
> awesome.
> This should have a FLIP though, and a plan on how to set up rigorous unit
> testing of that implementation (because its correctness is very crucial to
> Flink's HA resilience).
>
> Best,
> Stephan
>
>
> On Mon, Aug 21, 2017 at 4:15 PM, Shannon Carey  wrote:
>
>> Zookeeper should still be necessary even in that case, because it is
>> where the JobManager stores information which needs to be recovered after
>> the JobManager fails.
>>
>> We're eyeing https://github.com/coreos/zetcd as a way to run
>> Zookeeper on top of Kubernetes' etcd cluster so that we don't have to rely
>> on a separate Zookeeper cluster. However, we haven't tried it yet.
>>
>> -Shannon
>>
>> From: Hao Sun 
>> Date: Sunday, August 20, 2017 at 9:04 PM
>> To: "user@flink.apache.org" 
>> Subject: Flink HA with Kubernetes, without Zookeeper
>>
>> Hi, I am new to Flink and trying to bring up a Flink cluster on top of
>> Kubernetes.
>>
>> For HA setup, with kubernetes, I think I just need one job manager and do
>> not need Zookeeper? I will store all states to S3 buckets. So in case of
>> failure, kubernetes can just bring up a new job manager without losing
>> anything?
>>
>> I want to confirm my assumptions above make sense. Thanks
>>
>
>


Re: Flink HA with Kubernetes, without Zookeeper

2017-08-21 Thread Stephan Ewen
Hi!

That is a very interesting proposition. In cases where you have a single
master only, you may get away with quite good guarantees without ZK. In
fact, Flink does not store significant data in ZK at all, it only uses
locks and counters.

You can have a setup without ZK, provided you have the following:

  - All processes restart (a lost JobManager restarts eventually). Should
be given in Kubernetes.

  - A way for TaskManagers to discover the restarted JobManager. Should
work via Kubernetes as well (restarted containers retain the external
hostname)

  - A way to isolate different "leader sessions" against each other. Flink
currently uses ZooKeeper to also attach a "leader session ID" to leader
election, which is a fencing token to avoid that processes talk to each
other despite having different views on who is the leader, or whether the
leader lost and re-gained leadership.

  - An atomic marker for what is the latest completed checkpoint.

  - A distributed atomic counter for the checkpoint ID. This is crucial to
ensure correctness of checkpoints in the presence of JobManager failures
and re-elections or split-brain situations.

I would assume that etcd can provide all of those services. The best way to
integrate it would probably be to add an implementation of Flink's
"HighAvailabilityServices" based on etcd.

Have a look at this class:
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java

If you want to contribute an extension of Flink using etcd, that would be
awesome.
This should have a FLIP though, and a plan on how to set up rigorous unit
testing of that implementation (because its correctness is very crucial to
Flink's HA resilience).

Best,
Stephan
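
To make the "distributed atomic counter" requirement above concrete, here is a
self-contained sketch of a compare-and-swap based checkpoint ID counter. The
KvStore class is only an in-memory stand-in for whatever transactional
primitive an etcd client would provide; it is neither Flink's nor etcd's
actual API:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// stand-in for an etcd-style key/value store with compare-and-swap semantics
class KvStore {
    private final ConcurrentMap<String, Long> data = new ConcurrentHashMap<>();

    long get(String key) {
        Long v = data.get(key);
        return v == null ? 0L : v;
    }

    boolean compareAndSet(String key, long expected, long update) {
        if (expected == 0L) {
            return data.putIfAbsent(key, update) == null;
        }
        return data.replace(key, expected, update);
    }
}

public class CheckpointIdCounter {
    private final KvStore store;
    private final String key;

    CheckpointIdCounter(KvStore store, String key) {
        this.store = store;
        this.key = key;
    }

    // atomically claim the next checkpoint ID; safe even if two JobManagers
    // both believe they are the leader, because only one CAS can win per value
    long getAndIncrement() {
        while (true) {
            long current = store.get(key);
            if (store.compareAndSet(key, current, current + 1)) {
                return current;
            }
        }
    }

    public static void main(String[] args) {
        CheckpointIdCounter counter =
            new CheckpointIdCounter(new KvStore(), "/flink/checkpoint-id");
        System.out.println(counter.getAndIncrement());  // prints 0
        System.out.println(counter.getAndIncrement());  // prints 1
    }
}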


On Mon, Aug 21, 2017 at 4:15 PM, Shannon Carey  wrote:

> Zookeeper should still be necessary even in that case, because it is where
> the JobManager stores information which needs to be recovered after the
> JobManager fails.
>
> We're eyeing https://github.com/coreos/zetcd as a way to run Zookeeper on
> top of Kubernetes' etcd cluster so that we don't have to rely on a separate
> Zookeeper cluster. However, we haven't tried it yet.
>
> -Shannon
>
> From: Hao Sun 
> Date: Sunday, August 20, 2017 at 9:04 PM
> To: "user@flink.apache.org" 
> Subject: Flink HA with Kubernetes, without Zookeeper
>
> Hi, I am new to Flink and trying to bring up a Flink cluster on top of
> Kubernetes.
>
> For HA setup, with kubernetes, I think I just need one job manager and do
> not need Zookeeper? I will store all states to S3 buckets. So in case of
> failure, kubernetes can just bring up a new job manager without losing
> anything?
>
> I want to confirm my assumptions above make sense. Thanks
>


Re: Flink multithreading, CoFlatMapFunction, CoProcessFunction, internal state

2017-08-21 Thread Nico Kruber
Hi Chao,
what I meant by "per-record base" was actually supposed to be "per-event base" 
(event = one entity of whatever the stream contains). As from the API: 
processing is supposed to be one event at a time and this is what is performed 
internally, too.

Nico
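
As a small illustration of that processing model, a minimal CoFlatMapFunction
sketch (types and join logic are made up): a plain instance field is safe
because flatMap1 and flatMap2 of one parallel instance are never invoked
concurrently; only state shared across instances, such as the static/native
reference discussed below, needs its own synchronization.

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

// stream 1 carries data events, stream 2 carries control values
public class ControlledJoin implements CoFlatMapFunction<String, String, String> {

    // a plain field is enough: Flink calls flatMap1 and flatMap2 of this
    // instance one event at a time, never in parallel
    private String lastControl;

    @Override
    public void flatMap1(String dataEvent, Collector<String> out) {
        if (lastControl != null) {
            out.collect(lastControl + ":" + dataEvent);
        }
    }

    @Override
    public void flatMap2(String controlEvent, Collector<String> out) {
        lastControl = controlEvent;  // remember the latest control value
    }
}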

On Thursday, 17 August 2017 05:06:07 CEST Chao Wang wrote:
> Thank you! Nico. That helps me a lot!
> 
> 2a) That really clarifies my understanding about Flink. Yes, I think I
> have used static references, since I invoked a native function
> (implemented through JNI) which I believe only has one instance per
> process. And I guess the reason why those Java synchronization
> mechanisms were in vain is because of separate function objects at
> runtime, which results in separate lock objects. Now I use c++ mutex
> within the native function and it resolves my case.
> 
> BTW, could you elaborate a bit more about what do you mean by
> "per-record base"? what do you mean by a record?
> 
> 3) I do not intend to store the CoProcessFunction.Context. I was just
> wondering that since the document said it is only valid during the
> invocation, for maintaining custom states of my program logic I guess I
> cannot use it.
> 
> 
> Thank you,
> Chao
> 
> On 08/16/2017 03:31 AM, Nico Kruber wrote:
> > Hi Chao,
> > 
> > 1) regarding the difference of CoFlatMapFunction/CoProcessFunction, let me
> > quote the javadoc of the CoProcessFunction:
> > 
> > "Contrary to the {@link CoFlatMapFunction}, this function can also query
> > the time (both event and processing) and set timers, through the provided
> > {@link Context}. When reacting to the firing of set timers the function
> > can emit yet more elements."
> > 
> > So, imho, both deliver a different level of abstraction and control (high-
> > vs. low-level). Also note the different methods available for you to
> > implement.
> > 
> > 2a) In general, Flink calls functions on a per-record base in a serialized
> > fashion per task. For each task at a TaskManager, in case of it having
> > multiple slots, separate function objects are used where you should only
> > get in trouble if you share static references. Otherwise you do not need
> > to worry about thread-safety.
> > 
> > 2b) From what I see in the code (StreamTwoInputProcessor), the same should
> > apply to CoFlatMapFunction and CoProcessFunction so that calls to
> > flatMap1/2 and processElement1/2 are not called in parallel!
> > 
> > 3) why would you want to store the CoProcessFunction.Context?
> > 
> > 
> > Nico
> > 
> > On Monday, 14 August 2017 18:36:38 CEST Chao Wang wrote:
> >> Hi,
> >> 
> >> I'd like to know if CoFlatMapFunction/CoProcessFunction is thread-safe,
> >> and to what extent? What's the difference between the two Functions? and
> >> in general, how does Flink prevent race conditions? Here's my case:
> >> 
> >> I tried to condition on two input streams and produce the third stream
> >> if the condition is met. I implemented CoFlatMapFunction and tried to
> >> monitor a state using a field in the implemented class (I want to
> >> isolate my application from the checkpointing feature, and therefore I
> >> do not use the states as documented here
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html).
> >> The field served as a flag indicating whether there are some
> >> pending data from either input stream, and if yes, processing it along
> >> with the arriving data from the other input stream (the processing
> >> invokes a native function).
> >> 
> >> But then I got double free error and segmentation fault, which I believe
> >> was due to unintentional concurrent access to the native function. Then
> >> I tried to wrap the access into a synchronized method, as well as
> >> explicitly lock/unlock the flatMap1/flatMap2 methods, but all in vain
> >> and the error remained.
> >> 
> >> I considered using CoProcessFunction in my case, but seems to me that it
> >> does not handle customary internal states, stating in the javadoc "The
> >> context [CoProcessFunction.Context] is only valid during the invocation
> >> of this method, do not store it."
> >> 
> >> 
> >> 
> >> Thanks,
> >> Chao





Re: Aggregation by key hierarchy

2017-08-21 Thread Nico Kruber
Hi Basant,
no, you cannot add data streams or re-wire your program during runtime.
As for any other program changes, you'd have to take a savepoint (to keep 
operator state and exactly-once semantics) and restart the new program code 
from there.

For a few combinations, I'd probably choose the second option for simplicity 
but for more combinations, option 1 seems better (mapping your key 
combinations to different tuple-keys, key-by this one and applying window 
operations afterwards).

Option 2 may also require more slots to be available since it has more 
operators [1] and may not be evenly balanced based on your input data and the 
work associated with it. Since option 1's window operators aggregate all 
different tuples, load distribution may be better. Other than that, the 
communication pattern is similar. To get a better understanding of the 
performance impacts, you'd have to benchmark with your aggregation and input 
data though.


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/concepts/programming-model.html#parallel-dataflows
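
A minimal sketch of option 1 (all names and the sample data are made up; it
assumes records of the form (country, state, city, value) and the Flink 1.3
DataStream API): a flatMap emits one tagged copy per key combination, so a
single keyed window operator aggregates all of them.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class KeyCombinationFanOut {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (country, state, city, value) -- sample records standing in for the real source
        DataStream<Tuple4<String, String, String, Double>> records = env.fromElements(
                Tuple4.of("US", "MO", "St. Louis", 1.0),
                Tuple4.of("US", "CA", "San Francisco", 2.0));

        // fan each record out once per key combination, tagged with a combined key
        DataStream<Tuple2<String, Double>> fanned = records.flatMap(
                new FlatMapFunction<Tuple4<String, String, String, Double>, Tuple2<String, Double>>() {
                    @Override
                    public void flatMap(Tuple4<String, String, String, Double> r,
                                        Collector<Tuple2<String, Double>> out) {
                        out.collect(Tuple2.of(r.f0, r.f3));                           // country
                        out.collect(Tuple2.of(r.f0 + "|" + r.f1, r.f3));              // country+state
                        out.collect(Tuple2.of(r.f0 + "|" + r.f1 + "|" + r.f2, r.f3)); // country+state+city
                    }
                });

        // one keyed window operator now aggregates all combinations
        fanned.keyBy(0)
              .timeWindow(Time.minutes(1))
              .sum(1)
              .print();

        env.execute("key combination fan-out");
    }
}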

On Wednesday, 16 August 2017 13:24:42 CEST Basanth Gowda wrote:
> Thanks Nico.
> 
> As there are 2 ways to achieve this which is better ?
> 
> 1st option -> dataStream.flatMap( ... ) -> this takes in out and provides
> me N number of outputs, depending on my key combination . On each of the
> output the same windowing logic is applied
> 
> or the one you suggested
> 
> 2nd option -> use keyBy to create N number of streams
> 
> With the fist option I would use an external config, and it allows me to
> change the number of combinations dynamically at runtime. Would it be
> possible with 2nd option as well ? Can I modify or add data stream at
> runtime without restarting  ?
> 
> On Wed, Aug 16, 2017 at 4:37 AM, Nico Kruber  wrote:
> > [back to the ml...]
> > 
> > also including your other mail's additional content...
> > 
> > > I have been able to do this by the following and repeating this for
> > > every
> > > key + window combination. So in the above case there would be 8 blocks
> > 
> > like
> > 
> > > below. (4 combinations and 2 window period for each combination)
> > > 
> > > modelDataStream.keyBy("campaiginId","addId")
> > > 
> > > .timeWindow(Time.minutes(1))
> > > .trigger(CountTrigger.of(2))
> > > .reduce(..)
> > 
> > As mentioned in my last email, I only see one way for reducing duplication
> > (for the key combinations) but this involves more handling from your side
> > and
> > I'd probably not recommend this. Regarding the different windows, I do not
> > see
> > something you may do otherwise here.
> > 
> > Maybe Aljoscha (cc'd) has an idea of how to do this better
> > 
> > 
> > Nico
> > 
> > On Monday, 14 August 2017 19:08:29 CEST Basanth Gowda wrote:
> > > Hi Nico,
> > > Thank you . This is pretty much what I am doing , was wondering if there
> > 
> > is
> > 
> > > a better way.
> > > 
> > > If there are 10 dimensions on which I want to aggregate with 2 windows -
> > > this would become about 20 different combinations
> > > 
> > > Thank you
> > > Basanth
> > > 
> > > On Mon, Aug 14, 2017 at 12:50 PM Nico Kruber 
> > 
> > wrote:
> > > > Hi Basanth,
> > > > Let's assume you have records of the form
> > > > Record = {timestamp, country, state, city, value}
> > > > Then you'd like to create aggregates, e.g. the average, for the
> > 
> > following
> > 
> > > > combinations?
> > > > 1) avg per country
> > > > 2) avg per state and country
> > > > 3) avg per city and state and country
> > > > 
> > > > * You could create three streams and aggregate each individually:
> > > > DataStream ds = //...
> > > > DataStream ds1 = ds.keyBy("country");
> > > > DataStream ds2 = ds.keyBy("country","state");
> > > > DataStream ds3 = ds.keyBy("country","state","city");
> > > > // + your aggregation per stream ds1, ds2, ds3
> > > > 
> > > > You probably want to do different things for each of the resulting
> > > > aggregations anyway, so having separate streams is probably right for
> > 
> > you.
> > 
> > > > * Alternatively, you could go with ds1 only and create the aggregates
> > 
> > of
> > 
> > > > the
> > > > per-state (2) and per-city (3) ones in a stateful aggregation function
> > > > yourself, e.g. in a MapState [1]. At the end of your aggregation
> > 
> > window,
> > 
> > > > you
> > > > could then emit those with different keys to be able to distinguish
> > > > between
> > > > them.
> > > > 
> > > > 
> > > > Nico
> > > > 
> > > > [1]
> > > > https://ci.apache.org/projects/flink/flink-docs-> > 
> > release-1.3/dev/stream/
> > 
> > > > state.html
> > > >  > 
> > release-1.3/dev/stream/st
> > 
> > > > ate.html>>
> > > > 
> > > > On Sunday, 13 August 2017 23:13:00 CEST Basanth Gowda wrote:
> > > > > For example - this is a sample model from one of the Apache Apex
> > > > > presentation.
> > > > > 
> > > > > I would want to aggregate 

Re: Flink HA with Kubernetes, without Zookeeper

2017-08-21 Thread Shannon Carey
Zookeeper should still be necessary even in that case, because it is where the 
JobManager stores information which needs to be recovered after the JobManager 
fails.

We're eyeing https://github.com/coreos/zetcd as a way to run Zookeeper on top 
of Kubernetes' etcd cluster so that we don't have to rely on a separate 
Zookeeper cluster. However, we haven't tried it yet.

-Shannon

From: Hao Sun
Date: Sunday, August 20, 2017 at 9:04 PM
To: "user@flink.apache.org"
Subject: Flink HA with Kubernetes, without Zookeeper

Hi, I am new to Flink and trying to bring up a Flink cluster on top of 
Kubernetes.

For HA setup, with kubernetes, I think I just need one job manager and do not 
need Zookeeper? I will store all states to S3 buckets. So in case of failure, 
kubernetes can just bring up a new job manager without losing anything?

I want to confirm my assumptions above make sense. Thanks


Re: [EXTERNAL] Re: Fink application failing with kerberos issue after running successfully without any issues for few days

2017-08-21 Thread Raja . Aravapalli

Thanks Gordon.


Regards,
Raja.

From: "Tzu-Li (Gordon) Tai" 
Date: Thursday, August 17, 2017 at 11:47 PM
To: Raja Aravapalli , "user@flink.apache.org" 

Subject: Re: [EXTERNAL] Re: Fink application failing with kerberos issue after 
running successfully without any issues for few days

Hi Raja,

Can you please confirm if I have to use the below settings to ensure I use 
keytabs?


  *   security.kerberos.login.use-ticket-cache:
Indicates whether to read from your Kerberos ticket cache (default: true).


  *   security.kerberos.login.keytab:
Absolute path to a Kerberos keytab file that contains the user credentials.


  *   security.kerberos.login.principal:
Kerberos principal name associated with the keytab.


  *   security.kerberos.login.contexts: A comma-separated list of login 
contexts to provide the Kerberos credentials to (for example, 
Client,KafkaClient to use the credentials for ZooKeeper authentication and for 
Kafka authentication).

Yes, these are the exact configs that you’ll need to set.
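
For example, a hedged flink-conf.yaml sketch of those four settings for
switching from the ticket cache to a keytab (the keytab path and principal
below are placeholders, not values from this thread):

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /home/flink/flink.keytab
security.kerberos.login.principal: flink-user@EXAMPLE.COM
security.kerberos.login.contexts: Client,KafkaClient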

Also a quick question, once I make these changes to use keytabs instead of 
ticket cache, Is there any place in the logs I can check, were the setting I 
made are in use and the applications are not actually using again ticket cache 
again?
You should be able to find logs such as “Adding keytab  to the AM 
container …” at the beginning of the job submission.

Cheers,
Gordon

On 18 August 2017 at 5:51:57 AM, Raja.Aravapalli 
(raja.aravapa...@target.com) wrote:

Thanks a lot Eron…

If I am understanding you correct, you suggest using keytabs to launch 
streaming applications!

Can you please confirm if I have to use the below settings to ensure I use 
keytabs?


  *   security.kerberos.login.use-ticket-cache:
Indicates whether to read from your Kerberos ticket cache (default: true).


  *   security.kerberos.login.keytab:
Absolute path to a Kerberos keytab file that contains the user credentials.


  *   security.kerberos.login.principal:
Kerberos principal name associated with the keytab.


  *   security.kerberos.login.contexts: A comma-separated list of login 
contexts to provide the Kerberos credentials to (for example, 
Client,KafkaClient to use the credentials for ZooKeeper authentication and for 
Kafka authentication).

https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#kerberos-based-security-1


Also a quick question, once I make these changes to use keytabs instead of 
ticket cache, Is there any place in the logs I can check, were the setting I 
made are in use and the applications are not actually using again ticket cache 
again?

Thanks a lot, in advance.


Regards,
Raja.

From: Eron Wright 
Date: Thursday, August 17, 2017 at 1:06 PM
To: Ted Yu 
Cc: Raja Aravapalli , "user@flink.apache.org" 

Subject: Re: [EXTERNAL] Re: Fink application failing with kerberos issue after 
running successfully without any issues for few days

Raja,
According to those configuration values, the delegation token would be 
automatically renewed every 24 hours, then expire entirely after 7 days.   You 
say that the job ran without issue for 'a few days'.  Can we conclude that the 
job hit the 7-day DT expiration?

Flink supports the use of Kerberos keytabs as an alternative to delegation 
tokens for exactly this reason, that delegation tokens eventually expire and so 
aren't useful to a long-running program.   Consider making use of keytabs here.

Hope this helps!
-Eron


On Thu, Aug 17, 2017 at 9:58 AM, Ted Yu 
> wrote:
I think this needs to be done by the admin.

On Thu, Aug 17, 2017 at 9:37 AM, Raja.Aravapalli 
> wrote:

I don’t have access to the site.xml files, it is controlled by a support team.

Does Flink have any configuration settings or APIs through which we can control
this?


Regards,
Raja.

From: Ted Yu
Date: Thursday, August 17, 2017 at 11:07 AM
To: Raja Aravapalli
Cc: "user@flink.apache.org"
Subject: Re: [EXTERNAL] Re: Fink application failing with kerberos issue after 
running successfully without any issues for few days

Can you try shortening renewal interval to something like 2880 ?

Cheers

On Thu, Aug 17, 2017 at 8:58 AM, Raja.Aravapalli 
> wrote:
Hi Ted,

Below is what I see in the environment:

dfs.namenode.delegation.token.max-lifetime:  60480
dfs.namenode.delegation.token.renew-interval:  8640


Thanks.


Regards,
Raja.

From: Ted Yu 

RE: Great number of jobs and numberOfBuffers

2017-08-21 Thread Gwenhael Pasquiers
Hi,

1/ Yes, the loop is part of the application I run on yarn. Something like :
public class MyFlinkApp {
public static void main(String[] args){
// parse arguments etc
for(String datehour:datehours){
ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
env.readTextFile(datehour)
.union(env.readTextFile(datehour-1))
.union(env.readTextFile(datehour-2))
.map()
.groupBy()
.sortGroup()
.reduceGroup()
...

// other steps, unions, processing, inputs, outputs

JobExecutionResult result = env.execute();

// read accumulators and send some statsd statistics at 
the end of batch
}
}
}

2/ The prod settings are something like 6 nodes with 8 task slots each, 32 GiB
per node.

3/ I remember that we had the same error (not enough buffers) right at startup.
I guess that it was trying to allocate all buffers at startup, whereas it is now
doing it progressively (but it still fails at the same limit).

4/ The program has many steps: it has about 5 inputs (readTextFile) and 2
outputs (TextHadoopOutputFormat, one in the middle of the processing, the other
at the end), and it is composed of multiple union, flatMap, map, groupBy,
sortGroup, reduceGroup and filter operations for each "batch". And if we start
the Flink app on a whole week of data, we will have to start (24 * 7) batches.
Parallelism has the default value except for the output writers (32 and 4), in
order to limit the number of files on HDFS.
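
For reference, the configuration key named in the error discussed in this
thread goes into flink-conf.yaml; the value below is only illustrative, not a
recommendation:

taskmanager.network.numberOfBuffers: 65536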



-Original Message-
From: Nico Kruber [mailto:n...@data-artisans.com] 
Sent: vendredi 18 août 2017 14:58
To: Gwenhael Pasquiers 
Cc: Ufuk Celebi ; user@flink.apache.org
Subject: Re: Great number of jobs and numberOfBuffers

Hi Gwenhael,
the effect you describe sounds a bit strange. Just to clarify your setup:

1) Is the loop you were posting part of the application you run on yarn?
2) How many nodes are you running with?
3) What is the error you got when you tried to run the full program without 
splitting it?
4) can you give a rough sketch of what your program is composed of (operators, 
parallelism,...)? 


Nico

On Thursday, 17 August 2017 11:53:25 CEST Gwenhael Pasquiers wrote:
> Hello,
> 
> We hit this bug in Flink 1.0.1 on YARN (maybe the YARN behavior is
> different?). We've been having this issue for a long time and we were
> careful not to schedule too many jobs.
 
> I'm currently upgrading the application towards flink 1.2.1 and I'd 
> like to try to solve this issue.
 
> I'm not submitting individual jobs to a standalone cluster.
> 
> I'm starting a single application that has a loop in its main function :
> for(. . .) {
>   Environment env = Environment.getExectionEnvironment();
>   env. . . .;
>   env.execute();
> }
> 
> 
> The job fails at some point later during execution with the following
> error:
 java.io.IOException: Insufficient number of network buffers:
> required 96, but only 35 available. The total number of network 
> buffers is currently set to 36864. You can increase this number by 
> setting the configuration key 'taskmanager.network.numberOfBuffers'. 
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBuf
> ferPo
> ol(NetworkBufferPool.java:196)
> Before splitting the job in multiple sub-jobs it failed right at startup.
> 
> Each "batch" job takes 10 to 30 minutes and it fails after about dozen 
> of them (the first ones should have had enough time to be recycled).
 
> We've already increased the jobmanager and "numberOfBuffers" values 
> quite a bit. That way we can handle days of data, but not weeks or 
> months. This is not very scalable. And as you say, I felt that those 
> buffers should be recycled and that way we should have no limit as 
> long as each batch is small enough.
 
> If I start my command again (removing the datehours that were 
> successfully
> processed) it will work since it's a fresh new cluster.
 
> -Original Message-
> From: Ufuk Celebi [mailto:u...@apache.org]
> Sent: jeudi 17 août 2017 11:24
> To: Ufuk Celebi 
> Cc: Gwenhael Pasquiers ;
> user@flink.apache.org; Nico Kruber 
 Subject: Re:
> Great number of jobs and numberOfBuffers
> 
> PS: Also pulling in Nico (CC'd) who is working on the network stack.
> 
> On Thu, Aug 17, 2017 at 11:23 AM, Ufuk Celebi  wrote:
> 
> > Hey Gwenhael,
> >
> >
> >
> > the network buffers are recycled automatically after a job terminates.
> > If this does not happen, it would be quite a major bug.
> >
> >
> >
> > To 

[Survey] How many people use Flink with AWS Kinesis sink

2017-08-21 Thread Bowen Li
Hi guys,
We want to have a more accurate idea of how many people are writing
Flink's computation result to AWS Kinesis, and how many people had
successful Flink deployment against Kinesis?

The reason I ask for the survey is because we have been trying to make
our Flink jobs and Kinesis sink work together for a long time but haven't
succeeded yet. We discovered quite a few issues with not only Flink's
flink-kinesis-connector but, most importantly, KPL (Kinesis Producer
Library) itself. Kinesis/KPL is poorly designed, we hate Kinesis, and we
are currently evaluating how much further effort it requires to make Flink
work with Kinesis.

If not many Flink users had good experience with Kinesis, we'll
probably need to look for some alternatives.

I really appreciate your time and your insight! Thank you very much!

Bowen


Re: Memory Issue

2017-08-21 Thread Jörn Franke
One would need to look at your code and possibly at some heap statistics. Maybe
something wrong happens when you cache them (do you use a 3rd party library or
your own implementation?). Do you use a stable version of your protobuf library
(not necessarily the most recent)? You may also want to look at buffers to
avoid creating objects (ByteBuffer, StringBuffer etc.).

You are probably creating a lot of objects due to the conversion into POJOs. You
could increase the heap space reserved for the young generation.
You can also switch to the G1 garbage collector (on JDK 8) or at least the
parallel one.
Generally you should avoid creating POJOs/objects as much as possible in a
long-running streaming job.
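
For example, a hedged flink-conf.yaml sketch of that suggestion, assuming
JDK 8 (env.java.opts passes JVM options to the Flink JVMs; the GC logging
flags are optional but help when investigating the GC churn described below):

env.java.opts: "-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"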



> On 21. Aug 2017, at 05:29, Govindarajan Srinivasaraghavan 
>  wrote:
> 
> Hi,
> 
> I have a pipeline running on Flink which ingests around 6k messages per
> second. Each message is around 1 KB and it passes through various stages
> (filter, 5 sec tumbling window per key, etc.) and finally a flatMap
> computation before sending it to the Kafka sink. The data is first ingested as
> protocol buffers and then converted into POJOs in subsequent operators.
> 
> There are lots objects created inside the user functions and some of them are 
> cached as well. I have been running this pipeline on 48 task slots across 3 
> task manages with each one allocated with 22GB memory.
> 
> The issue I'm having is that within a period of 10 hours, almost 19k young
> generation GCs have run, which is roughly one every 2 seconds, and the GC time
> taken value is more than 2 million. I have also enabled object reuse. Any
> suggestions on how this issue could be resolved? Thanks.
> 
> Regards,
> Govind
>