[GitHub] spark pull request #19226: [SPARK-21985][PySpark] PairDeserializer is broken...

2017-09-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19226


---




[GitHub] spark pull request #19226: [SPARK-21985][PySpark] PairDeserializer is broken...

2017-09-16 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19226#discussion_r139281251
  
--- Diff: python/pyspark/serializers.py ---
@@ -343,6 +343,9 @@ def _load_stream_without_unbatching(self, stream):
         key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
         for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+            # the batch is an iterable, we need to check lengths so we convert to list if needed.
--- End diff --

nit: For double-zipped RDDs, the batches can be iterators from another 
`PairDeserializer` instead of lists. We need to convert them to lists if needed.


---




[GitHub] spark pull request #19226: [SPARK-21985][PySpark] PairDeserializer is broken...

2017-09-15 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19226#discussion_r139112030
  
--- Diff: python/pyspark/serializers.py ---
@@ -343,6 +343,8 @@ def _load_stream_without_unbatching(self, stream):
         key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
         for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+            key_batch = key_batch if hasattr(key_batch, '__len__') else list(key_batch)
--- End diff --

Could we add a small comment that this is required because 
`_load_stream_without_unbatching` could return an iterator of iterators in 
this case?
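
For context, a minimal standalone sketch of the guard being discussed; the 
helper name `materialize` is illustrative, not part of the patch:

    def materialize(batch):
        # lists already support len(); the zip iterators yielded by a nested
        # PairDeserializer do not, so convert those to lists first
        return batch if hasattr(batch, '__len__') else list(batch)

    assert materialize([1, 2]) == [1, 2]                         # list passes through
    assert materialize(zip([1, 2], [3, 4])) == [(1, 3), (2, 4)]  # iterator is materialized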


---




[GitHub] spark pull request #19226: [SPARK-21985][PySpark] PairDeserializer is broken...

2017-09-14 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19226#discussion_r139035222
  
--- Diff: python/pyspark/tests.py ---
@@ -644,6 +644,18 @@ def test_cartesian_chaining(self):
             set([(x, (y, y)) for x in range(10) for y in range(10)])
         )
 
+    def test_zip_chaining(self):
+        # Tests for SPARK-21985
+        rdd = self.sc.parallelize('abc')
--- End diff --

I'd set an explicit number of partitions, because `zip` reserializes the RDD 
depending on it.
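
A sketch of the suggested tweak, assuming the rest of the test double-zips the 
RDD as discussed elsewhere in this thread (the partition count of 2 is 
illustrative):

    def test_zip_chaining(self):
        # Tests for SPARK-21985
        rdd = self.sc.parallelize('abc', 2)  # explicit partition count
        self.assertSetEqual(
            set(rdd.zip(rdd).zip(rdd).collect()),
            set([((x, x), x) for x in 'abc'])
        )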


---




[GitHub] spark pull request #19226: [SPARK-21985][PySpark] PairDeserializer is broken...

2017-09-14 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19226#discussion_r139032001
  
--- Diff: python/pyspark/serializers.py ---
@@ -343,6 +343,8 @@ def _load_stream_without_unbatching(self, stream):
         key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
         for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+            key_batch = list(key_batch)
+            val_batch = list(val_batch)
--- End diff --

Ah, I should have been clearer. I meant that if 
`Serializer._load_stream_without_unbatching` worked as documented, returning 
`an iterator of deserialized batches (lists)`, everything would have worked 
fine. So I think the reverse is actually more correct, because 
`PairDeserializer` and `CartesianDeserializer` do not follow this contract.

I am okay with the current change, but I believe the reverse is better. WDYT 
@aray and @holdenk?
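
The mismatch is easy to see in plain Python 3, independent of Spark: 
`PairDeserializer` yields a `zip` object per batch, and `zip` objects support 
no `len()`:

    batch = zip([1, 2, 3], [4, 5, 6])  # the kind of batch a nested PairDeserializer receives
    try:
        len(batch)  # documented as a list, but zip objects have no __len__
    except TypeError as exc:
        print(exc)  # object of type 'zip' has no len()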


---




[GitHub] spark pull request #19226: [SPARK-21985][PySpark] PairDeserializer is broken...

2017-09-14 Thread aray
Github user aray commented on a diff in the pull request:

https://github.com/apache/spark/pull/19226#discussion_r138917350
  
--- Diff: python/pyspark/serializers.py ---
@@ -343,6 +343,8 @@ def _load_stream_without_unbatching(self, stream):
         key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
         for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+            key_batch = list(key_batch)
+            val_batch = list(val_batch)
--- End diff --

fixed in 66477f8



---




[GitHub] spark pull request #19226: [SPARK-21985][PySpark] PairDeserializer is broken...

2017-09-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19226#discussion_r138797273
  
--- Diff: python/pyspark/serializers.py ---
@@ -343,6 +343,8 @@ def _load_stream_without_unbatching(self, stream):
         key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
         for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+            key_batch = list(key_batch)
+            val_batch = list(val_batch)
--- End diff --

Should we fix the doc in `Serializer._load_stream_without_unbatching` to say 
it returns an iterator of iterables?
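
A sketch of what that docstring tweak might look like; the wording is 
illustrative, and the return line assumes the existing default implementation 
in `Serializer`:

    def _load_stream_without_unbatching(self, stream):
        """
        Return an iterator of deserialized batches (iterables) of objects from
        the input stream. For most serializers each batch is a list, but
        PairDeserializer and CartesianDeserializer yield iterators instead.
        """
        return map(lambda x: [x], self.load_stream(stream))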


---




[GitHub] spark pull request #19226: [SPARK-21985][PySpark] PairDeserializer is broken...

2017-09-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19226#discussion_r138797113
  
--- Diff: python/pyspark/tests.py ---
@@ -644,6 +644,18 @@ def test_cartesian_chaining(self):
             set([(x, (y, y)) for x in range(10) for y in range(10)])
         )
 
+    def test_zip_chaining(self):
+        # Tests for SPARK-21985
+        rdd = self.sc.parallelize(range(10), 2)
--- End diff --

This test case already passes, doesn't it?


---




[GitHub] spark pull request #19226: [SPARK-21985][PySpark] PairDeserializer is broken...

2017-09-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19226#discussion_r138790747
  
--- Diff: python/pyspark/serializers.py ---
@@ -343,9 +346,6 @@ def _load_stream_without_unbatching(self, stream):
         key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
         for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
-            if len(key_batch) != len(val_batch):
-                raise ValueError("Can not deserialize PairRDD with different number of items"
-                                 " in batches: (%d, %d)" % (len(key_batch), len(val_batch)))
             # for correctness with repeated cartesian/zip this must be returned as one batch
             yield zip(key_batch, val_batch)
--- End diff --

How about returning this batch as a list, as described in the doc?
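
A minimal sketch of that alternative; only the yield changes, everything else 
stays as in the quoted method:

    for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
        # materializing here keeps the documented "iterator of lists" contract,
        # so a nested deserializer can call len() on the batch safely
        yield list(zip(key_batch, val_batch))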


---




[GitHub] spark pull request #19226: [SPARK-21985][PySpark] PairDeserializer is broken...

2017-09-13 Thread aray
GitHub user aray opened a pull request:

https://github.com/apache/spark/pull/19226

[SPARK-21985][PySpark] PairDeserializer is broken for double-zipped RDDs

## What changes were proposed in this pull request?

This removes the mostly unnecessary check that each individual batch from 
the key and value serializers is of the same size. We already enforce that the 
batch sizes are the same in `rdd.zip` (see: 
https://github.com/apache/spark/blob/c06f3f5ac500b02d38ca7ec5fcb33085e07f2f75/python/pyspark/rdd.py#L2118
 ), which is the only place it is used in a non-trivial manner. This adds a 
comment to the `PairDeserializer` documentation about this requirement.
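
For reference, a minimal repro of the failure mode, assuming a local 
SparkContext `sc`; before this patch, the length check called len() on the 
inner deserializer's batches, which are zip/izip iterators without `__len__`:

    rdd = sc.parallelize(range(10), 2)
    zipped = rdd.zip(rdd)        # fine: (0, 0), (1, 1), ...
    double = zipped.zip(rdd)     # deserializing this broke before the fix
    print(double.collect())      # expected: [((0, 0), 0), ((1, 1), 1), ...]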

## How was this patch tested?

An additional unit test, `test_zip_chaining`, covering double-zipped RDDs.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aray/spark SPARK-21985

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19226.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19226


commit 4a9eb935b8438a159c9f12239135eedd59b25fd3
Author: Andrew Ray 
Date:   2017-09-14T01:26:15Z

remove check and add tests




---
