Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18029#discussion_r143850792
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---
    @@ -148,18 +149,30 @@ private[kinesis] class KinesisReceiver[T](
     
         kinesisCheckpointer = new KinesisCheckpointer(receiver, checkpointInterval, workerId)
         val kinesisProvider = kinesisCreds.provider
    -    val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
    -          checkpointAppName,
    -          streamName,
    -          kinesisProvider,
    -          dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
    -          cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
    -          workerId)
    +
    +    val kinesisClientLibConfiguration = {
    +      val baseClientLibConfiguration = new KinesisClientLibConfiguration(
    +        checkpointAppName,
    +        streamName,
    +        kinesisProvider,
    +        dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
    +        cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
    +        workerId)
             .withKinesisEndpoint(endpointUrl)
    -        .withInitialPositionInStream(initialPositionInStream)
    +        .withInitialPositionInStream(initialPosition.initialPositionInStream)
             .withTaskBackoffTimeMillis(500)
             .withRegionName(regionName)
     
    +      // Update the Kinesis client lib config with timestamp
    +      // if InitialPositionInStream.AT_TIMESTAMP is passed
    +      initialPosition match {
    +        case atTimestamp: AtTimestamp =>
    --- End diff --
    
    nit: extract the timestamp directly in the pattern instead of binding the whole `AtTimestamp` value:
    ```scala
    initialPosition match {
      case AtTimestamp(ts) =>
        baseClientLibConfiguration.withTimestampAtInitialPositionInStream(ts)
      // ...
    }
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to