Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/18029#discussion_r143850792
--- Diff:
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
---
@@ -148,18 +149,30 @@ private[kinesis] class KinesisReceiver[T](
kinesisCheckpointer = new KinesisCheckpointer(receiver,
checkpointInterval, workerId)
val kinesisProvider = kinesisCreds.provider
- val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
- checkpointAppName,
- streamName,
- kinesisProvider,
- dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
- cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
- workerId)
+
+ val kinesisClientLibConfiguration = {
+ val baseClientLibConfiguration = new KinesisClientLibConfiguration(
+ checkpointAppName,
+ streamName,
+ kinesisProvider,
+ dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
+ cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
+ workerId)
.withKinesisEndpoint(endpointUrl)
- .withInitialPositionInStream(initialPositionInStream)
+ .withInitialPositionInStream(initialPosition.initialPositionInStream)
.withTaskBackoffTimeMillis(500)
.withRegionName(regionName)
+ // Update the Kinesis client lib config with timestamp
+ // if InitialPositionInStream.AT_TIMESTAMP is passed
+ initialPosition match {
+ case atTimestamp: AtTimestamp =>
--- End diff --
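nit: you can destructure the timestamp directly in the pattern instead of binding the whole `AtTimestamp` value by type: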
```scala
initialPosition match {
case AtTimestamp(ts) =>
baseClientLibConfiguration.withTimestampAtInitialPositionInStream(ts)
...
}
```
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]