[jira] [Commented] (FLINK-2549) Add topK operator for DataSet

2021-04-22 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17329570#comment-17329570
 ] 

Flink Jira Bot commented on FLINK-2549:
---

This issue is assigned but has not received an update in 7 days so it has been 
labeled "stale-assigned". If you are still working on the issue, please give an 
update and remove the label. If you are no longer working on the issue, please 
unassign so someone else may work on it. In 7 days the issue will be 
automatically unassigned.

> Add topK operator for DataSet
> -
>
> Key: FLINK-2549
> URL: https://issues.apache.org/jira/browse/FLINK-2549
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataSet
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
>Priority: Minor
>  Labels: pull-request-available, stale-assigned, stale-minor
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> topK is a common operation for users; it would be great to have it in Flink. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-2549) Add topK operator for DataSet

2021-04-14 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321754#comment-17321754
 ] 

Flink Jira Bot commented on FLINK-2549:
---

This issue and all of its Sub-Tasks have not been updated for 180 days. So, it 
has been labeled "stale-minor". If you are still affected by this bug or are 
still interested in this issue, please give an update and remove the label. In 
7 days the issue will be closed automatically.

> Add topK operator for DataSet
> -
>
> Key: FLINK-2549
> URL: https://issues.apache.org/jira/browse/FLINK-2549
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataSet
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
>Priority: Minor
>  Labels: pull-request-available, stale-minor
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> topK is a common operation for users; it would be great to have it in Flink. 





[jira] [Commented] (FLINK-2549) Add topK operator for DataSet

2017-03-22 Thread Vladislav Pernin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15936398#comment-15936398
 ] 

Vladislav Pernin commented on FLINK-2549:
-

Is there a chance of seeing it merged in the coming months, or is this going 
to be abandoned?

> Add topK operator for DataSet
> -
>
> Key: FLINK-2549
> URL: https://issues.apache.org/jira/browse/FLINK-2549
> Project: Flink
>  Issue Type: New Feature
>  Components: Core, DataSet API
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
>Priority: Minor
>
> topK is a common operation for users; it would be great to have it in Flink. 





[jira] [Commented] (FLINK-2549) Add topK operator for DataSet

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702049#comment-15702049
 ] 

ASF GitHub Bot commented on FLINK-2549:
---

GitHub user joseprupi opened a pull request:

https://github.com/apache/flink/pull/2885

Damping

This is the first commit of the bulk implementation of the Affinity 
Propagation algorithm. The algorithm's calculations seem to be correct for the example 
in the pull request (they have been compared with the scikit-learn implementation). 
Some items are still pending; I am opening the pull request now to follow up on the 
implementation and to get the graph reviewed.

The graph for the implementation:


https://docs.google.com/drawings/d/1PC3S-6AEt2Gp_TGrSfiWzkTcL7vXhHSxvM6b9HglmtA/edit?usp=sharing

Still pending:

- Convergence
- Greg's suggestions:

* Generic label type instead of Long.

* At line 37 (the join of similarities and messages/availabilities), we can 
perform a combinable reduce for the top 2 scores (and top label) rather than 
fully sorting all scores. There is an outstanding ticket, FLINK-2549, to add a 
topK operator. I expect that with k = 2 a custom implementation would be faster 
than topK implemented with a min-heap.

* Enable object reuse, reuse objects, and use LongValue and DoubleValue 
rather than Long and Double. Rather than "return new ..." we can create the 
object as a class field and then update and return this object in the function.
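Greg's k = 2 suggestion can be sketched in plain Java (this is not Flink code; `Score`, `Top2`, and their methods are hypothetical names). Keeping the best two of a set needs at most two comparisons per element and no heap, and because merging two partial results is again a top-2 selection, the same logic could run in a combiner and then in the final reducer.

```java
import java.util.List;

// Hypothetical (label, score) pair; not a Flink type.
final class Score {
    final long label;
    final double value;

    Score(long label, double value) {
        this.label = label;
        this.value = value;
    }
}

// Hypothetical helper: keeps the two highest scores in one pass, without sorting or a heap.
final class Top2 {
    Score best;    // highest score seen so far (null if none)
    Score second;  // second-highest score seen so far (null if fewer than two)

    void add(Score s) {
        if (best == null || s.value > best.value) {
            second = best;  // the previous best moves down one place
            best = s;
        } else if (second == null || s.value > second.value) {
            second = s;
        }
    }

    // Merging two partial results is just re-adding their (at most four) elements,
    // which is what makes the reduce combinable.
    Top2 merge(Top2 other) {
        Top2 out = new Top2();
        for (Score s : new Score[] {best, second, other.best, other.second}) {
            if (s != null) {
                out.add(s);
            }
        }
        return out;
    }

    static Top2 of(List<Score> scores) {
        Top2 t = new Top2();
        for (Score s : scores) {
            t.add(s);
        }
        return t;
    }
}
```

For example, merging the partial results of `[(1, 0.5), (2, 0.9)]` and `[(3, 0.7), (4, 0.1)]` yields label 2 as best and label 3 as second.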

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseprupi/flink damping

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2885.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2885


commit 509dc2f6987cf01ef04a5da2935cf1c9a7b59fec
Author: Josep Rubio 
Date:   2016-11-13T14:21:14Z

Working intermediat calculations

commit 2dd3f761dd1bbd39e91b9b9ce4604c6741bbaa4e
Author: Josep Rubio 
Date:   2016-11-13T15:07:31Z

First propagation commit

commit 04df0d31f1907427c507bc688dca0edda7b045e4
Author: Josep Rubio 
Date:   2016-11-20T16:47:40Z

Implementing 1, wrong availability results

commit 4149bde05e4615c6fc88169f898e3699e43f172b
Author: Josep Rubio 
Date:   2016-11-26T16:39:55Z

Damping working

commit 8a7bee11fdd25a8e810d0ae1f93b132b5eda
Author: Josep Rubio 
Date:   2016-11-27T04:23:22Z

Damping working






[jira] [Commented] (FLINK-2549) Add topK operator for DataSet

2015-12-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15041701#comment-15041701
 ] 

ASF GitHub Bot commented on FLINK-2549:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1161#issuecomment-162011423
  
@ChengXiangLi Sorry for keeping you waiting; I have not forgotten about this 
pull request.

There are two things in your comment:

  1. Exposing Managed Memory to UDFs (this pull request), which is much 
more convenient than going through the implementation of a deeply integrated 
operator.

  2. Efficiency for APIs like the Table API. The Table API works on managed 
memory already, since it sits on top of Flink's join/sort/etc. What you are 
hinting at is a lower-level interface where functions get the memory 
segments, rather than the row objects, and work directly on the memory 
segments. That has been a long wish of mine as well, but it involves having 
a separate type of function that supports working with memory segments, plus 
more machinery to handle records that are too large to fit into individual segments.

I am still working on point (1). I aimed a bit too high with how I wanted to 
abstract that, but will continue.

I would love to see point (2) at some point. If you are eager to drive 
point (2), I'd be very happy. We should probably have a chat and get this 
designed, as it involves quite a few things (exposing other abstractions, 
spanning records, etc.). 




> Add topK operator for DataSet
> -
>
> Key: FLINK-2549
> URL: https://issues.apache.org/jira/browse/FLINK-2549
> Project: Flink
>  Issue Type: New Feature
>  Components: Core, Java API, Scala API
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
>Priority: Minor
>
> topK is a common operation for users; it would be great to have it in Flink. 





[jira] [Commented] (FLINK-2549) Add topK operator for DataSet

2015-12-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15037155#comment-15037155
 ] 

ASF GitHub Bot commented on FLINK-2549:
---

Github user ChengXiangLi commented on the pull request:

https://github.com/apache/flink/pull/1161#issuecomment-161501441
  
Hi, @StephanEwen, is there any progress on the Managed Memory Allocation 
abstractions for UDFs? Not only for the TopK operator; I think it's also very 
important for users or DSLs to build more robust and efficient applications. 
For example, in Table API queries, as the data schema is known in advance during 
each phase of processing, we do not need to create real `Row` objects; we can just 
store the binary data in self-managed memory and use offsets to read `Row` 
fields. All the intermediate data is then stored as binary in self-managed memory, 
with no need to create lots of `Row` objects and their field objects, which 
should be more robust, more memory-efficient, and faster. 
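The idea of reading fields by offset from binary data, instead of materializing `Row` objects, can be sketched with `java.nio.ByteBuffer`. This assumes a hypothetical fixed-width layout (a `long` id followed by a `double` score, 16 bytes per row) and uses a plain heap buffer in place of a managed memory segment; it is not Flink's actual binary row format, and `BinaryRows` is an illustrative name.

```java
import java.nio.ByteBuffer;

// Hypothetical fixed-width row layout: [long id][double score] = 16 bytes per row.
final class BinaryRows {
    static final int ID_OFFSET = 0;
    static final int SCORE_OFFSET = 8;
    static final int ROW_SIZE = 16;

    private final ByteBuffer buf;  // stands in for a managed memory segment
    private int count;

    BinaryRows(int capacityRows) {
        this.buf = ByteBuffer.allocate(capacityRows * ROW_SIZE);
    }

    // Appends a row by writing its fields directly as bytes; no Row object is created.
    void append(long id, double score) {
        int base = count * ROW_SIZE;
        buf.putLong(base + ID_OFFSET, id);
        buf.putDouble(base + SCORE_OFFSET, score);
        count++;
    }

    // Reads a single field by computing its byte offset within the buffer.
    long id(int row) {
        return buf.getLong(row * ROW_SIZE + ID_OFFSET);
    }

    double score(int row) {
        return buf.getDouble(row * ROW_SIZE + SCORE_OFFSET);
    }

    int size() {
        return count;
    }
}
```

The design trade-off is that every field access becomes an offset computation plus a read, in exchange for avoiding per-row object allocation and garbage collection.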




[jira] [Commented] (FLINK-2549) Add topK operator for DataSet

2015-09-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14904851#comment-14904851
 ] 

ASF GitHub Bot commented on FLINK-2549:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1161#discussion_r40230604
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java ---
@@ -447,11 +447,13 @@ public void writeToOutput(final ChannelWriterOutputView output, final int start,
num -= recordsPerSegment;
} else {
// partially filled segment
-   for (; num > 0; num--) {
+   for (; num > 0 && offset <= this.lastEntryOffset; num--, offset += this.recordSize) {
--- End diff --

Is this a bug in the current implementation? If yes, that is critical.

We should pull this into a dedicated fix and add a test in that case.




[jira] [Commented] (FLINK-2549) Add topK operator for DataSet

2015-09-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14904869#comment-14904869
 ] 

ASF GitHub Bot commented on FLINK-2549:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1161#issuecomment-142672766
  
This looks super impressive and very well tested.

The way that the operator is integrated into the system needs some 
improvement, though. The problem is mainly how the managed memory is obtained.

The MemoryManager's memory is shared among all concurrently running tasks. 
This implementation takes up to half the total memory, which will cause 
programs that have other memory consumers in the same pipeline to crash. The 
tests here pass only because the operator is executed in isolation, with no other 
memory-consuming operators in the test program.

Memory consumers need to be known to the Optimizer (in the program 
generation) to compute what maximal fraction of memory a certain consumer may 
request. That value is part of the Task's configuration and used by the memory 
consumer to obtain the right maximum amount.
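As a rough illustration of fraction-based budgeting, the sketch below splits a task's managed memory among the memory consumers the planner knows about, instead of letting one operator claim a fixed half. The even split and all names (`MemoryBudget`, `fractions`, `pagesFor`) are assumptions for illustration only; they do not reflect how Flink's optimizer actually weights consumers.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical planner-side budgeting: every known consumer gets a fraction,
// and the fractions of all consumers in a pipeline sum to at most 1.0.
final class MemoryBudget {
    static Map<String, Double> fractions(List<String> consumers) {
        Map<String, Double> out = new LinkedHashMap<>();
        double share = consumers.isEmpty() ? 0.0 : 1.0 / consumers.size(); // assumed even split
        for (String c : consumers) {
            out.put(c, share);
        }
        return out;
    }

    // Runtime side: a consumer requests only its configured fraction of the total pages.
    static long pagesFor(double fraction, long totalPages) {
        return (long) Math.floor(fraction * totalPages);
    }
}
```

With three consumers and 1000 pages, each consumer would request 333 pages, so the pipeline as a whole never asks for more than the task has.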

Integrating operators into the optimizer's planning is a bit tedious and 
not as easy as it could be (we did not get around to refactoring this so far, 
unfortunately). Maybe we can add some tooling that would mark a UDF as 
MemoryConsuming and would in that case expose a Memory Allocator that returns 
the right amount of memory.

What we could do is the following: I will try to get to refactoring some of 
the Managed Memory Allocation abstractions (we need this anyways for more 
components) and then expose a MemoryAllocator in the runtime context, which is 
accessible if a user-defined function has been annotated as a memory consumer.

This may take me two weeks (I am currently in the midst of working on the 
streaming windows), but if you don't mind letting this rest for some days, I 
think that is the cleanest approach.

The other parts of the code look good, so after I finish my part, it should 
be a simple rebase of the TopKMapPartition function and the TopKReducer, and 
then this is good to merge.

What do you think?




[jira] [Commented] (FLINK-2549) Add topK operator for DataSet

2015-09-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14905686#comment-14905686
 ] 

ASF GitHub Bot commented on FLINK-2549:
---

Github user ChengXiangLi commented on a diff in the pull request:

https://github.com/apache/flink/pull/1161#discussion_r40280217
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java ---
@@ -447,11 +447,13 @@ public void writeToOutput(final ChannelWriterOutputView output, final int start,
num -= recordsPerSegment;
} else {
// partially filled segment
-   for (; num > 0; num--) {
+   for (; num > 0 && offset <= this.lastEntryOffset; num--, offset += this.recordSize) {
--- End diff --

Yes, it is. Previously it only supported writing data within a single `MemorySegment`; 
it worked before because it was only ever called with `num = 1`. I 
will create a separate JIRA for this.
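The corrected loop condition can be illustrated in isolation. In this hypothetical sketch a `byte[]` plays the role of one partially filled memory segment holding fixed-length records; the loop stops either when `num` records have been written or when the offset passes the last valid entry, and it advances the offset after each record. `SegmentWriter.writeRecords` and its parameters are illustrative names, not the actual `FixedLengthRecordSorter` code.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical stand-in for writing records out of one partially filled segment.
final class SegmentWriter {
    /**
     * Copies up to {@code num} fixed-length records starting at {@code offset},
     * never reading past {@code lastEntryOffset}. Returns how many records were written.
     */
    static int writeRecords(byte[] segment, int offset, int lastEntryOffset,
                            int recordSize, int num, ByteArrayOutputStream out) {
        int written = 0;
        // Same shape as the patched loop: bounded by both num and the last entry offset,
        // and the offset moves forward by one record per iteration.
        for (; num > 0 && offset <= lastEntryOffset; num--, offset += recordSize) {
            out.write(segment, offset, recordSize);
            written++;
        }
        return written;
    }
}
```

Without the offset bound and increment, a call with `num > 1` would either rewrite the same record repeatedly or run past the filled part of the segment, which is consistent with the old loop only working for `num = 1`.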




[jira] [Commented] (FLINK-2549) Add topK operator for DataSet

2015-09-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14905674#comment-14905674
 ] 

ASF GitHub Bot commented on FLINK-2549:
---

Github user ChengXiangLi commented on the pull request:

https://github.com/apache/flink/pull/1161#issuecomment-142782327
  
Thanks, @StephanEwen, I totally agree with you about the memory 
allocation. Actually, I tried to understand the memory allocation logic and make 
UDF consumers part of it, but it turns out that's not easy and breaks many 
things; it's better to leave the refactoring to someone who fully understands 
it, like you. I will wait for your refactoring work.
BTW: it takes half of the total memory as an upper bound, but I guess that 
does not make any difference :)




[jira] [Commented] (FLINK-2549) Add topK operator for DataSet

2015-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14902262#comment-14902262
 ] 

ASF GitHub Bot commented on FLINK-2549:
---

GitHub user ChengXiangLi opened a pull request:

https://github.com/apache/flink/pull/1161

[FLINK-2549] add topK operator to DataSet.

The topK operator is implemented as `mapPartition()` followed by 
`reduceGroup()`: each map task selects its top k elements and sends them to a 
single reduce task (there is no `groupBy()` before `reduceGroup()`), which then 
selects the final top k elements.
The main parts of this implementation:
1. An out-of-core PriorityQueue implementation based on self-managed memory.
2. Use of runtime resources to build the PriorityQueue for the UDF. topK is not 
actually a 'native' operator; it consists of UDFs that use the self-managed-memory 
PriorityQueue to select the top k elements.
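The two-phase plan described above can be mimicked in plain Java to show the data flow. This sketch uses `java.util.PriorityQueue` (an on-heap min-heap) instead of the out-of-core, managed-memory queue the pull request proposes, and it models partitions as lists rather than calling Flink's `mapPartition()`/`reduceGroup()`; `TwoPhaseTopK` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

final class TwoPhaseTopK {
    // Phase 1 (per partition, like mapPartition): keep the k largest values seen.
    static List<Integer> localTopK(List<Integer> partition, int k) {
        if (k <= 0) {
            return new ArrayList<>();
        }
        PriorityQueue<Integer> heap = new PriorityQueue<>(); // min-heap: head is the smallest kept value
        for (int v : partition) {
            if (heap.size() < k) {
                heap.add(v);
            } else if (v > heap.peek()) {
                heap.poll();  // evict the current smallest of the top k
                heap.add(v);
            }
        }
        return new ArrayList<>(heap);
    }

    // Phase 2 (single task, like a non-grouped reduceGroup): merge all local
    // candidates and select the global top k, returned in descending order.
    static List<Integer> topK(List<List<Integer>> partitions, int k) {
        List<Integer> candidates = new ArrayList<>();
        for (List<Integer> p : partitions) {
            candidates.addAll(localTopK(p, k));
        }
        List<Integer> result = localTopK(candidates, k);
        result.sort(Collections.reverseOrder());
        return result;
    }
}
```

Each partition ships at most k candidates to the single reducer, so the reducer's input is bounded by k times the number of partitions regardless of the total data size.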

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ChengXiangLi/flink priorityqueue

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1161.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1161


commit 7165d5ba78376cec1b3c752afb949e6273d45869
Author: chengxiang li 
Date:   2015-09-16T02:10:05Z

[FLINK-2549] add topK operator to DataSet.






[jira] [Commented] (FLINK-2549) Add topK operator for DataSet

2015-08-21 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14706744#comment-14706744
 ] 

Till Rohrmann commented on FLINK-2549:
--

This is required to implement a sample operator that works on Flink's managed memory.



[jira] [Commented] (FLINK-2549) Add topK operator for DataSet

2015-08-20 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14704565#comment-14704565
 ] 

Till Rohrmann commented on FLINK-2549:
--

I agree with [~StephanEwen]. Sorting the complete input of n elements has a 
complexity of O(n * log(n)), whereas keeping the k topmost elements in a 
priority queue gives you O(n * log(k)) in the worst case. Assuming k << n, 
this is worth the effort.
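To put numbers on this, take n = 10^9 input elements and k = 10 (values chosen only for illustration), counting comparisons up to constant factors:

```latex
\begin{aligned}
n \log_2 n &= 10^9 \cdot \log_2 10^9 \approx 10^9 \cdot 29.9 \approx 2.99 \times 10^{10} \\
n \log_2 k &= 10^9 \cdot \log_2 10 \approx 10^9 \cdot 3.32 = 3.32 \times 10^{9} \\
\frac{n \log n}{n \log k} &= \frac{\log 10^9}{\log 10} = 9
\end{aligned}
```

In general the ratio is log n / log k, so the saving grows as k shrinks relative to n.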



[jira] [Commented] (FLINK-2549) Add topK operator for DataSet

2015-08-20 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14704437#comment-14704437
 ] 

Chesnay Schepler commented on FLINK-2549:
-

Is the only difference between topK() and first() that the values are sorted? 
If so, why can't this be implemented in a similar fashion, like

{code}
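// this is just the general idea (a method inside DataSet<T>)
public GroupReduceOperator<T, T> topK(int key, int n) {
    // sort(key) stands for a global sort on the given key field
    return this.sort(key).reduceGroup(new FirstReducer<T>(n));
}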
{code}



[jira] [Commented] (FLINK-2549) Add topK operator for DataSet

2015-08-20 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14704498#comment-14704498
 ] 

Stephan Ewen commented on FLINK-2549:
-

You can implement topK on top of sort()/first().

It will be much less efficient than it could be, though. In that strategy, you 
need to sort the whole input, which is computationally more intensive and may 
need to spill to disk for large data.

Using a heap, you can simply always keep the lowest k elements. That way, you 
avoid the sort operations for most elements (that can be immediately discarded) 
and require little memory (only for k elements), most likely never spilling.
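The two strategies discussed here can be compared on a single partition in plain Java. `sortThenFirst` mirrors the sort()/first() idea (sort everything, keep the first k), while `boundedHeap` keeps only k elements in a min-heap and discards most inputs after one comparison. Here "top" means largest, so the heap's head is the smallest of the k kept values. Both methods return the k largest values in descending order; the names are hypothetical and this is not Flink API code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

final class TopKStrategies {
    // Sort-based: O(n log n) time, and the whole input must be held (or spilled) for sorting.
    static List<Integer> sortThenFirst(List<Integer> input, int k) {
        List<Integer> copy = new ArrayList<>(input);
        copy.sort(Collections.reverseOrder());
        int limit = Math.max(0, Math.min(k, copy.size()));
        return new ArrayList<>(copy.subList(0, limit));
    }

    // Heap-based: O(n log k) time and memory for only k elements.
    static List<Integer> boundedHeap(List<Integer> input, int k) {
        List<Integer> result = new ArrayList<>();
        if (k <= 0) {
            return result;
        }
        PriorityQueue<Integer> heap = new PriorityQueue<>(k); // min-heap of the current top k
        for (int v : input) {
            if (heap.size() < k) {
                heap.add(v);
            } else if (v > heap.peek()) { // most elements fail this check and are dropped at once
                heap.poll();
                heap.add(v);
            }
        }
        result.addAll(heap);
        result.sort(Collections.reverseOrder());
        return result;
    }
}
```

For the input `[4, 11, 7, 2, 11, 9, 1]` with k = 3, both methods return `[11, 11, 9]`; only the heap version avoids sorting the four discarded elements.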



[jira] [Commented] (FLINK-2549) Add topK operator for DataSet

2015-08-19 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14704244#comment-14704244
 ] 

Chengxiang Li commented on FLINK-2549:
--

The basic idea of the implementation is as follows:
# In the map stage, sort and pick the top K elements in each partition.
# A single reduce task handles all map output, sorts it, and picks the top K elements as 
the final result.

To fully manage the memory used for this operator, we may need a customized 
PriorityQueue built upon Flink's MemoryManager to sort elements of unpredictable 
size within a fixed amount of memory, as discussed 
[here|https://github.com/apache/flink/pull/949#issuecomment-132692640].
