This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5970d353360d [SPARK-45767][CORE] Delete `TimeStampedHashMap` and its UT
5970d353360d is described below

commit 5970d353360d4fb6647c8fbc10f733abf009eca1
Author: panbingkun <pbk1...@gmail.com>
AuthorDate: Thu Nov 2 23:04:07 2023 +0800

    [SPARK-45767][CORE] Delete `TimeStampedHashMap` and its UT
    
    ### What changes were proposed in this pull request?
    This PR aims to delete `TimeStampedHashMap` and its UT.
    
    ### Why are the changes needed?
    While working on PR https://github.com/apache/spark/pull/43578, we found that the class `TimeStampedHashMap` is no longer in use. Based on the review suggestion, we remove it here:
    https://github.com/apache/spark/pull/43578#discussion_r1378687555
    
    - The class `TimeStampedHashMap` was first introduced in
      https://github.com/apache/spark/commit/b18d70870a33a4783c6b3b787bef9b0eec30bce0#diff-77b12178a7036c71135074c6ddf7d659e5a69906264d5e3061087e4352e304ed
    - After https://github.com/apache/spark/pull/22339, the class `TimeStampedHashMap` was only used by its own UT, `TimeStampedHashMapSuite`.
    So, since Spark 3.0, this data structure has not been used by any production code in Spark.
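    
    For context, `TimeStampedHashMap` wrapped a `ConcurrentHashMap` and tagged each entry with an insertion (or last-access) timestamp so that stale entries could be dropped via `clearOldValues`. A minimal, hypothetical usage sketch (illustrative only, not taken from Spark production code; see the deleted file below for the actual API):
    
    ```scala
    // Hypothetical example of how the removed class was typically used.
    val cache = new TimeStampedHashMap[String, String](updateTimeStampOnGet = true)
    cache("k1") = "v1"                 // stored together with the current timestamp
    cache.get("k1")                    // refreshes the timestamp because updateTimeStampOnGet = true
    val threshTime = System.currentTimeMillis - 60 * 60 * 1000
    cache.clearOldValues(threshTime)   // removes entries last touched before threshTime
    ```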
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Pass GA.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #43633 from panbingkun/remove_TimeStampedHashMap.
    
    Authored-by: panbingkun <pbk1...@gmail.com>
    Signed-off-by: yangjie01 <yangji...@baidu.com>
---
 .../org/apache/spark/util/TimeStampedHashMap.scala | 143 ----------------
 .../spark/util/TimeStampedHashMapSuite.scala       | 179 ---------------------
 2 files changed, 322 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
deleted file mode 100644
index b0fb33946520..000000000000
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.util.Map.Entry
-import java.util.Set
-import java.util.concurrent.ConcurrentHashMap
-
-import scala.collection.mutable
-import scala.jdk.CollectionConverters._
-
-import org.apache.spark.internal.Logging
-
-private[spark] case class TimeStampedValue[V](value: V, timestamp: Long)
-
-/**
- * This is a custom implementation of scala.collection.mutable.Map which stores the insertion
- * timestamp along with each key-value pair. If specified, the timestamp of each pair can be
- * updated every time it is accessed. Key-value pairs whose timestamp are older than a particular
- * threshold time can then be removed using the clearOldValues method. This is intended to
- * be a drop-in replacement of scala.collection.mutable.HashMap.
- *
- * @param updateTimeStampOnGet Whether timestamp of a pair will be updated when it is accessed
- */
-private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = false)
-  extends mutable.Map[A, B]() with Logging {
-
-  //  Note: this class supports Scala 2.13. A parallel source tree has a 2.12 implementation.
-
-  private val internalMap = new ConcurrentHashMap[A, TimeStampedValue[B]]()
-
-  def get(key: A): Option[B] = {
-    val value = internalMap.get(key)
-    if (value != null && updateTimeStampOnGet) {
-      internalMap.replace(key, value, TimeStampedValue(value.value, currentTime))
-    }
-    Option(value).map(_.value)
-  }
-
-  def iterator: Iterator[(A, B)] = {
-    getEntrySet.iterator.asScala.map(kv => (kv.getKey, kv.getValue.value))
-  }
-
-  def getEntrySet: Set[Entry[A, TimeStampedValue[B]]] = internalMap.entrySet
-
-  override def + [B1 >: B](kv: (A, B1)): mutable.Map[A, B1] = {
-    val newMap = new TimeStampedHashMap[A, B1]
-    val oldInternalMap = this.internalMap.asInstanceOf[ConcurrentHashMap[A, TimeStampedValue[B1]]]
-    newMap.internalMap.putAll(oldInternalMap)
-    kv match { case (a, b) => newMap.internalMap.put(a, TimeStampedValue(b, currentTime)) }
-    newMap
-  }
-
-  override def addOne(kv: (A, B)): this.type = {
-    kv match { case (a, b) => internalMap.put(a, TimeStampedValue(b, currentTime)) }
-    this
-  }
-
-  override def subtractOne(key: A): this.type = {
-    internalMap.remove(key)
-    this
-  }
-
-  override def update(key: A, value: B): Unit = {
-    this += ((key, value))
-  }
-
-  override def apply(key: A): B = {
-    get(key).getOrElse { throw new NoSuchElementException() }
-  }
-
-  override def filter(p: ((A, B)) => Boolean): mutable.Map[A, B] = {
-    internalMap.asScala.map { case (k, TimeStampedValue(v, t)) => (k, v) }.filter(p)
-  }
-
-  override def empty: mutable.Map[A, B] = new TimeStampedHashMap[A, B]()
-
-  override def size: Int = internalMap.size
-
-  override def foreach[U](f: ((A, B)) => U): Unit = {
-    val it = getEntrySet.iterator
-    while(it.hasNext) {
-      val entry = it.next()
-      val kv = (entry.getKey, entry.getValue.value)
-      f(kv)
-    }
-  }
-
-  def putIfAbsent(key: A, value: B): Option[B] = {
-    val prev = internalMap.putIfAbsent(key, TimeStampedValue(value, currentTime))
-    Option(prev).map(_.value)
-  }
-
-  def putAll(map: Map[A, B]): Unit = {
-    map.foreach { case (k, v) => update(k, v) }
-  }
-
-  def toMap: Map[A, B] = iterator.toMap
-
-  def clearOldValues(threshTime: Long, f: (A, B) => Unit): Unit = {
-    val it = getEntrySet.iterator
-    while (it.hasNext) {
-      val entry = it.next()
-      if (entry.getValue.timestamp < threshTime) {
-        f(entry.getKey, entry.getValue.value)
-        logDebug("Removing key " + entry.getKey)
-        it.remove()
-      }
-    }
-  }
-
-  /** Removes old key-value pairs that have timestamp earlier than `threshTime`. */
-  def clearOldValues(threshTime: Long): Unit = {
-    clearOldValues(threshTime, (_, _) => ())
-  }
-
-  private def currentTime: Long = System.currentTimeMillis
-
-  // For testing
-
-  def getTimeStampedValue(key: A): Option[TimeStampedValue[B]] = {
-    Option(internalMap.get(key))
-  }
-
-  def getTimestamp(key: A): Option[Long] = {
-    getTimeStampedValue(key).map(_.timestamp)
-  }
-}
diff --git a/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
deleted file mode 100644
index 164454094683..000000000000
--- a/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-import scala.util.Random
-
-import org.apache.spark.SparkFunSuite
-
-class TimeStampedHashMapSuite extends SparkFunSuite {
-
-  // Test the testMap function - a Scala HashMap should obviously pass
-  testMap(new mutable.HashMap[String, String]())
-
-  // Test TimeStampedHashMap basic functionality
-  testMap(new TimeStampedHashMap[String, String]())
-  testMapThreadSafety(new TimeStampedHashMap[String, String]())
-
-  test("TimeStampedHashMap - clearing by timestamp") {
-    // clearing by insertion time
-    val map = new TimeStampedHashMap[String, String](updateTimeStampOnGet = false)
-    map("k1") = "v1"
-    assert(map("k1") === "v1")
-    Thread.sleep(10)
-    val threshTime = System.currentTimeMillis
-    assert(map.getTimestamp("k1").isDefined)
-    assert(map.getTimestamp("k1").get < threshTime)
-    map.clearOldValues(threshTime)
-    assert(map.get("k1") === None)
-
-    // clearing by modification time
-    val map1 = new TimeStampedHashMap[String, String](updateTimeStampOnGet = true)
-    map1("k1") = "v1"
-    map1("k2") = "v2"
-    assert(map1("k1") === "v1")
-    Thread.sleep(10)
-    val threshTime1 = System.currentTimeMillis
-    Thread.sleep(10)
-    assert(map1("k2") === "v2")     // access k2 to update its access time to > threshTime
-    assert(map1.getTimestamp("k1").isDefined)
-    assert(map1.getTimestamp("k1").get < threshTime1)
-    assert(map1.getTimestamp("k2").isDefined)
-    assert(map1.getTimestamp("k2").get >= threshTime1)
-    map1.clearOldValues(threshTime1) // should only clear k1
-    assert(map1.get("k1") === None)
-    assert(map1.get("k2").isDefined)
-  }
-
-  /** Test basic operations of a Scala mutable Map. */
-  def testMap(hashMapConstructor: => mutable.Map[String, String]): Unit = {
-    def newMap() = hashMapConstructor
-    val testMap1 = newMap()
-    val testMap2 = newMap()
-    val name = testMap1.getClass.getSimpleName
-
-    test(name + " - basic test") {
-      // put, get, and apply
-      testMap1 += (("k1", "v1"))
-      assert(testMap1.get("k1").isDefined)
-      assert(testMap1("k1") === "v1")
-      testMap1("k2") = "v2"
-      assert(testMap1.get("k2").isDefined)
-      assert(testMap1("k2") === "v2")
-      assert(testMap1("k2") === "v2")
-      testMap1.update("k3", "v3")
-      assert(testMap1.get("k3").isDefined)
-      assert(testMap1("k3") === "v3")
-
-      // remove
-      testMap1.remove("k1")
-      assert(testMap1.get("k1").isEmpty)
-      testMap1.remove("k2")
-      intercept[NoSuchElementException] {
-        testMap1("k2") // Map.apply(<non-existent-key>) causes exception
-      }
-      testMap1 -= "k3"
-      assert(testMap1.get("k3").isEmpty)
-
-      // multi put
-      val keys = (1 to 100).map(_.toString)
-      val pairs = keys.map(x => (x, x * 2))
-      assert((testMap2 ++ pairs).iterator.toSet === pairs.toSet)
-      testMap2 ++= pairs
-
-      // iterator
-      assert(testMap2.iterator.toSet === pairs.toSet)
-
-      // filter
-      val filtered = testMap2.filter { case (_, v) => v.toInt % 2 == 0 }
-      val evenPairs = pairs.filter { case (_, v) => v.toInt % 2 == 0 }
-      assert(filtered.iterator.toSet === evenPairs.toSet)
-
-      // foreach
-      val buffer = new ArrayBuffer[(String, String)]
-      testMap2.foreach(x => buffer += x)
-      assert(testMap2.toSet === buffer.toSet)
-
-      // multi remove
-      testMap2("k1") = "v1"
-      testMap2 --= keys
-      assert(testMap2.size === 1)
-      assert(testMap2.iterator.toSeq.head === (("k1", "v1")))
-
-      // +
-      val testMap3 = testMap2 + (("k0", "v0"))
-      assert(testMap3.size === 2)
-      assert(testMap3.get("k1").isDefined)
-      assert(testMap3("k1") === "v1")
-      assert(testMap3.get("k0").isDefined)
-      assert(testMap3("k0") === "v0")
-
-      // -
-      val testMap4 = testMap3 - "k0"
-      assert(testMap4.size === 1)
-      assert(testMap4.get("k1").isDefined)
-      assert(testMap4("k1") === "v1")
-    }
-  }
-
-  /** Test thread safety of a Scala mutable map. */
-  def testMapThreadSafety(hashMapConstructor: => mutable.Map[String, String]): Unit = {
-    def newMap() = hashMapConstructor
-    val name = newMap().getClass.getSimpleName
-    val testMap = newMap()
-    @volatile var error = false
-
-    def getRandomKey(m: mutable.Map[String, String]): Option[String] = {
-      val keys = testMap.keysIterator.toSeq
-      if (keys.nonEmpty) {
-        Some(keys(Random.nextInt(keys.size)))
-      } else {
-        None
-      }
-    }
-
-    val threads = (1 to 25).map(i => new Thread() {
-      override def run(): Unit = {
-        try {
-          for (j <- 1 to 1000) {
-            Random.nextInt(3) match {
-              case 0 =>
-                testMap(Random.nextString(10)) = Random.nextDouble().toString // put
-              case 1 =>
-                getRandomKey(testMap).map(testMap.get) // get
-              case 2 =>
-                getRandomKey(testMap).map(testMap.remove) // remove
-            }
-          }
-        } catch {
-          case t: Throwable =>
-            error = true
-            throw t
-        }
-      }
-    })
-
-    test(name + " - threading safety test")  {
-      threads.foreach(_.start())
-      threads.foreach(_.join())
-      assert(!error)
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
