[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14726935#comment-14726935
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1075


> DataSetUtils.zipWithUniqueID creates duplicate IDs
> --
>
> Key: FLINK-2590
> URL: https://issues.apache.org/jira/browse/FLINK-2590
> Project: Flink
>  Issue Type: Bug
>  Components: Java API, Scala API
>Affects Versions: 0.10, master
>Reporter: Martin Junghanns
>Assignee: Martin Junghanns
>Priority: Minor
>
> The function creates IDs using the following code:
> {code:java}
> shifter = log2(numberOfParallelSubtasks)
> id = counter << shifter + taskId;
> {code}
> As the binary operator + binds more tightly than the bit shift <<, this results in 
> cases where different tasks create the same ID. It essentially calculates
> {code}
> counter*2^(shifter+taskId)
> {code}
> which is 0 for counter = 0 and all values of shifter and taskId, so the first element of every task gets ID 0.
> Consider the following example.
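> numberOfParallelSubtasks = 8
> shifter = log2(8) = 4 (the helper returns the number of bits needed to represent its argument rather than the mathematical log2, so maybe rename the function?)
> produces (start is the counter value):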
> {code}
> start: 1, shifter: 4 taskId: 4 label: 256
> start: 2, shifter: 4 taskId: 3 label: 256
> start: 4, shifter: 4 taskId: 2 label: 256
> {code}
> I would suggest the following:
> {code}
> counter*2^(shifter)+taskId
> {code}
> which in code is equivalent to
> {code}
> shifter = log2(numberOfParallelSubtasks);
> id = (counter << shifter) + taskId;
> {code}
> and for our example produces:
> {code}
> start: 1, shifter: 4 taskId: 4 label: 20
> start: 2, shifter: 4 taskId: 3 label: 35
> start: 4, shifter: 4 taskId: 2 label: 66
> {code}
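> So we shift the counter left by shifter bits and add the task ID in the freed low-order bits. These bits have room for 
> 2^shifter values and every task ID is smaller than 2^shifter, so IDs from different tasks cannot collide.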
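A minimal Java sketch reproducing the example above (not Flink's actual code). `bitSize` is a hypothetical stand-in for the `log2` helper and, like it, returns the number of bits needed to represent its argument, so `bitSize(8)` is 4. Without parentheses, Java parses `counter << shifter + taskId` as `counter << (shifter + taskId)`.

```java
// Sketch reproducing the issue's example; bitSize() is a hypothetical stand-in
// for the log2 helper (number of bits needed to represent its argument).
public class ZipIdPrecedence {

    static int bitSize(long value) {
        return 64 - Long.numberOfLeadingZeros(value);
    }

    // Original expression: '+' binds tighter than '<<',
    // so this is counter << (shifter + taskId).
    static long buggyId(long counter, int shifter, int taskId) {
        return counter << shifter + taskId;
    }

    // Proposed expression: shift first, then add the task id in the low-order bits.
    static long fixedId(long counter, int shifter, int taskId) {
        return (counter << shifter) + taskId;
    }

    public static void main(String[] args) {
        int shifter = bitSize(8); // 4, as in the issue's example
        long[][] cases = {{1, 4}, {2, 3}, {4, 2}}; // {counter, taskId}
        for (long[] c : cases) {
            System.out.printf("start: %d, shifter: %d taskId: %d buggy: %d fixed: %d%n",
                    c[0], shifter, c[1],
                    buggyId(c[0], shifter, (int) c[1]),
                    fixedId(c[0], shifter, (int) c[1]));
        }
    }
}
```

Running `main` prints the three labels from both tables in the description side by side.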



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14725097#comment-14725097
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1075#issuecomment-136655355
  
No problem @s1ck. It might be a bit redundant, but it tests that the 
forwarding is done correctly. Therefore, I fixed the test case.


[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14724924#comment-14724924
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1075#issuecomment-136619441
  
@s1ck, it's important to note that `1` is subtracted from 
`getRuntimeContext().getNumberOfParallelSubtasks()` and not from the result of `getBitSize()`. The 
reason is that subtask indices are `0`-based. Thus, we only have 
to calculate the number of bits needed for the highest index we can encounter, 
and this is `getRuntimeContext().getNumberOfParallelSubtasks() - 1`. So if 
`getNumberOfParallelSubtasks() == 7`, then we would calculate `getBitSize(6) == 
3`.
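A small sketch of the shift computation described in this comment. `getBitSize` here is a hypothetical implementation (bits needed to represent the value, 0 for 0), not necessarily the helper that ended up in Flink.

```java
// Hypothetical helper: number of bits needed to represent 'value'
// (0 for value == 0), playing the role of the renamed log2 helper.
public class ShiftWidth {

    static int getBitSize(long value) {
        return 64 - Long.numberOfLeadingZeros(value);
    }

    // Subtask indices are 0-based, so the largest index is parallelism - 1,
    // and only that index has to fit into the low-order bits.
    static int shifter(int parallelism) {
        return getBitSize(parallelism - 1);
    }

    public static void main(String[] args) {
        for (int p : new int[] {1, 2, 7, 8, 9}) {
            System.out.println("parallelism " + p + " -> shifter " + shifter(p));
        }
    }
}
```

With 7 subtasks this gives `getBitSize(6) == 3`; with a single subtask the shift is 0 and the IDs are simply the counter values.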


[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14725109#comment-14725109
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user s1ck commented on the pull request:

https://github.com/apache/flink/pull/1075#issuecomment-136658385
  
Ok, thank you.


[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14724961#comment-14724961
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user s1ck commented on the pull request:

https://github.com/apache/flink/pull/1075#issuecomment-136633442
  
@tillrohrmann of course you are right, I was thinking about it wrongly. It's 
committed.


[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14725086#comment-14725086
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user s1ck commented on the pull request:

https://github.com/apache/flink/pull/1075#issuecomment-136654131
  
Sorry, I did not see that there are also identical test cases in Scala, 
which now fail due to the `-1` change. As those Scala methods wrap the Java 
methods, is it necessary to run the same tests on them again?


[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14725007#comment-14725007
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1075#issuecomment-136640020
  
@s1ck, looks really good. Thanks for your contribution. Will merge it now.


[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723667#comment-14723667
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user s1ck commented on the pull request:

https://github.com/apache/flink/pull/1075#issuecomment-136426216
  
@tillrohrmann While writing the new tests for both methods, I found 
that `zipWithIndex` is broken, too. It sometimes throws 
`ConcurrentModificationException`, because each task sorts the shared 
broadcast list in its `open` method. This could not fail before, since 
parallelism was 1.
I would fix this by creating a local copy of that list (which should be 
small in this case). Shall I fix this in the same issue, or do you want 
me to create a new issue for it?
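A plain-Java sketch of the local-copy fix proposed here, assuming the broadcast data holds one (taskId, count) pair per subtask and each subtask's start index is the sum of the counts of all lower-indexed subtasks. `TaskCount` and `startOffset` are hypothetical names; this is not the code that was merged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Sketch: each subtask sorts its own copy of the broadcast list, so the shared
// instance is never modified. TaskCount stands in for Flink's Tuple2<Integer, Long>.
public class LocalCopyOffsets {

    static final class TaskCount {
        final int taskId;
        final long count;

        TaskCount(int taskId, long count) {
            this.taskId = taskId;
            this.count = count;
        }
    }

    // Index at which subtask myTaskId starts numbering: the sum of the counts
    // of all subtasks with a smaller index.
    static long startOffset(List<TaskCount> shared, int myTaskId) {
        List<TaskCount> local = new ArrayList<>(shared); // private copy, safe to sort
        local.sort(Comparator.comparingInt(tc -> tc.taskId));
        long offset = 0;
        for (TaskCount tc : local) {
            if (tc.taskId >= myTaskId) {
                break;
            }
            offset += tc.count;
        }
        return offset;
    }

    public static void main(String[] args) {
        // Unmodifiable view: sorting it in place would throw, sorting the copy does not.
        List<TaskCount> shared = Collections.unmodifiableList(Arrays.asList(
                new TaskCount(2, 5), new TaskCount(0, 3), new TaskCount(1, 4)));
        for (int taskId = 0; taskId < 3; taskId++) {
            System.out.println("task " + taskId + " starts at " + startOffset(shared, taskId));
        }
    }
}
```

The cost is one copy and one sort per subtask, which is acceptable because the list has only one entry per subtask.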


[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723681#comment-14723681
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1075#issuecomment-136427373
  
There is an issue that tracks the `ConcurrentModificationException` problem. 
As per the discussion in that issue, can you use a `BroadcastVariableInitializer`? 
That saves redundant sorts.
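A plain-Java sketch of the initializer idea. As far as we know, in Flink the initializer is passed to `getRuntimeContext().getBroadcastVariableWithInitializer(name, initializer)`. Here `java.util.function.Function` stands in for `BroadcastVariableInitializer` so the sketch runs without Flink. It assumes subtask indices are 0..n-1 with exactly one (taskId, count) entry each. `TaskCount` and `INITIALIZER` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

// Sketch: derive a read-only structure from the broadcast data once, instead of
// every subtask sorting the shared list. Function stands in for Flink's
// BroadcastVariableInitializer; assumes task ids 0..n-1, one entry each.
public class OffsetInitializer {

    static final class TaskCount {
        final int taskId;
        final long count;

        TaskCount(int taskId, long count) {
            this.taskId = taskId;
            this.count = count;
        }
    }

    // offsets[i] = total number of elements held by subtasks 0..i-1.
    static final Function<Iterable<TaskCount>, long[]> INITIALIZER = data -> {
        List<TaskCount> sorted = new ArrayList<>();
        data.forEach(sorted::add); // fresh list, the input is not modified
        sorted.sort(Comparator.comparingInt(tc -> tc.taskId));
        long[] offsets = new long[sorted.size()];
        long running = 0;
        for (int i = 0; i < sorted.size(); i++) {
            offsets[i] = running;
            running += sorted.get(i).count;
        }
        return offsets;
    };

    public static void main(String[] args) {
        long[] offsets = INITIALIZER.apply(Arrays.asList(
                new TaskCount(1, 4), new TaskCount(0, 3), new TaskCount(2, 5)));
        // Each subtask would then only read offsets[taskId].
        System.out.println(Arrays.toString(offsets));
    }
}
```

Compared with the local copy, the sort and prefix sum happen once, and subtasks only perform a read-only array lookup.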


[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723701#comment-14723701
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user s1ck commented on the pull request:

https://github.com/apache/flink/pull/1075#issuecomment-136431165
  
@StephanEwen thanks for the hint, it works fine! Will clean up and commit now.


[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723779#comment-14723779
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user s1ck commented on the pull request:

https://github.com/apache/flink/pull/1075#issuecomment-136445029
  
@tillrohrmann I did not include `shifter = 
getBitSize(getRuntimeContext().getNumberOfParallelSubtasks() - 1)`, as your hint 
only applies to power-of-2 values. E.g., `getBitSize(7)` returns 3, and we need 
3 bits to cover the range from 0 to 6.
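A sketch comparing the two shift widths discussed in this thread. It uses a hypothetical `getBitSize` (bits needed to represent a value). For each parallelism p, it checks whether the largest subtask index, p - 1, fits into the reserved low-order bits. In this model, the two variants differ only when p is a power of two, where the `- 1` variant uses one bit fewer. Both leave room for every index.

```java
// Sketch comparing getBitSize(p) and getBitSize(p - 1) as shift widths;
// getBitSize is a hypothetical helper, not necessarily Flink's implementation.
public class ShiftComparison {

    static int getBitSize(long value) {
        return 64 - Long.numberOfLeadingZeros(value);
    }

    // True if every subtask index 0..parallelism-1 fits into 'bits' low-order bits.
    static boolean fits(int parallelism, int bits) {
        return parallelism - 1 < (1L << bits);
    }

    public static void main(String[] args) {
        for (int p = 1; p <= 16; p++) {
            int withoutMinusOne = getBitSize(p);
            int withMinusOne = getBitSize(p - 1);
            System.out.printf("p=%2d getBitSize(p)=%d getBitSize(p-1)=%d fits=%b/%b%n",
                    p, withoutMinusOne, withMinusOne,
                    fits(p, withoutMinusOne), fits(p, withMinusOne));
        }
    }
}
```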


[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14724544#comment-14724544
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1075#issuecomment-136545997
  
Ah, thank you for the proof.
And I didn't look at log2 in detail before, sorry.


[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723446#comment-14723446
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1075#issuecomment-136382814
  
@s1ck Good idea. You can also call `collect()`, add the IDs to a set and 
make sure the set has the right cardinality. In general, avoiding temp files 
and Strings for comparison is a good idea.
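A plain-Java simulation of the suggested check. Each simulated subtask labels its elements, all IDs are collected into one set, and the set size must equal the number of elements. In a real Flink test, the IDs would come from `collect()` on the `zipWithUniqueId` result. The nested loops and the `fixed` flag are hypothetical stand-ins, and both variants use the shift computed with the `- 1` discussed in this thread.

```java
import java.util.HashSet;
import java.util.Set;

// Simulation of the set-cardinality test: the number of distinct IDs must equal
// the number of labelled elements. The loops stand in for parallel subtasks.
public class UniqueIdCheck {

    static int getBitSize(long value) {
        return 64 - Long.numberOfLeadingZeros(value);
    }

    static long id(long counter, int shifter, int taskId, boolean fixed) {
        return fixed
                ? (counter << shifter) + taskId  // proposed formula
                : counter << shifter + taskId;   // original: shifts by shifter + taskId
    }

    // Number of distinct IDs when 'parallelism' subtasks each label 'perTask' elements.
    static int distinctIds(int parallelism, int perTask, boolean fixed) {
        int shifter = getBitSize(parallelism - 1);
        Set<Long> ids = new HashSet<>();
        for (int taskId = 0; taskId < parallelism; taskId++) {
            for (long counter = 0; counter < perTask; counter++) {
                ids.add(id(counter, shifter, taskId, fixed));
            }
        }
        return ids.size();
    }

    public static void main(String[] args) {
        System.out.println("fixed: " + distinctIds(8, 10, true) + " of 80");
        System.out.println("original: " + distinctIds(8, 10, false) + " of 80");
    }
}
```

The original formula already fails this check on the first element of each task, which all map to ID 0.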


[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723339#comment-14723339
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1075#issuecomment-136352327
  
+1 for a test, otherwise this looks good!


[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723434#comment-14723434
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user s1ck commented on the pull request:

https://github.com/apache/flink/pull/1075#issuecomment-136380943
  
There is already a test case for zipWithUniqueId() in 
https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java#L66
However, this test is under the assumption that there is only one task 
running, which is why it did not fail in the first place.
If there are multiple tasks, the resulting unique id is not deterministic 
for a single dataset element. I would implement a test, that creates a dataset, 
applies the `zipWithUniqueId` method, calls `distinct(0)` on the created ids 
and checks the number of resulting elements (must be equal to the input 
dataset). Would this be sufficient?
Furthermore, the current test cases for `DataSetUtils` compare the resulting 
dataset, as a string, against an expected result after each test run. My 
proposed test would not fit into that scheme. Should I create a new test case 
class for this method?
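The proposed check can be sketched in plain Java without a Flink cluster. This is a hypothetical simulation, not the `DataSetUtils` code: elements are spread round-robin over `parallelism` simulated tasks, each task keeps its own counter starting at 0, and the sketch compares the number of distinct IDs with the number of input elements, which is what `distinct(0)` followed by a count would check:

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical simulation of the proposed distinct-count test. */
class UniqueIdCheck {

    // Number of bits needed to represent value (0 for value == 0).
    static int bitSize(long value) {
        return 64 - Long.numberOfLeadingZeros(value);
    }

    // Assigns IDs to numElements elements spread round-robin over
    // parallelism simulated tasks and returns how many IDs are distinct.
    static int countDistinctIds(int numElements, int parallelism, boolean buggy) {
        // Bits needed for the largest task id, parallelism - 1.
        int shifter = bitSize(parallelism - 1);
        long[] counters = new long[parallelism]; // one counter per task
        Set<Long> ids = new HashSet<>();
        for (int i = 0; i < numElements; i++) {
            int taskId = i % parallelism;
            long counter = counters[taskId]++;
            long id = buggy
                    ? counter << shifter + taskId     // original expression
                    : (counter << shifter) + taskId;  // proposed fix
            ids.add(id);
        }
        return ids.size();
    }

    public static void main(String[] args) {
        System.out.println(countDistinctIds(1000, 8, false));
        System.out.println(countDistinctIds(1000, 8, true));
    }
}
```

A test along these lines fails for the original expression as soon as at least two tasks produce IDs, because every task's first ID is 0.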

@StephanEwen I wanted to do this, but an anonymous class cannot declare such 
a `static` field. However, I can declare the UDF as a private inner class (I 
didn't want to change much code).
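A minimal sketch of that workaround, with hypothetical names. Before Java 16, an anonymous (inner) class could not declare a `static` field unless it was a compile-time constant, and a field initialized by a method call such as `log2(Long.MAX_VALUE)` is not one. A named `private static` nested class has no such restriction. In `DataSetUtils` the nested class would extend `RichMapPartitionFunction`; here it is a plain class so the example compiles on its own:

```java
/** Hypothetical sketch: a static final field in a named static nested class. */
class NestedUdfSketch {

    // Stand-in for the helper from the diff; returns the number of bits
    // needed to represent value.
    static int bitSize(long value) {
        return 64 - Long.numberOfLeadingZeros(value);
    }

    // In DataSetUtils this would extend RichMapPartitionFunction.
    private static class UniqueIdAssigner {
        // Evaluated once when the class is initialized, not per instance.
        private static final int MAX_BIT_SIZE = bitSize(Long.MAX_VALUE);

        private final int shifter;

        UniqueIdAssigner(int parallelism) {
            this.shifter = bitSize(parallelism - 1);
        }

        long id(long counter, int taskId) {
            return (counter << shifter) + taskId;
        }
    }

    static int maxBitSize() {
        return UniqueIdAssigner.MAX_BIT_SIZE;
    }

    static long idFor(int parallelism, long counter, int taskId) {
        return new UniqueIdAssigner(parallelism).id(counter, taskId);
    }

    public static void main(String[] args) {
        System.out.println(maxBitSize());
        System.out.println(idFor(8, 2, 3));
    }
}
```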
@HuangWHWHW the `log2` method already existed, and in the issue I proposed 
renaming it. Maybe `getBitSize(long value)`? As for the "proof": if each task 
id is smaller than the total number of parallel tasks t, its binary 
representation needs at most as many bits as that of t. Thus, when we shift the 
counter by the number of bits of t, there cannot be a collision between 
different task ids.
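Written out as a short argument, assuming p parallel tasks with ids 0 <= k < p, a per-task counter c >= 0, and a shift b chosen so that p <= 2^b (this holds both for the bit size of t and for the bit size of t - 1):

```latex
\begin{align*}
  \mathrm{id}(c, k) &= c \cdot 2^{b} + k, \qquad 0 \le k < p \le 2^{b} \\
  k &= \mathrm{id}(c, k) \bmod 2^{b}, \qquad
  c = \left\lfloor \mathrm{id}(c, k) / 2^{b} \right\rfloor \\
  \mathrm{id}(c, k) = \mathrm{id}(c', k') &\implies (c, k) = (c', k')
\end{align*}
```

Since division with remainder by 2^b is unique, every ID determines its (counter, task id) pair, so no two elements receive the same ID as long as c * 2^b still fits into a signed 64-bit long.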



[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723462#comment-14723462
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1075#issuecomment-136386406
  
@s1ck, the `testZipWithUniqueId` test is bogus. You can remove this test 
case and replace it with your described test. It would also be great if you 
could set the parallelism of `testZipWithIndex` to something greater than `1`. 
Here it would also make sense to use `collect` instead of writing to disk.

+1 for renaming `log2` into `getBitSize(long value)`. When you rename the 
method, could you also change the line `shifter = 
getBitSize(getRuntimeContext().getNumberOfParallelSubtasks())` into `shifter = 
getBitSize(getRuntimeContext().getNumberOfParallelSubtasks() - 1)`. That way, 
we would also get the right unique ids in case of `parallelism = 1`.
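A sketch comparing the two shift widths, using a hypothetical `getBitSize` based on `Long.numberOfLeadingZeros` (Flink's actual helper may be implemented differently). Since task ids run from 0 to parallelism - 1, `getBitSize(parallelism - 1)` is the smallest shift that still leaves room for every task id; with parallelism 1 the shift becomes 0 and the IDs are simply 0, 1, 2, ...:

```java
/** Hypothetical comparison of getBitSize(n) and getBitSize(n - 1) as shift widths. */
class ShiftWidth {

    // Number of bits needed to represent value; 0 for value == 0.
    static int getBitSize(long value) {
        return 64 - Long.numberOfLeadingZeros(value);
    }

    // First 'count' IDs produced by one task for the given shift width.
    static long[] firstIds(int shifter, int taskId, int count) {
        long[] ids = new long[count];
        for (int counter = 0; counter < count; counter++) {
            ids[counter] = ((long) counter << shifter) + taskId;
        }
        return ids;
    }

    public static void main(String[] args) {
        for (int parallelism : new int[] {1, 2, 8, 9}) {
            System.out.printf("parallelism %d: getBitSize(n) = %d, getBitSize(n - 1) = %d%n",
                    parallelism, getBitSize(parallelism), getBitSize(parallelism - 1));
        }
        // Parallelism 1: shift getBitSize(1) = 1 versus getBitSize(0) = 0.
        System.out.println(java.util.Arrays.toString(firstIds(getBitSize(1), 0, 4)));
        System.out.println(java.util.Arrays.toString(firstIds(getBitSize(0), 0, 4)));
    }
}
```

For parallelism 1 the first variant yields IDs 0, 2, 4, 6 and the second 0, 1, 2, 3; for parallelism 8 the shift drops from 4 to 3 bits.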



[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723335#comment-14723335
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1075#discussion_r38305177
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java ---
@@ -121,6 +122,7 @@ public void mapPartition(Iterable<T> values, Collector<Tuple2<Long, T>> out) thr
 
return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Long, T>>() {
 
+   long maxLength = log2(Long.MAX_VALUE);
--- End diff --

You can make this `static final`



[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14721489#comment-14721489
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1075#issuecomment-136129581
  
Thanks a lot for the contribution.
Can you add a test case for the method to make sure the issue is not 
reintroduced when somebody else changes the code?



[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14721935#comment-14721935
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1075#issuecomment-136245681
  
@rmetzger +1. I think adding a test would be helpful.
Otherwise, can you give us some evidence that `id = (counter << 
shifter) + taskId;` will never generate the same id in different tasks?
And a minor thing in your issue description:
Shouldn't log2(8) be 3, not 4?
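The discrepancy comes from the helper's name: a true base-2 logarithm of 8 is 3, while the number of bits needed to write 8 (binary 1000) is 4, which is what the existing `log2` helper appears to return, judging by the value 4 in the description. A small sketch with hypothetical names contrasting the two:

```java
/** Hypothetical contrast between floor(log2(x)) and the bit size of x. */
class Log2VsBitSize {

    // floor(log2(value)) for value > 0.
    static int floorLog2(long value) {
        if (value <= 0) {
            throw new IllegalArgumentException("value must be positive");
        }
        return 63 - Long.numberOfLeadingZeros(value);
    }

    // Number of bits needed to represent value (0 for value == 0).
    static int bitSize(long value) {
        return 64 - Long.numberOfLeadingZeros(value);
    }

    public static void main(String[] args) {
        for (long x : new long[] {1, 7, 8, 9}) {
            System.out.printf("x = %d: floorLog2 = %d, bitSize = %d%n",
                    x, floorLog2(x), bitSize(x));
        }
    }
}
```

For every positive value the bit size equals floor(log2(x)) + 1, so a result of 4 for 8 is correct for a bit count; only the helper's name is misleading.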



[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-08-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14721272#comment-14721272
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

GitHub user s1ck opened a pull request:

https://github.com/apache/flink/pull/1075

[FLINK-2590] fixing DataSetUtils.zipWithUniqueId()

* modified algorithm as explained in the issue
* updated method documentation

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/s1ck/flink FLINK-2590

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1075.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1075


commit ab362b5b5ae390449972cc03f398d75c0231cb3c
Author: Martin Junghanns martin.jungha...@gmx.net
Date:   2015-08-29T20:51:19Z

[FLINK-2590] fixing DataSetUtils.zipWithUniqueId()

* modified algorithm as explained in the issue
* updated method documentation





[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-08-28 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14718249#comment-14718249
 ] 

Till Rohrmann commented on FLINK-2590:
--

Great catch [~mju] :-) Your solution is exactly how it should have been 
implemented in the first place. It would be great if you opened a pull request 
for it.
