junyuc25 commented on code in PR #44211:
URL: https://github.com/apache/spark/pull/44211#discussion_r1463030593
##########
connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala:
##########
@@ -16,69 +16,53 @@
*/
package org.apache.spark.streaming.kinesis
-import java.util.List
-
import scala.util.Random
import scala.util.control.NonFatal
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer}
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason
-import com.amazonaws.services.kinesis.model.Record
+import software.amazon.kinesis.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
+import software.amazon.kinesis.lifecycle.events.{InitializationInput, LeaseLostInput, ProcessRecordsInput, ShardEndedInput, ShutdownRequestedInput}
+import software.amazon.kinesis.processor.ShardRecordProcessor
import org.apache.spark.internal.Logging
/**
* Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor.
* This implementation operates on the Array[Byte] from the KinesisReceiver.
- * The Kinesis Worker creates an instance of this KinesisRecordProcessor for each
+ * The Kinesis scheduler creates an instance of this KinesisRecordProcessor for each
* shard in the Kinesis stream upon startup. This is normally done in separate threads,
* but the KCLs within the KinesisReceivers will balance themselves out if you create
* multiple Receivers.
*
* @param receiver Kinesis receiver
- * @param workerId for logging purposes
+ * @param schedulerId for logging purposes
*/
-private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], workerId: String)
- extends IRecordProcessor with Logging {
+private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], schedulerId: String)
+ extends ShardRecordProcessor with Logging {
// shardId populated during initialize()
@volatile
private var shardId: String = _
- /**
- * The Kinesis Client Library calls this method during IRecordProcessor initialization.
Review Comment:
Added comments.
##########
connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala:
##########
@@ -96,26 +106,30 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
logInfo(s"Created stream ${_streamName}")
}
- def getShards(): Seq[Shard] = {
- kinesisClient.describeStream(_streamName).getStreamDescription.getShards.asScala.toSeq
+ def getShards: Seq[Shard] = {
+ val describeStreamRequest = DescribeStreamRequest.builder()
+ .streamName(_streamName)
+ .build()
+ kinesisClient.describeStream(describeStreamRequest).streamDescription.shards.asScala.toSeq
}
def splitShard(shardId: String): Unit = {
- val splitShardRequest = new SplitShardRequest()
- splitShardRequest.withStreamName(_streamName)
- splitShardRequest.withShardToSplit(shardId)
- // Set a half of the max hash value
Review Comment:
Added it back.
##########
connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala:
##########
@@ -206,10 +227,11 @@ private[kinesis] object KinesisTestUtils {
def getRegionNameByEndpoint(endpoint: String): String = {
val uri = new java.net.URI(endpoint)
- RegionUtils.getRegionsForService(AmazonKinesis.ENDPOINT_PREFIX)
+ val kinesisServiceMetadata = new KinesisServiceMetadata()
+ kinesisServiceMetadata.regions
.asScala
- .find(_.getAvailableEndpoints.asScala.toSeq.contains(uri.getHost))
- .map(_.getName)
+ .find(r => kinesisServiceMetadata.endpointFor(r).toString.equals(uri.getHost))
Review Comment:
Same as the response above.
##########
connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala:
##########
@@ -244,20 +266,20 @@ private[kinesis] object KinesisTestUtils {
}
def isAWSCredentialsPresent: Boolean = {
- Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess
+ Try { DefaultCredentialsProvider.create().resolveCredentials() }.isSuccess
}
- def getAWSCredentials(): AWSCredentials = {
+ def getAwsCredentials: AwsCredentials = {
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]