dipeshmr1 commented on PR #54058:
URL: https://github.com/apache/spark/pull/54058#issuecomment-4643162085

   > I might be missing something, but from my reading the code change itself makes sense.
   > 
   > I had one question about the added test: `Option(null)` becomes `None` in Scala, and there is already a `None` case just above it. So I’m not sure the new assertion exercises the same path as the original issue.
   > 
   > Was the original issue from the instance `metrics()` method creating `Some(null)`? If so, would it be useful to cover that path directly?
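The distinction the question turns on fits in a few lines of plain Scala. This is a standalone illustration of standard-library behavior, not code from the PR: `Option.apply` maps `null` to `None`, while `Some.apply` wraps `null` as-is, so only the second yields a non-empty `Option` whose value blows up when dereferenced.

```scala
// Option.apply treats null as "absent" and returns None.
val fromApply: Option[String] = Option(null)

// Some.apply wraps whatever it is given, including null.
val fromSome: Option[String] = Some(null)

println(fromApply)          // None
println(fromSome)           // Some(null)
println(fromSome.isDefined) // true: the Option is non-empty

// Dereferencing the wrapped value is what throws a NullPointerException;
// Try captures the exception instead of crashing the script.
println(scala.util.Try(fromSome.get.length).isFailure) // true
```

This is why a test that passes `Option(null)` lands in the existing `None` branch rather than reproducing a `Some(null)` path.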
   
The key is Scala's default initialization. The field is declared as `private var latestPartitionOffsets: PartitionOffsetMap = _`, and in Scala, `= _` on a field of reference type initializes it to `null`.
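A minimal sketch of that rule. `OffsetHolder` is a hypothetical class, and `Map[String, Long]` stands in for Spark's `PartitionOffsetMap` so the snippet compiles on its own: `= _` on a class `var` yields `null` for reference types and the zero value for value types.

```scala
// Hypothetical holder class mirroring the field declaration quoted above.
// `= _` is only allowed on a `var` member of a class, not on local variables.
class OffsetHolder {
  // Reference type (a Map stand-in for PartitionOffsetMap): initialized to null.
  var latestPartitionOffsets: Map[String, Long] = _
  // Value type: initialized to the type's zero value instead.
  var count: Int = _
}

val h = new OffsetHolder
println(h.latestPartitionOffsets == null) // true
println(h.count)                          // 0
```

Nothing warns when such a field is read before its first assignment, which is how the `null` reaches `metrics()`.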
        
The field is only assigned inside `latestOffset(start, readLimit)` (line 140). The instance method `metrics()` (line 347) is called by the streaming engine for progress reporting, independently of the trigger lifecycle.
   
So `latestPartitionOffsets` is `null` in exactly one scenario: at stream startup, before the first `latestOffset()` call has completed. The sequence is:
   
     Stream starts
       → latestPartitionOffsets = null  (default)
       → streaming engine calls metrics() for progress reporting  ← NPE here (before fix)
       → latestOffset() runs → latestPartitionOffsets gets assigned
       → planInputPartitions() runs (line 269 has an assert confirming it's non-null by now)
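The ordering above can be reproduced with a simplified, hypothetical stand-in. `SketchStream`, `metricsUnguarded` and `metricsGuarded` are illustrative names, not the Spark class or its methods. The sketch also assumes the fix guards the field with `Option(...)`, which the comment does not show. Calling `metrics` on a freshly constructed instance, before any `latestOffset()` call, exercises the startup path directly.

```scala
// Hypothetical, simplified stand-in for the stream class discussed above.
class SketchStream {
  // Stand-in for PartitionOffsetMap; null until latestOffset() runs.
  private var latestPartitionOffsets: Map[String, Long] = _

  // Stand-in for latestOffset(start, readLimit): the only place the field is assigned.
  def latestOffset(): Unit = {
    latestPartitionOffsets = Map("topic-0" -> 42L)
  }

  // Pre-fix behavior: dereferences the field unconditionally.
  def metricsUnguarded(): Map[String, String] =
    Map("partitions" -> latestPartitionOffsets.size.toString)

  // Assumed fix: Option(...) turns the startup null into None.
  def metricsGuarded(): Map[String, String] =
    Option(latestPartitionOffsets) match {
      case Some(offsets) => Map("partitions" -> offsets.size.toString)
      case None          => Map.empty
    }
}

val s = new SketchStream
// Progress reporting before the first latestOffset() call:
println(scala.util.Try(s.metricsUnguarded()).isFailure) // true (NullPointerException)
println(s.metricsGuarded())                             // Map()
s.latestOffset()
println(s.metricsGuarded())                             // Map(partitions -> 1)
```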
   

