tprabh509 opened a new pull request #28581: URL: https://github.com/apache/spark/pull/28581
### What changes were proposed in this pull request?

AWS no longer supports KCL 1 and has already moved to KCL 2. Because the Spark Kinesis ASL library uses KCL 1, users can hit the known KCL 1 issues reported to AWS, such as https://github.com/awslabs/amazon-kinesis-client/issues/391. I have already reported this in JIRA: https://issues.apache.org/jira/browse/SPARK-31236

### Why are the changes needed?

This PR adds KCL 2 support and does not remove KCL 1. With the current KCL 1 implementation, users can run into a few limitations:

1. The application cannot use a Kinesis direct endpoint. With KCL 2 a custom URL can be used.
2. A DynamoDB proxy cannot be used. The KCL 2 implementation adds DynamoDB proxy support.
3. A CloudWatch direct endpoint cannot be used. With KCL 2 a custom URL can be used.

As a result, the application cannot run without an internet connection or under firewall restrictions.

### Does this PR introduce _any_ user-facing change?

KCL 2 support is added and KCL 1 is not removed. Users of the KCL 2 implementation are no longer subject to the KCL 1 limitations:

1. The application cannot use a Kinesis direct endpoint.
2. A DynamoDB proxy cannot be used.
3. A CloudWatch direct endpoint cannot be used.

These limitations prevent the application from running without an internet connection or under firewall restrictions.

### How was this patch tested?

Tested with our application and with a test client updated for KCL 2.

    import java.net.URI;

    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.kinesis2.KinesisInputDStream;
    import org.apache.spark.streaming.kinesis2.SparkAWSCredentials;

    // The same credentials are reused for Kinesis, DynamoDB, and CloudWatch.
    SparkAWSCredentials credentials = SparkAWSCredentials.builder()
        .basicCredentials(awsKey, awsSecret)
        .build();

    // Custom (direct) endpoints for Kinesis and CloudWatch.
    URI uri = new URI(endpointURL);
    URI cloudWatchURI = new URI(cloudWatchURL);
    InitialPositionInStream initPosition = InitialPositionInStream.TRIM_HORIZON;

    KinesisInputDStream<byte[]> kinStream = KinesisInputDStream.builder()
        .streamingContext(jssc)
        .checkpointAppName(applicationName)
        .streamName(streamName)
        .regionName(regionName)
        .endpointUrl(uri)
        .cloudWatchUrl(cloudWatchURI)
        .kinesisCreds(credentials)
        .dynamoDBCreds(credentials)
        .maxRecords(maxRecords)
        .protocol(httpProtocol)
        .initialPositionInStream(initPosition)
        .cloudWatchCreds(credentials)
        .dynamoProxyHost(proxyHost)
        .dynamoProxyPort(proxyPort)
        .checkpointInterval(checkpointInterval)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2())
        .build();
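
Since the builder call above takes the endpoint URLs and the DynamoDB proxy host/port as plain values, a caller may want to validate them before building the stream. The following is a minimal sketch under that assumption; `EndpointOverrides`, `parseEndpoint`, and `checkProxyPort` are hypothetical names that are not part of this PR, and the example host names are placeholders. It uses only `java.net.URI` from the JDK.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper (not part of this PR): validates endpoint overrides
// before they are passed to a builder such as the one shown above.
final class EndpointOverrides {
    private EndpointOverrides() {}

    /** Parses an endpoint URL; rejects null, malformed, non-http(s), or host-less values. */
    static URI parseEndpoint(String url) {
        if (url == null) {
            throw new IllegalArgumentException("Endpoint URL must not be null");
        }
        final URI uri;
        try {
            uri = new URI(url);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Malformed endpoint URL: " + url, e);
        }
        String scheme = uri.getScheme();
        boolean httpLike = scheme != null
            && (scheme.equalsIgnoreCase("http") || scheme.equalsIgnoreCase("https"));
        if (!httpLike) {
            throw new IllegalArgumentException("Endpoint URL needs an http or https scheme: " + url);
        }
        if (uri.getHost() == null) {
            throw new IllegalArgumentException("Endpoint URL has no host: " + url);
        }
        return uri;
    }

    /** Returns the port unchanged if it is a valid TCP port, otherwise throws. */
    static int checkProxyPort(int port) {
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Proxy port out of range: " + port);
        }
        return port;
    }

    public static void main(String[] args) {
        // Placeholder values; real endpoints depend on the deployment.
        URI kinesis = parseEndpoint("https://kinesis.internal.example:8443");
        URI cloudWatch = parseEndpoint("https://monitoring.internal.example");
        int proxyPort = checkProxyPort(3128);
        System.out.println(kinesis + " " + cloudWatch + " " + proxyPort);
    }
}
```

The validated `URI` values and port could then be passed to `endpointUrl`, `cloudWatchUrl`, and `dynamoProxyPort` in the builder call; failing early with an `IllegalArgumentException` keeps a misconfigured endpoint from surfacing only after the streaming job has started.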