Re: streaming join implementation

2016-04-13 Thread Balaji Rajagopalan
Let me give you a specific example. Say event1 on stream1, with key1, happened
within your window 0-5 min, and event2 on stream2, with key2 (which could have
matched key1), happened at 5:01, outside the join window. Now you have to
correlate event2 on stream2 with event1 on stream1, which happened in the
previous window. This is the corner case I mentioned before. I am not aware
whether Flink can solve this problem for you; that would be nice, instead of
solving it in the application.
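To make the 0-5 min / 5:01 corner case concrete, here is a minimal plain-Scala sketch (not Flink code) of tumbling-window assignment. It assumes 5-minute windows aligned to timestamp 0 and millisecond timestamps; `TumblingWindows`, `windowStart` and `sameWindow` are illustrative names.

```scala
// Plain-Scala sketch (not Flink code): tumbling-window assignment with windows
// aligned to timestamp 0. Timestamps and sizes are in milliseconds.
object TumblingWindows {
  // Start of the window of length `size` that contains `ts` (assumes ts >= 0).
  def windowStart(ts: Long, size: Long): Long = ts - (ts % size)

  // A windowed inner join only pairs records whose timestamps share a window.
  def sameWindow(ts1: Long, ts2: Long, size: Long): Boolean =
    windowStart(ts1, size) == windowStart(ts2, size)
}
```

With 5-minute windows (300000 ms), an event at 4:00 (240000 ms) lands in the window starting at 0 and an event at 5:01 (301000 ms) in the window starting at 300000, so `sameWindow(240000L, 301000L, 300000L)` is false and the windowed join never sees that pair.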



Re: streaming join implementation

2016-04-13 Thread Henry Cai
Thanks Balaji.  Do you mean you spill the records that are still unmatched
after 5 minutes into Redis?  Does Flink give you control over which records
are not matched in the current window, so that you can copy them into
long-term storage?





Re: streaming join implementation

2016-04-13 Thread Balaji Rajagopalan
You can implement a join in Flink (which is an inner join) as in the code
below. The join below is for a 5-minute interval. Yes, there will be some
corner cases where data arriving after the 5 minutes is missed by the join
window. I actually solved this problem by storing some data in Redis and
writing correlation logic to take care of the corner cases that were missed
by the join window.

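import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
val output: DataStream[OutputData] = stream1.join(stream2)
  .where(_.key1).equalTo(_.key2)                                      // inner join on key1 == key2
  .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.MINUTES)))  // 5-minute event-time windows
  .apply(new SomeJoinFunction)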
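The Redis-based correlation logic is not shown in the thread. As a rough illustration only, assuming that unmatched records of each window are spilled to a store with a retention period and probed by unmatched records of later windows, the idea could be sketched in plain Scala as below. `Event`, `SideStore` and `joinWindow` are hypothetical names, and an in-memory map stands in for Redis.

```scala
import scala.collection.mutable

// A record from either stream: join key, event time in ms, payload.
final case class Event(key: String, ts: Long, payload: String)

// Hypothetical in-memory stand-in for the external store (Redis in this thread).
final class SideStore(retentionMs: Long) {
  private val byKey = mutable.Map.empty[String, List[Event]]

  def put(e: Event): Unit = byKey.update(e.key, e :: byKey.getOrElse(e.key, Nil))

  // Stored events for `key` that are at most `retentionMs` older than `now`.
  def lookup(key: String, now: Long): List[Event] =
    byKey.getOrElse(key, Nil).filter(e => now - e.ts <= retentionMs)
}

object WindowJoinWithSideStore {
  // Joins one window's worth of events on key. Pairs found inside the window are
  // emitted directly. Each unmatched event is then correlated against the other
  // side's store (leftovers from earlier windows) and stored itself, so that a
  // matching event in a later window can still find it.
  def joinWindow(left: Seq[Event], right: Seq[Event],
                 leftStore: SideStore, rightStore: SideStore): Seq[(Event, Event)] = {
    val inWindow = for (l <- left; r <- right if l.key == r.key) yield (l, r)
    val matchedLeft = inWindow.map(_._1).toSet
    val matchedRight = inWindow.map(_._2).toSet
    val unmatchedLeft = left.filterNot(matchedLeft.contains)
    val unmatchedRight = right.filterNot(matchedRight.contains)

    val lateLeft = for (l <- unmatchedLeft; r <- rightStore.lookup(l.key, l.ts)) yield (l, r)
    val lateRight = for (r <- unmatchedRight; l <- leftStore.lookup(r.key, r.ts)) yield (l, r)

    unmatchedLeft.foreach(e => leftStore.put(e))
    unmatchedRight.foreach(e => rightStore.put(e))
    inWindow ++ lateLeft ++ lateRight
  }
}
```

Events stay in the store until they age out, so one left-over event can pair with several later ones; whether that is wanted depends on the join semantics you need.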




streaming join implementation

2016-04-13 Thread Henry Cai
Hi,

We are evaluating different streaming platforms.  For a typical join
between two streams:

SELECT a.*, b.*
FROM a JOIN b
ON a.id = b.id

How does Flink implement the join?  The matching record from either stream
can arrive late; we consider it a valid join as long as the event times of
records a and b fall on the same day.

I think some streaming platforms (e.g. Google Dataflow) store the records
from both streams in a K/V lookup store and do the lookup later.
Is this how Flink implements the streaming join?

If we need to store all the records in a state store, that is going to be a
lot of records for a day.
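A minimal plain-Scala sketch of the K/V-store approach described above, assuming "same day" means the same UTC day computed from millisecond event timestamps and that a watermark tells us when a day is complete. `SameDayJoin` and its methods are hypothetical names, not Flink or Dataflow API.

```scala
import scala.collection.mutable

// A record from stream a or b: join id, event time in ms, payload.
final case class Rec(id: String, ts: Long, payload: String)

// Hypothetical symmetric hash join: a and b match when they share an id and their
// event times fall on the same UTC day. Both sides are buffered in K/V state.
final class SameDayJoin {
  private val DayMs = 24L * 60 * 60 * 1000
  private val aState = mutable.Map.empty[(String, Long), List[Rec]]
  private val bState = mutable.Map.empty[(String, Long), List[Rec]]

  private def day(ts: Long): Long = ts / DayMs

  // Buffer the new a-record and emit a pair for every b-record already buffered
  // under the same (id, day).
  def onA(r: Rec): List[(Rec, Rec)] = {
    val k = (r.id, day(r.ts))
    aState.update(k, r :: aState.getOrElse(k, Nil))
    bState.getOrElse(k, Nil).map(b => (r, b))
  }

  // Same for a b-record, probing the buffered a-records.
  def onB(r: Rec): List[(Rec, Rec)] = {
    val k = (r.id, day(r.ts))
    bState.update(k, r :: bState.getOrElse(k, Nil))
    aState.getOrElse(k, Nil).map(a => (a, r))
  }

  // Once the watermark is past a day, no more matches for it are expected,
  // so that day's state on both sides is dropped.
  def onWatermark(wm: Long): Unit = {
    val current = day(wm)
    aState.keys.filter(_._2 < current).toList.foreach(k => aState.remove(k))
    bState.keys.filter(_._2 < current).toList.foreach(k => bState.remove(k))
  }

  def bufferedRecords: Int = aState.values.map(_.size).sum + bState.values.map(_.size).sum
}
```

The buffered state grows with the number of records per day, which is exactly the concern raised above; dropping a day's state once the watermark has passed it is what keeps it bounded.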


Re: Does Kafka connector leverage Kafka message keys?

2016-04-13 Thread Elias Levy
On Wed, Apr 13, 2016 at 2:10 AM, Stephan Ewen  wrote:

> If you want to use Flink's internal key/value state, however, you need to
> let Flink re-partition the data by using "keyBy()". That is because Flink's
> internal sharding of state (including the re-sharding to adjust parallelism
> we are currently working on) follows a dedicated hashing scheme which is
> in all likelihood different from the partition function that writes the
> key/value pairs to the Kafka topics.
>

That is interesting, if somewhat disappointing.  I was hoping that
performing a keyBy from a Kafka source would perform no reshuffling if you
used the same value as you used for the Kafka message key.  But it makes
sense if you are using different hash functions.

It may be useful to have a variant of keyBy() that converts the stream to a
KeyedStream but performs no shuffling if the caller is certain that the
DataStream is already partitioned by the given key.
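The mismatch described above can be illustrated with two different hash partitioners in plain Scala. `producerPartition` and `consumerPartition` are hypothetical stand-ins for "how the producer spread keys over the Kafka partitions" and "how keyBy() assigns keys"; neither is the actual Kafka or Flink hash function.

```scala
import scala.util.hashing.MurmurHash3

// Two hypothetical partition functions: one standing in for how the producer
// spread keys over Kafka partitions, one for how keyBy() assigns keys to
// parallel instances. Neither is the real Kafka or Flink hash function.
object PartitionMismatch {
  def producerPartition(key: String, n: Int): Int =
    Math.floorMod(key.hashCode, n)

  def consumerPartition(key: String, n: Int): Int =
    Math.floorMod(MurmurHash3.stringHash(key), n)

  // Keys that sit in a different partition than the one keyBy() would route them to;
  // these are the records a re-partitioning step has to move over the network.
  def misplaced(keys: Seq[String], n: Int): Seq[String] =
    keys.filter(k => producerPartition(k, n) != consumerPartition(k, n))
}
```

Only when both sides use the same function (or there is a single partition) does the list come back empty, which is why a keyBy variant that skips the shuffle would have to rely on the caller's guarantee.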


Re: RocksDB Statebackend

2016-04-13 Thread Konstantin Knauf
Hi Aljoscha,

thanks for your answers. I am currently not in the office, so I cannot
run any further analysis until Monday. Just some quick answers to your
questions.

We are using the partitioned state abstraction, most of the state should
correspond to buffered events in windows. Parallelism is 9. In terms of
stateful operators we basically just have a KafkaSource, a custom
stateful trigger as well as a RollingSink. Overall in this test scenario
the state is very limited (see size of state using FsStateBackend).

I will get back to you once I have done some more experiments, which
will be in the course of next week.

Cheers,

Konstantin


On 12.04.2016 18:41, Aljoscha Krettek wrote:
> Hi,
> I'm going to try and respond to each point:
> 
> 1. This seems strange, could you give some background on parallelism,
> number of operators with state and so on? Also, I'm assuming you are
> using the partitioned state abstraction, i.e. getState(), correct?
> 
> 2. Your observations are pretty much correct. The reason why RocksDB is
> slower is that the FsStateBackend basically stores the state in a Java
> HashMap and writes the contents to HDFS when checkpointing. RocksDB
> stores data in on-disk files and goes to them for every state access (of
> course there are caches, but generally it is like this). I'm actually
> impressed that it is still this fast in comparison.
> 
> 3. see 1. (I think for now)
> 
> 4. The checkpointing time is the time from the JobManager deciding to
> start a checkpoint until all tasks have confirmed that checkpoint. I
> have seen this before and I think it results from back pressure. The
> problem is that the checkpoint messages that we sent through the
> topology are sitting at the sources because they are also back pressured
> by the slow processing of normal records. You should be able to see the
> actual checkpointing times (both synchronous and asynchronous) in the
> log files of the task managers, they should be very much lower.
> 
> I can go into details, I'm just writing this quickly before calling it a
> day. :-)
> 
> Cheers,
> Aljoscha
> 
> On Tue, 12 Apr 2016 at 18:21 Konstantin Knauf
> <konstantin.kn...@tngtech.com> wrote:
> 
> Hi everyone,
> 
> my experience with RocksDBStateBackend has left me a little bit
> confused. Maybe you guys can confirm that my experience is the expected
> behaviour ;):
> 
> I have run a performance test twice, once with FsStateBackend and once
> with RocksDBStateBackend, for comparison. In this particular test the state
> saved is generally not large (in a production scenario it will be
> larger).
> 
> These are my observations:
> 
> 1. Minimal checkpoint size (no records) with RocksDB was 33MB compared
> to <<1MB with the FsStateBackend.
> 
> 2. Throughput dropped from 28k/s -> 18k/s on a small cluster.
> 
> 3. Checkpoint sizes as reported in the Dashboard were ca. 1MB for
> FsStateBackend but >100MB for RocksDBStateBackend. I hope the difference
> gets smaller for very large state. Can you confirm?
> 
> 4. Checkpointing times as reported in the Dashboard were 26 secs for
> RocksDB during the test and <1 second for FsStateBackend. Does the
> reported time correspond to the synchronous + asynchronous parts of the
> checkpointing in case of RocksDB? Is there any way to tell how long the
> synchronous part takes?
> 
> From these first observations, RocksDB does seem to bring a large
> overhead for state < 1GB, I guess? Is this expected?
> 
> Cheers,
> 
> Konstantin
> 

-- 
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


Re: Sink partitioning

2016-04-13 Thread Konstantin Knauf
Hi,

calling DataStream.partitionCustom() with the respective arguments
before the sink should do the trick, I think.
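As a minimal sketch (plain Scala, names hypothetical), the partitioning function could hash the user id into one of `numPartitions` sink instances. In a job it would be wrapped in an implementation of Flink's `org.apache.flink.api.common.functions.Partitioner` and passed to `partitionCustom` together with a key selector for the user id; that wiring is not shown here.

```scala
// Hypothetical event type carrying the user id used for routing.
final case class UserEvent(userId: String, body: String)

// Same contract as Flink's Partitioner: given a key and the number of parallel
// sink instances, return the index of the instance that should receive it.
object UserIdPartitioner {
  def partition(userId: String, numPartitions: Int): Int =
    Math.floorMod(userId.hashCode, numPartitions)

  // Groups events by the sink instance they would be sent to.
  def route(events: Seq[UserEvent], numPartitions: Int): Map[Int, Seq[UserEvent]] =
    events.groupBy(e => partition(e.userId, numPartitions))
}
```

Because the index depends only on the user id, every event of a given user ends up at the same sink instance.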

Cheers,

Konstantin




Re: How to perform this join operation?

2016-04-13 Thread Maxim
You could simulate the Samza approach by having a RichFlatMapFunction over
cogrouped streams that maintains the sliding window in its ListState. As I
understand it, the drawback is that the list state is not maintained in
Flink's managed memory.
I'm interested to hear about the right way to implement this.
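A plain-Scala model of the approach described above (tuples from the first stream buffered per key, probed by values from the second stream) might look like the following. It assumes both streams are already partitioned by the join key, uses a local map where a Flink job would use keyed state such as ListState, and the class and method names are hypothetical.

```scala
import scala.collection.mutable

// Plain-Scala model of a co-flat-map over two streams that are both partitioned by
// the join key (x for the first stream, z for the second). Hypothetical names; in a
// Flink job the buffer would live in keyed state rather than a local map.
final class BufferedLookupJoin(retentionMs: Long) {
  // key x -> buffered (x, y, timestamp) tuples from the first stream
  private val buffer = mutable.Map.empty[String, List[(String, String, Long)]]

  // First stream: remember (x, y) together with its timestamp.
  def onFirst(x: String, y: String, ts: Long): Unit =
    buffer.update(x, (x, y, ts) :: buffer.getOrElse(x, Nil))

  // Second stream: emit (x, y) for every buffered tuple with x == z whose timestamp
  // is earlier than ts and no more than retentionMs older (e.g. 24 hours).
  def onSecond(z: String, ts: Long): List[(String, String)] =
    buffer.getOrElse(z, Nil).collect {
      case (x, y, t) if t < ts && ts - t <= retentionMs => (x, y)
    }

  // Drop tuples that have fallen out of the retention window at this watermark.
  def expire(watermark: Long): Unit =
    buffer.keys.toList.foreach { k =>
      val kept = buffer(k).filter { case (_, _, t) => watermark - t <= retentionMs }
      if (kept.isEmpty) buffer.remove(k) else buffer.update(k, kept)
    }
}
```

Since only the first stream is buffered and the second is only probed, there are no overlapping window panes and therefore no duplicate matches to remove.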



Sink partitioning

2016-04-13 Thread neo21 zerro
Hello everybody,

I have an Elasticsearch sink in my Flink topology.
My requirement is to write the data to my sink in a partitioned fashion.

For example, I have a Tuple which contains a user id. I want to group all events
by user id and send all events for one particular user to the same ES
sink instance.

Is it possible to achieve something like this in Flink?


Thanks!


Missing metrics on Flink UI

2016-04-13 Thread neo21 zerro
Hello everybody, 

I have deployed the latest Flink version 1.0.1 on YARN 2.5.0-cdh5.3.0.
When I submit the WordCount example shipped with the Flink distribution, I can
see metrics (bytes received) in the Flink UI on the corresponding operator.
However, when I use the Flink Kafka connector in my topology, I cannot
see any metrics reported on the operators. Has anybody experienced something
like this?

Thanks!


How to perform this join operation?

2016-04-13 Thread Elias Levy
I am wondering how you would implement the following function in Flink.
The function takes as input two streams.  One stream can be viewed as a
tuple with two values *(x, y)*, the second stream is a stream of individual
values *z*.  The function keeps a time based window on the first input
(say, 24 hours).  Whenever it receives an element from the second stream,
it compares the value *z* against the *x* element of each tuple in the
window, and for each match it emits *(x, y)*.  You are basically doing a
join on *x=z*.  Note that values from the second stream are not windowed
and they are only matched to values from the first stream with earlier
timestamps.

This was relatively easy to implement in Samza.  Consume off two topics,
the first keyed by *x* and the second by *z*.  Consume both topics in a
job.  Messages with the same key would be consumed by the same task.  The
task could maintain a window of messages from the first stream in its local
state.  Whenever a message came in via the second stream, it could look up
matching messages in the local state, and if it found any, send them to
the output stream.  Obviously, with Samza you don't have the luxury of the
system handling event time for you, but this works well and it is easy to
implement.

I am not clear how this would be implemented in Flink.

It is easy enough to partition either stream by key, and to window the
first stream using a sliding window, but from there on things get
complicated.

You can join the two streams by key, but you must do so using the same
window for both streams.  That means events from the first stream may be
matched to older events of the second stream, which is not what we want.  I
suppose if both included a timestamp, you could later add a filter to
remove such events from the merged stream.  But you would also have to deal
with duplicates, as the window is a sliding window and the same two
elements may match across all window panes that contain the matching
elements.  So you need to dedup as well.
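To illustrate the duplicate problem, here is a plain-Scala sketch that lists the sliding windows containing a timestamp and counts how many panes two elements share. It assumes windows aligned to timestamp 0 and non-negative timestamps; the names are illustrative, not Flink API.

```scala
object SlidingPanes {
  // Start times of all sliding windows [start, start + size) that contain ts,
  // assuming windows aligned to timestamp 0 and ts >= 0.
  def windowStarts(ts: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = ts - ts % slide
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > ts - size).toList
  }

  // Number of panes in which both elements appear, i.e. how many times a
  // sliding-window join would emit the same pair before deduplication.
  def sharedPanes(t1: Long, t2: Long, size: Long, slide: Long): Int =
    windowStarts(t1, size, slide).toSet.intersect(windowStarts(t2, size, slide).toSet).size
}
```

Measuring time in hours, a 24-hour window sliding by 1 hour puts every element into 24 panes, and two elements 2 hours apart (say at hours 5 and 7) share 22 of them, so the join would emit that pair 22 times.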

coGroup seems like it would suffer from the same issues.

Maybe the answer is connected streams, but there is scant documentation on
the semantics of ConnectedStreams.  There isn't even an example that I
could find that makes use of them.

Thoughts?


Re: asm IllegalArgumentException with 1.0.0

2016-04-13 Thread Stephan Ewen
Does this problem persist? (It might have been caused by maven caches with
bad artifacts).

The many transitive dependencies you see often come from the connectors -
that is also why we do not put the connectors into the lib folder directly,
so that these libraries are not always part of every Flink program.
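One way to check whether several jars on the local classpath ship their own copy of the shaded ASM classes (as suspected for flink-connector-elasticsearch in this thread) is to ask the class loader for every location of the class file. A small sketch using the JDK's `ClassLoader.getResources`; `ClasspathCheck` is an illustrative name.

```scala
import java.net.URL
import scala.collection.mutable.ListBuffer

object ClasspathCheck {
  // Every classpath location that provides `resource` (a path such as
  // "org/example/Foo.class"). More than one hit for a class file means
  // several jars ship their own copy of that class.
  def locations(resource: String,
                loader: ClassLoader = Thread.currentThread().getContextClassLoader()): List[URL] = {
    val cl = if (loader != null) loader else ClassLoader.getSystemClassLoader()
    val found = cl.getResources(resource)
    val buf = ListBuffer.empty[URL]
    while (found.hasMoreElements()) buf += found.nextElement()
    buf.toList
  }
}
```

For example, running `ClasspathCheck.locations("org/apache/flink/shaded/org/objectweb/asm/ClassVisitor.class").foreach(println)` in the same local setup would print one URL per classpath entry that contains that class.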


On Mon, Mar 14, 2016 at 9:16 PM, Zach Cox  wrote:

> Yes pretty much - we use sbt to run the job in a local environment, not
> IntelliJ, but it should be the same thing. We were also seeing that exception
> running unit tests locally. We did not see the exception when assembling a
> fat jar and submitting to a remote Flink cluster.
>
> It seems like the flink-connector-elasticsearch jar should not have shaded
> classes in it. Maybe that jar in maven central was built incorrectly?
>
> We worked around this by just not depending on that elasticsearch
> connector at all, since we wrote our own connector for Elasticsearch 2.x.
>
> -Zach
>
>
> On Mon, Mar 14, 2016 at 2:03 PM Andrew Whitaker <
> andrew.whita...@braintreepayments.com> wrote:
>
>> We're having the same issue (we also have a dependency on
>> flink-connector-elasticsearch). It's only happening to us in IntelliJ
>> though. Is this the case for you as well?
>>
>> On Thu, Mar 10, 2016 at 3:20 PM, Zach Cox  wrote:
>>
>>> After some poking around I noticed
>>> that flink-connector-elasticsearch_2.10-1.0.0.jar contains shaded asm
>>> classes. If I remove that dependency from my project then I do not get the
>>> IllegalArgumentException.
>>>
>>>
>>> On Thu, Mar 10, 2016 at 11:51 AM Zach Cox  wrote:
>>>
 Here are the jars on the classpath when I try to run our Flink job in a
 local environment (via `sbt run`):


 https://gist.githubusercontent.com/zcox/0992aba1c517b51dc879/raw/7136ec034c2beef04bd65de9f125ce3796db511f/gistfile1.txt

 There are many transitive dependencies pulled in from internal library
 projects that probably need to be cleaned out. Maybe we are including
 something that conflicts? Or maybe something important is being excluded?

 Are the asm classes included in Flink jars in some shaded form?

 Thanks,
 Zach


 On Thu, Mar 10, 2016 at 5:06 AM Stephan Ewen  wrote:

> Dependency shading changed a bit between RC4 and RC5 - maybe a
> different minor ASM version is now included in the "test" scope.
>
> Can you share the dependencies of the problematic project?
>
> On Thu, Mar 10, 2016 at 12:26 AM, Zach Cox  wrote:
>
>> I also noticed when I try to run this application in a local
>> environment, I get the same IllegalArgumentException.
>>
>> When I assemble this application into a fat jar and run it on a Flink
>> cluster using the CLI tools, it seems to run fine.
>>
>> Maybe my local classpath is missing something that is provided on the
>> Flink task managers?
>>
>> -Zach
>>
>>
>> On Wed, Mar 9, 2016 at 5:16 PM Zach Cox  wrote:
>>
>>> Hi - after upgrading to 1.0.0, I'm getting this exception now in a
>>> unit test:
>>>
>>>     IllegalArgumentException:   (null:-1)
>>>     org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.<init>(Unknown Source)
>>>     org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.<init>(Unknown Source)
>>>     org.apache.flink.api.scala.InnerClosureFinder.<init>(ClosureCleaner.scala:279)
>>>     org.apache.flink.api.scala.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:95)
>>>     org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:115)
>>>     org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:568)
>>>     org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:498)
>>>
>>> The line that causes that exception is just adding
>>> a FlinkKafkaConsumer08 source.
>>>
>>> ClassVisitor [1] seems to throw that IllegalArgumentException when
>>> it is not given a valid api version number, but InnerClosureFinder [2]
>>> looks fine to me.
>>>
>>> Any idea what might be causing this? This unit test worked fine with
>>> 1.0.0-rc0 jars.
>>>
>>> Thanks,
>>> Zach
>>>
>>> [1]
>>> http://websvn.ow2.org/filedetails.php?repname=asm&path=%2Ftrunk%2Fasm%2Fsrc%2Forg%2Fobjectweb%2Fasm%2FClassVisitor.java
>>> [2]
>>> https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala#L279
>>>
>>>
>>>
>
>>
>>
>> --
>> Andrew Whitaker | andrew.whita...@braintreepayments.com
>> --
>> Note: this information is confidential. It is prohibited to share, post
>> online or otherwise publicize without Braintree's prior written consent.
>>
>


Re: Limit buffer size for a job

2016-04-13 Thread Stephan Ewen
You can reduce Flink's internal network buffering by adjusting the total
number of network buffers.
See
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#configuring-the-network-buffers

That should also make a difference.
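The linked "controlling latency" section is about the buffer timeout (`setBufferTimeout` on the execution environment). As a rough illustration of the trade-off, here is a toy model in plain Scala: a buffer is sent downstream either when it is full or when the timeout has passed since its oldest record arrived. It is not meant to reproduce how Flink schedules its flushes; `BufferFlushModel` is an illustrative name.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model of an output buffer that is sent downstream when it is full or when
// `timeoutMs` has passed since its oldest record arrived. Not Flink's implementation.
object BufferFlushModel {
  // `arrivals`: record arrival times in ms, in ascending order.
  // Returns each record's waiting time in the buffer before it was sent.
  def latencies(arrivals: Seq[Long], capacity: Int, timeoutMs: Long): Seq[Long] = {
    val out = ArrayBuffer.empty[Long]
    var pending = Vector.empty[Long]
    def flush(at: Long): Unit = {
      out ++= pending.map(at - _)
      pending = Vector.empty
    }
    for (t <- arrivals) {
      // A timeout that expired before this record arrived ships the older records first.
      if (pending.nonEmpty && t >= pending.head + timeoutMs) flush(pending.head + timeoutMs)
      pending = pending :+ t
      if (pending.size >= capacity) flush(t)
    }
    if (pending.nonEmpty) flush(pending.head + timeoutMs)
    out.toList
  }
}
```

With records arriving every 10 ms, a large buffer and a 1000 ms timeout make the first record wait 1000 ms, while a 5 ms timeout caps every wait at 5 ms at the cost of sending many small buffers.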




Flink performance pre-packaged vs. self-compiled

2016-04-13 Thread Robert Schmidtke
Hi everyone,

I'm using Flink 0.10.2 for some benchmarks and had to make some small
changes to Flink, which led me to compile and run it myself. This is
when I noticed a performance difference between the pre-packaged Flink version
that I downloaded from the web (
http://archive.apache.org/dist/flink/flink-0.10.2/flink-0.10.2-bin-hadoop27.tgz)
and the release-0.10 branch I built myself (mvn
-Dhadoop.version=2.7.1 -Dscala-2.11 -DskipTests -Drat.skip=true clean
install, using Maven 3.0.4).

I ran a version of TeraSort (https://github.com/eastcirclek/terasort)
and noticed that the pre-packaged version of Flink performs 10-20% better
than the one I built myself (the only tweaks I made are in the CliFrontend,
after the job has finished running, so I would rule out bad programming on
my side).

Has anyone come across this before? Or could you provide me with clearer
build instructions in order to reproduce the downloadable archive as
closely as possible? Thanks in advance!

Robert

-- 
My GPG Key ID: 336E2680


Re: Limit buffer size for a job

2016-04-13 Thread Andrew Ge Wu
Thanks guys for the explanation, I will give it a try.
After all buffers are filled, back pressure did its job and it works, so far
so good, but I will definitely give controlling the latency a try.


Thanks again!


Andrew



> On 11 Apr 2016, at 18:19, Stephan Ewen  wrote:
> 
> Hi!
> 
> Ufuk's suggestion explains how to buffer less between Flink operators.
> 
> Is that what you were looking for, or are you looking for a way to fetch from
> the message queue in a more fine-grained way in the source?
> What type of source are you using?
> 
> Greetings,
> Stephan
> 
> 
> 
> 
> On Mon, Apr 11, 2016 at 5:02 PM, Ufuk Celebi wrote:
> Hey Andrew,
> 
> take a look at this here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/index.html#controlling-latency
> 
> Does this help?
> 
> – Ufuk
> 
> On Thu, Apr 7, 2016 at 3:04 PM, Andrew Ge Wu wrote:
> > Hi guys
> >
> > We have a prioritized queue, where high-priority items can jump the queue,
> > and we do not want to cache too many records in the buffer.
> > Is there a way to configure my streaming source to use a smaller buffer, so
> > that the source always fetches the latest high-priority records?
> >
> > Any suggestion? thanks!
> >
> >
> > Andrew
> > --
> > Confidentiality Notice: This e-mail transmission may contain confidential
> > or legally privileged information that is intended only for the individual
> > or entity named in the e-mail address. If you are not the intended
> > recipient, you are hereby notified that any disclosure, copying,
> > distribution, or reliance upon the contents of this e-mail is strictly
> > prohibited and may be unlawful. If you have received this e-mail in error,
> > please notify the sender immediately by return e-mail and delete all copies
> > of this message.
> 




Re: RocksDB Statebackend

2016-04-13 Thread Aljoscha Krettek
That's interesting to hear. If you want we can also collaborate on that
one. Using the Flink managed memory for that purpose would require some
changes to lower layers of Flink.
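The suggestion in the quoted messages below (keep hot values in a HashMap and write them to RocksDB only when checkpointing) can be sketched in plain Scala as a write-back cache. This is not Flink code: `slowStore` is a plain map standing in for RocksDB, and, unlike the managed-memory cache mentioned here, nothing in this sketch bounds the memory used by the cache.

```scala
import scala.collection.mutable

// Hypothetical write-back cache: reads and writes go to an in-memory map, and dirty
// entries are written to the slower store only at checkpoint time. `slowStore` is a
// plain map standing in for RocksDB; the counters show how often it is touched.
final class WriteBackCache[K, V] {
  val slowStore = mutable.Map.empty[K, V]
  private val hot = mutable.Map.empty[K, V]
  private val dirty = mutable.Set.empty[K]
  var slowReads = 0
  var slowWrites = 0

  // Serve from memory; on a miss, read the slow store once and keep the value.
  def get(k: K): Option[V] = hot.get(k).orElse {
    slowReads += 1
    val v = slowStore.get(k)
    v.foreach(value => hot.update(k, value))
    v
  }

  def put(k: K, v: V): Unit = {
    hot.update(k, v)
    dirty += k
  }

  // Write every value changed since the last checkpoint to the slow store.
  def checkpoint(): Unit = {
    dirty.foreach { k => slowStore.update(k, hot(k)); slowWrites += 1 }
    dirty.clear()
  }
}
```

A thousand updates to one hot key cost one slow read and, at checkpoint time, one slow write. A real version would also need to bound and evict the in-memory part, which is what using Flink's managed memory is meant to address.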

On Wed, 13 Apr 2016 at 13:11 Shannon Carey  wrote:

> This is something that my team and I have discussed building, so it's
> great to know that it's already on the radar. If we beat you to it, I'll
> definitely try to make it a contribution.
>
> Shannon
>
> From: Aljoscha Krettek 
> Date: Wednesday, April 13, 2016 at 1:46 PM
> To: 
> Subject: Re: RocksDB Statebackend
>
> Hi Maxim,
> yes the plan is to have a cache of hot values that uses the managed memory
> abstraction of Flink so that we can make sure that we stay within memory
> bounds and don't run into OOM exceptions.
>
> On Tue, 12 Apr 2016 at 23:37 Maxim  wrote:
>
>> Is it possible to add an option to store the state in the Java HashMap
>> and write its content to RocksDB when checkpointing? For "hot" keys that
>> are updated very frequently such optimization would help with performance.
>>
>> I know that you are also working on incremental checkpoints, which would
>> also be a big win for jobs with a large number of keys.
>>
>> Thanks,
>>
>> Maxim.
>>
>> On Tue, Apr 12, 2016 at 10:39 AM, Stephan Ewen  wrote:
>>
>>> Concerning the size of RocksDB snapshots - I am wondering if RocksDB
>>> simply does not compact for a long time, thus having a lot of stale data in
>>> the snapshot.
>>>
>>> That would be especially the case, if you have a lot of changing values
>>> for the same set of keys.
>>>
>>> On Tue, Apr 12, 2016 at 6:41 PM, Aljoscha Krettek 
>>> wrote:
>>>
 Hi,
 I'm going to try and respond to each point:

 1. This seems strange, could you give some background on parallelism,
 number of operators with state and so on? Also, I'm assuming you are using
 the partitioned state abstraction, i.e. getState(), correct?

 2. your observations are pretty much correct. The reason why RocksDB is
 slower is that the FsStateBackend basically stores the state in a Java
 HashMap and writes the contents to HDFS when checkpointing. RocksDB stores
 data in on-disk files and goes to them for every state access (of course
 there are caches, but generally it is like this). I'm actually impressed
 that it is still this fast in comparison.

 3. see 1. (I think for now)

 4. The checkpointing time is the time from the JobManager deciding to
 start a checkpoint until all tasks have confirmed that checkpoint. I have
 seen this before and I think it results from back pressure. The problem is
 that the checkpoint messages that we sent through the topology are sitting
 at the sources because they are also back pressured by the slow processing
 of normal records. You should be able to see the actual checkpointing times
 (both synchronous and asynchronous) in the log files of the task managers,
 they should be very much lower.

 I can go into details, I'm just writing this quickly before calling it
 a day. :-)

 Cheers,
 Aljoscha

 On Tue, 12 Apr 2016 at 18:21 Konstantin Knauf <
 konstantin.kn...@tngtech.com> wrote:

> Hi everyone,
>
> my experience with RocksDBStateBackend has left me a little bit
> confused. Maybe you guys can confirm that my experience is the expected
> behaviour ;):
>
> I have run a performance test twice, once with FsStateBackend and once
> with RocksDBStateBackend, for comparison. In this particular test the state
> saved is generally not large (in a production scenario it will be
> larger).
>
> These are my observations:
>
> 1. Minimal Checkpoint Size (no records) with RocksDB was 33MB compared
> to <<1MB with the FsStateBackend.
>
> 2. Throughput dropped from 28k/s -> 18k/s on a small cluster.
>
> 3. Checkpoint sizes as reported in the Dashboard were ca. 1MB for
> FsStateBackend but >100MB for RocksDBStateBackend. I hope the difference
> gets smaller for very large state. Can you confirm?
>
> 4. Checkpointing times as reported in the Dashboard were 26 secs for
> RocksDB during the test and <1 second for FsStateBackend. Does the
> reported time correspond to the synchronous + asynchronous part of the
> checkpointing in the case of RocksDB? Is there any way to tell how long the
> synchronous part takes?
>
> From these first observations, RocksDB does seem to bring a large
> overhead for state < 1GB, I guess? Is this expected?
>
> Cheers,
>
> Konstantin
>

>>>
>>
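Stephan's guess about stale data in the snapshot can be pictured with a toy model. RocksDB is a log-structured store, so an update to an existing key is written as a new entry instead of overwriting the old one in place. Older versions are dropped only when compaction runs. The sketch below does not reproduce RocksDB's file format or compaction algorithm. `StaleVersionsDemo` is a hypothetical class that only shows why a snapshot taken before compaction can be much larger than the number of live keys would suggest.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StaleVersionsDemo {

    /** One write in an append-only log: a key and the value it was set to. */
    static final class Entry {
        final String key;
        final long value;
        Entry(String key, long value) { this.key = key; this.value = value; }
    }

    private final List<Entry> log = new ArrayList<>();

    /** Updates are appended; old versions of the key stay in the log. */
    void put(String key, long value) {
        log.add(new Entry(key, value));
    }

    /** Keeps only the newest version of each key, as compaction would. */
    void compact() {
        Map<String, Entry> latest = new LinkedHashMap<>();
        for (Entry e : log) {
            latest.put(e.key, e); // later writes replace earlier ones
        }
        log.clear();
        log.addAll(latest.values());
    }

    /** Number of entries a snapshot of the log would have to copy. */
    int size() {
        return log.size();
    }

    public static void main(String[] args) {
        StaleVersionsDemo store = new StaleVersionsDemo();
        for (int round = 0; round < 100; round++) {
            for (int k = 0; k < 10; k++) {
                store.put("key-" + k, round); // same 10 keys, changing values
            }
        }
        System.out.println(store.size()); // 1000 entries before compaction
        store.compact();
        System.out.println(store.size()); // 10 entries after compaction
    }
}
```

With many changing values for a fixed set of keys, the uncompacted log grows with the number of updates rather than with the number of keys.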


Re: RocksDB Statebackend

2016-04-13 Thread Shannon Carey
This is something that my team and I have discussed building, so it's great to 
know that it's already on the radar. If we beat you to it, I'll definitely try 
to make it a contribution.

Shannon

From: Aljoscha Krettek <aljos...@apache.org>
Date: Wednesday, April 13, 2016 at 1:46 PM
To: <user@flink.apache.org>
Subject: Re: RocksDB Statebackend

Hi Maxim,
yes the plan is to have a cache of hot values that uses the managed memory 
abstraction of Flink so that we can make sure that we stay within memory bounds 
and don't run into OOM exceptions.




Re: Flink ML 1.0.0 - Saving and Loading Models to Score a Single Feature Vector

2016-04-13 Thread KirstiLaurila
Now I got this working in the cloud (not locally, but that's OK), so thanks a
lot. The next problem is how to read these written files back and add them to
the ALS.

I guess it is something like 

   val als = ALS()
   // factorsOption takes an Option of the (user factors, item factors) pair
   als.factorsOption = Some((users, items))

but I don't see how I could read in the data I wrote with the
previous example. I tried:

val users  = env.readFile(new SerializedInputFormat[Factors], "path")

but I guess I somehow need to use TypeSerializerInputFormat[Factors]
instead, and I couldn't get that working.

Best,
Kirsti





Re: threads, parallelism and task managers

2016-04-13 Thread Stephan Ewen
No problem ;-)

On Wed, Apr 13, 2016 at 11:38 AM, Stefano Bortoli 
wrote:

> Sounds like you are damn right! Thanks for the insight; dumb of us for not
> checking this before.
>
> saluti,
> Stefano

Re: threads, parallelism and task managers

2016-04-13 Thread Stefano Bortoli
Sounds like you are damn right! Thanks for the insight; dumb of us for not
checking this before.

saluti,
Stefano

2016-04-13 11:05 GMT+02:00 Stephan Ewen :

> This actually does not sound like a Flink issue. I would look into the
> commons-pool docs. Maybe they size their pools by default with the number
> of cores, so the pool has only 8 threads, and other requests are queued?

Re: threads, parallelism and task managers

2016-04-13 Thread Stephan Ewen
This actually does not sound like a Flink issue. I would look into the
commons-pool docs. Maybe they size their pools by default with the number
of cores, so the pool has only 8 threads, and other requests are queued?
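For reference, commons-pool2's `GenericObjectPool` defaults `maxTotal` to 8, regardless of the core count. By default it also blocks callers when the pool is exhausted. Raising the limit with `GenericObjectPoolConfig.setMaxTotal(...)` to at least the job's parallelism would be the first thing to try. The effect can be reproduced without Flink or commons-pool2. In this plain-Java sketch a fair `Semaphore` stands in for the pool's cap, and each worker thread plays one parallel input-format instance that holds a connection while its query runs. `BoundedPoolDemo` is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolDemo {

    // Starts `tasks` workers at once; each borrows one "connection" from a pool capped
    // at `maxTotal` and holds it briefly. Returns the peak number borrowed simultaneously.
    public static int peakBorrowed(int tasks, int maxTotal) {
        Semaphore pool = new Semaphore(maxTotal, true);        // stand-in for the pool's size cap
        AtomicInteger inUse = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService workers = Executors.newFixedThreadPool(tasks); // one thread per parallel task
        for (int i = 0; i < tasks; i++) {
            workers.submit(() -> {
                start.await();
                pool.acquire();                                 // borrow: blocks while exhausted
                int now = inUse.incrementAndGet();
                try {
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(50);                           // pretend to run the split's query
                } finally {
                    inUse.decrementAndGet();
                    pool.release();                             // return the connection, as in close()
                }
                return null;
            });
        }
        start.countDown();
        workers.shutdown();
        try {
            if (!workers.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("pool of 8, 16 tasks:  peak = " + peakBorrowed(16, 8));  // never above 8
        System.out.println("pool of 16, 16 tasks: peak = " + peakBorrowed(16, 16)); // may reach 16
    }
}
```

All 16 tasks are created and started. Only as many of them as the cap allows make progress at the same time, and the rest wait in `acquire()`.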

On Wed, Apr 13, 2016 at 10:29 AM, Flavio Pompermaier 
wrote:

> Any feedback about our JDBC InputFormat issue..?

Re: Does Kafka connector leverage Kafka message keys?

2016-04-13 Thread Stephan Ewen
Hi!

You can exploit that, yes. If you read data from Kafka in Flink, a Kafka
partition is "sticky" to a Flink source subtask. If you have (kafka-source
=> mapFunction) for example, you can be sure that all values with the same
key go through the same parallel mapFunction subtask. If you maintain a
HashMap in there, you basically have state by key based on the Kafka
partitions.

If you want to use Flink's internal key/value state, however, you need to
let Flink re-partition the data by using "keyBy()". That is because Flink's
internal sharding of state (including the re-sharding to adjust parallelism
we are currently working on) follows a dedicated hashing scheme which is
in all likelihood different from the partition function that was used to
write the key/value pairs to the Kafka topics.
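Two hash partitioners that disagree show why `keyBy()` is still needed. Both functions below are hypothetical stand-ins, not Kafka's default partitioner or Flink's key hashing. `writeSidePartition` plays whatever placed the records into Kafka partitions, and `keyBySidePartition` plays a different, engine-internal scheme. Even with the same number of partitions, many keys end up at a different index, so the locality Kafka already provides cannot simply be reused as the sharding of keyed state.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionMismatchDemo {

    // Hypothetical write-side scheme: key hash modulo the number of partitions.
    static int writeSidePartition(String key, int n) {
        return Math.floorMod(key.hashCode(), n);
    }

    // Hypothetical engine-side scheme: the same key hash, bit-mixed first, then modulo n.
    // It stands in for "some other hashing scheme", not for any real library's code.
    static int keyBySidePartition(String key, int n) {
        int h = key.hashCode();
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return Math.floorMod(h, n);
    }

    // Returns the keys that the two schemes send to different partitions.
    static List<String> mismatched(List<String> keys, int n) {
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            if (writeSidePartition(key, n) != keyBySidePartition(key, n)) {
                out.add(key);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            keys.add("user-" + i);
        }
        int moved = mismatched(keys, 4).size();
        System.out.println(moved + " of 1000 keys would have to move to another subtask");
    }
}
```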

Hope that helps...

Greetings,
Stephan





On Wed, Apr 13, 2016 at 9:20 AM, Andrew Coates 
wrote:

> Hi Stephan,
>
> If we were to do that, would flink leverage the fact that Kafka has
> already partitioned the data by the key, or would flink attempt to shuffle
> the data again into its own partitions, potentially shuffling data between
> machines for no gain?
>
> Thanks,
>
> Andy
>
> On Sun, 10 Apr 2016, 13:22 Stephan Ewen,  wrote:
>
>> Hi!
>>
>> You are right with your observations. Right now, you would have to create
>> a "Tuple2" in the KeyedDeserializationSchema. That is what also
>> a KeyedStream holds internally.
>>
>> A KeyedStream in Flink is more than just a stream that has a key and a
>> value: it is also partitioned by the key, and Flink keeps track of
>> keyed state in those streams. That's why it has to be created explicitly.
>>
>> For convenience, one could extend FlinkKafkaConsumer so that it can
>> accept two DeserializationSchemas (one for the key, one for the value) and
>> return a Tuple2 automatically.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Sun, Apr 10, 2016 at 5:49 AM, Elias Levy 
>> wrote:
>>
>>> I am wondering if the Kafka connectors leverage Kafka message keys at
>>> all?
>>>
>>> Looking through the docs, my impression is that they do not. E.g. if I
>>> use the connector to consume from a partitioned Kafka topic, what I will
>>> get back is a DataStream, rather than a KeyedStream. And if I want access
>>> to a message's key, the key must be within the message so I can extract
>>> it, or I have to make use of a KeyedDeserializationSchema with the
>>> connector to access the Kafka message key and insert it into the type
>>> returned by the connector.
>>>
>>> Similarly, it would seem that you have to give the Kafka producer sink a
>>> KeyedSerializationSchema, which will obtain a Kafka key and a Kafka message
>>> from the events of a DataStream, but you can't produce from a KeyedStream
>>> where the key is obtained from the stream itself.
>>>
>>> Is this correct?
>>>
>>>
>>
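The convenience Stephan suggests uses one schema for the key and one for the value and combines them into a pair. It can be sketched in plain Java. `Deserializer` and `deserializeRecord` are hypothetical: they mirror the idea only and are not the signatures of Flink's `DeserializationSchema` or `KeyedDeserializationSchema`. A `Map.Entry` stands in for Flink's `Tuple2`, and a missing Kafka key is passed through as `null`.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.Map;

public class KeyValueSchemaDemo {

    /** Hypothetical single-purpose deserializer (one for keys, one for values). */
    interface Deserializer<T> {
        T deserialize(byte[] bytes);
    }

    /** Decodes a raw Kafka key/value pair with two separate deserializers. */
    static <K, V> Map.Entry<K, V> deserializeRecord(
            Deserializer<K> keySchema, Deserializer<V> valueSchema,
            byte[] keyBytes, byte[] valueBytes) {
        K key = keyBytes == null ? null : keySchema.deserialize(keyBytes); // keys may be absent
        V value = valueSchema.deserialize(valueBytes);
        return new AbstractMap.SimpleImmutableEntry<>(key, value);          // permits a null key
    }

    public static void main(String[] args) {
        Deserializer<String> utf8 = b -> new String(b, StandardCharsets.UTF_8);
        Deserializer<Integer> intValue = b -> Integer.parseInt(new String(b, StandardCharsets.UTF_8));
        Map.Entry<String, Integer> rec = deserializeRecord(
                utf8, intValue,
                "user-42".getBytes(StandardCharsets.UTF_8),
                "7".getBytes(StandardCharsets.UTF_8));
        System.out.println(rec.getKey() + " -> " + rec.getValue()); // prints "user-42 -> 7"
    }
}
```

Keeping the two decoders separate means a key type and a value type can be reused independently. Turning the resulting pairs into a KeyedStream would still require `keyBy()`, as explained above.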


Re: threads, parallelism and task managers

2016-04-13 Thread Flavio Pompermaier
Any feedback about our JDBC InputFormat issue..?

On Thu, Apr 7, 2016 at 12:37 PM, Flavio Pompermaier 
wrote:

> We've finally created a running example (for Flink 0.10.2) of our improved
> JDBC InputFormat that you can run from an IDE (it creates an in-memory
> Derby database with 1000 rows and batches of 10) at
> https://gist.github.com/fpompermaier/bcd704abc93b25b6744ac76ac17ed351.
> The first time you run the program you have to comment out the following line:
>
> stmt.executeUpdate("Drop Table users ");
>
> In your pom declare the following dependencies:
>
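> <dependency>
>   <groupId>org.apache.derby</groupId>
>   <artifactId>derby</artifactId>
>   <version>10.10.1.1</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.commons</groupId>
>   <artifactId>commons-pool2</artifactId>
>   <version>2.4.2</version>
> </dependency>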
>
> On my laptop I have 8 cores, and if I set the parallelism to 16 I expect to see
> 16 calls to the connection pool (i.e. ' CREATING NEW
> CONNECTION!'), while I see only 8 (up to my maximum number of cores).
> The number of created tasks, instead, is correct (16).
>
> I hope this could help in understanding where the problem is!
>
> Best, and thanks in advance,
> Flavio
>
> On Wed, Mar 30, 2016 at 11:01 AM, Stefano Bortoli 
> wrote:
>
>> Hi Ufuk,
>>
>> here is our preliminary input format implementation:
>> https://gist.github.com/anonymous/dbf05cad2a6cc07b8aa88e74a2c23119
>>
>> If you need a running project, I will have to create a test one because I
>> cannot share the current configuration.
>>
>> thanks a lot in advance!
>>
>>
>>
>> 2016-03-30 10:13 GMT+02:00 Ufuk Celebi :
>>
>>> Do you have the code somewhere online? Maybe someone can have a quick
>>> look over it later. I'm pretty sure that is indeed a problem with the
>>> custom input format.
>>>
>>> – Ufuk
>>>
>>> On Tue, Mar 29, 2016 at 3:50 PM, Stefano Bortoli 
>>> wrote:
>>> > Perhaps there is a misunderstanding on my side about the parallelism and
>>> > split management for a data source.
>>> >
>>> > We started from the current JDBCInputFormat to make it multi-threaded.
>>> > Then, given a space of keys, we create the splits based on a fetch size
>>> > set as a parameter. In open, we get a connection from the pool and
>>> > execute a query using the split interval. This sets the 'resultSet', and
>>> > then the DataSourceTask iterates between reachedEnd, next and close. On
>>> > close, the connection is returned to the pool. We set parallelism to 32,
>>> > and we would expect 32 connections to be opened, but only 8 are opened.
>>> >
>>> > We tried to make an example with the TextInputFormat, but since it is a
>>> > DelimitedInputFormat, open is called sequentially when statistics are
>>> > built, and then the processing is executed in parallel just after all
>>> > the opens are executed. This is not feasible in our case, because there
>>> > would be millions of queries before the statistics are collected.
>>> >
>>> > Perhaps we are doing something wrong; we still have to figure out what. :-/
>>> >
>>> > thanks a lot for your help.
>>> >
>>> > saluti,
>>> > Stefano
>>> >
>>> >
>>> > 2016-03-29 13:30 GMT+02:00 Stefano Bortoli :
>>> >>
>>> >> That is exactly my point. I should have 32 threads running, but I have
>>> >> only 8. 32 tasks are created, but only 8 are run concurrently. Flavio
>>> >> and I will try to make a simple program to reproduce the problem. If we
>>> >> solve our issues on the way, we'll let you know.
>>> >>
>>> >> thanks a lot anyway.
>>> >>
>>> >> saluti,
>>> >> Stefano
>>> >>
>>> >> 2016-03-29 12:44 GMT+02:00 Till Rohrmann :
>>> >>>
>>> >>> Then it shouldn’t be a problem. The ExecutionContext is used to run
>>> >>> futures and their callbacks. But as Ufuk said, each task will spawn
>>> >>> its own thread, and if you set the parallelism to 32 then you should
>>> >>> have 32 threads running.
>>> >>>
>>> >>>
>>> >>> On Tue, Mar 29, 2016 at 12:29 PM, Stefano Bortoli <
>>> >>> s.bort...@gmail.com> wrote:
>>> 
>>>  In fact, I don't use it. I just had to trace back through the runtime
>>>  implementation to find the point where parallelism was switching
>>>  from 32 to 8.
>>> 
>>>  saluti,
>>>  Stefano
>>> 
>>>  2016-03-29 12:24 GMT+02:00 Till Rohrmann :
>>> >
>>> > Hi,
>>> >
>>> > What do you use the ExecutionContext for? That should actually be
>>> > something you shouldn't be concerned with, since it is only used
>>> > internally by the runtime.
>>> >
>>> > Cheers,
>>> > Till
>>> >
>>> >
>>> > On Tue, Mar 29, 2016 at 12:09 PM, Stefano Bortoli <
>>> > s.bort...@gmail.com> wrote:
>>> >>
>>> >> Well, in theory yes. Each task has a thread, but only a certain number
>>> >> of them run in parallel (the job of the scheduler). Parallelism is set
>>> >> in the environment. However, whereas the parallelism parameter is set and
>>> >> read correctly, when it comes to actually starting the threads, the
>>> >> number is fixed to 8. We ran a debugger to get to the point where the
>>> >> thread was sta
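The split scheme described in the quoted messages can be sketched as follows: cut the key space into ranges of `fetchSize` keys, then run one query per range in `open`. The sketch assumes integer keys with a known minimum and maximum. `KeyRangeSplits` and `createSplits` are hypothetical names, not the code from the gist. The ids 1 to 1000 and the fetch size of 10 in `main` are also an assumption, modeled on the Derby test data (1000 rows, batches of 10).

```java
import java.util.ArrayList;
import java.util.List;

public class KeyRangeSplits {

    /** One input split: an inclusive range of keys [start, end]. */
    static final class Split {
        final long start;
        final long end;
        Split(long start, long end) { this.start = start; this.end = end; }
        @Override public String toString() { return "[" + start + ", " + end + "]"; }
    }

    /** Cuts [minKey, maxKey] into consecutive ranges of at most fetchSize keys. */
    static List<Split> createSplits(long minKey, long maxKey, long fetchSize) {
        if (fetchSize <= 0) {
            throw new IllegalArgumentException("fetchSize must be positive");
        }
        List<Split> splits = new ArrayList<>();
        long start = minKey;
        while (start <= maxKey) {
            long end = (maxKey - start < fetchSize) ? maxKey : start + fetchSize - 1;
            splits.add(new Split(start, end));
            if (end == maxKey) {
                break; // avoids overflowing start when maxKey is near Long.MAX_VALUE
            }
            start = end + 1;
        }
        return splits;
    }

    public static void main(String[] args) {
        List<Split> splits = createSplits(1, 1000, 10);
        System.out.println(splits.size());  // 100
        System.out.println(splits.get(0));  // [1, 10]
        System.out.println(splits.get(99)); // [991, 1000]
    }
}
```

Each range would then be bound to a parameterized query such as `SELECT ... WHERE id BETWEEN ? AND ?`. The number of splits does not depend on the job's parallelism; the parallelism only decides how many source instances read them at the same time.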

Re: FoldFunction accumulator checkpointing

2016-04-13 Thread Aljoscha Krettek
Hi,
there are two cases where a FoldFunction can be used in the streaming API:
KeyedStream.fold() and WindowedStream.fold()/apply(). In both cases we
internally use the partitioned state abstraction of Flink to keep the
state. So yes, the accumulator value is consistently maintained and will
survive failures.
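The behaviour described here can be modeled in plain Java: one accumulator per key, held as partitioned state, snapshotted at checkpoints and restored after a failure. `KeyedFoldDemo` is a hypothetical class. It ignores windows, serialization and the way Flink actually takes snapshots, and the `BiFunction` plays the role of `FoldFunction.fold(accumulator, value)`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class KeyedFoldDemo<K, T, R> {

    private final R initialValue;
    private final BiFunction<R, T, R> fold;            // role of FoldFunction.fold(acc, value)
    private Map<K, R> accumulators = new HashMap<>();  // one accumulator per key

    KeyedFoldDemo(R initialValue, BiFunction<R, T, R> fold) {
        this.initialValue = initialValue;
        this.fold = fold;
    }

    /** Folds one element into its key's accumulator and returns the new accumulator. */
    R process(K key, T value) {
        R updated = fold.apply(accumulators.getOrDefault(key, initialValue), value);
        accumulators.put(key, updated);
        return updated;
    }

    /** Shallow copy standing in for a checkpoint (fine for immutable accumulators). */
    Map<K, R> snapshot() {
        return new HashMap<>(accumulators);
    }

    /** Replaces the current accumulators with a checkpoint after a (simulated) failure. */
    void restore(Map<K, R> checkpoint) {
        accumulators = new HashMap<>(checkpoint);
    }

    public static void main(String[] args) {
        KeyedFoldDemo<String, Integer, Integer> sum = new KeyedFoldDemo<>(0, (acc, v) -> acc + v);
        sum.process("a", 1);
        sum.process("a", 2);
        Map<String, Integer> checkpoint = sum.snapshot(); // a = 3
        sum.process("a", 100);                           // not covered by the checkpoint
        sum.restore(checkpoint);                         // failure and recovery
        System.out.println(sum.process("a", 4));         // 7
    }
}
```

After recovery, the update made after the checkpoint is gone. In a real job, the records behind that update would be replayed from the source.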

Right now, the accumulation function of a window cannot be a rich function,
because the underlying state primitives that the windowing system uses can
only take plain functions: supporting rich functions there could have
problematic implications. The most obvious one, to me, is that users could
try to keep state in the ReduceFunction of a ReducingState if given the
chance to do so, which a RichFunction would allow.

This is just off the top of my head but I can go into detail if you want.

Cheers,
Aljoscha

On Wed, 13 Apr 2016 at 00:29 Michael Radford  wrote:

> I'm wondering whether the accumulator value maintained by a
> FoldFunction is automatically checkpointed?
>
> In general, but specifically when using the WindowedStream.apply
> variant that takes a FoldFunction:
>
> public <R> DataStream<R> apply(R initialValue,
>   FoldFunction<T, R> foldFunction,
>   WindowFunction<R, R, K, W> function,
>   TypeInformation<R> evidence$7)
>
> If not, then Flink 1.0.1 still has the issue that you can't pass a
> RichFoldFunction to WindowedStream.apply
> (java.lang.UnsupportedOperationException: ReduceFunction of apply can
> not be a RichFunction).
>
> But also, if not, it seems like this would be a common pattern when
> doing complex (keyed / multi-valued) aggregations, and if the
> accumulator type R is serializable, there could be a convenience
> method for a checkpointed fold, like the mapWithState mentioned in the
> State section of the streaming guide.
>
> Thanks,
> Mike
>


Re: RocksDB Statebackend

2016-04-13 Thread Aljoscha Krettek
Hi Maxim,
yes the plan is to have a cache of hot values that uses the managed memory
abstraction of Flink so that we can make sure that we stay within memory
bounds and don't run into OOM exceptions.
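A memory-bounded cache of hot values in front of a slower store can be sketched as a write-back LRU map. This is an illustration only, under three assumptions: a plain `Map` stands in for RocksDB, a checkpoint is modeled as a call to `flush()`, and `HotValueCache` is a hypothetical class rather than Flink's planned design. It ignores managed memory, serialization and concurrency.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class HotValueCache<K, V> {

    private final Map<K, V> backingStore;    // stand-in for the on-disk store
    private final LinkedHashMap<K, V> cache; // bounded in-memory copy of hot values

    HotValueCache(int maxEntries, Map<K, V> backingStore) {
        this.backingStore = backingStore;
        // accessOrder = true makes iteration order least-recently-used first (LRU)
        this.cache = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                if (size() > maxEntries) {
                    // write back the evicted value before dropping it from memory
                    HotValueCache.this.backingStore.put(eldest.getKey(), eldest.getValue());
                    return true;
                }
                return false;
            }
        };
    }

    /** Reads from memory if possible, otherwise from the store (and caches the result). */
    V get(K key) {
        V hot = cache.get(key);
        if (hot != null) {
            return hot;
        }
        V cold = backingStore.get(key);
        if (cold != null) {
            cache.put(key, cold);
        }
        return cold;
    }

    /** Updates stay in memory until eviction or the next flush. */
    void put(K key, V value) {
        cache.put(key, value);
    }

    /** Called at checkpoint time: make the store reflect every cached value. */
    void flush() {
        backingStore.putAll(cache);
    }

    public static void main(String[] args) {
        Map<String, Long> store = new HashMap<>();
        HotValueCache<String, Long> state = new HotValueCache<>(2, store);
        for (int i = 0; i < 1000; i++) {
            Long current = state.get("hot");
            state.put("hot", current == null ? 1L : current + 1); // only touches memory
        }
        System.out.println(store.isEmpty());  // true: nothing written back yet
        state.put("a", 1L);
        state.put("b", 1L);                   // over capacity: "hot" is evicted and written back
        System.out.println(store);            // {hot=1000}
        state.flush();                        // "checkpoint": write back the remaining entries
        System.out.println(store.size());     // 3
    }
}
```

In this model, 1000 updates to the hot key cost a single write to the backing store.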

On Tue, 12 Apr 2016 at 23:37 Maxim  wrote:

> Is it possible to add an option to store the state in the Java HashMap and
> write its content to RocksDB when checkpointing? For "hot" keys that are
> updated very frequently such optimization would help with performance.
>
> I know that you are also working on incremental checkpoints which would
> also be a big win for jobs with a large number of keys.
>
> Thanks,
>
> Maxim.
>
> On Tue, Apr 12, 2016 at 10:39 AM, Stephan Ewen  wrote:
>
>> Concerning the size of RocksDB snapshots - I am wondering if RocksDB
>> simply does not compact for a long time, thus having a lot of stale data in
>> the snapshot.
>>
>> That would especially be the case if you have a lot of changing values
>> for the same set of keys.
>>
>> On Tue, Apr 12, 2016 at 6:41 PM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>> I'm going to try and respond to each point:
>>>
>>> 1. This seems strange, could you give some background on parallelism,
>>> number of operators with state and so on? Also, I'm assuming you are using
>>> the partitioned state abstraction, i.e. getState(), correct?
>>>
>>> 2. Your observations are pretty much correct. The reason why RocksDB is
>>> slower is that the FsStateBackend basically stores the state in a Java
>>> HashMap and writes the contents to HDFS when checkpointing. RocksDB stores
>>> data in on-disk files and goes to them for every state access (of course
>>> there are caches, but generally it is like this). I'm actually impressed
>>> that it is still this fast in comparison.
>>>
>>> 3. see 1. (I think for now)
>>>
>>> 4. The checkpointing time is the time from the JobManager deciding to
>>> start a checkpoint until all tasks have confirmed that checkpoint. I have
>>> seen this before and I think it results from back pressure. The problem is
>>> that the checkpoint messages that we send through the topology are sitting
>>> at the sources because they are also back pressured by the slow processing
>>> of normal records. You should be able to see the actual checkpointing times
>>> (both synchronous and asynchronous) in the log files of the task managers,
>>> they should be very much lower.
>>>
>>> I can go into more detail; I'm just writing this quickly before calling it
>>> a day. :-)
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Tue, 12 Apr 2016 at 18:21 Konstantin Knauf <
>>> konstantin.kn...@tngtech.com> wrote:
>>>
 Hi everyone,

 my experience with the RocksDBStateBackend has left me a little bit
 confused. Maybe you guys can confirm that my experience is the expected
 behaviour ;):

 I ran a performance test twice, once with the FsStateBackend and once
 with the RocksDBStateBackend, for comparison. In this particular test the
 saved state is generally not large (in a production scenario it will be
 larger).

 These are my observations:

 1. The minimal checkpoint size (no records) was 33 MB with RocksDB,
 compared to << 1 MB with the FsStateBackend.

 2. Throughput dropped from 28k/s -> 18k/s on a small cluster.

 3. Checkpoint sizes as reported in the Dashboard were approx. 1 MB for the
 FsStateBackend but >100 MB for the RocksDBStateBackend. I hope the
 difference gets smaller for very large state. Can you confirm?

 4. Checkpointing times as reported in the Dashboard were 26 s for RocksDB
 during the test and <1 s for the FsStateBackend. Does the reported time
 correspond to the synchronous plus asynchronous part of checkpointing in
 the case of RocksDB? Is there any way to tell how long the synchronous
 part takes?

 From these first observations, I guess RocksDB brings a large overhead
 for state < 1 GB? Is this expected?

 Cheers,

 Konstantin

>>>
>>
>
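
Maxim's proposal, which also matches how Aljoscha describes the FsStateBackend (state kept in a Java HashMap and written out when checkpointing), amounts to a write-back cache. The following is a minimal Java sketch under simplifying assumptions, not Flink's implementation: `CountingStore` is a hypothetical stand-in for RocksDB that only counts writes, all state is assumed to fit in memory, and only keys changed since the last checkpoint are flushed.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a durable store such as RocksDB: it only records and counts writes. */
class CountingStore {
    final Map<String, Long> data = new HashMap<>();
    int writes = 0;

    void put(String key, long value) {
        data.put(key, value);
        writes++;
    }
}

/** Write-back keyed state: reads and updates hit a HashMap; the store is touched only at checkpoints. */
class WriteBackState {
    private final Map<String, Long> cache = new HashMap<>();
    private final Set<String> dirty = new HashSet<>(); // keys changed since the last checkpoint
    private final CountingStore store;

    WriteBackState(CountingStore store) {
        this.store = store;
    }

    /** Adds delta to the value for key, entirely in memory. */
    void add(String key, long delta) {
        cache.merge(key, delta, Long::sum);
        dirty.add(key);
    }

    long get(String key) {
        return cache.getOrDefault(key, 0L);
    }

    /** Flushes every key changed since the previous checkpoint, one store write per key. */
    void checkpoint() {
        for (String key : dirty) {
            store.put(key, cache.get(key));
        }
        dirty.clear();
    }
}
```

With this model, 10,000 updates to one hot key cost a single store write at the next checkpoint instead of one write per update. The trade-off is that all state must fit on the heap, which is the limitation the RocksDB backend exists to remove.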

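Stephan's hypothesis, that snapshots contain stale data because RocksDB has not compacted, can be illustrated with a toy append-only store. This is a deliberately simplified model and not RocksDB's actual LSM-tree design. Every update appends a new version, a snapshot copies everything currently stored, and `compact()` keeps only the newest version per key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy append-only store: each put adds a new version instead of overwriting the old one. */
class AppendOnlyStore {
    static final class Version {
        final String key;
        final long value;

        Version(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    private List<Version> versions = new ArrayList<>();

    void put(String key, long value) {
        versions.add(new Version(key, value));
    }

    /** Reads scan from the newest version backwards; returns null if the key was never written. */
    Long get(String key) {
        for (int i = versions.size() - 1; i >= 0; i--) {
            if (versions.get(i).key.equals(key)) {
                return versions.get(i).value;
            }
        }
        return null;
    }

    /** A snapshot copies every stored version, stale or not. */
    int snapshotEntries() {
        return versions.size();
    }

    /** Compaction drops stale versions and keeps the newest value of each key. */
    void compact() {
        Map<String, Version> newest = new LinkedHashMap<>();
        for (Version v : versions) {
            newest.put(v.key, v); // later versions replace earlier ones
        }
        versions = new ArrayList<>(newest.values());
    }
}
```

With three keys each updated 1,000 times, the uncompacted snapshot holds 3,000 entries, and only 3 after compaction. If Stephan's guess is right, the same effect would explain large RocksDB snapshots for jobs that keep updating a fixed set of keys.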

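Aljoscha's explanation of point 4 can be written as a small model. The Dashboard time for a checkpoint ends only when the slowest task acknowledges it, and each task must first drain the records buffered ahead of the checkpoint message (barrier). The sketch below assumes a constant drain rate and ignores alignment across multiple inputs. `BarrierDelayModel` is a hypothetical name, and the numbers in the note after it are made up, not taken from Konstantin's test.

```java
/** Toy model of how back pressure inflates the reported checkpoint duration. */
final class BarrierDelayModel {
    /**
     * @param recordsAhead  records buffered in front of the barrier
     * @param recordsPerSec rate at which the slow operator drains its input
     * @param snapshotSec   time the operator needs for its own snapshot (what the task manager logs show)
     * @return seconds from barrier injection to this task's acknowledgement
     */
    static double ackDelaySeconds(long recordsAhead, double recordsPerSec, double snapshotSec) {
        double queueingSec = recordsAhead / recordsPerSec; // barrier waits behind buffered records
        return queueingSec + snapshotSec;
    }

    /** A checkpoint is complete only once the slowest task has acknowledged it. */
    static double checkpointDurationSeconds(double[] perTaskAckDelays) {
        double max = 0;
        for (double d : perTaskAckDelays) {
            max = Math.max(max, d);
        }
        return max;
    }
}
```

For example, 90,000 buffered records drained at 18,000 records/s delay the barrier by 5 s. A task whose own snapshot takes only 0.5 s therefore still contributes 5.5 s to the reported duration, while its log shows 0.5 s. This is consistent with Aljoscha's expectation that the logged times are much lower.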
Re: Does Kafka connector leverage Kafka message keys?

2016-04-13 Thread Andrew Coates
Hi Stephan,

If we were to do that, would Flink leverage the fact that Kafka has already
partitioned the data by key, or would Flink shuffle the data again into its
own partitions, potentially moving data between machines for no gain?

Thanks,

Andy

On Sun, 10 Apr 2016, 13:22 Stephan Ewen,  wrote:

> Hi!
>
> Your observations are correct. At the moment, you would have to create a
> Tuple2 in the KeyedDeserializationSchema. That is also what a KeyedStream
> holds internally.
>
> A KeyedStream in Flink is more than just a stream that has a key and a
> value: it is also partitioned by the key, and Flink keeps track of keyed
> state in those streams. That's why it has to be created explicitly.
>
> For convenience, one could extend the FlinkKafkaConsumer so that it accepts
> two DeserializationSchemas (one for the key, one for the value) and returns
> a Tuple2 automatically.
>
> Greetings,
> Stephan
>
>
> On Sun, Apr 10, 2016 at 5:49 AM, Elias Levy 
> wrote:
>
>> I am wondering whether the Kafka connectors leverage Kafka message keys at all.
>>
>> Looking through the docs, my impression is that they do not. E.g. if I
>> use the connector to consume from a partitioned Kafka topic, what I get
>> back is a DataStream rather than a KeyedStream. And if I want access to a
>> message's key, either the key must be part of the message so that I can
>> extract it, or I have to use a KeyedDeserializationSchema with the
>> connector to access the Kafka message key and insert it into the type
>> returned by the connector.
>>
>> Similarly, it would seem that you have to give the Kafka producer sink a
>> KeyedSerializationSchema, which obtains a Kafka key and a Kafka message
>> from the events of a DataStream, but you cannot produce from a KeyedStream
>> with the key obtained from the stream itself.
>>
>> Is this correct?
>>
>>
>
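
Stephan's suggestion, a consumer that takes one schema for the key and one for the value and emits a pair, can be sketched in plain Java without Flink. `BytesDeserializer` and `KeyValueDeserializer` are hypothetical names, not Flink API. `Map.Entry` stands in for Flink's `Tuple2`, and wiring this into a real `KeyedDeserializationSchema` is left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.Map;

/** Hypothetical single-field deserializer (a stand-in, not Flink's DeserializationSchema). */
interface BytesDeserializer<T> {
    T deserialize(byte[] bytes);
}

/** Combines a key deserializer and a value deserializer into one that yields (key, value) pairs. */
final class KeyValueDeserializer<K, V> {
    private final BytesDeserializer<K> keySchema;
    private final BytesDeserializer<V> valueSchema;

    KeyValueDeserializer(BytesDeserializer<K> keySchema, BytesDeserializer<V> valueSchema) {
        this.keySchema = keySchema;
        this.valueSchema = valueSchema;
    }

    /** Turns the raw Kafka message key and message bytes into a pair. */
    Map.Entry<K, V> deserialize(byte[] messageKey, byte[] message) {
        // Kafka messages may be written without a key; keep it as null instead of failing.
        K key = messageKey == null ? null : keySchema.deserialize(messageKey);
        V value = valueSchema.deserialize(message);
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    /** Example schema: interprets the bytes as UTF-8 text. */
    static final BytesDeserializer<String> UTF8 = b -> new String(b, StandardCharsets.UTF_8);
}
```

The result would still be a plain DataStream of pairs. Following Stephan's explanation, turning it into a KeyedStream would still need an explicit `keyBy` on the key field, because a KeyedStream is also partitioned by key and has to be created explicitly.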