[GitHub] spark pull request #19226: [SPARK-21985][PySpark] PairDeserializer is broken...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19226

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19226#discussion_r139281251

--- Diff: python/pyspark/serializers.py ---
@@ -343,6 +343,9 @@ def _load_stream_without_unbatching(self, stream):
         key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
         for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+            # the batch is an iterable, we need to check lengths so we convert to list if needed.
--- End diff --

nit: For double-zipped RDDs, the batches can be iterators from another PairDeserializer instead of lists, so we need to convert them to lists when needed.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19226#discussion_r139112030

--- Diff: python/pyspark/serializers.py ---
@@ -343,6 +343,8 @@ def _load_stream_without_unbatching(self, stream):
         key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
         for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+            key_batch = key_batch if hasattr(key_batch, '__len__') else list(key_batch)
--- End diff --

Could we add a small comment that this is required because `_load_stream_without_unbatching` could return an iterator of iterators in this case?
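The guard in the diff above can be illustrated on its own; a minimal sketch (the `ensure_sized` helper name is illustrative, not from PySpark, which inlines the expression per batch):

```python
# Materialize a batch into a list only when it does not already support
# len(). Lists and other sized containers pass through untouched; one-shot
# iterators (e.g. zip objects from a nested PairDeserializer) get copied.
def ensure_sized(batch):
    """Return the batch unchanged if it has a length, else convert it to a list."""
    return batch if hasattr(batch, '__len__') else list(batch)

print(ensure_sized([1, 2, 3]))        # -> [1, 2, 3] (passes through unchanged)
print(ensure_sized(iter([1, 2, 3])))  # -> [1, 2, 3] (materialized to a list)
```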
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19226#discussion_r139035222

--- Diff: python/pyspark/tests.py ---
@@ -644,6 +644,18 @@ def test_cartesian_chaining(self):
             set([(x, (y, y)) for x in range(10) for y in range(10)])
         )
+
+    def test_zip_chaining(self):
+        # Tests for SPARK-21985
+        rdd = self.sc.parallelize('abc')
--- End diff --

I'd set an explicit number of partitions, because `zip` reserializes the RDD depending on it.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19226#discussion_r139032001

--- Diff: python/pyspark/serializers.py ---
@@ -343,6 +343,8 @@ def _load_stream_without_unbatching(self, stream):
         key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
         for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+            key_batch = list(key_batch)
+            val_batch = list(val_batch)
--- End diff --

Ah, I should have been clearer. Actually, I meant that if `Serializer._load_stream_without_unbatching` worked as documented, returning `an iterator of deserialized batches (lists)`, everything would have worked fine. So I think the reverse is actually more correct, because `PairDeserializer` and `CartesianDeserializer` do not follow this. I am okay with the current change, but I believe the reverse is better. WDYT @aray and @holdenk?
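The issue being debated here can be reproduced with plain Python, no PySpark required: in Python 3, `zip()` returns a one-shot iterator with no `len()`, so a batch yielded by an inner PairDeserializer can neither be length-checked nor iterated twice by an enclosing one unless it is materialized first. A minimal illustration:

```python
# A zip object stands in for a batch yielded by an inner PairDeserializer.
inner_batch = zip([1, 2], ['a', 'b'])

# It has no __len__, so a length check on it fails outright...
try:
    len(inner_batch)
except TypeError as e:
    print('no len():', e)

# ...and it is exhausted after one pass, so a second iteration sees nothing.
materialized = list(inner_batch)   # first (and only) pass
print(materialized)                # -> [(1, 'a'), (2, 'b')]
print(list(inner_batch))           # -> [] (already consumed)
```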
Github user aray commented on a diff in the pull request: https://github.com/apache/spark/pull/19226#discussion_r138917350

--- Diff: python/pyspark/serializers.py ---
@@ -343,6 +343,8 @@ def _load_stream_without_unbatching(self, stream):
         key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
         for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+            key_batch = list(key_batch)
+            val_batch = list(val_batch)
--- End diff --

fixed in 66477f8
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19226#discussion_r138797273

--- Diff: python/pyspark/serializers.py ---
@@ -343,6 +343,8 @@ def _load_stream_without_unbatching(self, stream):
         key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
         for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+            key_batch = list(key_batch)
+            val_batch = list(val_batch)
--- End diff --

Should we fix the doc in `Serializer._load_stream_without_unbatching` to say it returns an iterator of iterables?
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19226#discussion_r138797113

--- Diff: python/pyspark/tests.py ---
@@ -644,6 +644,18 @@ def test_cartesian_chaining(self):
             set([(x, (y, y)) for x in range(10) for y in range(10)])
         )
+
+    def test_zip_chaining(self):
+        # Tests for SPARK-21985
+        rdd = self.sc.parallelize(range(10), 2)
--- End diff --

This test case already passes, doesn't it?
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19226#discussion_r138790747

--- Diff: python/pyspark/serializers.py ---
@@ -343,9 +346,6 @@ def _load_stream_without_unbatching(self, stream):
         key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
         for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
-            if len(key_batch) != len(val_batch):
-                raise ValueError("Can not deserialize PairRDD with different number of items"
-                                 " in batches: (%d, %d)" % (len(key_batch), len(val_batch)))
             # for correctness with repeated cartesian/zip this must be returned as one batch
             yield zip(key_batch, val_batch)
--- End diff --

How about returning this batch as a list (as described in the doc)?
GitHub user aray opened a pull request: https://github.com/apache/spark/pull/19226

[SPARK-21985][PySpark] PairDeserializer is broken for double-zipped RDDs

## What changes were proposed in this pull request?

This removes the mostly unnecessary check that each individual batch from the key and value serializers is of the same size. We already enforce that the batch sizes match in rdd.zip (see: https://github.com/apache/spark/blob/c06f3f5ac500b02d38ca7ec5fcb33085e07f2f75/python/pyspark/rdd.py#L2118), which is the only place the check is used in a non-trivial manner. This also adds a comment to the PairDeserializer documentation about this requirement.

## How was this patch tested?

Additional unit test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aray/spark SPARK-21985

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19226.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19226

commit 4a9eb935b8438a159c9f12239135eedd59b25fd3
Author: Andrew Ray
Date: 2017-09-14T01:26:15Z

    remove check and add tests
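The double-zip failure mode named in the PR title can be sketched without PySpark. Below, `pair_stream` is a hypothetical stand-in for `PairDeserializer._load_stream_without_unbatching` (not actual PySpark code): it yields one zipped batch per pair of input batches, and since each yielded batch is itself a one-shot zip iterator, a chained call must materialize its inputs before length-checking them, which is the guard discussed in this thread.

```python
# Hypothetical stand-in for PairDeserializer._load_stream_without_unbatching.
def pair_stream(key_batches, val_batches):
    for kb, vb in zip(key_batches, val_batches):
        # Materialize iterator batches (e.g. zip objects from an inner
        # pair_stream) so they can be length-checked and iterated.
        kb = kb if hasattr(kb, '__len__') else list(kb)
        vb = vb if hasattr(vb, '__len__') else list(vb)
        if len(kb) != len(vb):
            raise ValueError("mismatched batch sizes: (%d, %d)" % (len(kb), len(vb)))
        yield zip(kb, vb)

# Chaining twice: the outer call receives zip objects from the inner calls.
outer = pair_stream(pair_stream([[1, 2]], [['a', 'b']]),
                    pair_stream([[1, 2]], [['x', 'y']]))
print([list(batch) for batch in outer])
# -> [[((1, 'a'), (1, 'x')), ((2, 'b'), (2, 'y'))]]
```

Without the `hasattr` guard, the `len(kb)` call on the inner zip objects would raise a TypeError on the second level of zipping.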