Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56913708
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala ---
    @@ -52,6 +52,21 @@ case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
         case i if i == 0 => 0
         case i if i > 0 => 1
       }
    +
    +  /**
    +   * Unpacks an offset into [[StreamProgress]] by associating each offset with the order list of
    +   * sources.
    +   *
    +   * This method is typically used to associate a serialized offset with actual sources (which
    +   * cannot be serialized).
    +   */
    +  def toStreamProgress(
    +      sources: Seq[Source],
    +      dest: StreamProgress = new StreamProgress): StreamProgress = {
    --- End diff --
    
    I made this all immutable, so it now always constructs a new one. I also
    got rid of the now-unneeded locking.
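
    For illustration only, here is a minimal sketch of what an immutable
    version could look like. It is not the code from the PR: `LongOffset`,
    `NamedSource`, the `baseMap` field and the `++` helper are simplified,
    hypothetical stand-ins, and the `dest` parameter is assumed to be
    dropped once nothing is mutated in place.

```scala
// Hypothetical, simplified stand-ins for the Spark types named in the diff.
trait Offset
case class LongOffset(offset: Long) extends Offset
trait Source
case class NamedSource(name: String) extends Source

// Immutable progress: a thin wrapper around an immutable Map, so no locking
// is needed when it is shared between threads.
final case class StreamProgress(baseMap: Map[Source, Offset] = Map.empty) {
  // Returns a new StreamProgress; the receiver is left untouched.
  def ++(updates: Iterable[(Source, Offset)]): StreamProgress =
    StreamProgress(baseMap ++ updates)

  def get(source: Source): Option[Offset] = baseMap.get(source)
}

case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
  // Pairs each source with the offset at the same position and keeps only
  // the sources that actually have an offset.
  def toStreamProgress(sources: Seq[Source]): StreamProgress = {
    assert(sources.size == offsets.size)
    StreamProgress() ++ sources.zip(offsets).collect {
      case (source, Some(offset)) => (source, offset)
    }
  }
}
```

    Because every update returns a fresh value, a reader that holds a
    reference to an older `StreamProgress` never sees it change, which is
    why the locking is no longer needed.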

