vrozov commented on code in PR #52630:
URL: https://github.com/apache/spark/pull/52630#discussion_r2434233289
##########
connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala:
##########
@@ -184,18 +187,9 @@ private[kinesis] class KinesisTestUtils(streamShardCount:
Int = 2) extends Loggi
}
private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = {
- val startTimeNs = System.nanoTime()
- while (System.nanoTime() - startTimeNs <
TimeUnit.SECONDS.toNanos(createStreamTimeoutSeconds)) {
- Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
- describeStream(streamNameToWaitFor).foreach { description =>
- val streamStatus = description.getStreamStatus()
- logDebug(s"\t- current state: $streamStatus\n")
- if ("ACTIVE".equals(streamStatus)) {
- return
- }
- }
- }
- require(false, s"Stream $streamName never became active")
+ val describeStreamRequest = new DescribeStreamRequest()
+ .withStreamName(streamNameToWaitFor)
+ streamExistsWaiter.run(new WaiterParameters(describeStreamRequest))
Review Comment:
It is easier to use waiters to wait for the stream to become active.
Additionally, using existing approach intermittently caused
`ResourceNotFoundException`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]