[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user davies commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-50119786 Awesome! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15333099

--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,436 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+    import psutil
+
+    def get_used_memory():
+        """ Return the used memory in MB """
+        process = psutil.Process(os.getpid())
+        if hasattr(process, "memory_info"):
+            info = process.memory_info()
+        else:
+            info = process.get_memory_info()
+        return info.rss >> 20
+except ImportError:
+
+    def get_used_memory():
+        """ Return the used memory in MB """
+        if platform.system() == 'Linux':
+            for line in open('/proc/self/status'):
+                if line.startswith('VmRSS:'):
+                    return int(line.split()[1]) >> 10
+        else:
+            warnings.warn("Please install psutil to have better "
+                          "support with spilling")
+            if platform.system() == "Darwin":
+                import resource
+                rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
+                return rss >> 20
+        # TODO: support windows
+        return 0
+
+
+class Aggregator(object):
+
+    """
+    Aggregator has three functions to merge values into a combiner.
+
+    createCombiner:  (value) -> combiner
+    mergeValue:      (combiner, value) -> combiner
+    mergeCombiners:  (combiner, combiner) -> combiner
+    """
+
+    def __init__(self, createCombiner, mergeValue, mergeCombiners):
+        self.createCombiner = createCombiner
+        self.mergeValue = mergeValue
+        self.mergeCombiners = mergeCombiners
+
+
+class SimpleAggregator(Aggregator):
+
+    """
+    SimpleAggregator is useful for the cases where combiners have
+    the same type as values
+    """
+
+    def __init__(self, combiner):
+        Aggregator.__init__(self, lambda x: x, combiner, combiner)
+
+
+class Merger(object):
+
+    """
+    Merge shuffled data together by aggregator
+    """
+
+    def __init__(self, aggregator):
+        self.agg = aggregator
+
+    def mergeValues(self, iterator):
+        """ Combine the items by creator and combiner """
+        raise NotImplementedError
+
+    def mergeCombiners(self, iterator):
+        """ Merge the combined items by mergeCombiner """
+        raise NotImplementedError
+
+    def iteritems(self):
+        """ Return the merged items as iterator """
+        raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+    """
+    In-memory merger based on an in-memory dict.
+    """
+
+    def __init__(self, aggregator):
+        Merger.__init__(self, aggregator)
+        self.data = {}
+
+    def mergeValues(self, iterator):
+        """ Combine the items by creator and combiner """
+        # speed up attribute lookup
+        d, creator = self.data, self.agg.createCombiner
+        comb = self.agg.mergeValue
+        for k, v in iterator:
+            d[k] = comb(d[k], v) if k in d else creator(v)
+
+    def mergeCombiners(self, iterator):
+        """ Merge the combined items by mergeCombiner """
+        # speed up attribute lookup
+        d, comb = self.data, self.agg.mergeCombiners
+        for k, v in iterator:
+            d[k] = comb(d[k], v) if k in d else v
+
+    def iteritems(self):
+        """ Return the merged items as iterator """
+        return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+    """
+    External merger will dump the aggregated data into disks when
+    memory usage goes above the limit, then merge them together.
+
+    This
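The Aggregator/InMemoryMerger interface quoted in the diff above can be exercised on its own. The sketch below is a condensed, Python-3 rendition of those two classes (trimmed to mergeValues, with an items() method standing in for iteritems()) used to build a small word count; it is an illustration of the API, not the pyspark code itself.

```python
class Aggregator(object):
    # same three-function contract as in the quoted diff
    def __init__(self, createCombiner, mergeValue, mergeCombiners):
        self.createCombiner = createCombiner
        self.mergeValue = mergeValue
        self.mergeCombiners = mergeCombiners


class InMemoryMerger(object):
    def __init__(self, aggregator):
        self.agg = aggregator
        self.data = {}

    def mergeValues(self, iterator):
        # fold each (key, value) into the existing combiner, creating one
        # on first sight of a key
        d, creator = self.data, self.agg.createCombiner
        comb = self.agg.mergeValue
        for k, v in iterator:
            d[k] = comb(d[k], v) if k in d else creator(v)

    def items(self):
        return iter(self.data.items())


# word count: the combiner is a running int
agg = Aggregator(lambda v: v, lambda c, v: c + v, lambda c1, c2: c1 + c2)
merger = InMemoryMerger(agg)
merger.mergeValues([("a", 1), ("b", 1), ("a", 1)])
print(sorted(merger.items()))  # [('a', 2), ('b', 1)]
```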
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49976553 QA tests have started for PR 1460. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17110/consoleFull
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49980302 QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
  class AutoSerializer(FramedSerializer):
  class Aggregator(object):
  class SimpleAggregator(Aggregator):
  class Merger(object):
  class InMemoryMerger(Merger):
  class ExternalMerger(Merger):

For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17110/consoleFull
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-50072248

Hey Davies, I tried this out a bit and saw two issues / areas for improvement:

1) Since the ExternalMerger is used in both map tasks and reduce tasks, one problem that can happen is that the reduce task's data is already hashed modulo the # of reduce tasks, and so you get many empty buckets. For example, if you have 2 reduce tasks, task 0 gets all the values whose hash code is even, so it can only use half its buckets. If you have 64 reduce tasks, only one bucket is used. The best way to fix this would be to hash values with a random hash function when choosing the bucket. One simple way might be to generate a random integer X for each ExternalMerger and then take hash((key, X)) instead of hash(key) when choosing the bucket. This is equivalent to salting your hash function. Maybe you have other ideas, but I'd suggest trying this first.

2) I also noticed that sometimes maps would fill up again before the old memory was fully freed, leading to smaller spills. For example, for (Int, Int) pairs the first spill from 512 MB of memory is about 68 MB of files, but later spills were only around 20 MB. I found that I could get better performance overall by adding some gc.collect() calls after every data.clear() and pdata.clear(). This freed more memory faster and allowed us to do more work in memory before spilling. The perf difference for one test job was around 30%, but you should try it on your own jobs.
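The salting idea in point 1 can be sketched as follows. `SaltedPartitioner` and `PARTITIONS` are hypothetical names used only for this illustration; the real ExternalMerger chooses buckets internally.

```python
import random

PARTITIONS = 64


class SaltedPartitioner(object):
    """Bucket chooser sketch: mix a per-merger random salt X into the hash,
    i.e. hash((key, X)) instead of hash(key), so reduce-side input that is
    already hashed modulo the number of reduce tasks still spreads over
    all buckets."""

    def __init__(self):
        # one random integer X per merger instance
        self.salt = random.randint(0, 2 ** 31 - 1)

    def bucket(self, key):
        return hash((key, self.salt)) % PARTITIONS


# Keys whose hash is even (what reduce task 0 of 2 would see): the unsalted
# hash reaches only half the buckets, the salted one spreads much wider.
even_keys = range(0, 2000, 2)
plain = {hash(k) % PARTITIONS for k in even_keys}       # only even buckets
p = SaltedPartitioner()
salted = {p.bucket(k) for k in even_keys}               # typically all 64
print(len(plain), len(salted))
```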
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-50072351 BTW here's a patch that adds the GC calls I talked about above: https://gist.github.com/mateiz/297b8618ed033e7c8005
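A minimal sketch of that idea follows; the actual patch is in the gist above. `clear_with_gc` is a hypothetical helper name, and `data`/`pdata` are the dict names used in the quoted shuffle.py.

```python
import gc


def clear_with_gc(*dicts):
    """Clear the big in-memory dicts after a spill, then force a collection
    so their contents are reclaimed before the next fill cycle rather than
    lazily (which can shrink subsequent spills)."""
    for d in dicts:
        d.clear()
    gc.collect()  # reclaim the cleared entries now


data = {i: [i] * 10 for i in range(1000)}
pdata = {}
clear_with_gc(data, pdata)
print(len(data))  # 0
```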
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-50098449 QA tests have started for PR 1460. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17152/consoleFull
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-50099568 QA tests have started for PR 1460. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17153/consoleFull
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-50101034 QA results for PR 1460:
- This patch FAILED unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
  class AutoSerializer(FramedSerializer):
  class Aggregator(object):
  class SimpleAggregator(Aggregator):
  class Merger(object):
  class InMemoryMerger(Merger):
  class ExternalMerger(Merger):

For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17152/consoleFull
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-50102382 QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
  class AutoSerializer(FramedSerializer):
  class Aggregator(object):
  class SimpleAggregator(Aggregator):
  class Merger(object):
  class InMemoryMerger(Merger):
  class ExternalMerger(Merger):

For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17153/consoleFull
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-50110575 Thanks Davies. I've merged this in.
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/1460
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15274517

--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+    import psutil
+
+    def get_used_memory():
+        self = psutil.Process(os.getpid())
+        return self.memory_info().rss >> 20
+
+except ImportError:
+
+    def get_used_memory():
+        if platform.system() == 'Linux':
+            for line in open('/proc/self/status'):
+                if line.startswith('VmRSS:'):
+                    return int(line.split()[1]) >> 10
+        else:
+            warnings.warn("please install psutil to get accurate memory usage")
+            if platform.system() == "Darwin":
+                import resource
+                return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20
+        # TODO: support windows
+        return 0
+
+
+class Aggregator(object):
+
+    def __init__(self, creator, combiner, mergeCombiner=None):
+        self.creator = creator
+        self.combiner = combiner
+        self.mergeCombiner = mergeCombiner or combiner
+
+
+class Merger(object):
+
+    """
+    merge shuffled data together by aggregator
+    """
+
+    def __init__(self, aggregator):
+        self.agg = aggregator
+
+    def combine(self, iterator):
+        """ combine the items by creator and combiner """
+        raise NotImplementedError
+
+    def merge(self, iterator):
+        """ merge the combined items by mergeCombiner """
+        raise NotImplementedError
+
+    def iteritems(self):
+        """ return the merged items as iterator """
+        raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+    """
+    In memory merger based on in-memory dict.
+    """
+
+    def __init__(self, aggregator):
+        Merger.__init__(self, aggregator)
+        self.data = {}
+
+    def combine(self, iterator):
+        """ combine the items by creator and combiner """
+        # speed up attribute lookup
+        d, creator, comb = self.data, self.agg.creator, self.agg.combiner
+        for k, v in iterator:
+            d[k] = comb(d[k], v) if k in d else creator(v)
+
+    def merge(self, iterator):
+        """ merge the combined items by mergeCombiner """
+        # speed up attribute lookup
+        d, comb = self.data, self.agg.mergeCombiner
+        for k, v in iterator:
+            d[k] = comb(d[k], v) if k in d else v
+
+    def iteritems(self):
+        """ return the merged items as iterator """
+        return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+    """
+    External merger will dump the aggregated data into disks when
+    memory usage goes above the limit, then merge them together.
+
+    This class works as follows:
+
+    - It repeatedly combines the items and saves them in one dict in
+      memory.
+
+    - When the used memory goes above the memory limit, it splits
+      the combined data into partitions by hash code and dumps them
+      into disk, one file per partition.
+
+    - Then it goes through the rest of the iterator, combining items
+      into different dicts by hash. Whenever the used memory goes over
+      the memory limit, it dumps all the dicts into disks, one file per
+      dict, repeating this until all the items are combined.
+
+    - Before returning any items, it loads each partition and
+      combines them separately, yielding them before loading the next
+      partition.
+
+    - While loading a partition, if the memory goes over the limit,
+      it partitions the loaded data and dumps them into disks
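The spill-and-reload cycle described above can be sketched under simplified assumptions: plain pickle stands in for the BatchedSerializer/PickleSerializer machinery, `PARTITIONS` is fixed, and `spill`/`load_partition` are hypothetical names for this illustration.

```python
import os
import pickle
import tempfile

PARTITIONS = 4


def spill(data, path):
    """Split the in-memory dict into buckets by key hash and append one
    pickled dict per partition file, then clear the dict (as the merger
    does when memory goes over the limit)."""
    buckets = [{} for _ in range(PARTITIONS)]
    for k, v in data.items():
        buckets[hash(k) % PARTITIONS][k] = v
    for i, bucket in enumerate(buckets):
        # append, so repeated spills accumulate dicts in the same file
        with open(os.path.join(path, str(i)), "ab") as f:
            pickle.dump(bucket, f)
    data.clear()


def load_partition(path, i):
    """Stream back the dicts spilled for one partition, for merging."""
    with open(os.path.join(path, str(i)), "rb") as f:
        while True:
            try:
                yield pickle.load(f)
            except EOFError:
                return


tmp = tempfile.mkdtemp()
spill({n: n * n for n in range(100)}, tmp)
merged = {}
for i in range(PARTITIONS):
    for chunk in load_partition(tmp, i):
        merged.update(chunk)
print(len(merged))  # 100
```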
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15274557 --- Diff: python/pyspark/shuffle.py ---
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15274753 --- Diff: python/pyspark/shuffle.py ---
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15274766 --- Diff: python/pyspark/shuffle.py ---
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15274877

--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,416 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+    import psutil
+
+    def get_used_memory():
+        """ return the used memory in MB """
+        self = psutil.Process(os.getpid())
+        return self.memory_info().rss >> 20
+
+except ImportError:
+
+    def get_used_memory():
+        """ return the used memory in MB """
+        if platform.system() == 'Linux':
+            for line in open('/proc/self/status'):
+                if line.startswith('VmRSS:'):
+                    return int(line.split()[1]) >> 10
+        else:
+            warnings.warn("please install psutil to have better "
+                          "support with spilling")
+            if platform.system() == "Darwin":
+                import resource
+                rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
+                return rss >> 20
+        # TODO: support windows
+        return 0
+
+
+class Aggregator(object):
+
+    """
+    Aggregator has three functions to merge values into a combiner.
+
+    createCombiner:  (value) -> combiner
+    mergeValue:      (combiner, value) -> combiner
+    mergeCombiners:  (combiner, combiner) -> combiner
+    """
+
+    def __init__(self, createCombiner, mergeValue, mergeCombiners):
+        self.createCombiner = createCombiner
+        self.mergeValue = mergeValue
+        self.mergeCombiners = mergeCombiners
+
+
+class SimpleAggregator(Aggregator):
+
+    """
+    SimpleAggregator is useful for the cases where combiners have
+    the same type as values
+    """
+
+    def __init__(self, combiner):
+        Aggregator.__init__(self, lambda x: x, combiner, combiner)
+
+
+class Merger(object):
+
+    """
+    merge shuffled data together by aggregator
+    """
+
+    def __init__(self, aggregator):
+        self.agg = aggregator
+
+    def mergeValues(self, iterator):
+        """ combine the items by creator and combiner """
+        raise NotImplementedError
+
+    def mergeCombiners(self, iterator):
+        """ merge the combined items by mergeCombiner """
+        raise NotImplementedError
+
+    def iteritems(self):
+        """ return the merged items as iterator """
+        raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+    """
+    In memory merger based on in-memory dict.
+    """
+
+    def __init__(self, aggregator):
+        Merger.__init__(self, aggregator)
+        self.data = {}
+
+    def mergeValues(self, iterator):
+        """ combine the items by creator and combiner """
+        # speed up attribute lookup
+        d, creator = self.data, self.agg.createCombiner
+        comb = self.agg.mergeValue
+        for k, v in iterator:
+            d[k] = comb(d[k], v) if k in d else creator(v)
+
+    def mergeCombiners(self, iterator):
+        """ merge the combined items by mergeCombiner """
+        # speed up attribute lookup
+        d, comb = self.data, self.agg.mergeCombiners
+        for k, v in iterator:
+            d[k] = comb(d[k], v) if k in d else v
+
+    def iteritems(self):
+        """ return the merged items as iterator """
+        return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+    """
+    External merger will dump the aggregated data into disks when
+    memory usage goes above the limit, then merge them together.
+
+    This class works as follows:
+
+    - It repeatedly combines the items and saves them in one dict in
+      memory.
+
+    - When
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15274897 --- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,416 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+    import psutil
+
+    def get_used_memory():
+        """ return the used memory in MB """
+        self = psutil.Process(os.getpid())
+        return self.memory_info().rss >> 20
+
+except ImportError:
+
+    def get_used_memory():
+        """ return the used memory in MB """
+        if platform.system() == 'Linux':
+            for line in open('/proc/self/status'):
+                if line.startswith('VmRSS:'):
+                    return int(line.split()[1]) >> 10
+        else:
+            warnings.warn("please install psutil to have better "
+                          "support with spilling")
+            if platform.system() == "Darwin":
+                import resource
+                rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
+                return rss >> 20
+        # TODO: support windows
+        return 0
+
+
+class Aggregator(object):
+
+    """
+    Aggregator has three functions to merge values into a combiner.
+
+    createCombiner: (value) -> combiner
+    mergeValue: (combiner, value) -> combiner
+    mergeCombiners: (combiner, combiner) -> combiner
+    """
+
+    def __init__(self, createCombiner, mergeValue, mergeCombiners):
+        self.createCombiner = createCombiner
+        self.mergeValue = mergeValue
+        self.mergeCombiners = mergeCombiners
+
+
+class SimpleAggregator(Aggregator):
+
+    """
+    SimpleAggregator is useful for the cases in which combiners have
+    the same type as values
+    """
+
+    def __init__(self, combiner):
+        Aggregator.__init__(self, lambda x: x, combiner, combiner)
+
+
+class Merger(object):
+
+    """
+    merge shuffled data together by aggregator
+    """
--- End diff --
Small style thing: capitalize the beginning of each doc comment to match the rest of our code. (This applies to a bunch of places.)
---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
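The three-function combiner contract quoted above is easy to exercise outside Spark. The sketch below re-implements just the Aggregator holder and a mergeValues-style loop from the diff; the per-key (sum, count) combiner is an invented example, not part of the PR:

```python
# Illustrative stand-alone sketch of the combiner contract from the quoted
# diff. The Aggregator holder mirrors the PR's class; the (sum, count)
# combiner below is an invented example for demonstration.
class Aggregator(object):
    def __init__(self, createCombiner, mergeValue, mergeCombiners):
        self.createCombiner = createCombiner
        self.mergeValue = mergeValue
        self.mergeCombiners = mergeCombiners

agg = Aggregator(
    createCombiner=lambda v: (v, 1),                         # value -> combiner
    mergeValue=lambda c, v: (c[0] + v, c[1] + 1),            # (combiner, value) -> combiner
    mergeCombiners=lambda a, b: (a[0] + b[0], a[1] + b[1]),  # (combiner, combiner) -> combiner
)

def merge_values(agg, iterator):
    # same shape as InMemoryMerger.mergeValues in the quoted diff
    data = {}
    for k, v in iterator:
        data[k] = agg.mergeValue(data[k], v) if k in data else agg.createCombiner(v)
    return data

result = merge_values(agg, [("a", 1), ("b", 2), ("a", 3)])
# result["a"] == (4, 2): key "a" summed to 4 over 2 values
```

The same Aggregator instance drives both the local combine pass and the post-shuffle merge, which is why the PR threads it through both `combineLocally` and the mergers.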
Github user mattf commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15286548 --- Diff: python/pyspark/rdd.py ---
@@ -1209,18 +1227,44 @@ def partitionBy(self, numPartitions, partitionFunc=portable_hash):
         # Transferring O(n) objects to Java is too expensive. Instead, we'll
         # form the hash buckets in Python, transferring O(numPartitions) objects
-        # to Java. Each object is a (splitNumber, [objects]) pair.
+        # to Java. Each object is a (splitNumber, [objects]) pair.
+        # In order to void too huge objects, the objects are grouped into chunks.
         outputSerializer = self.ctx._unbatched_serializer
+        limit = (_parse_memory(self.ctx._conf.get("spark.python.worker.memory")
+                 or "512m") / 2)
--- End diff --
Instead of 'conf.get(xyz) or default', why not use 'conf.get(xyz, default)'?
---
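The two idioms mattf contrasts are not equivalent when a key is present but falsy; a quick illustration, with a plain dict standing in for the conf object (SparkConf itself is not used here):

```python
# A plain dict stands in for the conf object; SparkConf.get is not used.
conf = {}

# Missing key: both idioms produce the default.
assert (conf.get("spark.python.worker.memory") or "512m") == "512m"
assert conf.get("spark.python.worker.memory", "512m") == "512m"

# Present-but-falsy value: `or` silently replaces it,
# while the two-argument get() keeps it.
conf["spark.python.worker.memory"] = ""
assert (conf.get("spark.python.worker.memory") or "512m") == "512m"
assert conf.get("spark.python.worker.memory", "512m") == ""
```

For a config value that should never legitimately be empty, the two behave the same, which is presumably why either works here; the two-argument form is just the more idiomatic spelling.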
Github user mattf commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15286571 --- Diff: python/pyspark/rdd.py ---
@@ -1265,26 +1309,26 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
         if numPartitions is None:
             numPartitions = self._defaultReducePartitions()
+        serializer = self.ctx.serializer
+        spill = (self.ctx._conf.get("spark.shuffle.spill") or 'True').lower() == 'true'
+        memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory") or "512m")
+        agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
+
         def combineLocally(iterator):
-            combiners = {}
-            for x in iterator:
-                (k, v) = x
-                if k not in combiners:
-                    combiners[k] = createCombiner(v)
-                else:
-                    combiners[k] = mergeValue(combiners[k], v)
-            return combiners.iteritems()
+            merger = ExternalMerger(agg, memory, serializer) \
+                if spill else InMemoryMerger(agg)
+            merger.mergeValues(iterator)
+            return merger.iteritems()
+
         locally_combined = self.mapPartitions(combineLocally)
         shuffled = locally_combined.partitionBy(numPartitions)
-
+
--- End diff --
Stray whitespace.
---
Github user mattf commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15286886 --- Diff: python/pyspark/shuffle.py ---
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15299925 --- Diff: python/pyspark/shuffle.py ---
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15299960 --- Diff: python/pyspark/rdd.py ---
@@ -1209,18 +1227,44 @@ def partitionBy(self, numPartitions, partitionFunc=portable_hash):
+        limit = (_parse_memory(self.ctx._conf.get("spark.python.worker.memory")
+                 or "512m") / 2)
--- End diff --
I did not know about that, sorry.
---
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15300037 --- Diff: python/pyspark/shuffle.py ---
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15300412 --- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+    import psutil
+
+    def get_used_memory():
+        self = psutil.Process(os.getpid())
+        return self.memory_info().rss >> 20
+
+except ImportError:
+
+    def get_used_memory():
+        if platform.system() == 'Linux':
+            for line in open('/proc/self/status'):
+                if line.startswith('VmRSS:'):
+                    return int(line.split()[1]) >> 10
+        else:
+            warnings.warn("please install psutil to get accurate memory usage")
+            if platform.system() == "Darwin":
+                import resource
+                return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20
+        # TODO: support windows
+        return 0
+
+
+class Aggregator(object):
+
+    def __init__(self, creator, combiner, mergeCombiner=None):
+        self.creator = creator
+        self.combiner = combiner
+        self.mergeCombiner = mergeCombiner or combiner
+
+
+class Merger(object):
+
+    """
+    merge shuffled data together by aggregator
+    """
+
+    def __init__(self, aggregator):
+        self.agg = aggregator
+
+    def combine(self, iterator):
+        """ combine the items by creator and combiner """
+        raise NotImplementedError
+
+    def merge(self, iterator):
+        """ merge the combined items by mergeCombiner """
+        raise NotImplementedError
+
+    def iteritems(self):
+        """ return the merged items as iterator """
+        raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+    """
+    In memory merger based on in-memory dict.
+    """
+
+    def __init__(self, aggregator):
+        Merger.__init__(self, aggregator)
+        self.data = {}
+
+    def combine(self, iterator):
+        """ combine the items by creator and combiner """
+        # speed up attribute lookup
+        d, creator, comb = self.data, self.agg.creator, self.agg.combiner
+        for k, v in iterator:
+            d[k] = comb(d[k], v) if k in d else creator(v)
+
+    def merge(self, iterator):
+        """ merge the combined items by mergeCombiner """
+        # speed up attribute lookup
+        d, comb = self.data, self.agg.mergeCombiner
+        for k, v in iterator:
+            d[k] = comb(d[k], v) if k in d else v
+
+    def iteritems(self):
+        """ return the merged items as iterator """
+        return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+    """
+    External merger will dump the aggregated data into disk when
+    memory usage goes above the limit, then merge them together.
+
+    This class works as follows:
+
+    - It repeatedly combines the items and saves them in one dict in
+      memory.
+
+    - When the used memory goes above the memory limit, it splits
+      the combined data into partitions by hash code, and dumps them
+      into disk, one file per partition.
+
+    - It then goes through the rest of the iterator, combining items
+      into different dicts by hash. Whenever the used memory goes over
+      the memory limit, it dumps all the dicts into disk, one file per
+      dict. This repeats until all the items have been combined.
+
+    - Before returning any items, it loads each partition and
+      combines them separately, yielding them before loading the next
+      partition.
+
+    - While loading a partition, if the memory goes over the limit,
+      it will partition the loaded data and dump them into disk
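The spill-and-merge scheme described in the quoted docstring can be sketched with ordinary files. The helper names, pickle serialization, and fixed partition count below are illustrative assumptions, not the PR's actual ExternalMerger code:

```python
import os
import pickle
import shutil
import tempfile

NUM_PARTITIONS = 4  # illustrative; the real merger sizes this differently

def spill(data, spill_dir, serial):
    # split the in-memory dict into partitions by hash code and
    # dump each partition into its own file
    partitions = [{} for _ in range(NUM_PARTITIONS)]
    for k, v in data.items():
        partitions[hash(k) % NUM_PARTITIONS][k] = v
    for i, part in enumerate(partitions):
        with open(os.path.join(spill_dir, "%d-%d" % (serial, i)), "wb") as f:
            pickle.dump(part, f)

def merge_partition(spill_dir, i, merge_combiners):
    # reload every spill file of partition i and combine, so only one
    # partition's worth of data is in memory at a time
    merged = {}
    for name in sorted(os.listdir(spill_dir)):
        if name.endswith("-%d" % i):
            with open(os.path.join(spill_dir, name), "rb") as f:
                for k, v in pickle.load(f).items():
                    merged[k] = merge_combiners(merged[k], v) if k in merged else v
    return merged

spill_dir = tempfile.mkdtemp()
spill({"a": 1, "b": 2}, spill_dir, 0)   # first time memory fills up
spill({"a": 10, "c": 3}, spill_dir, 1)  # second time
total = {}
for i in range(NUM_PARTITIONS):
    total.update(merge_partition(spill_dir, i, lambda x, y: x + y))
shutil.rmtree(spill_dir)
# total combines both spills: "a" merges to 11
```

Because every spill uses the same hash partitioning, all occurrences of a key land in the same partition file set, which is what lets each partition be merged independently of the others.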
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15300459 --- Diff: python/pyspark/shuffle.py ---
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15300603 --- Diff: python/pyspark/shuffle.py ---
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49904887 QA tests have started for PR 1460. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17043/consoleFull
---
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15302973 --- Diff: python/pyspark/rdd.py ---
@@ -1207,20 +1225,49 @@ def partitionBy(self, numPartitions, partitionFunc=portable_hash):
         if numPartitions is None:
             numPartitions = self._defaultReducePartitions()
-        # Transferring O(n) objects to Java is too expensive. Instead, we'll
-        # form the hash buckets in Python, transferring O(numPartitions) objects
-        # to Java. Each object is a (splitNumber, [objects]) pair.
+        # Transferring O(n) objects to Java is too expensive.
+        # Instead, we'll form the hash buckets in Python,
+        # transferring O(numPartitions) objects to Java.
+        # Each object is a (splitNumber, [objects]) pair.
+        # In order to void too huge objects, the objects are
--- End diff --
Small typo: void -> avoid
---
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15303224 --- Diff: python/pyspark/rdd.py --- @@ -1207,20 +1225,49 @@ def partitionBy(self, numPartitions, partitionFunc=portable_hash): if numPartitions is None: numPartitions = self._defaultReducePartitions() -# Transferring O(n) objects to Java is too expensive. Instead, we'll -# form the hash buckets in Python, transferring O(numPartitions) objects -# to Java. Each object is a (splitNumber, [objects]) pair. +# Transferring O(n) objects to Java is too expensive. +# Instead, we'll form the hash buckets in Python, +# transferring O(numPartitions) objects to Java. +# Each object is a (splitNumber, [objects]) pair. +# In order to void too huge objects, the objects are +# grouped into chunks. outputSerializer = self.ctx._unbatched_serializer +limit = (_parse_memory(self.ctx._conf.get( +"spark.python.worker.memory", "512m") / 2) --- End diff -- Could you get the same problem of increasing the limit here that you have in ExternalMerger? (If Python doesn't free memory right away)
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15303331 --- Diff: python/pyspark/rdd.py --- @@ -1265,26 +1312,28 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners, if numPartitions is None: numPartitions = self._defaultReducePartitions() +serializer = self.ctx.serializer +spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower() + == 'true') +memory = (_parse_memory(self.ctx._conf.get( +"spark.python.worker.memory","512m") --- End diff -- Space after the comma.
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15304302 --- Diff: python/pyspark/rdd.py --- @@ -1207,20 +1225,49 @@ def partitionBy(self, numPartitions, partitionFunc=portable_hash): if numPartitions is None: numPartitions = self._defaultReducePartitions() -# Transferring O(n) objects to Java is too expensive. Instead, we'll -# form the hash buckets in Python, transferring O(numPartitions) objects -# to Java. Each object is a (splitNumber, [objects]) pair. +# Transferring O(n) objects to Java is too expensive. +# Instead, we'll form the hash buckets in Python, +# transferring O(numPartitions) objects to Java. +# Each object is a (splitNumber, [objects]) pair. +# In order to void too huge objects, the objects are +# grouped into chunks. outputSerializer = self.ctx._unbatched_serializer +limit = (_parse_memory(self.ctx._conf.get( +"spark.python.worker.memory", "512m") / 2) --- End diff -- The overhead of splitting the items into small chunks is MUCH lower than it is in ExternalMerger, so this limit is smaller than ExternalMerger's (0.5 vs. 0.9). In the worst case, it will yield all the items after every `batch` number of items.
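The chunking idea discussed above — capping each (partition, [objects]) pair at `batch` items so no single object shipped to the JVM grows too large — might be sketched like this. The helper name `chunked_shuffle_key` and its signature are hypothetical, not the PR's actual `add_shuffle_key`.

```python
def chunked_shuffle_key(iterator, num_partitions, batch=1000,
                        partition_func=hash):
    """Group (key, value) pairs into (partition, chunk) pairs holding at
    most `batch` items each, so no single list sent to the JVM gets huge."""
    buckets = {p: [] for p in range(num_partitions)}
    for kv in iterator:
        p = partition_func(kv[0]) % num_partitions  # partition by key
        buckets[p].append(kv)
        if len(buckets[p]) >= batch:
            # Emit a full chunk immediately instead of letting it grow.
            yield p, buckets[p]
            buckets[p] = []
    # Flush the partially filled buckets at the end.
    for p, objs in buckets.items():
        if objs:
            yield p, objs
```

In the worst case (every key in one partition) this still bounds each emitted object to `batch` items, which matches the trade-off davies describes.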
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15306941 --- Diff: python/pyspark/shuffle.py --- @@ -0,0 +1,378 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import sys +import platform +import shutil +import warnings + +from pyspark.serializers import BatchedSerializer, PickleSerializer + +try: +import psutil + +def get_used_memory(): +self = psutil.Process(os.getpid()) +return self.memory_info().rss >> 20 + +except ImportError: + +def get_used_memory(): +if platform.system() == 'Linux': +for line in open('/proc/self/status'): +if line.startswith('VmRSS:'): +return int(line.split()[1]) >> 10 +else: +warnings.warn("please install psutil to get accurate memory usage") +if platform.system() == 'Darwin': +import resource +return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20 +# TODO: support windows +return 0 + + +class Aggregator(object): + +def __init__(self, creator, combiner, mergeCombiner=None): +self.creator = creator +self.combiner = combiner +self.mergeCombiner = mergeCombiner or combiner + + +class Merger(object): + +"""merge shuffled data together by aggregator""" + +def __init__(self, aggregator): +self.agg = aggregator + +def combine(self, iterator):
+""" combine the items by creator and combiner""" +raise NotImplementedError + +def merge(self, iterator): +""" merge the combined items by mergeCombiner""" +raise NotImplementedError + +def iteritems(self): +""" return the merged items as an iterator""" +raise NotImplementedError + + +class InMemoryMerger(Merger): + +"""In memory merger based on an in-memory dict.""" + +def __init__(self, aggregator): +Merger.__init__(self, aggregator) +self.data = {} + +def combine(self, iterator): +""" combine the items by creator and combiner""" +# speed up attribute lookup +d, creator, comb = self.data, self.agg.creator, self.agg.combiner +for k, v in iterator: +d[k] = comb(d[k], v) if k in d else creator(v) + +def merge(self, iterator): +""" merge the combined items by mergeCombiner""" +# speed up attribute lookup +d, comb = self.data, self.agg.mergeCombiner +for k, v in iterator: +d[k] = comb(d[k], v) if k in d else v + +def iteritems(self): +""" return the merged items as an iterator""" +return self.data.iteritems() + + +class ExternalMerger(Merger): + +"""External merger dumps the aggregated data to disk when memory usage goes above the limit, then merges the spilled files together. + +This class works as follows: + +- It repeatedly combines the items and saves them in one dict in + memory. + +- When the used memory goes above the memory limit, it splits + the combined data into partitions by hash code and dumps them + to disk, one file per partition. + +- Then it goes through the rest of the iterator, combining items + into different dicts by hash. Whenever the used memory goes over + the memory limit, it dumps all the dicts to disk, one file per + dict, repeating until all the items are combined. + +- Before returning any items, it loads each partition and + combines it separately, yielding its items before loading the next + partition. + +- While loading a partition, if the memory goes over the limit, + it will partition the loaded data and dump it to disk +
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49917537 QA tests have started for PR 1460. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17053/consoleFull
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49917693 QA results for PR 1460: - This patch FAILED unit tests. - This patch merges cleanly. - This patch adds the following public classes (experimental): class AutoSerializer(FramedSerializer), class Aggregator(object), class SimpleAggregator(Aggregator), class Merger(object), class InMemoryMerger(Merger), class ExternalMerger(Merger). For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17043/consoleFull
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user davies commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49927437 The last commit has fixed the tests, should run it again.
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49928790 Ah never mind.
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49928696 Looks like the latest tested code has an error in the test suite: ``` Running PySpark tests. Output is in python/unit-tests.log. Running test: pyspark/rdd.py File "pyspark/rdd.py", line 1239 def add_shuffle_key(split, iterator): ^ SyntaxError: invalid syntax Had test failures; see logs. ```
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49928744 Ah NM. Jenkins, test this please.
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49930734 QA results for PR 1460: - This patch PASSES unit tests. - This patch merges cleanly. - This patch adds the following public classes (experimental): class AutoSerializer(FramedSerializer), class Aggregator(object), class SimpleAggregator(Aggregator), class Merger(object), class InMemoryMerger(Merger), class ExternalMerger(Merger). For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17053/consoleFull
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15325695 --- Diff: python/pyspark/shuffle.py --- @@ -0,0 +1,433 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import sys +import platform +import shutil +import warnings + +from pyspark.serializers import BatchedSerializer, PickleSerializer + +try: +import psutil + +def get_used_memory(): +""" Return the used memory in MB""" +self = psutil.Process(os.getpid()) +return self.memory_info().rss >> 20 --- End diff -- Unfortunately memory_info only works in psutil 2.0. I tried the Anaconda Python distribution on Mac, which has psutil 1.2.1, and it doesn't work there. There you have to use get_memory_info() instead.
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15325703 --- Diff: python/pyspark/shuffle.py --- @@ -0,0 +1,433 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import sys +import platform +import shutil +import warnings + +from pyspark.serializers import BatchedSerializer, PickleSerializer + +try: +import psutil + +def get_used_memory(): +""" Return the used memory in MB""" +self = psutil.Process(os.getpid()) +return self.memory_info().rss >> 20 --- End diff -- Also, maybe don't call the process self; it's kind of confusing since it sounds like a "this" object.
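A version-compatible fallback along the lines the two reviews above suggest might look like this. This is a sketch, not the code that was eventually merged: it probes for the psutil 2.x method name (`memory_info`) before falling back to the 1.x one (`get_memory_info`), then to `/proc/self/status` on Linux, and it avoids naming the process object `self`.

```python
import os


def get_used_memory():
    """Return the resident set size of this process in MB (0 if unknown)."""
    try:
        import psutil
        process = psutil.Process(os.getpid())
        if hasattr(process, "memory_info"):
            info = process.memory_info()      # psutil >= 2.0
        else:
            info = process.get_memory_info()  # psutil 1.x
        return info.rss >> 20                 # bytes -> MB
    except ImportError:
        pass
    try:
        # Linux fallback: VmRSS is reported in kB in /proc/self/status.
        with open("/proc/self/status") as f:
            for line in f:
                if line.startswith("VmRSS:"):
                    return int(line.split()[1]) >> 10  # kB -> MB
    except IOError:
        pass
    return 0  # unknown platform (e.g. Windows without psutil)
```

The `hasattr` probe keeps a single code path working across both psutil API generations, which is the portability issue mateiz hit with Anaconda's psutil 1.2.1.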
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49967247 QA tests have started for PR 1460. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17094/consoleFull
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49969141 QA results for PR 1460: - This patch PASSES unit tests. - This patch merges cleanly. - This patch adds the following public classes (experimental): class AutoSerializer(FramedSerializer), class Aggregator(object), class SimpleAggregator(Aggregator), class Merger(object), class InMemoryMerger(Merger), class ExternalMerger(Merger). For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17094/consoleFull
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49709494 QA tests have started for PR 1460. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16957/consoleFull
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49718092 QA results for PR 1460: - This patch PASSES unit tests. - This patch merges cleanly. - This patch adds the following public classes (experimental): class AutoSerializer(FramedSerializer), class Aggregator(object), class Merger(object), class InMemoryMerger(Merger), class ExternalMerger(Merger). For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16957/consoleFull
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49774956 QA tests have started for PR 1460. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16975/consoleFull
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49788328 QA results for PR 1460: - This patch FAILED unit tests. - This patch merges cleanly. - This patch adds the following public classes (experimental): class AutoSerializer(FramedSerializer), class Aggregator(object), class Merger(object), class InMemoryMerger(Merger), class ExternalMerger(Merger). For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16975/consoleFull
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49805468 QA tests have started for PR 1460. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16991/consoleFull
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49808272 QA tests have started for PR 1460. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16993/consoleFull
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49814922 QA results for PR 1460: - This patch PASSES unit tests. - This patch merges cleanly. - This patch adds the following public classes (experimental): class AutoSerializer(FramedSerializer), class Aggregator(object), class Merger(object), class InMemoryMerger(Merger), class ExternalMerger(Merger). For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16991/consoleFull
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15263569 --- Diff: python/pyspark/shuffle.py --- @@ -0,0 +1,378 @@ +# --- End diff -- Since this is a new internal file, also add it to the exclude section of python/epydoc.conf so it does not show up in the docs.
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15263605 --- Diff: python/pyspark/shuffle.py --- @@ -0,0 +1,258 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import sys +import platform +import shutil +import warnings + +from pyspark.serializers import BatchedSerializer, AutoSerializer + +try: +import psutil + +def get_used_memory(): +self = psutil.Process(os.getpid()) +return self.memory_info().rss >> 20 + +except ImportError: + +def get_used_memory(): +if platform.system() == 'Linux': +for line in open('/proc/self/status'): +if line.startswith('VmRSS:'): +return int(line.split()[1]) >> 10 +else: +warnings.warn("please install psutil to get accurate memory usage") +if platform.system() == 'Darwin': +import resource +return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20 +# TODO: support windows --- End diff -- Alright, then you should open a JIRA saying that this is not supported on Windows, and maybe print a warning in spill() if the OS is Windows.
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15263661 --- Diff: python/pyspark/shuffle.py --- @@ -0,0 +1,378 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import sys +import platform +import shutil +import warnings + +from pyspark.serializers import BatchedSerializer, PickleSerializer + +try: +import psutil + +def get_used_memory(): +self = psutil.Process(os.getpid()) +return self.memory_info().rss >> 20 + +except ImportError: + +def get_used_memory(): +if platform.system() == 'Linux': +for line in open('/proc/self/status'): +if line.startswith('VmRSS:'): +return int(line.split()[1]) >> 10 +else: +warnings.warn("please install psutil to get accurate memory usage") +if platform.system() == 'Darwin': +import resource +return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20 +# TODO: support windows +return 0 + + +class Aggregator(object): + +def __init__(self, creator, combiner, mergeCombiner=None): +self.creator = creator +self.combiner = combiner +self.mergeCombiner = mergeCombiner or combiner --- End diff -- Call the fields of this createCombiner, mergeValue and mergeCombiners, same as in Scala. Also explain what it means for mergeCombiners to be None.
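A sketch of what the suggested renaming could look like (hypothetical code, not the patch that was merged): the field names follow the Scala Aggregator, and a comment spells out what a missing mergeCombiners means.

```python
class Aggregator(object):
    """Holds the three functions used to merge values into combiners."""

    def __init__(self, createCombiner, mergeValue, mergeCombiners=None):
        self.createCombiner = createCombiner  # value -> combiner
        self.mergeValue = mergeValue          # (combiner, value) -> combiner
        # If mergeCombiners is None, assume combiners can be merged the same
        # way a value is merged into a combiner (i.e. values and combiners
        # have the same type, as in a plain sum or max).
        self.mergeCombiners = mergeCombiners or mergeValue


# Example: sum aggregation, where values and combiners are both ints,
# so mergeCombiners can safely default to mergeValue.
sum_agg = Aggregator(lambda v: v, lambda c, v: c + v)
```

Defaulting `mergeCombiners` to `mergeValue` is only sound when the combiner type equals the value type, which is the caveat the review asks to document.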
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15264180 --- Diff: python/pyspark/shuffle.py --- @@ -0,0 +1,378 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import sys +import platform +import shutil +import warnings + +from pyspark.serializers import BatchedSerializer, PickleSerializer + +try: +import psutil + +def get_used_memory(): +self = psutil.Process(os.getpid()) +return self.memory_info().rss >> 20 + +except ImportError: + +def get_used_memory(): +if platform.system() == 'Linux': +for line in open('/proc/self/status'): +if line.startswith('VmRSS:'): +return int(line.split()[1]) >> 10 +else: +warnings.warn("please install psutil to get accurate memory usage") +if platform.system() == 'Darwin': +import resource +return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20 +# TODO: support windows +return 0 + + +class Aggregator(object): + +def __init__(self, creator, combiner, mergeCombiner=None): +self.creator = creator +self.combiner = combiner +self.mergeCombiner = mergeCombiner or combiner + + +class Merger(object): + +"""merge shuffled data together by aggregator""" + +def __init__(self, aggregator): +self.agg = aggregator + +def combine(self, iterator):
+""" combine the items by creator and combiner""" +raise NotImplementedError + +def merge(self, iterator): +""" merge the combined items by mergeCombiner""" +raise NotImplementedError + +def iteritems(self): +""" return the merged items as an iterator""" +raise NotImplementedError + + +class InMemoryMerger(Merger): + +"""In memory merger based on an in-memory dict.""" + +def __init__(self, aggregator): +Merger.__init__(self, aggregator) +self.data = {} + +def combine(self, iterator): +""" combine the items by creator and combiner""" +# speed up attribute lookup +d, creator, comb = self.data, self.agg.creator, self.agg.combiner +for k, v in iterator: +d[k] = comb(d[k], v) if k in d else creator(v) + +def merge(self, iterator): +""" merge the combined items by mergeCombiner""" +# speed up attribute lookup +d, comb = self.data, self.agg.mergeCombiner +for k, v in iterator: +d[k] = comb(d[k], v) if k in d else v + +def iteritems(self): +""" return the merged items as an iterator""" +return self.data.iteritems() + + +class ExternalMerger(Merger): + +"""External merger dumps the aggregated data to disk when memory usage goes above the limit, then merges the spilled files together. + +This class works as follows: + +- It repeatedly combines the items and saves them in one dict in + memory. + +- When the used memory goes above the memory limit, it splits + the combined data into partitions by hash code and dumps them + to disk, one file per partition. + +- Then it goes through the rest of the iterator, combining items + into different dicts by hash. Whenever the used memory goes over + the memory limit, it dumps all the dicts to disk, one file per + dict, repeating until all the items are combined. --- End diff -- Just to be clear, this recursing happens at most one time, right?
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15264236 --- Diff: python/pyspark/shuffle.py ---
+    - Before returning any items, it loads each partition and
+      combines them separately, yielding them before loading the
+      next partition.
+
+    - While loading a partition, if memory again goes over the
+      limit, it partitions the loaded data and dumps it to disk.
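The partition-at-a-time read-back described in the last two bullets might look roughly like this. It is a hedged sketch: the `'<spill>-<partition>'` file naming and the `merge_combiners` parameter are assumptions for illustration, not the PR's actual layout.

```python
import glob
import os
import pickle


def iter_merged(tmpdir, partitions, merge_combiners):
    """Yield (key, combiner) pairs, loading one hash partition at a time.

    Assumes spill files are named '<spill>-<partition>', each holding a
    pickled dict -- an illustrative layout, not the PR's real one.
    """
    for p in range(partitions):
        merged = {}
        # Every spill pass wrote one file for this partition index.
        for path in glob.glob(os.path.join(tmpdir, "*-%d" % p)):
            with open(path, "rb") as f:
                bucket = pickle.load(f)
            for k, v in bucket.items():
                merged[k] = merge_combiners(merged[k], v) if k in merged else v
        # Only one partition's worth of data is resident at a time;
        # yield it before touching the next partition's files.
        for item in merged.items():
            yield item
```

The point of hashing at spill time is visible here: because a key only ever lands in one partition index, merging a single partition's files is enough to see every combiner for the keys it contains.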
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49816795 QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
class AutoSerializer(FramedSerializer):
class Aggregator(object):
class Merger(object):
class InMemoryMerger(Merger):
class ExternalMerger(Merger):

For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16993/consoleFull
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15264473 --- Diff: python/pyspark/shuffle.py ---
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15264487 --- Diff: python/pyspark/tests.py --- @@ -47,6 +48,64 @@ SPARK_HOME = os.environ["SPARK_HOME"]
+class TestMerger(unittest.TestCase):
+
+    def setUp(self):
+        self.N = 1 << 16
+        self.l = [i for i in xrange(self.N)]
+        self.data = zip(self.l, self.l)
+        self.agg = Aggregator(lambda x: [x],
+                              lambda x, y: x.append(y) or x,
+                              lambda x, y: x.extend(y) or x)
+        ExternalMerger.PARTITIONS = 8
+        ExternalMerger.BATCH = 1 << 14
--- End diff -- These are never cleared, so they might affect future tests. It's better to make them arguments to the constructor of ExternalMerger.
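The fix mateiz suggests, taking the tunables as constructor arguments instead of mutating shared class attributes, could look like this. It is a sketch only; the parameter names and default values here are illustrative, not the PR's final signature.

```python
class ExternalMergerSketch:
    """Sketch of an ExternalMerger whose tunables are per-instance
    constructor arguments rather than class attributes that tests
    mutate globally (names and defaults are illustrative)."""

    def __init__(self, aggregator, partitions=64, batch=10000):
        self.agg = aggregator
        # Per-instance settings: a test that passes small values here
        # cannot leak state into tests that run afterwards.
        self.partitions = partitions
        self.batch = batch
```

With class attributes, `ExternalMerger.PARTITIONS = 8` in one test's `setUp` silently changes behavior for every later test in the process; per-instance arguments make the override local and self-cleaning.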
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15263760 --- Diff: python/pyspark/shuffle.py ---
+    def combine(self, iterator):
--- End diff -- Call this combineValues
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15263824 --- Diff: python/pyspark/shuffle.py ---
+    def merge(self, iterator):
--- End diff -- Call this combineCombiners
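Applying the two renames suggested in review (`combineValues` and `combineCombiners`) to the in-memory merger would give something like the sketch below. The `Aggregator` shape is copied from the quoted diff; the rest is illustrative, not the merged code.

```python
class Aggregator(object):
    # Same shape as the Aggregator in the quoted diff.
    def __init__(self, creator, combiner, mergeCombiner=None):
        self.creator = creator
        self.combiner = combiner
        self.mergeCombiner = mergeCombiner or combiner


class InMemoryMerger(object):
    """In-memory merger using the method names suggested in review."""

    def __init__(self, aggregator):
        self.agg = aggregator
        self.data = {}

    def combineValues(self, iterator):
        # Previously `combine`: fold raw values into combiners.
        d, creator, comb = self.data, self.agg.creator, self.agg.combiner
        for k, v in iterator:
            d[k] = comb(d[k], v) if k in d else creator(v)

    def combineCombiners(self, iterator):
        # Previously `merge`: merge already-built combiners.
        d, comb = self.data, self.agg.mergeCombiner
        for k, v in iterator:
            d[k] = comb(d[k], v) if k in d else v

    def items(self):
        return iter(self.data.items())
```

The renamed methods make the asymmetry explicit: `combineValues` consumes raw (key, value) pairs, while `combineCombiners` consumes (key, combiner) pairs that some other merger already built, so they must not be mixed up.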
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15264608 --- Diff: python/pyspark/shuffle.py ---
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15264616 --- Diff: python/pyspark/shuffle.py ---
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15264650 --- Diff: python/pyspark/shuffle.py ---
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15266298 --- Diff: python/pyspark/shuffle.py ---
+        else:
+            warnings.warn("please install psutil to get accurate memory usage")
+            ...
+        # TODO: support windows
--- End diff -- We already have a warning telling people that they should install psutil when they hit spilling.
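For reference, the shifts in `get_used_memory` that the archive mangled into `rss 20` and `split()[1] 10` are bit-shift unit conversions (plain arithmetic, nothing PR-specific):

```python
def bytes_to_mb(n_bytes):
    # rss >> 20 divides by 2**20: bytes -> MB, rounding down
    return n_bytes >> 20


def kb_to_mb(n_kb):
    # /proc/self/status reports VmRSS in kB; >> 10 gives MB
    return n_kb >> 10
```

Both code paths of the function therefore report the same unit (MB), whether the value came from psutil in bytes or from `/proc/self/status` in kB.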
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15266361 --- Diff: python/pyspark/shuffle.py ---
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15266513

--- Diff: python/pyspark/shuffle.py ---
(quotes the same shuffle.py diff as the previous message)
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15266539

--- Diff: python/pyspark/tests.py ---
@@ -47,6 +48,64 @@
 SPARK_HOME = os.environ["SPARK_HOME"]


+class TestMerger(unittest.TestCase):
+
+    def setUp(self):
+        self.N = 1 << 16
+        self.l = [i for i in xrange(self.N)]
+        self.data = zip(self.l, self.l)
+        self.agg = Aggregator(lambda x: [x],
+                              lambda x, y: x.append(y) or x,
+                              lambda x, y: x.extend(y) or x)
+        ExternalMerger.PARTITIONS = 8
+        ExternalMerger.BATCH = 1 << 14

--- End diff --

Good point!

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
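The Aggregator in the test above groups values into lists. The same three functions can be exercised without Spark; `combine_by_key` below is a hypothetical helper, not part of the PR:

```python
# createCombiner starts a list from the first value, mergeValue appends
# within one partition, and mergeCombiners concatenates partial lists from
# different partitions. (list.append/extend return None, so the `or x`
# trick returns the mutated list.)
create_combiner = lambda x: [x]
merge_value = lambda x, y: x.append(y) or x
merge_combiners = lambda x, y: x.extend(y) or x


def combine_by_key(pairs):
    """Combine (k, v) pairs within a single 'partition'."""
    d = {}
    for k, v in pairs:
        d[k] = merge_value(d[k], v) if k in d else create_combiner(v)
    return d


# combine two partitions separately, then merge their partial results
p1 = combine_by_key([("a", 1), ("b", 2), ("a", 3)])
p2 = combine_by_key([("a", 4), ("b", 5)])
for k, v in p2.items():
    p1[k] = merge_combiners(p1[k], v) if k in p1 else v
# p1 == {"a": [1, 3, 4], "b": [2, 5]}
```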
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15266679

--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,258 @@
+# (Apache License 2.0 header, as in the previous quotes)
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, AutoSerializer
+
+try:
+    import psutil
+
+    def get_used_memory():
+        self = psutil.Process(os.getpid())
+        return self.memory_info().rss >> 20
+
+except ImportError:
+
+    def get_used_memory():
+        if platform.system() == 'Linux':
+            for line in open('/proc/self/status'):
+                if line.startswith('VmRSS:'):
+                    return int(line.split()[1]) >> 10
+        else:
+            warnings.warn("please install psutil to get accurate memory usage")
+            if platform.system() == 'Darwin':
+                import resource
+                return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20
+            # TODO: support windows
+        return 0
+
+
+class Merger(object):
+
+    """ merge shuffled data together by combinator """
+
+    def merge(self, iterator):
+        raise NotImplementedError
+
+    def iteritems(self):
+        raise NotImplementedError
+
+
+class MapMerger(Merger):
+
+    """ In memory merger based on map """
+
+    def __init__(self, combiner):
+        self.combiner = combiner
+        self.data = {}
+
+    def merge(self, iterator):
+        d, comb = self.data, self.combiner
+        for k, v in iter(iterator):
+            d[k] = comb(d[k], v) if k in d else v
+
+    def iteritems(self):
+        return self.data.iteritems()
+
+
+class ExternalHashMapMerger(Merger):
+
+    """
+    External merger will dump the aggregated data into disks when memory usage
+    is above the limit, then merge them together.
+
+    >>> combiner = lambda x, y: x + y
+    >>> merger = ExternalHashMapMerger(combiner, 10)
+    >>> N = 10000
+    >>> merger.merge(zip(xrange(N), xrange(N)) * 10)
+    >>> assert merger.spills > 0
+    >>> sum(v for k, v in merger.iteritems())
+    499950000
+    """
+
+    PARTITIONS = 64
+    BATCH = 10000
+
+    def __init__(self, combiner, memory_limit=512, serializer=None,
+                 localdirs=None, scale=1):
+        self.combiner = combiner
+        self.memory_limit = memory_limit
+        self.serializer = serializer or \
+            BatchedSerializer(AutoSerializer(), 1024)
+        self.localdirs = localdirs or self._get_dirs()
+        self.scale = scale
+        self.data = {}
+        self.pdata = []
+        self.spills = 0

+    def _get_dirs(self):
+        path = os.environ.get("SPARK_LOCAL_DIR", "/tmp/spark")
+        dirs = path.split(",")
+        localdirs = []
+        for d in dirs:
+            d = os.path.join(d, "merge", str(os.getpid()))
+            try:
+                os.makedirs(d)
+                localdirs.append(d)
+            except IOError:
+                pass
+        if not localdirs:
+            raise IOError("no writable directories: " + path)
+        return localdirs
+
+    def _get_spill_dir(self, n):
+        return os.path.join(self.localdirs[n % len(self.localdirs)], str(n))
+
+    @property
+    def used_memory(self):
+        return get_used_memory()
+
+    @property
+    def next_limit(self):
+        return max(self.memory_limit, self.used_memory * 1.05)
+
+    def merge(self, iterator, check=True):
+        """ merge (K, V) pair by combiner """
+        iterator = iter(iterator)
+        # speedup attribute lookup
+        d, comb, batch = self.data, self.combiner, self.BATCH
+        c = 0
+        for k, v in iterator:
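Two details of the quoted class are worth illustrating: hash partitioning is deterministic within a process, so every occurrence of a key lands in the same partition across all spills (which is why partitions can later be merged one at a time), and spill number `n` is rotated across the configured local dirs. A small sketch mirroring `PARTITIONS` and `_get_spill_dir`, with hypothetical directory names:

```python
import os

PARTITIONS = 64  # mirrors ExternalHashMapMerger.PARTITIONS


def partition(key):
    # same key -> same partition, on every spill in this process
    return hash(key) % PARTITIONS


def get_spill_dir(localdirs, n):
    # spill n goes under localdirs[n % len(localdirs)]/n,
    # round-robining disk I/O across the configured dirs
    return os.path.join(localdirs[n % len(localdirs)], str(n))


# hypothetical local dirs, just for illustration
dirs = ["/tmp/spark/merge/a", "/tmp/spark/merge/b"]
```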
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15267205

--- Diff: python/pyspark/shuffle.py ---
(quotes the same shuffle.py diff as the first message, at `def merge(self, iterator):`)

--- End diff --

Is mergeCombiners better?
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15267225

--- Diff: python/pyspark/shuffle.py ---
(quotes the same shuffle.py diff as the first message, at `def combine(self, iterator):`)

--- End diff --

Is mergeValues better? Because this class is called Merger, merger.mergeValues() would be more readable.
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15268717

--- Diff: python/pyspark/shuffle.py ---
(quotes the same shuffle.py diff as the first message)
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15268733

--- Diff: python/pyspark/shuffle.py ---
(quotes the same shuffle.py diff as the first message)
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15268727

--- Diff: python/pyspark/shuffle.py ---
(quotes the same shuffle.py diff as the first message)
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15268748

--- Diff: python/pyspark/shuffle.py ---
(quotes the same shuffle.py diff as the first message)
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15268757

--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,416 @@
+# (Apache License 2.0 header, as in the previous quotes)
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+    import psutil
+
+    def get_used_memory():
+        """ return the used memory in MB """
+        self = psutil.Process(os.getpid())
+        return self.memory_info().rss >> 20
+
+except ImportError:
+
+    def get_used_memory():
+        """ return the used memory in MB """
+        if platform.system() == 'Linux':
+            for line in open('/proc/self/status'):
+                if line.startswith('VmRSS:'):
+                    return int(line.split()[1]) >> 10
+        else:
+            warnings.warn("please install psutil to have better "
+                          "support with spilling")
+            if platform.system() == 'Darwin':
+                import resource
+                rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
+                return rss >> 20
+            # TODO: support windows
+        return 0
+
+
+class Aggregator(object):
+
+    """
+    Aggregator has three functions to merge values into a combiner:
+
+    createCombiner:  (value) -> combiner
+    mergeValue:      (combiner, value) -> combiner
+    mergeCombiners:  (combiner, combiner) -> combiner
+    """
+
+    def __init__(self, createCombiner, mergeValue, mergeCombiners):
+        self.createCombiner = createCombiner
+        self.mergeValue = mergeValue
+        self.mergeCombiners = mergeCombiners
+
+
+class SimpleAggregator(Aggregator):
+
+    """
+    SimpleAggregator is useful for the cases that combiners have
+    the same type as the values
+    """
+
+    def __init__(self, combiner):
+        Aggregator.__init__(self, lambda x: x, combiner, combiner)
+
+
+class Merger(object):
+
+    """
+    merge shuffled data together by aggregator
+    """
+
+    def __init__(self, aggregator):
+        self.agg = aggregator
+
+    def mergeValues(self, iterator):
+        """ combine the items by creator and combiner """
+        raise NotImplementedError
+
+    def mergeCombiners(self, iterator):
+        """ merge the combined items by mergeCombiner """
+        raise NotImplementedError
+
+    def iteritems(self):
+        """ return the merged items as iterator """
+        raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+    """
+    In memory merger based on in-memory dict.
+    """
+
+    def __init__(self, aggregator):
+        Merger.__init__(self, aggregator)
+        self.data = {}
+
+    def mergeValues(self, iterator):
+        """ combine the items by creator and combiner """
+        # speed up attributes lookup
+        d, creator = self.data, self.agg.createCombiner
+        comb = self.agg.mergeValue
+        for k, v in iterator:
+            d[k] = comb(d[k], v) if k in d else creator(v)
+
+    def mergeCombiners(self, iterator):
+        """ merge the combined items by mergeCombiner """
+        # speed up attributes lookup
+        d, comb = self.data, self.agg.mergeCombiners
+        for k, v in iterator:
+            d[k] = comb(d[k], v) if k in d else v
+
+    def iteritems(self):
+        """ return the merged items as iterator """
+        return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+    """
+    External merger will dump the aggregated data into disks when
+    memory usage goes above the limit, then merge them together.
+
+    This class works as follows:
+
+    - It repeatedly combines the items and saves them in one dict in
+      memory.
+
+    - When
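The SimpleAggregator shortcut above is easy to demonstrate: when the combiner and the values share a type (e.g. summing counts), the identity function serves as createCombiner and one binary function serves as both mergeValue and mergeCombiners. A self-contained sketch mirroring the two quoted classes:

```python
# Mirrors the Aggregator / SimpleAggregator pair quoted above.
class Aggregator(object):
    def __init__(self, createCombiner, mergeValue, mergeCombiners):
        self.createCombiner = createCombiner
        self.mergeValue = mergeValue
        self.mergeCombiners = mergeCombiners


class SimpleAggregator(Aggregator):
    # combiner and value have the same type, so one function does it all
    def __init__(self, combiner):
        Aggregator.__init__(self, lambda x: x, combiner, combiner)


agg = SimpleAggregator(lambda x, y: x + y)  # word-count style sum

d = {}
for k, v in [("a", 1), ("b", 2), ("a", 3)]:
    d[k] = agg.mergeValue(d[k], v) if k in d else agg.createCombiner(v)
# d == {"a": 4, "b": 2}
```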
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15271189 --- Diff: python/pyspark/shuffle.py --- (quotes the same shuffle.py hunk as the previous comment)
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15271358

--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@
+# (Apache License 2.0 header, identical to the one quoted above)
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+    import psutil
+
+    def get_used_memory():
+        self = psutil.Process(os.getpid())
+        return self.memory_info().rss >> 20
+
+except ImportError:
+
+    def get_used_memory():
+        if platform.system() == 'Linux':
+            for line in open('/proc/self/status'):
+                if line.startswith('VmRSS:'):
+                    return int(line.split()[1]) >> 10
+        else:
+            warnings.warn("please install psutil to get accurate memory usage")
+            if platform.system() == "Darwin":
+                import resource
+                return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20
+        # TODO: support windows
+        return 0
+
+
+class Aggregator(object):
+
+    def __init__(self, creator, combiner, mergeCombiner=None):
+        self.creator = creator
+        self.combiner = combiner
+        self.mergeCombiner = mergeCombiner or combiner
+
+
+class Merger(object):
+
+    """
+    merge shuffled data together by aggregator
+    """
+
+    def __init__(self, aggregator):
+        self.agg = aggregator
+
+    def combine(self, iterator):
+        """ combine the items by creator and combiner """
+        raise NotImplementedError
+
+    def merge(self, iterator):
+        """ merge the combined items by mergeCombiner """
+        raise NotImplementedError
+
+    def iteritems(self):
+        """ return the merged items as an iterator """
+        raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+    """
+    In-memory merger based on an in-memory dict.
+    """
+
+    def __init__(self, aggregator):
+        Merger.__init__(self, aggregator)
+        self.data = {}
+
+    def combine(self, iterator):
+        """ combine the items by creator and combiner """
+        # speed up attribute lookup
+        d, creator, comb = self.data, self.agg.creator, self.agg.combiner
+        for k, v in iterator:
+            d[k] = comb(d[k], v) if k in d else creator(v)
+
+    def merge(self, iterator):
+        """ merge the combined items by mergeCombiner """
+        # speed up attribute lookup
+        d, comb = self.data, self.agg.mergeCombiner
+        for k, v in iterator:
+            d[k] = comb(d[k], v) if k in d else v
+
+    def iteritems(self):
+        """ return the merged items as an iterator """
+        return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+    """
+    External merger will dump the aggregated data into disk when
+    memory usage goes above the limit, then merge them together.
+
+    This class works as follows:
+
+    - It repeatedly combines the items and saves them in one dict in
+      memory.
+
+    - When the used memory goes above the memory limit, it splits
+      the combined data into partitions by hash code and dumps them
+      into disk, one file per partition.
+
+    - Then it goes through the rest of the iterator, combining items
+      into different dicts by hash. Whenever the used memory goes over
+      the memory limit, it dumps all the dicts into disk, one file per
+      dict. This repeats until all the items are combined.
+
+    - Before returning any items, it loads each partition and
+      combines them separately, yielding them before loading the next
+      partition.
+
+    - While loading a partition, if the memory goes over the limit,
+      it will partition the loaded data and dump them into disk
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15271374 --- Diff: python/pyspark/shuffle.py --- (quotes the same shuffle.py hunk as the previous comment)
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15271462 --- Diff: python/pyspark/shuffle.py --- (quotes the same shuffle.py hunk as the previous comment)
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49647944

QA tests have started for PR 1460. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16918/consoleFull
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15188798

--- Diff: python/pyspark/rdd.py ---
@@ -168,6 +169,18 @@ def _replaceRoot(self, value):
         self._sink(1)
+def _parse_memory(s):
--- End diff --

Add a comment to this saying it returns a number in MB
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49658257

QA tests have started for PR 1460. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16919/consoleFull
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49660273

QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
  class AutoSerializer(FramedSerializer):
  class Merger(object):
  class MapMerger(Merger):
  class ExternalHashMapMerger(Merger):

For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16918/consoleFull
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49670702

QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
  class AutoSerializer(FramedSerializer):
  class Merger(object):
  class MapMerger(Merger):
  class ExternalHashMapMerger(Merger):

For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16919/consoleFull
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49671323

QA tests have started for PR 1460. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16929/consoleFull
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49680209

QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
  class AutoSerializer(FramedSerializer):
  class Merger(object):
  class MapMerger(Merger):
  class ExternalHashMapMerger(Merger):

For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16929/consoleFull
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15207593

--- Diff: docs/configuration.md ---
@@ -195,6 +195,15 @@ Apart from these, the following properties are also available, and may be useful
     Spark's dependencies and user dependencies. It is currently an experimental feature.
   </td>
 </tr>
+<tr>
+  <td><code>spark.python.worker.memory</code></td>
+  <td>512m</td>
+  <td>
+    Amount of memory to use per python worker process during aggregation, in the same
+    format as JVM memory strings (e.g. <code>512m</code>, <code>2g</code>). If the memory
+    used during aggregation go above this amount, it will spill the data into disks.
--- End diff --

Small typo: go -> goes
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15207614

--- Diff: python/pyspark/rdd.py ---
@@ -1247,15 +1262,16 @@ def combineLocally(iterator):
             return combiners.iteritems()
         locally_combined = self.mapPartitions(combineLocally)
         shuffled = locally_combined.partitionBy(numPartitions)
-
+
+        serializer = self.ctx.serializer
+        spill = ((self.ctx._conf.get("spark.shuffle.spill") or 'True').lower()
+                 in ('true', '1', 'yes'))
--- End diff --

Probably just want to allow "true", to be similar to the way it works in Java and Scala. So just check `self.ctx._conf.get("spark.shuffle.spill", "true") == "true"`
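The suggested check can be exercised with a plain dict standing in for `self.ctx._conf` (the `conf` values are hypothetical; only the lookup idiom comes from the review comment):

```python
def spill_enabled(conf):
    # match the Java/Scala behavior: only the string "true" enables spilling
    return conf.get("spark.shuffle.spill", "true") == "true"

assert spill_enabled({}) is True                                 # default
assert spill_enabled({"spark.shuffle.spill": "true"}) is True
assert spill_enabled({"spark.shuffle.spill": "false"}) is False
assert spill_enabled({"spark.shuffle.spill": "True"}) is False   # unlike the '1'/'yes' variant
```

Note how this differs from the original `.lower() in ('true', '1', 'yes')` check: values such as `"True"` or `"1"` no longer count as enabled, which is the consistency with the JVM side that the review asks for.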
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15207622

--- Diff: python/pyspark/rdd.py ---
@@ -168,6 +169,20 @@ def _replaceRoot(self, value):
         self._sink(1)
+def _parse_memory(s):
+    """
+    It returns a number in MB
--- End diff --

Say something slightly longer, e.g. "Parse a memory string in the format supported by Java (e.g. 1g, 200m) and return the value in MB"
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15207630

--- Diff: python/pyspark/serializers.py ---
@@ -297,6 +297,33 @@ class MarshalSerializer(FramedSerializer):
     loads = marshal.loads
+class AutoSerializer(FramedSerializer):
--- End diff --

Can we actually use this by default yet or will it fail for NumPy arrays? If it won't work by default, we should use PickleSerializer instead and wait to fix this.
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15207645

--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,258 @@
+# (Apache License 2.0 header, identical to the one quoted above)
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, AutoSerializer
+
+try:
+    import psutil
+
+    def get_used_memory():
+        self = psutil.Process(os.getpid())
+        return self.memory_info().rss >> 20
+
+except ImportError:
+
+    def get_used_memory():
+        if platform.system() == 'Linux':
+            for line in open('/proc/self/status'):
+                if line.startswith('VmRSS:'):
+                    return int(line.split()[1]) >> 10
+        else:
+            warnings.warn("please install psutil to get accurate memory usage")
+            if platform.system() == "Darwin":
+                import resource
+                return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20
+        # TODO: support windows
--- End diff --

Have you looked into what it would take to support Windows? I guess right now it will keep returning 0 there and never spill?
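The fallback chain under discussion (psutil, then `/proc/self/status` on Linux, then `getrusage` on Mac OS, returning 0 on unsupported platforms such as Windows) can be collected into one runnable sketch; the unit shifts assume psutil reports bytes, `/proc` reports kB, and `ru_maxrss` is in bytes on OS X:

```python
import os
import platform
import warnings

def get_used_memory():
    """Best-effort resident set size of this process, in MB."""
    try:
        import psutil
        process = psutil.Process(os.getpid())
        return process.memory_info().rss >> 20   # psutil reports bytes
    except ImportError:
        pass
    if platform.system() == 'Linux':
        # /proc/self/status reports VmRSS in kB
        with open('/proc/self/status') as f:
            for line in f:
                if line.startswith('VmRSS:'):
                    return int(line.split()[1]) >> 10
    elif platform.system() == 'Darwin':
        import resource
        # ru_maxrss is in bytes on Mac OS (kB on Linux)
        return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20
    warnings.warn("please install psutil to get accurate memory usage")
    return 0   # TODO: support Windows
```

As the comment notes, on platforms that reach the final `return 0` the reported usage never grows, so the spilling threshold is never hit.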
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15207668

--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,258 @@ (same file header and get_used_memory fallback as quoted above, then:)
+class Merger(object):
+
+    """
+    merge shuffled data together by combinator
+    """
+
+    def merge(self, iterator):
+        raise NotImplementedError
+
+    def iteritems(self):
+        raise NotImplementedError
+
+
+class MapMerger(Merger):
+
+    """
+    In memory merger based on map
+    """
+
+    def __init__(self, combiner):
+        self.combiner = combiner
+        self.data = {}
+
+    def merge(self, iterator):
+        d, comb = self.data, self.combiner
+        for k, v in iter(iterator):
+            d[k] = comb(d[k], v) if k in d else v
+
+    def iteritems(self):
+        return self.data.iteritems()
+
+
+class ExternalHashMapMerger(Merger):
--- End diff --

Better to call this ExternalMerger
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15207662 --- Diff: python/pyspark/shuffle.py --- @@ -0,0 +1,258 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the License); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an AS IS BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import sys +import platform +import shutil +import warnings + +from pyspark.serializers import BatchedSerializer, AutoSerializer + +try: +import psutil + +def get_used_memory(): +self = psutil.Process(os.getpid()) +return self.memory_info().rss 20 + +except ImportError: + +def get_used_memory(): +if platform.system() == 'Linux': +for line in open('/proc/self/status'): +if line.startswith('VmRSS:'): +return int(line.split()[1]) 10 +else: +warnings.warn(please install psutil to get accurate memory usage) +if platform.system() == Darwin: +import resource +return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss 20 +# TODO: support windows +return 0 + + +class Merger(object): + +merge shuffled data together by combinator + + +def merge(self, iterator): +raise NotImplementedError + +def iteritems(self): +raise NotImplementedError + + +class MapMerger(Merger): --- End diff -- Better to call this InMemoryMerger --- If your project is set up for it, you can reply to this email and have your reply appear on 
GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
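The cross-platform memory check reviewed above (psutil when available, with a `/proc` fallback on Linux and `getrusage` on macOS) can be written as one self-contained function. The sketch below is a Python 3 rendering of the same idea, not the PR's exact code; the function name and the megabyte convention follow the diff.

```python
import os
import platform


def get_used_memory():
    """Return this process's resident set size (RSS) in MB, best-effort."""
    try:
        import psutil  # preferred: accurate and cross-platform
        return psutil.Process(os.getpid()).memory_info().rss >> 20
    except ImportError:
        pass
    if platform.system() == 'Linux':
        # /proc/self/status reports VmRSS in kB; shift by 10 to get MB
        with open('/proc/self/status') as f:
            for line in f:
                if line.startswith('VmRSS:'):
                    return int(line.split()[1]) >> 10
    elif platform.system() == 'Darwin':
        import resource
        # on macOS, ru_maxrss is in bytes; shift by 20 to get MB
        return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20
    return 0  # unknown platform (e.g. Windows): report nothing
```

Note the unit mismatch that the shifts paper over: `rss` and `ru_maxrss` are in bytes while `VmRSS` is in kilobytes, hence the differing shift amounts.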
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15207683

--- Diff: python/pyspark/tests.py ---
@@ -47,6 +48,40 @@
 SPARK_HOME = os.environ["SPARK_HOME"]
 
 
+class TestMerger(unittest.TestCase):
+
+    def setUp(self):
+        self.N = 1 << 18
+        self.l = [i for i in xrange(self.N)]
+        self.data = zip(self.l, self.l)
+        ExternalHashMapMerger.PARTITIONS = 8
+        ExternalHashMapMerger.BATCH = 1 << 14
+
+    def test_in_memory(self):
+        m = MapMerger(lambda x,y: x+y)
+        m.merge(self.data)
+        self.assertEqual(sum(v for k,v in m.iteritems()), sum(xrange(self.N)))
--- End diff --

Put spaces around the operators (e.g. `,`, `+`, etc)
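The merge semantics this test exercises (fold each value into a per-key accumulator, applying the combiner only on duplicate keys) reduce to a few lines. The standalone sketch below uses a hypothetical `merge` helper in Python 3, mirroring what MapMerger does in the diff:

```python
def merge(pairs, combiner):
    """Combine an iterable of (key, value) pairs into a dict,
    folding duplicate keys together with `combiner`."""
    d = {}
    for k, v in pairs:
        # first occurrence stores the value; later ones are combined in
        d[k] = combiner(d[k], v) if k in d else v
    return d


# Each key appearing once: values pass through unchanged.
once = merge(zip(range(100), range(100)), lambda x, y: x + y)

# Each key appearing three times: values are summed three times over.
thrice = merge(list(zip(range(10), range(10))) * 3, lambda x, y: x + y)
```

This is why the test can assert that merging `zip(self.l, self.l)` yields `sum(xrange(self.N))`: with no duplicate keys, the combiner is never invoked.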
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15207731

--- Diff: python/pyspark/rdd.py ---
@@ -1247,15 +1262,16 @@ def combineLocally(iterator):
             return combiners.iteritems()
         locally_combined = self.mapPartitions(combineLocally)
         shuffled = locally_combined.partitionBy(numPartitions)
-
+
+        serializer = self.ctx.serializer
+        spill = ((self.ctx._conf.get("spark.shuffle.spill") or 'True').lower()
+                 in ('true', '1', 'yes'))
+        memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory") or "512m")
         def _mergeCombiners(iterator):
-            combiners = {}
-            for (k, v) in iterator:
-                if k not in combiners:
-                    combiners[k] = v
-                else:
-                    combiners[k] = mergeCombiners(combiners[k], v)
-            return combiners.iteritems()
+            merger = ExternalHashMapMerger(mergeCombiners, memory, serializer)\
+                if spill else MapMerger(mergeCombiners)
--- End diff --

Add a space before the `\` to match our code style in other places
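The `_parse_memory` call in the hunk above turns a size string like `512m` into a number of megabytes. A plausible implementation is sketched below; the helper name and supported suffixes are assumptions for illustration, not necessarily the PR's actual code:

```python
def parse_memory(s):
    """Parse a JVM-style memory string such as '512m' or '2g'
    into an integer number of megabytes (hypothetical sketch)."""
    units = {'g': 1024, 'm': 1}  # multiplier to reach MB
    suffix = s[-1].lower()
    if suffix not in units:
        raise ValueError("invalid memory format: " + s)
    return int(float(s[:-1]) * units[suffix])
```

With this, the default `"512m"` parses to 512, matching the `memory_limit=512` default in ExternalHashMapMerger's constructor.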
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15207721

--- Diff: python/pyspark/rdd.py ---
@@ -1247,15 +1262,16 @@ def combineLocally(iterator):
             return combiners.iteritems()
         locally_combined = self.mapPartitions(combineLocally)
         shuffled = locally_combined.partitionBy(numPartitions)
-
+
+        serializer = self.ctx.serializer
+        spill = ((self.ctx._conf.get("spark.shuffle.spill") or 'True').lower()
+                 in ('true', '1', 'yes'))
+        memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory") or "512m")
         def _mergeCombiners(iterator):
-            combiners = {}
-            for (k, v) in iterator:
-                if k not in combiners:
-                    combiners[k] = v
-                else:
-                    combiners[k] = mergeCombiners(combiners[k], v)
-            return combiners.iteritems()
+            merger = ExternalHashMapMerger(mergeCombiners, memory, serializer)\
+                if spill else MapMerger(mergeCombiners)
+            merger.merge(iterator)
+            return merger.iteritems()
         return shuffled.mapPartitions(_mergeCombiners)
--- End diff --

This only implements external merging in the reduce tasks, but we need it in the map tasks too. For that you'll need to modify the Merger interface to take `createCombiner`, `mergeValue` and `mergeCombiners` together. Please add those there and add tests for the richer interface.
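The richer interface the review asks for bundles `createCombiner`, `mergeValue`, and `mergeCombiners` so that map tasks can fold raw values into combiners and reduce tasks can merge the combiners. The sketch below follows the names in the comment; the eventual shape in the PR may differ:

```python
class Aggregator(object):
    """Bundle of the three functions needed for map-side and
    reduce-side combining (sketch based on the review comment)."""

    def __init__(self, createCombiner, mergeValue, mergeCombiners):
        self.createCombiner = createCombiner
        self.mergeValue = mergeValue
        self.mergeCombiners = mergeCombiners


def combine_locally(pairs, agg):
    # Map side: fold raw values into per-key combiners.
    combiners = {}
    for k, v in pairs:
        if k in combiners:
            combiners[k] = agg.mergeValue(combiners[k], v)
        else:
            combiners[k] = agg.createCombiner(v)
    return combiners


def merge_combiners(dicts, agg):
    # Reduce side: merge per-partition combiners together.
    merged = {}
    for d in dicts:
        for k, c in d.items():
            merged[k] = agg.mergeCombiners(merged[k], c) if k in merged else c
    return merged
```

For a plain sum, all three functions degenerate to addition: `Aggregator(lambda v: v, lambda c, v: c + v, lambda c1, c2: c1 + c2)`. The distinction matters when the combiner type differs from the value type (e.g. building lists in groupByKey).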
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15207775

--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,258 @@
[... same header, get_used_memory, Merger, and MapMerger context as quoted above ...]
+class ExternalHashMapMerger(Merger):
+
+    """
+    External merger will dump the aggregated data into disks when memory usage
+    is above the limit, then merge them together.
+
+    >>> combiner = lambda x, y:x+y
+    >>> merger = ExternalHashMapMerger(combiner, 10)
+    >>> N = 1
+    >>> merger.merge(zip(xrange(N), xrange(N)) * 10)
+    >>> assert merger.spills > 0
+    >>> sum(v for k,v in merger.iteritems())
+    49995
+    """
+
+    PARTITIONS = 64
+    BATCH = 1
+
+    def __init__(self, combiner, memory_limit=512, serializer=None,
+                 localdirs=None, scale=1):
+        self.combiner = combiner
+        self.memory_limit = memory_limit
+        self.serializer = serializer or\
+            BatchedSerializer(AutoSerializer(), 1024)
+        self.localdirs = localdirs or self._get_dirs()
+        self.scale = scale
+        self.data = {}
+        self.pdata = []
+        self.spills = 0
+
+    def _get_dirs(self):
+        path = os.environ.get("SPARK_LOCAL_DIR", "/tmp/spark")
+        dirs = path.split(",")
+        localdirs = []
+        for d in dirs:
+            d = os.path.join(d, "merge", str(os.getpid()))
+            try:
+                os.makedirs(d)
+                localdirs.append(d)
+            except IOError:
+                pass
+        if not localdirs:
+            raise IOError("no writable directories: " + path)
+        return localdirs
+
+    def _get_spill_dir(self, n):
+        return os.path.join(self.localdirs[n % len(self.localdirs)], str(n))
+
+    @property
+    def used_memory(self):
+        return get_used_memory()
--- End diff --

Probably better to just call `get_used_memory()` all the time instead of making this a property, to indicate that it's expensive.
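The core spilling idea in the class under review — hash keys into a fixed number of partitions, dump each partition to its own file when memory runs out, then merge spill files partition by partition so only one partition's keys are ever in memory — can be sketched self-contained. File layout, names, and pickle serialization here are illustrative, not the PR's:

```python
import os
import pickle
import tempfile

PARTITIONS = 4


def spill(data, spill_dir, n):
    """Hash-partition an in-memory dict and pickle each partition
    to its own file under spill_dir/<n>/ for the n-th spill."""
    d = os.path.join(spill_dir, str(n))
    os.makedirs(d, exist_ok=True)
    buckets = [dict() for _ in range(PARTITIONS)]
    for k, v in data.items():
        buckets[hash(k) % PARTITIONS][k] = v
    for i, bucket in enumerate(buckets):
        with open(os.path.join(d, str(i)), 'wb') as f:
            pickle.dump(bucket, f)


def merge_spills(spill_dir, spills, combiner):
    """Stream spills back one partition at a time, so peak memory is
    bounded by the largest single partition, not the whole dataset."""
    for i in range(PARTITIONS):
        part = {}
        for n in range(spills):
            with open(os.path.join(spill_dir, str(n), str(i)), 'rb') as f:
                for k, v in pickle.load(f).items():
                    part[k] = combiner(part[k], v) if k in part else v
        for item in part.items():
            yield item
```

Because the same hash function assigns a key to the same partition in every spill, merging partition `i` across all spill files sees every occurrence of each key in that partition.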
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15207812

--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,258 @@
[... same ExternalHashMapMerger context as quoted above ...]
+    @property
+    def used_memory(self):
+        return get_used_memory()
+
+    @property
+    def next_limit(self):
+        return max(self.memory_limit, self.used_memory * 1.05)
--- End diff --

Same here, make this a method, it's confusing for it to be a property
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15207837

--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,258 @@
[... same ExternalHashMapMerger context as quoted above ...]
+    @property
+    def next_limit(self):
+        return max(self.memory_limit, self.used_memory * 1.05)
--- End diff --

Also I don't really understand the purpose of this, why don't we always go up to the same limit?
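The `next_limit` one-liner being questioned implements a ratcheting threshold: after a spill, the next check point becomes whichever is larger, the configured limit or 5% above current usage. This keeps a process that cannot get back under the configured limit (e.g. because Python rarely returns freed memory to the OS) from spilling on every single check. As a plain function, with the property turned into a method as the review suggests:

```python
def next_limit(memory_limit, used_memory):
    """Return the memory threshold (in MB) for the next spill check:
    the configured limit, or 5% above current usage when the process
    is already over that limit and cannot shrink back below it."""
    return max(memory_limit, used_memory * 1.05)
```

Under the configured limit, the threshold stays put; over it, each spill buys a 5% headroom margin before the next one triggers.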