Flink Redshift table lookup and updates

2016-08-18 Thread Harshith Chennamaneni
Hi,

I've very recently come upon Flink and I'm trying to use it to solve a
problem that I have.

I have a stream of user-settings updates coming through a Kafka queue. I need
to store the most recent settings, along with a history of settings for each
user, in Redshift, which then feeds into analytics dashboards.

I've been contemplating using Flink for this problem. I wanted some
guidance from people experienced in Flink to help me decide if Flink is
suited to this problem and, if so, which approach might work best. I am
considering the following approaches:

1. Create a secondary key-value database with the user's latest settings, and
look up these settings after grouping the stream with keyBy(userId) to check if a
setting has changed and, if so, create a history record. I came across this
Stack Overflow thread:
http://stackoverflow.com/questions/38866078/how-to-look-up-and-update-the-state-of-a-record-from-a-database-in-apache-flink
to help with this approach.

2. Pull the current snapshot of users from Redshift at program start and keep it
as state in the Flink program (the snapshot isn't huge, ~1GB). Subsequently
look up from this state and update it when processing events (a sketch of this
keyed-state approach follows below).
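
A minimal sketch of the keyed-state variant (approach 2), assuming the Flink 1.1
DataStream API. The Tuple2<String, String> of (userId, serialized settings) is a
stand-in for the real settings type, and loading the initial Redshift snapshot into
this state is not shown:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Input: (userId, serialized settings). Emits the element only when the settings
// differ from the last value seen for that user; downstream this becomes a history
// record plus an update of the latest-state table.
public class SettingsChangeDetector
        extends RichFlatMapFunction<Tuple2<String, String>, Tuple2<String, String>> {

    private transient ValueState<String> lastSettings;

    @Override
    public void open(Configuration parameters) {
        lastSettings = getRuntimeContext().getState(
                new ValueStateDescriptor<String>("last-settings", String.class, null));
    }

    @Override
    public void flatMap(Tuple2<String, String> userIdAndSettings,
                        Collector<Tuple2<String, String>> out) throws Exception {
        String previous = lastSettings.value();
        if (previous == null || !previous.equals(userIdAndSettings.f1)) {
            lastSettings.update(userIdAndSettings.f1);  // keep only the latest per user
            out.collect(userIdAndSettings);             // changed -> new history record
        }
    }
}

// usage: settingsStream.keyBy(0).flatMap(new SettingsChangeDetector())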

In both these cases I plan to create a Redshift sink that batches updates
to the history as well as the latest state and persists them to Redshift in batches
(through S3 and the COPY command for the history, and through an update-on-join for
the snapshot).

Is one of these designs more suited to working with Flink? Is there an
alternative I should consider?

Thanks!

-H


Re: checkpoint state keeps on increasing

2016-08-18 Thread Janardhan Reddy
I also thought that the checkpointing state size was growing due to the growing
key space. I registered a processing-time timer in 'onevent' and wrote a
custom trigger, and the checkpointing state size was still growing.

Our code is linked against the Flink 1.0.0 jar but was running on Flink 1.1.1
(YARN session).

When running the code with Flink 1.0.0 (on YARN), we no longer notice
the issue with the same code.

On Thu, Aug 18, 2016 at 2:46 PM, Stephan Ewen  wrote:

> Hi!
>
> Count windows are a little tricky. If you have a growing key space, the
> state keeps growing.
>
> Let's say you have a bunch of values for key "A". You will fire the count
> windows for every two elements and keep one element. If "A" never comes
> again after that, the one element will still be kept around, in order to
> fire when the next element for "A" comes.
>
> That is just the nature of count windows. In practice, count windows will
> mostly need a "timeout" after which they will be cleared. Think of it as a
> session window (cleared when there are x hours of inactivity) inside which
> there are repeated count-based triggers.
>
> Does that explain your case?
>
> Greetings,
> Stephan
>
>
> On Thu, Aug 18, 2016 at 8:11 AM, Janardhan Reddy <
> janardhan.re...@olacabs.com> wrote:
>
>> Hi,
>>
> >> I am noticing that the checkpointing state has been constantly growing
> >> for the below subtask. Only the currently active window elements should be
> >> checkpointed, right? Why is it constantly growing?
>>
>> finalStream.keyBy("<>").countWindow(2,1)
>>   .apply((_, _, input: scala.Iterable[], out: Collector[]) => {
>> val inputArray = input.toArray
>>
>> ... do something
>>
>> }
>>
>>
>


Re: flink on yarn - Fatal error in AM: The ContainerLaunchContext was not set

2016-08-18 Thread Miroslav Gajdoš
I tried building it from source as well as using the prebuilt binary release
(v1.1.1); the latter produced this log output:
http://pastebin.com/3L5Yhs9x

The application in YARN still fails with "Fatal error in AM: The
ContainerLaunchContext was not set".

Mira

Miroslav Gajdoš wrote on Thu 18 Aug 2016 at 10:36 +0200:
> Hi Max,
> 
> we are building it from sources and packaging it for Debian. I can try to
> use the binary release for Hadoop 2.6.0.
> 
> Regarding zookeeper, we do not share instances between dev and
> production.
> 
> Thanks,
> Miroslav
> 
Maximilian Michels wrote on Thu 18 Aug 2016 at 10:17 +0200:
> > 
> > Hi Miroslav,
> > 
> > From the logs it looks like you're using Flink version 1.0.x. The
> > ContainerLaunchContext is always set by Flink. I'm wondering why
> > this
> > error can still occur. Are you using the default Hadoop version
> > that
> > comes with Flink (2.3.0)? You could try the Hadoop 2.6.0 build of
> > Flink.
> > 
> > Does your Dev cluster share the Zookeeper installation with the
> > production cluster? I'm wondering because it receives incorrect
> > leadership information although the leading JobManager seems to be
> > attempting to register at the ApplicationMaster.
> > 
> > Best,
> > Max
> > 
> > On Tue, Aug 16, 2016 at 1:28 PM, Miroslav Gajdoš
> >  wrote:
> > > 
> > > 
> > > Log from yarn session runner is here:
> > > http://pastebin.com/xW1W4HNP
> > > 
> > > Our Hadoop distribution is from Cloudera, resourcemanager version:
> > > 2.6.0-cdh5.4.5. It runs in HA mode (there could be some redirecting on
> > > accessing the resourcemanager and/or namenode to the active one).
> > > 
> > > Ufuk Celebi wrote on Tue 16 Aug 2016 at 12:18 +0200:
> > > > 
> > > > 
> > > > This could be a bug in Flink. Can you share the complete logs
> > > > of
> > > > the
> > > > run? CC'ing Max who worked on the YARN client recently who
> > > > might
> > > > have
> > > > an idea in which cases Flink would not set the context.
> > > > 
> > > > On Tue, Aug 16, 2016 at 11:00 AM, Miroslav Gajdoš
> > > >  wrote:
> > > > > 
> > > > > 
> > > > > 
> > > > > Hi guys,
> > > > > 
> > > > > I've run into some problems with Flink/YARN. I try to deploy Flink to
> > > > > our cluster using /usr/lib/flink-scala2.10/bin/yarn-session.sh, but the
> > > > > YARN application does not even start; it goes from accepted to
> > > > > finished/failed. The YARN info on the resourcemanager looks like this:
> > > > > 
> > > > > User:   wa-flink
> > > > > Name:   Flink session with 3 TaskManagers
> > > > > Application Type: Apache Flink
> > > > > Application Tags:
> > > > > State:  FINISHED
> > > > > FinalStatus:    FAILED
> > > > > Started:    Mon Aug 15 18:02:42 +0200 2016
> > > > > Elapsed:    16sec
> > > > > Tracking URL:   History
> > > > > Diagnostics:    Fatal error in AM: The ContainerLaunchContext was
> > > > > not set.
> > > > > 
> > > > > On the dev cluster, the application deploys without problems; this
> > > > > happens only in production.
> > > > > 
> > > > > What could be wrong?
> > > > > 
> > > > > 
> > > > > Thanks,
> > > > > 
> > > > > --
> > > > > Miroslav Gajdoš
> > > > > development - web analytics (Brno)
> > > > > https://reporter.seznam.cz
> > > > > miroslav.gaj...@firma.seznam.cz
> > > > > 
> > > > > 
> > > --
> > > Miroslav Gajdoš
> > > development - web analytics (Brno)
> > > https://reporter.seznam.cz
> > > miroslav.gaj...@firma.seznam.cz
-- 
Miroslav Gajdoš
development - web analytics (Brno)
https://reporter.seznam.cz
miroslav.gaj...@firma.seznam.cz


Checking for existence of output directory/files before running a batch job

2016-08-18 Thread Niels Basjes
Hi,

I have a batch job that I run on YARN that creates files in HDFS.
I want to avoid running this job at all if the output already exists.

So in my code (before submitting the job into the yarn-session) I do this:

String directoryName = "foo";

Path directory = new Path(directoryName);
FileSystem fs = directory.getFileSystem();
if (!fs.exists(directory)) {
    // run the job
}
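
For comparison, a hedged sketch that qualifies the path explicitly (the namenode
host/port and path below are made-up placeholders; the assumption, not confirmed in
this thread, is that a scheme-less path resolves against the client's default/local
filesystem):

import java.io.IOException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class OutputExistsCheck {
    public static void main(String[] args) throws IOException {
        // Hypothetical fully qualified HDFS URI; adjust namenode host/port and path.
        Path directory = new Path("hdfs://namenode:8020/user/nbasjes/foo");
        // The scheme in the path selects the HDFS FileSystem implementation.
        FileSystem fs = directory.getFileSystem();
        if (!fs.exists(directory)) {
            System.out.println("Output does not exist yet -> submit the job");
        } else {
            System.out.println("Output already exists -> skip the job");
        }
    }
}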

What I found is that this code apparently checks the 'wrong' file
system (I always get 'false' even if the directory exists in HDFS).

I checked the API of the execution environment, yet I was unable to get
the 'correct' filesystem from there.

What is the proper way to check this?


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Batch jobs with a very large number of input splits

2016-08-18 Thread Niels Basjes
Hi,

I'm working on a batch process using Flink and I ran into an interesting
problem.
The number of input splits in my job is really really large.

I currently have an HBase input (with more than 1000 regions), and in the
past I have worked with MapReduce jobs reading 2000+ files.

The problem I have is that if I run such a job in a "small" yarn-session
(i.e. fewer than 1000 tasks) I get a fatal error indicating that there are
not enough resources.
For a continuous streaming job this makes sense, yet for a batch job (like
the one I have) this is an undesirable error.

For my HBase situation I currently have a workaround: I override the
createInputSplits method of the TableInputFormat and thus control the
input splits that are created (see the sketch below).
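
A rough sketch of that workaround (package and method names are my recollection of
the Flink 1.1 flink-hbase module and may differ in your version; the class stays
abstract so the scanner/table-name/result-mapping methods are still left to the
concrete subclass, and the merge step itself is only a placeholder):

import java.io.IOException;
import org.apache.flink.addons.hbase.TableInputFormat;
import org.apache.flink.addons.hbase.TableInputSplit;
import org.apache.flink.api.java.tuple.Tuple2;

// Sketch only: intercept split creation so the number (or granularity) of splits
// can be controlled instead of getting one split per HBase region.
public abstract class CoarseSplitTableInputFormat
        extends TableInputFormat<Tuple2<String, String>> {

    @Override
    public TableInputSplit[] createInputSplits(int minNumSplits) throws IOException {
        TableInputSplit[] perRegionSplits = super.createInputSplits(minNumSplits);

        // Placeholder: here one would combine adjacent region splits into wider
        // key ranges (e.g. one split per N regions) before handing them to the
        // runtime. Returned unchanged in this sketch.
        return perRegionSplits;
    }
}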

What is the correct way to solve this (no, my cluster is NOT big enough to
run that many parallel tasks)?


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Re: counting elements in datastream

2016-08-18 Thread Sameer W
Use count windows and keep emitting results, say every 1000 elements, and do
a sum. Or do it without windows, something like this, which has the disadvantage
that it emits a new updated result for each new element (not a good thing
if your volume is high):

https://github.com/sameeraxiomine/flinkinaction/blob/master/flinkinactionjava/src/main/java/com/manning/fia/c02/SimpleStreamingWordCount.java

Or use tumbling time windows on processing time -
https://github.com/sameeraxiomine/flinkinaction/blob/master/flinkinactionjava/src/main/java/com/manning/fia/c04/TimeWindowExample.java
The advantage over count windows is that you get a count every few (configured)
seconds, which you can then add up on your client side.

Since you do not need a keyBy operation, you would do this directly on the
DataStream instance without doing a keyBy, but that way you get multiple
counts per partition of the stream, which you will need to add up (see the
sketch below).
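
A minimal, self-contained sketch of the tumbling processing-time variant (assuming
the Flink 1.1 DataStream API; the fromElements stream below is a stand-in for
pointsWithGridCoordinates, the 5-second window is arbitrary, and timeWindowAll runs
with parallelism 1, so it already emits a single count per window):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class ElementCountSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c", "d")   // stand-in for the real stream
            .map(new MapFunction<String, Long>() {
                @Override
                public Long map(String value) {
                    return 1L;                 // each element contributes one to the count
                }
            })
            .timeWindowAll(Time.seconds(5))    // tumbling processing-time window over the whole stream
            .reduce(new ReduceFunction<Long>() {
                @Override
                public Long reduce(Long a, Long b) {
                    return a + b;              // sum of ones = number of elements in the window
                }
            })
            .print();

        env.execute("count elements per window");
    }
}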





On Thu, Aug 18, 2016 at 5:54 AM, subash basnet  wrote:

> Hello all,
>
> If anyone has an idea: what could be a way to count the elements
> of the current instance of the datastream? Is it possible?
>
> DataStream> pointsWithGridCoordinates;
>
>
>
> Regards,
> Subash Basnet
>


counting elements in datastream

2016-08-18 Thread subash basnet
Hello all,

If anyone has an idea: what could be a way to count the elements of
the current instance of the datastream? Is it possible?

DataStream> pointsWithGridCoordinates;



Regards,
Subash Basnet


Re: checkpoint state keeps on increasing

2016-08-18 Thread Stephan Ewen
Hi!

Count windows are a little tricky. If you have a growing key space, the
state keeps growing.

Let's say you have a bunch of values for key "A". You will fire the count
windows for every two elements and keep one element. If "A" never comes
again after that, the one element will still be kept around, in order to
fire when the next element for "A" comes.

That is just the nature of count windows. In practice, count windows will
mostly need a "timeout" after which they will be cleared. Think of it as a
session window (cleared when there are x hours of inactivity) inside which
there are repeated count-based triggers.
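
One possible shape for such a trigger, sketched here against the Flink 1.1 API as
far as I recall it (the count is kept in a plain field purely for brevity; a real
trigger must keep it in partitioned state obtained from the TriggerContext, since
one trigger instance serves many keys and windows):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

// Fires every 'maxCount' elements and purges the window after 'timeoutMillis'
// of processing-time inactivity, so state for keys that never return is dropped.
public class CountWithTimeoutTrigger<T, W extends Window> extends Trigger<T, W> {

    private final long maxCount;
    private final long timeoutMillis;

    // Simplification for this sketch only: production code should keep the count
    // in partitioned state (e.g. via ctx.getPartitionedState), not in a field.
    private long count;

    public CountWithTimeoutTrigger(long maxCount, long timeoutMillis) {
        this.maxCount = maxCount;
        this.timeoutMillis = timeoutMillis;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        // (Re)arm the inactivity timeout on every element.
        ctx.registerProcessingTimeTimer(System.currentTimeMillis() + timeoutMillis);
        if (++count >= maxCount) {
            count = 0;
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) {
        // No new elements arrived within the timeout: drop the buffered window state.
        return TriggerResult.PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        // A full implementation would also delete any still-registered timers here.
    }
}

Wired in roughly as finalStream.keyBy(...).window(GlobalWindows.create())
.trigger(new CountWithTimeoutTrigger<>(2, timeout)), which approximates what
countWindow builds internally (minus the evictor); the exact wiring is untested here.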

Does that explain your case?

Greetings,
Stephan


On Thu, Aug 18, 2016 at 8:11 AM, Janardhan Reddy <
janardhan.re...@olacabs.com> wrote:

> Hi,
>
> I am noticing that the checkpointing state has been constantly growing for
> the below subtask. Only the currently active window elements should be
> checkpointed, right? Why is it constantly growing?
>
> finalStream.keyBy("<>").countWindow(2,1)
>   .apply((_, _, input: scala.Iterable[], out: Collector[]) => {
> val inputArray = input.toArray
>
> ... do something
>
> }
>
>


Re: flink on yarn - Fatal error in AM: The ContainerLaunchContext was not set

2016-08-18 Thread Miroslav Gajdoš
Hi Max,

we are building it from sources and packaging it for Debian. I can try to
use the binary release for Hadoop 2.6.0.

Regarding zookeeper, we do not share instances between dev and
production.

Thanks,
Miroslav

Maximilian Michels wrote on Thu 18 Aug 2016 at 10:17 +0200:
> Hi Miroslav,
> 
> From the logs it looks like you're using Flink version 1.0.x. The
> ContainerLaunchContext is always set by Flink. I'm wondering why this
> error can still occur. Are you using the default Hadoop version that
> comes with Flink (2.3.0)? You could try the Hadoop 2.6.0 build of
> Flink.
> 
> Does your Dev cluster share the Zookeeper installation with the
> production cluster? I'm wondering because it receives incorrect
> leadership information although the leading JobManager seems to be
> attempting to register at the ApplicationMaster.
> 
> Best,
> Max
> 
> On Tue, Aug 16, 2016 at 1:28 PM, Miroslav Gajdoš
>  wrote:
> > 
> > Log from yarn session runner is here:
> > http://pastebin.com/xW1W4HNP
> > 
> > Our Hadoop distribution is from Cloudera, resourcemanager version:
> > 2.6.0-cdh5.4.5. It runs in HA mode (there could be some redirecting on
> > accessing the resourcemanager and/or namenode to the active one).
> > 
> > Ufuk Celebi wrote on Tue 16 Aug 2016 at 12:18 +0200:
> > > 
> > > This could be a bug in Flink. Can you share the complete logs of
> > > the
> > > run? CC'ing Max who worked on the YARN client recently who might
> > > have
> > > an idea in which cases Flink would not set the context.
> > > 
> > > On Tue, Aug 16, 2016 at 11:00 AM, Miroslav Gajdoš
> > >  wrote:
> > > > 
> > > > 
> > > > Hi guys,
> > > > 
> > > > I've run into some problems with Flink/YARN. I try to deploy Flink to
> > > > our cluster using /usr/lib/flink-scala2.10/bin/yarn-session.sh, but the
> > > > YARN application does not even start; it goes from accepted to
> > > > finished/failed. The YARN info on the resourcemanager looks like this:
> > > > 
> > > > User:   wa-flink
> > > > Name:   Flink session with 3 TaskManagers
> > > > Application Type: Apache Flink
> > > > Application Tags:
> > > > State:  FINISHED
> > > > FinalStatus:    FAILED
> > > > Started:    Mon Aug 15 18:02:42 +0200 2016
> > > > Elapsed:    16sec
> > > > Tracking URL:   History
> > > > Diagnostics:    Fatal error in AM: The ContainerLaunchContext was
> > > > not set.
> > > > 
> > > > On the dev cluster, the application deploys without problems; this
> > > > happens only in production.
> > > > 
> > > > What could be wrong?
> > > > 
> > > > 
> > > > Thanks,
> > > > 
> > > > --
> > > > Miroslav Gajdoš
> > > > development - web analytics (Brno)
> > > > https://reporter.seznam.cz
> > > > miroslav.gaj...@firma.seznam.cz
> > > > 
> > > > 
> > --
> > Miroslav Gajdoš
> > development - web analytics (Brno)
> > https://reporter.seznam.cz
> > miroslav.gaj...@firma.seznam.cz
-- 
Miroslav Gajdoš
development - web analytics (Brno)
https://reporter.seznam.cz
miroslav.gaj...@firma.seznam.cz



Re: off heap memory deallocation

2016-08-18 Thread Maximilian Michels
Hi,

Off-heap memory currently only gets deallocated once MaxDirectMemory
has been reached. We can't manually clear the memory because some of
the code assumes that it can still access old memory after it has been
released. In the case of off-heap memory, that would give us a segmentation
fault.

We currently recommend not to use off-heap memory with lazy memory
allocation (preallocation: false).
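
For reference, a sketch of what that recommendation looks like in flink-conf.yaml
(key names as I recall them for Flink 1.x; please double-check against the
documentation of your version):

# flink-conf.yaml (sketch)
# Use off-heap memory for Flink's managed memory.
taskmanager.memory.off-heap: true
# Allocate the managed memory eagerly when the TaskManager starts, instead of
# lazily on demand; this is the combination recommended above.
taskmanager.memory.preallocate: true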

Best,
Max

On Wed, Aug 17, 2016 at 8:01 PM, Janardhan Reddy
 wrote:
> Hi,
>
> When does off-heap memory get deallocated? Does it get deallocated only
> when GC is triggered? When does the GC get triggered, other than when the
> direct memory reaches the -XX:MaxDirectMemorySize limit passed as a JVM flag?
>
> Thanks


Re: flink on yarn - Fatal error in AM: The ContainerLaunchContext was not set

2016-08-18 Thread Maximilian Michels
Hi Miroslav,

From the logs it looks like you're using Flink version 1.0.x. The
ContainerLaunchContext is always set by Flink. I'm wondering why this
error can still occur. Are you using the default Hadoop version that
comes with Flink (2.3.0)? You could try the Hadoop 2.6.0 build of
Flink.

Does your Dev cluster share the Zookeeper installation with the
production cluster? I'm wondering because it receives incorrect
leadership information although the leading JobManager seems to be
attempting to register at the ApplicationMaster.

Best,
Max

On Tue, Aug 16, 2016 at 1:28 PM, Miroslav Gajdoš
 wrote:
> Log from yarn session runner is here:
> http://pastebin.com/xW1W4HNP
>
> Our Hadoop distribution is from Cloudera, resourcemanager version:
> 2.6.0-cdh5.4.5. It runs in HA mode (there could be some redirecting on
> accessing the resourcemanager and/or namenode to the active one).
>
> Ufuk Celebi wrote on Tue 16 Aug 2016 at 12:18 +0200:
>> This could be a bug in Flink. Can you share the complete logs of the
>> run? CC'ing Max who worked on the YARN client recently who might have
>> an idea in which cases Flink would not set the context.
>>
>> On Tue, Aug 16, 2016 at 11:00 AM, Miroslav Gajdoš
>>  wrote:
>> >
>> > Hi guys,
>> >
>> > I've run into some problems with Flink/YARN. I try to deploy Flink to
>> > our cluster using /usr/lib/flink-scala2.10/bin/yarn-session.sh, but the
>> > YARN application does not even start; it goes from accepted to
>> > finished/failed. The YARN info on the resourcemanager looks like this:
>> >
>> > User:   wa-flink
>> > Name:   Flink session with 3 TaskManagers
>> > Application Type: Apache Flink
>> > Application Tags:
>> > State:  FINISHED
>> > FinalStatus:    FAILED
>> > Started:    Mon Aug 15 18:02:42 +0200 2016
>> > Elapsed:    16sec
>> > Tracking URL:   History
>> > Diagnostics:    Fatal error in AM: The ContainerLaunchContext was
>> > not set.
>> >
>> > On the dev cluster, the application deploys without problems; this happens
>> > only in production.
>> >
>> > What could be wrong?
>> >
>> >
>> > Thanks,
>> >
>> > --
>> > Miroslav Gajdoš
>> > development - web analytics (Brno)
>> > https://reporter.seznam.cz
>> > miroslav.gaj...@firma.seznam.cz
>> >
>> >
> --
> Miroslav Gajdoš
> development - web analytics (Brno)
> https://reporter.seznam.cz
> miroslav.gaj...@firma.seznam.cz


checkpoint state keeps on increasing

2016-08-18 Thread Janardhan Reddy
Hi,

I am noticing that the checkpointing state has been constantly growing for
the below subtask. Only the currently active window elements should be
checkpointed, right? Why is it constantly growing?

finalStream.keyBy("<>").countWindow(2,1)
  .apply((_, _, input: scala.Iterable[], out: Collector[]) => {
val inputArray = input.toArray

... do something

}


problem running flink using remote environment

2016-08-18 Thread Baswaraj Kasture
I am using Flink 1.1.1.
I am trying to run a Flink streaming program (Kafka as source).
It works perfectly when I use:
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

But the problem is when I use one of the following to create the env:

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("103.179.55.121", 6123, new 
Configuration(), "/home/me/my.jar");

Or

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("103.179.55.121", 6123,  
"/home/me/my.jar");

Here is stack trace of the error:

Using address 103.179.55.121:6123 to connect to JobManager.
JobManager web interface address http://103.179.55.121:8081
Starting execution of program


The program finished with the following exception:

The RemoteEnvironment cannot be used when submitting a program through a 
client, or running in a TestEnvironment context.

org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.<init>(RemoteStreamEnvironment.java:133)

org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.<init>(RemoteStreamEnvironment.java:104)

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createRemoteEnvironment(StreamExecutionEnvironment.java:1659)

Am I missing anything? Where/how do I find instructions to get this working?
I started the Flink cluster using:
$ bin/start-cluster.sh

Command I used to submit the job:

$ bin/flink run -c com.mine.Kafka  /home/me/my.jar
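
For illustration, a hedged sketch of the two setups the exception message above
distinguishes (the class name and the fromElements stand-in source are placeholders;
this is an interpretation of the error text, not a confirmed answer from this thread):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaJobSketch {
    public static void main(String[] args) throws Exception {
        // Variant A: submitted through the client with
        //   bin/flink run -c com.mine.Kafka /home/me/my.jar
        // The client already targets the JobManager, so the program should simply
        // ask for its environment instead of constructing a remote one:
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Variant B (not shown): run main() directly, e.g. from an IDE or java -jar,
        // WITHOUT bin/flink run. Only in that case would
        // createRemoteEnvironment("103.179.55.121", 6123, "/home/me/my.jar")
        // be the way to reach the remote JobManager.

        // Stand-in for the real Kafka source, just so the sketch runs end to end.
        env.fromElements("a", "b", "c").print();

        env.execute("remote environment sketch");
    }
}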


