Re: Use keyBy to deterministically hash each record to a processor/task/slot

2018-02-21 Thread m@xi
Hello! Until now I have used your method to generate keys for the .keyBy() function, so that I know exactly which processor id each tuple will end up at (w.r.t. the key % #procs operation). However, I had to switch to Java because the documentation is better. And I implemented your

Re: Use keyBy to deterministically hash each record to a processor/task/slot

2017-11-02 Thread m@xi
Hello Dongwon, Thanks a lot for your excellent reply! It seems we have the same problem. Still, your solution is less hard-coded than mine. Thanks a lot! I am also looking forward to seeing the ability to create a custom partitioner for keyBy() in Flink. Best, Max -- Sent from:

Re: Use keyBy to deterministically hash each record to a processor/task/slot

2017-11-02 Thread Dongwon Kim
Hello, As I need to generate the same number of keys as that of partitions, I also suffer from this problem [1]: My current solution is to generate enough keys until I have at least one key per partition, which looks very stupid to me (I copy and paste my code below). If Flink changes its way to
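The brute-force idea described here (keep trying candidate keys until every partition owns at least one) can be sketched roughly as follows. This is not the code from the message, which is cut off above. The class name `OneKeyPerSubtask`, the method `findKeys`, and the `maxCandidates` cap are all hypothetical, and the key-to-subtask mapping is passed in as a function so the sketch does not depend on Flink internals.

```java
import java.util.Arrays;
import java.util.function.IntUnaryOperator;

public class OneKeyPerSubtask {

    // Tries candidate keys 0, 1, 2, ... and records the first key seen for each subtask.
    // subtaskOf is whatever mapping the runtime applies (an assumption of this sketch).
    static int[] findKeys(int parallelism, IntUnaryOperator subtaskOf, int maxCandidates) {
        int[] keys = new int[parallelism];
        boolean[] found = new boolean[parallelism];
        int remaining = parallelism;
        for (int candidate = 0; candidate < maxCandidates && remaining > 0; candidate++) {
            int subtask = subtaskOf.applyAsInt(candidate);
            if (!found[subtask]) {
                found[subtask] = true;
                keys[subtask] = candidate;
                remaining--;
            }
        }
        if (remaining > 0) {
            throw new IllegalStateException("no key found for " + remaining + " subtask(s)");
        }
        return keys;
    }

    public static void main(String[] args) {
        // Toy mapping for illustration only; in a real job this would be the
        // runtime's own key-to-subtask function rather than this arithmetic.
        IntUnaryOperator toyMapping = k -> (k / 3) % 4;
        System.out.println(Arrays.toString(findKeys(4, toyMapping, 1000)));
    }
}
```

The returned array is indexed by subtask, so `keys[i]` is a key that the given mapping sends to subtask `i`. The result is only as stable as the mapping itself: if the mapping, the parallelism, or the max parallelism changes, the keys have to be searched for again.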

Re: Use keyBy to deterministically hash each record to a processor/task/slot

2017-11-02 Thread m@xi
Hello Tony, Thanks a lot for your answer. Now I know exactly what happens with the keyBy function, yet I still haven't figured out a proper (non-hard-coded) way to deterministically send a tuple to each key. If someone from the Flink team could help it would be great! Max -- Sent from:

Re: Use keyBy to deterministically hash each record to a processor/task/slot

2017-10-30 Thread Tony Wei
Hi Max, Flink decides which subtask a key is assigned to in `KeyGroupRangeAssignment.assignKeyToParallelOperator`. Its first step is to assign the key to a key group based on the max parallelism [2]. It then assigns each key group to a specific subtask based on the current parallelism [3].
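The two steps described above can be sketched in plain Java as follows. This is a minimal illustration, not Flink's source: the class `KeyGroupSketch` and its method names are hypothetical, and `mix` is a stand-in integer mixer, so the concrete key groups it yields should not be expected to match what Flink computes. The proportional formula in step 2 reflects my understanding of how key groups are split into contiguous ranges per subtask.

```java
public class KeyGroupSketch {

    // Stand-in hash (assumption): Flink applies its own murmur-style hash to key.hashCode().
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & Integer.MAX_VALUE; // clear the sign bit so the modulo below is non-negative
    }

    // Step 1: key -> key group. Depends only on the max parallelism.
    static int keyGroupFor(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    // Step 2: key group -> subtask index. Depends on the current parallelism;
    // each subtask receives a contiguous range of key groups.
    static int subtaskForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Both steps combined: key -> subtask index.
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return subtaskForKeyGroup(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        for (int key = 0; key < 8; key++) {
            System.out.println("key " + key
                    + " -> key group " + keyGroupFor(key, maxParallelism)
                    + " -> subtask " + subtaskFor(key, maxParallelism, parallelism));
        }
    }
}
```

With a max parallelism of 128 and a parallelism of 4, step 2 sends key groups 0 to 31 to subtask 0, 32 to 63 to subtask 1, and so on. Because of the hash in step 1, the result is generally not the same as `key.hashCode() % parallelism`.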

Use keyBy to deterministically hash each record to a processor/task/slot

2017-10-30 Thread m@xi
Hi all, After trying to understand exactly how keyBy works internally, I did not get anything more than "it applies obj.hashCode() % n", where n is the number of tasks/processors. This post for example