HeartSaVioR opened a new pull request #32747:
URL: https://github.com/apache/spark/pull/32747


   ### What changes were proposed in this pull request?
   
   This PR proposes to introduce a strategy for handling mismatched offsets when a start offset timestamp is specified for the Kafka data source.
   
   Please read the section `Why are the changes needed?` to understand the rationale for the functionality.
   
   This is particularly helpful when there is skew between partitions and some partitions only have older records.
   
   * AS-IS: Spark simply fails the query, and end users have to deal with workarounds requiring manual steps.
   * TO-BE: Spark assigns the latest offset to these partitions, so that it can read newer records from them in subsequent micro-batches.
   
   To retain the existing behavior while also supporting the proposed "TO-BE" behavior, we'd like to introduce a strategy for mismatched offsets on the start offset timestamp, and let end users choose between them.
   
   The strategy will be added as a source option, to ensure end users set the behavior explicitly (otherwise they simply get the documented default value).
   
   * New source option to be added: `startingOffsetsByTimestampStrategy`
   * Available values: `error` (fail the query, the AS-IS behavior), `latest` (set the offset to the latest, the TO-BE behavior)
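   The intended semantics of the two strategy values can be sketched in pure Python (this is only an illustration with hypothetical names, not the actual Spark implementation; in Spark the resolution happens inside the Kafka source using Kafka's `offsetsForTimes` results):

   ```python
   # Sketch of the proposed semantics, not the actual Spark code.
   # Partitions whose timestamp lookup found no matching offset are
   # modeled as None, mirroring what Kafka's offsetsForTimes() reports.

   def resolve_start_offsets(matched, latest, strategy="error"):
       """matched: partition -> offset, or None when no record has a
       timestamp at/after the requested one.
       latest: partition -> latest offset of that partition.
       strategy: 'error' (AS-IS, default) or 'latest' (TO-BE)."""
       resolved = {}
       for partition, offset in matched.items():
           if offset is not None:
               resolved[partition] = offset
           elif strategy == "latest":
               # TO-BE: start at the latest offset so newer records in
               # this partition are picked up by later micro-batches.
               resolved[partition] = latest[partition]
           else:
               # AS-IS (default): fail the query.
               raise ValueError(
                   f"No offset matched for partition {partition}")
       return resolved
   ```

   With `strategy="latest"`, a partition with no matching record starts from its latest offset instead of failing the query; with the default `"error"`, the existing failure behavior is kept.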
   
   The doc changes are as follows:
   
   
![ES-106042-doc-screenshot-1](https://user-images.githubusercontent.com/1317309/120472697-2c1ba800-c3e1-11eb-884f-f28152168053.png)
   
![ES-106042-doc-screenshot-2](https://user-images.githubusercontent.com/1317309/120472719-33db4c80-c3e1-11eb-9851-939be8a3ddb7.png)
   
   
   ### Why are the changes needed?
   
   We encountered a real-world case where Spark fails the query because some of the partitions don't have a matching offset for the given timestamp.
   
   This is intended behavior, to avoid producing unintended output in cases like the following:
   
   * timestamp 2 is specified as the timestamp offset, but some of the partitions don't have a matching record yet
   * a record with timestamp 1 comes "later", in a following micro-batch
   
   which is possible since Kafka allows the timestamp to be specified per record.
   
   The unintended output mentioned here is the risk of reading the record with timestamp 1 in the next micro-batch, despite the option specifying timestamp 2.
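   The risk above can be made concrete with a small pure-Python model (hypothetical helper names; `offsets_for_time` only mimics the behavior of Kafka's `offsetsForTimes` lookup):

   ```python
   # Model a partition log as a list of (offset, record_timestamp) pairs.

   def offsets_for_time(log, target_ts):
       """Return the earliest offset whose record timestamp is >= target_ts,
       or None if no such record exists (the 'mismatched offset' case)."""
       for offset, ts in log:
           if ts >= target_ts:
               return offset
       return None

   # Micro-batch 1: the partition has only a record with timestamp 2.
   log = [(0, 2)]
   start = offsets_for_time(log, 2)  # resolves to offset 0

   # Later, a producer appends a record with the *smaller* timestamp 1,
   # which Kafka permits since producers set record timestamps themselves.
   log.append((1, 1))

   # The next micro-batch reads everything from the start offset onward,
   # including the timestamp-1 record, despite the query asking for ts >= 2.
   read = [ts for offset, ts in log if offset >= start]
   ```

   Here `read` contains both timestamps 2 and 1, showing how a record older than the requested timestamp can slip into the output once reading has started.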
   
   In many cases, however, end users simply assume timestamps increase monotonically with all wall clocks in sync, and the current behavior blocks these cases from making progress.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, but it is not a breaking change. It's up to end users to choose the behavior; the default value is "error" (the current behavior). And since it's a source option (not a config), they need to set it explicitly for the new functionality to take effect.
   
   ### How was this patch tested?
   
   New UTs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


