Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/1297#discussion_r17791303 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ImmutableLongOpenHashSet.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import scala.reflect._ +import com.google.common.hash.Hashing + +/** + * A fast, immutable hash set optimized for insertions and lookups (but not deletions) of `Long` + * elements. Because it exposes the position of a key in the underlying array, this is useful as a + * building block for higher level data structures such as a hash map (for example, + * IndexedRDDPartition). + * + * It uses quadratic probing with a power-of-2 hash table size, which is guaranteed to explore all + * spaces for each key (see http://en.wikipedia.org/wiki/Quadratic_probing). + */ +private[spark] class ImmutableLongOpenHashSet( + /** Underlying array of elements used as a hash table. */ + val data: ImmutableVector[Long], + /** Whether or not there is an element at the corresponding position in `data`. */ + val bitset: ImmutableBitSet, + /** + * Position of a focused element.
This is useful when returning a modified set along with a + * pointer to the location of modification. + */ + val focus: Int, + /** Load threshold at which to grow the underlying vectors. */ + loadFactor: Double + ) extends Serializable { + + require(loadFactor < 1.0, "Load factor must be less than 1.0") + require(loadFactor > 0.0, "Load factor must be greater than 0.0") + require(capacity == nextPowerOf2(capacity), "data capacity must be a power of 2") + + import OpenHashSet.{INVALID_POS, NONEXISTENCE_MASK, POSITION_MASK, Hasher, LongHasher} + + private val hasher: Hasher[Long] = new LongHasher + + private def mask = capacity - 1 + private def growThreshold = (loadFactor * capacity).toInt + + def withFocus(focus: Int): ImmutableLongOpenHashSet = + new ImmutableLongOpenHashSet(data, bitset, focus, loadFactor) + + /** The number of elements in the set. */ + def size: Int = bitset.cardinality + + /** The capacity of the set (i.e. size of the underlying vector). */ + def capacity: Int = data.size + + /** Return true if this set contains the specified element. */ + def contains(k: Long): Boolean = getPos(k) != INVALID_POS + + /** + * Nondestructively add an element to the set, returning a new set. If the set is over capacity + * after the insertion, grows the set and rehashes all elements. + */ + def add(k: Long): ImmutableLongOpenHashSet = { + addWithoutResize(k).rehashIfNeeded(ImmutableLongOpenHashSet.grow, ImmutableLongOpenHashSet.move) + } + + /** + * Add an element to the set. This one differs from add in that it doesn't trigger rehashing. + * The caller is responsible for calling rehashIfNeeded. + * + * Use (retval.focus & POSITION_MASK) to get the actual position, and + * (retval.focus & NONEXISTENCE_MASK) == 0 for prior existence.
+ */ + def addWithoutResize(k: Long): ImmutableLongOpenHashSet = { + var pos = hashcode(hasher.hash(k)) & mask + var i = 1 + var result: ImmutableLongOpenHashSet = null + while (result == null) { + if (!bitset.get(pos)) { + // This is a new key. + result = new ImmutableLongOpenHashSet( + data.updated(pos, k), bitset.set(pos), pos | NONEXISTENCE_MASK, loadFactor) + } else if (data(pos) == k) { + // Found an existing key. + result = this.withFocus(pos) + } else { + val delta = i + pos = (pos + delta) & mask + i += 1 + } + } + result + } + + /** + * Rehash the set if it is overloaded. + * @param allocateFunc Callback invoked when we are allocating a new, larger array. + * @param moveFunc Callback invoked when we move the key from one position (in the old data array) + * to a new position (in the new data array). + */ + def rehashIfNeeded( + allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit): ImmutableLongOpenHashSet = { + if (size > growThreshold) { + rehash(allocateFunc, moveFunc) + } else { + this + } + } + + /** + * Return the position of the element in the underlying array, or INVALID_POS if it is not found. + */ + def getPos(k: Long): Int = { + var pos = hashcode(hasher.hash(k)) & mask + var i = 1 + val maxProbe = capacity + while (i < maxProbe) { + if (!bitset.get(pos)) { + return INVALID_POS + } else if (k == data(pos)) { + return pos + } else { + val delta = i + pos = (pos + delta) & mask + i += 1 + } + } + // Never reached here + INVALID_POS --- End diff -- would it be better to throw an exception here? Just in case something goes wrong and you end up here, you get a stack trace which will pinpoint the problem.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org