Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21546#discussion_r199622313
  
    --- Diff: python/pyspark/serializers.py ---
    @@ -184,27 +184,59 @@ def loads(self, obj):
             raise NotImplementedError
     
     
    -class ArrowSerializer(FramedSerializer):
    +class BatchOrderSerializer(Serializer):
         """
    -    Serializes bytes as Arrow data with the Arrow file format.
    +    Deserialize a stream of batches followed by batch order information.
         """
     
    -    def dumps(self, batch):
    +    def __init__(self, serializer):
    +        self.serializer = serializer
    +        self.batch_order = []
    +
    +    def dump_stream(self, iterator, stream):
    +        return self.serializer.dump_stream(iterator, stream)
    +
    +    def load_stream(self, stream):
    +        for batch in self.serializer.load_stream(stream):
    +            yield batch
    +        num = read_int(stream)
    +        for i in xrange(num):
    +            index = read_int(stream)
    +            self.batch_order.append(index)
    +        raise StopIteration()
    +
    +    def get_batch_order(self):
    --- End diff --
    
    Is this to protect from it being called before `load_stream`?  I guess 
there is also the issue of what if the same serializer instance is used twice, 
then it will never get cleared..


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to