[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-23 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-187626905
  
I created a few sub-issues for the rather generic issue FLINK-2237.
FLINK-3477 reflects the changes of this PR. Can you please update the
commit message to FLINK-3477? Thanks, Fabian




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-22 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-187439412
  
Thanks @aalexandrov. I added a few comments to the document.
Looking forward to the results :-)




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-22 Thread aalexandrov
Github user aalexandrov commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-187243415
  
I've added a [Google
Doc](https://docs.google.com/document/d/12yx7olVrkooceaQPoR1nkk468lIq0xOObY5ukWuNEcM/edit?usp=sharing)
where we can collaborate on the design of the experiments.

Once we've settled on that, we will proceed with implementing the experiments. The code
[will be available in the `flink-hashagg-experiments`
repository](https://github.com/TU-Berlin-DIMA/flink-hashagg-experiments).




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-19 Thread aalexandrov
Github user aalexandrov commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-186257996
  
@fhueske We will propose three experiments based on your suggestions in a
Google Doc on Monday. Once we have fixed the setup, we will prepare a Peel
bundle and run them on one of the clusters in the lab.

I would also be happy to promote Peel by leading a joint blog post for the
Peel website together with @ggevay and you, if you are interested. I think that
the hash table makes for a perfect use case.




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-19 Thread ggevay
Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-186244959
  
@fhueske:

So far, I have only run benchmarks on my work laptop and home laptop.

I have used the newly created `ReducePerformance`, varying the
`(number of elements / number of different keys)` ratio as well as the memory
budget (between 250 MB and 6 GB). The hash-based strategy was always faster, by up
to 3x, and mostly by a factor of 1.5 to 2.5.
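
For anyone who wants to reproduce this kind of measurement, here is a minimal sketch of the workload that `ReducePerformance` varies. This is not the actual test class: the class name, the constants, and the timing are made up for illustration, and the real test additionally compares the hash-based and sort-based strategies.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

public class ReduceBenchmarkSketch {
	public static void main(String[] args) throws Exception {
		final long numElements = 10_000_000L;
		final long numKeys = 10_000L; // vary numElements / numKeys to control the combine rate

		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, numElements)
			.map(new MapFunction<Long, Tuple2<Long, Long>>() {
				@Override
				public Tuple2<Long, Long> map(Long i) {
					return new Tuple2<>(i % numKeys, 1L);
				}
			});

		input.groupBy(0)
			.reduce(new ReduceFunction<Tuple2<Long, Long>>() {
				@Override
				public Tuple2<Long, Long> reduce(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
					a.f1 += b.f1; // reuse an input as the result object
					return a;
				}
			})
			.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

		long start = System.currentTimeMillis();
		env.execute();
		System.out.println("took " + (System.currentTimeMillis() - start) + " ms");
	}
}
```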

I have also used `ReduceHashTableTest.testWithIntPair`. To make this a
benchmark, the lines that deal with the `reference` hash table for
checking the correctness of the output should be commented out, and the time
measurement commented in. (This only tests the hash table, not end-to-end, and it
can't be directly compared with the sort-based reduce.)

I have also used `HashTablePerformanceComparison` to compare with the other
hash tables. This has the disadvantage that the workload is not
characteristic of a reduce (e.g. no `processRecordWithReduce` calls, because the
other hash tables don't have that method).

I have also benchmarked using my game theory stuff (which originally 
prompted me to start working on this), and the hash-based combiner was faster 
here as well.

> Did you check the combine rate (input / output ratio) compared to the 
sort-based strategy?

I didn't measure it directly, but `ReducePerformance` shows the difference 
nicely when the number of input elements is large enough that the sort-based 
strategy has to emit often, but the number of different keys is small enough 
that the hash-based strategy can fit all the keys in memory. In this case, the 
speedup of the final reduce phase is striking.

> Have you checked heap memory consumption / GC activity compared to the 
sort-based strategy?

I haven't actually checked it, but it should be the same as or better than the
sort-based strategy:
- enabled object reuse: no allocations at all
- disabled object reuse: one allocation per input element, and one 
allocation per reduce step

The sort-based strategy might additionally have allocations inside the
sorter and/or the `MergeIterator`.




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-19 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-186247461
  
Great, thanks for the detailed response @ggevay!




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-19 Thread ggevay
Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-186226971
  
OK, thanks @aalexandrov! I wanted to try Peel anyway; this is a good
opportunity.




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-19 Thread aalexandrov
Github user aalexandrov commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-186226154
  
@ggevay I would be interested in helping you prepare a
[`peel-flink-bundle`](http://peel-framework.org/getting-started.html) for the
benchmarks @fhueske mentioned.

It would make for a perfect use case for what Peel is intended for, and a nice
first contribution to the [bundles
repository](http://peel-framework.org/repository/bundles.html).




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-19 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-186219363
  
OK, then let's keep the data in one partition for now. In the case of var-length
updates, this can default to a memory usage / combine behavior that is
somewhat similar to the sort-based strategy: filling the memory with records
and emitting them (putting compaction aside).

I'll review the PR once more and will run a few end-to-end benchmarks as well.
What kind of benchmarks have you done so far? 
- Did you check the combine rate (input / output ratio) compared to the 
sort-based strategy? 
- How much memory did you use for tests (upper bound)? Did you vary the 
memory?
- Have you checked heap memory consumption / GC activity compared to the 
sort-based strategy?

It might take a few more days before I actually get to this, but it is on 
my list.

Thanks,
Fabian




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-16 Thread ggevay
Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-184696741
  
Hello Fabian,

I did 3.
(https://github.com/apache/flink/commit/ef644821d5417b992cfda366225bd5622faa9b2b),
because the machinery for that was already in place (see the condition in
`compactOrThrow`). I chose the threshold to be 5%. (This can probably be the
same for the solution-set case, because if lengths change a lot, we get
very slow as the memory load gets near the total memory, so it is probably better
to indicate the memory problem to the user with an exception than to silently
be very slow.)
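
To make the 5% threshold concrete, here is a hedged sketch of the check; the class name, method name, and exception type are illustrative, not the actual `ReduceHashTable` code.

```java
import java.io.EOFException;

final class CompactionGate {

	/** Give up instead of compacting when compaction would reclaim almost nothing. */
	private static final double MIN_FRACTION_FREED = 0.05; // the 5% threshold mentioned above

	static void checkCompactionWorthwhile(long freeableBytes, long recordAreaBytes) throws EOFException {
		if (freeableBytes < MIN_FRACTION_FREED * recordAreaBytes) {
			throw new EOFException("Hash table ran out of memory: compaction would free less than 5%.");
		}
		// otherwise the caller proceeds with the usual reinsertion-sweep compaction
	}
}
```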

I also did some changes to the tests.

For 2., the situation doesn't seem straightforward to me. For example, if 
there are not many length changes, then exactly the opposite should be done: we 
should emit from the end of the record area (rather than the beginning), 
because if there is skew in the data, then the more common keys will appear 
sooner, so they tend to appear near the beginning of the record area.

The other ideas are also interesting, and I would love to experiment with 
them, but unfortunately I don't really have that much time for this at the 
moment. So I would suggest merging the non-partitioned version, and then the
partitioned version can be implemented later when I or someone else has a lot 
of free time on their hands.

(Btw., it would be very interesting to try machine learning techniques for 
dynamically making these decisions that involve complicated trade-offs, based 
on the actual data:
- Have some switches which control things like
  - what part of the record area to emit (begin or end; how much)
  - at how much fragmentation should we do compacting instead of emitting
  - what load factor should trigger a resize
  - size of bucket area
  - how to choose which partition to emit
  - maybe even do spilling also in the combiner
  - whether to insert prefetch instructions for the random memory accesses 
that will probably involve a CPU cache miss (the trade-off here is that then 
you have to work with multiple consecutive input records at the same time, so 
you have to do extra copies if object reuse is enabled, which might cost a lot) 
(I have actually experimented with this a little, and there were 20-35%
speedups when copies are cheap)
  - ... (it's easy to come up with many more)
- Gather some statistics about what is happening, and turn them into 
features
  - avg. record size
  - #keys / #elements ratio
  - skew
  - time it takes to serialize a record
  - time it takes to run the ReduceFunction
  - ratio of updates that involve size changes
  - whether sizes change up or down on average
  - backpressure
- that we are generating
- that we get from our outputs (if this is large (e.g. because of a
saturated network), then we should set the switches to do more aggressive
combining)
  - how many CPU cache misses occur while looking up keys (e.g. for
recognizing the situation where records with matching keys are often close to
each other for some reason)
  - hash collisions (so that we can start with a simpler hash function
(a few percent speedup) and change it if it is bad)
  - ... (it's easy to come up with many more)
- Train some machine learning model which will figure out how to set the 
switches based on the features

I think a pretty good speedup could result from tuning all these things to 
the actual data at hand.
Maybe in a few years, when data flow systems get more mature, this can
become a reality.)
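
Purely as an illustration of the idea (none of these names exist in Flink, and a learned model mapping features to switches is omitted), the switches and observed features could be grouped roughly like this:

```java
final class CombinerTuning {

	static final class Switches {
		boolean emitFromEndOfRecordArea;      // which part of the record area to emit
		double emitFraction;                  // how much of it to emit
		double compactInsteadOfEmitThreshold; // fragmentation level that triggers compaction
		double resizeLoadFactor;              // load factor that triggers a bucket-area resize
	}

	static final class Features {
		double avgRecordSize;
		double keysPerElement;          // #keys / #elements ratio
		double sizeChangingUpdateRatio; // ratio of updates that change the record size
		double downstreamBackpressure;  // backpressure we get from our outputs
		double cacheMissesPerLookup;
		double hashCollisionRate;
	}
}
```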




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-15 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-184167321
  
Hi Gabor,

thanks for the updates and additions you made to the PR. I didn't go
through them yet, but I would like to follow up on the discussion and propose a
few potential improvements.

I agree that using multiple partitions would make the implementation more
complex and the reasoning about the behavior less straightforward. The
motivation for partitions is twofold. First, it should help to maintain a
continuous data flow without backpressure effects. Second, it should improve
the effectiveness of the combiner by avoiding the eviction of hot elements that
are frequently updated, which would happen with each full eviction of the table.
Emitting the partition with the fewest segments wouldn't be a good strategy for
the reasons you mentioned. A better strategy could be to evict the partition
with the smallest updates / keys ratio, but this would require more
thought…
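
A hedged sketch of that heuristic, just to make the idea concrete (the names and counters are hypothetical): evict the partition where combining has been least effective, i.e. the one with the smallest updates / keys ratio.

```java
final class PartitionEvictionHeuristic {

	/** Returns the index of the partition with the smallest updates / keys ratio. */
	static int pickPartitionToEmit(long[] updatesPerPartition, long[] keysPerPartition) {
		int candidate = 0;
		double smallestRatio = Double.MAX_VALUE;
		for (int p = 0; p < updatesPerPartition.length; p++) {
			double ratio = updatesPerPartition[p] / (double) Math.max(1L, keysPerPartition[p]);
			if (ratio < smallestRatio) {
				smallestRatio = ratio;
				candidate = p;
			}
		}
		return candidate;
	}
}
```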

Independent of the partitions discussion, I have a few ideas to make the
compaction more effective. However, these would break compatibility with
the AbstractMutableHashTable interface. Maybe we can provide a common base
class and special implementations for the SolutionSet case and the Combiner
case?

1. Remember a pointer to the hole which is closest to the start of the
RecordArea (or partition).
2. When compacting, emit all elements which are before the first hole.
These elements are "cold" and have not been updated since the last compaction
(assuming non-in-place updates; in-place updates should not be compacted
anyway). Compact all elements after the first hole as usual. The benefit of
this is to free more segments by compaction, which delays (or even completely
avoids) full evictions and thus improves the compaction rate for hot elements.
3. Only compact if more than (for instance) 50% of the space can be freed,
and do a full eviction otherwise. If there are too few holes in the table, most
elements have not been touched (in the case of non-in-place updates). Hence, we
should just emit everything (see the sketch below).
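
A rough sketch of proposals 1-3 (hypothetical names, not code from the PR): track the hole closest to the start, treat everything before it as cold, and fall back to a full eviction when compaction would free too little.

```java
final class CompactionPolicy {

	/** Proposal 1: pointer to the hole closest to the start of the record area. */
	long firstHolePointer = Long.MAX_VALUE;

	void onHoleCreated(long holePointer) {
		firstHolePointer = Math.min(firstHolePointer, holePointer);
	}

	/** Proposal 2: records before the first hole are "cold" and can be emitted directly. */
	boolean isCold(long recordPointer) {
		return recordPointer < firstHolePointer;
	}

	/** Proposal 3: compact only if at least half of the used space can be reclaimed. */
	boolean shouldCompactInsteadOfFullEviction(long freeableBytes, long usedBytes) {
		return freeableBytes * 2L >= usedBytes;
	}
}
```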

We can also think about adding a by-pass switch for the case that the hash table
observes that the combiner does not significantly reduce the data (too many unique
keys for the available memory) but only adds overhead. For instance, if the
number of records is not reduced by 2x, we could simply forward the records to
the collector (in the case of partitions, this decision could be made at the
partition level).
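
A minimal sketch of such a by-pass switch; the counters and the warm-up sample size are made up.

```java
final class CombinerBypass {

	private static final long SAMPLE_SIZE = 100_000L;

	/** After a warm-up sample, by-pass the table if it does not reduce the record count by at least 2x. */
	static boolean shouldBypass(long recordsIn, long recordsOut) {
		return recordsIn >= SAMPLE_SIZE && recordsOut * 2L > recordsIn;
	}
}
```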




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-14 Thread ggevay
Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-183896027
  
I've added some more documentation.




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-11 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52608138
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: this contains the bucket heads:
+ *    an 8 byte pointer to the first link of a linked list in the record area
+ *  - Record area: this contains the actual data in linked list elements. A linked list element starts
+ *    with an 8 byte pointer to the next element, and then the record follows.
+ *  - Staging area: This is a small, temporary storage area for writing updated records. This is needed
+ *    because, before serializing a record, there is no way to know in advance how large it will be.
+ *    Therefore, we can't serialize directly into the record area when we are doing an update, because
+ *    if it turns out to be larger than the old record, it would overwrite some other record
+ *    that happens to be after the old one in memory. The solution is to serialize to the staging area first,
+ *    and then copy it to the place of the original if it has the same size; otherwise allocate a new linked
+ *    list element at the end of the record area, and mark the old one as abandoned. This creates "holes" in
+ *    the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area and then reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) of the record area, reading it
+ *  sequentially, and inserting all non-abandoned records, starting from the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read by the reinsertion sweep, because
+ *  both the insertions and the readings happen sequentially in the record area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The serializer might be able to tell us this,
+ *  in which case we will calculate the number of buckets upfront and won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually one, because it just needs to hold
+ *  one record.

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-11 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52608113
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-11 Thread ggevay
Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-182872207
  
I'm thinking about partitioning, but again, I'm finding it too difficult to
imagine how this algorithm behaves when we emit/rebuild only some part of the
table at a time. In the partitioned case, there seems to be a lot of potential
for messing things up in complicated and hard-to-foresee ways.

For example, if I always emit the partition with the fewest segments, then
what will actually happen is that I will kinda always emit the
same partition (because after an emit, it starts from a "disadvantage" compared
to the others), and the others just keep growing until they eat all the memory,
so this degenerates into the situation where there isn't a single memory
segment left for the smallest one and _bad things_ happen.

In contrast, an advantage of the current design is that its performance 
characteristics are not that hard to grok, and it is relatively straightforward 
to see how the sizes of the 3 memory regions (bucket segments, record segments, 
staging segments) relate to each other.

Btw. I think I figured out a simple way to do even the final reduce with
spilling, without partitioning the memory: when memory is full, I write
everything to disk, and I partition only at this time, by writing to different
files (the same files across multiple spillings, so e.g. the second spilling
appends to the same files as the first one created), and then when the input
ends, I recurse on processing the files one by one (and I start a new level of
partitioning).
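
To make that scheme concrete, here is a hedged sketch, written as Java-shaped pseudocode: every helper name is hypothetical and nothing here is from the PR.

```java
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

abstract class SpillingReduceSketch<T> {

	abstract boolean tryInsertOrReduce(T record);  // false when the in-memory table is full
	abstract void spillTableToPartitionFiles(List<File> files) throws IOException; // appends and clears the table
	abstract void emitTableContents();
	abstract List<File> createPartitionFiles(int level) throws IOException;
	abstract Iterator<T> readRecords(File file) throws IOException;

	void reduceWithSpilling(Iterator<T> input, int level) throws IOException {
		List<File> partitionFiles = createPartitionFiles(level);
		while (input.hasNext()) {
			T record = input.next();
			if (!tryInsertOrReduce(record)) {
				// memory is full: partition only now, appending to the same files across spills
				spillTableToPartitionFiles(partitionFiles);
				tryInsertOrReduce(record); // the table is empty again, so this succeeds
			}
		}
		emitTableContents(); // whatever still fits in memory
		for (File file : partitionFiles) {
			// the input has ended: recurse on the spill files one by one, starting a new level of partitioning
			reduceWithSpilling(readRecords(file), level + 1);
		}
	}
}
```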




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-11 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52605362
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
 ---
@@ -45,9 +45,12 @@ public RandomAccessInputView(ArrayList<MemorySegment> segments, int segmentSize)
 	{
 		this(segments, segmentSize, segmentSize);
 	}
-	
+
+	/**
+	 * Warning: setReadPosition has to be called before reading.
+	 */
 	public RandomAccessInputView(ArrayList<MemorySegment> segments, int segmentSize, int limitInLastSegment) {
--- End diff --

I tried this, but it turned out that there is no nice way to do it, because 
the two constructors would have the same parameters, so I had to add a dummy 
parameter. (See 
https://github.com/apache/flink/commit/508ce22c3167dc6e6e2a4a6b9776b6c79e62698f)
 Do you think that this ugly solution is worth it, or should I revert to the 
original?
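
For readers wondering why a dummy parameter is needed at all: two Java constructors with the same parameter types cannot coexist, so disambiguation needs either an extra parameter or a static factory method. A tiny illustration, hypothetical and not the code in the linked commit:

```java
import java.util.ArrayList;

class OverloadClashExample {

	// The "normal" constructor.
	OverloadClashExample(ArrayList<Integer> segments, int segmentSize) {
		// ...
	}

	// A second constructor with the same meaningful parameters would not compile,
	// so a dummy parameter is needed to tell the two apart:
	OverloadClashExample(ArrayList<Integer> segments, int segmentSize, boolean dummy) {
		// ... variant with different initialization ...
	}

	// An alternative that avoids the dummy parameter altogether:
	static OverloadClashExample createVariant(ArrayList<Integer> segments, int segmentSize) {
		return new OverloadClashExample(segments, segmentSize, true);
	}
}
```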




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-11 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52617426
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-11 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52642844
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
 ---
@@ -52,6 +52,30 @@
  */
 public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleInputOperator<T, T, FT> {
 
+	/**
+	 * An enumeration of hints, optionally usable to tell the system exactly how to execute the reduce.
+	 */
+	public enum ReduceHint {
--- End diff --


https://github.com/ggevay/flink/commit/7c29f78ebbe4e772d55aae8dd7ee0802d2cdcf42




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-11 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52642960
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-11 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52643138
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-11 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52643218
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-11 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52643336
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReduceHashTableTest.java
 ---
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import com.google.common.collect.Ordering;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.operators.testutils.UniformStringPairGenerator;
+import org.apache.flink.runtime.operators.testutils.types.*;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Test;
+
+import java.util.*;
+
+import static org.junit.Assert.*;
+import static org.junit.Assert.fail;
+
+public class ReduceHashTableTest {
+
+	private static final long RANDOM_SEED = 58723953465322L;
+
+	private static final int PAGE_SIZE = 16 * 1024;
+
+	private final TypeSerializer<Tuple2<Long, String>> serializer;
+	private final TypeComparator<Tuple2<Long, String>> comparator;
+
+	private final TypeComparator<Long> probeComparator;
+
+	private final TypePairComparator<Long, Tuple2<Long, String>> pairComparator;
+
+	//
+	// -- Note: This part was mostly copied from CompactingHashTableTest --
+
+	public ReduceHashTableTest() {
+		TypeSerializer<?>[] fieldSerializers = { LongSerializer.INSTANCE, StringSerializer.INSTANCE };
+		@SuppressWarnings("unchecked")
+		Class<Tuple2<Long, String>> clazz = (Class<Tuple2<Long, String>>) (Class<?>) Tuple2.class;
+		this.serializer = new TupleSerializer<Tuple2<Long, String>>(clazz, fieldSerializers);
+
+		TypeComparator<?>[] comparators = { new LongComparator(true) };
+		TypeSerializer<?>[] comparatorSerializers = { LongSerializer.INSTANCE };
+
+		this.comparator = new TupleComparator<Tuple2<Long, String>>(new int[] {0}, comparators, comparatorSerializers);
+
+		this.probeComparator = new LongComparator(true);
+
+		this.pairComparator = new TypePairComparator<Long, Tuple2<Long, String>>() {
+
+			private long ref;
+
+			@Override
+			public void setReference(Long reference) {
+				ref = reference;
+			}
+
+			@Override
+			public boolean equalToReference(Tuple2<Long, String> candidate) {
+				//noinspection UnnecessaryUnboxing
+				return candidate.f0.longValue() == ref;
+			}
+
+			@Override
+			public int compareToReference(Tuple2<Long, String> candidate) {
+				long x = ref;
+				long y = candidate.f0;
+				return (x < y) ? -1 : ((x == y) ? 0 : 1);
+			}
+		};
+	}
+
+	@Test
+	public void testHashTableGrowthWithInsert() 

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-11 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52643374
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java 
---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.manual;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.util.SplittableIterator;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Random;
+
+public class ReducePerformance {
--- End diff --


https://github.com/ggevay/flink/commit/68c8f1e1468d6efe3649246007c6024028b45c7c




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-11 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52643279
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
 ---
@@ -100,340 +101,371 @@
@Test
public void testDifferentProbers() {
final int NUM_MEM_PAGES = 32 * NUM_PAIRS / PAGE_SIZE;
-   
-   AbstractMutableHashTable table = new 
CompactingHashTable(serializer, comparator, getMemory(NUM_MEM_PAGES, 
PAGE_SIZE));
-   
+   testDifferentProbersCore(new CompactingHashTable<>(serializer, 
comparator, getMemory(NUM_MEM_PAGES, PAGE_SIZE)), NUM_MEM_PAGES);
--- End diff --


https://github.com/ggevay/flink/commit/fdf7abea42c3cd9587d70cac302c19834de37434




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-11 Thread ggevay
Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-182993653
  
I think I've addressed all the comments, except documenting `CombineHint` 
(I will do that tomorrow). Thanks again for reviewing the PR!




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52461048
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
 ---
@@ -100,340 +101,371 @@
@Test
public void testDifferentProbers() {
final int NUM_MEM_PAGES = 32 * NUM_PAIRS / PAGE_SIZE;
-   
-   AbstractMutableHashTable table = new 
CompactingHashTable(serializer, comparator, getMemory(NUM_MEM_PAGES, 
PAGE_SIZE));
-   
+   testDifferentProbersCore(new CompactingHashTable<>(serializer, 
comparator, getMemory(NUM_MEM_PAGES, PAGE_SIZE)), NUM_MEM_PAGES);
--- End diff --

Tests for different tables should be put into different methods or even 
better into different test classes.




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52470915
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
 ---
@@ -102,7 +102,7 @@ public void testCompactingHashMapPerformance() {
System.out.println("Starting update...");
start = System.currentTimeMillis();
while(updater.next(target) != null) {
-   target.setValue(target.getValue()*-1);
--- End diff --

Oh yes, sorry.




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52461236
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReduceHashTableTest.java
 ---
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import com.google.common.collect.Ordering;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import 
org.apache.flink.runtime.operators.testutils.UniformStringPairGenerator;
+import org.apache.flink.runtime.operators.testutils.types.*;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Test;
+
+import java.util.*;
+
+import static org.junit.Assert.*;
+import static org.junit.Assert.fail;
+
+public class ReduceHashTableTest {
+
+   private static final long RANDOM_SEED = 58723953465322L;
+
+   private static final int PAGE_SIZE = 16 * 1024;
+
+   private final TypeSerializer<Tuple2<Long, String>> serializer;
+   private final TypeComparator<Tuple2<Long, String>> comparator;
+
+   private final TypeComparator<Long> probeComparator;
+
+   private final TypePairComparator<Long, Tuple2<Long, String>> pairComparator;
+
+   //
+   // -- Note: This part was mostly copied from 
CompactingHashTableTest --
+
+   public ReduceHashTableTest() {
+   TypeSerializer[] fieldSerializers = { 
LongSerializer.INSTANCE, StringSerializer.INSTANCE };
+   @SuppressWarnings("unchecked")
+   Class<Tuple2<Long, String>> clazz = (Class<Tuple2<Long, String>>) (Class<?>) Tuple2.class;
+   this.serializer = new TupleSerializer<Tuple2<Long, String>>(clazz, fieldSerializers);
+
+   TypeComparator[] comparators = { new LongComparator(true) };
+   TypeSerializer[] comparatorSerializers = { 
LongSerializer.INSTANCE };
+
+   this.comparator = new TupleComparator<Tuple2<Long, String>>(new int[] {0}, comparators, comparatorSerializers);
+
+   this.probeComparator = new LongComparator(true);
+
+   this.pairComparator = new TypePairComparator<Long, Tuple2<Long, String>>() {
+
+   private long ref;
+
+   @Override
+   public void setReference(Long reference) {
+   ref = reference;
+   }
+
+   @Override
+   public boolean equalToReference(Tuple2<Long, String> candidate) {
+   //noinspection UnnecessaryUnboxing
+   return candidate.f0.longValue() == ref;
+   }
+
+   @Override
+   public int compareToReference(Tuple2<Long, String> candidate) {
+   long x = ref;
+   long y = candidate.f0;
+   return (x < y) ? -1 : ((x == y) ? 0 : 1);
+   }
+   };
+   }
+
+   @Test
+   public void 

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52461306
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java 
---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.manual;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.util.SplittableIterator;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Random;
+
+public class ReducePerformance {
--- End diff --

The name of the class is misleading.




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52461281
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java 
---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.manual;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.util.SplittableIterator;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Random;
+
+public class ReducePerformance {
+   
+   public static void main(String[] args) throws Exception {
+
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   //env.getConfig().enableObjectReuse();
+   //env.setParallelism(1);
+
+   @SuppressWarnings("unchecked")
+   DataSet<Tuple2<Integer, Integer>> output =
+   env.fromParallelCollection(new SplittableRandomIterator(40 * 1000 * 1000, new TupleIntIntIterator(4 * 1000 * 1000)),
+   TupleTypeInfo.<Tuple2<Integer, Integer>>getBasicTupleTypeInfo(Integer.class, Integer.class))
+   .groupBy("0")
+   .reduce(new SumReducer(), 
ReduceOperatorBase.ReduceHint.HASH);
+
+// DataSet> output =
--- End diff --

Please remove commented code.




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-182398031
  
Hi @ggevay, sorry it took me so long to review your PR.

As I said before, this is a very desirable feature and a solid 
implementation.
I think a few things can be improved. Especially the full stop to resize / 
rebuild / emit records might take some time, depending on the size of the table.

I have the following suggestions / ideas:
- Split the single record area into multiple partitions (i.e., use multiple
`RecordArea`s). Each partition holds the data of multiple buckets. This allows
restricting rebuilding, compaction, etc. to only a part of the whole table. In
fact, this is what I meant in my initial comment about tracking the updates of
buckets (which I confused with partitions...). Partitioning is also used in the
other hash tables in Flink. It can also help to make your implementation more
suitable for the final reduce case, because it allows spilling individual
partitions to disk. In `CompactingHashTable` the max number of partitions is
set to `32`.
- Should we think about [linear
hashing](https://en.wikipedia.org/wiki/Linear_hashing) for resizing the table?
This technique grows the table by splitting individual buckets without the need
to reorganize the whole table (see the sketch below).
- Do you think it is possible to extract the ReduceFunction from the table? 
IMO this would be a cleaner design if we want to use the table instead of the 
`CompactingHashTable`.

What do you think?

In any case, we need to update the documentation for the added
`CombineHint`. It would also be good to extend `ReduceITCase` with a few tests
that use `CombineHint.HASH`.
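For reference, a minimal heap-based sketch of the linear-hashing idea suggested
above. It is illustrative only: it uses plain Java collections instead of
Flink's MemorySegment-based layout, and all names in it are made up for the
example. The point is the growth strategy: on overflow, only the bucket at the
split pointer is split, so there is never a full-table rehash pause.

```java
import java.util.ArrayList;
import java.util.List;

public class LinearHashingSketch<K> {

    private static final int INITIAL_BUCKETS = 4;

    private int level = 0;   // completed doubling rounds
    private int next = 0;    // index of the next bucket to split
    private int size = 0;
    private final List<List<K>> buckets = new ArrayList<>();

    public LinearHashingSketch() {
        for (int i = 0; i < INITIAL_BUCKETS; i++) {
            buckets.add(new ArrayList<K>());
        }
    }

    public void insert(K key) {
        buckets.get(bucketIndex(key)).add(key);
        size++;
        if (size > 4 * buckets.size()) {   // simple load-factor trigger
            splitNextBucket();
        }
    }

    private int bucketIndex(K key) {
        int h = key.hashCode() & 0x7fffffff;
        int idx = h % (INITIAL_BUCKETS << level);
        if (idx < next) {
            // this bucket was already split in the current round: use the finer hash
            idx = h % (INITIAL_BUCKETS << (level + 1));
        }
        return idx;
    }

    private void splitNextBucket() {
        List<K> old = buckets.get(next);
        List<K> keep = new ArrayList<>();
        List<K> moved = new ArrayList<>();
        buckets.add(moved);   // new "image" bucket at index next + INITIAL_BUCKETS * 2^level
        int finerModulus = INITIAL_BUCKETS << (level + 1);
        for (K key : old) {
            int idx = (key.hashCode() & 0x7fffffff) % finerModulus;
            (idx == next ? keep : moved).add(key);
        }
        buckets.set(next, keep);
        next++;
        if (next == INITIAL_BUCKETS << level) {   // finished one doubling round
            level++;
            next = 0;
        }
    }
}
```

Adapting this to the table in this PR would mean splitting at the granularity
of bucket segments rather than one key list at a time, but the bookkeeping
(level, split pointer, coarse and fine hash moduli) stays the same.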




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52471114
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
 ---
@@ -100,340 +101,371 @@
@Test
public void testDifferentProbers() {
final int NUM_MEM_PAGES = 32 * NUM_PAIRS / PAGE_SIZE;
-   
-   AbstractMutableHashTable table = new 
CompactingHashTable(serializer, comparator, getMemory(NUM_MEM_PAGES, 
PAGE_SIZE));
-   
+   testDifferentProbersCore(new CompactingHashTable<>(serializer, 
comparator, getMemory(NUM_MEM_PAGES, PAGE_SIZE)), NUM_MEM_PAGES);
--- End diff --

I think it is good practice to separate tests for different components into
different test classes. That way you see directly for which component a test
failed. It also keeps logic separated.




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52460829
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *    because before serializing a record, there is no way to know in advance how large it will be.
+ *    Therefore, we can't serialize directly into the record area when we are doing an update, because
+ *    if it turns out to be larger than the old record, then it would overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.
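As a side note on the update path described in this javadoc, here is a
heap-based toy model (hypothetical names, plain byte arrays instead of
MemorySegments, not the PR's code): an update is serialized into a staging
buffer first, written in place only when the size matches exactly, and
otherwise appended at the end while the old slot is abandoned as a hole.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class StagingAreaUpdateSketch {

    private byte[] recordArea = new byte[256];     // one contiguous "record area"
    private int writePosition = 0;                 // records are appended here
    private final Map<String, int[]> slots = new HashMap<>();   // key -> {offset, length}
    private int abandonedBytes = 0;                // total size of the "holes"

    public void insert(String key, String value) {
        slots.put(key, append(value.getBytes(StandardCharsets.UTF_8)));
    }

    public void update(String key, String newValue) {
        // 1. Serialize into the staging buffer first; only now is the size known.
        byte[] staging = newValue.getBytes(StandardCharsets.UTF_8);
        int[] slot = slots.get(key);
        if (staging.length == slot[1]) {
            // 2a. Same size: overwriting in place cannot clobber the record that follows.
            System.arraycopy(staging, 0, recordArea, slot[0], staging.length);
        } else {
            // 2b. Different size: append a new element and abandon the old slot.
            abandonedBytes += slot[1];
            slots.put(key, append(staging));
            // Once abandonedBytes grows large enough, a compaction sweep would
            // re-insert all live records starting from the front of the area.
        }
    }

    private int[] append(byte[] bytes) {
        while (writePosition + bytes.length > recordArea.length) {
            recordArea = Arrays.copyOf(recordArea, recordArea.length * 2);
        }
        System.arraycopy(bytes, 0, recordArea, writePosition, bytes.length);
        int[] slot = new int[] { writePosition, bytes.length };
        writePosition += bytes.length;
        return slot;
    }
}
```

The real table additionally keeps the next-pointer of the linked list in front
of each record and re-derives record boundaries during the reinsertion sweep,
which is why the old slot has to be abandoned even when the update is smaller.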
   

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52460943
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
 ---
@@ -102,7 +102,7 @@ public void testCompactingHashMapPerformance() {
System.out.println("Starting update...");
start = System.currentTimeMillis();
while(updater.next(target) != null) {
-   target.setValue(target.getValue()*-1);
--- End diff --

Why are you changing these lines?




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52464062
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *    because before serializing a record, there is no way to know in advance how large it will be.
+ *    Therefore, we can't serialize directly into the record area when we are doing an update, because
+ *    if it turns out to be larger than the old record, then it would overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52469979
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *    because before serializing a record, there is no way to know in advance how large it will be.
+ *    Therefore, we can't serialize directly into the record area when we are doing an update, because
+ *    if it turns out to be larger than the old record, then it would overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.
   

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52459151
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/AbstractBlockResettableIterator.java
 ---
@@ -83,6 +83,7 @@ protected 
AbstractBlockResettableIterator(TypeSerializer serializer, MemoryMa
this.collectingView = new 
SimpleCollectingOutputView(this.fullSegments, 
new 
ListMemorySegmentSource(this.emptySegments), memoryManager.getPageSize());
this.readView = new RandomAccessInputView(this.fullSegments, 
memoryManager.getPageSize());
+   this.readView.setReadPosition(0);
--- End diff --

Not needed if an additional constructor is added to `RandomAccessInputView`.




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52465416
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *    because before serializing a record, there is no way to know in advance how large it will be.
+ *    Therefore, we can't serialize directly into the record area when we are doing an update, because
+ *    if it turns out to be larger than the old record, then it would overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52470557
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *    because before serializing a record, there is no way to know in advance how large it will be.
+ *    Therefore, we can't serialize directly into the record area when we are doing an update, because
+ *    if it turns out to be larger than the old record, then it would overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.
   

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52472588
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java 
---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.manual;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.util.SplittableIterator;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Random;
+
+public class ReducePerformance {
--- End diff --

I wanted this test to be a performance comparison between the sort-based
and hash-based combiners.

The way I was using it is to run it unmodified, note the time, then modify 
the hint to sorting, and run it again. But I'll modify it to make it more 
convenient to run both of them.




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52460226
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *    because before serializing a record, there is no way to know in advance how large it will be.
+ *    Therefore, we can't serialize directly into the record area when we are doing an update, because
+ *    if it turns out to be larger than the old record, then it would overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.
   

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52463167
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *    because before serializing a record, there is no way to know in advance how large it will be.
+ *    Therefore, we can't serialize directly into the record area when we are doing an update, because
+ *    if it turns out to be larger than the old record, then it would overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52468711
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java 
---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.manual;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.util.SplittableIterator;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Random;
+
+public class ReducePerformance {
--- End diff --

I'm not sure what you mean. Should it be ReducePerformanceTest? Or 
CombinePerformance?




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52473636
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java 
---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.manual;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.util.SplittableIterator;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Random;
+
+public class ReducePerformance {
--- End diff --

Ah, OK. Why not add two private methods, one for each strategy, and call
them from `main()`? In this case, `ReduceBenchmark` is fine. We also had other
performance benchmarks but (temporarily) removed them due to licensing reasons
(see FLINK-2973).
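A rough sketch of that structure (hypothetical class; it assumes the
`reduce(function, hint)` overload added in this PR and that the hint enum has
both a `SORT` and a `HASH` constant, and it uses a tiny inline data set instead
of the generator):

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class ReduceBenchmarkSketch {

    public static void main(String[] args) throws Exception {
        // one private method per strategy, both driven from main(),
        // so a single run reports both timings
        System.out.println("hash-based combine: " + runWith(ReduceOperatorBase.ReduceHint.HASH) + " ms");
        System.out.println("sort-based combine: " + runWith(ReduceOperatorBase.ReduceHint.SORT) + " ms");
    }

    private static long runWith(ReduceOperatorBase.ReduceHint hint) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Integer, Integer>> input = env.fromElements(
            new Tuple2<>(1, 10), new Tuple2<>(1, 20), new Tuple2<>(2, 30));

        long start = System.currentTimeMillis();
        input.groupBy(0)
            .reduce(new SumReducer(), hint)
            .count();   // forces execution of the plan
        return System.currentTimeMillis() - start;
    }

    public static class SumReducer implements ReduceFunction<Tuple2<Integer, Integer>> {
        @Override
        public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
            return new Tuple2<>(a.f0, a.f1 + b.f1);
        }
    }
}
```

With the generator and data sizes from `ReducePerformance` plugged in, the two
timings become directly comparable within a single run.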




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52458969
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
 ---
@@ -52,6 +52,30 @@
  */
 public class ReduceOperatorBase extends 
SingleInputOperator {
 
+   /**
+* An enumeration of hints, optionally usable to tell the system 
exactly how to execute the reduce.
+*/
+   public enum ReduceHint {
--- End diff --

This should be renamed to `CombineHint`, and all corresponding methods and member 
variables should be renamed accordingly.
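
For illustration, the rename could look roughly like this; the enum constants below are a sketch, not necessarily the final set:

```java
/**
 * Sketch of the suggested rename: ReduceHint becomes CombineHint, since the hint
 * chooses the strategy of the combine phase of a reduce. Constants are illustrative.
 */
public enum CombineHint {
	OPTIMIZER_CHOOSES,  // let the optimizer pick the combine strategy
	SORT,               // sort-based combine (the existing strategy)
	HASH                // hash-based combine (added by this PR)
}
```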




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52459830
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *    because before serializing a record, there is no way to know in 
advance how large it will be.
+ *    Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *    if it turns out to be larger than the old record, then it would 
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.
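
For readers skimming the archive, the staging-area update path described in that javadoc can be summarized in a short sketch. All names and helper methods below are illustrative, not the actual `ReduceHashTable` members:

```java
// Illustrative sketch of the update path from the javadoc above; not the PR's code.
abstract class StagedUpdateSketch<T> {

	abstract long serializeToStagingArea(T record) throws java.io.IOException; // returns the serialized size
	abstract long sizeOfRecordAt(long pointer);
	abstract void overwriteInPlaceFromStagingArea(long pointer, long size);
	abstract long appendFromStagingArea(long size);  // new linked-list element at the end of the record area
	abstract void markAbandoned(long pointer);       // leaves a "hole" that a later compaction removes
	abstract void relinkBucketChain(long oldPointer, long newPointer);

	/** Replace an already-stored record with an updated one. */
	void updateMatch(long pointer, T updatedRecord) throws java.io.IOException {
		// the size is only known after serializing, hence the staging area
		long newSize = serializeToStagingArea(updatedRecord);
		if (newSize == sizeOfRecordAt(pointer)) {
			// same size: the staged bytes can safely replace the old record in place
			overwriteInPlaceFromStagingArea(pointer, newSize);
		} else {
			// different size: append a new element and abandon the old one, so that
			// records following the old one in memory are never overwritten
			long newPointer = appendFromStagingArea(newSize);
			markAbandoned(pointer);
			relinkBucketChain(pointer, newPointer);
		}
	}
}
```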
   

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52460590
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *    because before serializing a record, there is no way to know in 
advance how large it will be.
+ *    Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *    if it turns out to be larger than the old record, then it would 
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.
   

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52460659
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
 ---
@@ -52,6 +52,30 @@
  */
 public class ReduceOperatorBase extends 
SingleInputOperator {
 
+   /**
+* An enumeration of hints, optionally usable to tell the system 
exactly how to execute the reduce.
+*/
+   public enum ReduceHint {
--- End diff --

Ah, of course, you are right.




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52460684
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
 ---
@@ -45,9 +45,12 @@ public RandomAccessInputView(ArrayList 
segments, int segmentSize)
{
this(segments, segmentSize, segmentSize);
}
-   
+
+   /**
+* Warning: setReadPosition has to be called before reading.
+*/
public RandomAccessInputView(ArrayList segments, int 
segmentSize, int limitInLastSegment) {
--- End diff --

ok, will do




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52468223
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java 
---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.manual;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.util.SplittableIterator;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Random;
+
+public class ReducePerformance {
+   
+   public static void main(String[] args) throws Exception {
+
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   //env.getConfig().enableObjectReuse();
+   //env.setParallelism(1);
+
+   @SuppressWarnings("unchecked")
+   DataSet> output =
+   env.fromParallelCollection(new 
SplittableRandomIterator(40 * 1000 * 1000, new TupleIntIntIterator(4 * 1000 * 
1000)),
+   TupleTypeInfo.>getBasicTupleTypeInfo(Integer.class, Integer.class))
+   .groupBy("0")
+   .reduce(new SumReducer(), 
ReduceOperatorBase.ReduceHint.HASH);
+
+// DataSet> output =
--- End diff --

OK, sorry, I'll split the two versions into different methods.
The purpose here was that someone who wants to run the test with 
`TupleStringIntIterator` instead of `TupleIntIntIterator` could comment this in 
and comment out the similar part above, but I realize now that this is not a good 
way to do it.
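
A hedged sketch of an alternative to commenting code in and out: one generic helper, called once per key type from `main()`. It assumes `SumReducer` and the input iterators are (made) generic in the key type, and uses `DiscardingOutputFormat` only as one possible way to terminate the job; this is not the code in the PR.

```java
// Sketch only; assumes a generic SumReducer<K> and generic input iterators.
private static <K> void runBenchmark(
		ExecutionEnvironment env,
		SplittableIterator<Tuple2<K, Integer>> input,
		TupleTypeInfo<Tuple2<K, Integer>> typeInfo) throws Exception {

	DataSet<Tuple2<K, Integer>> output = env
		.fromParallelCollection(input, typeInfo)
		.groupBy(0)
		.reduce(new SumReducer<K>(), ReduceOperatorBase.ReduceHint.HASH);

	output.output(new DiscardingOutputFormat<Tuple2<K, Integer>>());
	env.execute();
}
```

`main()` would then call this helper once with the `TupleIntIntIterator` input and once with the `TupleStringIntIterator` input, instead of toggling commented-out blocks.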




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52471392
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java 
---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.manual;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.util.SplittableIterator;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Random;
+
+public class ReducePerformance {
--- End diff --

How about something like `HashCombinedReduceBenchmark`?




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52459053
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
 ---
@@ -45,9 +45,12 @@ public RandomAccessInputView(ArrayList 
segments, int segmentSize)
{
this(segments, segmentSize, segmentSize);
}
-   
+
+   /**
+* Warning: setReadPosition has to be called before reading.
+*/
public RandomAccessInputView(ArrayList segments, int 
segmentSize, int limitInLastSegment) {
--- End diff --

Can you add a new constructor instead of modifying the behavior of this one?
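
A generic illustration of that pattern (this is not the actual `RandomAccessInputView` code, just the shape of the suggestion): keep the existing constructor's contract and add an overload for the new use case.

```java
// Illustrative only: the existing constructor keeps its behavior; new callers opt in
// to the changed behavior through a separate constructor.
class PagedInputViewSketch {
	private final int pageSize;
	private long readPosition;

	// existing constructor: unchanged contract, the view starts at position 0
	PagedInputViewSketch(int pageSize) {
		this(pageSize, 0L);
	}

	// new constructor: callers that want a different starting position say so explicitly
	PagedInputViewSketch(int pageSize, long initialReadPosition) {
		this.pageSize = pageSize;
		this.readPosition = initialReadPosition;
	}
}
```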




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52460049
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *    because before serializing a record, there is no way to know in 
advance how large it will be.
+ *    Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *    if it turns out to be larger than the old record, then it would 
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.
   

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52465854
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *    because before serializing a record, there is no way to know in 
advance how large it will be.
+ *    Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *    if it turns out to be larger than the old record, then it would 
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52466336
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
 ---
@@ -102,7 +102,7 @@ public void testCompactingHashMapPerformance() {
System.out.println("Starting update...");
start = System.currentTimeMillis();
while(updater.next(target) != null) {
-   target.setValue(target.getValue()*-1);
--- End diff --

`target.getValue()` is always 0 at this point. (see the 6th paragraph in 
the PR opening comment)




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52460672
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *    because before serializing a record, there is no way to know in 
advance how large it will be.
+ *    Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *    if it turns out to be larger than the old record, then it would 
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.
   

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52460698
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/AbstractBlockResettableIterator.java
 ---
@@ -83,6 +83,7 @@ protected 
AbstractBlockResettableIterator(TypeSerializer serializer, MemoryMa
this.collectingView = new 
SimpleCollectingOutputView(this.fullSegments, 
new 
ListMemorySegmentSource(this.emptySegments), memoryManager.getPageSize());
this.readView = new RandomAccessInputView(this.fullSegments, 
memoryManager.getPageSize());
+   this.readView.setReadPosition(0);
--- End diff --

ok




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52461392
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *    because before serializing a record, there is no way to know in 
advance how large it will be.
+ *    Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *    if it turns out to be larger than the old record, then it would 
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52467783
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReduceHashTableTest.java
 ---
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import com.google.common.collect.Ordering;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import 
org.apache.flink.runtime.operators.testutils.UniformStringPairGenerator;
+import org.apache.flink.runtime.operators.testutils.types.*;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Test;
+
+import java.util.*;
+
+import static org.junit.Assert.*;
+import static org.junit.Assert.fail;
+
+public class ReduceHashTableTest {
+
+   private static final long RANDOM_SEED = 58723953465322L;
+
+   private static final int PAGE_SIZE = 16 * 1024;
+
+   private final TypeSerializer> serializer;
+   private final TypeComparator> comparator;
+
+   private final TypeComparator probeComparator;
+
+   private final TypePairComparator> 
pairComparator;
+
+   //
+   // -- Note: This part was mostly copied from 
CompactingHashTableTest --
+
+   public ReduceHashTableTest() {
+   TypeSerializer[] fieldSerializers = { 
LongSerializer.INSTANCE, StringSerializer.INSTANCE };
+   @SuppressWarnings("unchecked")
+   Class> clazz = (Class>) (Class) Tuple2.class;
+   this.serializer = new TupleSerializer>(clazz, fieldSerializers);
+
+   TypeComparator[] comparators = { new LongComparator(true) };
+   TypeSerializer[] comparatorSerializers = { 
LongSerializer.INSTANCE };
+
+   this.comparator = new TupleComparator>(new 
int[] {0}, comparators, comparatorSerializers);
+
+   this.probeComparator = new LongComparator(true);
+
+   this.pairComparator = new TypePairComparator>() {
+
+   private long ref;
+
+   @Override
+   public void setReference(Long reference) {
+   ref = reference;
+   }
+
+   @Override
+   public boolean equalToReference(Tuple2 
candidate) {
+   //noinspection UnnecessaryUnboxing
+   return candidate.f0.longValue() == ref;
+   }
+
+   @Override
+   public int compareToReference(Tuple2 
candidate) {
+   long x = ref;
+   long y = candidate.f0;
+   return (x < y) ? -1 : ((x == y) ? 0 : 1);
+   }
+   };
+   }
+
+   @Test
+   public void testHashTableGrowthWithInsert() 

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52461679
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *    because before serializing a record, there is no way to know in 
advance how large it will be.
+ *    Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *    if it turns out to be larger than the old record, then it would 
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52467302
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
 ---
@@ -100,340 +101,371 @@
@Test
public void testDifferentProbers() {
final int NUM_MEM_PAGES = 32 * NUM_PAIRS / PAGE_SIZE;
-   
-   AbstractMutableHashTable table = new 
CompactingHashTable(serializer, comparator, getMemory(NUM_MEM_PAGES, 
PAGE_SIZE));
-   
+   testDifferentProbersCore(new CompactingHashTable<>(serializer, 
comparator, getMemory(NUM_MEM_PAGES, PAGE_SIZE)), NUM_MEM_PAGES);
--- End diff --

I can do that, but I would like to ask why that is important. I mean, if one 
of these tests fails, one of the lines can simply be commented out to see 
which hash table is making it fail.




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52471181
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReduceHashTableTest.java
 ---
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import com.google.common.collect.Ordering;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import 
org.apache.flink.runtime.operators.testutils.UniformStringPairGenerator;
+import org.apache.flink.runtime.operators.testutils.types.*;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Test;
+
+import java.util.*;
+
+import static org.junit.Assert.*;
+import static org.junit.Assert.fail;
+
+public class ReduceHashTableTest {
+
+   private static final long RANDOM_SEED = 58723953465322L;
+
+   private static final int PAGE_SIZE = 16 * 1024;
+
+   private final TypeSerializer> serializer;
+   private final TypeComparator> comparator;
+
+   private final TypeComparator probeComparator;
+
+   private final TypePairComparator> 
pairComparator;
+
+   //
+   // -- Note: This part was mostly copied from 
CompactingHashTableTest --
+
+   public ReduceHashTableTest() {
+   TypeSerializer[] fieldSerializers = { 
LongSerializer.INSTANCE, StringSerializer.INSTANCE };
+   @SuppressWarnings("unchecked")
+   Class> clazz = (Class>) (Class) Tuple2.class;
+   this.serializer = new TupleSerializer>(clazz, fieldSerializers);
+
+   TypeComparator[] comparators = { new LongComparator(true) };
+   TypeSerializer[] comparatorSerializers = { 
LongSerializer.INSTANCE };
+
+   this.comparator = new TupleComparator>(new 
int[] {0}, comparators, comparatorSerializers);
+
+   this.probeComparator = new LongComparator(true);
+
+   this.pairComparator = new TypePairComparator>() {
+
+   private long ref;
+
+   @Override
+   public void setReference(Long reference) {
+   ref = reference;
+   }
+
+   @Override
+   public boolean equalToReference(Tuple2 
candidate) {
+   //noinspection UnnecessaryUnboxing
+   return candidate.f0.longValue() == ref;
+   }
+
+   @Override
+   public int compareToReference(Tuple2 
candidate) {
+   long x = ref;
+   long y = candidate.f0;
+   return (x < y) ? -1 : ((x == y) ? 0 : 1);
+   }
+   };
+   }
+
+   @Test
+   public void 

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52471297
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *    because before serializing a record, there is no way to know in 
advance how large it will be.
+ *    Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *    if it turns out to be larger than the old record, then it would 
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note, that insertions never override a record that have not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large are the records. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.
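
To make the staging-area update path above concrete, here is a tiny standalone model of
insert/update/compact over a flat record area. The names and data structures are illustrative
simplifications, not the actual ReduceHashTable code:

    import java.util.ArrayList;
    import java.util.List;

    /** Toy model of the staging-area update path described above; illustrative only. */
    public class StagingAreaSketch {

        private final List<byte[]> recordArea = new ArrayList<>();   // linked-list elements, in append order
        private final List<Boolean> abandoned = new ArrayList<>();   // "holes" created by size-changing updates

        /** Appends a new record and returns its position (the "pointer"). */
        public int insert(byte[] serializedRecord) {
            recordArea.add(serializedRecord);
            abandoned.add(false);
            return recordArea.size() - 1;
        }

        /**
         * Updates the record at {@code pos} with an already re-serialized value
         * (the "staging area" content). Returns the possibly new position.
         */
        public int update(int pos, byte[] staged) {
            if (staged.length == recordArea.get(pos).length) {
                recordArea.set(pos, staged);          // same size: overwrite in place
                return pos;
            } else {
                abandoned.set(pos, true);             // different size: abandon the old slot...
                return insert(staged);                // ...and append at the end (this creates a hole)
            }
        }

        /** Compaction: drop the structure and re-append all non-abandoned records sequentially. */
        public void compact() {
            List<byte[]> survivors = new ArrayList<>();
            for (int i = 0; i < recordArea.size(); i++) {
                if (!abandoned.get(i)) {
                    survivors.add(recordArea.get(i));
                }
            }
            recordArea.clear();
            abandoned.clear();
            for (byte[] r : survivors) {
                insert(r);
            }
        }
    }

In the real table, it is the bucket's pointer that gets updated when a record moves; the sketch
mimics that by returning the new position from update().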

[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52488669
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
 ---
@@ -100,340 +101,371 @@
@Test
public void testDifferentProbers() {
final int NUM_MEM_PAGES = 32 * NUM_PAIRS / PAGE_SIZE;
-   
-   AbstractMutableHashTable table = new CompactingHashTable(serializer, comparator, getMemory(NUM_MEM_PAGES, PAGE_SIZE));
-   
+   testDifferentProbersCore(new CompactingHashTable<>(serializer, comparator, getMemory(NUM_MEM_PAGES, PAGE_SIZE)), NUM_MEM_PAGES);
--- End diff --

OK, I'll split the top-level methods into two methods, and move these to 
different classes. However, the core of the tests can remain shared, right?

For example, I would split `testBuildAndRetrieve` to two methods, and move 
these to different classes, but both of them would still call the same 
`testBuildAndRetrieveCore`.




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-182483003
  
The benefit of having multiple partitions is that you do not need to go 
over the complete RecordArea but only over a partition. After a partition is 
compacted or emitted, its MemorySegments can be used by other Partitions 
as well. Depending on the strategy for choosing the Partition to emit or compact 
(#segments in partition, update count on partition, #unique values in 
partition), this should improve the combine rate (fewer records emitted from the 
table) and also improve the pipelined behavior of the combiner, because you'll 
get more but much shorter pauses.
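
For illustration, the victim selection could look roughly like the sketch below. The Partition
bookkeeping fields and the "fewest updates" heuristic are assumptions made for the sketch, not
something this PR implements:

    import java.util.List;

    // Illustrative only: emit the partition with the fewest in-place updates,
    // so "hot" partitions (where most combining happens) stay in memory.
    final class PartitionSelector {

        static final class Partition {
            final int id;
            long updateCount;   // number of in-place reduce steps applied to this partition
            int numSegments;    // MemorySegments currently held by this partition

            Partition(int id) {
                this.id = id;
            }
        }

        static Partition pickPartitionToEmit(List<Partition> partitions) {
            Partition victim = partitions.get(0);
            for (Partition p : partitions) {
                if (p.updateCount < victim.updateCount) {
                    victim = p;   // fewest updates => least combining benefit from keeping it
                }
            }
            return victim;
        }
    }

Any of the other signals mentioned above (#segments, #unique values) would slot into the same
comparison.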




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread ggevay
Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-182477373
  
> In any case, we need to update the documentation for the added 
CombineHint. It would also be good to extend ReduceITCase with a few tests that 
use CombineHint.HASH.

OK, I'll do these.

> the full stop to resize / rebuild / emit records might take some time, 
depending on the size of the table.

Emit is very fast, because `EntryIterator` is reading the record area 
sequentially. I don't think there is much room for improvement there. (Except 
perhaps your idea of storing the lengths as negative values, and skipping over 
abandoned records :))

__Rebuilds because of compactions:__

Typically, the sizes of the records don't change, in which case no 
compactions happen.

If the records change size, that's a mess. But even in this case, 
performance should be reduced by less than 2x, because the number of insertions 
done in rebuilds should be proportional to the number of normal update/insert 
operations, and one insert during a rebuild is faster than one usual 
insert/update, because the linked lists are not traversed (only the bucket 
segments are accessed randomly; the elements are inserted at the beginning of 
the lists).

__Rebuilds because of resizing__:

If the serializer knows the length of the records in advance, then no 
resizings will happen, because we just calculate the final number of bucket 
segments at the beginning (see `calcInitialNumBucketSegments`). Unfortunately, 
currently the serializers almost never know the length, but I hope this 
situation will improve in the future (for example, see FLINK-3321).

If the serializer doesn't know the length, then how much time is spent on 
resizing is greatly affected by the average number of records per key. This is 
because the time spent doing resizes is proportional to the number of 
elements inserted, and if there are lots of elements with matching keys, then 
most operations will be updates (rather than insertions). I will probably do 
some performance tests to confirm this, and see how much time is spent in 
resizes in the worst case (when every key has only one element). (Also, the 
"performance should be reduced by less than 2x" argument from above should be 
applicable in this case as well.)
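
For intuition on why resizes stay cheap, the standard doubling argument can be sketched as
below. This is only the back-of-the-envelope bound behind the "less than 2x" expectation; the
actual resize policy of the table may differ:

    // Resizes at capacities 1, 2, 4, ..., < n re-insert fewer than 2n elements in
    // total, so resizing adds at most a constant factor on top of the n regular
    // inserts (and each re-insert is cheaper than a regular insert, as noted above).
    public class ResizeCostSketch {
        public static void main(String[] args) {
            long n = 1_000_000;          // number of distinct keys inserted
            long reinserts = 0;
            for (long capacity = 1; capacity < n; capacity *= 2) {
                reinserts += capacity;   // a resize at this capacity re-inserts 'capacity' elements
            }
            System.out.println("inserts=" + n + ", re-inserts across all resizes=" + reinserts);
        }
    }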

> Do you think it is possible to extract the ReduceFunction from the table? 

This should be doable, I'll try.

> Should we think about linear hashing for resizing the table.

Thanks, I didn't know about this algorithm. I'll think about this.

> Split the single record area into multiple partitions (i.e., use multiple 
RecordAreas). Each partition holds the data of multiple buckets. This allows restricting 
rebuilding, compaction, etc. to only a part of the whole table.

Do you think that this would have a performance benefit? The time of a 
rebuild is linear with the size, so I don't see how doing it in chunks would 
improve performance. (I mean, the (frequency of rebuilds of a partition) / 
(frequency of rebuilds of the full table) is probably the same ratio as the 
(time to rebuild a partition) / (time to rebuild the full table).)

> It can also help to make your implementation more suitable for the final 
reduce case

Yes, some partitioning will be needed for that. However, I would like to 
get this in first, and then I'll design the implementation of that case.




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52489247
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
 ---
@@ -100,340 +101,371 @@
@Test
public void testDifferentProbers() {
final int NUM_MEM_PAGES = 32 * NUM_PAIRS / PAGE_SIZE;
-   
-   AbstractMutableHashTable table = new CompactingHashTable(serializer, comparator, getMemory(NUM_MEM_PAGES, PAGE_SIZE));
-   
+   testDifferentProbersCore(new CompactingHashTable<>(serializer, comparator, getMemory(NUM_MEM_PAGES, PAGE_SIZE)), NUM_MEM_PAGES);
--- End diff --

Yes, that's what I had in mind.




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-10 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52488021
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java 
---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.manual;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.util.SplittableIterator;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Random;
+
+public class ReducePerformance {
--- End diff --

OK, I'll do this.




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-09 Thread ggevay
Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-181907722
  
What I'm not sure about is the `closeUserCode` call in 
`ChainedReduceCombineDriver.closeTask`. The other chained drivers that have a 
`running` flag for indicating cancellation make this call only when the driver 
was not canceled. But the chained drivers that have no `running` 
flag seem to make this call even when they were canceled. What is the reasoning 
behind this difference?




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-08 Thread ggevay
Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-181437507
  
I have added the strategy hint. At the moment, it defaults to sorting, but 
I couldn't actually find any case where hashing is slower, and it is often 
about 2 times faster.
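
For reference, usage on the DataSet API ends up looking roughly like the snippet below. The
setCombineHint setter name is an assumption about how the hint is exposed on the reduce
operator, so treat the exact method as illustrative:

    import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class CombineHintExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<String, Integer>> counts = env
                    .fromElements(Tuple2.of("a", 1), Tuple2.of("b", 1), Tuple2.of("a", 1))
                    .groupBy(0)
                    .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))
                    // request the hash-based combiner; the sort-based strategy stays the default
                    .setCombineHint(CombineHint.HASH);

            counts.print();
        }
    }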




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-01-30 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-177204208
  
I agree with @mbalassi. Adding a strategy hint to the API, similar to the 
join hints for inner and outer joins, is a good idea. 




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-01-29 Thread mbalassi
Github user mbalassi commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-177094172
  
I think exposing the feature through the API similarly to the way join 
strategies are exposed is the way to properly make this feature available for 
users. I would leave the well-tested sorting version the default for now. 




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-01-27 Thread ggevay
Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-175734134
  
I have changed the hash function to include the bit-magic that is done in 
the other hash tables. (Originally, I thought it was not needed here, 
because I hadn't realized that it is not only for diffusing clusters, but also 
for having the high bits affect the low bits of the hashes.)
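
The kind of bit-mixing meant here looks roughly like the MurmurHash3 finalizer below; this is
only an illustration of the idea, not the exact bit-magic used in the Flink hash tables:

    // Why the mixing matters: 0x10000000 and 0x20000000 differ only in their high
    // bits, so without mixing they would both land in bucket 0 for a power-of-two
    // bucket count (their low bits are all zero). Mixing folds the high bits down.
    public final class HashMixSketch {

        // MurmurHash3 fmix32-style finalizer (illustrative)
        static int mix(int h) {
            h ^= h >>> 16;
            h *= 0x85ebca6b;
            h ^= h >>> 13;
            h *= 0xc2b2ae35;
            h ^= h >>> 16;
            return h;
        }

        static int bucket(int hashCode, int numBuckets) {
            return (mix(hashCode) & 0x7fffffff) % numBuckets;
        }

        public static void main(String[] args) {
            System.out.println(bucket(0x10000000, 64));
            System.out.println(bucket(0x20000000, 64));
        }
    }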




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-01-25 Thread ggevay
Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-174492669
  
I have added a commit to this PR that adds the chained version of 
`ReduceCombineDriver`. (It doesn't have the sort-based code, only the 
hash-based.)




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-01-22 Thread ggevay
Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-173997370
  
Quick question: Is there a reason why there is no chained version of 
ReduceCombineDriver? (GroupReduceDriver and AllReduceDriver both have chained 
versions: SynchronousChainedCombineDriver and ChainedAllReduceDriver.) If it is 
just because no one got around to it yet, then I'll open a JIRA and implement 
it quickly.




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-01-22 Thread ggevay
Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-174012431
  
Thanks! I've found the JIRA: 
https://issues.apache.org/jira/browse/FLINK-2246




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-01-22 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-174010704
  
No one got around to doing it in an efficient/safe manner. There is a JIRA for 
it afaik.




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-01-21 Thread ggevay
Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-173571311
  
I've fixed a minor issue related to closing the table, and improved some 
comments.




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-01-20 Thread ggevay
Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-173319246
  
I've added some performance testing code and comments.




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-01-19 Thread ggevay
Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-172939642
  
Hello @fhueske, thanks for the comment and shepherding the PR!

> You said, the combiner flushes the full hash table if it runs out of 
memory. Do you think it would be possible to track the update frequency of 
buckets and only flush the bucket with the least updates (or n buckets with 
least updates)? This might improve the performance for skewed input data.

If we flush only some of the elements when the memory is full, then the 
append position of the record area doesn't change, but only some holes appear 
in the record area, so we immediately need to do a compaction. Since the 
compaction traverses the entire table, this idea could only work if we flush a 
substantial amount of the elements (like half of them).

This is an interesting idea, but I'm finding it quite hard to form even 
just an intuitive understanding of its performance effects. I mean doing 
this would have some overhead, and at the moment I totally can't see how much 
skew in the data would make this worth it (or what percentage of the data 
should be flushed at a time, etc.).

I'll think about this some more, but getting the trade-offs right here 
would probably require quite a lot of work: experimentation with different 
variants of the algorithm, with differently skewed data, with different 
`distinct keys / total number of input elements` ratios, so we should probably 
postpone this to some later time, after this basic version is merged.

> I agree that the sort-based strategy for reduce combiners can be removed 
eventually, when the hash-based strategy proves to work well. I would like to 
give it a bit more exposure though, before we drop the code.

OK.

> Porting the built-in aggregation functions, distinct, etc. from 
GroupReduceFunctions to ReduceFunctions sounds good. I think the reason for 
this design decision was that for the sort-based strategies ReduceFunctions did 
not have a benefit over GroupReduceFunctions. Instead they caused more method 
invocations (once for each input record) compared to once per key.

I see, thanks! I'll keep this in mind for now, and open a JIRA when the 
dust settles around the hash-based combiner, and we see the performance 
differences more clearly.

> It would be great if you could open a JIRA and add a short design 
document / API proposal for the changes on the serializers that you talked 
about. This would allow the community to review and discuss your proposal.

OK, I will.

Best,
Gábor




[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-01-18 Thread ggevay
GitHub user ggevay opened a pull request:

https://github.com/apache/flink/pull/1517

[FLINK-2237] [runtime] Add hash-based combiner.

This adds ReduceHashTable, and modifies ReduceCombineDriver to use it 
instead of sorting. When an input element comes in, the driver probes the table 
for the key, and either inserts the element if it is not present, or updates it 
with one reduce step. When memory runs out, all the current elements (partial 
aggregates) in the table are sent to the output.
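
For readers skimming the PR, that per-record path can be modeled with a toy class like the one
below, which uses a plain HashMap in place of the managed-memory table and a fixed entry
budget in place of the memory limit (purely illustrative, not the driver code):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.BinaryOperator;
    import java.util.function.Consumer;
    import java.util.function.Function;

    /**
     * Toy model of the hash-based combiner's per-record path: probe by key,
     * reduce in place on a hit, insert on a miss, and flush everything downstream
     * when the "memory" budget runs out. Illustrative only.
     */
    public class HashCombinerSketch<K, T> {

        private final Map<K, T> table = new HashMap<>();
        private final Function<T, K> keyExtractor;
        private final BinaryOperator<T> reduceFunction;
        private final Consumer<T> output;
        private final int maxEntries;   // stands in for the fixed memory budget

        public HashCombinerSketch(Function<T, K> keyExtractor, BinaryOperator<T> reduceFunction,
                                  Consumer<T> output, int maxEntries) {
            this.keyExtractor = keyExtractor;
            this.reduceFunction = reduceFunction;
            this.output = output;
            this.maxEntries = maxEntries;
        }

        public void processRecord(T value) {
            K key = keyExtractor.apply(value);
            T existing = table.get(key);
            if (existing != null) {
                table.put(key, reduceFunction.apply(existing, value));  // one reduce step, in place
            } else {
                if (table.size() >= maxEntries) {
                    flush();                                            // memory full: emit partial aggregates
                }
                table.put(key, value);
            }
        }

        public void flush() {
            table.values().forEach(output);
            table.clear();
        }
    }

At the end of the input, one final flush() emits whatever partial aggregates are still in the
table.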

I haven't yet deleted the code of the sort-based solution from the driver, 
but it seems that the hash-based solution is always faster (sometimes by a 
factor of 3), so there isn't a need for dynamically choosing between them. I 
will do some more performance tests to confirm this, and then probably delete 
the sort-based code. The hash-based approach also has the advantage that the 
memory used is proportional to the number of different keys, instead of the 
number of input elements.

The interface of ReduceHashTable is a subset of the interface of 
CompactingHashTable, so in theory, it could replace CompactingHashTable 
everywhere (this is only JoinWithSolutionSet and coGroupWithSolutionSet, if I'm 
not mistaken). I haven't actually tried this yet though, but if you think it is 
worth giving it a go, then I will try it, and run some real performance 
tests. I've already added ReduceHashTable to HashTablePerformanceComparison, 
and when there is plenty of memory available, ReduceHashTable is about 5-10% 
faster than CompactingHashTable. It also uses ~15% less memory, and it has 
the advantage that if the sizes of the records don't change during the updates, 
then no compactions are needed (because records are updated in-place), so 
performance doesn't degrade when we are close to occupying all the memory. (In 
contrast to CompactingHashTable, where updates start getting slow when there 
is little free memory left.)

For details about the operation of ReduceHashTable, see the comment at the 
beginning of the file.

I've changed the ctor of RandomAccessInputView to not do a seek. (This 
behaviour is more practical for me, because I want to create an instance before 
allocating memory segments to it.) I verified at most places of usage that the 
code does an explicit seek after the ctor, but I couldn't see this in 
AbstractBlockResettableIterator, so I inserted an explicit seek there.

I fixed a small bug in HashTablePerformanceComparison: the *-1 was not 
doing anything, because that value was always 0 there.

The predefined aggregations (e.g., DataSet.sum()) currently use groupReduce. 
I'm not sure what the reason for this was originally, but in light of this 
speedup of reduce now, maybe this could be reconsidered. (And the same goes 
for DataSet.distinct().)

Additionally, I have plans for making a different hash table for handling 
the case when the serializer can tell the size of the records. This would allow 
open addressing to be used with linear probing, which would avoid a few cache 
misses per element access, so I'm expecting maybe a 2x speedup from this. (This 
open addressing hash table would actually be simpler than ReduceHashTable.) The 
difficulty with this is that currently the serializers are not very smart in 
this respect (I mean knowing the size), so some improvements would be needed 
for them, but I have some ideas for this.
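
To illustrate that open-addressing idea (fixed-size slots, linear probing, no linked lists to
chase), here is a toy version under the simplifying assumptions that keys are non-zero longs
and the table never fills up completely:

    /** Sketch of a fixed-slot table with linear probing; illustrative only. */
    public class LinearProbingSketch {

        private final long[] keys;     // 0 marks an empty slot (so key 0 is unsupported here)
        private final long[] values;

        public LinearProbingSketch(int capacity) {
            keys = new long[capacity];
            values = new long[capacity];
        }

        private int slot(long key) {
            return (int) (((key ^ (key >>> 32)) & 0x7fffffff) % keys.length);
        }

        /** Adds 'delta' to the value stored under 'key', inserting the key if absent. */
        public void addTo(long key, long delta) {
            int i = slot(key);
            while (keys[i] != 0 && keys[i] != key) {
                i = (i + 1) % keys.length;   // linear probing: just try the next slot
            }
            keys[i] = key;
            values[i] += delta;
        }
    }

Because the slots are contiguous and fixed-size, a lookup is a short sequential scan instead
of a pointer chase, which is where the hoped-for cache-miss savings would come from.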

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ggevay/flink hashedReduceSquashed

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1517.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1517


commit d5eb20c10dcc875e6ae14f3298da829a2a4a5d61
Author: Gabor Gevay 
Date:   2015-12-04T14:28:23Z

[FLINK-2237] [runtime] Add hash-based combiner.



