LantaoJin commented on code in PR #44211:
URL: https://github.com/apache/spark/pull/44211#discussion_r1458501384
##########
connector/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java:
##########
@@ -16,7 +16,8 @@
*/
package org.apache.spark.streaming.kinesis;
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+
Review Comment:
trivial: remove the empty line added at #L19
##########
connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala:
##########
@@ -60,7 +60,7 @@ private[kinesis] class KinesisCheckpointer(
* we will use that to make the final checkpoint. If `null` is provided, we will not make the
Review Comment:
Could you correct this comment section? At a minimum, it should no longer mention `IRecordProcessor`.
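For example, the wording could be updated along these lines (a rough sketch using the KCL 2.x `ShardRecordProcessor` lifecycle names; the exact phrasing should match how the new code actually reaches `removeCheckpointer`):

```scala
/**
 * If a checkpointer is provided, e.g. when `ShardRecordProcessor.shardEnded()` is invoked,
 * we will use that to make the final checkpoint. If `null` is provided, we will not make the
 * checkpoint, e.g. when the lease is lost (`ShardRecordProcessor.leaseLost()`).
 */
```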
##########
connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala:
##########
@@ -244,20 +266,20 @@ private[kinesis] object KinesisTestUtils {
}
def isAWSCredentialsPresent: Boolean = {
- Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess
+ Try { DefaultCredentialsProvider.create().resolveCredentials() }.isSuccess
}
- def getAWSCredentials(): AWSCredentials = {
+ def getAwsCredentials: AwsCredentials = {
Review Comment:
This modification will change the API. Please revert it and keep the `()` in the method signature.
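For illustration, keeping the original name and parentheses while still moving to the SDK v2 types could look roughly like this (a sketch only; the body simply mirrors the `isAWSCredentialsPresent` change above and may differ from what the PR needs):

```scala
import software.amazon.awssdk.auth.credentials.{AwsCredentials, DefaultCredentialsProvider}

object CredentialsSketch {
  // Keep the original name and empty parens so existing callers are unaffected;
  // only the return type moves to the SDK v2 AwsCredentials.
  def getAWSCredentials(): AwsCredentials = {
    DefaultCredentialsProvider.create().resolveCredentials()
  }
}
```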
##########
connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/SparkAWSCredentials.scala:
##########
@@ -17,44 +17,47 @@
package org.apache.spark.streaming.kinesis
-import com.amazonaws.auth._
+import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, AwsCredentialsProvider, DefaultCredentialsProvider, StaticCredentialsProvider}
+import software.amazon.awssdk.services.sts.StsClient
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest
import org.apache.spark.internal.Logging
/**
* Serializable interface providing a method executors can call to obtain an
- * AWSCredentialsProvider instance for authenticating to AWS services.
+ * AwsCredentialsProvider instance for authenticating to AWS services.
*/
private[kinesis] sealed trait SparkAWSCredentials extends Serializable {
/**
* Return an AWSCredentialProvider instance that can be used by the Kinesis Client
* Library to authenticate to AWS services (Kinesis, CloudWatch and DynamoDB).
*/
- def provider: AWSCredentialsProvider
+ def provider: AwsCredentialsProvider
}
-/** Returns DefaultAWSCredentialsProviderChain for authentication. */
+/** Returns DefaultCredentialsProvider for authentication. */
private[kinesis] final case object DefaultCredentials extends SparkAWSCredentials {
- def provider: AWSCredentialsProvider = new DefaultAWSCredentialsProviderChain
+ def provider: AwsCredentialsProvider = DefaultCredentialsProvider.create()
}
/**
- * Returns AWSStaticCredentialsProvider constructed using basic AWS keypair. Falls back to using
+ * Returns StaticCredentialsProvider constructed using basic AWS keypair. Falls back to using
* DefaultCredentialsProviderChain if unable to construct a AWSCredentialsProviderChain
Review Comment:
`AWSCredentialsProviderChain` was retired in SDK v2; please check all such terms in all comments in this file.
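For instance, the scaladoc above could be reworded along these lines (only a wording suggestion, reusing the SDK v2 class names that are now imported):

```scala
/**
 * Returns a StaticCredentialsProvider constructed from a basic AWS key pair. Falls back to
 * DefaultCredentialsProvider if a provider cannot be built from the given key pair.
 */
```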
##########
connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala:
##########
@@ -16,69 +16,53 @@
*/
package org.apache.spark.streaming.kinesis
-import java.util.List
-
import scala.util.Random
import scala.util.control.NonFatal
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer}
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason
-import com.amazonaws.services.kinesis.model.Record
+import software.amazon.kinesis.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
+import software.amazon.kinesis.lifecycle.events.{InitializationInput, LeaseLostInput, ProcessRecordsInput, ShardEndedInput, ShutdownRequestedInput}
+import software.amazon.kinesis.processor.ShardRecordProcessor
import org.apache.spark.internal.Logging
/**
* Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor.
* This implementation operates on the Array[Byte] from the KinesisReceiver.
- * The Kinesis Worker creates an instance of this KinesisRecordProcessor for each
+ * The Kinesis scheduler creates an instance of this KinesisRecordProcessor for each
* shard in the Kinesis stream upon startup. This is normally done in separate threads,
* but the KCLs within the KinesisReceivers will balance themselves out if you create
* multiple Receivers.
*
* @param receiver Kinesis receiver
- * @param workerId for logging purposes
+ * @param schedulerId for logging purposes
*/
-private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], workerId: String)
- extends IRecordProcessor with Logging {
+private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], schedulerId: String)
+ extends ShardRecordProcessor with Logging {
// shardId populated during initialize()
@volatile
private var shardId: String = _
- /**
- * The Kinesis Client Library calls this method during IRecordProcessor initialization.
Review Comment:
Why were the comments before methods (such as `initialize` and `processRecords`) removed in this class? And for the newly added public methods, please add method comments as well.
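As an example, the restored/new comments could look something like this (a skeleton only; `CommentedProcessorSketch` and the placeholder bodies are illustrative, not the PR's implementation):

```scala
import software.amazon.kinesis.lifecycle.events.{InitializationInput, LeaseLostInput}
import software.amazon.kinesis.processor.ShardRecordProcessor

abstract class CommentedProcessorSketch extends ShardRecordProcessor {

  /**
   * The Kinesis Client Library calls this method during ShardRecordProcessor initialization.
   *
   * @param initializationInput carries the shardId this record processor is bound to
   */
  override def initialize(initializationInput: InitializationInput): Unit = ()

  /**
   * The Kinesis Client Library calls this method when the lease for this shard is lost;
   * the processor can no longer checkpoint after this point.
   */
  override def leaseLost(leaseLostInput: LeaseLostInput): Unit = ()
}
```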
##########
connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisExampleUtils.scala:
##########
@@ -19,16 +19,16 @@ package org.apache.spark.examples.streaming
import scala.jdk.CollectionConverters._
-import com.amazonaws.regions.RegionUtils
-import com.amazonaws.services.kinesis.AmazonKinesis
+import software.amazon.awssdk.regions.servicemetadata.KinesisServiceMetadata
private[streaming] object KinesisExampleUtils {
def getRegionNameByEndpoint(endpoint: String): String = {
val uri = new java.net.URI(endpoint)
- RegionUtils.getRegionsForService(AmazonKinesis.ENDPOINT_PREFIX)
+ val kinesisServiceMetadata = new KinesisServiceMetadata()
+ kinesisServiceMetadata.regions
.asScala
- .find(_.getAvailableEndpoints.asScala.toSeq.contains(uri.getHost))
- .map(_.getName)
- .find(r => kinesisServiceMetadata.endpointFor(r).toString.equals(uri.getHost))
Review Comment:
How about `r => kinesisServiceMetadata.endpointFor(r).equals(uri)`?
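In context the suggestion would read roughly as below (a sketch only; note that `ServiceMetadata.endpointFor` appears to return a scheme-less `URI` such as `kinesis.us-east-1.amazonaws.com`, so the direct `URI` comparison only matches if `endpoint` is passed in that same form):

```scala
import scala.jdk.CollectionConverters._

import software.amazon.awssdk.regions.servicemetadata.KinesisServiceMetadata

object KinesisExampleUtilsSketch {
  def getRegionNameByEndpoint(endpoint: String): String = {
    val uri = new java.net.URI(endpoint)
    val kinesisServiceMetadata = new KinesisServiceMetadata()
    kinesisServiceMetadata.regions
      .asScala
      .find(r => kinesisServiceMetadata.endpointFor(r).equals(uri))
      .map(_.id)
      .getOrElse(
        throw new IllegalArgumentException(s"Could not resolve region for endpoint: $endpoint"))
  }
}
```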
##########
connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala:
##########
@@ -96,26 +106,30 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
logInfo(s"Created stream ${_streamName}")
}
- def getShards(): Seq[Shard] = {
- kinesisClient.describeStream(_streamName).getStreamDescription.getShards.asScala.toSeq
+ def getShards: Seq[Shard] = {
+ val describeStreamRequest = DescribeStreamRequest.builder()
+ .streamName(_streamName)
+ .build()
+ kinesisClient.describeStream(describeStreamRequest).streamDescription.shards.asScala.toSeq
}
def splitShard(shardId: String): Unit = {
- val splitShardRequest = new SplitShardRequest()
- splitShardRequest.withStreamName(_streamName)
- splitShardRequest.withShardToSplit(shardId)
- // Set a half of the max hash value
Review Comment:
This comment should not be omitted.
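For reference, in the builder style the comment could sit right above the hash-key line, e.g. (a sketch based on the old v1 code; `_streamName` and `shardId` come from the enclosing method, and the actual new code in the PR may differ):

```scala
import java.math.BigInteger

import software.amazon.awssdk.services.kinesis.model.SplitShardRequest

val splitShardRequest = SplitShardRequest.builder()
  .streamName(_streamName)
  .shardToSplit(shardId)
  // Set a half of the max hash value
  .newStartingHashKey(BigInteger.valueOf(2).pow(127).toString)
  .build()
```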
##########
connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala:
##########
@@ -206,10 +227,11 @@ private[kinesis] object KinesisTestUtils {
def getRegionNameByEndpoint(endpoint: String): String = {
val uri = new java.net.URI(endpoint)
- RegionUtils.getRegionsForService(AmazonKinesis.ENDPOINT_PREFIX)
+ val kinesisServiceMetadata = new KinesisServiceMetadata()
+ kinesisServiceMetadata.regions
.asScala
- .find(_.getAvailableEndpoints.asScala.toSeq.contains(uri.getHost))
- .map(_.getName)
+ .find(r => kinesisServiceMetadata.endpointFor(r).toString.equals(uri.getHost))
Review Comment:
ditto