Github user sethah commented on a diff in the pull request:
https://github.com/apache/spark/pull/21546#discussion_r199508609
--- Diff: python/pyspark/serializers.py ---
@@ -184,27 +184,59 @@ def loads(self, obj):
raise NotImplementedError
-class ArrowSerializer(FramedSerializer):
+class BatchOrderSerializer(Serializer):
"""
- Serializes bytes as Arrow data with the Arrow file format.
+ Deserialize a stream of batches followed by batch order information.
"""
- def dumps(self, batch):
+ def __init__(self, serializer):
+ self.serializer = serializer
+ self.batch_order = []
+
+ def dump_stream(self, iterator, stream):
+ return self.serializer.dump_stream(iterator, stream)
+
+ def load_stream(self, stream):
+ for batch in self.serializer.load_stream(stream):
+ yield batch
+ num = read_int(stream)
+ for i in xrange(num):
+ index = read_int(stream)
+ self.batch_order.append(index)
+ raise StopIteration()
+
+ def get_batch_order(self):
--- End diff --
Maybe we should initialize `self.batch_order = None` and add
`assert self.batch_order is not None` here.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]