junyuc25 commented on code in PR #44211:
URL: https://github.com/apache/spark/pull/44211#discussion_r1463030142
##########
connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisExampleUtils.scala:
##########
@@ -19,16 +19,16 @@ package org.apache.spark.examples.streaming
import scala.jdk.CollectionConverters._
-import com.amazonaws.regions.RegionUtils
-import com.amazonaws.services.kinesis.AmazonKinesis
+import software.amazon.awssdk.regions.servicemetadata.KinesisServiceMetadata
private[streaming] object KinesisExampleUtils {
def getRegionNameByEndpoint(endpoint: String): String = {
val uri = new java.net.URI(endpoint)
- RegionUtils.getRegionsForService(AmazonKinesis.ENDPOINT_PREFIX)
+ val kinesisServiceMetadata = new KinesisServiceMetadata()
+ kinesisServiceMetadata.regions
.asScala
- .find(_.getAvailableEndpoints.asScala.toSeq.contains(uri.getHost))
- .map(_.getName)
+ .find(r =>
kinesisServiceMetadata.endpointFor(r).toString.equals(uri.getHost))
Review Comment:
It looks like the two URI objects would differ if we make this change,
and comparing them directly causes a test failure.
* The URI returned by
`kinesisServiceMetadata.endpointFor(Region.US_WEST_2)` has no scheme:
[kinesis.us-west-2.amazon.com](http://kinesis.us-west-2.amazon.com/)
* The URI created from the given endpoint string includes the HTTPS scheme:
https://kinesis.us-west-2.amazon.com/
Stacktrace:
```
Cause: java.lang.IllegalArgumentException: Could not resolve region for
endpoint: https://kinesis.us-west-2.amazonaws.com
at
org.apache.spark.streaming.kinesis.KinesisTestUtils$.$anonfun$getRegionNameByEndpoint$3(KinesisTestUtils.scala:237)
at scala.Option.getOrElse(Option.scala:201)
at
org.apache.spark.streaming.kinesis.KinesisTestUtils$.getRegionNameByEndpoint(KinesisTestUtils.scala:237)
at
org.apache.spark.streaming.kinesis.KinesisStreamTests.<init>(KinesisStreamSuite.scala:51)
at
org.apache.spark.streaming.kinesis.WithoutAggregationKinesisStreamSuite.<init>(KinesisStreamSuite.scala:434)
```
To avoid this failure, I would prefer to keep this line, which compares
the metadata endpoint's string form against the host of the given endpoint:
`kinesisServiceMetadata.endpointFor(r).toString.equals(uri.getHost)`
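
For illustration, here is a minimal sketch of the mismatch using only `java.net.URI`. It assumes, as described above, that the metadata endpoint is a scheme-less URI while the configured endpoint carries `https://`. The object name `EndpointHostComparison` and the helper `hostOf` are hypothetical and not part of the PR.

```scala
import java.net.URI

object EndpointHostComparison {
  // Hypothetical helper: returns the host when the URI has a scheme and
  // authority, otherwise falls back to the raw string form of the URI.
  def hostOf(endpoint: String): String = {
    val uri = new URI(endpoint)
    Option(uri.getHost).getOrElse(uri.toString)
  }

  def main(args: Array[String]): Unit = {
    // Endpoint string as passed by the tests (scheme included).
    val given = new URI("https://kinesis.us-west-2.amazonaws.com")
    // Assumed shape of the metadata endpoint (no scheme).
    val fromMetadata = new URI("kinesis.us-west-2.amazonaws.com")

    // The two URI objects are not equal, so comparing them directly fails.
    println(given == fromMetadata)
    // Without a scheme the text is parsed as a path, so there is no host.
    println(fromMetadata.getHost == null)
    // Comparing the string form against the host of the given endpoint matches.
    println(fromMetadata.toString.equals(given.getHost))
  }
}
```

Under these assumptions, comparing `toString` against `uri.getHost` is what lets the region lookup succeed when the two URIs differ only by scheme.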
##########
connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala:
##########
@@ -60,7 +60,7 @@ private[kinesis] class KinesisCheckpointer(
* we will use that to make the final checkpoint. If `null` is provided, we
will not make the
Review Comment:
Thanks for catching this. Updated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]