This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 547996e [SPARK-37147][SS] MetricsReporter producing NullPointerException when element 'triggerExecution' not present in Map[] 547996e is described below commit 547996efed8e4593265aea2eb26b10f8b366e141 Author: Radek Busz <ext.radek.b...@ef.com> AuthorDate: Sun Oct 31 07:30:14 2021 +0900 [SPARK-37147][SS] MetricsReporter producing NullPointerException when element 'triggerExecution' not present in Map[] ### What changes were proposed in this pull request? Bug Fix. The problematic code is in `MetricsReporter`: `registerGauge("latency", _.durationMs.get("triggerExecution").longValue(), 0L)` Instead of `.getOrDefault(...).longValue()` it uses `.get("triggerExecution").longValue()`, which can throw a NullPointerException if "triggerExecution" is not in the durationMs map. Solution: use `.getOrDefault` when accessing a map. ### Why are the changes needed? When `MetricsReporter.scala` registers a Gauge it occasionally throws an NPE. This breaks reporting custom metrics via Dropwizard and logs a stack trace multiple times. It usually happens when using StructuredStreaming on a slow data source, but I'm not able to reliably reproduce it every time. ### Does this PR introduce _any_ user-facing change? Yes - fixes occasional failures when reporting metrics with Dropwizard ### How was this patch tested? Added a unit test. Closes #34426 from gitplaneta/SPARK-37147. 
Authored-by: Radek Busz <ext.radek.b...@ef.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../spark/sql/execution/streaming/MetricsReporter.scala | 2 +- .../apache/spark/sql/streaming/StreamingQuerySuite.scala | 13 +++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala index 8709822..600b16f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala @@ -40,7 +40,7 @@ class MetricsReporter( // together in Ganglia as a single metric group registerGauge("inputRate-total", _.inputRowsPerSecond, 0.0) registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0) - registerGauge("latency", _.durationMs.get("triggerExecution").longValue(), 0L) + registerGauge("latency", _.durationMs.getOrDefault("triggerExecution", 0L).longValue(), 0L) private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 9c2403d..21a0b24 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -19,13 +19,16 @@ package org.apache.spark.sql.streaming import java.io.File import java.nio.charset.StandardCharsets.UTF_8 +import java.util.Collections import java.util.concurrent.CountDownLatch import scala.collection.mutable +import scala.util.{Success, Try} import org.apache.commons.io.FileUtils import org.apache.commons.lang3.RandomStringUtils import 
org.apache.hadoop.fs.Path +import org.mockito.Mockito.when import org.scalactic.TolerantNumerics import org.scalatest.BeforeAndAfter import org.scalatest.concurrent.PatienceConfiguration.Timeout @@ -465,6 +468,16 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi } } + test("SPARK-37147: MetricsReporter does not fail when durationMs is empty") { + val stateOpProgressMock = mock[StreamingQueryProgress] + when(stateOpProgressMock.durationMs).thenReturn(Collections.emptyMap[String, java.lang.Long]()) + val streamExecMock = mock[StreamExecution] + when(streamExecMock.lastProgress).thenReturn(stateOpProgressMock) + + val gauges = new MetricsReporter(streamExecMock, "").metricRegistry.getGauges() + assert(Try(gauges.get("latency").getValue) == Success(0L)) + } + test("input row calculation with same V1 source used twice in self-join") { val streamingTriggerDF = spark.createDataset(1 to 10).toDF val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF).toDF("value") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org