This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 907074bafad [SPARK-38881][DSTREAMS][KINESIS][PYSPARK] Added Support for CloudWatch MetricsLevel Config 907074bafad is described below commit 907074bafad0da3d1c802a4389589658ecf93432 Author: Mark Khaitman <mkhait...@freewheel.com> AuthorDate: Sat Apr 16 21:30:15 2022 -0500 [SPARK-38881][DSTREAMS][KINESIS][PYSPARK] Added Support for CloudWatch MetricsLevel Config JIRA: https://issues.apache.org/jira/browse/SPARK-38881 ### What changes were proposed in this pull request? Exposing a configuration option (metricsLevel) used for CloudWatch metrics reporting when consuming from an AWS Kinesis Stream, which is already available in the Scala/Java Spark APIs. This relates to https://issues.apache.org/jira/browse/SPARK-27420, which was merged as part of Spark 3.0.0. ### Why are the changes needed? This change is desirable as it further exposes the metricsLevel config parameter that was added for the Scala/Java Spark APIs when working with the Kinesis Streaming integration, and makes it available to the PySpark API as well. ### Does this PR introduce _any_ user-facing change? No. Default behavior of MetricsLevel.DETAILED is maintained. ### How was this patch tested? This change passes all tests, and local testing was done with a development Kinesis stream in AWS, in order to confirm that metrics were no longer being reported to CloudWatch after specifying MetricsLevel.NONE in the PySpark Kinesis streaming context creation. It also worked as it does today when leaving the MetricsLevel parameter out, which results in a default of DETAILED, with CloudWatch metrics appearing again. 
Built with: ``` # ./build/mvn -pl :spark-streaming-kinesis-asl_2.12 -DskipTests -Pkinesis-asl clean install ``` Tested with small pyspark kinesis streaming context + AWS kinesis stream, using updated streaming kinesis asl jar: ``` # spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.2.1 --jars spark/connector/kinesis-asl/target/spark-streaming-kinesis-asl_2.12-3.4.0-SNAPSHOT.jar metricsLevelTesting.py ``` Closes #36201 from mkman84/metricsLevel-pyspark. Authored-by: Mark Khaitman <mkhait...@freewheel.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- .../kinesis/KinesisUtilsPythonHelper.scala | 10 ++++++++++ docs/streaming-kinesis-integration.md | 10 ++++++---- python/pyspark/streaming/kinesis.py | 22 +++++++++++++++++----- python/pyspark/streaming/tests/test_kinesis.py | 5 ++++- 4 files changed, 37 insertions(+), 10 deletions(-) diff --git a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtilsPythonHelper.scala b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtilsPythonHelper.scala index 0056438c4ee..8abaef6b834 100644 --- a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtilsPythonHelper.scala +++ b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtilsPythonHelper.scala @@ -17,6 +17,7 @@ package org.apache.spark.streaming.kinesis import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.Duration @@ -37,6 +38,7 @@ private class KinesisUtilsPythonHelper { regionName: String, initialPositionInStream: Int, checkpointInterval: Duration, + metricsLevel: Int, storageLevel: StorageLevel, awsAccessKeyId: String, awsSecretKey: String, @@ -64,6 +66,13 @@ private class KinesisUtilsPythonHelper { "InitialPositionInStream.LATEST or 
InitialPositionInStream.TRIM_HORIZON") } + val cloudWatchMetricsLevel = metricsLevel match { + case 0 => MetricsLevel.DETAILED + case 1 => MetricsLevel.SUMMARY + case 2 => MetricsLevel.NONE + case _ => MetricsLevel.DETAILED + } + val builder = KinesisInputDStream.builder. streamingContext(jssc). checkpointAppName(kinesisAppName). @@ -72,6 +81,7 @@ private class KinesisUtilsPythonHelper { regionName(regionName). initialPosition(KinesisInitialPositions.fromKinesisInitialPosition(kinesisInitialPosition)). checkpointInterval(checkpointInterval). + metricsLevel(cloudWatchMetricsLevel). storageLevel(storageLevel) if (stsAssumeRoleArn != null && stsSessionName != null && stsExternalId != null) { diff --git a/docs/streaming-kinesis-integration.md b/docs/streaming-kinesis-integration.md index dc80ff05226..2ce30d7efe2 100644 --- a/docs/streaming-kinesis-integration.md +++ b/docs/streaming-kinesis-integration.md @@ -56,6 +56,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) + .metricsLevel([metricsLevel.DETAILED]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .build() @@ -78,6 +79,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) + .metricsLevel([metricsLevel.DETAILED]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .build(); @@ -90,20 +92,20 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m kinesisStream = KinesisUtils.createStream( streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], - [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2) + [region name], [initial position], [checkpoint interval], [metricsLevel.DETAILED], StorageLevel.MEMORY_AND_DISK_2) See the [API 
docs](api/python/reference/pyspark.streaming.html#kinesis) and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/connector/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py). Refer to the [Running the Example](#running-the-example) subsection for instructions to run the example. + - CloudWatch metrics level and dimensions. See [the AWS documentation about monitoring KCL](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html) for details. Default is MetricsLevel.DETAILED + </div> </div> - You may also provide the following settings. These are currently only supported in Scala and Java. + You may also provide the following settings. This is currently only supported in Scala and Java. - A "message handler function" that takes a Kinesis `Record` and returns a generic object `T`, in case you would like to use other data included in a `Record` such as partition key. - - CloudWatch metrics level and dimensions. See [the AWS documentation about monitoring KCL](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html) for details. 
- <div class="codetabs"> <div data-lang="scala" markdown="1"> import collection.JavaConverters._ diff --git a/python/pyspark/streaming/kinesis.py b/python/pyspark/streaming/kinesis.py index 150fb79f572..0eede341f9b 100644 --- a/python/pyspark/streaming/kinesis.py +++ b/python/pyspark/streaming/kinesis.py @@ -23,7 +23,16 @@ from pyspark.streaming.context import StreamingContext from pyspark.util import _print_missing_jar -__all__ = ["KinesisUtils", "InitialPositionInStream", "utf8_decoder"] +__all__ = ["KinesisUtils", "InitialPositionInStream", "MetricsLevel", "utf8_decoder"] + + +class InitialPositionInStream: + LATEST, TRIM_HORIZON = (0, 1) + + +class MetricsLevel: + DETAILED, SUMMARY, NONE = (0, 1, 2) + T = TypeVar("T") @@ -46,6 +55,7 @@ class KinesisUtils: regionName: str, initialPositionInStream: str, checkpointInterval: int, + metricsLevel: int = MetricsLevel.DETAILED, storageLevel: StorageLevel = ..., awsAccessKeyId: Optional[str] = ..., awsSecretKey: Optional[str] = ..., @@ -66,6 +76,7 @@ class KinesisUtils: regionName: str, initialPositionInStream: str, checkpointInterval: int, + metricsLevel: int = MetricsLevel.DETAILED, storageLevel: StorageLevel = ..., awsAccessKeyId: Optional[str] = ..., awsSecretKey: Optional[str] = ..., @@ -85,6 +96,7 @@ class KinesisUtils: regionName: str, initialPositionInStream: str, checkpointInterval: int, + metricsLevel: int = MetricsLevel.DETAILED, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2, awsAccessKeyId: Optional[str] = None, awsSecretKey: Optional[str] = None, @@ -123,6 +135,9 @@ class KinesisUtils: Checkpoint interval(in seconds) for Kinesis checkpointing. See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + metricsLevel : int + Level of CloudWatch PutMetrics. + Can be set to either DETAILED, SUMMARY, or NONE. 
(default is DETAILED) storageLevel : :class:`pyspark.StorageLevel`, optional Storage level to use for storing the received objects (default is StorageLevel.MEMORY_AND_DISK_2) @@ -178,6 +193,7 @@ class KinesisUtils: regionName, initialPositionInStream, jduration, + metricsLevel, jlevel, awsAccessKeyId, awsSecretKey, @@ -187,7 +203,3 @@ class KinesisUtils: ) stream: DStream = DStream(jstream, ssc, NoOpSerializer()) return stream.map(lambda v: decoder(v)) - - -class InitialPositionInStream: - LATEST, TRIM_HORIZON = (0, 1) diff --git a/python/pyspark/streaming/tests/test_kinesis.py b/python/pyspark/streaming/tests/test_kinesis.py index 221ec4dd984..7b09f5b8f5d 100644 --- a/python/pyspark/streaming/tests/test_kinesis.py +++ b/python/pyspark/streaming/tests/test_kinesis.py @@ -18,7 +18,7 @@ import time import unittest from pyspark import StorageLevel -from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream +from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream, MetricsLevel from pyspark.testing.streamingutils import ( should_test_kinesis, kinesis_requirement_message, @@ -38,6 +38,7 @@ class KinesisStreamTests(PySparkStreamingTestCase): "us-west-2", InitialPositionInStream.LATEST, 2, + MetricsLevel.DETAILED, StorageLevel.MEMORY_AND_DISK_2, ) KinesisUtils.createStream( @@ -48,6 +49,7 @@ class KinesisStreamTests(PySparkStreamingTestCase): "us-west-2", InitialPositionInStream.LATEST, 2, + MetricsLevel.DETAILED, StorageLevel.MEMORY_AND_DISK_2, "awsAccessKey", "awsSecretKey", @@ -69,6 +71,7 @@ class KinesisStreamTests(PySparkStreamingTestCase): kinesisTestUtils.regionName(), InitialPositionInStream.LATEST, 10, + MetricsLevel.DETAILED, StorageLevel.MEMORY_ONLY, aWSCredentials.getAWSAccessKeyId(), aWSCredentials.getAWSSecretKey(), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org