LantaoJin commented on code in PR #44211:
URL: https://github.com/apache/spark/pull/44211#discussion_r1458501384


##########
connector/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java:
##########
@@ -16,7 +16,8 @@
  */
 package org.apache.spark.streaming.kinesis;
 
-import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+

Review Comment:
   trivial: omit the empty line #L19



##########
connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala:
##########
@@ -60,7 +60,7 @@ private[kinesis] class KinesisCheckpointer(
    * we will use that to make the final checkpoint. If `null` is provided, we 
will not make the

Review Comment:
   Could you correct this comment section? At a minimum, it should no longer mention `IRecordProcessor`.



##########
connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala:
##########
@@ -244,20 +266,20 @@ private[kinesis] object KinesisTestUtils {
   }
 
   def isAWSCredentialsPresent: Boolean = {
-    Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess
+    Try { DefaultCredentialsProvider.create().resolveCredentials() }.isSuccess
   }
 
-  def getAWSCredentials(): AWSCredentials = {
+  def getAwsCredentials: AwsCredentials = {

Review Comment:
   This modification changes the API. Please revert it and keep the `()` in 
the method definition.



##########
connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/SparkAWSCredentials.scala:
##########
@@ -17,44 +17,47 @@
 
 package org.apache.spark.streaming.kinesis
 
-import com.amazonaws.auth._
+import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, 
AwsCredentialsProvider, DefaultCredentialsProvider, StaticCredentialsProvider}
+import software.amazon.awssdk.services.sts.StsClient
+import 
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest
 
 import org.apache.spark.internal.Logging
 
 /**
  * Serializable interface providing a method executors can call to obtain an
- * AWSCredentialsProvider instance for authenticating to AWS services.
+ * AwsCredentialsProvider instance for authenticating to AWS services.
  */
 private[kinesis] sealed trait SparkAWSCredentials extends Serializable {
   /**
    * Return an AWSCredentialProvider instance that can be used by the Kinesis 
Client
    * Library to authenticate to AWS services (Kinesis, CloudWatch and 
DynamoDB).
    */
-  def provider: AWSCredentialsProvider
+  def provider: AwsCredentialsProvider
 }
 
-/** Returns DefaultAWSCredentialsProviderChain for authentication. */
+/** Returns DefaultCredentialsProvider for authentication. */
 private[kinesis] final case object DefaultCredentials extends 
SparkAWSCredentials {
 
-  def provider: AWSCredentialsProvider = new DefaultAWSCredentialsProviderChain
+  def provider: AwsCredentialsProvider = DefaultCredentialsProvider.create()
 }
 
 /**
- * Returns AWSStaticCredentialsProvider constructed using basic AWS keypair. 
Falls back to using
+ * Returns StaticCredentialsProvider constructed using basic AWS keypair. 
Falls back to using
  * DefaultCredentialsProviderChain if unable to construct a 
AWSCredentialsProviderChain

Review Comment:
   `AWSCredentialsProviderChain` was retired; please check all terms in all 
comments in this file.



##########
connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala:
##########
@@ -16,69 +16,53 @@
  */
 package org.apache.spark.streaming.kinesis
 
-import java.util.List
-
 import scala.util.Random
 import scala.util.control.NonFatal
 
-import 
com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, 
KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
-import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, 
IRecordProcessorCheckpointer}
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason
-import com.amazonaws.services.kinesis.model.Record
+import software.amazon.kinesis.exceptions.{InvalidStateException, 
KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
+import software.amazon.kinesis.lifecycle.events.{InitializationInput, 
LeaseLostInput, ProcessRecordsInput, ShardEndedInput, ShutdownRequestedInput}
+import software.amazon.kinesis.processor.ShardRecordProcessor
 
 import org.apache.spark.internal.Logging
 
 /**
  * Kinesis-specific implementation of the Kinesis Client Library (KCL) 
IRecordProcessor.
  * This implementation operates on the Array[Byte] from the KinesisReceiver.
- * The Kinesis Worker creates an instance of this KinesisRecordProcessor for 
each
+ * The Kinesis scheduler creates an instance of this KinesisRecordProcessor 
for each
  * shard in the Kinesis stream upon startup.  This is normally done in 
separate threads,
  * but the KCLs within the KinesisReceivers will balance themselves out if you 
create
  * multiple Receivers.
  *
  * @param receiver Kinesis receiver
- * @param workerId for logging purposes
+ * @param schedulerId for logging purposes
  */
-private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], 
workerId: String)
-  extends IRecordProcessor with Logging {
+private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], 
schedulerId: String)
+  extends ShardRecordProcessor with Logging {
 
   // shardId populated during initialize()
   @volatile
   private var shardId: String = _
 
-  /**
-   * The Kinesis Client Library calls this method during IRecordProcessor 
initialization.

Review Comment:
   Why were the comments before the methods (such as `initialize`, `processRecords`) 
removed in this class? Also, for the newly added public methods, please add 
method comments as well.



##########
connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisExampleUtils.scala:
##########
@@ -19,16 +19,16 @@ package org.apache.spark.examples.streaming
 
 import scala.jdk.CollectionConverters._
 
-import com.amazonaws.regions.RegionUtils
-import com.amazonaws.services.kinesis.AmazonKinesis
+import software.amazon.awssdk.regions.servicemetadata.KinesisServiceMetadata
 
 private[streaming] object KinesisExampleUtils {
   def getRegionNameByEndpoint(endpoint: String): String = {
     val uri = new java.net.URI(endpoint)
-    RegionUtils.getRegionsForService(AmazonKinesis.ENDPOINT_PREFIX)
+    val kinesisServiceMetadata = new KinesisServiceMetadata()
+    kinesisServiceMetadata.regions
       .asScala
-      .find(_.getAvailableEndpoints.asScala.toSeq.contains(uri.getHost))
-      .map(_.getName)
+      .find(r => 
kinesisServiceMetadata.endpointFor(r).toString.equals(uri.getHost))

Review Comment:
   How about `r => kinesisServiceMetadata.endpointFor(r).equals(uri)`?



##########
connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala:
##########
@@ -96,26 +106,30 @@ private[kinesis] class KinesisTestUtils(streamShardCount: 
Int = 2) extends Loggi
     logInfo(s"Created stream ${_streamName}")
   }
 
-  def getShards(): Seq[Shard] = {
-    
kinesisClient.describeStream(_streamName).getStreamDescription.getShards.asScala.toSeq
+  def getShards: Seq[Shard] = {
+    val describeStreamRequest = DescribeStreamRequest.builder()
+      .streamName(_streamName)
+      .build()
+    
kinesisClient.describeStream(describeStreamRequest).streamDescription.shards.asScala.toSeq
   }
 
   def splitShard(shardId: String): Unit = {
-    val splitShardRequest = new SplitShardRequest()
-    splitShardRequest.withStreamName(_streamName)
-    splitShardRequest.withShardToSplit(shardId)
-    // Set a half of the max hash value

Review Comment:
   This comment should not be omitted.



##########
connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala:
##########
@@ -206,10 +227,11 @@ private[kinesis] object KinesisTestUtils {
 
   def getRegionNameByEndpoint(endpoint: String): String = {
     val uri = new java.net.URI(endpoint)
-    RegionUtils.getRegionsForService(AmazonKinesis.ENDPOINT_PREFIX)
+    val kinesisServiceMetadata = new KinesisServiceMetadata()
+    kinesisServiceMetadata.regions
       .asScala
-      .find(_.getAvailableEndpoints.asScala.toSeq.contains(uri.getHost))
-      .map(_.getName)
+      .find(r => 
kinesisServiceMetadata.endpointFor(r).toString.equals(uri.getHost))

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to