smcmullan-ncirl commented on PR #37206:
URL: https://github.com/apache/spark/pull/37206#issuecomment-1208072897

   Hi, so I was able to reproduce the issue with the example application below.
   
   It shows a custom ForeachWriter sink which updates a set of accumulators.
The Map containing the accumulators is passed in the call to the foreach sink
and thus serialized/deserialized.
   
   I've integrated Codahale/DropWizard metrics for reporting metrics based on 
the accumulators and enabled the metrics sink like this:
   
   `driver.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
   *.sink.console.period=5
   *.sink.console.unit=seconds
   `
   
   You may notice in the example application code below that the overridden
method getMetrics() resets the accumulator by calling its reset() method.
   
   I wonder whether this causes the situation that @JoshRosen wrote about above?
   
   
   `import com.codahale.metrics.{Gauge, Metric, MetricRegistry, MetricSet}
   import org.apache.spark.metrics.source.StatsSource
   import org.apache.spark.sql.{ForeachWriter, SparkSession}
   import org.apache.spark.util.LongAccumulator
   import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
   import org.slf4j.LoggerFactory
   
   import java.util
   
   object RateSourceSparkStreamTest {
     /** Entry point: wires a rate source through a map into the custom foreach sink. */
     def main(args: Array[String]): Unit = {
       // Minimal configuration; the master is expected to come from spark-submit.
       val sparkConf: SparkConf = new SparkConf().setAppName("RateSourceStreamTest")

       val spark = SparkSession.builder().config(sparkConf).getOrCreate()

       // Accumulator-backed stats, exposed as a Codahale MetricSet so the
       // configured metrics sink (e.g. ConsoleSink) can report them periodically.
       val partitionStats = new PartitionStats(spark.sparkContext)
       val accumulators = partitionStats.getStatsMap

       val registry = new MetricRegistry
       registry.registerAll(partitionStats)

       SparkEnv.get.metricsSystem.registerSource(
         StatsSource("RateSourceSparkStreamTest", registry)
       )

       import spark.implicits._

       // rate source -> string map -> foreach sink; the accumulator map is
       // captured by the writer and is therefore serialized out with it.
       spark
         .readStream
         .format("rate")
         .option("numPartitions", 120)
         .option("rowsPerSecond", 12000)
         .load()
         .map(row => row.mkString("##"))
         .writeStream
         .foreach(new ForeachWriterImpl(accumulators))
         .start()

       spark.streams.awaitAnyTermination()

       spark.close()
     }
   }
   
   /**
    * Foreach sink that bumps one accumulator per lifecycle event.
    * Runs on the executors; `statsMap` arrives via serialization of this writer.
    */
   class ForeachWriterImpl(statsMap: Map[PartitionStats.Value, LongAccumulator])
       extends ForeachWriter[String] {

     private final val LOGGER = LoggerFactory.getLogger(this.getClass)

     /** Called once per partition/epoch before any rows are delivered. */
     override def open(partitionId: Long, epochId: Long): Boolean = {
       LOGGER.info(s"Open partition $partitionId, epoch $epochId")
       PartitionStats.incMetric(statsMap, PartitionStats.partitionsOpened, 1)
       true // always accept the partition
     }

     /** Called once per row. */
     override def process(value: String): Unit = {
       LOGGER.info(s"Process value: $value")
       PartitionStats.incMetric(statsMap, PartitionStats.partitionsProcessed, 1)
     }

     /** Called at the end of the partition, with the failure cause if any. */
     override def close(errorOrNull: Throwable): Unit = {
       LOGGER.info(s"Close partition: $errorOrNull")
       PartitionStats.incMetric(statsMap, PartitionStats.partitionsClosed, 1)
     }
   }
   
   /** Names of the tracked per-partition statistics, plus a helper to bump them. */
   object PartitionStats extends Enumeration {
     private final val LOGGER = LoggerFactory.getLogger(this.getClass)

     final val partitionsOpened = Value("partitionsOpened")
     final val partitionsProcessed = Value("partitionsProcessed")
     final val partitionsClosed = Value("partitionsClosed")

     /** Adds `count` to the accumulator registered for `stat`; logs an error if absent. */
     def incMetric(statsMap: Map[PartitionStats.Value, LongAccumulator],
                   stat: PartitionStats.Value,
                   count: Long): Unit =
       statsMap
         .get(stat)
         .fold(LOGGER.error(s"Cannot increment accumulator for $stat"))(_.add(count))
   }
   
   /**
    * Driver-side holder for one LongAccumulator per [[PartitionStats]] value,
    * exposed as a Codahale MetricSet of gauges.
    */
   class PartitionStats(sparkContext: SparkContext) extends MetricSet {
     // One accumulator per enum value, registered with the SparkContext under
     // the enum's string name and keyed by the enum itself.
     private final val statsMap: Map[PartitionStats.Value, LongAccumulator] =
       PartitionStats.values.unsorted
         .map(stat => stat -> sparkContext.longAccumulator(stat.toString))
         .toMap

     /** The accumulator map, handed to the ForeachWriter (and thus serialized). */
     def getStatsMap: Map[PartitionStats.Value, LongAccumulator] = statsMap

     /** MetricSet hook: one Gauge per accumulator. */
     override def getMetrics: util.Map[String, Metric] = {
       import scala.jdk.CollectionConverters._
       statsMap.map { case (stat, acc) =>
         stat.toString -> (new Gauge[Long]() {
           override def getValue: Long = {
             val metricValue = acc.value
             // Deliberately resets the accumulator on every read — this is the
             // suspected cause of the issue discussed above.
             acc.reset()
             metricValue
           }
         }: Metric)
       }.asJava
     }
   }
   
   // NOTE(review): declared inside the org.apache.spark.metrics.source package,
   // presumably because the Source trait is package-private to Spark — confirm.
   package org.apache.spark.metrics.source {
     // Thin adapter exposing a caller-supplied MetricRegistry as a Spark metrics Source.
     case class StatsSource(srcName: String, registry: MetricRegistry) extends Source {
       override def sourceName: String = srcName

       override def metricRegistry: MetricRegistry = registry
     }
   }`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to