[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-187626905 I created a few sub-issues for the rather generic issue FLINK-2237. FLINK-3477 reflects the changes of this PR. Can you please update the commit message to FLINK-3477? Thanks, Fabian --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-187439412 Thanks @aalexandrov. I added a few comments to the document. Looking forward to the results :-)
Github user aalexandrov commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-187243415 I've added a [Google Doc](https://docs.google.com/document/d/12yx7olVrkooceaQPoR1nkk468lIq0xOObY5ukWuNEcM/edit?usp=sharing) where we can collaborate on the design of the experiments. Once we've fixed the design, we will proceed by implementing them. The code [will be available in the `flink-hashagg-experiments` repository](https://github.com/TU-Berlin-DIMA/flink-hashagg-experiments).
Github user aalexandrov commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-186257996 @fhueske We will propose three experiments based on your suggestions in a Google Doc on Monday. Once we have fixed the setup, we will prepare a Peel bundle and run them on one of the clusters in the lab. I would also be happy to promote Peel by leading a joint blog post for the Peel website together with @ggevay and you, if you are interested. I think that the hash table makes for a perfect use case.
Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-186244959 @fhueske: So far, I have only run benchmarks on my work laptop and home laptop. I have used the newly created `ReducePerformance`, varying the `(number of elements / number of different keys)` ratio, and also the memory usage (between 250 MB and 6 GB). The hash-based strategy was always faster, up to 3 times, and mostly by a factor of 1.5 - 2.5. I have also used `ReduceHashTableTest.testWithIntPair`. To make this a benchmark test, the lines that deal with the `reference` hash table for checking the correct output should be commented out, and the time measurement commented in. (This only tests the hash table, not end-to-end, and it can't be directly compared with the sort-based reduce.) I have also used `HashTablePerformanceComparison` to compare with the other hash tables. This has the disadvantage that the workload here is not characteristic of a reduce (e.g. no processRecordWithReduce calls, because the other hash tables don't have that method). I have also benchmarked using my game theory stuff (which originally prompted me to start working on this), and the hash-based combiner was faster here as well. > Did you check the combine rate (input / output ratio) compared to the sort-based strategy? I didn't measure it directly, but `ReducePerformance` shows the difference nicely when the number of input elements is large enough that the sort-based strategy has to emit often, but the number of different keys is small enough that the hash-based strategy can fit all the keys in memory. In this case, the speedup of the final reduce phase is striking. > Have you checked heap memory consumption / GC activity compared to the sort-based strategy? 
I haven't actually checked it, but it should be the same as or better than with the sort-based strategy:
- enabled object reuse: no allocations at all
- disabled object reuse: one allocation per input element, and one allocation per reduce step

The sort-based strategy might additionally have allocations inside the sorter and/or the MergeIterator.
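The allocation behavior described above can be sketched with a toy model. `IntRecord`, `ReuseSketch`, and the allocation counter are all hypothetical names for illustration only; real object reuse in Flink happens inside the runtime drivers and serializers:

```java
// Toy model of the object-reuse trade-off described above. IntRecord and the
// allocation counter are hypothetical; real reuse happens inside Flink's
// drivers and serializers.
final class IntRecord {
    int key, value;
    IntRecord(int key, int value) { this.key = key; this.value = value; }
}

final class ReuseSketch {
    static int allocations = 0;

    // Object reuse enabled: one instance is refilled for every input record.
    static int sumWithReuse(int[][] input) {
        IntRecord reuse = new IntRecord(0, 0);
        allocations++;                    // a single allocation for the whole run
        int sum = 0;
        for (int[] kv : input) {
            reuse.key = kv[0];
            reuse.value = kv[1];
            sum += reuse.value;           // the "reduce step" on the reused instance
        }
        return sum;
    }

    // Object reuse disabled: one allocation per input record.
    static int sumWithoutReuse(int[][] input) {
        int sum = 0;
        for (int[] kv : input) {
            IntRecord rec = new IntRecord(kv[0], kv[1]);
            allocations++;
            sum += rec.value;
        }
        return sum;
    }
}
```

With reuse, the allocation count stays constant regardless of input size; without it, it grows linearly with the number of records, which is what would drive any GC-activity difference between the strategies.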
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-186247461 Great, thanks for the detailed response @ggevay!
Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-186226971 OK, thanks @aalexandrov! I wanted to try Peel anyway; this is a good opportunity.
Github user aalexandrov commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-186226154 @ggevay I would be interested in helping you prepare a [`peel-flink-bundle`](http://peel-framework.org/getting-started.html) for the benchmarks @fhueske mentioned. It will make for a perfect use case for what Peel is intended for, and a nice first contribution to the [bundles repository](http://peel-framework.org/repository/bundles.html).
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-186219363 OK, then let's keep the data in one partition for now. In case of var-length updates, this can default to a memory usage / combine behavior which is somewhat similar to the sort-based strategy: filling the memory with records and emitting it (putting compaction aside). I'll review the PR once more and will run a few end-to-end benchmarks as well. What kind of benchmarks have you done so far?
- Did you check the combine rate (input / output ratio) compared to the sort-based strategy?
- How much memory did you use for the tests (upper bound)? Did you vary the memory?
- Have you checked heap memory consumption / GC activity compared to the sort-based strategy?

It might take a few more days before I actually get to this, but it is on my list. Thanks, Fabian
Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-184696741 Hello Fabian, I did 3. (https://github.com/apache/flink/commit/ef644821d5417b992cfda366225bd5622faa9b2b), because the machinery for that was already in place (see the condition in `compactOrThrow`). I chose the threshold to be 5%. (This can probably be the same in the solution set case, because if lengths change a lot, then we get very slow as the memory load gets near the total memory, so it is probably better to indicate the memory problem to the user with an exception than to silently be very slow.) I also made some changes to the tests. For 2., the situation doesn't seem straightforward to me. For example, if there are not many length changes, then exactly the opposite should be done: we should emit from the end of the record area (rather than the beginning), because if there is skew in the data, then the more common keys will appear sooner, so they tend to appear near the beginning of the record area. The other ideas are also interesting, and I would love to experiment with them, but unfortunately I don't really have that much time for this at the moment. So I would suggest merging the non-partitioned version; the partitioned version can then be implemented later, when I or someone else has a lot of free time on their hands. 
(Btw., it would be very interesting to try machine learning techniques for dynamically making these decisions that involve complicated trade-offs, based on the actual data:

- Have some switches which control things like:
  - what part of the record area to emit (begin or end; how much)
  - at how much fragmentation we should do compacting instead of emitting
  - what load factor should trigger a resize
  - size of the bucket area
  - how to choose which partition to emit
  - maybe even do spilling also in the combiner
  - whether to insert prefetch instructions for the random memory accesses that will probably involve a CPU cache miss (the trade-off here is that you then have to work with multiple consecutive input records at the same time, so you have to do extra copies if object reuse is enabled, which might cost a lot) (I have actually experimented with this a little, and there were 20-35% speedups, if copies are cheap)
  - ... (it's easy to come up with many more)
- Gather some statistics about what is happening, and turn them into features:
  - avg. record size
  - #keys / #elements ratio
  - skew
  - time it takes to serialize a record
  - time it takes to run the ReduceFunction
  - ratio of updates that involve size changes
  - whether sizes are changing up or down on average
  - backpressure:
    - that we are generating
    - that we get from our outputs (if this is large, e.g. because of a saturated network, then we should set the switches to do more aggressive combining)
  - how many CPU cache misses occur while looking up keys (e.g. for recognizing the situation where records with matching keys are often close to each other for some reason)
  - hash collisions (so that we can start with a simpler hash function (a few percent speedup) and change it if it turns out to be bad)
  - ... (it's easy to come up with many more)
- Train some machine learning model which will figure out how to set the switches based on the features.

I think a pretty good speedup could result from tuning all these things to the actual data at hand. 
Maybe in a few years, when data flow systems get more mature, this can become a reality.)
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-184167321 Hi Gabor, thanks for the updates and additions you made to the PR. I didn't go through them yet, but would like to follow up on the discussion and propose a few potential improvements. I agree that using multiple partitions would make the implementation more complex and the reasoning about the behavior less straightforward. The motivation for partitions is twofold. First, it should help to maintain a continuous data flow without backpressure effects. Second, it should improve the effectiveness of the combiner by avoiding the eviction of hot, frequently updated elements, which would happen with each full eviction of the table. Emitting the partition with the fewest segments wouldn't be a good strategy for the reasons you mentioned. A strategy could instead be to evict the partition with the smallest updates / keys ratio; however, this would require more thought... Independent of the partitions discussion, I have a few ideas to make the compaction more effective. However, these would break compatibility with the AbstractMutableHashTable interface. Maybe we can provide a common base class and special implementations for the SolutionSet case and the Combiner case?

1. Remember a pointer to the hole which is closest to the start of the RecordArea (or partition).
2. When compacting, emit all elements which are before the first hole. These elements are "cold" and have not been updated since the last compaction (assuming non-in-place updates, but in-place updates should not be compacted anyway). Compact all elements after the first hole as usual. The benefit of this is to free more segments by compaction, which delays (or even completely avoids) full eviction, and thus improves the compaction rate for hot elements.
3. Only compact if more than (for instance) 50% of the space can be freed, and do a full eviction otherwise. 
If there are too few holes in the table, most elements have not been touched (in case of non-in-place updates). Hence, we should just emit everything. We can also think about adding a by-pass switch if the HT observes that the combiner does not help to significantly reduce the data (too many unique keys for the available memory) but only adds overhead. For instance, if the number of records is not reduced by 2x, we could simply forward the records to the collector (in the case of partitions, this decision could be made at the partition level).
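Idea 3 above (compact only when enough space can be reclaimed, otherwise do a full eviction) can be modeled in a few lines. This is a sketch under assumed simplifications: records are `{live, size}` pairs and `CompactionPolicy` is a hypothetical name, not the actual hash table code:

```java
// Simplified model of the compaction policy sketched above (idea 3):
// compact only when at least HOLE_THRESHOLD of the record area consists of
// holes, and do a full eviction otherwise. Records are modeled as
// {live, size} pairs; CompactionPolicy is a hypothetical name, not the
// actual ReduceHashTable logic.
final class CompactionPolicy {
    static final double HOLE_THRESHOLD = 0.5; // the 50% from the proposal above

    /** Returns true if compaction would free enough space to be worthwhile. */
    static boolean shouldCompact(long[][] records) {
        long total = 0, holes = 0;
        for (long[] r : records) {  // r[0] == 1 if live, r[1] == size in bytes
            total += r[1];
            if (r[0] == 0) {
                holes += r[1];      // abandoned record, i.e. a hole
            }
        }
        return total > 0 && (double) holes / total >= HOLE_THRESHOLD;
    }
}
```

The threshold trades compaction cost against eviction frequency; a real implementation would track the hole size incrementally instead of scanning.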
Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-183896027 I've added some more documentation.
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52608138 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- @@ -0,0 +1,1014 @@

+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: contains the bucket heads: an 8 byte pointer to the first link of a
+ *    linked list in the record area
+ *  - Record area: contains the actual data in linked list elements. A linked list element
+ *    starts with an 8 byte pointer to the next element, and then the record follows.
+ *  - Staging area: a small, temporary storage area for writing updated records. This is
+ *    needed because, before serializing a record, there is no way to know in advance how
+ *    large it will be. Therefore, we can't serialize directly into the record area when we
+ *    are doing an update, because if it turns out to be larger than the old record, it
+ *    would overwrite some other record that happens to be after the old one in memory.
+ *    The solution is to serialize to the staging area first, and then copy it to the place
+ *    of the original if it has the same size; otherwise, allocate a new linked list element
+ *    at the end of the record area, and mark the old one as abandoned. This creates "holes"
+ *    in the record area, so compactions are eventually needed.
+ *
+ * Compaction happens by deleting everything in the bucket area and then reinserting all
+ * elements. The reinsertion happens by forgetting the structure (the linked lists) of the
+ * record area, reading it sequentially, and inserting all non-abandoned records, starting
+ * from the beginning of the record area. Note that insertions never overwrite a record that
+ * has not yet been read by the reinsertion sweep, because both the insertions and the reads
+ * happen sequentially in the record area, and the insertions obviously never overtake the
+ * reading sweep.
+ *
+ * Note: we have to abandon the old linked list element even when the updated record has a
+ * smaller size than the original, because otherwise we wouldn't know where the next record
+ * starts during a reinsertion sweep.
+ *
+ * The number of buckets depends on how large the records are. The serializer might be able
+ * to tell us this, in which case we calculate the number of buckets upfront and don't do
+ * resizes. If the serializer doesn't know the size, then we start with a small number of
+ * buckets and do resizes as more elements are inserted than the number of buckets.
+ *
+ * The number of memory segments given to the staging area is usually one, because it just
+ * needs to hold one record.
+ */
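The update rule described in this Javadoc (serialize to the staging area first; overwrite in place only on an exact size match, otherwise append and abandon the old slot) can be illustrated with a toy model. `RecordAreaSketch` is a hypothetical name; byte arrays stand in for serialized records, and list slots stand in for linked list elements:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the staging-area update rule from the Javadoc above. Byte
// arrays stand in for serialized records, list slots for linked list
// elements; RecordAreaSketch is a hypothetical name, not the actual
// ReduceHashTable implementation.
final class RecordAreaSketch {
    final List<byte[]> slots = new ArrayList<>(); // null means an abandoned slot (a hole)

    /** Appends a record at the end of the record area; returns its "pointer". */
    int insert(byte[] serialized) {
        slots.add(serialized.clone());
        return slots.size() - 1;
    }

    /** Applies an update from the staging buffer; returns the (possibly new) pointer. */
    int update(int pointer, byte[] staging) {
        byte[] old = slots.get(pointer);
        if (staging.length == old.length) {
            slots.set(pointer, staging.clone()); // exact size match: overwrite in place
            return pointer;
        }
        slots.set(pointer, null);  // any size change: abandon the old slot
        return insert(staging);    // and append at the end of the record area
    }

    long holes() {
        return slots.stream().filter(s -> s == null).count();
    }
}
```

The in-place path keeps the pointer stable, while any size change, even a shrink, produces a hole, matching the note above about why smaller updated records are abandoned too.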
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52608113 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java ---
Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-182872207 I'm thinking about partitioning, but again, I'm finding it too difficult to imagine how this algorithm behaves when we emit/rebuild only some part of the table at a time. In the partitioned case, there seems to be a lot of potential for messing things up in complicated and hard-to-foresee ways. For example, if I always emit the partition with the least number of segments, then what will actually happen is that I will kinda always emit the same partition (because after an emit, it starts from a "disadvantage" compared to the others), and the others just keep growing until they eat all the memory, so this degenerates into the situation where there isn't a single memory segment left for the smallest one and _bad things_ happen. In contrast, an advantage of the current design is that its performance characteristics are not that hard to grok, and it is relatively straightforward to see how the sizes of the 3 memory regions (bucket segments, record segments, staging segments) relate to each other. Btw. I think I figured out a simple way to do even the final reduce with spilling, without partitioning the memory: when memory is full, I write everything to disk, and I partition only at this time, by writing to different files (the same files across multiple spillings, so e.g. the second spilling appends to the same files as the first one created), and then when the input ends, I recurse on processing the files one by one (and I start a new level of partitioning).
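The spill-time partitioning idea in the last paragraph might look roughly like this as a toy model (`SpillSketch` and all names here are hypothetical; in-memory lists stand in for the per-partition spill files that successive spills append to):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the spilling scheme described above: keep one in-memory table;
// when it is full, partition its contents by hash and append each partition to
// its own spill file (in-memory lists here), so later spills append to the
// same files and each file can be processed recursively once the input ends.
// SpillSketch is a hypothetical name, not Flink code.
final class SpillSketch {
    static final int FANOUT = 4;

    /** Drains the table into FANOUT spill "files", appending to existing ones. */
    static void spill(Map<Integer, Long> table, List<List<long[]>> files) {
        while (files.size() < FANOUT) {
            files.add(new ArrayList<>());
        }
        for (Map.Entry<Integer, Long> e : table.entrySet()) {
            int part = Math.floorMod(e.getKey().hashCode(), FANOUT);
            files.get(part).add(new long[]{e.getKey(), e.getValue()});
        }
        table.clear();
    }
}
```

Because every spill routes a key to the same file, processing one file later sees all spilled occurrences of its keys, which is what makes the per-file recursion correct.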
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52605362 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java --- @@ -45,9 +45,12 @@

 	public RandomAccessInputView(ArrayList<MemorySegment> segments, int segmentSize) {
 		this(segments, segmentSize, segmentSize);
 	}
-
+
+	/**
+	 * Warning: setReadPosition has to be called before reading.
+	 */
 	public RandomAccessInputView(ArrayList<MemorySegment> segments, int segmentSize, int limitInLastSegment) {

--- End diff -- I tried this, but it turned out that there is no nice way to do it, because the two constructors would have the same parameters, so I had to add a dummy parameter. (See https://github.com/apache/flink/commit/508ce22c3167dc6e6e2a4a6b9776b6c79e62698f) Do you think that this ugly solution is worth it, or should I revert to the original?
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52617426 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java ---
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52642844 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java --- @@ -52,6 +52,30 @@ */ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleInputOperator<T, T, FT> { + /** + * An enumeration of hints, optionally usable to tell the system exactly how to execute the reduce. + */ + public enum ReduceHint { --- End diff -- https://github.com/ggevay/flink/commit/7c29f78ebbe4e772d55aae8dd7ee0802d2cdcf42 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
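For context, a strategy hint such as the `ReduceHint` enum discussed above (referred to as `CombineHint` later in this thread) is simply a value the system consults when choosing between the sort-based and hash-based combine strategies. The selection logic below is a hypothetical sketch to illustrate the idea, not Flink's actual optimizer code; all names are assumptions.

```java
// Hypothetical sketch: how a hint enum like ReduceHint/CombineHint might be
// consulted when picking a combine strategy. Illustrative names only.
enum CombineHint { OPTIMIZER_CHOOSES, SORT, HASH }

class CombineStrategyChooser {
    static String choose(CombineHint hint) {
        switch (hint) {
            case HASH: return "HASH_COMBINE"; // hash-based combiner (the subject of this PR)
            case SORT: return "SORT_COMBINE"; // sort-based combiner
            default:   return "OPTIMIZER_CHOOSES";
        }
    }
}
```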
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52642960 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- @@ -0,0 +1,1014 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52643138 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- @@ -0,0 +1,1014 @@
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52643218 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- @@ -0,0 +1,1014 @@
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52643336 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReduceHashTableTest.java --- @@ -0,0 +1,501 @@
+package org.apache.flink.runtime.operators.hash;
+
+import com.google.common.collect.Ordering;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.operators.testutils.UniformStringPairGenerator;
+import org.apache.flink.runtime.operators.testutils.types.*;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Test;
+
+import java.util.*;
+
+import static org.junit.Assert.*;
+import static org.junit.Assert.fail;
+
+public class ReduceHashTableTest {
+
+	private static final long RANDOM_SEED = 58723953465322L;
+
+	private static final int PAGE_SIZE = 16 * 1024;
+
+	private final TypeSerializer<Tuple2<Long, String>> serializer;
+	private final TypeComparator<Tuple2<Long, String>> comparator;
+
+	private final TypeComparator<Long> probeComparator;
+
+	private final TypePairComparator<Long, Tuple2<Long, String>> pairComparator;
+
+	// -- Note: This part was mostly copied from CompactingHashTableTest --
+
+	public ReduceHashTableTest() {
+		TypeSerializer[] fieldSerializers = { LongSerializer.INSTANCE, StringSerializer.INSTANCE };
+		@SuppressWarnings("unchecked")
+		Class<Tuple2<Long, String>> clazz = (Class<Tuple2<Long, String>>) (Class) Tuple2.class;
+		this.serializer = new TupleSerializer<Tuple2<Long, String>>(clazz, fieldSerializers);
+
+		TypeComparator[] comparators = { new LongComparator(true) };
+		TypeSerializer[] comparatorSerializers = { LongSerializer.INSTANCE };
+
+		this.comparator = new TupleComparator<Tuple2<Long, String>>(new int[] {0}, comparators, comparatorSerializers);
+
+		this.probeComparator = new LongComparator(true);
+
+		this.pairComparator = new TypePairComparator<Long, Tuple2<Long, String>>() {
+
+			private long ref;
+
+			@Override
+			public void setReference(Long reference) {
+				ref = reference;
+			}
+
+			@Override
+			public boolean equalToReference(Tuple2<Long, String> candidate) {
+				//noinspection UnnecessaryUnboxing
+				return candidate.f0.longValue() == ref;
+			}
+
+			@Override
+			public int compareToReference(Tuple2<Long, String> candidate) {
+				long x = ref;
+				long y = candidate.f0;
+				return (x < y) ? -1 : ((x == y) ? 0 : 1);
+			}
+		};
+	}
+
+	@Test
+	public void testHashTableGrowthWithInsert()
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52643374 --- Diff: flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java --- @@ -0,0 +1,191 @@
+package org.apache.flink.test.manual;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.util.SplittableIterator;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Random;
+
+public class ReducePerformance {
--- End diff -- https://github.com/ggevay/flink/commit/68c8f1e1468d6efe3649246007c6024028b45c7c ---
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52643279 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java --- @@ -100,340 +101,371 @@ @Test public void testDifferentProbers() { final int NUM_MEM_PAGES = 32 * NUM_PAIRS / PAGE_SIZE; - - AbstractMutableHashTable table = new CompactingHashTable(serializer, comparator, getMemory(NUM_MEM_PAGES, PAGE_SIZE)); - + testDifferentProbersCore(new CompactingHashTable<>(serializer, comparator, getMemory(NUM_MEM_PAGES, PAGE_SIZE)), NUM_MEM_PAGES); --- End diff -- https://github.com/ggevay/flink/commit/fdf7abea42c3cd9587d70cac302c19834de37434 ---
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-182993653 I think I've addressed all the comments, except documenting `CombineHint` (I will do that tomorrow). Thanks again for reviewing the PR! ---
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52461048 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java --- @@ -100,340 +101,371 @@ @Test public void testDifferentProbers() { final int NUM_MEM_PAGES = 32 * NUM_PAIRS / PAGE_SIZE; - - AbstractMutableHashTable table = new CompactingHashTable(serializer, comparator, getMemory(NUM_MEM_PAGES, PAGE_SIZE)); - + testDifferentProbersCore(new CompactingHashTable<>(serializer, comparator, getMemory(NUM_MEM_PAGES, PAGE_SIZE)), NUM_MEM_PAGES); --- End diff -- Tests for different tables should be put into different methods or even better into different test classes. ---
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52470915 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java --- @@ -102,7 +102,7 @@ public void testCompactingHashMapPerformance() { System.out.println("Starting update..."); start = System.currentTimeMillis(); while(updater.next(target) != null) { - target.setValue(target.getValue()*-1); --- End diff -- Oh yes, sorry. ---
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52461236 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReduceHashTableTest.java --- @@ -0,0 +1,501 @@
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52461306 --- Diff: flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java --- @@ -0,0 +1,191 @@ +public class ReducePerformance { --- End diff -- The name of the class is misleading. ---
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52461281 --- Diff: flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java --- @@ -0,0 +1,191 @@
+public class ReducePerformance {
+
+	public static void main(String[] args) throws Exception {
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		//env.getConfig().enableObjectReuse();
+		//env.setParallelism(1);
+
+		@SuppressWarnings("unchecked")
+		DataSet<Tuple2<Integer, Integer>> output =
+			env.fromParallelCollection(new SplittableRandomIterator(40 * 1000 * 1000, new TupleIntIntIterator(4 * 1000 * 1000)),
+					TupleTypeInfo.<Tuple2<Integer, Integer>>getBasicTupleTypeInfo(Integer.class, Integer.class))
+				.groupBy("0")
+				.reduce(new SumReducer(), ReduceOperatorBase.ReduceHint.HASH);
+
+//		DataSet<Tuple2<Integer, Integer>> output =
--- End diff -- Please remove commented code. ---
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-182398031 Hi @ggevay, sorry it took me very long to review your PR. As I said before, this is a very desirable feature and a solid implementation. I think a few things can be improved. Especially the full stop to resize / rebuild / emit records might take some time, depending on the size of the table. I have the following suggestions / ideas:

- Split the single record area into multiple partitions (i.e., use multiple `RecordArea`s). Each partition holds the data of multiple buckets. This allows restricting rebuilding, compaction, etc. to only a part of the whole table. In fact, this is what I meant in my initial comment about tracking the updates of buckets (which I confused with partitions...). Partitioning is also used in the other hash tables in Flink. It can also make your implementation more suitable for the final reduce case, because it allows spilling individual partitions to disk. In `CompactingHashTable` the max number of partitions is set to `32`.
- Should we think about [linear hashing](https://en.wikipedia.org/wiki/Linear_hashing) for resizing the table? This technique grows the table by splitting individual buckets, without the need to reorganize the whole table.
- Do you think it is possible to extract the `ReduceFunction` from the table? IMO this would be a cleaner design if we want to use this table instead of the `CompactingHashTable`. What do you think?

In any case, we need to update the documentation for the added `CombineHint`. It would also be good to extend `ReduceITCase` with a few tests that use `CombineHint.HASH`.
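The linear hashing mentioned in the second bullet can be sketched in a few lines. The following is a self-contained toy, not code from this PR: the class and method names (`LinearHashSketch`, `splitNext`, etc.) are made up for illustration, and it splits one bucket per insert instead of using a load-factor policy. The point is only to show how the table grows one bucket at a time instead of being rebuilt wholesale:

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

class LinearHashSketch {
    private final List<LinkedList<Integer>> buckets = new ArrayList<>();
    private int initialBuckets; // N: bucket count at the start of the current round
    private int next = 0;       // index of the next bucket to split

    LinearHashSketch(int initialBuckets) {
        this.initialBuckets = initialBuckets;
        for (int i = 0; i < initialBuckets; i++) {
            buckets.add(new LinkedList<>());
        }
    }

    private int bucketIndex(int key) {
        int h = Math.floorMod(key, initialBuckets);
        // Buckets before 'next' were already split; use the doubled modulus for them.
        return h < next ? Math.floorMod(key, 2 * initialBuckets) : h;
    }

    void insert(int key) {
        buckets.get(bucketIndex(key)).add(key);
        splitNext(); // toy policy: split one bucket per insert
    }

    /** Split only bucket 'next', redistributing its entries between itself and one new bucket. */
    private void splitNext() {
        buckets.add(new LinkedList<>());
        LinkedList<Integer> stay = new LinkedList<>();
        for (int key : buckets.get(next)) {
            int idx = Math.floorMod(key, 2 * initialBuckets);
            if (idx == next) {
                stay.add(key);
            } else {
                buckets.get(idx).add(key);
            }
        }
        buckets.set(next, stay);
        next++;
        if (next == initialBuckets) { // round complete: the table has doubled
            initialBuckets *= 2;
            next = 0;
        }
    }

    boolean contains(int key) {
        return buckets.get(bucketIndex(key)).contains(key);
    }

    int numBuckets() {
        return buckets.size();
    }
}
```

Only the one bucket being split is touched on a resize step, which is exactly what avoids the "full stop to resize / rebuild" concern above.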
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52471114 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java --- @@ -100,340 +101,371 @@ @Test public void testDifferentProbers() { final int NUM_MEM_PAGES = 32 * NUM_PAIRS / PAGE_SIZE; - - AbstractMutableHashTable table = new CompactingHashTable(serializer, comparator, getMemory(NUM_MEM_PAGES, PAGE_SIZE)); - + testDifferentProbersCore(new CompactingHashTable<>(serializer, comparator, getMemory(NUM_MEM_PAGES, PAGE_SIZE)), NUM_MEM_PAGES); --- End diff -- I think it is good practice to separate tests for different components into different test classes. That way you see directly for which component a test failed. It also keeps the logic separated.
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52460829 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- @@ -0,0 +1,1014 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.hash; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeutils.SameTypePairComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.RandomAccessInputView; +import org.apache.flink.runtime.memory.AbstractPagedOutputView; +import org.apache.flink.runtime.util.MathUtils; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * This hash table supports updating elements, and it also has processRecordWithReduce, + * which makes one reduce step with the given record. + * + * The memory is divided into three areas: + * - Bucket area: this contains the bucket heads: + * an 8-byte pointer to the first link of a linked list in the record area + * - Record area: this contains the actual data in linked list elements. A linked list element starts + * with an 8-byte pointer to the next element, and then the record follows. + * - Staging area: this is a small, temporary storage area for writing updated records. It is needed + * because, before serializing a record, there is no way to know in advance how large it will be. + * Therefore, we can't serialize directly into the record area when we are doing an update, because + * if it turns out to be larger than the old record, it would overwrite some other record + * that happens to be after the old one in memory. The solution is to serialize to the staging area first, + * and then copy it to the place of the original if it has the same size; otherwise, allocate a new linked + * list element at the end of the record area, and mark the old one as abandoned. This creates "holes" in + * the record area, so compactions are eventually needed. + * + * Compaction happens by deleting everything in the bucket area, and then reinserting all elements. + * The reinsertion happens by forgetting the structure (the linked lists) of the record area, reading it + * sequentially, and inserting all non-abandoned records, starting from the beginning of the record area. + * Note that insertions never overwrite a record that has not been read by the reinsertion sweep, because + * both the insertions and the reads happen sequentially in the record area, and the insertions obviously + * never overtake the reading sweep. + * + * Note: we have to abandon the old linked list element even when the updated record has a smaller size + * than the original, because otherwise we wouldn't know where the next record starts during a reinsertion + * sweep. + * + * The number of buckets depends on how large the records are. The serializer might be able to tell us this; + * in that case, we calculate the number of buckets upfront and won't do resizes. + * If the serializer doesn't know the size, then we start with a small number of buckets, and resize as more + * elements are inserted than the number of buckets. + * + * The number of memory segments given to the staging area is usually one, because it just needs to hold + * one record.
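The staging-area update and the compaction described in the class javadoc above can be illustrated with a toy model. The following sketch is not the PR's code: `StagingAreaSketch` and its methods are hypothetical names, a `String`'s UTF-8 bytes stand in for a serialized record, and a list index stands in for the 8-byte pointer:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class StagingAreaSketch {
    // Each slot holds a serialized record plus an "abandoned" flag (the hole).
    static class Slot {
        byte[] bytes;
        boolean abandoned;
        Slot(byte[] bytes) { this.bytes = bytes; }
    }

    private final List<Slot> recordArea = new ArrayList<>();

    /** Append a new record; the returned slot index plays the role of the pointer. */
    int insert(String record) {
        recordArea.add(new Slot(record.getBytes(StandardCharsets.UTF_8)));
        return recordArea.size() - 1;
    }

    /** Returns the (possibly new) slot index of the updated record. */
    int update(int slot, String newRecord) {
        byte[] staging = newRecord.getBytes(StandardCharsets.UTF_8); // "staging area"
        Slot old = recordArea.get(slot);
        if (staging.length == old.bytes.length) {
            old.bytes = staging;           // same size: overwrite in place
            return slot;
        }
        old.abandoned = true;              // different size: leave a hole behind
        recordArea.add(new Slot(staging)); // append at the end of the record area
        return recordArea.size() - 1;
    }

    /** Compaction: sweep the area front to back, keeping only non-abandoned records. */
    void compact() {
        recordArea.removeIf(s -> s.abandoned);
    }

    int liveRecords() {
        return (int) recordArea.stream().filter(s -> !s.abandoned).count();
    }

    int totalSlots() {
        return recordArea.size();
    }
}
```

The real table serializes into `MemorySegment`s and must also rebuild the bucket pointers after a sweep, but the size-matched in-place overwrite versus append-and-abandon decision is the same.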
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52460943 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java --- @@ -102,7 +102,7 @@ public void testCompactingHashMapPerformance() { System.out.println("Starting update..."); start = System.currentTimeMillis(); while(updater.next(target) != null) { - target.setValue(target.getValue()*-1); --- End diff -- Why are you changing these lines?
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52464062 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- (quotes the same class javadoc as above)
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52469979 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- (quotes the same class javadoc as above)
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52459151 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/AbstractBlockResettableIterator.java --- @@ -83,6 +83,7 @@ protected AbstractBlockResettableIterator(TypeSerializer serializer, MemoryMa this.collectingView = new SimpleCollectingOutputView(this.fullSegments, new ListMemorySegmentSource(this.emptySegments), memoryManager.getPageSize()); this.readView = new RandomAccessInputView(this.fullSegments, memoryManager.getPageSize()); + this.readView.setReadPosition(0); --- End diff -- Not needed if additional constructor is added to `RandomAccessInputView`
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52465416 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- (quotes the same class javadoc as above)
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52470557 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- (quotes the same class javadoc as above)
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52472588 --- Diff: flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.manual; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.operators.base.ReduceOperatorBase; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.util.SplittableIterator; + +import java.io.Serializable; +import java.util.Iterator; +import java.util.Random; + +public class ReducePerformance { --- End diff -- I wanted this test to be a performance comparison between the sort-based and hash-based combines. The way I was using it is to run it unmodified, note the time, then modify the hint to sorting, and run it again. But I'll modify it to make it more convenient to run both of them. 
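For the "run both variants in one program" idea, a minimal timing harness could look like the following. This is a hypothetical sketch, not the PR's test: the job bodies are left as comments because wiring up the actual Flink jobs with the hash-based versus sort-based combine depends on the PR's API; only the best-of-N timing pattern is shown:

```java
class CompareRuns {
    /** Run the job several times and keep the best wall-clock time, to dampen JIT/GC noise. */
    static long bestMillis(Runnable job, int repetitions) {
        long best = Long.MAX_VALUE;
        for (int i = 0; i < repetitions; i++) {
            long start = System.currentTimeMillis();
            job.run();
            best = Math.min(best, System.currentTimeMillis() - start);
        }
        return best;
    }

    public static void main(String[] args) {
        long hash = bestMillis(() -> { /* execute the job with the hash-based combine here */ }, 3);
        long sort = bestMillis(() -> { /* execute the job with the sort-based combine here */ }, 3);
        System.out.println("hash=" + hash + "ms sort=" + sort + "ms");
    }
}
```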
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52460226 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- @@ -0,0 +1,1014 @@ (Apache License 2.0 header, as above)

package org.apache.flink.runtime.operators.hash;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.SameTypePairComparator;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.RandomAccessInputView;
import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import org.apache.flink.runtime.util.MathUtils;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * This hash table supports updating elements, and it also has processRecordWithReduce,
 * which makes one reduce step with the given record.
 *
 * The memory is divided into three areas:
 *  - Bucket area: contains the bucket heads: an 8 byte pointer to the first link of a
 *    linked list in the record area
 *  - Record area: contains the actual data in linked list elements. A linked list element starts
 *    with an 8 byte pointer to the next element, and then the record follows.
 *  - Staging area: a small, temporary storage area for writing updated records. This is needed
 *    because, before serializing a record, there is no way to know in advance how large it will be.
 *    Therefore, we can't serialize directly into the record area when we are doing an update, because
 *    if the record turns out to be larger than the old one, it would overwrite some other record
 *    that happens to be after the old one in memory. The solution is to serialize to the staging area first,
 *    and then copy it to the place of the original if it has the same size; otherwise, allocate a new linked
 *    list element at the end of the record area and mark the old one as abandoned. This creates "holes" in
 *    the record area, so compactions are eventually needed.
 *
 * Compaction happens by deleting everything in the bucket area and then reinserting all elements.
 * The reinsertion works by forgetting the structure (the linked lists) of the record area, reading it
 * sequentially, and inserting all non-abandoned records, starting from the beginning of the record area.
 * Note that insertions never overwrite a record that has not yet been read by the reinsertion sweep, because
 * both the insertions and the reads happen sequentially in the record area, and the insertions
 * never overtake the reading sweep.
 *
 * Note: we have to abandon the old linked list element even when the updated record is smaller
 * than the original, because otherwise we wouldn't know where the next record starts during a
 * reinsertion sweep.
 *
 * The number of buckets depends on how large the records are. The serializer might be able to tell us this;
 * in that case, we calculate the number of buckets upfront and don't resize.
 * If the serializer doesn't know the size, we start with a small number of buckets and resize as more
 * elements are inserted than the number of buckets.
 *
 * The number of memory segments given to the staging area is usually one, because it just needs to hold
 * one record.
 */
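The update-via-staging and compaction scheme described in this javadoc can be modeled with a small self-contained sketch. This is illustrative only: the class and field names below are made up, and byte arrays stand in for the MemorySegment-backed record area of the real ReduceHashTable.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the staging-area update: same-size updates happen in place,
 *  larger/smaller updates append at the end and abandon the old slot. */
class StagingUpdateSketch {
    static final byte[] ABANDONED = null; // marker for a "hole" in the record area

    List<byte[]> recordArea = new ArrayList<>();

    /** Updates the record at {@code index} with {@code updated};
     *  returns the index where the updated record now lives. */
    int update(int index, byte[] updated) {
        byte[] old = recordArea.get(index);
        if (updated.length == old.length) {
            // same size: copy over the original in place
            recordArea.set(index, updated);
            return index;
        } else {
            // different size: abandon the old slot (leaving a hole)
            // and append a new element at the end of the record area
            recordArea.set(index, ABANDONED);
            recordArea.add(updated);
            return recordArea.size() - 1;
        }
    }

    /** Compaction: sweep the record area sequentially, keeping only
     *  non-abandoned records. */
    void compact() {
        List<byte[]> compacted = new ArrayList<>();
        for (byte[] r : recordArea) {
            if (r != ABANDONED) {
                compacted.add(r);
            }
        }
        recordArea = compacted;
    }
}
```

The in-place path for same-size updates is what makes the staging copy necessary: without first serializing into the staging area, the writer could not know which of the two branches applies.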
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52463167 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java ---
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52468711 --- Diff: flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java --- public class ReducePerformance { --- End diff -- I'm not sure what you mean. Should it be ReducePerformanceTest? Or CombinePerformance?
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52473636 --- Diff: flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java --- public class ReducePerformance { --- End diff -- Ah, OK. Why not add two private methods, one for each strategy, and call them from `main()`? In that case, `ReduceBenchmark` is fine. We also had other performance benchmarks but (temporarily) removed them due to licensing reasons (see FLINK-2973).
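The suggested structure (one private method per combine strategy, both called from `main()`) could be sketched as follows. The class name and method names here are assumed, not taken from the PR, and the Flink job is replaced by a trivial in-memory reduce so the sketch stays self-contained; in the real benchmark each method would build and execute the plan with the hash hint and the sort hint, respectively.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of a benchmark that runs both combine strategies in one invocation. */
class ReduceBenchmarkSketch {

    public static void main(String[] args) {
        long hashMillis = runHashCombine();
        long sortMillis = runSortCombine();
        System.out.println("hash-based: " + hashMillis + " ms, sort-based: " + sortMillis + " ms");
    }

    // In the real benchmark, this would execute the job with the HASH hint.
    static long runHashCombine() {
        return timeDummyReduce();
    }

    // In the real benchmark, this would execute the job with the SORT hint.
    static long runSortCombine() {
        return timeDummyReduce();
    }

    /** Stand-in workload: group one million (key, 1) pairs by key and sum them. */
    static long timeDummyReduce() {
        long start = System.currentTimeMillis();
        Map<Integer, Integer> sums = new HashMap<>();
        for (int i = 0; i < 1_000_000; i++) {
            sums.merge(i % 1000, 1, Integer::sum);
        }
        return System.currentTimeMillis() - start;
    }
}
```

This avoids the edit-rerun workflow described earlier in the thread: one run produces timings for both strategies.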
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52458969 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java --- @@ -52,6 +52,30 @@ public class ReduceOperatorBase extends SingleInputOperator {

+ /**
+  * An enumeration of hints, optionally usable to tell the system exactly how to execute the reduce.
+  */
+ public enum ReduceHint {

--- End diff -- Should be renamed to `CombineHint`, and all corresponding methods and member variables as well.
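If the rename is applied, the enum might look like the following sketch. The javadoc wording and the exact set of constants are assumed here (inferred from the SORT/HASH discussion in this thread), not quoted from the PR.

```java
/** Hints telling the system how to execute the combine phase of a reduce. */
enum CombineHint {
    /** Let the optimizer pick the combine strategy. */
    OPTIMIZER_CHOOSES,
    /** Sort-based combine: sort the records and merge runs of equal keys. */
    SORT,
    /** Hash-based combine: aggregate records in an in-memory hash table. */
    HASH
}
```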
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52459830 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java ---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52460590 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java ---
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52460659 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java --- public enum ReduceHint { --- End diff -- Ah, of course, you are right.
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52460684 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java --- @@ -45,9 +45,12 @@

 public RandomAccessInputView(ArrayList<MemorySegment> segments, int segmentSize) {
  this(segments, segmentSize, segmentSize);
 }

+ /**
+  * Warning: setReadPosition has to be called before reading.
+  */
 public RandomAccessInputView(ArrayList<MemorySegment> segments, int segmentSize, int limitInLastSegment) {

--- End diff -- ok, will do
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52468223 --- Diff: flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java ---

public class ReducePerformance {

    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //env.getConfig().enableObjectReuse();
        //env.setParallelism(1);

        @SuppressWarnings("unchecked")
        DataSet<Tuple2<Integer, Integer>> output =
            env.fromParallelCollection(
                    new SplittableRandomIterator(40 * 1000 * 1000, new TupleIntIntIterator(4 * 1000 * 1000)),
                    TupleTypeInfo.<Tuple2<Integer, Integer>>getBasicTupleTypeInfo(Integer.class, Integer.class))
                .groupBy("0")
                .reduce(new SumReducer(), ReduceOperatorBase.ReduceHint.HASH);

//        DataSet<Tuple2<String, Integer>> output =

--- End diff -- OK, sorry, I'll split the two versions into different methods. The purpose here was to comment this in and comment out the similar part above when someone wants to run the test with `TupleStringIntIterator` instead of `TupleIntIntIterator`, but I realize now that this is not a good way to do this.
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52471392 --- Diff: flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java --- public class ReducePerformance { --- End diff -- How about something like `HashCombinedReduceBenchmark`?
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52459053 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java --- public RandomAccessInputView(ArrayList<MemorySegment> segments, int segmentSize, int limitInLastSegment) { --- End diff -- Can you add a new constructor instead of modifying the behavior of this one?
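The requested change (introduce a new constructor rather than changing the existing one's behavior) could look like the sketch below. The class is trimmed to just the constructors, the segment type is simplified to `byte[]` so the sketch is self-contained, and the `positionAtStart` flag is an assumed name for whatever distinguishes the new behavior.

```java
import java.util.ArrayList;

/** Sketch of constructor overloading that preserves existing behavior. */
class RandomAccessInputViewSketch {
    private final ArrayList<byte[]> segments;
    private final int segmentSize;
    private final int limitInLastSegment;
    private final boolean positioned;

    RandomAccessInputViewSketch(ArrayList<byte[]> segments, int segmentSize) {
        this(segments, segmentSize, segmentSize);
    }

    /** Existing three-argument constructor: behavior unchanged, the view is
     *  still positioned at the start and ready to read. */
    RandomAccessInputViewSketch(ArrayList<byte[]> segments, int segmentSize, int limitInLastSegment) {
        this(segments, segmentSize, limitInLastSegment, true);
    }

    /** New constructor carrying the new behavior: when positionAtStart is
     *  false, the caller must call setReadPosition before reading. */
    RandomAccessInputViewSketch(ArrayList<byte[]> segments, int segmentSize,
                                int limitInLastSegment, boolean positionAtStart) {
        this.segments = segments;
        this.segmentSize = segmentSize;
        this.limitInLastSegment = limitInLastSegment;
        this.positioned = positionAtStart;
    }

    boolean isPositioned() {
        return positioned;
    }
}
```

Existing callers of the two- and three-argument constructors keep their current semantics; only the new hash table would use the four-argument form.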
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52460049

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java ---
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has processRecordWithReduce,
+ * which performs one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ * - Bucket area: contains the bucket heads, i.e. an 8-byte pointer to the first link
+ *   of a linked list in the record area.
+ * - Record area: contains the actual data in linked-list elements. A linked-list element
+ *   starts with an 8-byte pointer to the next element, followed by the record itself.
+ * - Staging area: a small, temporary storage area for writing updated records. It is
+ *   needed because, before serializing a record, there is no way to know in advance how
+ *   large it will be. Therefore, we can't serialize directly into the record area when
+ *   doing an update: if the new record turns out to be larger than the old one, it would
+ *   overwrite some other record that happens to follow the old one in memory. The
+ *   solution is to serialize into the staging area first, and then copy it over the
+ *   original if it has the same size; otherwise, we allocate a new linked-list element
+ *   at the end of the record area and mark the old one as abandoned. This creates
+ *   "holes" in the record area, so compactions are eventually needed.
+ *
+ * Compaction happens by deleting everything in the bucket area and then reinserting all
+ * elements. The reinsertion forgets the structure (the linked lists) of the record area,
+ * reads it sequentially, and inserts all non-abandoned records, starting from the
+ * beginning of the record area. Note that insertions never overwrite a record that has
+ * not yet been read by the reinsertion sweep, because both the insertions and the reads
+ * proceed sequentially through the record area, and the insertions obviously never
+ * overtake the reading sweep.
+ *
+ * Note: we have to abandon the old linked-list element even when the updated record is
+ * smaller than the original, because otherwise we wouldn't know where the next record
+ * starts during a reinsertion sweep.
+ *
+ * The number of buckets depends on how large the records are. The serializer might be
+ * able to tell us this; in that case, we calculate the number of buckets upfront and
+ * never resize. If the serializer doesn't know the size, then we start with a small
+ * number of buckets and resize whenever more elements have been inserted than the
+ * number of buckets.
+ *
+ * The number of memory segments given to the staging area is usually one, because it
+ * only needs to hold a single record.
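The staging-area update logic described in the Javadoc can be sketched in a few lines. This is an illustrative simplification, not the actual `ReduceHashTable` code: the class name, the plain byte-array backing, and the method signatures are all assumptions made for the example.

```java
// Sketch of the staging-area update: an updated record is serialized to a
// staging buffer first (its new size is unknown in advance); it is copied
// back in place only if the size matches, otherwise it is appended at the
// end and the old slot becomes an abandoned "hole".
public class StagingAreaSketch {

    private final byte[] recordArea = new byte[64]; // simplified record area
    private int appendPosition = 0;                 // next free byte

    /** Appends a record and returns its offset (a new linked-list element). */
    public int append(byte[] serialized) {
        int offset = appendPosition;
        System.arraycopy(serialized, 0, recordArea, offset, serialized.length);
        appendPosition += serialized.length;
        return offset;
    }

    /**
     * Updates the record at {@code offset} whose old length was {@code oldLen}
     * with the already-staged bytes. Returns the (possibly new) offset.
     */
    public int update(int offset, int oldLen, byte[] staged) {
        if (staged.length == oldLen) {
            // Same size: safe to overwrite in place.
            System.arraycopy(staged, 0, recordArea, offset, staged.length);
            return offset;
        } else {
            // Different size: abandon the old slot (leaving a hole) and
            // append at the end; a later compaction sweep reclaims holes.
            return append(staged);
        }
    }
}
```

A same-size update keeps the record at its original offset, while a differently sized one relocates it to the end of the record area, which is exactly why compaction is eventually needed.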
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52465854

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java ---
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52466336

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java ---
@@ -102,7 +102,7 @@ public void testCompactingHashMapPerformance() {
 		System.out.println("Starting update...");
 		start = System.currentTimeMillis();
 		while(updater.next(target) != null) {
-			target.setValue(target.getValue()*-1);
--- End diff --

`target.getValue()` is always 0 at this point. (See the 6th paragraph in the PR opening comment.)

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
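The reviewer's point is that since the stored value is always 0 here, multiplying it by -1 writes the same bytes back, so the benchmark's update loop never actually changes anything. A minimal illustration (the method names are hypothetical, not from the benchmark):

```java
// Hypothetical demo: with a stored value of 0, the original update
// expression is a no-op, while a derived value is an observable change.
public class UpdateNoOpDemo {

    /** Mirrors the questioned line: value * -1 leaves 0 unchanged. */
    public static int originalUpdate(int value) {
        return value * -1;
    }

    /** An update that actually modifies the stored value. */
    public static int observableUpdate(int value) {
        return value + 1;
    }
}
```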
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52460672

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java ---
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52460698

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/AbstractBlockResettableIterator.java ---
@@ -83,6 +83,7 @@ protected AbstractBlockResettableIterator(TypeSerializer serializer, MemoryMa
 		this.collectingView = new SimpleCollectingOutputView(this.fullSegments, new ListMemorySegmentSource(this.emptySegments), memoryManager.getPageSize());
 		this.readView = new RandomAccessInputView(this.fullSegments, memoryManager.getPageSize());
+		this.readView.setReadPosition(0);
--- End diff --

ok
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52461392

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java ---
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52467783

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReduceHashTableTest.java ---
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import com.google.common.collect.Ordering;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.operators.testutils.UniformStringPairGenerator;
+import org.apache.flink.runtime.operators.testutils.types.*;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Test;
+
+import java.util.*;
+
+import static org.junit.Assert.*;
+import static org.junit.Assert.fail;
+
+public class ReduceHashTableTest {
+
+	private static final long RANDOM_SEED = 58723953465322L;
+
+	private static final int PAGE_SIZE = 16 * 1024;
+
+	private final TypeSerializer<Tuple2<Long, String>> serializer;
+	private final TypeComparator<Tuple2<Long, String>> comparator;
+
+	private final TypeComparator<Long> probeComparator;
+
+	private final TypePairComparator<Long, Tuple2<Long, String>> pairComparator;
+
+	//
+	// -- Note: This part was mostly copied from CompactingHashTableTest --
+
+	public ReduceHashTableTest() {
+		TypeSerializer<?>[] fieldSerializers = { LongSerializer.INSTANCE, StringSerializer.INSTANCE };
+		@SuppressWarnings("unchecked")
+		Class<Tuple2<Long, String>> clazz = (Class<Tuple2<Long, String>>) (Class<?>) Tuple2.class;
+		this.serializer = new TupleSerializer<Tuple2<Long, String>>(clazz, fieldSerializers);
+
+		TypeComparator<?>[] comparators = { new LongComparator(true) };
+		TypeSerializer<?>[] comparatorSerializers = { LongSerializer.INSTANCE };
+
+		this.comparator = new TupleComparator<Tuple2<Long, String>>(new int[] {0}, comparators, comparatorSerializers);
+
+		this.probeComparator = new LongComparator(true);
+
+		this.pairComparator = new TypePairComparator<Long, Tuple2<Long, String>>() {
+
+			private long ref;
+
+			@Override
+			public void setReference(Long reference) {
+				ref = reference;
+			}
+
+			@Override
+			public boolean equalToReference(Tuple2<Long, String> candidate) {
+				//noinspection UnnecessaryUnboxing
+				return candidate.f0.longValue() == ref;
+			}
+
+			@Override
+			public int compareToReference(Tuple2<Long, String> candidate) {
+				long x = ref;
+				long y = candidate.f0;
+				return (x < y) ? -1 : ((x == y) ? 0 : 1);
+			}
+		};
+	}
+
+	@Test
+	public void testHashTableGrowthWithInsert()
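The probe-side pair comparator quoted above can be illustrated without any Flink dependencies. The sketch below mimics `TypePairComparator` with a plain class so it runs standalone: a `Long` key is held as the reference and compared against the first field of a `(Long, String)` record. The class name and the simplification of the candidate to its `f0` field are assumptions made for the example.

```java
// Dependency-free sketch of the pair-comparator pattern from the test:
// setReference() stores the probe key; the other methods compare it against
// a candidate record's key field, exactly as the anonymous class above does.
public class LongTuplePairComparator {

    private long ref;

    public void setReference(Long reference) {
        ref = reference;
    }

    /** True if the candidate's key field equals the stored reference. */
    public boolean equalToReference(long candidateF0) {
        return candidateF0 == ref;
    }

    /** Same three-way comparison as the quoted compareToReference(). */
    public int compareToReference(long candidateF0) {
        return Long.compare(ref, candidateF0);
    }
}
```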
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52461679

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java ---
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52467302

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java ---
@@ -100,340 +101,371 @@
 	@Test
 	public void testDifferentProbers() {
 		final int NUM_MEM_PAGES = 32 * NUM_PAIRS / PAGE_SIZE;
-
-		AbstractMutableHashTable table = new CompactingHashTable(serializer, comparator, getMemory(NUM_MEM_PAGES, PAGE_SIZE));
-
+		testDifferentProbersCore(new CompactingHashTable<>(serializer, comparator, getMemory(NUM_MEM_PAGES, PAGE_SIZE)), NUM_MEM_PAGES);
--- End diff --

I can do that, but I would like to ask why that is important. I mean, if one of these tests fails, then one of the lines can simply be commented out to see which hash table is making it fail.
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52471181

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReduceHashTableTest.java ---
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52471297 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- @@ -0,0 +1,1014 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.hash; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeutils.SameTypePairComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.RandomAccessInputView; +import org.apache.flink.runtime.memory.AbstractPagedOutputView; +import org.apache.flink.runtime.util.MathUtils; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * This hash table supports updating elements, and it also has processRecordWithReduce, + * which performs one reduce step with the given record. + * + * The memory is divided into three areas: + * - Bucket area: contains the bucket heads: + * an 8-byte pointer to the first link of a linked list in the record area + * - Record area: contains the actual data in linked-list elements. A linked-list element starts + * with an 8-byte pointer to the next element, and then the record follows. + * - Staging area: a small, temporary storage area for writing updated records. This is needed + * because, before serializing a record, there is no way to know in advance how large it will be. + * Therefore, we can't serialize directly into the record area when we are doing an update, because + * if it turns out to be larger than the old record, it would overwrite some other record + * that happens to be after the old one in memory. The solution is to serialize to the staging area first, + * and then copy it to the place of the original if it has the same size; otherwise, allocate a new linked-list + * element at the end of the record area, and mark the old one as abandoned. This creates "holes" in + * the record area, so compactions are eventually needed. + * + * Compaction happens by deleting everything in the bucket area, and then reinserting all elements. + * The reinsertion works by forgetting the structure (the linked lists) of the record area, reading it + * sequentially, and inserting all non-abandoned records, starting from the beginning of the record area. + * Note that insertions never overwrite a record that has not yet been read by the reinsertion sweep, because + * both the insertions and the reads happen sequentiallyly in the record area, and the insertions obviously + * never overtake the reading sweep. + * + * Note: we have to abandon the old linked-list element even when the updated record has a smaller size + * than the original, because otherwise we wouldn't know where the next record starts during a reinsertion + * sweep. + * + * The number of buckets depends on how large the records are. The serializer might be able to tell us this; + * in that case, we calculate the number of buckets upfront, and won't do resizes. + * If the serializer doesn't know the size, then we start with a small number of buckets, and resize when more + * elements have been inserted than there are buckets. + * + * The number of memory segments given to the staging area is usually one, because it just needs to hold + * one record.
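The update-or-insert flow described above (bucket heads pointing at linked lists in a record area, with a reduce step applied on a key match) can be sketched with a simplified heap-based version. This is illustrative code, not from the PR: plain arrays stand in for the MemorySegment-backed areas, and fixed-size long values sidestep the staging-area/abandonment machinery that handles variable-size records.

```java
import java.util.Arrays;
import java.util.function.LongBinaryOperator;

/** Heap-based sketch of the table's update-or-insert logic; names are illustrative, not Flink's. */
class MiniReduceTable {
    private static final int NUM_BUCKETS = 64; // power of two, so the bucket index is a mask

    // Bucket area: index + 1 of the first linked-list element, 0 = empty bucket.
    private final int[] bucketHeads = new int[NUM_BUCKETS];

    // Record area: element i consists of (nextPointers[i], keys[i], values[i]).
    private int[] nextPointers = new int[16];
    private long[] keys = new long[16];
    private long[] values = new long[16];
    private int size = 0;

    private final LongBinaryOperator reducer;

    MiniReduceTable(LongBinaryOperator reducer) {
        this.reducer = reducer;
    }

    /** One reduce step with the given record: update in place if the key exists, insert otherwise. */
    void processRecordWithReduce(long key, long value) {
        int bucket = (int) (key & (NUM_BUCKETS - 1));
        // Walk the linked list hanging off the bucket head.
        for (int cur = bucketHeads[bucket]; cur != 0; cur = nextPointers[cur - 1]) {
            if (keys[cur - 1] == key) {
                values[cur - 1] = reducer.applyAsLong(values[cur - 1], value); // in-place update
                return;
            }
        }
        // Not found: append a new element and make it the new head of the bucket's list.
        if (size == keys.length) {
            nextPointers = Arrays.copyOf(nextPointers, size * 2);
            keys = Arrays.copyOf(keys, size * 2);
            values = Arrays.copyOf(values, size * 2);
        }
        keys[size] = key;
        values[size] = value;
        nextPointers[size] = bucketHeads[bucket];
        bucketHeads[bucket] = ++size;
    }

    /** Returns the current partial aggregate for the key, or 0 if absent. */
    long get(long key) {
        int bucket = (int) (key & (NUM_BUCKETS - 1));
        for (int cur = bucketHeads[bucket]; cur != 0; cur = nextPointers[cur - 1]) {
            if (keys[cur - 1] == key) {
                return values[cur - 1];
            }
        }
        return 0;
    }
}
```

Inserting at the head of each list is also what makes the reinsertion sweep during compaction cheap, as the comment notes: no list traversal is needed.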
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52488669 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java --- @@ -100,340 +101,371 @@ @Test public void testDifferentProbers() { final int NUM_MEM_PAGES = 32 * NUM_PAIRS / PAGE_SIZE; - - AbstractMutableHashTable table = new CompactingHashTable(serializer, comparator, getMemory(NUM_MEM_PAGES, PAGE_SIZE)); - + testDifferentProbersCore(new CompactingHashTable<>(serializer, comparator, getMemory(NUM_MEM_PAGES, PAGE_SIZE)), NUM_MEM_PAGES); --- End diff -- OK, I'll split the top-level methods into two methods, and move these to different classes. However, the core of the tests can remain shared, right? For example, I would split `testBuildAndRetrieve` into two methods, and move these to different classes, but both of them would still call the same `testBuildAndRetrieveCore`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-182483003 The benefit of having multiple partitions is that you do not need to go over the complete RecordArea but only over a partition. After a partition has been compacted or emitted, its MemorySegments can be used by other partitions as well. Depending on the strategy for choosing the partition to emit or compact (#segments in partition, update count on partition, #unique values in partition), this should improve the combine rate (fewer emitted records from the table) and also improve the pipelined behavior of the combiner, because you'll get more but much shorter pauses.
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-182477373 > In any case, we need to update the documentation for the added CombineHint. It would also be good to extend ReduceITCase with a few tests that use CombineHint.HASH. OK, I'll do these. > the full stop to resize / rebuild / emit records might take some time, depending on the size of the table. Emit is very fast, because `EntryIterator` is reading the record area sequentially. I don't think there is much room for improvement there. (Except perhaps your idea of storing the lengths as negative values, and skipping over abandoned records :)) __Rebuilds because of compactions:__ Typically, the sizes of the records don't change, in which case no compactions happen. If the records change size, that's a mess. But even in this case, performance should be reduced by less than 2x, because the number of insertions done in rebuilds should be proportional to the number of normal update/insert operations, and one insert during rebuild is faster than a usual insert/update, because the linked lists are not traversed (only the bucket segments are accessed randomly; the elements are inserted at the beginning of the lists). __Rebuilds because of resizing__: If the serializer knows the length of the records in advance, then no resizings will happen, because we just calculate the final number of bucket segments at the beginning (see `calcInitialNumBucketSegments`). Unfortunately, currently the serializers almost never know the length, but I hope this situation will improve in the future (for example, see FLINK-3321). If the serializer doesn't know the length, then how much time is spent on resizing is greatly affected by the average number of records per key. 
This is because the time spent doing resizes is proportional to the number of elements inserted, and if there are lots of elements with matching keys, then most operations will be updates (rather than insertions). I will probably do some performance tests to confirm this, and see how much time is spent in resizes in the worst case (when every key has only one element). (Also, the "performance should be reduced by less than 2x" argument from above should be applicable in this case as well.) > Do you think it is possible to extract the ReduceFunction from the table? This should be doable, I'll try. > Should we think about linear hashing for resizing the table. Thanks, I didn't know about this algorithm. I'll think about this. > Split the single record area into multiple partitions (i.e., use multiple RecordAreas). Each partition holds the data of multiple buckets. This allows restricting rebuilding, compaction, etc. to only a part of the whole table. Do you think that this would have a performance benefit? The time of a rebuild is linear in the size, so I don't see how doing it in chunks would improve performance. (I mean, the (frequency of rebuilds of a partition) / (frequency of rebuilds of the full table) is probably the same ratio as the (time to rebuild a partition) / (time to rebuild the full table).) > It can also help to make your implementation more suitable for the final reduce case Yes, some partitioning will be needed for that. However, I would like to get this in first, and then I'll design the implementation of that case.
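The "less than 2x" amortization argument for resizes can be made concrete with a small simulation. This is hypothetical code, not from the PR: it doubles the bucket count whenever more elements have been inserted than there are buckets (the policy described in the table's documentation), and counts the total re-insertion work, which stays below 2n for n inserts because the rebuild sizes form a geometric series.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of the doubling resize policy; counts total reinsertion work. */
class ResizeCostDemo {

    /** Inserts keys 0..n-1 and returns how many re-insertions the resizes cost in total. */
    static long insertAndCountRehashWork(int n) {
        int numBuckets = 4;
        List<List<Integer>> buckets = newBuckets(numBuckets);
        long rehashWork = 0;
        int size = 0;
        for (int key = 0; key < n; key++) {
            if (size >= numBuckets) {
                // Resize: double the bucket count and re-bucket every element.
                numBuckets *= 2;
                List<List<Integer>> bigger = newBuckets(numBuckets);
                for (List<Integer> bucket : buckets) {
                    for (int k : bucket) {
                        bigger.get(k % numBuckets).add(k);
                        rehashWork++; // one reinsert during the rebuild
                    }
                }
                buckets = bigger;
            }
            buckets.get(key % numBuckets).add(key);
            size++;
        }
        return rehashWork;
    }

    private static List<List<Integer>> newBuckets(int n) {
        List<List<Integer>> bs = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            bs.add(new ArrayList<>());
        }
        return bs;
    }
}
```

For n = 1000 distinct keys the rebuilds reinsert 4 + 8 + 16 + ... + 512 = 1020 elements in total, i.e. about one extra reinsert per insert; with many records per key, most operations are updates and this overhead shrinks further, matching the comment's argument.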
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52489247 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java --- @@ -100,340 +101,371 @@ @Test public void testDifferentProbers() { final int NUM_MEM_PAGES = 32 * NUM_PAIRS / PAGE_SIZE; - - AbstractMutableHashTable table = new CompactingHashTable(serializer, comparator, getMemory(NUM_MEM_PAGES, PAGE_SIZE)); - + testDifferentProbersCore(new CompactingHashTable<>(serializer, comparator, getMemory(NUM_MEM_PAGES, PAGE_SIZE)), NUM_MEM_PAGES); --- End diff -- Yes, that's what I had in mind.
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52488021 --- Diff: flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.manual; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.operators.base.ReduceOperatorBase; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.util.SplittableIterator; + +import java.io.Serializable; +import java.util.Iterator; +import java.util.Random; + +public class ReducePerformance { --- End diff -- OK, I'll do this.
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-181907722 What I'm not sure about is the `closeUserCode` call in `ChainedReduceCombineDriver.closeTask`. The chained drivers that have a `running` flag for indicating cancellation make this call only when the driver was not canceled. But the chained drivers without a `running` flag seem to make this call even when they were canceled. What is the reasoning behind this difference?
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-181437507 I have added the strategy hint. At the moment, it defaults to sorting, but I couldn't actually find any case where hashing is slower, and it is often about 2 times faster.
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-177204208 I agree with @mbalassi. Adding a strategy hint to the API, similar to the join hints for inner and outer joins, is a good idea.
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user mbalassi commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-177094172 I think exposing the feature through the API, similarly to the way join strategies are exposed, is the way to properly make this feature available for users. I would leave the well-tested sorting version the default for now.
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-175734134 I have changed the hash function to include the bit-magic that is done in the other hash tables. (Originally, I thought it was not needed here, because I hadn't realized that it is not only for diffusing clusters, but also for having the high bits affect the low bits of the hashes.)
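The "bit-magic" referred to here is a finalization (avalanche) mix that folds the high bits of a hash code into the low bits before the code is masked down to a bucket index. A sketch in the style of MurmurHash3's 32-bit finalizer follows; this shows the general technique, not necessarily Flink's exact hash function:

```java
/**
 * Avalanche mix in the style of MurmurHash3's fmix32 finalizer. Without such a mix,
 * bucket selection via (hash & (numBuckets - 1)) ignores the high bits entirely.
 * Illustrative of the technique, not Flink's exact code.
 */
final class HashMix {

    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    /** Bucket selection after mixing; numBuckets must be a power of two. */
    static int bucket(int hash, int numBuckets) {
        return mix(hash) & (numBuckets - 1);
    }
}
```

Keys such as 0x10000 and 0x20000 differ only in their high bits, so without the mix they are guaranteed to land in the same bucket under any small power-of-two mask; after mixing, the high bits influence the bucket choice.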
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-174492669 I have added a commit to this PR that adds the chained version of `ReduceCombineDriver`. (It doesn't have the sort-based code, only the hash-based.)
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-173997370 Quick question: Is there a reason why there is no chained version of ReduceCombineDriver? (GroupReduceDriver and AllReduceDriver both have chained versions: SynchronousChainedCombineDriver and ChainedAllReduceDriver.) If it is just because no one got around to it yet, then I'll open a JIRA and implement it quickly.
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-174012431 Thanks! I've found the JIRA: https://issues.apache.org/jira/browse/FLINK-2246
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-174010704 No one got around to doing it in an efficient/safe manner. There is a JIRA for it afaik.
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-173571311 I've fixed a minor issue related to closing the table, and improved some comments.
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-173319246 I've added some performance testing code and comments.
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-172939642 Hello @fhueske, thanks for the comment and shepherding the PR! > You said, the combiner flushes the full hash table if it runs out of memory. Do you think it would be possible to track the update frequency of buckets and only flush the bucket with the least updates (or n buckets with least updates)? This might improve the performance for skewed input data. If we flush only some of the elements when the memory is full, then the append position of the record area doesn't change, but only some holes appear in the record area, so we immediately need to do a compaction. Since the compaction traverses the entire table, this idea could only work if we flush a substantial amount of the elements (like half of them). This is an interesting idea, but I'm finding it quite hard to form even an intuitive understanding of its performance effects. I mean, doing this would have some overhead, and at the moment I can't see how much skew in the data would make this worth it (or what percentage of the data should be flushed at a time, etc.). I'll think about this some more, but getting the trade-offs right here would probably require quite a lot of work: experimentation with different variants of the algorithm, with differently skewed data, with different `distinct keys / total number of input elements` ratios, so we should probably postpone this to some later time, after this basic version is merged. > I agree that the sort-based strategy for reduce combiners can be removed eventually, when the hash-based strategy proves to work well. I would like to give it a bit more exposure though, before we drop the code. OK. > Porting the built-in aggregation functions, distinct, etc. from GroupReduceFunctions to ReduceFunctions sounds good. 
I think the reason for this design decision was that, for the sort-based strategies, ReduceFunctions did not have a benefit over GroupReduceFunctions. Instead, they caused more method invocations (once for each input record) compared to once per key. I see, thanks! I'll keep this in mind for now, and open a JIRA when the dust settles around the hash-based combiner, and we see the performance differences more clearly. > It would be great if you could open a JIRA and add a short design document / API proposal for the changes on the serializers that you talked about. This would allow the community to review and discuss your proposal. OK, I will. Best, Gábor
[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...
GitHub user ggevay opened a pull request: https://github.com/apache/flink/pull/1517 [FLINK-2237] [runtime] Add hash-based combiner. This adds ReduceHashTable, and modifies ReduceCombineDriver to use it instead of sorting. When an input element comes in, the driver probes the table for the key, and either inserts the element if it is not present, or updates it with one reduce step. When memory runs out, all the current elements (partial aggregates) in the table are sent to the output. I haven't yet deleted the code of the sort-based solution from the driver, but it seems that the hash-based solution is always faster (sometimes by a factor of 3), so there isn't a need for dynamically choosing between them. I will do some more performance tests to confirm this, and then probably delete the sort-based code. The hash-based approach also has the advantage that the memory used is proportional to the number of distinct keys, instead of the number of input elements. The interface of ReduceHashTable is a subset of the interface of CompactingHashTable, so in theory, it could replace CompactingHashTable everywhere (this is only JoinWithSolutionSet and coGroupWithSolutionSet, if I'm not mistaken). I haven't actually tried this yet, but if you think it is worth a go, then I will try it, and run some real performance tests. I've already added ReduceHashTable to HashTablePerformanceComparison, and when there is plenty of memory available, ReduceHashTable is about 5-10% faster than CompactingHashTable. It also uses ~15% less memory, and it has the further advantage that if the sizes of the records don't change during the updates, then no compactions are needed (because records are updated in-place), so performance doesn't degrade when we are close to occupying all the memory. (In contrast to CompactingHashTable, where updates start getting slow when there is little free memory left.) 
For details about the operation of ReduceHashTable, see the comment at the beginning of the file. I've changed the ctor of RandomAccessInputView to not do a seek. (This behaviour is more practical for me, because I want to create an instance before allocating memory segments to it.) I verified at most places of usage that the code does an explicit seek after the ctor, but I couldn't see this in AbstractBlockResettableIterator, so I inserted an explicit seek there. I fixed a small bug in HashTablePerformanceComparison: the *-1 was not doing anything, because that value was always 0 there. The predefined aggregations (e.g. DataSet.sum()) currently use groupReduce. I'm not sure what the reason for this was originally, but in light of this speedup of reduce, maybe this could be reconsidered. (And the same stands for DataSet.distinct().) Additionally, I have plans for making a different hash table for handling the case when the serializer can tell the size of the records. This would allow open addressing to be used with linear probing, which would avoid a few cache misses per element access, so I'm expecting maybe a 2x speedup from this. (This open addressing hash table would actually be simpler than ReduceHashTable.) The difficulty with this is that the serializers are currently not very smart in this respect (I mean, knowing the size), so some improvements would be needed for them, but I have some ideas for this. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ggevay/flink hashedReduceSquashed Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1517.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1517 commit d5eb20c10dcc875e6ae14f3298da829a2a4a5d61 Author: Gabor Gevay Date: 2015-12-04T14:28:23Z [FLINK-2237] [runtime] Add hash-based combiner. 
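The open-addressing idea mentioned at the end of the description can be sketched as follows. This is a hypothetical fixed-size-record table, not code from this PR: with linear probing, a lookup touches consecutive array slots instead of chasing linked-list pointers, which is where the expected cache-miss savings come from. For brevity the key itself serves as the hash, and resizing is omitted (the table must not fill up).

```java
/**
 * Sketch of open addressing with linear probing for fixed-size (long, long) records,
 * the layout proposed for when the serializer knows record sizes. Illustrative only:
 * no hashing beyond the key itself, and no resize (the caller must size it generously).
 */
class LinearProbingReduceTable {
    private final long[] keys;
    private final long[] values;
    private final boolean[] occupied;
    private final int mask; // capacity must be a power of two

    LinearProbingReduceTable(int capacityPowerOfTwo) {
        keys = new long[capacityPowerOfTwo];
        values = new long[capacityPowerOfTwo];
        occupied = new boolean[capacityPowerOfTwo];
        mask = capacityPowerOfTwo - 1;
    }

    /** Update-or-insert with a sum reduce step standing in for the ReduceFunction. */
    void reduceInto(long key, long value) {
        int slot = (int) (key & mask);
        while (occupied[slot]) {
            if (keys[slot] == key) {
                values[slot] += value; // reduce step, in place
                return;
            }
            slot = (slot + 1) & mask; // linear probe: next consecutive slot, no pointer chasing
        }
        occupied[slot] = true;
        keys[slot] = key;
        values[slot] = value;
    }

    /** Returns the partial aggregate for the key, or 0 if absent. */
    long get(long key) {
        int slot = (int) (key & mask);
        while (occupied[slot]) {
            if (keys[slot] == key) {
                return values[slot];
            }
            slot = (slot + 1) & mask;
        }
        return 0;
    }
}
```

Because every record occupies exactly one slot of known size, updates can never overflow their slot, so the staging area and abandonment mechanism of ReduceHashTable would not be needed in this variant.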