[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user davies commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-91363104

@JoshRosen Good catch! Fixed it.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-91133286

[Test build #29925 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29925/consoleFull) for PR 1977 at commit [`67772dd`](https://github.com/apache/spark/commit/67772dd9e5fc2157f1da960aebfa82b133615527).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class FlattenedValuesSerializer(BatchedSerializer):`
  * `class ExternalList(object):`
  * `class ExternalListOfList(ExternalList):`
  * `class GroupByKey(object):`
  * `class ExternalGroupBy(ExternalMerger):`
* This patch does not change any dependencies.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-91133293

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29925/

Test PASSed.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-91347431

I spent a bit of time fuzz-testing this code to try to reach 100% coverage of the changes in this patch. While doing so, I think I uncovered a bug:

```
../Spark/python/pyspark/shuffle.py:383: in _external_items
    for v in self._merged_items(i):
../Spark/python/pyspark/shuffle.py:826: in <genexpr>
    return ((k, vs) for k, vs in GroupByKey(sorted_items))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <pyspark.shuffle.GroupByKey object at 0x1048d0990>

    def next(self):
        key, value = self.next_item if self.next_item else next(self.iterator)
E       TypeError: 'list' object is not an iterator

../Spark/python/pyspark/shuffle.py:669: TypeError
```

It looks like the `GroupByKey` object expects to be instantiated with an iterator, but in `GroupBy._merge_sorted_items` we end up calling it with the output of `ExternalSorter.sorted`. It looks like there's a branch in `ExternalSorter.sorted` where we can end up returning a list instead of an iterator (line 517), where we return `current_chunk`.
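The TypeError above is easy to reproduce outside Spark, and it clarifies the fix: a list is iterable but not an iterator, so any consumer that calls `next()` directly must either receive a real iterator or coerce its input with `iter()`. The `first_pair` helper below is a hypothetical stand-in for `GroupByKey.next()`, not Spark code:

```python
# Standalone reproduction of the failure mode: next() requires an
# iterator, and a plain list is iterable but not an iterator.
def first_pair(sorted_items):
    # Mimics GroupByKey.next(): assumes its input supports next().
    return next(sorted_items)

items = [("a", 1), ("b", 2)]

try:
    first_pair(items)            # passing the list directly fails
except TypeError as e:
    print(e)                     # 'list' object is not an iterator

# Coercing with iter() accepts both lists and iterators, which is the
# usual defensive fix on the consumer side.
print(first_pair(iter(items)))   # ('a', 1)
```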
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/1977#discussion_r27998013

```diff
--- Diff: python/pyspark/rdd.py ---
@@ -1755,21 +1753,33 @@ def createZero():
         return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions)
 
+    def _can_spill(self):
+        return self.ctx._conf.get("spark.shuffle.spill", "True").lower() == "true"
+
+    def _memory_limit(self):
+        return _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
+
     # TODO: support variant with custom partitioner
     def groupByKey(self, numPartitions=None):
         """
         Group the values for each key in the RDD into a single sequence.
-        Hash-partitions the resulting RDD with into numPartitions partitions.
+        Hash-partitions the resulting RDD with numPartitions partitions.
+
+        The values in the resulting RDD is iterable object L{ResultIterable},
+        they can be iterated only once. The `len(values)` will result in
```
--- End diff --

Is this still true? [Upthread](https://github.com/apache/spark/pull/1977#issuecomment-56004283), you mentioned that this can now be iterated multiple times, so we should probably update this comment. It would also be nice to add a unit test to illustrate that calling `len(values)` and then reading the values will work, even if it's inefficient (you might have already done this in tests, though; I haven't checked).
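For readers unfamiliar with `_parse_memory`, the `"512m"` default in the diff above is a JVM-style memory string. A simplified stand-in (a sketch, not PySpark's exact implementation) converts such strings to megabytes:

```python
def parse_memory(s):
    # Simplified stand-in for a _parse_memory-style helper: converts a
    # JVM-style memory string such as "512m" or "2g" into megabytes.
    units = {"g": 1024, "m": 1, "t": 1 << 20, "k": 1.0 / 1024}
    suffix = s[-1].lower()
    if suffix not in units:
        raise ValueError("invalid memory string: " + s)
    return int(float(s[:-1]) * units[suffix])

print(parse_memory("512m"))  # 512
print(parse_memory("2g"))    # 2048
```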
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/1977#discussion_r28001568

```diff
--- Diff: python/pyspark/serializers.py ---
@@ -220,6 +220,29 @@ def __repr__(self):
         return "BatchedSerializer(%s, %d)" % (str(self.serializer), self.batchSize)
 
+class FlattedValuesSerializer(BatchedSerializer):
```
--- End diff --

Can we rename this to `FlattenedValuesSerializer` (with the missing `en`)? It should be a pretty simple find-and-replace, but we should also update the other occurrences of `flatted` to be `flattened`.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/1977#discussion_r28003603

```diff
--- Diff: python/pyspark/shuffle.py ---
@@ -367,32 +372,13 @@ def iteritems(self):
     def _external_items(self):
         """Return all partitioned items as iterator"""
-        assert not self.data
```
--- End diff --

I noticed that you moved a few of these assertions. I guess the old assumption was that once we've spilled, we'll stop using `data` and only aggregate into `pdata`, given that we clear `data` in the first branch of `_spill`. Why has this assumption changed here? It looks like we do end up writing to `data` again inside of this `_external_items` method, but then we end up clearing `data` at the end after the iterator has been consumed. Was this change necessary in order to support iterating multiple times over the merged result? Just want to double-check my understanding here.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/1977#discussion_r28003896

```diff
--- Diff: python/pyspark/shuffle.py ---
@@ -367,32 +372,13 @@ def iteritems(self):
     def _external_items(self):
         """Return all partitioned items as iterator"""
-        assert not self.data
         if any(self.pdata):
             self._spill()
-        hard_limit = self._next_limit()
+        self.pdata = []
```
--- End diff --

It looks like the old code didn't clear `pdata` after spilling, but I'm guessing that wasn't a problem because we didn't iterate multiple times before?
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-91011683

I spent most of the morning looking this over again, and the patch looks pretty good to me. I think I understand the lifecycle of values pretty well. I left a couple of questions / comments upthread, but overall I think this is pretty close to being ready to merge.

One quick question: when do groupBy's spill files get cleaned up? It seems like we want them to be cleaned at the end of the task, since at that point we know for sure that they won't be re-used. To handle this case, what do you think about adding a cleanup hook mechanism that allows us to register cleanup code to run at the end of a task? In the past, we could have relied on shutdown hooks for this, but that's no longer possible due to worker re-use.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/1977#discussion_r27994956

```diff
--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +520,295 @@ def sorted(self, iterator, key=None, reverse=False):
         return heapq.merge(chunks, key=key, reverse=reverse)
 
+class ExternalList(object):
+    """
+    ExternalList can have many items which cannot be held in memory at
+    the same time.
+
+    >>> l = ExternalList(range(100))
+    >>> len(l)
+    100
+    >>> l.append(10)
+    >>> len(l)
+    101
+    >>> for i in range(10240):
+    ...     l.append(i)
+    >>> len(l)
+    10341
+    >>> import pickle
+    >>> l2 = pickle.loads(pickle.dumps(l))
+    >>> len(l2)
+    10341
+    >>> list(l2)[100]
+    10
+    """
+    LIMIT = 10240
+
+    def __init__(self, values):
+        self.values = values
+        self.disk_count = 0
+        self._file = None
+        self._ser = None
+
+    def __getstate__(self):
+        if self._file is not None:
+            self._file.flush()
+            f = os.fdopen(os.dup(self._file.fileno()))
+            f.seek(0)
+            bytes = f.read()
```
--- End diff --

I think `bytes` shadows a built-in name in Python, so maybe we should call this `_bytes` instead.
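The `__getstate__`/`__setstate__` pattern under review (read the backing file into a string when pickling, rewrite it to a fresh file when unpickling) can be illustrated with a toy class. `DiskBackedLog` is hypothetical and omits the serializer, batching, and `os.dup()` details:

```python
import pickle
import tempfile

class DiskBackedLog(object):
    # Hypothetical toy: state lives in an anonymous temp file, and the
    # pickle hooks move it through a plain string, as in the
    # __getstate__/__setstate__ pair quoted above.
    def __init__(self):
        self._file = tempfile.TemporaryFile(mode="w+")

    def write(self, text):
        self._file.write(text)

    def contents(self):
        self._file.flush()
        self._file.seek(0)
        return self._file.read()

    def __getstate__(self):
        return self.contents()        # drain the file into a string

    def __setstate__(self, data):
        self._file = tempfile.TemporaryFile(mode="w+")
        self._file.write(data)        # rebuild the file on the other side

log = DiskBackedLog()
log.write("spilled values")
copy = pickle.loads(pickle.dumps(log))
print(copy.contents())   # spilled values
```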
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/1977#discussion_r27995826

```diff
--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +520,295 @@ def sorted(self, iterator, key=None, reverse=False):
         return heapq.merge(chunks, key=key, reverse=reverse)
 
+class ExternalList(object):
[... ExternalList context trimmed; same as the diff quoted earlier in the thread ...]
+    def append(self, value):
+        self.values.append(value)
+        # dump them into disk if the key is huge
+        if len(self.values) >= self.LIMIT:
+            self._spill()
+
+    def _open_file(self):
+        dirs = _get_local_dirs("objects")
+        d = dirs[id(self) % len(dirs)]
+        if not os.path.exists(d):
+            os.makedirs(d)
+        p = os.path.join(d, str(id))
+        self._file = open(p, "w+", 65536)
+        self._ser = CompressedSerializer(PickleSerializer())
+        os.unlink(p)
```
--- End diff --

Why do we need this `unlink` call here? Is this to delete the file if it already exists?
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/1977#discussion_r28027864

```diff
--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +522,322 @@ def sorted(self, iterator, key=None, reverse=False):
         return heapq.merge(chunks, key=key, reverse=reverse)
 
+class ExternalList(object):
[... ExternalList context trimmed; same as the diff quoted earlier in the thread ...]
+    def __iter__(self):
+        if self._file is not None:
+            self._file.flush()
+            # read all items from disks first
+            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+                f.seek(0)
+                for values in self._ser.load_stream(f):
+                    for v in values:
+                        yield v
+
+        for v in self.values:
+            yield v
```
--- End diff --

I understand now that this is just unwrapping the list that we wrapped around `self.values` in the `_spill` call down on line 615, but this was a bit confusing to me. If we need to keep this and the wrapping, could you add comments to explain it?
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/1977#discussion_r28032966

```diff
--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +522,322 @@ def sorted(self, iterator, key=None, reverse=False):
         return heapq.merge(chunks, key=key, reverse=reverse)
 
+class ExternalList(object):
[... ExternalList context trimmed; same as the diff quoted earlier in the thread ...]
+class ExternalListOfList(ExternalList):
+    """
+    An external list for list.
+    """
+    def __init__(self, values):
+        ExternalList.__init__(self, values)
+        self.count = sum(len(i) for i in values)
+
+    def append(self, value):
+        ExternalList.append(self, value)
```
--- End diff --

I can change the `__iter__` of ExternalListOfList so that we no longer need ChainedIterator.
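The change davies describes (overriding `__iter__` on the list-of-lists subclass so values come out flattened, removing the need for a separate chaining iterator) can be sketched with plain in-memory lists. `ListOfLists` is a hypothetical toy, not the ExternalListOfList implementation:

```python
from itertools import chain

class ListOfLists(object):
    # Toy sketch: append() takes whole sublists, count tracks the
    # flattened length, and __iter__ yields flattened values directly,
    # so no separate chaining iterator is needed.
    def __init__(self, values):
        self.values = list(values)
        self.count = sum(len(v) for v in self.values)

    def append(self, sublist):
        self.values.append(sublist)
        self.count += len(sublist)

    def __len__(self):
        return self.count

    def __iter__(self):
        return chain.from_iterable(self.values)

lol = ListOfLists([[1, 2], [3]])
lol.append([4, 5])
print(len(lol))    # 5
print(list(lol))   # [1, 2, 3, 4, 5]
```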
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/1977#discussion_r28027636

```diff
--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +522,322 @@ def sorted(self, iterator, key=None, reverse=False):
         return heapq.merge(chunks, key=key, reverse=reverse)
 
+class ExternalList(object):
[... ExternalList and ExternalListOfList context trimmed; same as the diff quoted earlier in the thread ...]
+class GroupByKey(object):
```
--- End diff --

It looks like we only directly use `GroupByKey` in tests, while the actual shuffle code only uses `GroupListsByKey`. Is this intentional?
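As context for the `GroupByKey` discussion: the class walks a key-sorted stream of (key, value) pairs and yields one (key, values) group per key. For an in-memory input, `itertools.groupby` gives the same observable behavior; this sketch ignores all of the external-spilling machinery:

```python
from itertools import groupby
from operator import itemgetter

def group_by_key(sorted_items):
    # sorted_items must already be sorted by key, just as GroupByKey
    # expects the output of the external sorter to be.
    for key, pairs in groupby(sorted_items, key=itemgetter(0)):
        yield key, [v for _, v in pairs]

pairs = iter([("a", 1), ("a", 2), ("b", 3)])
print(list(group_by_key(pairs)))  # [('a', [1, 2]), ('b', [3])]
```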
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user davies commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-91037582

@JoshRosen Thanks for reviewing this; this may be the most complicated part of PySpark. :(

The partitioned files are cleaned up after merging, partition by partition. The sorted files and the files backing large lists (ExternalList) are cleaned up once they are closed, when the last reference is garbage-collected (we call os.unlink() right after opening them).
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-91121784

[Test build #29925 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29925/consoleFull) for PR 1977 at commit [`67772dd`](https://github.com/apache/spark/commit/67772dd9e5fc2157f1da960aebfa82b133615527).
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/1977#discussion_r28032910

```diff
--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +522,322 @@ def sorted(self, iterator, key=None, reverse=False):
         return heapq.merge(chunks, key=key, reverse=reverse)
 
+class ExternalList(object):
[... ExternalList and ExternalListOfList context trimmed; same as the diff quoted earlier in the thread ...]
+class GroupByKey(object):
```
--- End diff --

Yes; I'd like the code and tests for ExternalList and GroupByKey to be easy to understand.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-91051963

Jenkins, retest this please.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-91050249

Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29891/

Test FAILed.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/1977#discussion_r28013345 --- Diff: python/pyspark/shuffle.py --- @@ -529,6 +520,295 @@ def sorted(self, iterator, key=None, reverse=False): return heapq.merge(chunks, key=key, reverse=reverse) +class ExternalList(object): + +ExternalList can have many items which cannot be hold in memory in +the same time. + + l = ExternalList(range(100)) + len(l) +100 + l.append(10) + len(l) +101 + for i in range(10240): +... l.append(i) + len(l) +10341 + import pickle + l2 = pickle.loads(pickle.dumps(l)) + len(l2) +10341 + list(l2)[100] +10 + +LIMIT = 10240 + +def __init__(self, values): +self.values = values +self.disk_count = 0 +self._file = None +self._ser = None + +def __getstate__(self): +if self._file is not None: +self._file.flush() +f = os.fdopen(os.dup(self._file.fileno())) +f.seek(0) +bytes = f.read() +else: +bytes = '' +return self.values, self.disk_count, bytes + +def __setstate__(self, item): +self.values, self.disk_count, bytes = item +if bytes: +self._open_file() +self._file.write(bytes) +else: +self._file = None +self._ser = None + +def __iter__(self): +if self._file is not None: +self._file.flush() +# read all items from disks first +with os.fdopen(os.dup(self._file.fileno()), 'r') as f: +f.seek(0) +for values in self._ser.load_stream(f): +for v in values: +yield v + +for v in self.values: +yield v + +def __len__(self): +return self.disk_count + len(self.values) + +def append(self, value): +self.values.append(value) +# dump them into disk if the key is huge +if len(self.values) = self.LIMIT: +self._spill() + +def _open_file(self): +dirs = _get_local_dirs(objects) +d = dirs[id(self) % len(dirs)] +if not os.path.exists(d): +os.makedirs(d) +p = os.path.join(d, str(id)) +self._file = open(p, w+, 65536) +self._ser = CompressedSerializer(PickleSerializer()) +os.unlink(p) --- End diff -- Ah, I see. 
From the [unlink](http://pubs.opengroup.org/onlinepubs/009695399/functions/unlink.html) spec: When the file's link count becomes 0 and no process has the file open, the space occupied by the file shall be freed and the file shall no longer be accessible. If one or more processes have the file open when the last link is removed, the link shall be removed before unlink() returns, but the removal of the file contents shall be postponed until all references to the file are closed. This is a very clever trick! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
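The open-then-unlink trick from the spec above can be demonstrated in a minimal standalone sketch (POSIX semantics; the path name here is purely illustrative): the directory entry vanishes immediately, but the open file object keeps the data readable until it is closed, after which the kernel reclaims the space with no cleanup code needed.

```python
import os

# Open a scratch file, then unlink its path. The name disappears at once,
# but the open descriptor keeps the contents alive until close().
path = "/tmp/unlink_demo_%d" % os.getpid()  # illustrative path
f = open(path, "w+")
os.unlink(path)                      # link count drops to 0
assert not os.path.exists(path)      # nothing visible on disk anymore
f.write("spilled data")
f.flush()
f.seek(0)
assert f.read() == "spilled data"    # still readable through the fd
f.close()                            # space is freed here, automatically
```

This is why `_open_file()` can unlink the spill file right after opening it: the process can keep spilling to and reading from the file, and the OS guarantees cleanup even if the worker crashes.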
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-91052717 [Test build #29895 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29895/consoleFull) for PR 1977 at commit [`0dcf320`](https://github.com/apache/spark/commit/0dcf3208220a46cead751aacee7e7e3e22a5132b).
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user davies commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-91060947 @JoshRosen the latest comments have been addressed.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/1977#discussion_r28019740

--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +522,301 @@ def sorted(self, iterator, key=None, reverse=False):
         return heapq.merge(chunks, key=key, reverse=reverse)
 
+class ExternalList(object):
+    """
+    An ExternalList can hold many items which cannot all be held in
+    memory at the same time.
+
+    >>> l = ExternalList(range(100))
+    >>> len(l)
+    100
+    >>> l.append(10)
+    >>> len(l)
+    101
+    >>> for i in range(10240):
+    ...     l.append(i)
+    >>> len(l)
+    10341
+    >>> import pickle
+    >>> l2 = pickle.loads(pickle.dumps(l))
+    >>> len(l2)
+    10341
+    >>> list(l2)[100]
+    10
+    """
+    LIMIT = 10240
+
+    def __init__(self, values):
+        self.values = values
+        if values and isinstance(values[0], list):
+            self.count = sum(len(i) for i in values)
+        else:
+            self.count = len(values)
+        self._file = None
+        self._ser = None
+
+    def __getstate__(self):
+        if self._file is not None:
+            self._file.flush()
+            f = os.fdopen(os.dup(self._file.fileno()))
+            f.seek(0)
+            serialized = f.read()
+        else:
+            serialized = ''
+        return self.values, self.count, serialized
+
+    def __setstate__(self, item):
+        self.values, self.count, serialized = item
+        if serialized:
+            self._open_file()
+            self._file.write(serialized)
+        else:
+            self._file = None
+            self._ser = None
+
+    def __iter__(self):
+        if self._file is not None:
+            self._file.flush()
+            # read all items from disk first
+            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+                f.seek(0)
+                for values in self._ser.load_stream(f):
+                    for v in values:
+                        yield v
+
+        for v in self.values:
+            yield v
+
+    def __len__(self):
+        return self.count
+
+    def append(self, value):
+        self.values.append(value)
+        self.count += len(value) if isinstance(value, list) else 1
--- End diff --

Why do we add the length of `value` if it's a list? This would make sense for `extend()`, but I find it a bit confusing for `append()`.
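The reviewer's point about `append()` vs. `extend()` semantics can be made concrete with a small hypothetical counting class (the class name and structure here are illustrative, not PySpark code): when `append()` adds `len(value)` for a list argument, the stored structure and the reported count diverge.

```python
# Hypothetical sketch of the line under review: append() stores one
# element, yet the count grows by the element's length when it is a list.
class CountingList(object):
    def __init__(self):
        self.values = []
        self.count = 0

    def append(self, value):
        self.values.append(value)
        # the questioned bookkeeping: a list counts as len(value) items
        self.count += len(value) if isinstance(value, list) else 1

cl = CountingList()
cl.append(1)
cl.append([2, 3, 4])          # one element appended, count grows by 3
assert len(cl.values) == 2    # two top-level elements stored
assert cl.count == 4          # count reflects the flattened total
```

This is extend()-style accounting bolted onto append()-style storage, which is exactly the mismatch the comment flags.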
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-91069245 [Test build #29895 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29895/consoleFull) for PR 1977 at commit [`0dcf320`](https://github.com/apache/spark/commit/0dcf3208220a46cead751aacee7e7e3e22a5132b). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class FlattenedValuesSerializer(BatchedSerializer):` * `class ExternalList(object):` * `class GroupByKey(object):` * `class ChainedIterable(object):` * `class ExternalGroupBy(ExternalMerger):` * This patch does not change any dependencies.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-91069251 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29895/
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/1977#discussion_r28027737

--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +522,322 @@ def sorted(self, iterator, key=None, reverse=False):
         return heapq.merge(chunks, key=key, reverse=reverse)
 
+class ExternalList(object):
+    """
+    An ExternalList can hold many items which cannot all be held in
+    memory at the same time.
+
+    >>> l = ExternalList(range(100))
+    >>> len(l)
+    100
+    >>> l.append(10)
+    >>> len(l)
+    101
+    >>> for i in range(10240):
+    ...     l.append(i)
+    >>> len(l)
+    10341
+    >>> import pickle
+    >>> l2 = pickle.loads(pickle.dumps(l))
+    >>> len(l2)
+    10341
+    >>> list(l2)[100]
+    10
+    """
+    LIMIT = 10240
+
+    def __init__(self, values):
+        self.values = values
+        self.count = len(values)
+        self._file = None
+        self._ser = None
+
+    def __getstate__(self):
+        if self._file is not None:
+            self._file.flush()
+            f = os.fdopen(os.dup(self._file.fileno()))
+            f.seek(0)
+            serialized = f.read()
+        else:
+            serialized = ''
+        return self.values, self.count, serialized
+
+    def __setstate__(self, item):
+        self.values, self.count, serialized = item
+        if serialized:
+            self._open_file()
+            self._file.write(serialized)
+        else:
+            self._file = None
+            self._ser = None
+
+    def __iter__(self):
+        if self._file is not None:
+            self._file.flush()
+            # read all items from disk first
+            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+                f.seek(0)
+                for values in self._ser.load_stream(f):
+                    for v in values:
+                        yield v
+
+        for v in self.values:
+            yield v
+
+    def __len__(self):
+        return self.count
+
+    def append(self, value):
+        self.values.append(value)
+        self.count += 1
+        # dump them into disk if the key is huge
+        if len(self.values) >= self.LIMIT:
+            self._spill()
+
+    def _open_file(self):
+        dirs = _get_local_dirs("objects")
+        d = dirs[id(self) % len(dirs)]
+        if not os.path.exists(d):
+            os.makedirs(d)
+        p = os.path.join(d, str(id(self)))
+        self._file = open(p, "w+", 65536)
+        self._ser = CompressedSerializer(PickleSerializer())
+        os.unlink(p)
+
+    def _spill(self):
+        """ dump the values into disk """
+        global MemoryBytesSpilled, DiskBytesSpilled
+        if self._file is None:
+            self._open_file()
+
+        used_memory = get_used_memory()
+        pos = self._file.tell()
+        self._ser.dump_stream([self.values], self._file)
+        self.values = []
+        gc.collect()
+        DiskBytesSpilled += self._file.tell() - pos
+        MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+
+
+class ExternalListOfList(ExternalList):
+    """
+    An external list of lists.
+    """
+    def __init__(self, values):
+        ExternalList.__init__(self, values)
+        self.count = sum(len(i) for i in values)
+
+    def append(self, value):
+        ExternalList.append(self, value)
--- End diff --

I'm still a bit confused by the contract for this class. If `count = N` and `__len__` is defined as `self.count`, then I'd expect that our iterator would yield exactly `N` items. It seems like this contract is either violated by this class or its superclass.
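The contract the reviewer is pointing at can be stated and checked with a tiny illustration (the class here is a deliberately simplified stand-in, not PySpark code): a well-behaved sized iterable should satisfy `len(x) == number of items iter(x) yields`.

```python
# Contract check: len(x) should equal the number of items yielded.
def contract_holds(x):
    return len(x) == sum(1 for _ in x)

assert contract_holds([1, 2, 3])

# If count sums the lengths of nested lists while __iter__ yields the
# nested lists themselves (unflattened), the contract breaks:
class BrokenSized(object):
    def __init__(self, lists):
        self.lists = lists
        self.count = sum(len(l) for l in lists)

    def __len__(self):
        return self.count

    def __iter__(self):
        return iter(self.lists)

b = BrokenSized([[1, 2], [3, 4, 5]])
assert len(b) == 5
assert sum(1 for _ in b) == 2   # yields 2 lists, not 5 items
assert not contract_holds(b)
```

Whether `ExternalListOfList` actually violates this depends on whether its iterator flattens the stored lists; the sketch only makes the reviewer's expectation precise.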
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user davies commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-91038005 @JoshRosen Also, I have rolled back the changes in ResultIterable (because someone is using ResultIterable.maxindex) and improved the performance of len(ResultIterable) for ExternalList; please take another look.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-91077067 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29900/
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/1977#discussion_r28033500

--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +522,322 @@ def sorted(self, iterator, key=None, reverse=False):
         return heapq.merge(chunks, key=key, reverse=reverse)
 
+class ExternalList(object):
+    """
+    An ExternalList can hold many items which cannot all be held in
+    memory at the same time.
+
+    >>> l = ExternalList(range(100))
+    >>> len(l)
+    100
+    >>> l.append(10)
+    >>> len(l)
+    101
+    >>> for i in range(10240):
+    ...     l.append(i)
+    >>> len(l)
+    10341
+    >>> import pickle
+    >>> l2 = pickle.loads(pickle.dumps(l))
+    >>> len(l2)
+    10341
+    >>> list(l2)[100]
+    10
+    """
+    LIMIT = 10240
+
+    def __init__(self, values):
+        self.values = values
+        self.count = len(values)
+        self._file = None
+        self._ser = None
+
+    def __getstate__(self):
+        if self._file is not None:
+            self._file.flush()
+            f = os.fdopen(os.dup(self._file.fileno()))
+            f.seek(0)
+            serialized = f.read()
+        else:
+            serialized = ''
+        return self.values, self.count, serialized
+
+    def __setstate__(self, item):
+        self.values, self.count, serialized = item
+        if serialized:
+            self._open_file()
+            self._file.write(serialized)
+        else:
+            self._file = None
+            self._ser = None
+
+    def __iter__(self):
+        if self._file is not None:
+            self._file.flush()
+            # read all items from disk first
+            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+                f.seek(0)
+                for values in self._ser.load_stream(f):
+                    for v in values:
+                        yield v
+
+        for v in self.values:
+            yield v
+
+    def __len__(self):
+        return self.count
+
+    def append(self, value):
+        self.values.append(value)
+        self.count += 1
+        # dump them into disk if the key is huge
+        if len(self.values) >= self.LIMIT:
+            self._spill()
+
+    def _open_file(self):
+        dirs = _get_local_dirs("objects")
+        d = dirs[id(self) % len(dirs)]
+        if not os.path.exists(d):
+            os.makedirs(d)
+        p = os.path.join(d, str(id(self)))
+        self._file = open(p, "w+", 65536)
+        self._ser = CompressedSerializer(PickleSerializer())
+        os.unlink(p)
+
+    def _spill(self):
+        """ dump the values into disk """
+        global MemoryBytesSpilled, DiskBytesSpilled
+        if self._file is None:
+            self._open_file()
+
+        used_memory = get_used_memory()
+        pos = self._file.tell()
+        self._ser.dump_stream([self.values], self._file)
+        self.values = []
+        gc.collect()
+        DiskBytesSpilled += self._file.tell() - pos
+        MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+
+
+class ExternalListOfList(ExternalList):
+    """
+    An external list of lists.
+    """
+    def __init__(self, values):
+        ExternalList.__init__(self, values)
+        self.count = sum(len(i) for i in values)
+
+    def append(self, value):
+        ExternalList.append(self, value)
--- End diff --

done
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user davies commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-91108456 @JoshRosen Thanks for the comments, it looks better now.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/1977#discussion_r28019825

--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +522,301 @@ def sorted(self, iterator, key=None, reverse=False):
         return heapq.merge(chunks, key=key, reverse=reverse)
 
+class ExternalList(object):
+    """
+    An ExternalList can hold many items which cannot all be held in
+    memory at the same time.
+
+    >>> l = ExternalList(range(100))
+    >>> len(l)
+    100
+    >>> l.append(10)
+    >>> len(l)
+    101
+    >>> for i in range(10240):
+    ...     l.append(i)
+    >>> len(l)
+    10341
+    >>> import pickle
+    >>> l2 = pickle.loads(pickle.dumps(l))
+    >>> len(l2)
+    10341
+    >>> list(l2)[100]
+    10
+    """
+    LIMIT = 10240
+
+    def __init__(self, values):
+        self.values = values
+        if values and isinstance(values[0], list):
+            self.count = sum(len(i) for i in values)
+        else:
+            self.count = len(values)
+        self._file = None
+        self._ser = None
+
+    def __getstate__(self):
+        if self._file is not None:
+            self._file.flush()
+            f = os.fdopen(os.dup(self._file.fileno()))
+            f.seek(0)
+            serialized = f.read()
+        else:
+            serialized = ''
+        return self.values, self.count, serialized
+
+    def __setstate__(self, item):
+        self.values, self.count, serialized = item
+        if serialized:
+            self._open_file()
+            self._file.write(serialized)
+        else:
+            self._file = None
+            self._ser = None
+
+    def __iter__(self):
+        if self._file is not None:
+            self._file.flush()
+            # read all items from disk first
+            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+                f.seek(0)
+                for values in self._ser.load_stream(f):
+                    for v in values:
+                        yield v
+
+        for v in self.values:
+            yield v
+
+    def __len__(self):
+        return self.count
+
+    def append(self, value):
+        self.values.append(value)
+        self.count += len(value) if isinstance(value, list) else 1
+        # dump them into disk if the key is huge
+        if len(self.values) >= self.LIMIT:
+            self._spill()
+
+    def _open_file(self):
+        dirs = _get_local_dirs("objects")
+        d = dirs[id(self) % len(dirs)]
+        if not os.path.exists(d):
+            os.makedirs(d)
+        p = os.path.join(d, str(id(self)))
+        self._file = open(p, "w+", 65536)
+        self._ser = CompressedSerializer(PickleSerializer())
+        os.unlink(p)
+
+    def _spill(self):
+        """ dump the values into disk """
+        global MemoryBytesSpilled, DiskBytesSpilled
+        if self._file is None:
+            self._open_file()
+
+        used_memory = get_used_memory()
+        pos = self._file.tell()
+        self._ser.dump_stream([self.values], self._file)
+        self.values = []
+        gc.collect()
+        DiskBytesSpilled += self._file.tell() - pos
+        MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+
+
+class GroupByKey(object):
+    """
+    Group a sorted iterator into [(k1, it1), (k2, it2), ...]
+
+    >>> k = [i / 3 for i in range(6)]
+    >>> v = [i for i in range(6)]
+    >>> g = GroupByKey(iter(zip(k, v)))
+    >>> [(k, list(it)) for k, it in g]
+    [(0, [0, 1, 2]), (1, [3, 4, 5])]
+    """
+    def __init__(self, iterator):
+        self.iterator = iterator
+        self.next_item = None
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        key, value = self.next_item if self.next_item else next(self.iterator)
+        values = ExternalList([value])
+        try:
+            while True:
+                k, v = next(self.iterator)
+                if k != key:
+                    self.next_item = (k, v)
+                    break
+                values.append(v)
+        except StopIteration:
+            self.next_item = None
+        return key, values
+
+
+class ChainedIterable(object):
+    """
+    Picklable chained iterator, similar to itertools.chain.from_iterable()
+    """
+    def __init__(self, iterators):
+        self.iterators = iterators
+
+    def __len__(self):
+        try:
+            return len(self.iterators)
+        except:
--- End diff --

Can this be `except TypeError` instead?
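The suggestion to narrow the handler is easy to motivate with a small sketch (the `safe_len` helper is illustrative, not part of the PR): calling `len()` on an object without `__len__`, such as a generator, raises `TypeError` specifically, so catching exactly that avoids swallowing unrelated errors the way a bare `except:` would.

```python
# Narrow exception handling: only TypeError signals "length unknown".
def safe_len(iterable):
    try:
        return len(iterable)
    except TypeError:
        return None  # e.g. a generator has no __len__

assert safe_len([1, 2, 3]) == 3
assert safe_len(i for i in range(3)) is None
```

A bare `except:` in Python 2 even catches `KeyboardInterrupt` and `SystemExit`, which is rarely intended; that is the usual reason reviewers ask for the specific exception type.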
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-91115066 [Test build #29921 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29921/consoleFull) for PR 1977 at commit [`e78c15c`](https://github.com/apache/spark/commit/e78c15c41ff0c1cf393bb6e65eb513ebd8357c75). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class FlattenedValuesSerializer(BatchedSerializer):` * `class ExternalList(object):` * `class ExternalListOfList(ExternalList):` * `class GroupByKey(object):` * `class ExternalGroupBy(ExternalMerger):` * This patch does not change any dependencies.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-91115081 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29921/
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-91036610 [Test build #29891 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29891/consoleFull) for PR 1977 at commit [`0dcf320`](https://github.com/apache/spark/commit/0dcf3208220a46cead751aacee7e7e3e22a5132b).
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-91062845 [Test build #29900 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29900/consoleFull) for PR 1977 at commit [`0b0fde8`](https://github.com/apache/spark/commit/0b0fde8b568141b03d7ba8b7d74e5b885dcb4557).
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-91108728 [Test build #29921 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29921/consoleFull) for PR 1977 at commit [`e78c15c`](https://github.com/apache/spark/commit/e78c15c41ff0c1cf393bb6e65eb513ebd8357c75).
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/1977#discussion_r28028089

--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +522,322 @@ def sorted(self, iterator, key=None, reverse=False):
         return heapq.merge(chunks, key=key, reverse=reverse)
 
+class ExternalList(object):
+    """
+    An ExternalList can hold many items which cannot all be held in
+    memory at the same time.
+
+    >>> l = ExternalList(range(100))
+    >>> len(l)
+    100
+    >>> l.append(10)
+    >>> len(l)
+    101
+    >>> for i in range(10240):
+    ...     l.append(i)
+    >>> len(l)
+    10341
+    >>> import pickle
+    >>> l2 = pickle.loads(pickle.dumps(l))
+    >>> len(l2)
+    10341
+    >>> list(l2)[100]
+    10
+    """
+    LIMIT = 10240
+
+    def __init__(self, values):
+        self.values = values
+        self.count = len(values)
+        self._file = None
+        self._ser = None
+
+    def __getstate__(self):
+        if self._file is not None:
+            self._file.flush()
+            f = os.fdopen(os.dup(self._file.fileno()))
+            f.seek(0)
+            serialized = f.read()
+        else:
+            serialized = ''
+        return self.values, self.count, serialized
+
+    def __setstate__(self, item):
+        self.values, self.count, serialized = item
+        if serialized:
+            self._open_file()
+            self._file.write(serialized)
+        else:
+            self._file = None
+            self._ser = None
+
+    def __iter__(self):
+        if self._file is not None:
+            self._file.flush()
+            # read all items from disk first
+            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+                f.seek(0)
+                for values in self._ser.load_stream(f):
+                    for v in values:
+                        yield v
+
+        for v in self.values:
+            yield v
+
+    def __len__(self):
+        return self.count
+
+    def append(self, value):
+        self.values.append(value)
+        self.count += 1
+        # dump them into disk if the key is huge
+        if len(self.values) >= self.LIMIT:
+            self._spill()
+
+    def _open_file(self):
+        dirs = _get_local_dirs("objects")
+        d = dirs[id(self) % len(dirs)]
+        if not os.path.exists(d):
+            os.makedirs(d)
+        p = os.path.join(d, str(id(self)))
+        self._file = open(p, "w+", 65536)
+        self._ser = CompressedSerializer(PickleSerializer())
+        os.unlink(p)
+
+    def _spill(self):
+        """ dump the values into disk """
+        global MemoryBytesSpilled, DiskBytesSpilled
+        if self._file is None:
+            self._open_file()
+
+        used_memory = get_used_memory()
+        pos = self._file.tell()
+        self._ser.dump_stream([self.values], self._file)
--- End diff --

Aha, I see: this is because we don't use a batching serializer. This makes sense now.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/1977#discussion_r28027829

--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +522,322 @@ def sorted(self, iterator, key=None, reverse=False):
         return heapq.merge(chunks, key=key, reverse=reverse)
 
+class ExternalList(object):
+    """
+    An ExternalList can hold many items which cannot all be held in
+    memory at the same time.
+
+    >>> l = ExternalList(range(100))
+    >>> len(l)
+    100
+    >>> l.append(10)
+    >>> len(l)
+    101
+    >>> for i in range(10240):
+    ...     l.append(i)
+    >>> len(l)
+    10341
+    >>> import pickle
+    >>> l2 = pickle.loads(pickle.dumps(l))
+    >>> len(l2)
+    10341
+    >>> list(l2)[100]
+    10
+    """
+    LIMIT = 10240
+
+    def __init__(self, values):
+        self.values = values
+        self.count = len(values)
+        self._file = None
+        self._ser = None
+
+    def __getstate__(self):
+        if self._file is not None:
+            self._file.flush()
+            f = os.fdopen(os.dup(self._file.fileno()))
+            f.seek(0)
+            serialized = f.read()
+        else:
+            serialized = ''
+        return self.values, self.count, serialized
+
+    def __setstate__(self, item):
+        self.values, self.count, serialized = item
+        if serialized:
+            self._open_file()
+            self._file.write(serialized)
+        else:
+            self._file = None
+            self._ser = None
+
+    def __iter__(self):
+        if self._file is not None:
+            self._file.flush()
+            # read all items from disk first
+            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+                f.seek(0)
+                for values in self._ser.load_stream(f):
+                    for v in values:
+                        yield v
+
+        for v in self.values:
+            yield v
+
+    def __len__(self):
+        return self.count
+
+    def append(self, value):
+        self.values.append(value)
+        self.count += 1
+        # dump them into disk if the key is huge
+        if len(self.values) >= self.LIMIT:
+            self._spill()
+
+    def _open_file(self):
+        dirs = _get_local_dirs("objects")
+        d = dirs[id(self) % len(dirs)]
+        if not os.path.exists(d):
+            os.makedirs(d)
+        p = os.path.join(d, str(id(self)))
+        self._file = open(p, "w+", 65536)
+        self._ser = CompressedSerializer(PickleSerializer())
+        os.unlink(p)
+
+    def _spill(self):
+        """ dump the values into disk """
+        global MemoryBytesSpilled, DiskBytesSpilled
+        if self._file is None:
+            self._open_file()
+
+        used_memory = get_used_memory()
+        pos = self._file.tell()
+        self._ser.dump_stream([self.values], self._file)
--- End diff --

If `self.values` is a collection, why do we need to wrap it into a list prior to dumping it?
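Why `dump_stream([self.values], ...)` wraps the whole list can be shown with a stand-in serializer (a deliberately simplified mini version, not PySpark's actual `CompressedSerializer`): a non-batching stream serializer writes one record per element of the iterator it is given, so wrapping makes the entire spilled list a single record.

```python
import pickle
import io

# Stand-in for a non-batching serializer: one pickled record per element.
def dump_stream(iterator, f):
    for obj in iterator:
        pickle.dump(obj, f)

def load_stream(f):
    while True:
        try:
            yield pickle.load(f)
        except EOFError:
            return

values = [1, 2, 3]

# Wrapping in a list writes ONE record containing the whole list...
buf = io.BytesIO()
dump_stream([values], buf)
buf.seek(0)
assert list(load_stream(buf)) == [[1, 2, 3]]

# ...whereas passing the list directly writes one record per item.
buf = io.BytesIO()
dump_stream(values, buf)
buf.seek(0)
assert list(load_stream(buf)) == [1, 2, 3]
```

Spilling the whole batch as one record keeps each `_spill()` call cheap; the cost is that reading it back decompresses and unpickles the batch as a unit, which is what the "we don't use a batching serializer" reply refers to.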
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-91050238 [Test build #29891 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29891/consoleFull) for PR 1977 at commit [`0dcf320`](https://github.com/apache/spark/commit/0dcf3208220a46cead751aacee7e7e3e22a5132b). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class FlattenedValuesSerializer(BatchedSerializer):` * `class ExternalList(object):` * `class GroupByKey(object):` * `class ChainedIterable(object):` * `class ExternalGroupBy(ExternalMerger):` * This patch does not change any dependencies.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-91077061 [Test build #29900 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29900/consoleFull) for PR 1977 at commit [`0b0fde8`](https://github.com/apache/spark/commit/0b0fde8b568141b03d7ba8b7d74e5b885dcb4557). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class FlattenedValuesSerializer(BatchedSerializer):` * `class ExternalList(object):` * `class ExternalListOfList(ExternalList):` * `class GroupByKey(object):` * `class GroupListsByKey(GroupByKey):` * `class ChainedIterable(object):` * `class ExternalGroupBy(ExternalMerger):` * This patch does not change any dependencies.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-91081544 Sorry for my initial confusion regarding the external lists of lists. I think that the `__len__` thing might be an issue if we ever directly expose `ExternalListOfList` to users, but it looks like we currently only expose it through a `ChainedIterable` in this code, so it doesn't appear to be a problem yet. This still might be worth addressing if you agree that it could help prevent future bugs if we start using this in more places.
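To make the `__len__` concern concrete, here is a minimal, hypothetical sketch (the class name and structure are invented for illustration, not taken from the PR): a container that stores sublists but iterates flattened values reports a `len()` that disagrees with what iteration yields, which is exactly the kind of surprise a directly exposed `ExternalListOfList` could cause.

```python
class ListOfLists(object):
    """Hypothetical stand-in for the concern above: stores sublists,
    but iteration yields the flattened values."""

    def __init__(self, sublists):
        self.sublists = sublists

    def __len__(self):
        # Counts sublists, NOT flattened values -- the mismatch at issue.
        return len(self.sublists)

    def __iter__(self):
        for sub in self.sublists:
            for v in sub:
                yield v

lol = ListOfLists([[1, 2], [3, 4, 5]])
print(len(lol))        # number of sublists
print(len(list(lol)))  # number of flattened values, which is larger
```

Any caller that uses `len()` to size a buffer or report progress would be misled as soon as the object is iterated.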
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/1977#discussion_r28010925

--- Diff: python/pyspark/shuffle.py ---
@@ -367,32 +372,13 @@ def iteritems(self):

     def _external_items(self):
         """ Return all partitioned items as iterator """
-        assert not self.data
--- End diff --

The assumption is still true, I can add that back.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/1977#discussion_r28009944

--- Diff: python/pyspark/shuffle.py ---
@@ -529,6 +520,295 @@ def sorted(self, iterator, key=None, reverse=False):
         return heapq.merge(chunks, key=key, reverse=reverse)

+class ExternalList(object):
+    """
+    ExternalList can have many items which cannot be held in memory at
+    the same time.
+
+    >>> l = ExternalList(range(100))
+    >>> len(l)
+    100
+    >>> l.append(10)
+    >>> len(l)
+    101
+    >>> for i in range(10240):
+    ...     l.append(i)
+    >>> len(l)
+    10341
+    >>> import pickle
+    >>> l2 = pickle.loads(pickle.dumps(l))
+    >>> len(l2)
+    10341
+    >>> list(l2)[100]
+    10
+    """
+    LIMIT = 10240
+
+    def __init__(self, values):
+        self.values = values
+        self.disk_count = 0
+        self._file = None
+        self._ser = None
+
+    def __getstate__(self):
+        if self._file is not None:
+            self._file.flush()
+            f = os.fdopen(os.dup(self._file.fileno()))
+            f.seek(0)
+            bytes = f.read()
+        else:
+            bytes = ''
+        return self.values, self.disk_count, bytes
+
+    def __setstate__(self, item):
+        self.values, self.disk_count, bytes = item
+        if bytes:
+            self._open_file()
+            self._file.write(bytes)
+        else:
+            self._file = None
+            self._ser = None
+
+    def __iter__(self):
+        if self._file is not None:
+            self._file.flush()
+            # read all items from disk first
+            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+                f.seek(0)
+                for values in self._ser.load_stream(f):
+                    for v in values:
+                        yield v
+
+        for v in self.values:
+            yield v
+
+    def __len__(self):
+        return self.disk_count + len(self.values)
+
+    def append(self, value):
+        self.values.append(value)
+        # dump them into disk if the key is huge
+        if len(self.values) >= self.LIMIT:
+            self._spill()
+
+    def _open_file(self):
+        dirs = _get_local_dirs("objects")
+        d = dirs[id(self) % len(dirs)]
+        if not os.path.exists(d):
+            os.makedirs(d)
+        p = os.path.join(d, str(id(self)))
+        self._file = open(p, "w+", 65536)
+        self._ser = CompressedSerializer(PickleSerializer())
+        os.unlink(p)
--- End diff --

We want to remove the file once it's closed.
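The spill-to-disk idea in that diff can be illustrated with a toy version. The sketch below is an assumption-laden simplification, not the PR's actual `ExternalList`: it uses `tempfile` and whole-batch pickling instead of Spark's serializers and local-dir management, but shows the same shape of the technique (a bounded in-memory tail, full batches spilled to disk, and iteration replaying spilled batches before the tail).

```python
import pickle
import tempfile

class SpillList(object):
    """Toy disk-backed list: holds at most `limit` items in memory and
    spills full batches to a temp file as pickled chunks."""

    def __init__(self, limit=4):
        self.limit = limit
        self.values = []     # in-memory tail
        self.disk_count = 0  # items already written to disk
        self._file = tempfile.TemporaryFile()  # binary mode by default

    def append(self, value):
        self.values.append(value)
        if len(self.values) >= self.limit:
            self._spill()

    def _spill(self):
        # Write the in-memory batch as one pickled chunk, then reset it.
        pickle.dump(self.values, self._file)
        self._file.flush()
        self.disk_count += len(self.values)
        self.values = []

    def __len__(self):
        return self.disk_count + len(self.values)

    def __iter__(self):
        # Replay spilled chunks from disk first, then the in-memory tail.
        end = self._file.tell()
        self._file.seek(0)
        while self._file.tell() < end:
            for v in pickle.load(self._file):
                yield v
        for v in self.values:
            yield v

l = SpillList(limit=4)
for i in range(10):
    l.append(i)
print(len(l), list(l))
```

Reading everything back in order while still accepting appends afterwards works because a full iteration leaves the file position at the end of the written data; the real class additionally has to survive pickling across processes, which is what its `__getstate__`/`__setstate__` pair handles.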
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-90697704 [Test build #29802 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29802/consoleFull) for PR 1977 at commit [`2a1857a`](https://github.com/apache/spark/commit/2a1857a4c1d72bee5a93ffbe1da013ffc7ff673b).
* This patch **fails PySpark unit tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class ResultIterable(object):`
  * `class FlattedValuesSerializer(BatchedSerializer):`
  * `class ExternalList(object):`
  * `class GroupByKey(object):`
  * `class ChainedIterable(object):`
  * `class ExternalGroupBy(ExternalMerger):`
* This patch does not change any dependencies.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-90697722 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29802/ Test FAILed.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-90670585 [Test build #29802 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29802/consoleFull) for PR 1977 at commit [`2a1857a`](https://github.com/apache/spark/commit/2a1857a4c1d72bee5a93ffbe1da013ffc7ff673b).
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-90717977 [Test build #637 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/637/consoleFull) for PR 1977 at commit [`2a1857a`](https://github.com/apache/spark/commit/2a1857a4c1d72bee5a93ffbe1da013ffc7ff673b).
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-90718319 [Test build #638 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/638/consoleFull) for PR 1977 at commit [`2a1857a`](https://github.com/apache/spark/commit/2a1857a4c1d72bee5a93ffbe1da013ffc7ff673b).
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-90737032 [Test build #638 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/638/consoleFull) for PR 1977 at commit [`2a1857a`](https://github.com/apache/spark/commit/2a1857a4c1d72bee5a93ffbe1da013ffc7ff673b).
* This patch **fails PySpark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
* This patch does not change any dependencies.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-90740407 [Test build #637 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/637/consoleFull) for PR 1977 at commit [`2a1857a`](https://github.com/apache/spark/commit/2a1857a4c1d72bee5a93ffbe1da013ffc7ff673b).
* This patch **fails PySpark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
* This patch **adds the following new dependencies:**
  * `activation-1.1.jar`
  * `aopalliance-1.0.jar`
  * `avro-1.7.7.jar`
  * `breeze-macros_2.10-0.11.2.jar`
  * `breeze_2.10-0.11.2.jar`
  * `commons-cli-1.2.jar`
  * `commons-compress-1.4.1.jar`
  * `commons-io-2.1.jar`
  * `commons-lang-2.5.jar`
  * `gmbal-api-only-3.0.0-b023.jar`
  * `grizzly-framework-2.1.2.jar`
  * `grizzly-http-2.1.2.jar`
  * `grizzly-http-server-2.1.2.jar`
  * `grizzly-http-servlet-2.1.2.jar`
  * `grizzly-rcm-2.1.2.jar`
  * `guice-3.0.jar`
  * `hadoop-annotations-2.2.0.jar`
  * `hadoop-auth-2.2.0.jar`
  * `hadoop-client-2.2.0.jar`
  * `hadoop-common-2.2.0.jar`
  * `hadoop-hdfs-2.2.0.jar`
  * `hadoop-mapreduce-client-app-2.2.0.jar`
  * `hadoop-mapreduce-client-common-2.2.0.jar`
  * `hadoop-mapreduce-client-core-2.2.0.jar`
  * `hadoop-mapreduce-client-jobclient-2.2.0.jar`
  * `hadoop-mapreduce-client-shuffle-2.2.0.jar`
  * `hadoop-yarn-api-2.2.0.jar`
  * `hadoop-yarn-client-2.2.0.jar`
  * `hadoop-yarn-common-2.2.0.jar`
  * `hadoop-yarn-server-common-2.2.0.jar`
  * `jackson-jaxrs-1.8.8.jar`
  * `jackson-xc-1.8.8.jar`
  * `javax.inject-1.jar`
  * `javax.servlet-3.1.jar`
  * `javax.servlet-api-3.0.1.jar`
  * `jaxb-api-2.2.2.jar`
  * `jaxb-impl-2.2.3-1.jar`
  * `jersey-client-1.9.jar`
  * `jersey-core-1.9.jar`
  * `jersey-grizzly2-1.9.jar`
  * `jersey-guice-1.9.jar`
  * `jersey-json-1.9.jar`
  * `jersey-server-1.9.jar`
  * `jersey-test-framework-core-1.9.jar`
  * `jersey-test-framework-grizzly2-1.9.jar`
  * `jettison-1.1.jar`
  * `jetty-util-6.1.26.jar`
  * `management-api-3.0.0-b012.jar`
  * `protobuf-java-2.4.1.jar`
  * `spark-bagel_2.10-1.4.0-SNAPSHOT.jar`
  * `spark-catalyst_2.10-1.4.0-SNAPSHOT.jar`
  * `spark-core_2.10-1.4.0-SNAPSHOT.jar`
  * `spark-graphx_2.10-1.4.0-SNAPSHOT.jar`
  * `spark-launcher_2.10-1.4.0-SNAPSHOT.jar`
  * `spark-mllib_2.10-1.4.0-SNAPSHOT.jar`
  * `spark-network-common_2.10-1.4.0-SNAPSHOT.jar`
  * `spark-network-shuffle_2.10-1.4.0-SNAPSHOT.jar`
  * `spark-repl_2.10-1.4.0-SNAPSHOT.jar`
  * `spark-sql_2.10-1.4.0-SNAPSHOT.jar`
  * `spark-streaming_2.10-1.4.0-SNAPSHOT.jar`
  * `stax-api-1.0.1.jar`
  * `xz-1.0.jar`
* This patch **removes the following dependencies:**
  * `breeze-macros_2.10-0.11.1.jar`
  * `breeze_2.10-0.11.1.jar`
  * `commons-el-1.0.jar`
  * `commons-io-2.4.jar`
  * `commons-lang-2.4.jar`
  * `hadoop-client-1.0.4.jar`
  * `hadoop-core-1.0.4.jar`
  * `hsqldb-1.8.0.10.jar`
  * `jblas-1.2.3.jar`
  * `spark-bagel_2.10-1.3.0-SNAPSHOT.jar`
  * `spark-catalyst_2.10-1.3.0-SNAPSHOT.jar`
  * `spark-core_2.10-1.3.0-SNAPSHOT.jar`
  * `spark-graphx_2.10-1.3.0-SNAPSHOT.jar`
  * `spark-launcher_2.10-1.3.0-SNAPSHOT.jar`
  * `spark-mllib_2.10-1.3.0-SNAPSHOT.jar`
  * `spark-network-common_2.10-1.3.0-SNAPSHOT.jar`
  * `spark-network-shuffle_2.10-1.3.0-SNAPSHOT.jar`
  * `spark-repl_2.10-1.3.0-SNAPSHOT.jar`
  * `spark-sql_2.10-1.3.0-SNAPSHOT.jar`
  * `spark-streaming_2.10-1.3.0-SNAPSHOT.jar`
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-90750781 [Test build #29816 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29816/consoleFull) for PR 1977 at commit [`e3b8eab`](https://github.com/apache/spark/commit/e3b8eab0677d39eed0476f5cf1a9f3e4ad963bdc).
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-90766722 [Test build #29816 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29816/consoleFull) for PR 1977 at commit [`e3b8eab`](https://github.com/apache/spark/commit/e3b8eab0677d39eed0476f5cf1a9f3e4ad963bdc).
* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds no public classes.
* This patch does not change any dependencies.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-90766727 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29816/ Test PASSed.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-90293354 **[Test build #29757 timed out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29757/consoleFull)** for PR 1977 at commit [`d9589ab`](https://github.com/apache/spark/commit/d9589ab0dbcd6aa443906c3b1ec432d6c68f0587) after a configured wait of `120m`.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-90293365 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29757/ Test FAILed.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-90288434 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29756/ Test FAILed.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-90288396 **[Test build #29756 timed out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29756/consoleFull)** for PR 1977 at commit [`c6a2f8d`](https://github.com/apache/spark/commit/c6a2f8d1ad67d9e403391399a78202bdd1ffe561) after a configured wait of `120m`.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-90258222 [Test build #29757 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29757/consoleFull) for PR 1977 at commit [`d9589ab`](https://github.com/apache/spark/commit/d9589ab0dbcd6aa443906c3b1ec432d6c68f0587).
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-90255941 [Test build #29756 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29756/consoleFull) for PR 1977 at commit [`c6a2f8d`](https://github.com/apache/spark/commit/c6a2f8d1ad67d9e403391399a78202bdd1ffe561).
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-90356172 [Test build #29779 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29779/consoleFull) for PR 1977 at commit [`d2f053b`](https://github.com/apache/spark/commit/d2f053b3eb39ff0c42a3e5df27f0ed4accc1c1d5).
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-90371839 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29779/ Test FAILed.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-90371821 [Test build #29779 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29779/consoleFull) for PR 1977 at commit [`d2f053b`](https://github.com/apache/spark/commit/d2f053b3eb39ff0c42a3e5df27f0ed4accc1c1d5).
* This patch **fails PySpark unit tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class ResultIterable(object):`
  * `class FlattedValuesSerializer(BatchedSerializer):`
  * `class ExternalList(object):`
  * `class GroupByKey(object):`
  * `class ChainedIterable(object):`
  * `class ExternalGroupBy(ExternalMerger):`
* This patch does not change any dependencies.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user mkhaitman commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-86640421 Tried this out with 1.3-rc3 and was getting FetchFailedExceptions while performing a join between two RDDs: org.apache.spark.shuffle.FetchFailedException: Error in opening FileSegmentManagedBuffer{file=/tmp/spark-6b363150-d169-4f3e-b703-98048b3dca7f/spark-edaee223-72e4-47f5-a984-64d25bdc34bd/spark-97f52fa9-3fc6-49f4-b1da-660c08e6d826/blockmgr-18bb9336-ca07-43db-ae49-c4be84846eb3/2c/shuffle_2_11_0.data, offset=1193, length=230}
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user davies commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-86686865 Do you hit this error without the patch? I have no idea why they would be related. On Thursday, March 26, 2015 at 10:43 AM, mkhaitman wrote: Tried this out with 1.3-rc3 and was getting FetchFailedExceptions while performing a join between two RDDs: org.apache.spark.shuffle.FetchFailedException: Error in opening FileSegmentManagedBuffer{file=/tmp/spark-6b363150-d169-4f3e-b703-98048b3dca7f/spark-edaee223-72e4-47f5-a984-64d25bdc34bd/spark-97f52fa9-3fc6-49f4-b1da-660c08e6d826/blockmgr-18bb9336-ca07-43db-ae49-c4be84846eb3/2c/shuffle_2_11_0.data, offset=1193, length=230} — Reply to this email directly or view it on GitHub (https://github.com/apache/spark/pull/1977#issuecomment-86640421).
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user mkhaitman commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-86688264 @davies Sorry, I deleted the comment though you still received the notification. Think it was just a fluke since it didn't happen the second time. Sorry about that! So far looking good on joins.
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user mkhaitman commented on a diff in the pull request: https://github.com/apache/spark/pull/1977#discussion_r27223983

--- Diff: python/pyspark/shuffle.py ---
@@ -244,72 +258,57 @@ def _next_limit(self):

     def mergeValues(self, iterator):
         """ Combine the items by creator and combiner """
-        iterator = iter(iterator)
         # speedup attribute lookup
         creator, comb = self.agg.createCombiner, self.agg.mergeValue
-        d, c, batch = self.data, 0, self.batch
+        c, data, pdata, hfun, batch = 0, self.data, self.pdata, self._partition, self.batch
+        limit = self.memory_limit
         for k, v in iterator:
+            d = pdata[hfun(k)] if pdata else data
             d[k] = comb(d[k], v) if k in d else creator(v)
             c += 1
-            if c % batch == 0 and get_used_memory() > self.memory_limit:
-                self._spill()
-                self._partitioned_mergeValues(iterator, self._next_limit())
-                break
+            if c >= batch:
+                if get_used_memory() >= limit:
+                    self._spill()
+                    limit = self._next_limit()
+                    batch /= 2
+                    c = 0
+                else:
+                    batch *= 1.5

     def _partition(self, key):
         """ Return the partition for key """
         return hash((key, self._seed)) % self.partitions

-    def _partitioned_mergeValues(self, iterator, limit=0):
-        """ Partition the items by key, then combine them """
-        # speedup attribute lookup
-        creator, comb = self.agg.createCombiner, self.agg.mergeValue
-        c, pdata, hfun, batch = 0, self.pdata, self._partition, self.batch
-
-        for k, v in iterator:
-            d = pdata[hfun(k)]
-            d[k] = comb(d[k], v) if k in d else creator(v)
-            if not limit:
-                continue
-
-            c += 1
-            if c % batch == 0 and get_used_memory() > limit:
-                self._spill()
-                limit = self._next_limit()
+    def _object_size(self, obj):
+        """ How much memory for this obj, assuming that all the objects
+        consume similar bytes of memory """
+        return 1

-    def mergeCombiners(self, iterator, check=True):
+    def mergeCombiners(self, iterator, limit=None):
         """ Merge (K,V) pair by mergeCombiner """
-        iterator = iter(iterator)
+        if limit is None:
+            limit = self.memory_limit
         # speedup attribute lookup
-        d, comb, batch = self.data, self.agg.mergeCombiners, self.batch
-        c = 0
-        for k, v in iterator:
-            d[k] = comb(d[k], v) if k in d else v
-            if not check:
-                continue
-
-            c += 1
-            if c % batch == 0 and get_used_memory() > self.memory_limit:
-                self._spill()
-                self._partitioned_mergeCombiners(iterator, self._next_limit())
-                break
-
-    def _partitioned_mergeCombiners(self, iterator, limit=0):
-        """ Partition the items by key, then merge them """
-        comb, pdata = self.agg.mergeCombiners, self.pdata
-        c, hfun = 0, self._partition
+        comb, hfun, objsize = self.agg.mergeCombiners, self._partition, self._object_size
+        c, data, pdata, batch = 0, self.data, self.pdata, self.batch
         for k, v in iterator:
-            d = pdata[hfun(k)]
+            d = pdata[hfun(k)] if pdata else data
             d[k] = comb(d[k], v) if k in d else v
             if not limit:
                 continue
-            c += 1
-            if c % self.batch == 0 and get_used_memory() > limit:
-                self._spill()
-                limit = self._next_limit()
+            c += objsize(v)
+            if c > batch:
+                if get_used_memory() > limit:
+                    self._spill()
+                    limit = self._next_limit()
+                    batch /= 2
+                    c = 0
+                else:
+                    batch *= 1.5
--- End diff --

Would it make sense to do a final memory check after the for loop, in case we just added another 999 items (worst case) without spilling any of it to disk?
[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...
Github user mkhaitman commented on a diff in the pull request: https://github.com/apache/spark/pull/1977#discussion_r27223759

--- Diff: python/pyspark/shuffle.py ---
@@ -244,72 +258,57 @@ def _next_limit(self):

     def mergeValues(self, iterator):
         """ Combine the items by creator and combiner """
-        iterator = iter(iterator)
         # speedup attribute lookup
         creator, comb = self.agg.createCombiner, self.agg.mergeValue
-        d, c, batch = self.data, 0, self.batch
+        c, data, pdata, hfun, batch = 0, self.data, self.pdata, self._partition, self.batch
+        limit = self.memory_limit
         for k, v in iterator:
+            d = pdata[hfun(k)] if pdata else data
             d[k] = comb(d[k], v) if k in d else creator(v)
             c += 1
-            if c % batch == 0 and get_used_memory() > self.memory_limit:
-                self._spill()
-                self._partitioned_mergeValues(iterator, self._next_limit())
-                break
+            if c >= batch:
+                if get_used_memory() >= limit:
+                    self._spill()
+                    limit = self._next_limit()
+                    batch /= 2
+                    c = 0
+                else:
+                    batch *= 1.5
--- End diff --

Would it make sense to do a final memory check after the for loop, in case we just added another 999 items (worst case) without spilling any of it to disk?
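The adaptive batching pattern under discussion can be isolated into a self-contained sketch. Everything below is illustrative, not the PR's code: `spill` and `used_memory` are hypothetical callables standing in for the real `ExternalMerger` machinery, and the post-loop check implements the reviewer's suggestion of a final memory test so a large last partial batch is not silently kept in memory.

```python
def fill_with_adaptive_batch(iterator, spill, used_memory, memory_limit, batch=1000):
    """Accumulate (k, v) pairs, checking memory only every `batch` items:
    shrink the batch after a spill (check sooner), grow it otherwise
    (checks are costly), and do one final check after the loop."""
    data = {}
    c = 0
    for k, v in iterator:
        data[k] = v  # the real code merges values here
        c += 1
        if c >= batch:
            if used_memory() >= memory_limit:
                spill(data)
                data.clear()
                batch = max(batch // 2, 1)  # over limit: check sooner next time
            else:
                batch = int(batch * 1.5)    # under limit: back off the checks
            c = 0
    # the reviewer's suggestion: a final check for the last partial batch
    if data and used_memory() >= memory_limit:
        spill(data)
        data.clear()
    return data

# Usage: with memory always "over the limit", every item ends up spilled.
spilled = []
leftover = fill_with_adaptive_batch(
    iter((i, i) for i in range(10)),
    spill=lambda d: spilled.append(dict(d)),
    used_memory=lambda: 100,
    memory_limit=50,
    batch=4,
)
```

Without the trailing check, up to `batch - 1` items (the "another 999 items" worst case in the comment above) could stay resident even when memory is already past the limit.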
Github user mkhaitman commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-86236377 This PR looks amazing! I'm going to test this out tomorrow with 1.3-rc3 and report back with some findings. I had initially started taking a stab at improving the shuffle memory handling a bit, but it looks like I don't have to now!
Github user airhorns commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-78116054 Hey @davies this would be freakin' fantastic to get merged... any chance that might happen soon? We hit many issues with skewed group sizes causing a whole job to fail, and in my brief experimentation I think this would make things far more stable for us.
Github user davies commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-78118374 Ping @JoshRosen
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-68388416 [Test build #24904 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24904/consoleFull) for PR 1977 at commit [`2b9c261`](https://github.com/apache/spark/commit/2b9c261497fab4bb8d310d30e0017aa3b3d2f37f). * This patch merges cleanly.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-68400348 **[Test build #24904 timed out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24904/consoleFull)** for PR 1977 at commit [`2b9c261`](https://github.com/apache/spark/commit/2b9c261497fab4bb8d310d30e0017aa3b3d2f37f) after a configured wait of `120m`.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-68400353 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24904/ Test FAILed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-68400955 [Test build #557 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/557/consoleFull) for PR 1977 at commit [`2b9c261`](https://github.com/apache/spark/commit/2b9c261497fab4bb8d310d30e0017aa3b3d2f37f). * This patch merges cleanly.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-68407892 [Test build #557 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/557/consoleFull) for PR 1977 at commit [`2b9c261`](https://github.com/apache/spark/commit/2b9c261497fab4bb8d310d30e0017aa3b3d2f37f). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class ResultIterable(object):` * `class FlattedValuesSerializer(BatchedSerializer):` * `class ExternalList(object):` * `class GroupByKey(object):` * `class ChainedIterable(object):` * `class ExternalGroupBy(ExternalMerger):`
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-63164145 I agree that this is a good fix; I've been letting the review slip because this PR is pretty complex and it will take me a decent amount of time to be sure that it's correct in all cases.
Github user davies commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-62447893 @JoshRosen @mateiz Could we get this into 1.2? It has sat here for 2 months.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-62448001 [Test build #23156 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23156/consoleFull) for PR 1977 at commit [`70aadcd`](https://github.com/apache/spark/commit/70aadcdba2952c38e3ce531ffd25761b553c1369). * This patch merges cleanly.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-62460926 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23156/ Test PASSed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-62460909 [Test build #23156 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23156/consoleFull) for PR 1977 at commit [`70aadcd`](https://github.com/apache/spark/commit/70aadcdba2952c38e3ce531ffd25761b553c1369). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `public final class JavaRecoverableNetworkWordCount ` * `class ResultIterable(object):` * `class FlattedValuesSerializer(BatchedSerializer):` * `class ExternalList(object):` * `class GroupByKey(object):` * `class ChainedIterable(object):` * `class ExternalGroupBy(ExternalMerger):`
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/1977#discussion_r20132241

--- Diff: python/pyspark/shuffle.py ---
@@ -520,6 +505,295 @@ def sorted(self, iterator, key=None, reverse=False):
         return heapq.merge(chunks, key=key, reverse=reverse)

+class ExternalList(object):
+    """
+    ExternalList can have many items which cannot be held in memory
+    at the same time.
+
+    >>> l = ExternalList(range(100))
+    >>> len(l)
+    100
+    >>> l.append(10)
+    >>> len(l)
+    101
+    >>> for i in range(10240):
+    ...     l.append(i)
+    >>> len(l)
+    10341
+    >>> import pickle
+    >>> l2 = pickle.loads(pickle.dumps(l))
+    >>> len(l2)
+    10341
+    >>> list(l2)[100]
+    10
+    """
+    LIMIT = 10240
+
+    def __init__(self, values):
+        self.values = values
+        self.disk_count = 0
+        self._file = None
+        self._ser = None
+
+    def __getstate__(self):
+        if self._file is not None:
+            self._file.flush()
+            f = os.fdopen(os.dup(self._file.fileno()))
+            f.seek(0)
+            bytes = f.read()
+        else:
+            bytes = ''
+        return self.values, self.disk_count, bytes
+
+    def __setstate__(self, item):
+        self.values, self.disk_count, bytes = item
+        if bytes:
+            self._open_file()
+            self._file.write(bytes)
+        else:
+            self._file = None
+            self._ser = None
+
+    def __iter__(self):
+        if self._file is not None:
+            self._file.flush()
+            # read all items from disks first
+            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+                f.seek(0)
+                for values in self._ser.load_stream(f):
+                    for v in values:
+                        yield v
+
+        for v in self.values:
+            yield v
+
+    def __len__(self):
+        return self.disk_count + len(self.values)
+
+    def append(self, value):
+        self.values.append(value)
+        # dump them into disk if the key is huge
+        if len(self.values) >= self.LIMIT:
+            self._spill()
+
+    def _open_file(self):
+        dirs = _get_local_dirs("objects")
+        d = dirs[id(self) % len(dirs)]
+        if not os.path.exists(d):
+            os.makedirs(d)
+        p = os.path.join(d, str(id(self)))
+        self._file = open(p, "w+", 65536)
+        self._ser = CompressedSerializer(PickleSerializer())
+        os.unlink(p)
+
+    def _spill(self):
+        """ dump the values into disk """
+        global MemoryBytesSpilled, DiskBytesSpilled
+        if self._file is None:
+            self._open_file()
+
+        used_memory = get_used_memory()
+        pos = self._file.tell()
+        self._ser.dump_stream([self.values], self._file)
+        self.disk_count += len(self.values)
+        self.values = []
+        gc.collect()
+        DiskBytesSpilled += self._file.tell() - pos
+        MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+
+
+class GroupByKey(object):
+    """
+    Group a sorted iterator into [(k1, it1), (k2, it2), ...]
+
+    >>> k = [i / 3 for i in range(6)]
+    >>> v = [i for i in range(6)]
+    >>> g = GroupByKey(iter(zip(k, v)))
+    >>> [(k, list(it)) for k, it in g]
+    [(0, [0, 1, 2]), (1, [3, 4, 5])]
+    """
+    def __init__(self, iterator):
+        self.iterator = iterator
+        self.next_item = None
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        key, value = self.next_item if self.next_item else next(self.iterator)
+        values = ExternalList([value])
+        try:
+            while True:
+                k, v = next(self.iterator)
+                if k != key:
+                    self.next_item = (k, v)
+                    break
+                values.append(v)
+        except StopIteration:
+            self.next_item = None
+        return key, values
+
+
+class ChainedIterable(object):
+    """
+    Picklable chained iterator, similar to itertools.chain.from_iterable()
+    """
+    def __init__(self, iterators):
+        self.iterators = iterators
+
+    def __len__(self):
+        return sum(len(vs) for vs in self.iterators)
+
+    def __iter__(self):
+        return itertools.chain.from_iterable(self.iterators)
+
+
+class ExternalGroupBy(ExternalMerger):
+
+    """
+    Group by the items by key. If any partition of them cannot be
+    held
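The `GroupByKey` class above assumes its input iterator is already sorted by key, so grouping needs only a single lookahead item. A minimal Python 3 version of the same idea (illustrative only: plain in-memory lists instead of the disk-backed `ExternalList`, and `__next__` instead of the Python 2 `next`):

```python
class SortedGroupBy:
    """Group a key-sorted iterator into (key, [values]) pairs using a
    one-item lookahead, as the PR's GroupByKey does."""
    def __init__(self, iterator):
        self.iterator = iterator
        self.next_item = None  # lookahead: first pair of the next group

    def __iter__(self):
        return self

    def __next__(self):
        # Start from the saved lookahead, or pull the next pair
        # (raising StopIteration here cleanly ends the iteration).
        key, value = self.next_item if self.next_item else next(self.iterator)
        self.next_item = None
        values = [value]
        for k, v in self.iterator:
            if k != key:
                self.next_item = (k, v)  # belongs to the next group
                break
            values.append(v)
        return key, values
```

For example, `list(SortedGroupBy(iter([(0, 'a'), (0, 'b'), (1, 'c')])))` yields `[(0, ['a', 'b']), (1, ['c'])]`. The disk-backed variant in the diff only changes where the per-key values live, not this control flow.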
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/1977#discussion_r20132424

--- Diff: python/pyspark/rdd.py ---
@@ -1579,21 +1577,34 @@ def createZero():

         return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions)

+    def _can_spill(self):
+        return self.ctx._conf.get("spark.shuffle.spill", "True").lower() == "true"
+
+    def _memory_limit(self):
+        return _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
+
     # TODO: support variant with custom partitioner
     def groupByKey(self, numPartitions=None):
         """
         Group the values for each key in the RDD into a single sequence.
-        Hash-partitions the resulting RDD with into numPartitions partitions.
+        Hash-partitions the resulting RDD with into numPartitions
--- End diff --

"with into" should just be "into"
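The `_memory_limit` helper in this hunk turns a size string like `512m` into a number of megabytes via `_parse_memory`. A sketch of what such a parser does (illustrative only, not PySpark's exact implementation; suffix handling and the MB return unit are assumptions here):

```python
def parse_memory(s):
    """Parse a JVM-style memory string like '512m' or '2g' into megabytes.
    Sketch of a helper in the spirit of PySpark's _parse_memory."""
    units = {'k': 1.0 / 1024, 'm': 1, 'g': 1024, 't': 1024 * 1024}
    unit = s[-1].lower()
    if unit not in units:
        raise ValueError("invalid memory format: " + s)
    return int(float(s[:-1]) * units[unit])
```

So a `spark.python.worker.memory` of `"512m"` becomes `512`, and `"2g"` becomes `2048`, which the shuffle code can then compare against `get_used_memory()`.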
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-61682581 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22878/ Test PASSed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-61682571 [Test build #22878 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22878/consoleFull) for PR 1977 at commit [`a14b4bd`](https://github.com/apache/spark/commit/a14b4bdd1cb72f41a5449326888d600cbe183062). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class ResultIterable(object):` * `class FlattedValuesSerializer(BatchedSerializer):` * `class ExternalList(object):` * `class GroupByKey(object):` * `class ChainedIterable(object):` * `class ExternalGroupBy(ExternalMerger):`
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-61338247 Jenkins, retest this please.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-61338701 [Test build #22651 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22651/consoleFull) for PR 1977 at commit [`ab5515b`](https://github.com/apache/spark/commit/ab5515b4eaf93e95a9d9e04f11538119ca25030b). * This patch merges cleanly.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-61345056 [Test build #22651 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22651/consoleFull) for PR 1977 at commit [`ab5515b`](https://github.com/apache/spark/commit/ab5515b4eaf93e95a9d9e04f11538119ca25030b). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class ResultIterable(object):` * `class FlattedValuesSerializer(BatchedSerializer):` * `class ExternalList(object):` * `class GroupByKey(object):` * `class ChainedIterable(object):` * `class ExternalGroupBy(ExternalMerger):`
Github user davies commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-60717966 Jenkins, test this please
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-60718072 [Test build #484 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/484/consoleFull) for PR 1977 at commit [`ab5515b`](https://github.com/apache/spark/commit/ab5515b4eaf93e95a9d9e04f11538119ca25030b). * This patch **does not merge cleanly**.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-60718263 [Test build #22348 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22348/consoleFull) for PR 1977 at commit [`ab5515b`](https://github.com/apache/spark/commit/ab5515b4eaf93e95a9d9e04f11538119ca25030b). * This patch merges cleanly.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-60722423 [Test build #22348 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22348/consoleFull) for PR 1977 at commit [`ab5515b`](https://github.com/apache/spark/commit/ab5515b4eaf93e95a9d9e04f11538119ca25030b). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class ResultIterable(object):` * `class FlattedValuesSerializer(BatchedSerializer):` * `class ExternalList(object):` * `class GroupByKey(object):` * `class ChainedIterable(object):` * `class ExternalGroupBy(ExternalMerger):`
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-60722431 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22348/ Test FAILed.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-60473744 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22199/ Test FAILed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-60473742 [Test build #22199 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22199/consoleFull) for PR 1977 at commit [`ab5515b`](https://github.com/apache/spark/commit/ab5515b4eaf93e95a9d9e04f11538119ca25030b). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-60474811 **[Test build #431 timed out](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/431/consoleFull)** for PR 1977 at commit [`651f891`](https://github.com/apache/spark/commit/651f8919c0fe8b55967823f1f051ea85e0c4daba) after a configured wait of `120m`.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-60472577 [Test build #431 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/431/consoleFull) for PR 1977 at commit [`651f891`](https://github.com/apache/spark/commit/651f8919c0fe8b55967823f1f051ea85e0c4daba). * This patch merges cleanly.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-60472642 [Test build #22199 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22199/consoleFull) for PR 1977 at commit [`ab5515b`](https://github.com/apache/spark/commit/ab5515b4eaf93e95a9d9e04f11538119ca25030b). * This patch merges cleanly.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-58248987 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21397/consoleFull) for PR 1977 at commit [`651f891`](https://github.com/apache/spark/commit/651f8919c0fe8b55967823f1f051ea85e0c4daba). * This patch merges cleanly.
Github user davies commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-58250188 I've simplified GroupByKey; it's much more readable now.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1977#issuecomment-58259787 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21397/ Test PASSed.