Re: Storing large lists into state per key

2017-12-12 Thread Ovidiu-Cristian MARCU
Hi Jan,

You could associate a key with each element of your Key's list (e.g., by hashing the 
value), keep only those keys in heap (e.g., in a list), and store the associated 
key-value state in an external store like RocksDB/Redis. However, you will notice 
large de/serialization overheads - a huge penalty beyond a few thousand elements 
(see https://hal.inria.fr/hal-01530744/document for some experimental settings) - 
even for a relatively small rate of new events per Key, if you need to process all 
values of a Key for each new event. In the best case you can do some incremental 
processing, unless your non-combining operation is also non-associative per Key.
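
For reference, a minimal sketch of the MapState-backed list that Fabian describes in the quoted message below (illustrative only: the String element type, the state names, and the RichFlatMapFunction wrapper are my own assumptions, not code from this thread):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class MapStateListSketch extends RichFlatMapFunction<String, String> {

    // one state entry per list element, keyed by its insertion index
    private transient MapState<Long, String> elements;
    // number of elements appended so far (also the next free index)
    private transient ValueState<Long> size;

    @Override
    public void open(Configuration parameters) {
        elements = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("elements", Long.class, String.class));
        size = getRuntimeContext().getState(
                new ValueStateDescriptor<>("size", Long.class));
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        Long current = size.value();
        long next = current == null ? 0L : current;
        elements.put(next, value);    // append without reading the whole list
        size.update(next + 1);

        // traverse lazily: with RocksDB each entry is fetched and deserialized
        // individually instead of loading one big blob into memory
        for (Long index : elements.keys()) {
            out.collect(elements.get(index));
        }
    }
}

Applied as stream.keyBy(...).flatMap(new MapStateListSketch()), so the map and the counter are scoped per key; per Fabian's note, the RocksDB backend returns the entries in key order, which here is insertion order.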

Best,
Ovidiu
> On 12 Dec 2017, at 11:54, Jan Lukavský  wrote:
> 
> Hi Fabian,
> 
> thanks for the quick reply, what you suggest seems to work at first sight, I will 
> try it. Is there any reason not to implement a RocksDBListState this way in 
> general? Is there any increased overhead to this approach?
> 
> Thanks,
> 
>  Jan
> 
> 
> On 12/12/2017 11:17 AM, Fabian Hueske wrote:
>> Hi Jan,
>> 
>> I cannot comment on the internal design, but you could put the data into a
>> RocksDBStateBackend MapState where the value X is your data
>> type and the key is the list index. You would need another ValueState for
>> the current number of elements that you put into the MapState.
>> A MapState allows you to fetch and traverse the key, value, or entry set of the
>> Map without loading it completely into memory.
>> The sets are traversed in the sort order of the key, so they should be in insertion
>> order (given that you properly increment the list index).
>> 
>> Best, Fabian
>> 
>> 2017-12-12 10:23 GMT+01:00 Jan Lukavský :
>> 
>>> Hi all,
>>> 
>>> I have a question that appears as a user@ question, but brought me into
>>> the dev@ mailing list while I was browsing through the Flink's source
>>> codes. First I'll try to briefly describe my use case. I'm trying to do a
>>> group-by-key operation with a limited number of distinct keys (which I
>>> cannot control), but a non trivial count of values. The operation in the
>>> GBK is non-combining, so that all values per key (many) have to be stored
>>> in a state. Running this on testing data led to a surprise (for me), that
>>> even when using RocksDBStateBackend, the whole list of data is serialized
>>> into single binary blob and then deserialized into List, and therefore has
>>> to fit in memory (multiple times, in fact).
>>> 
>>> I tried to create an alternative RocksDBStateBackend, that would store
>>> each element of list in ListState to a separate key in RocksDB, so that the
>>> whole blob would not have to be loaded by a single get, but a scan over
>>> multiple keys could be made. Digging into the source code I found there was
>>> a hierarchy of classes mirroring the public API in 'internal' package -
>>> InternalKvState, InternalMergingState, InternalListState, and so on. These
>>> classes however have different hierarchy than the public API classes that
>>> they mirror, most notably InternalKvState is superinterface of all others.
>>> This fact seems to be used on multiple places throughout the source code.
>>> 
>>> My question is - is this intentional? Would it be possible to store each
>>> element of a ListState in a separate key in RocksDB (probably by adding
>>> some suffix to the actual key of the state for each element)? What are the
>>> pitfalls? And is it necessary for the InternalListState to be actually
>>> subinterface of InternalKvState? I find this to be a related problem.
>>> 
>>> Many thanks for any comments or thoughts,
>>> 
>>>  Jan
>>> 
>>> 
> 



Re: KeyGroupRangeAssignment ?

2017-02-21 Thread Ovidiu-Cristian MARCU
My case is the following: I have one stream source of elements, each element 
contains some key.
I create a KeyedStream and then window it (so I get a WindowedStream) on top of 
which I apply some window function.

Some numbers for my problem: 1 million records, 1000 keys.
I assume parallelism is 100 and maxParallelism is 200.
So each slot will take 2 key groups; I need to split my records evenly into groups 
such that each slot's task will process an equal number of records.

In the end, assuming the key is an integer (1 to 1000), I change the method 
assignToKeyGroup so that it returns key % maxParallelism.
This solves my problem.
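
For illustration, a runnable sketch comparing the default assignment with the modification described above (the default formulas are my reading of Flink's KeyGroupRangeAssignment and MathUtils; the modified variant is simply the key % maxParallelism idea from this thread, assuming integer keys):

import org.apache.flink.util.MathUtils;

public class KeyGroupAssignmentSketch {
    public static void main(String[] args) {
        int maxParallelism = 200;
        int parallelism = 100;

        for (int key = 1; key <= 1000; key++) {
            // default: keyGroup = murmurHash(key.hashCode()) % maxParallelism,
            //          operatorIndex = keyGroup * parallelism / maxParallelism
            // (Integer.hashCode() is the identity, so the key stands in for key.hashCode())
            int defaultKeyGroup = MathUtils.murmurHash(key) % maxParallelism;
            int defaultOperator = defaultKeyGroup * parallelism / maxParallelism;

            // modification from this thread: integer keys 1..1000 spread perfectly
            // evenly over the 200 key groups (and hence over the 100 slots)
            int modifiedKeyGroup = key % maxParallelism;
            int modifiedOperator = modifiedKeyGroup * parallelism / maxParallelism;

            System.out.println(key + " -> default slot " + defaultOperator
                    + ", modified slot " + modifiedOperator);
        }
    }
}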

Because the rate of elements is constant for each key, the current code does 
not ensure an equal distribution of keys, so the skewed computation gives skewed 
latency.

However, I would like to be able to change the way Flink assigns keys to 
key groups without changing the runtime.

I'm using keyBy for windowed transformations; I hope that makes sense.
I understand key groups are a nice way of rescaling an application.

Best,
Ovidiu

> On 21 Feb 2017, at 16:18, Aljoscha Krettek  wrote:
> 
> I'm afraid that won't work because we also internally use murmur hash on
> the result of hashCode().
> 
> @Ovidiu I still want to understand why you want to use keyBy() for that
> case. It sounds like you want to use it because you would like to do
> something else but that is not possible with the Flink APIs. The fact that
> key groups exist is more of an implementation detail and exposing that to
> users does not seem like the right way to go.
> 
> On Tue, 21 Feb 2017 at 16:10 Greg Hogan  wrote:
> 
>> Integer's hashCode is the identity function. Store your slot index in an
>> Integer or IntValue and key off that field.
>> 
>> On Tue, Feb 21, 2017 at 6:04 AM, Ovidiu-Cristian MARCU <
>> ovidiu-cristian.ma...@inria.fr> wrote:
>> 
>>> Hi,
>>> 
>>> As in my example, each key is a window, so I want to evenly distribute
>>> processing to all slots.
>>> If I have 100 keys and 100 slots, for each key I have the same rate of
>>> events, I don’t want skewed distribution.
>>> 
>>> Best,
>>> Ovidiu
>>> 
>>>> On 21 Feb 2017, at 11:38, Aljoscha Krettek 
>> wrote:
>>>> 
>>>> Hi Ovidiu,
>>>> what's the reason for wanting to make the parallelism equal to the
>> number
>>>> of keys? I think in general it's very hard to ensure that hashes even
>> go
>>> to
>>>> different key groups. It can always happen that all your keys (if you
>>> have
>>>> so few of them) are assigned to the same parallel operator instance.
>>>> 
>>>> Cheers,
>>>> Aljoscha
>>>> 
>>>> On Tue, 21 Feb 2017 at 10:53 Till Rohrmann 
>> wrote:
>>>> 
>>>>> Hi Ovidiu,
>>>>> 
>>>>> at the moment it is not possible to plugin a user defined hash
>>> function/key
>>>>> group assignment function. If you like, then you can file a JIRA issue
>>> to
>>>>> add this functionality.
>>>>> 
>>>>> The key group assignment in your example looks quite skewed. One
>>> question
>>>>> concerning how you calculated it: Shouldn't the number of elements in
>>> each
>>>>> group sum up to 1024? This only works for the first case. What do the
>>>>> numbers mean then?
>>>>> 
>>>>> Cheers,
>>>>> Till
>>>>> 
>>>>> On Mon, Feb 20, 2017 at 3:45 PM, Ovidiu-Cristian MARCU <
>>>>> ovidiu-cristian.ma...@inria.fr> wrote:
>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> Thank you for clarifications (I am working with KeyedStream so a
>> custom
>>>>>> partitioner does not help).
>>>>>> 
>>>>>> So I should set maxParallelism>=parallelism and change my keys (from
>>>>>> input.keyBy(0)) such that key group assignment works as expected),
>>>>>> but I can’t modify these keys in order to make it work.
>>>>>> 
>>>>>> The other option is to change Flink’s internals in order to evenly
>>>>>> distribute keys (changing computeKeyGroupForKeyHash: is this
>> enough?).
>>>>>> What I was looking for was an api to change the way key group
>>> assignment
>>>>>> is done, but without changing Flink’s runtime.
>>>>>> 
>>>>>> I think that the maxParallelism set

Re: KeyGroupRangeAssignment ?

2017-02-21 Thread Ovidiu-Cristian MARCU
Filed https://issues.apache.org/jira/browse/FLINK-5873

Best,
Ovidiu

> On 21 Feb 2017, at 12:00, Ovidiu-Cristian MARCU 
>  wrote:
> 
> Hi Till,
> 
> I will look into filing a JIRA issue.
> 
> Regarding the key group assignment, you're right, there was a mistake in my 
> code. Here are the code and the resulting distribution:
> numServers is maxParallelism
> 
> int numKeys = 1024;
> HashMap<Integer, Integer> groups = new HashMap<Integer, Integer>();
> for (int numServers = 2; numServers < 17; numServers++) {
>     groups = new HashMap<Integer, Integer>();
>     for (int i = 0; i < numKeys; i++) {
>         // org.apache.flink.util.MathUtils
>         int targetKeyGroupIndex = MathUtils.murmurHash(i) % numServers;
>         Integer mygroup = groups.get(targetKeyGroupIndex);
>         int count = mygroup == null ? 0 : mygroup;
>         groups.put(targetKeyGroupIndex, ++count);
>     }
>     System.out.println(groups + " " + numServers);
> }
> 
> {0=517, 1=507} 2
> {0=364, 1=302, 2=358} 3
> {0=258, 1=239, 2=259, 3=268} 4
> {0=180, 1=220, 2=212, 3=205, 4=207} 5
> {0=193, 1=157, 2=179, 3=171, 4=145, 5=179} 6
> {0=144, 1=161, 2=152, 3=137, 4=160, 5=131, 6=139} 7
> {0=125, 1=132, 2=120, 3=127, 4=133, 5=107, 6=139, 7=141} 8
> {0=120, 1=110, 2=115, 3=123, 4=93, 5=112, 6=121, 7=99, 8=131} 9
> {0=95, 1=106, 2=98, 3=103, 4=108, 5=85, 6=114, 7=114, 8=102, 9=99} 10
> {0=98, 1=83, 2=84, 3=92, 4=89, 5=99, 6=97, 7=80, 8=126, 9=75, 10=101} 11
> {0=98, 1=74, 2=92, 3=90, 4=73, 5=84, 6=95, 7=83, 8=87, 9=81, 10=72, 11=95} 12
> {0=65, 1=84, 2=72, 3=80, 4=71, 5=85, 6=80, 7=79, 8=78, 9=85, 10=81, 11=91, 
> 12=73} 13
> {0=73, 1=83, 2=75, 3=62, 4=81, 5=69, 6=73, 7=71, 8=78, 9=77, 10=75, 11=79, 
> 12=62, 13=66} 14
> {0=67, 1=65, 2=81, 3=84, 4=73, 5=57, 6=76, 7=56, 8=69, 9=62, 10=56, 11=79, 
> 12=75, 13=52, 14=72} 15
> {0=57, 1=72, 2=52, 3=61, 4=63, 5=47, 6=64, 7=80, 8=68, 9=60, 10=68, 11=66, 
> 12=70, 13=60, 14=75, 15=61} 16
> 
> 
> Best,
> Ovidiu
> 
>> On 21 Feb 2017, at 10:52, Till Rohrmann  wrote:
>> 
>> Hi Ovidiu,
>> 
>> at the moment it is not possible to plugin a user defined hash function/key
>> group assignment function. If you like, then you can file a JIRA issue to
>> add this functionality.
>> 
>> The key group assignment in your example looks quite skewed. One question
>> concerning how you calculated it: Shouldn't the number of elements in each
>> group sum up to 1024? This only works for the first case. What do the
>> numbers mean then?
>> 
>> Cheers,
>> Till
>> 
>> On Mon, Feb 20, 2017 at 3:45 PM, Ovidiu-Cristian MARCU <
>> ovidiu-cristian.ma...@inria.fr> wrote:
>> 
>>> Hi,
>>> 
>>> Thank you for clarifications (I am working with KeyedStream so a custom
>>> partitioner does not help).
>>> 
>>> So I should set maxParallelism>=parallelism and change my keys (from
>>> input.keyBy(0)) such that key group assignment works as expected),
>>> but I can’t modify these keys in order to make it work.
>>> 
>>> The other option is to change Flink’s internals in order to evenly
>>> distribute keys (changing computeKeyGroupForKeyHash: is this enough?).
>>> What I was looking for was an api to change the way key group assignment
>>> is done, but without changing Flink’s runtime.
>>> 
>>> I think that the maxParallelism setting is not enough (it introduces this
>>> inefficient way of distributing data for processing when using KeyedStream).
>>> Is it possible to expose somehow the key group assignment?
>>> 
>>> This is how keys are distributed (1024 keys, key=1..1024; and groups from
>>> 2 to 16 - equiv. parallelism that is number of slots):
>>> 
>>> {0=517, 1=507} 2
>>> {0=881, 1=809, 2=358} 3
>>> {0=1139, 1=1048, 2=617, 3=268} 4
>>> {0=1319, 1=1268, 2=829, 3=473, 4=207} 5
>>> {0=1512, 1=1425, 2=1008, 3=644, 4=352, 5=179} 6
>>> {0=1656, 1=1586, 2=1160, 3=781, 4=512, 5=310, 6=139} 7
>>> {0=1781, 1=1718, 2=1280, 3=908, 4=645, 5=417, 6=278, 7=141} 8
>>> {0=1901, 1=1828, 2=1395, 3=1031, 4=738, 5=529, 6=399, 7=240, 8=131} 9
>>> {0=1996, 1=1934, 2=1493, 3=1134, 4=846, 5=614, 6=513, 7=354, 8=233, 9=99}
>>> 10
>&g

Re: KeyGroupRangeAssignment ?

2017-02-21 Thread Ovidiu-Cristian MARCU
Hi,

As in my example, each key is a window, so I want to evenly distribute 
processing to all slots.
If I have 100 keys and 100 slots, for each key I have the same rate of events, 
I don’t want skewed distribution.

Best,
Ovidiu

> On 21 Feb 2017, at 11:38, Aljoscha Krettek  wrote:
> 
> Hi Ovidiu,
> what's the reason for wanting to make the parallelism equal to the number
> of keys? I think in general it's very hard to ensure that hashes even go to
> different key groups. It can always happen that all your keys (if you have
> so few of them) are assigned to the same parallel operator instance.
> 
> Cheers,
> Aljoscha
> 
> On Tue, 21 Feb 2017 at 10:53 Till Rohrmann  wrote:
> 
>> Hi Ovidiu,
>> 
>> at the moment it is not possible to plugin a user defined hash function/key
>> group assignment function. If you like, then you can file a JIRA issue to
>> add this functionality.
>> 
>> The key group assignment in your example looks quite skewed. One question
>> concerning how you calculated it: Shouldn't the number of elements in each
>> group sum up to 1024? This only works for the first case. What do the
>> numbers mean then?
>> 
>> Cheers,
>> Till
>> 
>> On Mon, Feb 20, 2017 at 3:45 PM, Ovidiu-Cristian MARCU <
>> ovidiu-cristian.ma...@inria.fr> wrote:
>> 
>>> Hi,
>>> 
>>> Thank you for clarifications (I am working with KeyedStream so a custom
>>> partitioner does not help).
>>> 
>>> So I should set maxParallelism>=parallelism and change my keys (from
>>> input.keyBy(0)) such that key group assignment works as expected),
>>> but I can’t modify these keys in order to make it work.
>>> 
>>> The other option is to change Flink’s internals in order to evenly
>>> distribute keys (changing computeKeyGroupForKeyHash: is this enough?).
>>> What I was looking for was an api to change the way key group assignment
>>> is done, but without changing Flink’s runtime.
>>> 
>>> I think that the maxParallelism setting is not enough (it introduces this
>>> inefficient way of distributing data for processing when using
>> KeyedStream).
>>> Is it possible to expose somehow the key group assignment?
>>> 
>>> This is how keys are distributed (1024 keys, key=1..1024; and groups from
>>> 2 to 16 - equiv. parallelism that is number of slots):
>>> 
>>> {0=517, 1=507} 2
>>> {0=881, 1=809, 2=358} 3
>>> {0=1139, 1=1048, 2=617, 3=268} 4
>>> {0=1319, 1=1268, 2=829, 3=473, 4=207} 5
>>> {0=1512, 1=1425, 2=1008, 3=644, 4=352, 5=179} 6
>>> {0=1656, 1=1586, 2=1160, 3=781, 4=512, 5=310, 6=139} 7
>>> {0=1781, 1=1718, 2=1280, 3=908, 4=645, 5=417, 6=278, 7=141} 8
>>> {0=1901, 1=1828, 2=1395, 3=1031, 4=738, 5=529, 6=399, 7=240, 8=131} 9
>>> {0=1996, 1=1934, 2=1493, 3=1134, 4=846, 5=614, 6=513, 7=354, 8=233, 9=99}
>>> 10
>>> {0=2094, 1=2017, 2=1577, 3=1226, 4=935, 5=713, 6=610, 7=434, 8=359,
>> 9=174,
>>> 10=101} 11
>>> {0=2192, 1=2091, 2=1669, 3=1316, 4=1008, 5=797, 6=705, 7=517, 8=446,
>>> 9=255, 10=173, 11=95} 12
>>> {0=2257, 1=2175, 2=1741, 3=1396, 4=1079, 5=882, 6=785, 7=596, 8=524,
>>> 9=340, 10=254, 11=186, 12=73} 13
>>> {0=2330, 1=2258, 2=1816, 3=1458, 4=1160, 5=951, 6=858, 7=667, 8=602,
>>> 9=417, 10=329, 11=265, 12=135, 13=66} 14
>>> {0=2397, 1=2323, 2=1897, 3=1542, 4=1233, 5=1008, 6=934, 7=723, 8=671,
>>> 9=479, 10=385, 11=344, 12=210, 13=118, 14=72} 15
>>> {0=2454, 1=2395, 2=1949, 3=1603, 4=1296, 5=1055, 6=998, 7=803, 8=739,
>>> 9=539, 10=453, 11=410, 12=280, 13=178, 14=147, 15=61} 16
>>> 
>>> Best,
>>> Ovidiu
>>> 
>>>> On 20 Feb 2017, at 12:04, Till Rohrmann  wrote:
>>>> 
>>>> Hi Ovidiu,
>>>> 
>>>> the way Flink works is to assign key group ranges to operators. For
>> each
>>> element you calculate a hash value and based on that you assign it to a
>> key
>>> group. Thus, in your example, you have either a key group with more than
>> 1
>>> key or multiple key groups with 1 or more keys assigned to an operator.
>>>> 
>>>> So what you could try to do is to reduce the number of key groups to
>>> your parallelism via env.setMaxParallelism() and then try to figure a key
>>> out whose hashes are uniformly distributed over the key groups. The key
>>> group assignment is calculated via murmurHash(key.hashCode()) %
>>> maxParallelism.
>>>> 
>>>> Alternative

Re: KeyGroupRangeAssignment ?

2017-02-21 Thread Ovidiu-Cristian MARCU
Hi Till,

I will look into filing a JIRA issue.

Regarding the key group assignment, you're right, there was a mistake in my 
code. Here are the code and the resulting distribution:
numServers is maxParallelism

int numKeys = 1024;
HashMap<Integer, Integer> groups = new HashMap<Integer, Integer>();
for (int numServers = 2; numServers < 17; numServers++) {
    groups = new HashMap<Integer, Integer>();
    for (int i = 0; i < numKeys; i++) {
        // org.apache.flink.util.MathUtils
        int targetKeyGroupIndex = MathUtils.murmurHash(i) % numServers;
        Integer mygroup = groups.get(targetKeyGroupIndex);
        int count = mygroup == null ? 0 : mygroup;
        groups.put(targetKeyGroupIndex, ++count);
    }
    System.out.println(groups + " " + numServers);
}

{0=517, 1=507} 2
{0=364, 1=302, 2=358} 3
{0=258, 1=239, 2=259, 3=268} 4
{0=180, 1=220, 2=212, 3=205, 4=207} 5
{0=193, 1=157, 2=179, 3=171, 4=145, 5=179} 6
{0=144, 1=161, 2=152, 3=137, 4=160, 5=131, 6=139} 7
{0=125, 1=132, 2=120, 3=127, 4=133, 5=107, 6=139, 7=141} 8
{0=120, 1=110, 2=115, 3=123, 4=93, 5=112, 6=121, 7=99, 8=131} 9
{0=95, 1=106, 2=98, 3=103, 4=108, 5=85, 6=114, 7=114, 8=102, 9=99} 10
{0=98, 1=83, 2=84, 3=92, 4=89, 5=99, 6=97, 7=80, 8=126, 9=75, 10=101} 11
{0=98, 1=74, 2=92, 3=90, 4=73, 5=84, 6=95, 7=83, 8=87, 9=81, 10=72, 11=95} 12
{0=65, 1=84, 2=72, 3=80, 4=71, 5=85, 6=80, 7=79, 8=78, 9=85, 10=81, 11=91, 
12=73} 13
{0=73, 1=83, 2=75, 3=62, 4=81, 5=69, 6=73, 7=71, 8=78, 9=77, 10=75, 11=79, 
12=62, 13=66} 14
{0=67, 1=65, 2=81, 3=84, 4=73, 5=57, 6=76, 7=56, 8=69, 9=62, 10=56, 11=79, 
12=75, 13=52, 14=72} 15
{0=57, 1=72, 2=52, 3=61, 4=63, 5=47, 6=64, 7=80, 8=68, 9=60, 10=68, 11=66, 
12=70, 13=60, 14=75, 15=61} 16


Best,
Ovidiu

> On 21 Feb 2017, at 10:52, Till Rohrmann  wrote:
> 
> Hi Ovidiu,
> 
> at the moment it is not possible to plugin a user defined hash function/key
> group assignment function. If you like, then you can file a JIRA issue to
> add this functionality.
> 
> The key group assignment in your example looks quite skewed. One question
> concerning how you calculated it: Shouldn't the number of elements in each
> group sum up to 1024? This only works for the first case. What do the
> numbers mean then?
> 
> Cheers,
> Till
> 
> On Mon, Feb 20, 2017 at 3:45 PM, Ovidiu-Cristian MARCU <
> ovidiu-cristian.ma...@inria.fr> wrote:
> 
>> Hi,
>> 
>> Thank you for clarifications (I am working with KeyedStream so a custom
>> partitioner does not help).
>> 
>> So I should set maxParallelism>=parallelism and change my keys (from
>> input.keyBy(0)) such that key group assignment works as expected),
>> but I can’t modify these keys in order to make it work.
>> 
>> The other option is to change Flink’s internals in order to evenly
>> distribute keys (changing computeKeyGroupForKeyHash: is this enough?).
>> What I was looking for was an api to change the way key group assignment
>> is done, but without changing Flink’s runtime.
>> 
>> I think that the maxParallelism setting is not enough (it introduces this
>> inefficient way of distributing data for processing when using KeyedStream).
>> Is it possible to expose somehow the key group assignment?
>> 
>> This is how keys are distributed (1024 keys, key=1..1024; and groups from
>> 2 to 16 - equiv. parallelism that is number of slots):
>> 
>> {0=517, 1=507} 2
>> {0=881, 1=809, 2=358} 3
>> {0=1139, 1=1048, 2=617, 3=268} 4
>> {0=1319, 1=1268, 2=829, 3=473, 4=207} 5
>> {0=1512, 1=1425, 2=1008, 3=644, 4=352, 5=179} 6
>> {0=1656, 1=1586, 2=1160, 3=781, 4=512, 5=310, 6=139} 7
>> {0=1781, 1=1718, 2=1280, 3=908, 4=645, 5=417, 6=278, 7=141} 8
>> {0=1901, 1=1828, 2=1395, 3=1031, 4=738, 5=529, 6=399, 7=240, 8=131} 9
>> {0=1996, 1=1934, 2=1493, 3=1134, 4=846, 5=614, 6=513, 7=354, 8=233, 9=99}
>> 10
>> {0=2094, 1=2017, 2=1577, 3=1226, 4=935, 5=713, 6=610, 7=434, 8=359, 9=174,
>> 10=101} 11
>> {0=2192, 1=2091, 2=1669, 3=1316, 4=1008, 5=797, 6=705, 7=517, 8=446,
>> 9=255, 10=173, 11=95} 12
>> {0=2257, 1=2175, 2=1741, 3=1396, 4=1079, 5=882, 6=785, 7=596, 8=524,
>> 9=340, 10=254, 11=186, 12=73} 13
>> {0=2330, 1=2258, 2=1816, 3=1458, 4=1160, 5=951, 6=858, 7=667, 8=602,
>> 9=417, 10=329, 11=265, 12=135, 13=66} 14
>> {0=2397, 1=2323, 2=1897, 3=1542, 4=1233, 5=1008, 6=934, 7=723, 8=671,
>> 9=479, 10=385, 11=344, 12=210, 13=118, 14=72} 15
>> {0=2454, 1=2395, 2=1949, 3=1603, 4=1296, 5=1055, 6=998, 7=803, 8=739,
>> 9=539, 10=453, 11=410, 12=280, 13=178, 14=147, 15=61} 16
>> 
>> Best,
>> Ovidiu
>

Re: KeyGroupRangeAssignment ?

2017-02-21 Thread Ovidiu-Cristian MARCU
Hi,

Any thoughts on this issue, i.e., on what Till proposed ('to figure a key out 
whose hashes are uniformly distributed over the key groups') and on a way of 
exposing the key group assignment through the API?

I wonder how other users are facing this issue.

Having a small set of keys (related to input.keyBy) could easily be tackled 
with some sort of local mapping, but I am considering a use case with millions 
of keys.
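
To make the "local mapping" idea concrete, here is a small sketch of that workaround for a small key set: pre-compute a substitute key for each original key such that the substitute's murmur hash lands in the desired key group (purely my own illustration, not an existing Flink API, and the brute-force search is exactly what does not scale to millions of keys):

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.util.MathUtils;

public class KeyRemappingSketch {

    /** Builds originalKey -> substituteKey so substitute keys cycle through the key groups. */
    public static Map<Integer, Integer> buildMapping(int numKeys, int maxParallelism) {
        Map<Integer, Integer> mapping = new HashMap<>();
        int candidate = 0;
        for (int key = 1; key <= numKeys; key++) {
            int wantedGroup = (key - 1) % maxParallelism;
            // Integer.hashCode() is the identity (as Greg points out), so hashing the
            // candidate directly matches what Flink computes for an Integer key.
            while (MathUtils.murmurHash(candidate) % maxParallelism != wantedGroup) {
                candidate++;
            }
            mapping.put(key, candidate++);
        }
        return mapping;
    }
}

The stream would then be keyed by mapping.get(originalKey) instead of the original key, so the default hash-based assignment spreads the records evenly.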

Best,
Ovidiu


> On 20 Feb 2017, at 15:45, Ovidiu-Cristian MARCU 
>  wrote:
> 
> Hi,
> 
> Thank you for clarifications (I am working with KeyedStream so a custom 
> partitioner does not help).
> 
> So I should set maxParallelism>=parallelism and change my keys (from 
> input.keyBy(0)) such that key group assignment works as expected), 
> but I can’t modify these keys in order to make it work.
> 
> The other option is to change Flink’s internals in order to evenly distribute 
> keys (changing computeKeyGroupForKeyHash: is this enough?).
> What I was looking for was an api to change the way key group assignment is 
> done, but without changing Flink’s runtime.
> 
> I think that the maxParallelism setting is not enough (it introduces this 
> inefficient way of distributing data for processing when using KeyedStream).
> Is it possible to expose somehow the key group assignment?
> 
> This is how keys are distributed (1024 keys, key=1..1024; and groups from 2 
> to 16 - equiv. parallelism that is number of slots):
> 
> {0=517, 1=507} 2
> {0=881, 1=809, 2=358} 3
> {0=1139, 1=1048, 2=617, 3=268} 4
> {0=1319, 1=1268, 2=829, 3=473, 4=207} 5
> {0=1512, 1=1425, 2=1008, 3=644, 4=352, 5=179} 6
> {0=1656, 1=1586, 2=1160, 3=781, 4=512, 5=310, 6=139} 7
> {0=1781, 1=1718, 2=1280, 3=908, 4=645, 5=417, 6=278, 7=141} 8
> {0=1901, 1=1828, 2=1395, 3=1031, 4=738, 5=529, 6=399, 7=240, 8=131} 9
> {0=1996, 1=1934, 2=1493, 3=1134, 4=846, 5=614, 6=513, 7=354, 8=233, 9=99} 10
> {0=2094, 1=2017, 2=1577, 3=1226, 4=935, 5=713, 6=610, 7=434, 8=359, 9=174, 
> 10=101} 11
> {0=2192, 1=2091, 2=1669, 3=1316, 4=1008, 5=797, 6=705, 7=517, 8=446, 9=255, 
> 10=173, 11=95} 12
> {0=2257, 1=2175, 2=1741, 3=1396, 4=1079, 5=882, 6=785, 7=596, 8=524, 9=340, 
> 10=254, 11=186, 12=73} 13
> {0=2330, 1=2258, 2=1816, 3=1458, 4=1160, 5=951, 6=858, 7=667, 8=602, 9=417, 
> 10=329, 11=265, 12=135, 13=66} 14
> {0=2397, 1=2323, 2=1897, 3=1542, 4=1233, 5=1008, 6=934, 7=723, 8=671, 9=479, 
> 10=385, 11=344, 12=210, 13=118, 14=72} 15
> {0=2454, 1=2395, 2=1949, 3=1603, 4=1296, 5=1055, 6=998, 7=803, 8=739, 9=539, 
> 10=453, 11=410, 12=280, 13=178, 14=147, 15=61} 16
> 
> Best,
> Ovidiu
> 
>> On 20 Feb 2017, at 12:04, Till Rohrmann  wrote:
>> 
>> Hi Ovidiu,
>> 
>> the way Flink works is to assign key group ranges to operators. For each 
>> element you calculate a hash value and based on that you assign it to a key 
>> group. Thus, in your example, you have either a key group with more than 1 
>> key or multiple key groups with 1 or more keys assigned to an operator.
>> 
>> So what you could try to do is to reduce the number of key groups to your 
>> parallelism via env.setMaxParallelism() and then try to figure a key out 
>> whose hashes are uniformly distributed over the key groups. The key group 
>> assignment is calculated via murmurHash(key.hashCode()) % maxParallelism.
>> 
>> Alternatively if you don’t need a keyed stream, you could try to use a 
>> custom partitioner via DataStream.partitionCustom.
>> 
>> Cheers,
>> Till
>> 
>> 
>> On Mon, Feb 20, 2017 at 11:46 AM, Ovidiu-Cristian MARCU <
>> ovidiu-cristian.ma...@inria.fr> wrote:
>> Hi,
>> 
>> Can you please comment on how I can ensure stream input records are
>> distributed evenly onto task slots?
>> See the attached screenshot of the "Records received" issue.
>> 
>> I have a simple application which is applying some window function over a 
>> stream partitioned as follows:
>> (parallelism is equal to the number of keys; records with the same key are 
>> streamed evenly)
>> 
>> // get the execution environment
>> final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> // get input data by connecting to the socket
>> DataStream<String> text = env.socketTextStream("localhost", port, "\n");
>> DataStream<Tuple5<Integer, String, Double, Long, Long>> input = text.flatMap(...);
>> DataStream<Double> counts1 = null;
>> counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
>>     .apply(new WindowFunction<Tuple5<Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() {
>>         ...
>>     });
>> counts1.writeAsText(params.get("output1"));
>> env.execute("Socket Window WordCount");
>> 
>> Best,
>> Ovidiu
>> 
>> 
> 



Re: KeyGroupRangeAssignment ?

2017-02-20 Thread Ovidiu-Cristian MARCU
Hi,

Thank you for clarifications (I am working with KeyedStream so a custom 
partitioner does not help).

So I should set maxParallelism >= parallelism and change my keys (from 
input.keyBy(0)) such that key group assignment works as expected, 
but I can't modify these keys in order to make it work.

The other option is to change Flink’s internals in order to evenly distribute 
keys (changing computeKeyGroupForKeyHash: is this enough?).
What I was looking for was an API to change the way key group assignment is 
done, but without changing Flink's runtime.

I think that the maxParallelism setting is not enough (it introduces this 
inefficient way of distributing data for processing when using a KeyedStream).
Is it possible to somehow expose the key group assignment?
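
Purely as an illustration of what "exposing the key group assignment" could look like, here is a hypothetical shape for such an API (this interface does not exist in Flink; it is only a sketch for the discussion):

/**
 * Hypothetical pluggable assignment hook (not an existing Flink interface): the
 * runtime would consult it instead of murmurHash(key.hashCode()) % maxParallelism.
 */
public interface KeyGroupAssigner<K> extends java.io.Serializable {

    /** Returns the key group index in [0, maxParallelism) for the given key. */
    int assignToKeyGroup(K key, int maxParallelism);
}

/** Example: the even assignment for integer keys discussed in this thread. */
class ModuloKeyGroupAssigner implements KeyGroupAssigner<Integer> {
    @Override
    public int assignToKeyGroup(Integer key, int maxParallelism) {
        return key % maxParallelism;
    }
}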

This is how keys are distributed (1024 keys, key = 1..1024; and groups from 2 to 
16, equivalent to the parallelism, i.e., the number of slots):

{0=517, 1=507} 2
{0=881, 1=809, 2=358} 3
{0=1139, 1=1048, 2=617, 3=268} 4
{0=1319, 1=1268, 2=829, 3=473, 4=207} 5
{0=1512, 1=1425, 2=1008, 3=644, 4=352, 5=179} 6
{0=1656, 1=1586, 2=1160, 3=781, 4=512, 5=310, 6=139} 7
{0=1781, 1=1718, 2=1280, 3=908, 4=645, 5=417, 6=278, 7=141} 8
{0=1901, 1=1828, 2=1395, 3=1031, 4=738, 5=529, 6=399, 7=240, 8=131} 9
{0=1996, 1=1934, 2=1493, 3=1134, 4=846, 5=614, 6=513, 7=354, 8=233, 9=99} 10
{0=2094, 1=2017, 2=1577, 3=1226, 4=935, 5=713, 6=610, 7=434, 8=359, 9=174, 
10=101} 11
{0=2192, 1=2091, 2=1669, 3=1316, 4=1008, 5=797, 6=705, 7=517, 8=446, 9=255, 
10=173, 11=95} 12
{0=2257, 1=2175, 2=1741, 3=1396, 4=1079, 5=882, 6=785, 7=596, 8=524, 9=340, 
10=254, 11=186, 12=73} 13
{0=2330, 1=2258, 2=1816, 3=1458, 4=1160, 5=951, 6=858, 7=667, 8=602, 9=417, 
10=329, 11=265, 12=135, 13=66} 14
{0=2397, 1=2323, 2=1897, 3=1542, 4=1233, 5=1008, 6=934, 7=723, 8=671, 9=479, 
10=385, 11=344, 12=210, 13=118, 14=72} 15
{0=2454, 1=2395, 2=1949, 3=1603, 4=1296, 5=1055, 6=998, 7=803, 8=739, 9=539, 
10=453, 11=410, 12=280, 13=178, 14=147, 15=61} 16

Best,
Ovidiu

> On 20 Feb 2017, at 12:04, Till Rohrmann  wrote:
> 
> Hi Ovidiu,
> 
> the way Flink works is to assign key group ranges to operators. For each 
> element you calculate a hash value and based on that you assign it to a key 
> group. Thus, in your example, you have either a key group with more than 1 
> key or multiple key groups with 1 or more keys assigned to an operator.
> 
> So what you could try to do is to reduce the number of key groups to your 
> parallelism via env.setMaxParallelism() and then try to figure a key out 
> whose hashes are uniformly distributed over the key groups. The key group 
> assignment is calculated via murmurHash(key.hashCode()) % maxParallelism.
> 
> Alternatively if you don’t need a keyed stream, you could try to use a custom 
> partitioner via DataStream.partitionCustom.
> 
> Cheers,
> Till
> 
> 
> On Mon, Feb 20, 2017 at 11:46 AM, Ovidiu-Cristian MARCU <
> ovidiu-cristian.ma...@inria.fr> wrote:
> Hi,
> 
> Can you please comment on how I can ensure stream input records are
> distributed evenly onto task slots?
> See the attached screenshot of the "Records received" issue.
> 
> I have a simple application which is applying some window function over a 
> stream partitioned as follows:
> (parallelism is equal to the number of keys; records with the same key are 
> streamed evenly)
> 
> // get the execution environment
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // get input data by connecting to the socket
> DataStream<String> text = env.socketTextStream("localhost", port, "\n");
> DataStream<Tuple5<Integer, String, Double, Long, Long>> input = text.flatMap(...);
> DataStream<Double> counts1 = null;
> counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
>     .apply(new WindowFunction<Tuple5<Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() {
>         ...
>     });
> counts1.writeAsText(params.get("output1"));
> env.execute("Socket Window WordCount");
> 
> Best,
> Ovidiu
> 
> 



RE: [FLINK-3035] Redis as State Backend

2016-11-07 Thread Ovidiu Cristian Marcu
Thanks, how big was your state (GBs)?
Can you share your benchmark/s?

Best,
Ovidiu

-Original Message-
From: amir bahmanyari [mailto:amirto...@yahoo.com.INVALID] 
Sent: Tuesday, October 25, 2016 7:24 PM
To: dev@flink.apache.org
Subject: Re: [FLINK-3035] Redis as State Backend

FYI. I was using Redis as a state backend in my benchmarking of Beam. It proved 
to be a bottleneck, perhaps due to the high frequency of updating state 
components. I replaced it with Java 8 ConcurrentHashMaps and it settled down 
tremendously. Amir-

  From: Ovidiu-Cristian MARCU 
 To: dev@flink.apache.org
 Sent: Tuesday, October 25, 2016 12:40 AM
 Subject: Re: [FLINK-3035] Redis as State Backend
   
Thank you!

Best,
Ovidiu

> On 24 Oct 2016, at 16:11, Aljoscha Krettek  wrote:
> 
> Hi,
> regarding RocksDB, yes this is possible because RocksDB is essentially 
> only used as an out-of-core hash table. When checkpointing we write 
> everything from RocksDB to HDFS. When restoring we repopulate an empty 
> local RocksDB instance from the data in HDFS.
> 
> Cheers,
> Aljoscha
> 
> On Fri, 21 Oct 2016 at 11:24 Ovidiu Cristian Marcu < 
> ovidiu.cristian.ma...@huawei.com> wrote:
> 
>> Hi
>> 
>> I missed your reply, thank you for feedback.
>> Agree with 1, that will be possible only with Ramcloud.
>> It is clear the second point.
>> 
>> A short question: if you checkpoint the operator's state in hdfs I 
>> assume that on failure you are restarting the operator's tasks on 
>> other nodes, is that possible with RocksDB?
>> 
>> Best,
>> Ovidiu
>> 
>> -Original Message-
>> From: Aljoscha Krettek [mailto:aljos...@apache.org]
>> Sent: Monday, October 17, 2016 2:51 PM
>> To: dev@flink.apache.org
>> Subject: Re: [FLINK-3035] Redis as State Backend
>> 
>> Hi,
>> there are two basic ideas for implementing a StateBackend based on Redis:
>> 1. Rely on Redis to keep the state, use nothing else.
>> 2. Use Redis to keep the state and checkpoint to some distributed 
>> file system (such as HDFS) when checkpointing
>> 
>> The first idea seems unwise because Redis is not a "strongly 
>> consistent distributed data store" as Elias pointed out on the issue. 
>> The second Idea is problematic because there is no easy way to read 
>> all state for a given Flink operator from a running Redis instance to 
>> store it in HDFS. That's what I was getting at in my comment.
>> 
>> Cheers,
>> Aljoscha
>> 
>> On Fri, 7 Oct 2016 at 17:19 Ovidiu Cristian Marcu < 
>> ovidiu.cristian.ma...@huawei.com> wrote:
>> 
>>> Hi
>>> 
>>> Can you please expand the last comment:
>>> 
>>> "I think, however, that for other reasons we will probably not be 
>>> able to implement this well. The problem is that we have to somehow 
>>> get at the state in redis for checkpointing. And if we use only one 
>>> Redis instance for all states then this will be problematic." - 
>>> Aljoscha Krettek
>>> 
>>> Any other update on this issue will help, not clear the status.
>>> 
>>> Best,
>>> Ovidiu
>>> 
>>> 
>> 



   


RE: TopSpeedWindowing - in error: Could not forward element to next operator

2016-11-07 Thread Ovidiu Cristian Marcu
Thank you, I will check this fix in my environment.

Best,
Ovidiu

-Original Message-
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: Friday, October 21, 2016 5:47 PM
To: dev@flink.apache.org
Subject: Re: TopSpeedWindowing - in error: Could not forward element to next 
operator

Hi,
the problem is that EvictingWindowOperator uses StreamRecordSerializer to 
serialise the contents of the windows. This does not serialise timestamps so 
when the objects are deserialised from RocksDB they all have Long.MIN_VALUE as 
timestamp. The evictor in the program therefore always evicts all elements and 
the window function always sees an empty iterable.
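
To see why the lost timestamps empty the window, here is a minimal illustration (my own sketch; it assumes a TimeEvictor-style cutoff of "latest timestamp minus window size", which is how eviction behaves in this example):

public class EvictionUnderflowSketch {
    public static void main(String[] args) {
        long windowSizeMs = 10_000L;              // e.g. a 10 second eviction window
        long restoredTimestamp = Long.MIN_VALUE;  // timestamp lost on the RocksDB round trip

        long evictCutoff = restoredTimestamp - windowSizeMs; // underflows to a huge positive value
        boolean kept = restoredTimestamp > evictCutoff;      // false -> every element is evicted

        System.out.println("cutoff=" + evictCutoff + ", element kept=" + kept);
    }
}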

I have a fix for this in a recent PR:
https://github.com/apache/flink/pull/2656

Cheers,
Aljoscha

On Fri, 21 Oct 2016 at 16:57 Ovidiu Cristian Marcu < 
ovidiu.cristian.ma...@huawei.com> wrote:

> Hi
>
> I have the latest source code from master.
> I think the refactoring you were doing on the state backend causes this
> issue, unless the example is not supposed to work with RocksDB.
> At some point the collected records' values are null, giving the error.
>
> What do you think?
>
> Best,
> Ovidiu
>
> -Original Message-
> From: Till Rohrmann [mailto:trohrm...@apache.org]
> Sent: Friday, October 21, 2016 2:09 PM
> To: dev@flink.apache.org
> Subject: Re: TopSpeedWindowing - in error: Could not forward element 
> to next operator
>
> Hi Ovidiu,
>
> which version of Flink are you using?
>
> Cheers,
> Till
>
> On Thu, Oct 20, 2016 at 6:38 PM, Ovidiu Cristian Marcu < 
> ovidiu.cristian.ma...@huawei.com> wrote:
>
> > Could you check the following issue on master?
> >
> > When running this example org.apache.flink.streaming.examples.windowing.
> > TopSpeedWindowing
> > With default configuration I have no errors.
> >
> > When I change the state backend with RocksDB I receive this error:
> >
> > java.lang.RuntimeException: Could not forward element to next operator
> > at
> > org.apache.flink.streaming.runtime.tasks.OperatorChain$
> > CopyingChainingOutput.collect(OperatorChain.java:388)
> > at
> > org.apache.flink.streaming.runtime.tasks.OperatorChain$
> > ChainingOutput.collect(OperatorChain.java:1)
> > at org.apache.flink.streaming.api.operators.
> >
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> > 393)
> > at org.apache.flink.streaming.api.operators.
> > AbstractStreamOperator$CountingOutput.collect(
> > AbstractStreamOperator.java:1)
> > at org.apache.flink.streaming.api.operators.
> > TimestampedCollector.collect(TimestampedCollector.java:51)
> > at org.apache.flink.streaming.api.functions.windowing.
> > PassThroughWindowFunction.apply(PassThroughWindowFunction.java:32)
> > at org.apache.flink.streaming.api.functions.windowing.
> > ReduceApplyWindowFunction.apply(ReduceApplyWindowFunction.java:56)
> > at
> org.apache.flink.streaming.runtime.operators.windowing.
> > functions.InternalIterableWindowFunction.apply(
> > InternalIterableWindowFunction.java:50)
> > at
> org.apache.flink.streaming.runtime.operators.windowing.
> > functions.InternalIterableWindowFunction.apply(
> > InternalIterableWindowFunction.java:1)
> > at
> org.apache.flink.streaming.runtime.operators.windowing.
> > EvictingWindowOperator.fire(EvictingWindowOperator.java:334)
> > at
> org.apache.flink.streaming.runtime.operators.windowing.
> > EvictingWindowOperator.processElement(EvictingWindowOperator.java:199)
> > at org.apache.flink.streaming.runtime.io.
> > StreamInputProcessor.processInput(StreamInputProcessor.java:177)
> > at org.apache.flink.streaming.runtime.tasks.
> > OneInputStreamTask.run(OneInputStreamTask.java:66)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.
> > invoke(StreamTask.java:270)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.
> > java:609)
> > at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.NullPointerException
> > at org.apache.flink.api.java.typeutils.runtime.
> > TupleSerializer.copy(TupleSerializer.java:103)
> > at org.apache.flink.api.java.typeutils.runtime.
> > TupleSerializer.copy(TupleSerializer.java:30)
> > at
> > org.apache.flink.streaming.runtime.tasks.OperatorChain$
> > CopyingChainingOutput.collect(OperatorChain.java:383)
> > ... 15 more
> >
>


RE: [FLINK-3035] Redis as State Backend

2016-11-07 Thread Ovidiu Cristian Marcu
Great, thanks!

Best,
Ovidiu

-Original Message-
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: Monday, October 24, 2016 3:11 PM
To: dev@flink.apache.org
Subject: Re: [FLINK-3035] Redis as State Backend

Hi,
regarding RocksDB, yes this is possible because RocksDB is essentially only 
used as an out-of-core hash table. When checkpointing we write everything from 
RocksDB to HDFS. When restoring we repopulate an empty local RocksDB instance 
from the data in HDFS.

Cheers,
Aljoscha

On Fri, 21 Oct 2016 at 11:24 Ovidiu Cristian Marcu < 
ovidiu.cristian.ma...@huawei.com> wrote:

> Hi
>
> I missed your reply, thank you for feedback.
> Agree with 1, that will be possible only with Ramcloud.
> It is clear the second point.
>
> A short question: if you checkpoint the operator's state in hdfs I 
> assume that on failure you are restarting the operator's tasks on 
> other nodes, is that possible with RocksDB?
>
> Best,
> Ovidiu
>
> -Original Message-
> From: Aljoscha Krettek [mailto:aljos...@apache.org]
> Sent: Monday, October 17, 2016 2:51 PM
> To: dev@flink.apache.org
> Subject: Re: [FLINK-3035] Redis as State Backend
>
> Hi,
> there are two basic ideas for implementing a StateBackend based on Redis:
>  1. Rely on Redis to keep the state, use nothing else.
>  2. Use Redis to keep the state and checkpoint to some distributed 
> file system (such as HDFS) when checkpointing
>
> The first idea seems unwise because Redis is not a "strongly 
> consistent distributed data store" as Elias pointed out on the issue. 
> The second Idea is problematic because there is no easy way to read 
> all state for a given Flink operator from a running Redis instance to 
> store it in HDFS. That's what I was getting at in my comment.
>
> Cheers,
> Aljoscha
>
> On Fri, 7 Oct 2016 at 17:19 Ovidiu Cristian Marcu < 
> ovidiu.cristian.ma...@huawei.com> wrote:
>
> > Hi
> >
> > Can you please expand the last comment:
> >
> > "I think, however, that for other reasons we will probably not be 
> > able to implement this well. The problem is that we have to somehow 
> > get at the state in redis for checkpointing. And if we use only one 
> > Redis instance for all states then this will be problematic." - 
> > Aljoscha Krettek
> >
> > Any other update on this issue will help, not clear the status.
> >
> > Best,
> > Ovidiu
> >
> >
>


Re: [FLINK-3035] Redis as State Backend

2016-10-25 Thread Ovidiu-Cristian MARCU
Thank you!

Best,
Ovidiu

> On 24 Oct 2016, at 16:11, Aljoscha Krettek  wrote:
> 
> Hi,
> regarding RocksDB, yes this is possible because RocksDB is essentially only
> used as an out-of-core hash table. When checkpointing we write everything
> from RocksDB to HDFS. When restoring we repopulate an empty local RocksDB
> instance from the data in HDFS.
> 
> Cheers,
> Aljoscha
> 
> On Fri, 21 Oct 2016 at 11:24 Ovidiu Cristian Marcu <
> ovidiu.cristian.ma...@huawei.com> wrote:
> 
>> Hi
>> 
>> I missed your reply, thank you for feedback.
>> Agree with 1, that will be possible only with Ramcloud.
>> It is clear the second point.
>> 
>> A short question: if you checkpoint the operator's state in hdfs I assume
>> that on failure you are
>> restarting the operator's tasks on other nodes, is that possible with
>> RocksDB?
>> 
>> Best,
>> Ovidiu
>> 
>> -Original Message-
>> From: Aljoscha Krettek [mailto:aljos...@apache.org]
>> Sent: Monday, October 17, 2016 2:51 PM
>> To: dev@flink.apache.org
>> Subject: Re: [FLINK-3035] Redis as State Backend
>> 
>> Hi,
>> there are two basic ideas for implementing a StateBackend based on Redis:
>> 1. Rely on Redis to keep the state, use nothing else.
>> 2. Use Redis to keep the state and checkpoint to some distributed file
>> system (such as HDFS) when checkpointing
>> 
>> The first idea seems unwise because Redis is not a "strongly consistent
>> distributed data store" as Elias pointed out on the issue. The second Idea
>> is problematic because there is no easy way to read all state for a given
>> Flink operator from a running Redis instance to store it in HDFS. That's
>> what I was getting at in my comment.
>> 
>> Cheers,
>> Aljoscha
>> 
>> On Fri, 7 Oct 2016 at 17:19 Ovidiu Cristian Marcu <
>> ovidiu.cristian.ma...@huawei.com> wrote:
>> 
>>> Hi
>>> 
>>> Can you please expand the last comment:
>>> 
>>> "I think, however, that for other reasons we will probably not be able
>>> to implement this well. The problem is that we have to somehow get at
>>> the state in redis for checkpointing. And if we use only one Redis
>>> instance for all states then this will be problematic." - Aljoscha
>>> Krettek
>>> 
>>> Any other update on this issue will help, not clear the status.
>>> 
>>> Best,
>>> Ovidiu
>>> 
>>> 
>> 



RE: TopSpeedWindowing - in error: Could not forward element to next operator

2016-10-21 Thread Ovidiu Cristian Marcu
Hi 

I have the latest source code from master.
I think the refactoring you were doing on the state backend causes this issue, 
unless the example is not supposed to work with RocksDB.
At some point the collected records' values are null, giving the error.

What do you think?

Best,
Ovidiu

-Original Message-
From: Till Rohrmann [mailto:trohrm...@apache.org] 
Sent: Friday, October 21, 2016 2:09 PM
To: dev@flink.apache.org
Subject: Re: TopSpeedWindowing - in error: Could not forward element to next 
operator

Hi Ovidiu,

which version of Flink are you using?

Cheers,
Till

On Thu, Oct 20, 2016 at 6:38 PM, Ovidiu Cristian Marcu < 
ovidiu.cristian.ma...@huawei.com> wrote:

> Could you check the following issue on master?
>
> When running this example org.apache.flink.streaming.examples.windowing.
> TopSpeedWindowing
> With default configuration I have no errors.
>
> When I change the state backend with RocksDB I receive this error:
>
> java.lang.RuntimeException: Could not forward element to next operator
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:388)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$
> ChainingOutput.collect(OperatorChain.java:1)
> at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 393)
> at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(
> AbstractStreamOperator.java:1)
> at org.apache.flink.streaming.api.operators.
> TimestampedCollector.collect(TimestampedCollector.java:51)
> at org.apache.flink.streaming.api.functions.windowing.
> PassThroughWindowFunction.apply(PassThroughWindowFunction.java:32)
> at org.apache.flink.streaming.api.functions.windowing.
> ReduceApplyWindowFunction.apply(ReduceApplyWindowFunction.java:56)
> at org.apache.flink.streaming.runtime.operators.windowing.
> functions.InternalIterableWindowFunction.apply(
> InternalIterableWindowFunction.java:50)
> at org.apache.flink.streaming.runtime.operators.windowing.
> functions.InternalIterableWindowFunction.apply(
> InternalIterableWindowFunction.java:1)
> at org.apache.flink.streaming.runtime.operators.windowing.
> EvictingWindowOperator.fire(EvictingWindowOperator.java:334)
> at org.apache.flink.streaming.runtime.operators.windowing.
> EvictingWindowOperator.processElement(EvictingWindowOperator.java:199)
> at org.apache.flink.streaming.runtime.io.
> StreamInputProcessor.processInput(StreamInputProcessor.java:177)
> at org.apache.flink.streaming.runtime.tasks.
> OneInputStreamTask.run(OneInputStreamTask.java:66)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:270)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.
> java:609)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
> at org.apache.flink.api.java.typeutils.runtime.
> TupleSerializer.copy(TupleSerializer.java:103)
> at org.apache.flink.api.java.typeutils.runtime.
> TupleSerializer.copy(TupleSerializer.java:30)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:383)
> ... 15 more
>


RE: [FLINK-3035] Redis as State Backend

2016-10-21 Thread Ovidiu Cristian Marcu
Hi

I missed your reply; thank you for the feedback.
I agree with 1; that will be possible only with RAMCloud.
The second point is clear.

A short question: if you checkpoint the operator's state in HDFS, I assume that 
on failure you restart the operator's tasks on other nodes; is that possible with 
RocksDB?

Best,
Ovidiu 

-Original Message-
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: Monday, October 17, 2016 2:51 PM
To: dev@flink.apache.org
Subject: Re: [FLINK-3035] Redis as State Backend

Hi,
there are two basic ideas for implementing a StateBackend based on Redis:
 1. Rely on Redis to keep the state, use nothing else.
 2. Use Redis to keep the state and checkpoint to some distributed file system 
(such as HDFS) when checkpointing

The first idea seems unwise because Redis is not a "strongly consistent 
distributed data store" as Elias pointed out on the issue. The second Idea is 
problematic because there is no easy way to read all state for a given Flink 
operator from a running Redis instance to store it in HDFS. That's what I was 
getting at in my comment.

Cheers,
Aljoscha

On Fri, 7 Oct 2016 at 17:19 Ovidiu Cristian Marcu < 
ovidiu.cristian.ma...@huawei.com> wrote:

> Hi
>
> Can you please expand the last comment:
>
> "I think, however, that for other reasons we will probably not be able 
> to implement this well. The problem is that we have to somehow get at 
> the state in redis for checkpointing. And if we use only one Redis 
> instance for all states then this will be problematic." - Aljoscha 
> Krettek
>
> Any other update on this issue will help, not clear the status.
>
> Best,
> Ovidiu
>
>


TopSpeedWindowing - in error: Could not forward element to next operator

2016-10-20 Thread Ovidiu Cristian Marcu
Could you check the following issue on master?

When running the example 
org.apache.flink.streaming.examples.windowing.TopSpeedWindowing 
with the default configuration I have no errors.

When I change the state backend to RocksDB I receive this error:

java.lang.RuntimeException: Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:388)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:1)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:393)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:1)
at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at 
org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction.apply(PassThroughWindowFunction.java:32)
at 
org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction.apply(ReduceApplyWindowFunction.java:56)
at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:50)
at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:1)
at 
org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.fire(EvictingWindowOperator.java:334)
at 
org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.processElement(EvictingWindowOperator.java:199)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:177)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:270)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:609)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:103)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:383)
... 15 more
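
For context, a minimal sketch of how the example can be switched to the RocksDB backend to reproduce this (my assumption of the setup used here; the checkpoint URI is a placeholder and the constructor is the programmatic RocksDBStateBackend API of this Flink generation):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoints go to a durable file system; the path below is a placeholder
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
        env.enableCheckpointing(5_000L);

        // ... build the TopSpeedWindowing pipeline here and call env.execute() ...
    }
}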


[FLINK-3035] Redis as State Backend

2016-10-07 Thread Ovidiu Cristian Marcu
Hi

Can you please expand the last comment:

"I think, however, that for other reasons we will probably not be able to 
implement this well. The problem is that we have to somehow get at the state in 
redis for checkpointing. And if we use only one Redis instance for all states 
then this will be problematic." - Aljoscha Krettek

Any other update on this issue will help; the status is not clear.

Best,
Ovidiu



Re: Evaluating Apache Flink

2016-07-18 Thread Ovidiu-Cristian MARCU
Hi Kevin,

I have orchestrated an evaluation of Spark and Flink for various batch and 
graph processing workloads (no streaming, no SQL); this work has been accepted 
as a paper at Cluster and I will soon publish a report (for more details please 
contact me directly).
Both engines did well, stable and scalable; in some cases one is better than the 
other, with some differences in tuning and resource usage.
Depending on your use case, you may or may not be able to compare these two 
engines directly, but overall they complement each other :).

I think the biggest issue is recovery on failure (more for batch use cases), 
something which is to be handled (better) with this proposal:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures


Best,
Ovidiu

> On 08 Jul 2016, at 15:33, Kevin Jacobs  wrote:
> 
> Hi Marton,
> 
> Thank you for your elaborate answer. I will comment in your e-mail below:
> 
> On 08.07.2016 15:13, Márton Balassi wrote:
>> Hi Kevin,
>> 
>> Thanks for being willing to contribute such an effort. I think it is a
>> completely valid discussion to ask in your organization and please feel
>> free to ask us questions during your evaluation. Putting statements on the
>> Flink website highlighting the differences would be very tricky though. I
>> would advise against that. Let me elaborate on that.
> 
> Thank you, I will definitely ask questions during the evaluation, next week 
> we will be setting up some experiments.
> 
>> 
>> The "How does it compare to Spark?" is definitely one of the most
>> frequently asked questions that we get and we can generally give three
>> types of answers:
>> 
>> *1. General architecture decisions*
>> 
>>- Streaming (pipelined) execution engine (or long running opreator
>>model).
>>- Native iteration operator.
>>- ...
>> 
>> The issue with this approach is that in itself it states borderline no
>> useful information for a decision maker. There you need benchmarks or fancy
>> features, so let us evaluate them.
> 
> That is definitely true, but don't you think that Flink and Spark will 
> "collapse" at some point in time? The differences between the two frameworks 
> are getting smaller and smaller, Spark also has support for streaming. Or 
> will the difference in the architecture be key in differentiating the two 
> frameworks?
> 
>> 
>> *2. Benchmarks*
>> You can find plenty of third-party benchmarks and soft evaluations [1,2,3]
>> of the two systems out there. The problem with these are that they are very
>> reliant on the version of the systems used, tuning and understanding the
>> general architecture. E.g. [1] favors Storm, but if you re-do the whole
>> benchmark from a Flink point of view you get [4]. After a couple of
>> versions the benchmark results can be very different.
>> 
>> *3. Fancy Features*
>> 
>>- Exactly once spillable streaming state stored locally
>>- Savepoints
>>- ...
>> 
>> Similarly to the previous point these might be an edge at some point in
>> time, but the whole streaming space is moving very quickly and as it is
>> open source projects tend to copy each other to a certain extent.
> 
> Why is this space moving so quickly? Is it due to the new technologies that 
> arise for processing streaming data? Would that not converge to only a handful 
> of stable frameworks in the future (just speculating)?
> 
>> 
>> This of course does not mean that doing evaluations at any point in time is
>> meaningless, but you need to update them frequently (check [5] and [6]) and
>> they can do more harm then good if not treated with care.
> 
> It would be great if there were evaluation methods that are reusable, so this 
> process does not have to be repeated every time. Unfortunately, there always 
> is a difference with previous frameworks, so that implies that custom made 
> evaluations should be made for every new framework. I like the 
> TeraGen/TeraSort/TeraValidate benchmark, that is at least a general benchmark 
> approach too some extend.
> 
>> 
>> I hope I was not too discouraging and could help you with your endeavor. It
>> is also very important to take your specific use cases into account.
> 
> It is definitely not discouraging, thank you for the answer :-)!
> 
>> 
>> Best,
>> 
>> Marton
>> 
>> [1]
>> https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at
>> [2] https://tech.zalando.de/blog/apache-showdown-flink-vs.-spark/
>> [3] http://data-artisans.com/how-we-selected-apache-flink-at-otto-group/
>> [4] http://data-artisans.com/extending-the-yahoo-streaming-benchmark/
>> [5]
>> http://www.slideshare.net/GyulaFra/largescale-stream-processing-in-the-hadoop-ecosystem
>> [6]
>> http://www.slideshare.net/GyulaFra/largescale-stream-processing-in-the-hadoop-ecosystem-hadoop-summit-2016-60887821
>> 
>> On Fri, Jul 8, 2016 at 2:23 PM, Kevin Jacobs  wrote:
>> 
>>> 

Re: Side-effects of DataSet::count

2016-05-31 Thread Ovidiu-Cristian MARCU
Hi Stephan and all,

Some reference to this may be https://issues.apache.org/jira/browse/FLINK-2250 ?
I agree your priorities on streaming are very high; it would be a big +1 for 
the community to create a discussion place for the design proposal improvement 
and eventually launch an initial draft (including new requirements). As one 
tries to dig in, what you have already achieved is quite complex (for example 
FLINK-2097, FLINK-1350, FLINK-1359 and related, mainly FLINK-986). These issues 
are a pain for DataSets.
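
As a small illustration of the side effect discussed in the quoted thread below (my own sketch of the behaviour Márton and Eron describe: count() implicitly executes the whole graph, including previously defined sinks; the output path is a placeholder):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class CountSideEffectSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> words = env.fromElements("a", "b", "c");
        // a sink defined before the count
        words.writeAsText("/tmp/words-out");

        // count() appends its own sink and triggers execute() on the whole graph,
        // so the writeAsText sink above also runs as part of this call
        long n = words.count();
        System.out.println("count = " + n);
    }
}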

Best,
Ovidiu

> On 31 May 2016, at 11:27, Stephan Ewen  wrote:
> 
> Hi!
> 
> There was some preliminary work on this. By now, the requirements have
> grown a bit. The backtracking needs to handle
> 
>  - Scheduling for execution (the here raised point), possibly resuming
> from available intermediate results
>  - Recovery from partially executed programs, where operators execute
> whole or not (batch style)
>  - Recover from intermediate result since latest completed checkpoint
>  - Eventually even recover superstep-based iterations.
> 
> So the design needs to be extended slightly. We do not have a design
> writeup for this, but I agree, it would be great to have one.
> I have a pretty good general idea about this, let me see if I can get to
> that next week.
> 
> In general, for such things (long standing ideas and designs), we should
> have something like Kafka has with its KIPs (Kafka Improvement Proposal) -
> a place where to collect them, refine them over time, and
> see how people react to them or step up to implement them. We could call
> them 3Fs (Flink Feature Forms) ;-)
> 
> Greetings,
> Stephan
> 
> 
> On Tue, May 31, 2016 at 1:02 AM, Greg Hogan  wrote:
> 
>> Hi Stephan,
>> 
>> Is there a design document, prior discussion, or background material on
>> this enhancement? Am I correct in understanding that this only applies to
>> DataSet since streams run indefinitely?
>> 
>> Thanks,
>> Greg
>> 
>> On Mon, May 30, 2016 at 5:49 PM, Stephan Ewen  wrote:
>> 
>>> Hi Eron!
>>> 
>>> Yes, the idea is to actually switch all executions to a backtracking
>>> scheduling mode. That simultaneously solves both fine grained recovery
>> and
>>> lazy execution, where later stages build on prior stages.
>>> 
>>> With all the work around streaming, we have not gotten to this so far,
>> but
>>> it is one feature still in the list...
>>> 
>>> Greetings,
>>> Stephan
>>> 
>>> 
>>> On Mon, May 30, 2016 at 9:55 PM, Eron Wright  wrote:
>>> 
 Thinking out loud now…
 
 Is the job graph fully mutable?   Can it be cleared?   For example,
 shouldn’t the count method remove the sink after execution completes?
 
 Can numerous job graphs co-exist within a single driver program?How
 would that relate to the session concept?
 
 Seems the count method should use ‘backtracking’ schedule mode, and
>> only
 execute the minimum needed to materialize the count sink.
 
> On May 29, 2016, at 3:08 PM, Márton Balassi <
>> balassi.mar...@gmail.com>
 wrote:
> 
> Hey Eron,
> 
> Yes, DataSet#collect and count methods implicitly trigger a JobGraph
> execution, thus they also trigger writing to any previously defined
 sinks.
> The idea behind this behavior is to enable interactive querying (the
>>> one
> that you are used to get from a shell environment) and it is also a
>>> great
> debugging tool.
> 
> Best,
> 
> Marton
> 
> On Sun, May 29, 2016 at 11:28 PM, Eron Wright 
>>> wrote:
> 
>> I was curious as to how the `count` method on DataSet worked, and
>> was
>> surprised to see that it executes the entire program graph.
>> Wouldn’t
 this
>> cause undesirable side-effects like writing to sinks?Also
>> strange
 that
>> the graph is mutated with the addition of a sink (that isn’t
 subsequently
>> removed).
>> 
>> Surveying the Flink code, there aren’t many situations where the
>>> program
>> graph is implicitly executed (`collect` is another).   Nonetheless,
>>> this
>> has deepened my appreciation for how dynamic the application might
>> be.
>> 
>> // DataSet.java
>> public long count() throws Exception {
>>     final String id = new AbstractID().toString();
>> 
>>     output(new Utils.CountHelper<T>(id)).name("count()");
>> 
>>     JobExecutionResult res = getExecutionEnvironment().execute();
>>     return res.<Long> getAccumulatorResult(id);
>> }
>> Eron
 
 
>>> 
>>