smcmullan-ncirl commented on PR #37206:
URL: https://github.com/apache/spark/pull/37206#issuecomment-1208072897
Hi, so I was able to reproduce the issue with the example application below.
It shows a custom Foreach sink which updates a set of Accumulators. The Map
containing the accumulators is passed in the call to the Foreach sink and is thus
serialized/deserialized.
I've integrated Codahale/DropWizard metrics for reporting metrics based on
the accumulators and enabled the metrics sink like this:
`driver.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.console.period=5
*.sink.console.unit=seconds
`
You may notice in the example application code below that the overridden
getMetrics() method resets the accumulator by calling its reset() method.
I wonder whether this causes the situation that @JoshRosen wrote about above?
`import com.codahale.metrics.{Gauge, Metric, MetricRegistry, MetricSet}
import org.apache.spark.metrics.source.StatsSource
import org.apache.spark.sql.{ForeachWriter, SparkSession}
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
import org.slf4j.LoggerFactory
import java.util
object RateSourceSparkStreamTest {
  /**
   * Entry point. Starts a rate-source streaming query whose Foreach sink
   * increments driver-registered accumulators; the accumulators are exposed
   * to the Spark metrics system as Dropwizard gauges via [[PartitionStats]].
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("RateSourceStreamTest")
    val spark = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()

    // Accumulator-backed stats. The map is captured by the ForeachWriter
    // below, so it is serialized to the executors along with the writer.
    val stats = new PartitionStats(spark.sparkContext)
    val statsMap = stats.getStatsMap

    // Register the accumulator gauges with the driver's metrics system so a
    // configured sink (e.g. ConsoleSink) reports them periodically.
    val metricRegistry = new MetricRegistry
    metricRegistry.registerAll(stats)
    SparkEnv.get.metricsSystem.registerSource(
      StatsSource("RateSourceSparkStreamTest", metricRegistry)
    )

    import spark.implicits._
    spark
      .readStream
      .format("rate")
      .option("numPartitions", 120)
      .option("rowsPerSecond", 12000)
      .load()
      .map(row => row.mkString("##"))
      .writeStream
      .foreach(new ForeachWriterImpl(statsMap))
      .start()

    spark.streams.awaitAnyTermination()
    // close() has a side effect, so call it with parentheses
    // (was `spark.close`, which violates the arity-0 convention).
    spark.close()
  }
}
// Foreach sink that increments an accumulator on each open/process/close
// callback. NOTE(review): instances of this writer (and the captured
// statsMap of accumulators) are serialized to executors — this is the
// serialization path the reproduction is exercising.
class ForeachWriterImpl(statsMap: Map[PartitionStats.Value,
LongAccumulator]) extends ForeachWriter[String] {
private final val LOGGER = LoggerFactory.getLogger(this.getClass)
// Called once per partition/epoch; always returns true so every
// partition is processed.
override def open(partitionId: Long, epochId: Long): Boolean = {
LOGGER.info(s"Open partition $partitionId, epoch $epochId")
PartitionStats.incMetric(statsMap, PartitionStats.partitionsOpened, 1)
true
}
// Called once per row in the partition.
override def process(value: String): Unit = {
LOGGER.info(s"Process value: $value")
PartitionStats.incMetric(statsMap, PartitionStats.partitionsProcessed, 1)
}
// Called when the partition finishes (errorOrNull is null on success).
override def close(errorOrNull: Throwable): Unit = {
LOGGER.info(s"Close partition: $errorOrNull")
PartitionStats.incMetric(statsMap, PartitionStats.partitionsClosed, 1)
}
}
// Enumeration of the partition-lifecycle counters tracked by the example,
// plus a helper for bumping the accumulator that backs a given counter.
object PartitionStats extends Enumeration {
  private final val LOGGER = LoggerFactory.getLogger(this.getClass)

  final val partitionsOpened = Value("partitionsOpened")
  final val partitionsProcessed = Value("partitionsProcessed")
  final val partitionsClosed = Value("partitionsClosed")

  /**
   * Adds `count` to the accumulator registered under `stat` in `statsMap`,
   * logging an error when no accumulator exists for that key.
   */
  def incMetric(statsMap: Map[PartitionStats.Value, LongAccumulator],
                stat: PartitionStats.Value,
                count: Long): Unit =
    statsMap
      .get(stat)
      .fold(LOGGER.error(s"Cannot increment accumulator for $stat"))(_.add(count))
}
// Builds one LongAccumulator per PartitionStats value and exposes them as a
// Dropwizard MetricSet of gauges. NOTE(review): the gauges below call
// reset() on every read, which the author flags as the likely trigger of
// the reported issue — deliberately left in place for the reproduction.
class PartitionStats(sparkContext: SparkContext) extends MetricSet {
// One driver-registered accumulator per enumeration value, keyed by it.
private final val statsMap: Map[PartitionStats.Value, LongAccumulator] =
PartitionStats.values.unsorted.map(elem => elem ->
sparkContext.longAccumulator(elem.toString)).toMap
// Shared with ForeachWriterImpl, which serializes the map to executors.
def getStatsMap: Map[PartitionStats.Value, LongAccumulator] = statsMap
// MetricSet contract: returns a gauge per accumulator. Each read returns
// the current value and then resets the accumulator (delta semantics).
override def getMetrics: util.Map[String, Metric] = {
val metricsMap: Map[String, Metric] = statsMap.map(
e =>
(
e._1.toString,
new Gauge[Long]() {
override def getValue: Long = {
val metricValue = e._2.value
e._2.reset() // this is possibly the problem!!!!
metricValue
}
}
)
)
import scala.jdk.CollectionConverters._
metricsMap.asJava
}
}
// NOTE(review): declared inside the org.apache.spark.metrics.source package
// block, presumably so it can extend the package-private Source trait —
// confirm against the Spark version in use.
package org.apache.spark.metrics.source {
// Minimal Source wrapper pairing a source name with a MetricRegistry so
// the registry can be registered with Spark's MetricsSystem.
case class StatsSource(srcName: String, registry: MetricRegistry) extends
Source {
override def sourceName: String = srcName
override def metricRegistry: MetricRegistry = registry
}
}`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]