Re: Streaming : a way to "key by partition id" without redispatching data

2017-11-13 Thread Nico Kruber
Hi Gwenhaël,
several functions in Flink require keyed streams because they manage their 
internal state by key. These keys, however, should be independent of the 
current execution and its parallelism so that checkpoints may be restored to 
different levels of parallelism (for re-scaling, see [1]).
Also, different operators, e.g. the source vs. the map, may have different
numbers of parallel tasks, in which case you'd need to shuffle the data in
order to adapt. The same goes for possible differences between the number of
Kafka partitions and the parallelism you use in Flink.

If, however, all your operators have the same parallelism, doing multiple 
keyBy(0) calls in your program will not re-shuffle the data, because of the 
deterministic assignment of keys to operators.
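
To illustrate why the assignment is deterministic, here is a minimal plain-Java
sketch of the mapping from key to key group to parallel subtask that the
rescalable-state article [1] describes. It is not Flink code: the class name,
the method names and the bit mixer are made up for this example (Flink applies
its own murmur hash to key.hashCode()), so treat it as a model of the idea
rather than the actual implementation.

```java
/** Illustrative model only; these are not Flink's classes. */
final class KeyAssignmentSketch {

    /** Stand-in bit mixer (assumption); Flink uses its own murmur hash here. */
    static int scramble(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return h & Integer.MAX_VALUE; // clear the sign bit so the modulo is non-negative
    }

    /** Key -> key group: depends only on the key's hashCode and maxParallelism. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return scramble(key.hashCode()) % maxParallelism;
    }

    /** Key group -> subtask index: contiguous ranges of key groups per subtask. */
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    static int subtaskForKey(Object key, int maxParallelism, int parallelism) {
        return subtaskFor(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        for (String key : new String[] {"a", "b", "c"}) {
            // Two consecutive "keyBy" steps with identical settings pick the same subtask.
            int first = subtaskForKey(key, maxParallelism, parallelism);
            int second = subtaskForKey(key, maxParallelism, parallelism);
            System.out.println(key + " -> subtask " + first + " (again: " + second + ")");
        }
    }
}
```

With the same maxParallelism and parallelism on both operators, the same key
always maps to the same subtask index, which is the property the paragraph
above relies on.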


Nico

[1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html

On Thursday, 9 November 2017 18:00:13 CET Gwenhael Pasquiers wrote:
> Hello,
> 
> (Flink 1.2.1)
> 
> For performance reasons I'm trying to reduce the volume of data in my
> stream as early as possible by windowing/folding it for 15 minutes before
> continuing to the rest of the chain, which contains keyBys and windows that
> will transfer data everywhere.
> 
> Because of the huge volume of data, I want to avoid "moving" the data
> between partitions as much as possible (unlike what a naïve keyBy does). I
> wanted to create a custom ProcessFunction (using a timer and state to fold
> data for X minutes) in order to fold my data over itself before keying the
> stream, but even ProcessFunction needs a keyed stream...
> 
> Is there a specific "key" value that would ensure that my data won't be
> moved to another taskmanager (i.e. that its hash code will match the
> partition it is already in)? I thought about the subtask id but I doubt
> I'd be that lucky :-)
> 
> Suggestions
> 
> · Wouldn't it be useful to be able to do a "partitionnedKeyBy" that
> would not move data between nodes, for windowing operations that can be
> parallelized?
> 
> o   Something like kafka => partitionnedKeyBy(0) => first folding =>
> keyBy(0) => second folding => ...
> 
> · Finally, aren't all streams keyed? Even if they're keyed by a
> totally arbitrary partition id until the user chooses their own key,
> shouldn't we be able to do a window (not windowAll) or process over any
> normal stream's partition?
> 
> B.R.
> 
> Gwenhaël PASQUIERS





RE: Streaming : a way to "key by partition id" without redispatching data

2017-11-13 Thread Gwenhael Pasquiers
From what I understood, in your case you might solve your issue by using
specific key classes instead of Strings.

Maybe you could create key classes with a user-specified hashCode() that
returns the previous key's hash code. That way your data shouldn't be sent
over the wire and should stay in the same partition, and thus on the same
taskmanager.
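
A minimal sketch of such a key class, in plain Java. The class name DerivedKey
and its fields are hypothetical. Whether records really stay on the same
subtask also depends on both operators using the same parallelism and max
parallelism, which this sketch does not check and which I have not verified on
Flink 1.2.1.

```java
import java.util.Objects;

/** Hypothetical key class: a new key that hashes exactly like the key it was derived from. */
final class DerivedKey {
    private final Object previousKey; // the key used by the earlier keyBy
    private final String detail;      // extra field that refines the new key

    DerivedKey(Object previousKey, String detail) {
        this.previousKey = Objects.requireNonNull(previousKey);
        this.detail = Objects.requireNonNull(detail);
    }

    /** Deliberately ignores 'detail' so the hash equals the previous key's hash. */
    @Override
    public int hashCode() {
        return previousKey.hashCode();
    }

    /** Equality still compares both fields, so distinct derived keys stay distinct. */
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof DerivedKey)) {
            return false;
        }
        DerivedKey that = (DerivedKey) other;
        return previousKey.equals(that.previousKey) && detail.equals(that.detail);
    }

    public static void main(String[] args) {
        DerivedKey clicks = new DerivedKey("user-42", "clicks");
        DerivedKey views = new DerivedKey("user-42", "views");
        System.out.println("same hash as previous key: "
                + (clicks.hashCode() == "user-42".hashCode()));
        System.out.println("derived keys are equal: " + clicks.equals(views));
    }
}
```

Note that every key derived from the same previous key shares one hash, so a
one-to-many derivation concentrates all of those keys on a single subtask.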


Re: Streaming : a way to "key by partition id" without redispatching data

2017-11-10 Thread Derek VerLee

  
  
I was about to ask this question myself. I find myself re-keying by the same
keys repeatedly. I think in principle you could always just roll more work
into one window operation with a more complex series of
maps/folds/window functions or a ProcessFunction. However, this doesn't always
feel the cleanest, most convenient or most composable. It would be great if
there were a way to just express that you want to keep the same partitions as
the last window, or that the new key is 1-to-1 with the previous one. Even
more generally, if the new key is "based" on the old key in a way that is
one-to-one or one-to-many, it may not be necessary to send data over the wire
in either case, although in the latter case there is a risk of hot-spotting,
I suppose.

On 11/10/17 12:01 PM, Gwenhael Pasquiers wrote:
> I think I finally found a way to "simulate" a Timer thanks to the
> processWatermark function of the AbstractStreamOperator.
> 
> Sorry for the monologue.
 

  



RE: Streaming : a way to "key by partition id" without redispatching data

2017-11-10 Thread Gwenhael Pasquiers
I think I finally found a way to "simulate" a Timer thanks to the
processWatermark function of the AbstractStreamOperator.

Sorry for the monologue.
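
For readers wondering what "simulating" a timer with watermarks can look like,
here is a rough plain-Java sketch of the idea: fold records locally and flush
the folded value once a watermark passes the end of the current interval. It
does not use any Flink classes. WatermarkFlushSketch, onElement and
onWatermark are hypothetical stand-ins for an operator's processElement and
processWatermark, and the fold is assumed to be a simple sum.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java stand-in for an unkeyed operator that folds locally and flushes on watermarks. */
final class WatermarkFlushSketch {
    private final long intervalMillis;
    private long nextFlushTime; // end of the interval that contains the first buffered record
    private long sum;           // the running fold (assumed here to be a sum)
    private boolean hasData;
    private final List<Long> emitted = new ArrayList<>();

    WatermarkFlushSketch(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    /** Per record: fold the value; the first record of a batch fixes the flush deadline. */
    void onElement(long value, long timestamp) {
        if (!hasData) {
            nextFlushTime = timestamp - Math.floorMod(timestamp, intervalMillis) + intervalMillis;
            hasData = true;
        }
        // Simplification: records arriving before the flush are all folded into one batch,
        // even if their timestamps already belong to a later interval.
        sum += value;
    }

    /** Per watermark: behaves like a timer that fires once event time reaches the deadline. */
    void onWatermark(long watermark) {
        if (hasData && watermark >= nextFlushTime) {
            emitted.add(sum); // a real operator would forward this downstream
            sum = 0;
            hasData = false;
        }
    }

    List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        WatermarkFlushSketch op = new WatermarkFlushSketch(15 * 60 * 1000L);
        op.onElement(1, 1_000L);
        op.onElement(2, 2_000L);
        op.onWatermark(500_000L); // before the deadline: nothing is emitted
        op.onWatermark(900_000L); // deadline reached: the folded sum is emitted
        System.out.println(op.emitted());
    }
}
```

Because the state lives in plain fields of one parallel instance, nothing here
needs a keyed stream. The trade-off is that such state is not managed per key
and would need separate handling to survive checkpoints and rescaling.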

From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com]
Sent: Friday, 10 November 2017 16:02
To: 'user@flink.apache.org'
Subject: RE: Streaming : a way to "key by partition id" without redispatching data

Hello,

Finally, even after creating my operator, I still get the error: "Timers can
only be used on keyed operators".

Isn't there any way around this? A way to "key" my stream without shuffling
the data?



RE: Streaming : a way to "key by partition id" without redispatching data

2017-11-10 Thread Gwenhael Pasquiers
Hello,

Finally, even after creating my operator, I still get the error: "Timers can
only be used on keyed operators".

Isn't there any way around this? A way to "key" my stream without shuffling
the data?

From: Gwenhael Pasquiers
Sent: Friday, 10 November 2017 11:42
To: Gwenhael Pasquiers; 'user@flink.apache.org'
Subject: RE: Streaming : a way to "key by partition id" without redispatching data

Maybe you don't need to bother with that question.

I'm currently discovering AbstractStreamOperator, OneInputStreamOperator and
Triggerable.

That should do it :-)



RE: Streaming : a way to "key by partition id" without redispatching data

2017-11-10 Thread Gwenhael Pasquiers
Maybe you don't need to bother with that question.

I'm currently discovering AbstractStreamOperator, OneInputStreamOperator and 
Triggerable.

That should do it :-)
