Repository: spark
Updated Branches:
  refs/heads/master ac0174e55 -> 42035a4fe


[SPARK-24441][SS] Expose total estimated size of states in 
HDFSBackedStateStoreProvider

## What changes were proposed in this pull request?

This patch exposes the estimated size of the cache (loadedMaps) in 
HDFSBackedStateStoreProvider as a custom metric of StateStore.
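
For reference, here is a minimal sketch of how the new numbers could be read back 
once they reach the query progress. The query handle `query` and the helper name are 
assumptions for illustration only, not part of this patch:

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// Sketch only: prints the custom state-store metrics from the last reported progress.
// customMetrics.get(...) returns null for providers that do not report a given metric.
def printStateCacheMetrics(query: StreamingQuery): Unit = {
  Option(query.lastProgress).foreach { progress =>
    progress.stateOperators.foreach { op =>
      val currentVersionSize = op.customMetrics.get("stateOnCurrentVersionSizeBytes")
      val cacheHits = op.customMetrics.get("loadedMapCacheHitCount")
      val cacheMisses = op.customMetrics.get("loadedMapCacheMissCount")
      println(s"memoryUsedBytes (estimated size of loadedMaps) = ${op.memoryUsedBytes}, " +
        s"stateOnCurrentVersionSizeBytes = $currentVersionSize, " +
        s"loadedMapCacheHitCount = $cacheHits, loadedMapCacheMissCount = $cacheMisses")
    }
  }
}
```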

The rationale for the patch is that state backed by 
HDFSBackedStateStoreProvider consumes more memory than the number reported in 
the query status, because the provider caches multiple versions of the state. 
The memory footprint can be much larger than what the query status reports 
when the state store receives a lot of updates: shallow-copying the map adds 
only a small amount of memory for map entries and references, and row objects 
are still shared across versions, but if there are lots of updates between 
batches, fewer row objects are shared and more row objects stay in memory, 
consuming much more memory than expected.
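
Below is a simplified, self-contained illustration of this sharing behavior, using a 
plain ConcurrentHashMap with byte arrays standing in for UnsafeRows; it is not the 
provider's actual code:

```scala
import java.util.concurrent.ConcurrentHashMap

val version1 = new ConcurrentHashMap[String, Array[Byte]]()
version1.put("key-1", Array[Byte](1, 2, 3))
version1.put("key-2", Array[Byte](4, 5, 6))

// Shallow copy for the next version: new map entries, but the value ("row") objects
// are shared, so the extra memory is only the entries and references.
val version2 = new ConcurrentHashMap[String, Array[Byte]](version1)
assert(version2.get("key-2") eq version1.get("key-2"))

// Updating a key in the new version breaks the sharing for that key: the old and the
// new row objects both stay in memory for as long as both versions are cached.
version2.put("key-1", Array[Byte](7, 8, 9))
assert(!(version2.get("key-1") eq version1.get("key-1")))
```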

Although HDFSBackedStateStore refers to loadedMaps in 
HDFSBackedStateStoreProvider directly, only one `StateStoreWriter` refers to a 
given StateStoreProvider, so the value is exposed without being aggregated 
multiple times. The existing state metrics are safe to aggregate for the same 
reason.

## How was this patch tested?

Tested manually. Below is a screenshot of the UI page that reflects the 
patch:

<img width="601" alt="screen shot 2018-06-05 at 10 16 16 pm" 
src="https://user-images.githubusercontent.com/1317309/40978481-b46ad324-690e-11e8-9b0f-e80528612a62.png";>

Please refer "estimated size of states cache in provider total" as well as 
"count of versions in state cache in provider".

Closes #21469 from HeartSaVioR/SPARK-24441.

Authored-by: Jungtaek Lim <kabh...@gmail.com>
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42035a4f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42035a4f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42035a4f

Branch: refs/heads/master
Commit: 42035a4fec6eb216427486b5067a45fceb65cc2d
Parents: ac0174e
Author: Jungtaek Lim <kabh...@gmail.com>
Authored: Tue Aug 21 15:28:31 2018 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Aug 21 15:28:31 2018 -0700

----------------------------------------------------------------------
 .../state/HDFSBackedStateStoreProvider.scala    |  39 +++++++-
 .../execution/streaming/state/StateStore.scala  |   2 +
 .../state/SymmetricHashJoinStateManager.scala   |   2 +
 .../execution/streaming/statefulOperators.scala |  12 ++-
 .../apache/spark/sql/streaming/progress.scala   |  15 ++-
 .../streaming/state/StateStoreSuite.scala       | 100 +++++++++++++++++++
 .../streaming/StreamingQueryListenerSuite.scala |   2 +-
 .../StreamingQueryStatusAndProgressSuite.scala  |  13 ++-
 8 files changed, 176 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/42035a4f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 523acef..92a2480 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming.state
 import java.io._
 import java.util
 import java.util.Locale
+import java.util.concurrent.atomic.LongAdder
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -165,7 +166,16 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
     }
 
     override def metrics: StateStoreMetrics = {
-      StateStoreMetrics(mapToUpdate.size(), SizeEstimator.estimate(mapToUpdate), Map.empty)
+      // NOTE: we provide estimation of cache size as "memoryUsedBytes", and size of state for
+      // current version as "stateOnCurrentVersionSizeBytes"
+      val metricsFromProvider: Map[String, Long] = getMetricsForProvider()
+
+      val customMetrics = metricsFromProvider.flatMap { case (name, value) =>
+        // just allow searching from list cause the list is small enough
+        supportedCustomMetrics.find(_.name == name).map(_ -> value)
+      } + (metricStateOnCurrentVersionSizeBytes -> SizeEstimator.estimate(mapToUpdate))
+
+      StateStoreMetrics(mapToUpdate.size(), metricsFromProvider("memoryUsedBytes"), customMetrics)
     }
 
     /**
@@ -180,6 +190,12 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
     }
   }
 
+  def getMetricsForProvider(): Map[String, Long] = synchronized {
+    Map("memoryUsedBytes" -> SizeEstimator.estimate(loadedMaps),
+      metricLoadedMapCacheHit.name -> loadedMapCacheHitCount.sum(),
+      metricLoadedMapCacheMiss.name -> loadedMapCacheMissCount.sum())
+  }
+
   /** Get the state store for making updates to create a new `version` of the store. */
   override def getStore(version: Long): StateStore = synchronized {
     require(version >= 0, "Version cannot be less than 0")
@@ -226,7 +242,8 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   }
 
   override def supportedCustomMetrics: Seq[StateStoreCustomMetric] = {
-    Nil
+    metricStateOnCurrentVersionSizeBytes :: metricLoadedMapCacheHit :: metricLoadedMapCacheMiss ::
+      Nil
   }
 
   override def toString(): String = {
@@ -248,6 +265,21 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   private lazy val fm = CheckpointFileManager.create(baseDir, hadoopConf)
   private lazy val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
 
+  private val loadedMapCacheHitCount: LongAdder = new LongAdder
+  private val loadedMapCacheMissCount: LongAdder = new LongAdder
+
+  private lazy val metricStateOnCurrentVersionSizeBytes: StateStoreCustomSizeMetric =
+    StateStoreCustomSizeMetric("stateOnCurrentVersionSizeBytes",
+      "estimated size of state only on current version")
+
+  private lazy val metricLoadedMapCacheHit: StateStoreCustomMetric =
+    StateStoreCustomSumMetric("loadedMapCacheHitCount",
+      "count of cache hit on states cache in provider")
+
+  private lazy val metricLoadedMapCacheMiss: StateStoreCustomMetric =
+    StateStoreCustomSumMetric("loadedMapCacheMissCount",
+      "count of cache miss on states cache in provider")
+
   private case class StoreFile(version: Long, path: Path, isSnapshot: Boolean)
 
   private def commitUpdates(newVersion: Long, map: MapType, output: DataOutputStream): Unit = {
@@ -311,6 +343,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
     // Shortcut if the map for this version is already there to avoid a redundant put.
     val loadedCurrentVersionMap = synchronized { Option(loadedMaps.get(version)) }
     if (loadedCurrentVersionMap.isDefined) {
+      loadedMapCacheHitCount.increment()
       return loadedCurrentVersionMap.get
     }
 
@@ -318,6 +351,8 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
       "Reading snapshot file and delta files if needed..." +
       "Note that this is normal for the first batch of starting query.")
 
+    loadedMapCacheMissCount.increment()
+
     val (result, elapsedMs) = Utils.timeTakenMs {
       val snapshotCurrentVersionMap = readSnapshotFile(version)
       if (snapshotCurrentVersionMap.isDefined) {

http://git-wip-us.apache.org/repos/asf/spark/blob/42035a4f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 7eb68c2..d3313b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -138,6 +138,8 @@ trait StateStoreCustomMetric {
   def name: String
   def desc: String
 }
+
+case class StateStoreCustomSumMetric(name: String, desc: String) extends StateStoreCustomMetric
 case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric
 case class StateStoreCustomTimingMetric(name: String, desc: String) extends StateStoreCustomMetric
 

http://git-wip-us.apache.org/repos/asf/spark/blob/42035a4f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
index 6e7cd2d..352b3d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
@@ -269,6 +269,8 @@ class SymmetricHashJoinStateManager(
       keyWithIndexToValueMetrics.numKeys,       // represent each buffered row only once
       keyToNumValuesMetrics.memoryUsedBytes + keyWithIndexToValueMetrics.memoryUsedBytes,
       keyWithIndexToValueMetrics.customMetrics.map {
+        case (s @ StateStoreCustomSumMetric(_, desc), value) =>
+          s.copy(desc = newDesc(desc)) -> value
         case (s @ StateStoreCustomSizeMetric(_, desc), value) =>
           s.copy(desc = newDesc(desc)) -> value
         case (s @ StateStoreCustomTimingMetric(_, desc), value) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/42035a4f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 34e26d8..7351db8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -88,10 +88,18 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
    * the driver after this SparkPlan has been executed and metrics have been updated.
    */
   def getProgress(): StateOperatorProgress = {
+    val customMetrics = stateStoreCustomMetrics
+      .map(entry => entry._1 -> longMetric(entry._1).value)
+
+    val javaConvertedCustomMetrics: java.util.HashMap[String, java.lang.Long] =
+      new java.util.HashMap(customMetrics.mapValues(long2Long).asJava)
+
     new StateOperatorProgress(
       numRowsTotal = longMetric("numTotalStateRows").value,
       numRowsUpdated = longMetric("numUpdatedStateRows").value,
-      memoryUsedBytes = longMetric("stateMemory").value)
+      memoryUsedBytes = longMetric("stateMemory").value,
+      javaConvertedCustomMetrics
+    )
   }
 
   /** Records the duration of running `body` for the next query progress update. */
@@ -113,6 +121,8 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
   private def stateStoreCustomMetrics: Map[String, SQLMetric] = {
     val provider = StateStoreProvider.create(sqlContext.conf.stateStoreProviderClass)
     provider.supportedCustomMetrics.map {
+      case StateStoreCustomSumMetric(name, desc) =>
+        name -> SQLMetrics.createMetric(sparkContext, desc)
       case StateStoreCustomSizeMetric(name, desc) =>
         name -> SQLMetrics.createSizeMetric(sparkContext, desc)
       case StateStoreCustomTimingMetric(name, desc) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/42035a4f/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 2fb8796..cf9375d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -38,7 +38,8 @@ import org.apache.spark.annotation.InterfaceStability
 class StateOperatorProgress private[sql](
     val numRowsTotal: Long,
     val numRowsUpdated: Long,
-    val memoryUsedBytes: Long
+    val memoryUsedBytes: Long,
+    val customMetrics: ju.Map[String, JLong] = new ju.HashMap()
   ) extends Serializable {
 
   /** The compact JSON representation of this progress. */
@@ -48,12 +49,20 @@ class StateOperatorProgress private[sql](
   def prettyJson: String = pretty(render(jsonValue))
 
   private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
-    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes)
+    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, customMetrics)
 
   private[sql] def jsonValue: JValue = {
     ("numRowsTotal" -> JInt(numRowsTotal)) ~
     ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
-    ("memoryUsedBytes" -> JInt(memoryUsedBytes))
+    ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
+    ("customMetrics" -> {
+      if (!customMetrics.isEmpty) {
+        val keys = customMetrics.keySet.asScala.toSeq.sorted
+        keys.map { k => k -> JInt(customMetrics.get(k).toLong) : JObject }.reduce(_ ~ _)
+      } else {
+        JNothing
+      }
+    })
   }
 
   override def toString: String = prettyJson

http://git-wip-us.apache.org/repos/asf/spark/blob/42035a4f/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index bfeb2b1..5e97314 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -317,6 +317,22 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     assert(store.metrics.memoryUsedBytes > noDataMemoryUsed)
   }
 
+  test("reports memory usage on current version") {
+    def getSizeOfStateForCurrentVersion(metrics: StateStoreMetrics): Long = {
+      val metricPair = metrics.customMetrics.find(_._1.name == "stateOnCurrentVersionSizeBytes")
+      assert(metricPair.isDefined)
+      metricPair.get._2
+    }
+
+    val provider = newStoreProvider()
+    val store = provider.getStore(0)
+    val noDataMemoryUsed = getSizeOfStateForCurrentVersion(store.metrics)
+
+    put(store, "a", 1)
+    store.commit()
+    assert(getSizeOfStateForCurrentVersion(store.metrics) > noDataMemoryUsed)
+  }
+
   test("StateStore.get") {
     quietly {
       val dir = newDir()
@@ -631,6 +647,90 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     assert(CreateAtomicTestManager.cancelCalledInCreateAtomic)
   }
 
+  test("expose metrics with custom metrics to StateStoreMetrics") {
+    def getCustomMetric(metrics: StateStoreMetrics, name: String): Long = {
+      val metricPair = metrics.customMetrics.find(_._1.name == name)
+      assert(metricPair.isDefined)
+      metricPair.get._2
+    }
+
+    def getLoadedMapSizeMetric(metrics: StateStoreMetrics): Long = {
+      metrics.memoryUsedBytes
+    }
+
+    def assertCacheHitAndMiss(
+        metrics: StateStoreMetrics,
+        expectedCacheHitCount: Long,
+        expectedCacheMissCount: Long): Unit = {
+      val cacheHitCount = getCustomMetric(metrics, "loadedMapCacheHitCount")
+      val cacheMissCount = getCustomMetric(metrics, "loadedMapCacheMissCount")
+      assert(cacheHitCount === expectedCacheHitCount)
+      assert(cacheMissCount === expectedCacheMissCount)
+    }
+
+    val provider = newStoreProvider()
+
+    // Verify state before starting a new set of updates
+    assert(getLatestData(provider).isEmpty)
+
+    val store = provider.getStore(0)
+    assert(!store.hasCommitted)
+
+    assert(store.metrics.numKeys === 0)
+
+    val initialLoadedMapSize = getLoadedMapSizeMetric(store.metrics)
+    assert(initialLoadedMapSize >= 0)
+    assertCacheHitAndMiss(store.metrics, expectedCacheHitCount = 0, expectedCacheMissCount = 0)
+
+    put(store, "a", 1)
+    assert(store.metrics.numKeys === 1)
+
+    put(store, "b", 2)
+    put(store, "aa", 3)
+    assert(store.metrics.numKeys === 3)
+    remove(store, _.startsWith("a"))
+    assert(store.metrics.numKeys === 1)
+    assert(store.commit() === 1)
+
+    assert(store.hasCommitted)
+
+    val loadedMapSizeForVersion1 = getLoadedMapSizeMetric(store.metrics)
+    assert(loadedMapSizeForVersion1 > initialLoadedMapSize)
+    assertCacheHitAndMiss(store.metrics, expectedCacheHitCount = 0, expectedCacheMissCount = 0)
+
+    val storeV2 = provider.getStore(1)
+    assert(!storeV2.hasCommitted)
+    assert(storeV2.metrics.numKeys === 1)
+
+    put(storeV2, "cc", 4)
+    assert(storeV2.metrics.numKeys === 2)
+    assert(storeV2.commit() === 2)
+
+    assert(storeV2.hasCommitted)
+
+    val loadedMapSizeForVersion1And2 = getLoadedMapSizeMetric(storeV2.metrics)
+    assert(loadedMapSizeForVersion1And2 > loadedMapSizeForVersion1)
+    assertCacheHitAndMiss(storeV2.metrics, expectedCacheHitCount = 1, expectedCacheMissCount = 0)
+
+    val reloadedProvider = newStoreProvider(store.id)
+    // intended to load version 2 instead of 1
+    // version 2 will not be loaded to the cache in provider
+    val reloadedStore = reloadedProvider.getStore(1)
+    assert(reloadedStore.metrics.numKeys === 1)
+
+    assert(getLoadedMapSizeMetric(reloadedStore.metrics) === loadedMapSizeForVersion1)
+    assertCacheHitAndMiss(reloadedStore.metrics, expectedCacheHitCount = 0,
+      expectedCacheMissCount = 1)
+
+    // now we are loading version 2
+    val reloadedStoreV2 = reloadedProvider.getStore(2)
+    assert(reloadedStoreV2.metrics.numKeys === 2)
+
+    assert(getLoadedMapSizeMetric(reloadedStoreV2.metrics) > loadedMapSizeForVersion1)
+    assertCacheHitAndMiss(reloadedStoreV2.metrics, expectedCacheHitCount = 0,
+      expectedCacheMissCount = 2)
+  }
+
   override def newStoreProvider(): HDFSBackedStateStoreProvider = {
     newStoreProvider(opId = Random.nextInt(), partition = 0)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/42035a4f/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index b96f2bc..0f15cd6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -231,7 +231,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
   test("event ordering") {
     val listener = new EventCollector
     withListenerAdded(listener) {
-      for (i <- 1 to 100) {
+      for (i <- 1 to 50) {
         listener.reset()
         require(listener.startEvent === null)
         testStream(MemoryStream[Int].toDS)(

http://git-wip-us.apache.org/repos/asf/spark/blob/42035a4f/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 79bb827..7bef687 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -58,7 +58,12 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
         |  "stateOperators" : [ {
         |    "numRowsTotal" : 0,
         |    "numRowsUpdated" : 1,
-        |    "memoryUsedBytes" : 2
+        |    "memoryUsedBytes" : 3,
+        |    "customMetrics" : {
+        |      "loadedMapCacheHitCount" : 1,
+        |      "loadedMapCacheMissCount" : 0,
+        |      "stateOnCurrentVersionSizeBytes" : 2
+        |    }
         |  } ],
         |  "sources" : [ {
         |    "description" : "source",
@@ -230,7 +235,11 @@ object StreamingQueryStatusAndProgressSuite {
       "avg" -> "2016-12-05T20:54:20.827Z",
       "watermark" -> "2016-12-05T20:54:20.827Z").asJava),
     stateOperators = Array(new StateOperatorProgress(
-      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2)),
+      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 3,
+      customMetrics = new java.util.HashMap(Map("stateOnCurrentVersionSizeBytes" -> 2L,
+        "loadedMapCacheHitCount" -> 1L, "loadedMapCacheMissCount" -> 0L)
+        .mapValues(long2Long).asJava)
+    )),
     sources = Array(
       new SourceProgress(
         description = "source",

