junyuc25 commented on code in PR #44211:
URL: https://github.com/apache/spark/pull/44211#discussion_r1463030593


##########
connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala:
##########
@@ -16,69 +16,53 @@
  */
 package org.apache.spark.streaming.kinesis
 
-import java.util.List
-
 import scala.util.Random
 import scala.util.control.NonFatal
 
-import 
com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, 
KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
-import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, 
IRecordProcessorCheckpointer}
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason
-import com.amazonaws.services.kinesis.model.Record
+import software.amazon.kinesis.exceptions.{InvalidStateException, 
KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
+import software.amazon.kinesis.lifecycle.events.{InitializationInput, 
LeaseLostInput, ProcessRecordsInput, ShardEndedInput, ShutdownRequestedInput}
+import software.amazon.kinesis.processor.ShardRecordProcessor
 
 import org.apache.spark.internal.Logging
 
 /**
  * Kinesis-specific implementation of the Kinesis Client Library (KCL) 
IRecordProcessor.
  * This implementation operates on the Array[Byte] from the KinesisReceiver.
- * The Kinesis Worker creates an instance of this KinesisRecordProcessor for 
each
+ * The Kinesis scheduler creates an instance of this KinesisRecordProcessor 
for each
  * shard in the Kinesis stream upon startup.  This is normally done in 
separate threads,
  * but the KCLs within the KinesisReceivers will balance themselves out if you 
create
  * multiple Receivers.
  *
  * @param receiver Kinesis receiver
- * @param workerId for logging purposes
+ * @param schedulerId for logging purposes
  */
-private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], 
workerId: String)
-  extends IRecordProcessor with Logging {
+private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], 
schedulerId: String)
+  extends ShardRecordProcessor with Logging {
 
   // shardId populated during initialize()
   @volatile
   private var shardId: String = _
 
-  /**
-   * The Kinesis Client Library calls this method during IRecordProcessor 
initialization.

Review Comment:
   Added comments.



##########
connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala:
##########
@@ -96,26 +106,30 @@ private[kinesis] class KinesisTestUtils(streamShardCount: 
Int = 2) extends Loggi
     logInfo(s"Created stream ${_streamName}")
   }
 
-  def getShards(): Seq[Shard] = {
-    
kinesisClient.describeStream(_streamName).getStreamDescription.getShards.asScala.toSeq
+  def getShards: Seq[Shard] = {
+    val describeStreamRequest = DescribeStreamRequest.builder()
+      .streamName(_streamName)
+      .build()
+    
kinesisClient.describeStream(describeStreamRequest).streamDescription.shards.asScala.toSeq
   }
 
   def splitShard(shardId: String): Unit = {
-    val splitShardRequest = new SplitShardRequest()
-    splitShardRequest.withStreamName(_streamName)
-    splitShardRequest.withShardToSplit(shardId)
-    // Set a half of the max hash value

Review Comment:
   Added it back.



##########
connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala:
##########
@@ -206,10 +227,11 @@ private[kinesis] object KinesisTestUtils {
 
   def getRegionNameByEndpoint(endpoint: String): String = {
     val uri = new java.net.URI(endpoint)
-    RegionUtils.getRegionsForService(AmazonKinesis.ENDPOINT_PREFIX)
+    val kinesisServiceMetadata = new KinesisServiceMetadata()
+    kinesisServiceMetadata.regions
       .asScala
-      .find(_.getAvailableEndpoints.asScala.toSeq.contains(uri.getHost))
-      .map(_.getName)
+      .find(r => 
kinesisServiceMetadata.endpointFor(r).toString.equals(uri.getHost))

Review Comment:
   Same as my response above.



##########
connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala:
##########
@@ -244,20 +266,20 @@ private[kinesis] object KinesisTestUtils {
   }
 
   def isAWSCredentialsPresent: Boolean = {
-    Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess
+    Try { DefaultCredentialsProvider.create().resolveCredentials() }.isSuccess
   }
 
-  def getAWSCredentials(): AWSCredentials = {
+  def getAwsCredentials: AwsCredentials = {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to