This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 5970d353360d [SPARK-45767][CORE] Delete `TimeStampedHashMap` and its UT 5970d353360d is described below commit 5970d353360d4fb6647c8fbc10f733abf009eca1 Author: panbingkun <pbk1...@gmail.com> AuthorDate: Thu Nov 2 23:04:07 2023 +0800 [SPARK-45767][CORE] Delete `TimeStampedHashMap` and its UT ### What changes were proposed in this pull request? This PR deletes `TimeStampedHashMap` and its UT. ### Why are the changes needed? During PR https://github.com/apache/spark/pull/43578, we found that the class `TimeStampedHashMap` is no longer in use. Based on the review suggestion, we have removed it: https://github.com/apache/spark/pull/43578#discussion_r1378687555 - The class `TimeStampedHashMap` was first introduced in https://github.com/apache/spark/commit/b18d70870a33a4783c6b3b787bef9b0eec30bce0#diff-77b12178a7036c71135074c6ddf7d659e5a69906264d5e3061087e4352e304ed - After https://github.com/apache/spark/pull/22339, the class `TimeStampedHashMap` is used only in the UT `TimeStampedHashMapSuite`. So, since Spark 3.0, this data structure has not been used by any production code in Spark. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43633 from panbingkun/remove_TimeStampedHashMap. 
Authored-by: panbingkun <pbk1...@gmail.com> Signed-off-by: yangjie01 <yangji...@baidu.com> --- .../org/apache/spark/util/TimeStampedHashMap.scala | 143 ---------------- .../spark/util/TimeStampedHashMapSuite.scala | 179 --------------------- 2 files changed, 322 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala deleted file mode 100644 index b0fb33946520..000000000000 --- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util - -import java.util.Map.Entry -import java.util.Set -import java.util.concurrent.ConcurrentHashMap - -import scala.collection.mutable -import scala.jdk.CollectionConverters._ - -import org.apache.spark.internal.Logging - -private[spark] case class TimeStampedValue[V](value: V, timestamp: Long) - -/** - * This is a custom implementation of scala.collection.mutable.Map which stores the insertion - * timestamp along with each key-value pair. If specified, the timestamp of each pair can be - * updated every time it is accessed. 
Key-value pairs whose timestamp are older than a particular - * threshold time can then be removed using the clearOldValues method. This is intended to - * be a drop-in replacement of scala.collection.mutable.HashMap. - * - * @param updateTimeStampOnGet Whether timestamp of a pair will be updated when it is accessed - */ -private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = false) - extends mutable.Map[A, B]() with Logging { - - // Note: this class supports Scala 2.13. A parallel source tree has a 2.12 implementation. - - private val internalMap = new ConcurrentHashMap[A, TimeStampedValue[B]]() - - def get(key: A): Option[B] = { - val value = internalMap.get(key) - if (value != null && updateTimeStampOnGet) { - internalMap.replace(key, value, TimeStampedValue(value.value, currentTime)) - } - Option(value).map(_.value) - } - - def iterator: Iterator[(A, B)] = { - getEntrySet.iterator.asScala.map(kv => (kv.getKey, kv.getValue.value)) - } - - def getEntrySet: Set[Entry[A, TimeStampedValue[B]]] = internalMap.entrySet - - override def + [B1 >: B](kv: (A, B1)): mutable.Map[A, B1] = { - val newMap = new TimeStampedHashMap[A, B1] - val oldInternalMap = this.internalMap.asInstanceOf[ConcurrentHashMap[A, TimeStampedValue[B1]]] - newMap.internalMap.putAll(oldInternalMap) - kv match { case (a, b) => newMap.internalMap.put(a, TimeStampedValue(b, currentTime)) } - newMap - } - - override def addOne(kv: (A, B)): this.type = { - kv match { case (a, b) => internalMap.put(a, TimeStampedValue(b, currentTime)) } - this - } - - override def subtractOne(key: A): this.type = { - internalMap.remove(key) - this - } - - override def update(key: A, value: B): Unit = { - this += ((key, value)) - } - - override def apply(key: A): B = { - get(key).getOrElse { throw new NoSuchElementException() } - } - - override def filter(p: ((A, B)) => Boolean): mutable.Map[A, B] = { - internalMap.asScala.map { case (k, TimeStampedValue(v, t)) => (k, v) }.filter(p) - } - - override 
def empty: mutable.Map[A, B] = new TimeStampedHashMap[A, B]() - - override def size: Int = internalMap.size - - override def foreach[U](f: ((A, B)) => U): Unit = { - val it = getEntrySet.iterator - while(it.hasNext) { - val entry = it.next() - val kv = (entry.getKey, entry.getValue.value) - f(kv) - } - } - - def putIfAbsent(key: A, value: B): Option[B] = { - val prev = internalMap.putIfAbsent(key, TimeStampedValue(value, currentTime)) - Option(prev).map(_.value) - } - - def putAll(map: Map[A, B]): Unit = { - map.foreach { case (k, v) => update(k, v) } - } - - def toMap: Map[A, B] = iterator.toMap - - def clearOldValues(threshTime: Long, f: (A, B) => Unit): Unit = { - val it = getEntrySet.iterator - while (it.hasNext) { - val entry = it.next() - if (entry.getValue.timestamp < threshTime) { - f(entry.getKey, entry.getValue.value) - logDebug("Removing key " + entry.getKey) - it.remove() - } - } - } - - /** Removes old key-value pairs that have timestamp earlier than `threshTime`. */ - def clearOldValues(threshTime: Long): Unit = { - clearOldValues(threshTime, (_, _) => ()) - } - - private def currentTime: Long = System.currentTimeMillis - - // For testing - - def getTimeStampedValue(key: A): Option[TimeStampedValue[B]] = { - Option(internalMap.get(key)) - } - - def getTimestamp(key: A): Option[Long] = { - getTimeStampedValue(key).map(_.timestamp) - } -} diff --git a/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala deleted file mode 100644 index 164454094683..000000000000 --- a/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util - -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer -import scala.util.Random - -import org.apache.spark.SparkFunSuite - -class TimeStampedHashMapSuite extends SparkFunSuite { - - // Test the testMap function - a Scala HashMap should obviously pass - testMap(new mutable.HashMap[String, String]()) - - // Test TimeStampedHashMap basic functionality - testMap(new TimeStampedHashMap[String, String]()) - testMapThreadSafety(new TimeStampedHashMap[String, String]()) - - test("TimeStampedHashMap - clearing by timestamp") { - // clearing by insertion time - val map = new TimeStampedHashMap[String, String](updateTimeStampOnGet = false) - map("k1") = "v1" - assert(map("k1") === "v1") - Thread.sleep(10) - val threshTime = System.currentTimeMillis - assert(map.getTimestamp("k1").isDefined) - assert(map.getTimestamp("k1").get < threshTime) - map.clearOldValues(threshTime) - assert(map.get("k1") === None) - - // clearing by modification time - val map1 = new TimeStampedHashMap[String, String](updateTimeStampOnGet = true) - map1("k1") = "v1" - map1("k2") = "v2" - assert(map1("k1") === "v1") - Thread.sleep(10) - val threshTime1 = System.currentTimeMillis - Thread.sleep(10) - assert(map1("k2") === "v2") // access k2 to update its access time to > threshTime - assert(map1.getTimestamp("k1").isDefined) - assert(map1.getTimestamp("k1").get < 
threshTime1) - assert(map1.getTimestamp("k2").isDefined) - assert(map1.getTimestamp("k2").get >= threshTime1) - map1.clearOldValues(threshTime1) // should only clear k1 - assert(map1.get("k1") === None) - assert(map1.get("k2").isDefined) - } - - /** Test basic operations of a Scala mutable Map. */ - def testMap(hashMapConstructor: => mutable.Map[String, String]): Unit = { - def newMap() = hashMapConstructor - val testMap1 = newMap() - val testMap2 = newMap() - val name = testMap1.getClass.getSimpleName - - test(name + " - basic test") { - // put, get, and apply - testMap1 += (("k1", "v1")) - assert(testMap1.get("k1").isDefined) - assert(testMap1("k1") === "v1") - testMap1("k2") = "v2" - assert(testMap1.get("k2").isDefined) - assert(testMap1("k2") === "v2") - assert(testMap1("k2") === "v2") - testMap1.update("k3", "v3") - assert(testMap1.get("k3").isDefined) - assert(testMap1("k3") === "v3") - - // remove - testMap1.remove("k1") - assert(testMap1.get("k1").isEmpty) - testMap1.remove("k2") - intercept[NoSuchElementException] { - testMap1("k2") // Map.apply(<non-existent-key>) causes exception - } - testMap1 -= "k3" - assert(testMap1.get("k3").isEmpty) - - // multi put - val keys = (1 to 100).map(_.toString) - val pairs = keys.map(x => (x, x * 2)) - assert((testMap2 ++ pairs).iterator.toSet === pairs.toSet) - testMap2 ++= pairs - - // iterator - assert(testMap2.iterator.toSet === pairs.toSet) - - // filter - val filtered = testMap2.filter { case (_, v) => v.toInt % 2 == 0 } - val evenPairs = pairs.filter { case (_, v) => v.toInt % 2 == 0 } - assert(filtered.iterator.toSet === evenPairs.toSet) - - // foreach - val buffer = new ArrayBuffer[(String, String)] - testMap2.foreach(x => buffer += x) - assert(testMap2.toSet === buffer.toSet) - - // multi remove - testMap2("k1") = "v1" - testMap2 --= keys - assert(testMap2.size === 1) - assert(testMap2.iterator.toSeq.head === (("k1", "v1"))) - - // + - val testMap3 = testMap2 + (("k0", "v0")) - assert(testMap3.size === 2) - 
assert(testMap3.get("k1").isDefined) - assert(testMap3("k1") === "v1") - assert(testMap3.get("k0").isDefined) - assert(testMap3("k0") === "v0") - - // - - val testMap4 = testMap3 - "k0" - assert(testMap4.size === 1) - assert(testMap4.get("k1").isDefined) - assert(testMap4("k1") === "v1") - } - } - - /** Test thread safety of a Scala mutable map. */ - def testMapThreadSafety(hashMapConstructor: => mutable.Map[String, String]): Unit = { - def newMap() = hashMapConstructor - val name = newMap().getClass.getSimpleName - val testMap = newMap() - @volatile var error = false - - def getRandomKey(m: mutable.Map[String, String]): Option[String] = { - val keys = testMap.keysIterator.toSeq - if (keys.nonEmpty) { - Some(keys(Random.nextInt(keys.size))) - } else { - None - } - } - - val threads = (1 to 25).map(i => new Thread() { - override def run(): Unit = { - try { - for (j <- 1 to 1000) { - Random.nextInt(3) match { - case 0 => - testMap(Random.nextString(10)) = Random.nextDouble().toString // put - case 1 => - getRandomKey(testMap).map(testMap.get) // get - case 2 => - getRandomKey(testMap).map(testMap.remove) // remove - } - } - } catch { - case t: Throwable => - error = true - throw t - } - } - }) - - test(name + " - threading safety test") { - threads.foreach(_.start()) - threads.foreach(_.join()) - assert(!error) - } - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org