Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/18029#discussion_r156265997
--- Diff:
external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala
---
@@ -101,12 +102,60 @@ class KinesisInputDStreamBuilderSuite extends
TestSuiteBase with BeforeAndAfterE
.build()
assert(dstream.endpointUrl == customEndpointUrl)
assert(dstream.regionName == customRegion)
- assert(dstream.initialPositionInStream == customInitialPosition)
+ assert(dstream.initialPosition == customInitialPosition)
assert(dstream.checkpointAppName == customAppName)
assert(dstream.checkpointInterval == customCheckpointInterval)
assert(dstream._storageLevel == customStorageLevel)
assert(dstream.kinesisCreds == customKinesisCreds)
assert(dstream.dynamoDBCreds == Option(customDynamoDBCreds))
assert(dstream.cloudWatchCreds == Option(customCloudWatchCreds))
+
+ // Testing with AtTimestamp
+ val cal = Calendar.getInstance()
+ cal.add(Calendar.DATE, -1)
+ val timestamp = cal.getTime()
+ val initialPositionAtTimestamp = AtTimestamp(timestamp)
+
+ val dstreamAtTimestamp = builder
+ .endpointUrl(customEndpointUrl)
+ .regionName(customRegion)
+ .initialPosition(initialPositionAtTimestamp)
+ .checkpointAppName(customAppName)
+ .checkpointInterval(customCheckpointInterval)
+ .storageLevel(customStorageLevel)
+ .kinesisCredentials(customKinesisCreds)
+ .dynamoDBCredentials(customDynamoDBCreds)
+ .cloudWatchCredentials(customCloudWatchCreds)
+ .build()
+ assert(dstreamAtTimestamp.endpointUrl == customEndpointUrl)
+ assert(dstreamAtTimestamp.regionName == customRegion)
+ assert(dstreamAtTimestamp.initialPosition.initialPositionInStream
+ == initialPositionAtTimestamp.initialPositionInStream)
+ assert(
+
dstreamAtTimestamp.initialPosition.asInstanceOf[AtTimestamp].timestamp.equals(timestamp))
+ assert(dstreamAtTimestamp.checkpointAppName == customAppName)
+ assert(dstreamAtTimestamp.checkpointInterval ==
customCheckpointInterval)
+ assert(dstreamAtTimestamp._storageLevel == customStorageLevel)
+ assert(dstreamAtTimestamp.kinesisCreds == customKinesisCreds)
+ assert(dstreamAtTimestamp.dynamoDBCreds == Option(customDynamoDBCreds))
+ assert(dstreamAtTimestamp.cloudWatchCreds ==
Option(customCloudWatchCreds))
+
+ // Testing with AtTimestamp
+ val initialPositionAtTimestamp2 = AtTimestamp(timestamp)
--- End diff --
how is the following lines a different test?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]