tprabh509 opened a new pull request #28581:
URL: https://github.com/apache/spark/pull/28581


   ### What changes were proposed in this pull request?
   AWS no longer supports KCL 1 and has already moved to KCL 2. Because the 
Spark kinesis-asl library uses KCL 1, users can hit the KCL 1 issues reported 
to AWS, for example https://github.com/awslabs/amazon-kinesis-client/issues/391
   This PR adds KCL 2 support alongside the existing KCL 1 implementation.
   I have already reported the issue in JIRA:
   https://issues.apache.org/jira/browse/SPARK-31236
   
   
   ### Why are the changes needed?
   This PR adds KCL 2 support and does not remove KCL 1. With the current 
KCL 1 implementation, users can run into a few limitations:
   
   1. The application cannot use a direct Kinesis endpoint. With KCL 2, a 
custom URL can be used.
   2. A DynamoDB proxy cannot be used. The KCL 2 support adds DynamoDB proxy 
settings.
   3. A direct CloudWatch endpoint cannot be used. With KCL 2, a custom URL 
can be used.
   
   As a result, the application cannot run without an internet connection or 
under firewall restrictions.
   
   
   ### Does this PR introduce _any_ user-facing change?
   Yes. KCL 2 support is added; KCL 1 is not removed. Users who stay on the 
current KCL 1 implementation can still run into a few limitations:
   
   1. The application cannot use a direct Kinesis endpoint.
   2. A DynamoDB proxy cannot be used.
   3. A direct CloudWatch endpoint cannot be used.
   
   As a result, the application cannot run without an internet connection or 
under firewall restrictions.
   
   
   ### How was this patch tested?
   Tested with our application and with a test client updated to use KCL 2:
   
   import java.net.URI;
   
   import org.apache.spark.storage.StorageLevel;
   import org.apache.spark.streaming.kinesis2.KinesisInputDStream;
   import org.apache.spark.streaming.kinesis2.SparkAWSCredentials;
   
   // Static credentials shared by the Kinesis, DynamoDB and CloudWatch clients.
   SparkAWSCredentials credentials =
       SparkAWSCredentials.builder().basicCredentials(awsKey, awsSecret).build();
   
   // Custom (direct) endpoints for Kinesis and CloudWatch.
   URI uri = new URI(endpointURL);
   URI cloudWatchURI = new URI(cloudWatchURL);
   
   InitialPositionInStream initPosition = InitialPositionInStream.TRIM_HORIZON;
   
   KinesisInputDStream<byte[]> kinStream = KinesisInputDStream.builder()
       .streamingContext(jssc)
       .checkpointAppName(applicationName)
       .streamName(streamName)
       .regionName(regionName)
       .endpointUrl(uri)
       .cloudWatchUrl(cloudWatchURI)
       .kinesisCreds(credentials)
       .dynamoDBCreds(credentials)
       .maxRecords(maxRecords)
       .protocol(httpProtocol)
       .initialPositionInStream(initPosition)
       .cloudWatchCreds(credentials)
       .dynamoProxyHost(proxyHost)
       .dynamoProxyPort(proxyPort)
       .checkpointInterval(checkpointInterval)
       .storageLevel(StorageLevel.MEMORY_AND_DISK_2())
       .build();
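
Because the builder takes the Kinesis and CloudWatch endpoints as `URI` values, a malformed custom URL could go unnoticed until the client tries to connect. The following is a minimal sketch of a pre-check a test client might run before passing the URLs to the builder. `EndpointCheck` and `parseEndpoint` are hypothetical names used only for illustration and are not part of this PR; only `java.net.URI` from the JDK is used, and the assumption that custom endpoints are plain http(s) URLs with a host is mine.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper, not part of the PR: validates a custom endpoint URL
// before it is handed to a builder such as endpointUrl(...) or cloudWatchUrl(...).
final class EndpointCheck {
    private EndpointCheck() {}

    /** Parses the URL and rejects values without an http(s) scheme or a host. */
    public static URI parseEndpoint(String url) {
        final URI uri;
        try {
            uri = new URI(url);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Malformed endpoint URL: " + url, e);
        }
        String scheme = uri.getScheme();
        boolean httpLike = scheme != null
            && (scheme.equalsIgnoreCase("http") || scheme.equalsIgnoreCase("https"));
        if (!httpLike) {
            // Catches inputs such as "host:4568", where "host" is parsed as the scheme.
            throw new IllegalArgumentException("Endpoint must start with http:// or https://: " + url);
        }
        if (uri.getHost() == null) {
            throw new IllegalArgumentException("Endpoint has no host: " + url);
        }
        return uri;
    }
}
```

With this in place, `new URI(endpointURL)` in the test client could be swapped for `EndpointCheck.parseEndpoint(endpointURL)`, so that a value missing its scheme fails fast with a readable message.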
   

