[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-09 Thread davies
Github user davies commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-91363104
  
@JoshRosen Good catch! Fixed it.





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-91133286
  
  [Test build #29925 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29925/consoleFull)
 for   PR 1977 at commit 
[`67772dd`](https://github.com/apache/spark/commit/67772dd9e5fc2157f1da960aebfa82b133615527).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class FlattenedValuesSerializer(BatchedSerializer):`
  * `class ExternalList(object):`
  * `class ExternalListOfList(ExternalList):`
  * `class GroupByKey(object):`
  * `class ExternalGroupBy(ExternalMerger):`

 * This patch does not change any dependencies.





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-91133293
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29925/
Test PASSed.





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-09 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-91347431
  
I spent a bit of time fuzz-testing this code to try to reach 100% coverage 
of the changes in this patch.  While doing so, I think I uncovered a bug:

```
../Spark/python/pyspark/shuffle.py:383: in _external_items
    for v in self._merged_items(i):
../Spark/python/pyspark/shuffle.py:826: in <genexpr>
    return ((k, vs) for k, vs in GroupByKey(sorted_items))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <pyspark.shuffle.GroupByKey object at 0x1048d0990>

    def next(self):
        key, value = self.next_item if self.next_item else next(self.iterator)
E       TypeError: list object is not an iterator

../Spark/python/pyspark/shuffle.py:669: TypeError
```

It looks like the `GroupByKey` object expects to be instantiated with an 
iterator, but in `GroupBy._merge_sorted_items` we end up calling it with the 
output of `ExternalSorter.sorted`.  There is a branch in 
`ExternalSorter.sorted` where we can end up returning a list instead of an 
iterator (line 517), where we return `current_chunk`.
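
(For reference, here is a minimal standalone sketch of the failure mode, not the actual Spark code: calling `next()` on a plain list raises exactly this `TypeError`, while wrapping the input in `iter()` first avoids it.)

```
# Hypothetical illustration only: group_sorted() stands in for GroupByKey's role.
def group_sorted(sorted_items):
    it = iter(sorted_items)   # defensive: accept either a list or an iterator
    current_key = None
    values = []
    for key, value in it:
        if values and key != current_key:
            yield current_key, values
            values = []
        current_key = key
        values.append(value)
    if values:
        yield current_key, values

pairs = [(0, 1), (0, 2), (1, 3)]   # a plain list, as ExternalSorter.sorted may return
try:
    next(pairs)                    # what happens today when GroupByKey calls next()
except TypeError as e:
    print(e)                       # list object is not an iterator

print(list(group_sorted(pairs)))   # [(0, [1, 2]), (1, [3])]
```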





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/1977#discussion_r27998013
  
--- Diff: python/pyspark/rdd.py ---
@@ -1755,21 +1753,33 @@ def createZero():
 
 return self.combineByKey(lambda v: func(createZero(), v), func, 
func, numPartitions)
 
+def _can_spill(self):
+return self.ctx._conf.get("spark.shuffle.spill", "True").lower() == "true"
+
+def _memory_limit(self):
+return _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
+
 # TODO: support variant with custom partitioner
 def groupByKey(self, numPartitions=None):
 """
 Group the values for each key in the RDD into a single sequence.
-Hash-partitions the resulting RDD with into numPartitions partitions.
+Hash-partitions the resulting RDD with numPartitions partitions.
+
+The values in the resulting RDD is iterable object 
L{ResultIterable},
+they can be iterated only once. The `len(values)` will result in
--- End diff --

Is this still true?  
[Upthread](https://github.com/apache/spark/pull/1977#issuecomment-56004283), 
you mentioned that this can now be iterated multiple times, so we should 
probably update this comment.  It would also be nice to add a unit test to 
illustrate that calling `len(values)` and then reading the values will work, 
even if it's inefficient (you might have already done this in tests, though; I 
haven't checked).
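
(A test along those lines might look roughly like the following sketch; the method name is made up, and it assumes a test case with a SparkContext available as `self.sc`, as in PySpark's existing test suites.)

```
def test_len_then_iterate_grouped_values(self):
    # group 100 values under 3 keys, then check that calling len() first
    # does not prevent a subsequent full iteration over the values
    rdd = self.sc.parallelize([(i % 3, i) for i in range(100)])
    for key, values in rdd.groupByKey().collect():
        n = len(values)                         # call len() first ...
        self.assertEqual(n, len(list(values)))  # ... then iterate and compare
```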





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/1977#discussion_r28001568
  
--- Diff: python/pyspark/serializers.py ---
@@ -220,6 +220,29 @@ def __repr__(self):
 return "BatchedSerializer(%s, %d)" % (str(self.serializer), self.batchSize)
 
 
+class FlattedValuesSerializer(BatchedSerializer):
--- End diff --

Can we rename this to `FlattenedValuesSerializer` (with the missing `en`)?  
Should be a pretty simple find-and-replace, but we should also update the other 
occurrences of `flatted` to be `flattened`.





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/1977#discussion_r28003603
  
--- Diff: python/pyspark/shuffle.py ---
@@ -367,32 +372,13 @@ def iteritems(self):
 
 def _external_items(self):
 """ Return all partitioned items as iterator """
-assert not self.data
--- End diff --

I noticed that you moved a few of these assertions.  I guess the old 
assumption was that once we've spilled, we'll stop using `data` and only 
aggregate into `pdata`, given that we clear `data` in the first branch of 
`_spill`.  Why has this assumption changed here?  It looks like we do end up 
writing to `data` again inside of this `_external_items` method, but then we 
end up clearing `data` at the end after the iterator has been consumed.

Was this change necessary in order to support iterating multiple times over 
the merged result?  Just want to double-check my understanding here.





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/1977#discussion_r28003896
  
--- Diff: python/pyspark/shuffle.py ---
@@ -367,32 +372,13 @@ def iteritems(self):
 
 def _external_items(self):
 """ Return all partitioned items as iterator """
-assert not self.data
 if any(self.pdata):
 self._spill()
-hard_limit = self._next_limit()
+self.pdata = []
--- End diff --

It looks like the old code didn't clear `pdata` after spilling, but I'm 
guessing that wasn't a problem because we didn't iterate multiple times before?





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-91011683
  
I spent most of the morning looking this over again and the patch looks 
pretty good to me.  I think I understand the lifecycle of values pretty well.  
I left a couple of questions / comments upthread, but overall I think this is 
pretty close to being ready to merge.

One quick question: when do groupBy's spill files get cleaned up?  It seems 
like we want them to be cleaned at the end of the task, since at that point we 
know for sure that they won't be re-used.  To handle this case, what do you 
think about adding a cleanup hook mechanism that allows us to register cleanup 
code to run at the end of a task?  In the past, we could have relied on 
shutdown hooks for this, but that's no longer possible due to worker re-use.
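
(A very rough sketch of such a hook registry, purely hypothetical; none of these names exist in the codebase:)

```
_task_cleanup_hooks = []

def register_task_cleanup(fn):
    """Register a callable to be run when the current task finishes."""
    _task_cleanup_hooks.append(fn)

def run_task_cleanup():
    """Would be called by the worker at the end of every task, even on failure."""
    global _task_cleanup_hooks
    hooks, _task_cleanup_hooks = _task_cleanup_hooks, []
    for fn in hooks:
        try:
            fn()
        except Exception:
            pass  # best-effort cleanup; never fail the task because of it

# e.g. a spilling merger could do: register_task_cleanup(merger._cleanup_spill_files)
```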





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/1977#discussion_r27994956
  
--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +520,295 @@ def sorted(self, iterator, key=None, reverse=False):
 return heapq.merge(chunks, key=key, reverse=reverse)
 
 
+class ExternalList(object):
+"""
+ExternalList can have many items which cannot be hold in memory in
+the same time.
+
+>>> l = ExternalList(range(100))
+>>> len(l)
+100
+>>> l.append(10)
+>>> len(l)
+101
+>>> for i in range(10240):
+...     l.append(i)
+>>> len(l)
+10341
+>>> import pickle
+>>> l2 = pickle.loads(pickle.dumps(l))
+>>> len(l2)
+10341
+>>> list(l2)[100]
+10
+"""
+LIMIT = 10240
+
+def __init__(self, values):
+self.values = values
+self.disk_count = 0
+self._file = None
+self._ser = None
+
+def __getstate__(self):
+if self._file is not None:
+self._file.flush()
+f = os.fdopen(os.dup(self._file.fileno()))
+f.seek(0)
+bytes = f.read()
--- End diff --

I think `bytes` shadows a built-in name in Python, so maybe we should call this 
`_bytes` instead.





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/1977#discussion_r27995826
  
--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +520,295 @@ def sorted(self, iterator, key=None, reverse=False):
 return heapq.merge(chunks, key=key, reverse=reverse)
 
 
+class ExternalList(object):
+"""
+ExternalList can have many items which cannot be hold in memory in
+the same time.
+
+>>> l = ExternalList(range(100))
+>>> len(l)
+100
+>>> l.append(10)
+>>> len(l)
+101
+>>> for i in range(10240):
+...     l.append(i)
+>>> len(l)
+10341
+>>> import pickle
+>>> l2 = pickle.loads(pickle.dumps(l))
+>>> len(l2)
+10341
+>>> list(l2)[100]
+10
+"""
+LIMIT = 10240
+
+def __init__(self, values):
+self.values = values
+self.disk_count = 0
+self._file = None
+self._ser = None
+
+def __getstate__(self):
+if self._file is not None:
+self._file.flush()
+f = os.fdopen(os.dup(self._file.fileno()))
+f.seek(0)
+bytes = f.read()
+else:
+bytes = ''
+return self.values, self.disk_count, bytes
+
+def __setstate__(self, item):
+self.values, self.disk_count, bytes = item
+if bytes:
+self._open_file()
+self._file.write(bytes)
+else:
+self._file = None
+self._ser = None
+
+def __iter__(self):
+if self._file is not None:
+self._file.flush()
+# read all items from disks first
+with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+f.seek(0)
+for values in self._ser.load_stream(f):
+for v in values:
+yield v
+
+for v in self.values:
+yield v
+
+def __len__(self):
+return self.disk_count + len(self.values)
+
+def append(self, value):
+self.values.append(value)
+# dump them into disk if the key is huge
+if len(self.values) >= self.LIMIT:
+self._spill()
+
+def _open_file(self):
+dirs = _get_local_dirs("objects")
+d = dirs[id(self) % len(dirs)]
+if not os.path.exists(d):
+os.makedirs(d)
+p = os.path.join(d, str(id))
+self._file = open(p, "w+", 65536)
+self._ser = CompressedSerializer(PickleSerializer())
+os.unlink(p)
--- End diff --

Why do we need this `unlink` call here?  Is this to delete the file if it 
already exists?





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/1977#discussion_r28027864
  
--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +522,322 @@ def sorted(self, iterator, key=None, reverse=False):
 return heapq.merge(chunks, key=key, reverse=reverse)
 
 
+class ExternalList(object):
+"""
+ExternalList can have many items which cannot be hold in memory in
+the same time.
+
+>>> l = ExternalList(range(100))
+>>> len(l)
+100
+>>> l.append(10)
+>>> len(l)
+101
+>>> for i in range(10240):
+...     l.append(i)
+>>> len(l)
+10341
+>>> import pickle
+>>> l2 = pickle.loads(pickle.dumps(l))
+>>> len(l2)
+10341
+>>> list(l2)[100]
+10
+"""
+LIMIT = 10240
+
+def __init__(self, values):
+self.values = values
+self.count = len(values)
+self._file = None
+self._ser = None
+
+def __getstate__(self):
+if self._file is not None:
+self._file.flush()
+f = os.fdopen(os.dup(self._file.fileno()))
+f.seek(0)
+serialized = f.read()
+else:
+serialized = ''
+return self.values, self.count, serialized
+
+def __setstate__(self, item):
+self.values, self.count, serialized = item
+if serialized:
+self._open_file()
+self._file.write(serialized)
+else:
+self._file = None
+self._ser = None
+
+def __iter__(self):
+if self._file is not None:
+self._file.flush()
+# read all items from disks first
+with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+f.seek(0)
+for values in self._ser.load_stream(f):
+for v in values:
--- End diff --

I understand now that this is just unwrapping the list that we wrapped 
around `self.values` in the `_spill` call down on line 615, but this was a bit 
confusing to me.  If we need to keep this and the wrapping, could you add 
comments to explain it?





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1977#discussion_r28032966
  
--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +522,322 @@ def sorted(self, iterator, key=None, reverse=False):
 return heapq.merge(chunks, key=key, reverse=reverse)
 
 
+class ExternalList(object):
+"""
+ExternalList can have many items which cannot be hold in memory in
+the same time.
+
+>>> l = ExternalList(range(100))
+>>> len(l)
+100
+>>> l.append(10)
+>>> len(l)
+101
+>>> for i in range(10240):
+...     l.append(i)
+>>> len(l)
+10341
+>>> import pickle
+>>> l2 = pickle.loads(pickle.dumps(l))
+>>> len(l2)
+10341
+>>> list(l2)[100]
+10
+"""
+LIMIT = 10240
+
+def __init__(self, values):
+self.values = values
+self.count = len(values)
+self._file = None
+self._ser = None
+
+def __getstate__(self):
+if self._file is not None:
+self._file.flush()
+f = os.fdopen(os.dup(self._file.fileno()))
+f.seek(0)
+serialized = f.read()
+else:
+serialized = ''
+return self.values, self.count, serialized
+
+def __setstate__(self, item):
+self.values, self.count, serialized = item
+if serialized:
+self._open_file()
+self._file.write(serialized)
+else:
+self._file = None
+self._ser = None
+
+def __iter__(self):
+if self._file is not None:
+self._file.flush()
+# read all items from disks first
+with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+f.seek(0)
+for values in self._ser.load_stream(f):
+for v in values:
+yield v
+
+for v in self.values:
+yield v
+
+def __len__(self):
+return self.count
+
+def append(self, value):
+self.values.append(value)
+self.count += 1
+# dump them into disk if the key is huge
+if len(self.values) >= self.LIMIT:
+self._spill()
+
+def _open_file(self):
+dirs = _get_local_dirs("objects")
+d = dirs[id(self) % len(dirs)]
+if not os.path.exists(d):
+os.makedirs(d)
+p = os.path.join(d, str(id))
+self._file = open(p, "w+", 65536)
+self._ser = CompressedSerializer(PickleSerializer())
+os.unlink(p)
+
+def _spill(self):
+""" dump the values into disk """
+global MemoryBytesSpilled, DiskBytesSpilled
+if self._file is None:
+self._open_file()
+
+used_memory = get_used_memory()
+pos = self._file.tell()
+self._ser.dump_stream([self.values], self._file)
+self.values = []
+gc.collect()
+DiskBytesSpilled += self._file.tell() - pos
+MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+
+
+class ExternalListOfList(ExternalList):
+"""
+An external list for list.
+"""
+def __init__(self, values):
+ExternalList.__init__(self, values)
+self.count = sum(len(i) for i in values)
+
+def append(self, value):
+ExternalList.append(self, value)
--- End diff --

I can change the `__iter__` of ExternalListOfList; then we would not need 
ChainedIterator anymore.
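
(A simplified, standalone sketch of that idea; the real classes also spill to disk, so this is only illustrative:)

```
class ExternalList(object):
    def __init__(self, values):
        self.values = values

    def __iter__(self):
        return iter(self.values)

class ExternalListOfList(ExternalList):
    def __iter__(self):
        # flatten the stored lists, so callers no longer need a ChainedIterator
        for chunk in ExternalList.__iter__(self):
            for v in chunk:
                yield v

print(list(ExternalListOfList([[1, 2], [3]])))  # [1, 2, 3]
```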





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/1977#discussion_r28027636
  
--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +522,322 @@ def sorted(self, iterator, key=None, reverse=False):
 return heapq.merge(chunks, key=key, reverse=reverse)
 
 
+class ExternalList(object):
+"""
+ExternalList can have many items which cannot be hold in memory in
+the same time.
+
+>>> l = ExternalList(range(100))
+>>> len(l)
+100
+>>> l.append(10)
+>>> len(l)
+101
+>>> for i in range(10240):
+...     l.append(i)
+>>> len(l)
+10341
+>>> import pickle
+>>> l2 = pickle.loads(pickle.dumps(l))
+>>> len(l2)
+10341
+>>> list(l2)[100]
+10
+"""
+LIMIT = 10240
+
+def __init__(self, values):
+self.values = values
+self.count = len(values)
+self._file = None
+self._ser = None
+
+def __getstate__(self):
+if self._file is not None:
+self._file.flush()
+f = os.fdopen(os.dup(self._file.fileno()))
+f.seek(0)
+serialized = f.read()
+else:
+serialized = ''
+return self.values, self.count, serialized
+
+def __setstate__(self, item):
+self.values, self.count, serialized = item
+if serialized:
+self._open_file()
+self._file.write(serialized)
+else:
+self._file = None
+self._ser = None
+
+def __iter__(self):
+if self._file is not None:
+self._file.flush()
+# read all items from disks first
+with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+f.seek(0)
+for values in self._ser.load_stream(f):
+for v in values:
+yield v
+
+for v in self.values:
+yield v
+
+def __len__(self):
+return self.count
+
+def append(self, value):
+self.values.append(value)
+self.count += 1
+# dump them into disk if the key is huge
+if len(self.values) >= self.LIMIT:
+self._spill()
+
+def _open_file(self):
+dirs = _get_local_dirs("objects")
+d = dirs[id(self) % len(dirs)]
+if not os.path.exists(d):
+os.makedirs(d)
+p = os.path.join(d, str(id))
+self._file = open(p, "w+", 65536)
+self._ser = CompressedSerializer(PickleSerializer())
+os.unlink(p)
+
+def _spill(self):
+""" dump the values into disk """
+global MemoryBytesSpilled, DiskBytesSpilled
+if self._file is None:
+self._open_file()
+
+used_memory = get_used_memory()
+pos = self._file.tell()
+self._ser.dump_stream([self.values], self._file)
+self.values = []
+gc.collect()
+DiskBytesSpilled += self._file.tell() - pos
+MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+
+
+class ExternalListOfList(ExternalList):
+"""
+An external list for list.
+"""
+def __init__(self, values):
+ExternalList.__init__(self, values)
+self.count = sum(len(i) for i in values)
+
+def append(self, value):
+ExternalList.append(self, value)
+# already counted 1 in ExternalList.append
+self.count += len(value) - 1
+
+
+class GroupByKey(object):
--- End diff --

It looks like we only directly use `GroupByKey` in tests, while the actual 
shuffle code only uses `GroupListsByKey`.  Is this intentional?





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread davies
Github user davies commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-91037582
  
@JoshRosen Thanks for reviewing this; this may be the most complicated part of 
PySpark. :(

The partitioned files will be cleaned up after merging, partition by 
partition. The sorted file and the file backing a large list (ExternalList) will be 
cleaned up once they are closed by GC (os.unlink() right after opening them).





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-91121784
  
  [Test build #29925 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29925/consoleFull)
 for   PR 1977 at commit 
[`67772dd`](https://github.com/apache/spark/commit/67772dd9e5fc2157f1da960aebfa82b133615527).





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1977#discussion_r28032910
  
--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +522,322 @@ def sorted(self, iterator, key=None, reverse=False):
 return heapq.merge(chunks, key=key, reverse=reverse)
 
 
+class ExternalList(object):
+"""
+ExternalList can have many items which cannot be hold in memory in
+the same time.
+
+>>> l = ExternalList(range(100))
+>>> len(l)
+100
+>>> l.append(10)
+>>> len(l)
+101
+>>> for i in range(10240):
+...     l.append(i)
+>>> len(l)
+10341
+>>> import pickle
+>>> l2 = pickle.loads(pickle.dumps(l))
+>>> len(l2)
+10341
+>>> list(l2)[100]
+10
+"""
+LIMIT = 10240
+
+def __init__(self, values):
+self.values = values
+self.count = len(values)
+self._file = None
+self._ser = None
+
+def __getstate__(self):
+if self._file is not None:
+self._file.flush()
+f = os.fdopen(os.dup(self._file.fileno()))
+f.seek(0)
+serialized = f.read()
+else:
+serialized = ''
+return self.values, self.count, serialized
+
+def __setstate__(self, item):
+self.values, self.count, serialized = item
+if serialized:
+self._open_file()
+self._file.write(serialized)
+else:
+self._file = None
+self._ser = None
+
+def __iter__(self):
+if self._file is not None:
+self._file.flush()
+# read all items from disks first
+with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+f.seek(0)
+for values in self._ser.load_stream(f):
+for v in values:
+yield v
+
+for v in self.values:
+yield v
+
+def __len__(self):
+return self.count
+
+def append(self, value):
+self.values.append(value)
+self.count += 1
+# dump them into disk if the key is huge
+if len(self.values) >= self.LIMIT:
+self._spill()
+
+def _open_file(self):
+dirs = _get_local_dirs("objects")
+d = dirs[id(self) % len(dirs)]
+if not os.path.exists(d):
+os.makedirs(d)
+p = os.path.join(d, str(id))
+self._file = open(p, "w+", 65536)
+self._ser = CompressedSerializer(PickleSerializer())
+os.unlink(p)
+
+def _spill(self):
+""" dump the values into disk """
+global MemoryBytesSpilled, DiskBytesSpilled
+if self._file is None:
+self._open_file()
+
+used_memory = get_used_memory()
+pos = self._file.tell()
+self._ser.dump_stream([self.values], self._file)
+self.values = []
+gc.collect()
+DiskBytesSpilled += self._file.tell() - pos
+MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+
+
+class ExternalListOfList(ExternalList):
+"""
+An external list for list.
+"""
+def __init__(self, values):
+ExternalList.__init__(self, values)
+self.count = sum(len(i) for i in values)
+
+def append(self, value):
+ExternalList.append(self, value)
+# already counted 1 in ExternalList.append
+self.count += len(value) - 1
+
+
+class GroupByKey(object):
--- End diff --

Yes, I'd like the code and tests of ExternalList and GroupByKey to be 
easy to understand.





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-91051963
  
Jenkins, retest this please.





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-91050249
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29891/
Test FAILed.





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/1977#discussion_r28013345
  
--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +520,295 @@ def sorted(self, iterator, key=None, reverse=False):
 return heapq.merge(chunks, key=key, reverse=reverse)
 
 
+class ExternalList(object):
+"""
+ExternalList can have many items which cannot be hold in memory in
+the same time.
+
+>>> l = ExternalList(range(100))
+>>> len(l)
+100
+>>> l.append(10)
+>>> len(l)
+101
+>>> for i in range(10240):
+...     l.append(i)
+>>> len(l)
+10341
+>>> import pickle
+>>> l2 = pickle.loads(pickle.dumps(l))
+>>> len(l2)
+10341
+>>> list(l2)[100]
+10
+"""
+LIMIT = 10240
+
+def __init__(self, values):
+self.values = values
+self.disk_count = 0
+self._file = None
+self._ser = None
+
+def __getstate__(self):
+if self._file is not None:
+self._file.flush()
+f = os.fdopen(os.dup(self._file.fileno()))
+f.seek(0)
+bytes = f.read()
+else:
+bytes = ''
+return self.values, self.disk_count, bytes
+
+def __setstate__(self, item):
+self.values, self.disk_count, bytes = item
+if bytes:
+self._open_file()
+self._file.write(bytes)
+else:
+self._file = None
+self._ser = None
+
+def __iter__(self):
+if self._file is not None:
+self._file.flush()
+# read all items from disks first
+with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+f.seek(0)
+for values in self._ser.load_stream(f):
+for v in values:
+yield v
+
+for v in self.values:
+yield v
+
+def __len__(self):
+return self.disk_count + len(self.values)
+
+def append(self, value):
+self.values.append(value)
+# dump them into disk if the key is huge
+if len(self.values) >= self.LIMIT:
+self._spill()
+
+def _open_file(self):
+dirs = _get_local_dirs("objects")
+d = dirs[id(self) % len(dirs)]
+if not os.path.exists(d):
+os.makedirs(d)
+p = os.path.join(d, str(id))
+self._file = open(p, "w+", 65536)
+self._ser = CompressedSerializer(PickleSerializer())
+os.unlink(p)
--- End diff --

Ah, I see.  From the 
[unlink](http://pubs.opengroup.org/onlinepubs/009695399/functions/unlink.html) 
spec:

> When the file's link count becomes 0 and no process has the file open, 
the space occupied by the file shall be freed and the file shall no longer be 
accessible. If one or more processes have the file open when the last link is 
removed, the link shall be removed before unlink() returns, but the removal of 
the file contents shall be postponed until all references to the file are 
closed.

This is a very clever trick!
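
(For illustration, the same trick in isolation; a standalone example that works on POSIX systems, with a made-up path:)

```
import os

path = "/tmp/spill-demo"      # hypothetical path, for illustration only
f = open(path, "w+")
os.unlink(path)               # the name is gone, but the open handle still works
f.write("spilled data")
f.flush()
f.seek(0)
print(f.read())               # "spilled data"
f.close()                     # only now is the disk space actually reclaimed
```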





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-91052717
  
  [Test build #29895 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29895/consoleFull)
 for   PR 1977 at commit 
[`0dcf320`](https://github.com/apache/spark/commit/0dcf3208220a46cead751aacee7e7e3e22a5132b).





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread davies
Github user davies commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-91060947
  
@JoshRosen The last comments have been addressed.





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/1977#discussion_r28019740
  
--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +522,301 @@ def sorted(self, iterator, key=None, reverse=False):
 return heapq.merge(chunks, key=key, reverse=reverse)
 
 
+class ExternalList(object):
+"""
+ExternalList can have many items which cannot be hold in memory in
+the same time.
+
+>>> l = ExternalList(range(100))
+>>> len(l)
+100
+>>> l.append(10)
+>>> len(l)
+101
+>>> for i in range(10240):
+...     l.append(i)
+>>> len(l)
+10341
+>>> import pickle
+>>> l2 = pickle.loads(pickle.dumps(l))
+>>> len(l2)
+10341
+>>> list(l2)[100]
+10
+"""
+LIMIT = 10240
+
+def __init__(self, values):
+self.values = values
+if values and isinstance(values[0], list):
+self.count = sum(len(i) for i in values)
+else:
+self.count = len(values)
+self._file = None
+self._ser = None
+
+def __getstate__(self):
+if self._file is not None:
+self._file.flush()
+f = os.fdopen(os.dup(self._file.fileno()))
+f.seek(0)
+serialized = f.read()
+else:
+serialized = ''
+return self.values, self.count, serialized
+
+def __setstate__(self, item):
+self.values, self.count, serialized = item
+if serialized:
+self._open_file()
+self._file.write(serialized)
+else:
+self._file = None
+self._ser = None
+
+def __iter__(self):
+if self._file is not None:
+self._file.flush()
+# read all items from disks first
+with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+f.seek(0)
+for values in self._ser.load_stream(f):
+for v in values:
+yield v
+
+for v in self.values:
+yield v
+
+def __len__(self):
+return self.count
+
+def append(self, value):
+self.values.append(value)
+self.count += len(value) if isinstance(value, list) else 1
--- End diff --

Why do we add the length of `value` if it's a list?  This would make sense 
for `extend()`, but I find it a bit confusing for `append()`.
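
(A toy illustration of the distinction, not code from the patch:)

```
counted_like_append = [1, 2]
counted_like_append.append([3, 4])   # one new element: the nested list itself
print(len(counted_like_append))      # 3

counted_like_extend = [1, 2]
counted_like_extend.extend([3, 4])   # two new elements: 3 and 4
print(len(counted_like_extend))      # 4
```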





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-91069245
  
  [Test build #29895 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29895/consoleFull)
 for   PR 1977 at commit 
[`0dcf320`](https://github.com/apache/spark/commit/0dcf3208220a46cead751aacee7e7e3e22a5132b).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class FlattenedValuesSerializer(BatchedSerializer):`
  * `class ExternalList(object):`
  * `class GroupByKey(object):`
  * `class ChainedIterable(object):`
  * `class ExternalGroupBy(ExternalMerger):`

 * This patch does not change any dependencies.





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-91069251
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29895/
Test PASSed.





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/1977#discussion_r28027737
  
--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +522,322 @@ def sorted(self, iterator, key=None, reverse=False):
 return heapq.merge(chunks, key=key, reverse=reverse)
 
 
+class ExternalList(object):
+"""
+ExternalList can have many items which cannot be hold in memory in
+the same time.
+
+>>> l = ExternalList(range(100))
+>>> len(l)
+100
+>>> l.append(10)
+>>> len(l)
+101
+>>> for i in range(10240):
+...     l.append(i)
+>>> len(l)
+10341
+>>> import pickle
+>>> l2 = pickle.loads(pickle.dumps(l))
+>>> len(l2)
+10341
+>>> list(l2)[100]
+10
+"""
+LIMIT = 10240
+
+def __init__(self, values):
+self.values = values
+self.count = len(values)
+self._file = None
+self._ser = None
+
+def __getstate__(self):
+if self._file is not None:
+self._file.flush()
+f = os.fdopen(os.dup(self._file.fileno()))
+f.seek(0)
+serialized = f.read()
+else:
+serialized = ''
+return self.values, self.count, serialized
+
+def __setstate__(self, item):
+self.values, self.count, serialized = item
+if serialized:
+self._open_file()
+self._file.write(serialized)
+else:
+self._file = None
+self._ser = None
+
+def __iter__(self):
+if self._file is not None:
+self._file.flush()
+# read all items from disks first
+with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+f.seek(0)
+for values in self._ser.load_stream(f):
+for v in values:
+yield v
+
+for v in self.values:
+yield v
+
+def __len__(self):
+return self.count
+
+def append(self, value):
+self.values.append(value)
+self.count += 1
+# dump them into disk if the key is huge
+if len(self.values) >= self.LIMIT:
+self._spill()
+
+def _open_file(self):
+dirs = _get_local_dirs("objects")
+d = dirs[id(self) % len(dirs)]
+if not os.path.exists(d):
+os.makedirs(d)
+p = os.path.join(d, str(id))
+self._file = open(p, "w+", 65536)
+self._ser = CompressedSerializer(PickleSerializer())
+os.unlink(p)
+
+def _spill(self):
+""" dump the values into disk """
+global MemoryBytesSpilled, DiskBytesSpilled
+if self._file is None:
+self._open_file()
+
+used_memory = get_used_memory()
+pos = self._file.tell()
+self._ser.dump_stream([self.values], self._file)
+self.values = []
+gc.collect()
+DiskBytesSpilled += self._file.tell() - pos
+MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+
+
+class ExternalListOfList(ExternalList):
+"""
+An external list for list.
+"""
+def __init__(self, values):
+ExternalList.__init__(self, values)
+self.count = sum(len(i) for i in values)
+
+def append(self, value):
+ExternalList.append(self, value)
--- End diff --

I'm still a bit confused by the contract for this class.  If `count == N` 
and `__len__` is defined to return `self.count`, then I'd expect that our iterator would 
yield exactly `N` items.  It seems like this contract is violated by either 
this class or its superclass.





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread davies
Github user davies commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-91038005
  
@JoshRosen Also, I have rolled back the changes in ResultIterable (because 
someone is using ResultIterable.maxindex) and improved the performance of 
len(ResultIterable) for ExternalList; please take another look. 





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-91077067
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29900/
Test PASSed.





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1977#discussion_r28033500
  
--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +522,322 @@ def sorted(self, iterator, key=None, reverse=False):
 return heapq.merge(chunks, key=key, reverse=reverse)
 
 
+class ExternalList(object):
+"""
+ExternalList can have many items which cannot be hold in memory in
+the same time.
+
+>>> l = ExternalList(range(100))
+>>> len(l)
+100
+>>> l.append(10)
+>>> len(l)
+101
+>>> for i in range(10240):
+...     l.append(i)
+>>> len(l)
+10341
+>>> import pickle
+>>> l2 = pickle.loads(pickle.dumps(l))
+>>> len(l2)
+10341
+>>> list(l2)[100]
+10
+"""
+LIMIT = 10240
+
+def __init__(self, values):
+self.values = values
+self.count = len(values)
+self._file = None
+self._ser = None
+
+def __getstate__(self):
+if self._file is not None:
+self._file.flush()
+f = os.fdopen(os.dup(self._file.fileno()))
+f.seek(0)
+serialized = f.read()
+else:
+serialized = ''
+return self.values, self.count, serialized
+
+def __setstate__(self, item):
+self.values, self.count, serialized = item
+if serialized:
+self._open_file()
+self._file.write(serialized)
+else:
+self._file = None
+self._ser = None
+
+def __iter__(self):
+if self._file is not None:
+self._file.flush()
+# read all items from disks first
+with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+f.seek(0)
+for values in self._ser.load_stream(f):
+for v in values:
+yield v
+
+for v in self.values:
+yield v
+
+def __len__(self):
+return self.count
+
+def append(self, value):
+self.values.append(value)
+self.count += 1
+# dump them into disk if the key is huge
+if len(self.values) >= self.LIMIT:
+self._spill()
+
+def _open_file(self):
+dirs = _get_local_dirs("objects")
+d = dirs[id(self) % len(dirs)]
+if not os.path.exists(d):
+os.makedirs(d)
+p = os.path.join(d, str(id))
+self._file = open(p, "w+", 65536)
+self._ser = CompressedSerializer(PickleSerializer())
+os.unlink(p)
+
+def _spill(self):
+""" dump the values into disk """
+global MemoryBytesSpilled, DiskBytesSpilled
+if self._file is None:
+self._open_file()
+
+used_memory = get_used_memory()
+pos = self._file.tell()
+self._ser.dump_stream([self.values], self._file)
+self.values = []
+gc.collect()
+DiskBytesSpilled += self._file.tell() - pos
+MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+
+
+class ExternalListOfList(ExternalList):
+"""
+An external list for list.
+"""
+def __init__(self, values):
+ExternalList.__init__(self, values)
+self.count = sum(len(i) for i in values)
+
+def append(self, value):
+ExternalList.append(self, value)
--- End diff --

done





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread davies
Github user davies commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-91108456
  
@JoshRosen Thanks for the comments, it looks better now.





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/1977#discussion_r28019825
  
--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +522,301 @@ def sorted(self, iterator, key=None, reverse=False):
 return heapq.merge(chunks, key=key, reverse=reverse)
 
 
+class ExternalList(object):
+"""
+ExternalList can have many items which cannot be hold in memory in
+the same time.
+
+>>> l = ExternalList(range(100))
+>>> len(l)
+100
+>>> l.append(10)
+>>> len(l)
+101
+>>> for i in range(10240):
+...     l.append(i)
+>>> len(l)
+10341
+>>> import pickle
+>>> l2 = pickle.loads(pickle.dumps(l))
+>>> len(l2)
+10341
+>>> list(l2)[100]
+10
+"""
+LIMIT = 10240
+
+def __init__(self, values):
+self.values = values
+if values and isinstance(values[0], list):
+self.count = sum(len(i) for i in values)
+else:
+self.count = len(values)
+self._file = None
+self._ser = None
+
+def __getstate__(self):
+if self._file is not None:
+self._file.flush()
+f = os.fdopen(os.dup(self._file.fileno()))
+f.seek(0)
+serialized = f.read()
+else:
+serialized = ''
+return self.values, self.count, serialized
+
+def __setstate__(self, item):
+self.values, self.count, serialized = item
+if serialized:
+self._open_file()
+self._file.write(serialized)
+else:
+self._file = None
+self._ser = None
+
+def __iter__(self):
+if self._file is not None:
+self._file.flush()
+# read all items from disks first
+with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+f.seek(0)
+for values in self._ser.load_stream(f):
+for v in values:
+yield v
+
+for v in self.values:
+yield v
+
+def __len__(self):
+return self.count
+
+def append(self, value):
+self.values.append(value)
+self.count += len(value) if isinstance(value, list) else 1
+# dump them into disk if the key is huge
+if len(self.values) >= self.LIMIT:
+self._spill()
+
+def _open_file(self):
+dirs = _get_local_dirs("objects")
+d = dirs[id(self) % len(dirs)]
+if not os.path.exists(d):
+os.makedirs(d)
+p = os.path.join(d, str(id))
+self._file = open(p, "w+", 65536)
+self._ser = CompressedSerializer(PickleSerializer())
+os.unlink(p)
+
+def _spill(self):
+""" dump the values into disk """
+global MemoryBytesSpilled, DiskBytesSpilled
+if self._file is None:
+self._open_file()
+
+used_memory = get_used_memory()
+pos = self._file.tell()
+self._ser.dump_stream([self.values], self._file)
+self.values = []
+gc.collect()
+DiskBytesSpilled += self._file.tell() - pos
+MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+
+
+class GroupByKey(object):
+"""
+group a sorted iterator into [(k1, it1), (k2, it2), ...]
+
+>>> k = [i/3 for i in range(6)]
+>>> v = [i for i in range(6)]
+>>> g = GroupByKey(iter(zip(k, v)))
+>>> [(k, list(it)) for k, it in g]
+[(0, [0, 1, 2]), (1, [3, 4, 5])]
+"""
+def __init__(self, iterator):
+self.iterator = iterator
+self.next_item = None
+
+def __iter__(self):
+return self
+
+def next(self):
+key, value = self.next_item if self.next_item else 
next(self.iterator)
+values = ExternalList([value])
+try:
+while True:
+k, v = next(self.iterator)
+if k != key:
+self.next_item = (k, v)
+break
+values.append(v)
+except StopIteration:
+self.next_item = None
+return key, values
+
+
+class ChainedIterable(object):
+"""
+Picklable chained iterator, similar to itertools.chain.from_iterable()
+"""
+def __init__(self, iterators):
+self.iterators = iterators
+
+def __len__(self):
+try:
+return len(self.iterators)
+except:
--- End diff --

Can this be `except TypeError` instead?



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-91115066
  
  [Test build #29921 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29921/consoleFull)
 for   PR 1977 at commit 
[`e78c15c`](https://github.com/apache/spark/commit/e78c15c41ff0c1cf393bb6e65eb513ebd8357c75).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class FlattenedValuesSerializer(BatchedSerializer):`
  * `class ExternalList(object):`
  * `class ExternalListOfList(ExternalList):`
  * `class GroupByKey(object):`
  * `class ExternalGroupBy(ExternalMerger):`

 * This patch does not change any dependencies.





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-91115081
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29921/
Test FAILed.





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-91036610
  
  [Test build #29891 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29891/consoleFull)
 for   PR 1977 at commit 
[`0dcf320`](https://github.com/apache/spark/commit/0dcf3208220a46cead751aacee7e7e3e22a5132b).





[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-91062845
  
  [Test build #29900 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29900/consoleFull)
 for   PR 1977 at commit 
[`0b0fde8`](https://github.com/apache/spark/commit/0b0fde8b568141b03d7ba8b7d74e5b885dcb4557).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-91108728
  
  [Test build #29921 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29921/consoleFull)
 for   PR 1977 at commit 
[`e78c15c`](https://github.com/apache/spark/commit/e78c15c41ff0c1cf393bb6e65eb513ebd8357c75).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/1977#discussion_r28028089
  
--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +522,322 @@ def sorted(self, iterator, key=None, reverse=False):
 return heapq.merge(chunks, key=key, reverse=reverse)
 
 
+class ExternalList(object):
+"""
+ExternalList can have many items which cannot be hold in memory in
+the same time.
+
+>>> l = ExternalList(range(100))
+>>> len(l)
+100
+>>> l.append(10)
+>>> len(l)
+101
+>>> for i in range(10240):
+... l.append(i)
+>>> len(l)
+10341
+>>> import pickle
+>>> l2 = pickle.loads(pickle.dumps(l))
+>>> len(l2)
+10341
+>>> list(l2)[100]
+10
+"""
+LIMIT = 10240
+
+def __init__(self, values):
+self.values = values
+self.count = len(values)
+self._file = None
+self._ser = None
+
+def __getstate__(self):
+if self._file is not None:
+self._file.flush()
+f = os.fdopen(os.dup(self._file.fileno()))
+f.seek(0)
+serialized = f.read()
+else:
+serialized = ''
+return self.values, self.count, serialized
+
+def __setstate__(self, item):
+self.values, self.count, serialized = item
+if serialized:
+self._open_file()
+self._file.write(serialized)
+else:
+self._file = None
+self._ser = None
+
+def __iter__(self):
+if self._file is not None:
+self._file.flush()
+# read all items from disks first
+with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+f.seek(0)
+for values in self._ser.load_stream(f):
+for v in values:
+yield v
+
+for v in self.values:
+yield v
+
+def __len__(self):
+return self.count
+
+def append(self, value):
+self.values.append(value)
+self.count += 1
+# dump them into disk if the key is huge
+if len(self.values) >= self.LIMIT:
+self._spill()
+
+def _open_file(self):
+dirs = _get_local_dirs("objects")
+d = dirs[id(self) % len(dirs)]
+if not os.path.exists(d):
+os.makedirs(d)
+p = os.path.join(d, str(id))
+self._file = open(p, "w+", 65536)
+self._ser = CompressedSerializer(PickleSerializer())
+os.unlink(p)
+
+def _spill(self):
+""" dump the values into disk """
+global MemoryBytesSpilled, DiskBytesSpilled
+if self._file is None:
+self._open_file()
+
+used_memory = get_used_memory()
+pos = self._file.tell()
+self._ser.dump_stream([self.values], self._file)
--- End diff --

Aha, I see that this is because we don't use a batching serializer.  This 
makes sense now.
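
For readers following along, here is a toy sketch (standalone code, not PySpark's serializer classes) of why the wrapping matters when the serializer is not batched: `dump_stream()` writes one framed record per element of the iterator it receives, so wrapping the list turns the whole thing into a single record.

```python
import io
import pickle
import struct

def dump_stream(iterator, stream):
    # one length-prefixed pickle record per element, like a non-batching serializer
    for obj in iterator:
        data = pickle.dumps(obj, 2)
        stream.write(struct.pack("!I", len(data)))
        stream.write(data)

values = list(range(5))
whole, per_value = io.BytesIO(), io.BytesIO()
dump_stream([values], whole)      # one record holding the entire list
dump_stream(values, per_value)    # five records, one per value
```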


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/1977#discussion_r28027829
  
--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +522,322 @@ def sorted(self, iterator, key=None, reverse=False):
 return heapq.merge(chunks, key=key, reverse=reverse)
 
 
+class ExternalList(object):
+"""
+ExternalList can have many items which cannot be hold in memory in
+the same time.
+
+>>> l = ExternalList(range(100))
+>>> len(l)
+100
+>>> l.append(10)
+>>> len(l)
+101
+>>> for i in range(10240):
+... l.append(i)
+>>> len(l)
+10341
+>>> import pickle
+>>> l2 = pickle.loads(pickle.dumps(l))
+>>> len(l2)
+10341
+>>> list(l2)[100]
+10
+"""
+LIMIT = 10240
+
+def __init__(self, values):
+self.values = values
+self.count = len(values)
+self._file = None
+self._ser = None
+
+def __getstate__(self):
+if self._file is not None:
+self._file.flush()
+f = os.fdopen(os.dup(self._file.fileno()))
+f.seek(0)
+serialized = f.read()
+else:
+serialized = ''
+return self.values, self.count, serialized
+
+def __setstate__(self, item):
+self.values, self.count, serialized = item
+if serialized:
+self._open_file()
+self._file.write(serialized)
+else:
+self._file = None
+self._ser = None
+
+def __iter__(self):
+if self._file is not None:
+self._file.flush()
+# read all items from disks first
+with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+f.seek(0)
+for values in self._ser.load_stream(f):
+for v in values:
+yield v
+
+for v in self.values:
+yield v
+
+def __len__(self):
+return self.count
+
+def append(self, value):
+self.values.append(value)
+self.count += 1
+# dump them into disk if the key is huge
+if len(self.values) >= self.LIMIT:
+self._spill()
+
+def _open_file(self):
+dirs = _get_local_dirs("objects")
+d = dirs[id(self) % len(dirs)]
+if not os.path.exists(d):
+os.makedirs(d)
+p = os.path.join(d, str(id))
+self._file = open(p, "w+", 65536)
+self._ser = CompressedSerializer(PickleSerializer())
+os.unlink(p)
+
+def _spill(self):
+""" dump the values into disk """
+global MemoryBytesSpilled, DiskBytesSpilled
+if self._file is None:
+self._open_file()
+
+used_memory = get_used_memory()
+pos = self._file.tell()
+self._ser.dump_stream([self.values], self._file)
--- End diff --

If `self.values` is a collection, why do we need to wrap it into a list 
prior to dumping it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-91050238
  
  [Test build #29891 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29891/consoleFull)
 for   PR 1977 at commit 
[`0dcf320`](https://github.com/apache/spark/commit/0dcf3208220a46cead751aacee7e7e3e22a5132b).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class FlattenedValuesSerializer(BatchedSerializer):`
  * `class ExternalList(object):`
  * `class GroupByKey(object):`
  * `class ChainedIterable(object):`
  * `class ExternalGroupBy(ExternalMerger):`

 * This patch does not change any dependencies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-91077061
  
  [Test build #29900 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29900/consoleFull)
 for   PR 1977 at commit 
[`0b0fde8`](https://github.com/apache/spark/commit/0b0fde8b568141b03d7ba8b7d74e5b885dcb4557).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class FlattenedValuesSerializer(BatchedSerializer):`
  * `class ExternalList(object):`
  * `class ExternalListOfList(ExternalList):`
  * `class GroupByKey(object):`
  * `class GroupListsByKey(GroupByKey):`
  * `class ChainedIterable(object):`
  * `class ExternalGroupBy(ExternalMerger):`

 * This patch does not change any dependencies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-91081544
  
Sorry for my initial confusion regarding the external lists of lists.  I 
think that the `__len__` thing might be an issue if we ever directly expose 
`ExternalListOfList` to users, but it looks like we currently only expose it 
through a `ChainedIterable` in this code, so it doesn't appear to be a problem 
yet.  This still might be worth addressing if you agree it could help prevent 
future bugs should we start using this in more places.
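
One hedged sketch of what addressing it could look like (assuming the `ExternalList` from the diff above with its `values`/`count` bookkeeping, and assuming the subclass iterates over flattened values; this is an illustration, not the patch itself):

```python
class ExternalListOfList(ExternalList):
    """Sketch only: make len() count the flattened values instead of the sublists."""

    def __init__(self, values):
        ExternalList.__init__(self, values)
        # each element is itself a list of values; count what is inside them
        self.count = sum(len(v) for v in values)

    def append(self, value):
        ExternalList.append(self, value)
        # the base class already counted the appended list as one item
        self.count += len(value) - 1
```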


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1977#discussion_r28010925
  
--- Diff: python/pyspark/shuffle.py ---
@@ -367,32 +372,13 @@ def iteritems(self):
 
 def _external_items(self):
  Return all partitioned items as iterator 
-assert not self.data
--- End diff --

The assumption is still true; I can add that back.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-08 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1977#discussion_r28009944
  
--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +520,295 @@ def sorted(self, iterator, key=None, reverse=False):
 return heapq.merge(chunks, key=key, reverse=reverse)
 
 
+class ExternalList(object):
+"""
+ExternalList can have many items which cannot be hold in memory in
+the same time.
+
+>>> l = ExternalList(range(100))
+>>> len(l)
+100
+>>> l.append(10)
+>>> len(l)
+101
+>>> for i in range(10240):
+... l.append(i)
+>>> len(l)
+10341
+>>> import pickle
+>>> l2 = pickle.loads(pickle.dumps(l))
+>>> len(l2)
+10341
+>>> list(l2)[100]
+10
+"""
+LIMIT = 10240
+
+def __init__(self, values):
+self.values = values
+self.disk_count = 0
+self._file = None
+self._ser = None
+
+def __getstate__(self):
+if self._file is not None:
+self._file.flush()
+f = os.fdopen(os.dup(self._file.fileno()))
+f.seek(0)
+bytes = f.read()
+else:
+bytes = ''
+return self.values, self.disk_count, bytes
+
+def __setstate__(self, item):
+self.values, self.disk_count, bytes = item
+if bytes:
+self._open_file()
+self._file.write(bytes)
+else:
+self._file = None
+self._ser = None
+
+def __iter__(self):
+if self._file is not None:
+self._file.flush()
+# read all items from disks first
+with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+f.seek(0)
+for values in self._ser.load_stream(f):
+for v in values:
+yield v
+
+for v in self.values:
+yield v
+
+def __len__(self):
+return self.disk_count + len(self.values)
+
+def append(self, value):
+self.values.append(value)
+# dump them into disk if the key is huge
+if len(self.values) >= self.LIMIT:
+self._spill()
+
+def _open_file(self):
+dirs = _get_local_dirs("objects")
+d = dirs[id(self) % len(dirs)]
+if not os.path.exists(d):
+os.makedirs(d)
+p = os.path.join(d, str(id))
+self._file = open(p, "w+", 65536)
+self._ser = CompressedSerializer(PickleSerializer())
+os.unlink(p)
--- End diff --

We want to remove the file once it's closed.
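
For anyone unfamiliar with the trick: on POSIX, unlinking a path that is still held open only removes the directory entry; the data stays readable through the open handle and the space is reclaimed once the handle is closed. A tiny standalone illustration (not the PR's code):

```python
import os
import tempfile

path = os.path.join(tempfile.mkdtemp(), "spill.bin")
f = open(path, "w+b")
os.unlink(path)            # no stray spill file is left behind if the worker dies
f.write(b"spilled data")
f.flush()
f.seek(0)
print(f.read())            # b'spilled data' -- still readable through the handle
f.close()                  # disk space is released here
```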


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-07 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-90697704
  
  [Test build #29802 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29802/consoleFull)
 for   PR 1977 at commit 
[`2a1857a`](https://github.com/apache/spark/commit/2a1857a4c1d72bee5a93ffbe1da013ffc7ff673b).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class ResultIterable(object):`
  * `class FlattedValuesSerializer(BatchedSerializer):`
  * `class ExternalList(object):`
  * `class GroupByKey(object):`
  * `class ChainedIterable(object):`
  * `class ExternalGroupBy(ExternalMerger):`

 * This patch does not change any dependencies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-90697722
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29802/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-07 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-90670585
  
  [Test build #29802 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29802/consoleFull)
 for   PR 1977 at commit 
[`2a1857a`](https://github.com/apache/spark/commit/2a1857a4c1d72bee5a93ffbe1da013ffc7ff673b).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-07 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-90717977
  
  [Test build #637 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/637/consoleFull)
 for   PR 1977 at commit 
[`2a1857a`](https://github.com/apache/spark/commit/2a1857a4c1d72bee5a93ffbe1da013ffc7ff673b).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-07 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-90718319
  
  [Test build #638 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/638/consoleFull)
 for   PR 1977 at commit 
[`2a1857a`](https://github.com/apache/spark/commit/2a1857a4c1d72bee5a93ffbe1da013ffc7ff673b).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-07 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-90737032
  
  [Test build #638 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/638/consoleFull)
 for   PR 1977 at commit 
[`2a1857a`](https://github.com/apache/spark/commit/2a1857a4c1d72bee5a93ffbe1da013ffc7ff673b).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
 * This patch does not change any dependencies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-07 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-90740407
  
  [Test build #637 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/637/consoleFull)
 for   PR 1977 at commit 
[`2a1857a`](https://github.com/apache/spark/commit/2a1857a4c1d72bee5a93ffbe1da013ffc7ff673b).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
 * This patch **adds the following new dependencies:**
   * `activation-1.1.jar`
   * `aopalliance-1.0.jar`
   * `avro-1.7.7.jar`
   * `breeze-macros_2.10-0.11.2.jar`
   * `breeze_2.10-0.11.2.jar`
   * `commons-cli-1.2.jar`
   * `commons-compress-1.4.1.jar`
   * `commons-io-2.1.jar`
   * `commons-lang-2.5.jar`
   * `gmbal-api-only-3.0.0-b023.jar`
   * `grizzly-framework-2.1.2.jar`
   * `grizzly-http-2.1.2.jar`
   * `grizzly-http-server-2.1.2.jar`
   * `grizzly-http-servlet-2.1.2.jar`
   * `grizzly-rcm-2.1.2.jar`
   * `guice-3.0.jar`
   * `hadoop-annotations-2.2.0.jar`
   * `hadoop-auth-2.2.0.jar`
   * `hadoop-client-2.2.0.jar`
   * `hadoop-common-2.2.0.jar`
   * `hadoop-hdfs-2.2.0.jar`
   * `hadoop-mapreduce-client-app-2.2.0.jar`
   * `hadoop-mapreduce-client-common-2.2.0.jar`
   * `hadoop-mapreduce-client-core-2.2.0.jar`
   * `hadoop-mapreduce-client-jobclient-2.2.0.jar`
   * `hadoop-mapreduce-client-shuffle-2.2.0.jar`
   * `hadoop-yarn-api-2.2.0.jar`
   * `hadoop-yarn-client-2.2.0.jar`
   * `hadoop-yarn-common-2.2.0.jar`
   * `hadoop-yarn-server-common-2.2.0.jar`
   * `jackson-jaxrs-1.8.8.jar`
   * `jackson-xc-1.8.8.jar`
   * `javax.inject-1.jar`
   * `javax.servlet-3.1.jar`
   * `javax.servlet-api-3.0.1.jar`
   * `jaxb-api-2.2.2.jar`
   * `jaxb-impl-2.2.3-1.jar`
   * `jersey-client-1.9.jar`
   * `jersey-core-1.9.jar`
   * `jersey-grizzly2-1.9.jar`
   * `jersey-guice-1.9.jar`
   * `jersey-json-1.9.jar`
   * `jersey-server-1.9.jar`
   * `jersey-test-framework-core-1.9.jar`
   * `jersey-test-framework-grizzly2-1.9.jar`
   * `jettison-1.1.jar`
   * `jetty-util-6.1.26.jar`
   * `management-api-3.0.0-b012.jar`
   * `protobuf-java-2.4.1.jar`
   * `spark-bagel_2.10-1.4.0-SNAPSHOT.jar`
   * `spark-catalyst_2.10-1.4.0-SNAPSHOT.jar`
   * `spark-core_2.10-1.4.0-SNAPSHOT.jar`
   * `spark-graphx_2.10-1.4.0-SNAPSHOT.jar`
   * `spark-launcher_2.10-1.4.0-SNAPSHOT.jar`
   * `spark-mllib_2.10-1.4.0-SNAPSHOT.jar`
   * `spark-network-common_2.10-1.4.0-SNAPSHOT.jar`
   * `spark-network-shuffle_2.10-1.4.0-SNAPSHOT.jar`
   * `spark-repl_2.10-1.4.0-SNAPSHOT.jar`
   * `spark-sql_2.10-1.4.0-SNAPSHOT.jar`
   * `spark-streaming_2.10-1.4.0-SNAPSHOT.jar`
   * `stax-api-1.0.1.jar`
   * `xz-1.0.jar`

 * This patch **removes the following dependencies:**
   * `breeze-macros_2.10-0.11.1.jar`
   * `breeze_2.10-0.11.1.jar`
   * `commons-el-1.0.jar`
   * `commons-io-2.4.jar`
   * `commons-lang-2.4.jar`
   * `hadoop-client-1.0.4.jar`
   * `hadoop-core-1.0.4.jar`
   * `hsqldb-1.8.0.10.jar`
   * `jblas-1.2.3.jar`
   * `spark-bagel_2.10-1.3.0-SNAPSHOT.jar`
   * `spark-catalyst_2.10-1.3.0-SNAPSHOT.jar`
   * `spark-core_2.10-1.3.0-SNAPSHOT.jar`
   * `spark-graphx_2.10-1.3.0-SNAPSHOT.jar`
   * `spark-launcher_2.10-1.3.0-SNAPSHOT.jar`
   * `spark-mllib_2.10-1.3.0-SNAPSHOT.jar`
   * `spark-network-common_2.10-1.3.0-SNAPSHOT.jar`
   * `spark-network-shuffle_2.10-1.3.0-SNAPSHOT.jar`
   * `spark-repl_2.10-1.3.0-SNAPSHOT.jar`
   * `spark-sql_2.10-1.3.0-SNAPSHOT.jar`
   * `spark-streaming_2.10-1.3.0-SNAPSHOT.jar`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-07 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-90750781
  
  [Test build #29816 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29816/consoleFull)
 for   PR 1977 at commit 
[`e3b8eab`](https://github.com/apache/spark/commit/e3b8eab0677d39eed0476f5cf1a9f3e4ad963bdc).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-07 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-90766722
  
  [Test build #29816 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29816/consoleFull)
 for   PR 1977 at commit 
[`e3b8eab`](https://github.com/apache/spark/commit/e3b8eab0677d39eed0476f5cf1a9f3e4ad963bdc).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
 * This patch does not change any dependencies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-90766727
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29816/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-06 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-90293354
  
**[Test build #29757 timed 
out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29757/consoleFull)**
 for PR 1977 at commit 
[`d9589ab`](https://github.com/apache/spark/commit/d9589ab0dbcd6aa443906c3b1ec432d6c68f0587)
 after a configured wait of `120m`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-90293365
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29757/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-90288434
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29756/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-06 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-90288396
  
**[Test build #29756 timed 
out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29756/consoleFull)**
 for PR 1977 at commit 
[`c6a2f8d`](https://github.com/apache/spark/commit/c6a2f8d1ad67d9e403391399a78202bdd1ffe561)
 after a configured wait of `120m`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-06 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-90258222
  
  [Test build #29757 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29757/consoleFull)
 for   PR 1977 at commit 
[`d9589ab`](https://github.com/apache/spark/commit/d9589ab0dbcd6aa443906c3b1ec432d6c68f0587).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-06 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-90255941
  
  [Test build #29756 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29756/consoleFull)
 for   PR 1977 at commit 
[`c6a2f8d`](https://github.com/apache/spark/commit/c6a2f8d1ad67d9e403391399a78202bdd1ffe561).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-06 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-90356172
  
  [Test build #29779 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29779/consoleFull)
 for   PR 1977 at commit 
[`d2f053b`](https://github.com/apache/spark/commit/d2f053b3eb39ff0c42a3e5df27f0ed4accc1c1d5).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-90371839
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29779/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-04-06 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-90371821
  
  [Test build #29779 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29779/consoleFull)
 for   PR 1977 at commit 
[`d2f053b`](https://github.com/apache/spark/commit/d2f053b3eb39ff0c42a3e5df27f0ed4accc1c1d5).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class ResultIterable(object):`
  * `class FlattedValuesSerializer(BatchedSerializer):`
  * `class ExternalList(object):`
  * `class GroupByKey(object):`
  * `class ChainedIterable(object):`
  * `class ExternalGroupBy(ExternalMerger):`

 * This patch does not change any dependencies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-03-26 Thread mkhaitman
Github user mkhaitman commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-86640421
  
Tried this out with 1.3-rc3 and was getting FetchFailedExceptions while 
performing a join between two RDDs:

org.apache.spark.shuffle.FetchFailedException: Error in opening 
FileSegmentManagedBuffer{file=/tmp/spark-6b363150-d169-4f3e-b703-98048b3dca7f/spark-edaee223-72e4-47f5-a984-64d25bdc34bd/spark-97f52fa9-3fc6-49f4-b1da-660c08e6d826/blockmgr-18bb9336-ca07-43db-ae49-c4be84846eb3/2c/shuffle_2_11_0.data,
 offset=1193, length=230}




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-03-26 Thread davies
Github user davies commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-86686865
  
Do you hit this error without the patch? I have no idea why they would be 
related.

On Thursday, March 26, 2015 at 10:43 AM, mkhaitman wrote:

 Tried this out with 1.3-rc3 and was getting FetchFailedExceptions while 
performing a join between two RDDs:
 org.apache.spark.shuffle.FetchFailedException: Error in opening 
FileSegmentManagedBuffer{file=/tmp/spark-6b363150-d169-4f3e-b703-98048b3dca7f/spark-edaee223-72e4-47f5-a984-64d25bdc34bd/spark-97f52fa9-3fc6-49f4-b1da-660c08e6d826/blockmgr-18bb9336-ca07-43db-ae49-c4be84846eb3/2c/shuffle_2_11_0.data,
 offset=1193, length=230}
  

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-03-26 Thread mkhaitman
Github user mkhaitman commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-86688264
  
@davies Sorry, I deleted the comment, though you still received the 
notification. I think it was just a fluke since it didn't happen a second time. 
Sorry about that! So far joins are looking good.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-03-26 Thread mkhaitman
Github user mkhaitman commented on a diff in the pull request:

https://github.com/apache/spark/pull/1977#discussion_r27223983
  
--- Diff: python/pyspark/shuffle.py ---
@@ -244,72 +258,57 @@ def _next_limit(self):
 
 def mergeValues(self, iterator):
""" Combine the items by creator and combiner """
-iterator = iter(iterator)
 # speedup attribute lookup
 creator, comb = self.agg.createCombiner, self.agg.mergeValue
-d, c, batch = self.data, 0, self.batch
+c, data, pdata, hfun, batch = 0, self.data, self.pdata, 
self._partition, self.batch
+limit = self.memory_limit
 
 for k, v in iterator:
+d = pdata[hfun(k)] if pdata else data
 d[k] = comb(d[k], v) if k in d else creator(v)
 
 c += 1
-if c % batch == 0 and get_used_memory() > self.memory_limit:
-self._spill()
-self._partitioned_mergeValues(iterator, self._next_limit())
-break
+if c >= batch:
+if get_used_memory() >= limit:
+self._spill()
+limit = self._next_limit()
+batch /= 2
+c = 0
+else:
+batch *= 1.5
 
 def _partition(self, key):
""" Return the partition for key """
 return hash((key, self._seed)) % self.partitions
 
-def _partitioned_mergeValues(self, iterator, limit=0):
-""" Partition the items by key, then combine them """
-# speedup attribute lookup
-creator, comb = self.agg.createCombiner, self.agg.mergeValue
-c, pdata, hfun, batch = 0, self.pdata, self._partition, self.batch
-
-for k, v in iterator:
-d = pdata[hfun(k)]
-d[k] = comb(d[k], v) if k in d else creator(v)
-if not limit:
-continue
-
-c += 1
-if c % batch == 0 and get_used_memory() > limit:
-self._spill()
-limit = self._next_limit()
+def _object_size(self, obj):
+""" How much of memory for this obj, assume that all the objects
+consume similar bytes of memory
+"""
+return 1
 
-def mergeCombiners(self, iterator, check=True):
+def mergeCombiners(self, iterator, limit=None):
""" Merge (K,V) pair by mergeCombiner """
-iterator = iter(iterator)
+if limit is None:
+limit = self.memory_limit
 # speedup attribute lookup
-d, comb, batch = self.data, self.agg.mergeCombiners, self.batch
-c = 0
-for k, v in iterator:
-d[k] = comb(d[k], v) if k in d else v
-if not check:
-continue
-
-c += 1
-if c % batch == 0 and get_used_memory() > self.memory_limit:
-self._spill()
-self._partitioned_mergeCombiners(iterator, 
self._next_limit())
-break
-
-def _partitioned_mergeCombiners(self, iterator, limit=0):
-""" Partition the items by key, then merge them """
-comb, pdata = self.agg.mergeCombiners, self.pdata
-c, hfun = 0, self._partition
+comb, hfun, objsize = self.agg.mergeCombiners, self._partition, 
self._object_size
+c, data, pdata, batch = 0, self.data, self.pdata, self.batch
 for k, v in iterator:
-d = pdata[hfun(k)]
+d = pdata[hfun(k)] if pdata else data
 d[k] = comb(d[k], v) if k in d else v
 if not limit:
 continue
 
-c += 1
-if c % self.batch == 0 and get_used_memory() > limit:
-self._spill()
-limit = self._next_limit()
+c += objsize(v)
+if c > batch:
+if get_used_memory() > limit:
+self._spill()
+limit = self._next_limit()
+batch /= 2
+c = 0
+else:
+batch *= 1.5
 
--- End diff --

Would it make sense to do a final memory check after the for loop, in case 
we just added another 999 items (worst case) without spilling any of it to disk?
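
A self-contained, hypothetical sketch of that suggestion (simplified stand-in names, not the patch's code): the idea is just to repeat the memory test once after the loop so the last partial batch can still be spilled.

```python
def merge_with_trailing_check(items, merged, spill, used_memory, limit, batch=1000):
    # items: iterable of (key, value); spill/used_memory: callables supplied by the caller
    c = 0
    for k, v in items:
        merged[k] = merged.get(k, 0) + v
        c += 1
        if c >= batch:
            if used_memory() >= limit:
                spill(merged)
                merged.clear()
            c = 0
    # final check: the last few items never reached the branch above
    if merged and used_memory() >= limit:
        spill(merged)
        merged.clear()
    return merged
```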


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-03-26 Thread mkhaitman
Github user mkhaitman commented on a diff in the pull request:

https://github.com/apache/spark/pull/1977#discussion_r27223759
  
--- Diff: python/pyspark/shuffle.py ---
@@ -244,72 +258,57 @@ def _next_limit(self):
 
 def mergeValues(self, iterator):
""" Combine the items by creator and combiner """
-iterator = iter(iterator)
 # speedup attribute lookup
 creator, comb = self.agg.createCombiner, self.agg.mergeValue
-d, c, batch = self.data, 0, self.batch
+c, data, pdata, hfun, batch = 0, self.data, self.pdata, 
self._partition, self.batch
+limit = self.memory_limit
 
 for k, v in iterator:
+d = pdata[hfun(k)] if pdata else data
 d[k] = comb(d[k], v) if k in d else creator(v)
 
 c += 1
-if c % batch == 0 and get_used_memory() > self.memory_limit:
-self._spill()
-self._partitioned_mergeValues(iterator, self._next_limit())
-break
+if c >= batch:
+if get_used_memory() >= limit:
+self._spill()
+limit = self._next_limit()
+batch /= 2
+c = 0
+else:
+batch *= 1.5
 
--- End diff --

Would it make sense to do a final memory check after the for loop, in case 
we just added another 999 items (worst case) without spilling any of it to 
disk? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-03-25 Thread mkhaitman
Github user mkhaitman commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-86236377
  
This PR looks amazing! I'm going to test this out tomorrow with 1.3-rc3 and 
report back with some findings. I had initially started taking a stab at 
improving the shuffle memory handling a bit, but it looks like I don't have to now!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-03-10 Thread airhorns
Github user airhorns commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-78116054
  
Hey @davies, this would be freakin' fantastic to get merged... any chance 
that might happen soon? We hit many issues with skewed group sizes causing 
whole jobs to fail, and in my brief experimentation I think this would make 
things far more stable for us. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2015-03-10 Thread davies
Github user davies commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-78118374
  
 Ping @JoshRosen


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-12-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-68388416
  
  [Test build #24904 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24904/consoleFull)
 for   PR 1977 at commit 
[`2b9c261`](https://github.com/apache/spark/commit/2b9c261497fab4bb8d310d30e0017aa3b3d2f37f).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-12-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-68400348
  
**[Test build #24904 timed 
out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24904/consoleFull)**
 for PR 1977 at commit 
[`2b9c261`](https://github.com/apache/spark/commit/2b9c261497fab4bb8d310d30e0017aa3b3d2f37f)
 after a configured wait of `120m`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-12-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-68400353
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24904/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-12-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-68400955
  
  [Test build #557 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/557/consoleFull)
 for   PR 1977 at commit 
[`2b9c261`](https://github.com/apache/spark/commit/2b9c261497fab4bb8d310d30e0017aa3b3d2f37f).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-12-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-68407892
  
  [Test build #557 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/557/consoleFull)
 for   PR 1977 at commit 
[`2b9c261`](https://github.com/apache/spark/commit/2b9c261497fab4bb8d310d30e0017aa3b3d2f37f).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class ResultIterable(object):`
  * `class FlattedValuesSerializer(BatchedSerializer):`
  * `class ExternalList(object):`
  * `class GroupByKey(object):`
  * `class ChainedIterable(object):`
  * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-11-14 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-63164145
  
I agree that this is a good fix; I've been letting the review slip because 
this PR is pretty complex and it will take me a decent amount of time to be 
sure that it's correct in all cases.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-11-10 Thread davies
Github user davies commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-62447893
  
@JoshRosen @mateiz Could we get this into 1.2? It has sat here for 2 
months.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-11-10 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-62448001
  
  [Test build #23156 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23156/consoleFull)
 for   PR 1977 at commit 
[`70aadcd`](https://github.com/apache/spark/commit/70aadcdba2952c38e3ce531ffd25761b553c1369).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-11-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-62460926
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23156/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-11-10 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-62460909
  
  [Test build #23156 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23156/consoleFull)
 for   PR 1977 at commit 
[`70aadcd`](https://github.com/apache/spark/commit/70aadcdba2952c38e3ce531ffd25761b553c1369).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `public final class JavaRecoverableNetworkWordCount `
  * `class ResultIterable(object):`
  * `class FlattedValuesSerializer(BatchedSerializer):`
  * `class ExternalList(object):`
  * `class GroupByKey(object):`
  * `class ChainedIterable(object):`
  * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-11-10 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/1977#discussion_r20132241
  
--- Diff: python/pyspark/shuffle.py ---
--- Diff: python/pyspark/shuffle.py ---
@@ -520,6 +505,295 @@ def sorted(self, iterator, key=None, reverse=False):
         return heapq.merge(chunks, key=key, reverse=reverse)
 
 
+class ExternalList(object):
+    """
+    ExternalList can have many items which cannot be hold in memory in
+    the same time.
+
+    >>> l = ExternalList(range(100))
+    >>> len(l)
+    100
+    >>> l.append(10)
+    >>> len(l)
+    101
+    >>> for i in range(10240):
+    ...     l.append(i)
+    >>> len(l)
+    10341
+    >>> import pickle
+    >>> l2 = pickle.loads(pickle.dumps(l))
+    >>> len(l2)
+    10341
+    >>> list(l2)[100]
+    10
+    """
+    LIMIT = 10240
+
+    def __init__(self, values):
+        self.values = values
+        self.disk_count = 0
+        self._file = None
+        self._ser = None
+
+    def __getstate__(self):
+        if self._file is not None:
+            self._file.flush()
+            f = os.fdopen(os.dup(self._file.fileno()))
+            f.seek(0)
+            bytes = f.read()
+        else:
+            bytes = ''
+        return self.values, self.disk_count, bytes
+
+    def __setstate__(self, item):
+        self.values, self.disk_count, bytes = item
+        if bytes:
+            self._open_file()
+            self._file.write(bytes)
+        else:
+            self._file = None
+            self._ser = None
+
+    def __iter__(self):
+        if self._file is not None:
+            self._file.flush()
+            # read all items from disks first
+            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+                f.seek(0)
+                for values in self._ser.load_stream(f):
+                    for v in values:
+                        yield v
+
+        for v in self.values:
+            yield v
+
+    def __len__(self):
+        return self.disk_count + len(self.values)
+
+    def append(self, value):
+        self.values.append(value)
+        # dump them into disk if the key is huge
+        if len(self.values) >= self.LIMIT:
+            self._spill()
+
+    def _open_file(self):
+        dirs = _get_local_dirs("objects")
+        d = dirs[id(self) % len(dirs)]
+        if not os.path.exists(d):
+            os.makedirs(d)
+        p = os.path.join(d, str(id))
+        self._file = open(p, "w+", 65536)
+        self._ser = CompressedSerializer(PickleSerializer())
+        os.unlink(p)
+
+    def _spill(self):
+        """ dump the values into disk """
+        global MemoryBytesSpilled, DiskBytesSpilled
+        if self._file is None:
+            self._open_file()
+
+        used_memory = get_used_memory()
+        pos = self._file.tell()
+        self._ser.dump_stream([self.values], self._file)
+        self.disk_count += len(self.values)
+        self.values = []
+        gc.collect()
+        DiskBytesSpilled += self._file.tell() - pos
+        MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+
+
+class GroupByKey(object):
+    """
+    group a sorted iterator into [(k1, it1), (k2, it2), ...]
+
+    >>> k = [i/3 for i in range(6)]
+    >>> v = [i for i in range(6)]
+    >>> g = GroupByKey(iter(zip(k, v)))
+    >>> [(k, list(it)) for k, it in g]
+    [(0, [0, 1, 2]), (1, [3, 4, 5])]
+    """
+    def __init__(self, iterator):
+        self.iterator = iterator
+        self.next_item = None
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        key, value = self.next_item if self.next_item else next(self.iterator)
+        values = ExternalList([value])
+        try:
+            while True:
+                k, v = next(self.iterator)
+                if k != key:
+                    self.next_item = (k, v)
+                    break
+                values.append(v)
+        except StopIteration:
+            self.next_item = None
+        return key, values
+
+
+class ChainedIterable(object):
+    """
+    Picklable chained iterator, similar to itertools.chain.from_iterable()
+    """
+    def __init__(self, iterators):
+        self.iterators = iterators
+
+    def __len__(self):
+        return sum(len(vs) for vs in self.iterators)
+
+    def __iter__(self):
+        return itertools.chain.from_iterable(self.iterators)
+
+
+class ExternalGroupBy(ExternalMerger):
+
+    """
+    Group by the items by key. If any partition of them can not been
+    hold 
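
For context, a minimal stand-alone sketch of the grouping behaviour that the 
GroupByKey doctest above describes. This is illustrative only: it uses 
itertools.groupby on an in-memory list rather than the patch's streaming 
GroupByKey with disk-backed ExternalList values.

```python
from itertools import groupby
from operator import itemgetter

def group_sorted(pairs):
    # Group a key-sorted iterator of (key, value) pairs into
    # (key, [values]) without re-sorting; mirrors the GroupByKey doctest,
    # but collects values into a plain list instead of an ExternalList.
    for key, kvs in groupby(pairs, key=itemgetter(0)):
        yield key, [v for _, v in kvs]

if __name__ == "__main__":
    k = [i // 3 for i in range(6)]
    v = list(range(6))
    print(list(group_sorted(zip(k, v))))
    # prints [(0, [0, 1, 2]), (1, [3, 4, 5])]
```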

[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-11-10 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/1977#discussion_r20132424
  
--- Diff: python/pyspark/rdd.py ---
@@ -1579,21 +1577,34 @@ def createZero():
 
         return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions)
 
+    def _can_spill(self):
+        return self.ctx._conf.get("spark.shuffle.spill", "True").lower() == "true"
+
+    def _memory_limit(self):
+        return _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
+
     # TODO: support variant with custom partitioner
     def groupByKey(self, numPartitions=None):
         """
         Group the values for each key in the RDD into a single sequence.
-        Hash-partitions the resulting RDD with into numPartitions partitions.
+        Hash-partitions the resulting RDD with into numPartitions
--- End diff --

`with into` should just be `into`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
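
A rough, assumption-laden sketch of what the helpers in the diff above do: 
`_can_spill` checks whether spilling is enabled in the Spark conf, and 
`_memory_limit` turns a Spark-style size string into megabytes via a 
`_parse_memory`-like helper. The names `parse_memory_mb` and `choose_merger` 
below are hypothetical illustrations, not the PR's code.

```python
# Illustrative sketch only: convert a Spark-style memory string such as
# "512m" or "2g" into megabytes, the unit the worker's spill threshold uses.
def parse_memory_mb(s):
    units = {"k": 1.0 / 1024, "m": 1, "g": 1024, "t": 1024 * 1024}
    suffix = s[-1].lower()
    if suffix not in units:
        raise ValueError("invalid memory string: " + s)
    return int(float(s[:-1]) * units[suffix])

# Hypothetical gating in the same spirit as the diff: only take the
# spilling (external) code path when spilling is enabled; conf is a plain
# dict here standing in for the Spark configuration.
def choose_merger(conf, in_memory_merger, external_merger):
    can_spill = conf.get("spark.shuffle.spill", "True").lower() == "true"
    limit_mb = parse_memory_mb(conf.get("spark.python.worker.memory", "512m"))
    return external_merger(limit_mb) if can_spill else in_memory_merger()

# e.g. parse_memory_mb("512m") -> 512, parse_memory_mb("2g") -> 2048
```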



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-11-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-61682581
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22878/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-11-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-61682571
  
  [Test build #22878 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22878/consoleFull)
 for   PR 1977 at commit 
[`a14b4bd`](https://github.com/apache/spark/commit/a14b4bdd1cb72f41a5449326888d600cbe183062).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class ResultIterable(object):`
  * `class FlattedValuesSerializer(BatchedSerializer):`
  * `class ExternalList(object):`
  * `class GroupByKey(object):`
  * `class ChainedIterable(object):`
  * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-10-31 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-61338247
  
Jenkins, retest this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-10-31 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-61338701
  
  [Test build #22651 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22651/consoleFull)
 for   PR 1977 at commit 
[`ab5515b`](https://github.com/apache/spark/commit/ab5515b4eaf93e95a9d9e04f11538119ca25030b).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-10-31 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-61345056
  
  [Test build #22651 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22651/consoleFull)
 for   PR 1977 at commit 
[`ab5515b`](https://github.com/apache/spark/commit/ab5515b4eaf93e95a9d9e04f11538119ca25030b).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class ResultIterable(object):`
  * `class FlattedValuesSerializer(BatchedSerializer):`
  * `class ExternalList(object):`
  * `class GroupByKey(object):`
  * `class ChainedIterable(object):`
  * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-10-28 Thread davies
Github user davies commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-60717966
  
Jenkins, test this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-10-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-60718072
  
  [Test build #484 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/484/consoleFull)
 for   PR 1977 at commit 
[`ab5515b`](https://github.com/apache/spark/commit/ab5515b4eaf93e95a9d9e04f11538119ca25030b).
 * This patch **does not merge cleanly**.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-10-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-60718263
  
  [Test build #22348 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22348/consoleFull)
 for   PR 1977 at commit 
[`ab5515b`](https://github.com/apache/spark/commit/ab5515b4eaf93e95a9d9e04f11538119ca25030b).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-10-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-60722423
  
  [Test build #22348 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22348/consoleFull)
 for   PR 1977 at commit 
[`ab5515b`](https://github.com/apache/spark/commit/ab5515b4eaf93e95a9d9e04f11538119ca25030b).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class ResultIterable(object):`
  * `class FlattedValuesSerializer(BatchedSerializer):`
  * `class ExternalList(object):`
  * `class GroupByKey(object):`
  * `class ChainedIterable(object):`
  * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-10-28 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-60722431
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22348/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-60473744
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22199/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-10-25 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-60473742
  
  [Test build #22199 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22199/consoleFull)
 for   PR 1977 at commit 
[`ab5515b`](https://github.com/apache/spark/commit/ab5515b4eaf93e95a9d9e04f11538119ca25030b).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-10-25 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-60474811
  
**[Test build #431 timed 
out](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/431/consoleFull)**
 for PR 1977 at commit 
[`651f891`](https://github.com/apache/spark/commit/651f8919c0fe8b55967823f1f051ea85e0c4daba)
 after a configured wait of `120m`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-10-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-60472577
  
  [Test build #431 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/431/consoleFull)
 for   PR 1977 at commit 
[`651f891`](https://github.com/apache/spark/commit/651f8919c0fe8b55967823f1f051ea85e0c4daba).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-10-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-60472642
  
  [Test build #22199 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22199/consoleFull)
 for   PR 1977 at commit 
[`ab5515b`](https://github.com/apache/spark/commit/ab5515b4eaf93e95a9d9e04f11538119ca25030b).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-10-07 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-58248987
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21397/consoleFull)
 for   PR 1977 at commit 
[`651f891`](https://github.com/apache/spark/commit/651f8919c0fe8b55967823f1f051ea85e0c4daba).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-10-07 Thread davies
Github user davies commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-58250188
  
I've simplified GroupByKey; it's much more readable now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

2014-10-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1977#issuecomment-58259787
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21397/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


