gaborgsomogyi opened a new pull request #25760: [WIP][SPARK-29054][SS] 
Invalidate Kafka consumer when new delegation token available
URL: https://github.com/apache/spark/pull/25760
 
 
   ### What changes were proposed in this pull request?
   Kafka consumers are cached. If a delegation token is used and the token has 
expired, an exception is thrown. In that case a new consumer is created in a 
Task retry with the latest delegation token. This can be improved by detecting 
that a new delegation token is available. In this PR I detect whether the 
token in the consumer is the same as the latest one stored in the `UGI` 
(`targetServersRegex` must match, so that a consumer is not created with 
another cluster's token).
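   
   The check described above can be sketched roughly as follows. This is a 
simplified, self-contained model and not the code in this PR: `ClusterToken`, 
`CachedConsumer`, `TokenCheck` and `latestTokenFor` are hypothetical names, and 
the real implementation reads tokens from the `UGI` instead of a plain `Seq`.
   ```scala
   import java.util.regex.Pattern

   // Hypothetical stand-in for a delegation token stored in the UGI.
   final case class ClusterToken(identifier: String, targetServersRegex: String)

   // Hypothetical stand-in for a cached consumer that remembers the token it was built with.
   final class CachedConsumer(val bootstrapServers: String, var tokenId: Option[String]) {
     var recreations: Int = 0

     // Assumption: recreating the underlying consumer is modelled as swapping the token id.
     def recreate(newTokenId: Option[String]): Unit = {
       tokenId = newTokenId
       recreations += 1
     }
   }

   object TokenCheck {
     // Only a token whose targetServersRegex matches the consumer's servers is eligible,
     // so a consumer never picks up another cluster's token.
     def latestTokenFor(servers: String, stored: Seq[ClusterToken]): Option[ClusterToken] =
       stored.find(t => Pattern.matches(t.targetServersRegex, servers))

     // Returns true if the consumer was invalidated because a different token is available.
     def ensureConsumerHasLatestToken(consumer: CachedConsumer, stored: Seq[ClusterToken]): Boolean =
       latestTokenFor(consumer.bootstrapServers, stored) match {
         case Some(t) if !consumer.tokenId.contains(t.identifier) =>
           consumer.recreate(Some(t.identifier))
           true
         case _ =>
           false
       }
   }
   ```
   In this sketch a second call with an unchanged token list is a no-op, which 
is the cheap path the measurement later in this description exercises.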
   
   ### Why are the changes needed?
   It would be good to pick up the latest delegation token without needing a 
Task retry.
   
   ### Does this PR introduce any user-facing change?
   No.
   
   ### How was this patch tested?
   Existing + new unit tests.
   Additionally, I executed the following code snippet to measure the time 
consumption of `ensureConsumerHasLatestToken`:
   ```
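       import java.util.concurrent.TimeUnit

       // Time 10000 consecutive calls and log the total elapsed time in milliseconds.
       val startTimeNs = System.nanoTime()
       for (i <- 0 until 10000) {
         consumer.ensureConsumerHasLatestToken()
       }
       val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)
       logInfo(s"It took $elapsedMs ms to call ensureConsumerHasLatestToken 10000 times")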
   ```
   
   And here are the results:
   ```
   19/09/11 14:58:22 INFO KafkaDataConsumerSuite: It took 1058 ms to call ensureConsumerHasLatestToken 10000 times
   ...
   19/09/11 14:58:23 INFO KafkaDataConsumerSuite: It took 780 ms to call ensureConsumerHasLatestToken 10000 times
   ...
   19/09/11 15:12:11 INFO KafkaDataConsumerSuite: It took 1032 ms to call ensureConsumerHasLatestToken 10000 times
   ...
   19/09/11 15:12:11 INFO KafkaDataConsumerSuite: It took 679 ms to call ensureConsumerHasLatestToken 10000 times
   ```
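   
   To put these totals in per-call terms, the arithmetic can be written out as 
below. The object name `PerCallCost` is only illustrative; the four totals are 
the ones from the log lines above.
   ```scala
   object PerCallCost {
     // Total elapsed milliseconds reported for each run of 10000 calls.
     val totalsMs: Seq[Long] = Seq(1058L, 780L, 1032L, 679L)
     val calls: Int = 10000

     // Convert each total to microseconds per single call.
     val perCallMicros: Seq[Double] = totalsMs.map(ms => ms * 1000.0 / calls)

     def main(args: Array[String]): Unit =
       perCallMicros.foreach(us => println(f"$us%.1f microseconds per call"))
   }
   ```
   Each call therefore costs roughly 68 to 106 microseconds in these runs.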
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
