[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-25 Thread davies
Github user davies commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-50119786
  
Awesome!




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-24 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15333099
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,436 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+    import psutil
+
+    def get_used_memory():
+        """ Return the used memory in MB """
+        process = psutil.Process(os.getpid())
+        if hasattr(process, "memory_info"):
+            info = process.memory_info()
+        else:
+            info = process.get_memory_info()
+        return info.rss >> 20
+except ImportError:
+
+    def get_used_memory():
+        """ Return the used memory in MB """
+        if platform.system() == 'Linux':
+            for line in open('/proc/self/status'):
+                if line.startswith('VmRSS:'):
+                    return int(line.split()[1]) >> 10
+        else:
+            warnings.warn("Please install psutil to have better "
+                          "support with spilling")
+            if platform.system() == "Darwin":
+                import resource
+                rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
+                return rss >> 20
+            # TODO: support windows
+        return 0
+
+
+class Aggregator(object):
+
+    """
+    Aggregator has three functions to merge values into a combiner.
+
+    createCombiner:  (value) -> combiner
+    mergeValue:      (combiner, value) -> combiner
+    mergeCombiners:  (combiner, combiner) -> combiner
+    """
+
+    def __init__(self, createCombiner, mergeValue, mergeCombiners):
+        self.createCombiner = createCombiner
+        self.mergeValue = mergeValue
+        self.mergeCombiners = mergeCombiners
+
+
+class SimpleAggregator(Aggregator):
+
+    """
+    SimpleAggregator is useful for the cases that combiners have
+    the same type as the values
+    """
+
+    def __init__(self, combiner):
+        Aggregator.__init__(self, lambda x: x, combiner, combiner)
+
+
+class Merger(object):
+
+    """
+    Merge shuffled data together by aggregator
+    """
+
+    def __init__(self, aggregator):
+        self.agg = aggregator
+
+    def mergeValues(self, iterator):
+        """ Combine the items by creator and combiner """
+        raise NotImplementedError
+
+    def mergeCombiners(self, iterator):
+        """ Merge the combined items by mergeCombiners """
+        raise NotImplementedError
+
+    def iteritems(self):
+        """ Return the merged items as an iterator """
+        raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+    """
+    In-memory merger based on an in-memory dict.
+    """
+
+    def __init__(self, aggregator):
+        Merger.__init__(self, aggregator)
+        self.data = {}
+
+    def mergeValues(self, iterator):
+        """ Combine the items by creator and combiner """
+        # speed up attribute lookup
+        d, creator = self.data, self.agg.createCombiner
+        comb = self.agg.mergeValue
+        for k, v in iterator:
+            d[k] = comb(d[k], v) if k in d else creator(v)
+
+    def mergeCombiners(self, iterator):
+        """ Merge the combined items by mergeCombiners """
+        # speed up attribute lookup
+        d, comb = self.data, self.agg.mergeCombiners
+        for k, v in iterator:
+            d[k] = comb(d[k], v) if k in d else v
+
+    def iteritems(self):
+        """ Return the merged items as an iterator """
+        return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+    """
+    External merger will dump the aggregated data into disk when
+    memory usage goes above the limit, then merge them together.
+
+    This 
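For reference, a small usage sketch of the Aggregator / InMemoryMerger API quoted above (the sample data is made up):

    # Sum values by key with the classes from the diff above.
    agg = Aggregator(createCombiner=lambda v: v,
                     mergeValue=lambda c, v: c + v,
                     mergeCombiners=lambda c1, c2: c1 + c2)

    merger = InMemoryMerger(agg)
    merger.mergeValues([("a", 1), ("b", 2), ("a", 3)])
    print(dict(merger.iteritems()))   # {'a': 4, 'b': 2}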

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49976553
  
QA tests have started for PR 1460. This patch merges cleanly. View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17110/consoleFull




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49980302
  
QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
  class AutoSerializer(FramedSerializer):
  class Aggregator(object):
  class SimpleAggregator(Aggregator):
  class Merger(object):
  class InMemoryMerger(Merger):
  class ExternalMerger(Merger):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17110/consoleFull




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-24 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-50072248
  
Hey Davies, I tried this out a bit and saw two issues / areas for 
improvement:

1) Since the ExternalMerger is used in both map tasks and reduce tasks, one 
problem that can happen is that the reduce task's data is already hashed modulo 
the # of reduce tasks, and so you get many empty buckets. For example, if you 
have 2 reduce tasks, task 0 gets all the values whose hash code is even, so it 
can only use half its buckets. If you have 64 reduce tasks, only one bucket is 
used.

The best way to fix this would be to hash values with a random hash 
function when choosing the bucket. One simple way might be to generate a random 
integer X for each ExternalMerger and then take hash((key, X)) instead of 
hash(key) when choosing the bucket. This is equivalent to salting your hash 
function. Maybe you have other ideas but I'd suggest trying this first.

2) I also noticed that sometimes maps would fill up again before the old 
memory was fully freed, leading to smaller spills. For example, for (Int, Int) 
pairs the first spill from 512 MB memory is about 68 MB of files, but later 
spills were only around 20 MB. I found that I could get better performance 
overall by adding some gc.collect() calls after every data.clear() and 
pdata.clear(). This freed more memory faster and allowed us to do more work in 
memory before spilling. The perf difference for one test job was around 30% but 
you should try it on your own jobs.
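Two small sketches of the suggestions above, with illustrative names (neither is the PR's actual code):

    import gc
    import random


    class SaltedBuckets(object):
        # Suggestion 1: each merger instance draws its own random salt, so
        # keys that were already partitioned upstream by
        # hash(key) % num_reduce_tasks still spread evenly over the local
        # spill buckets.
        def __init__(self, num_buckets=64):
            self.num_buckets = num_buckets
            self.salt = random.randint(0, 1 << 30)

        def bucket(self, key):
            # hash((key, salt)) behaves like a per-instance hash function,
            # decorrelating the bucket choice from the upstream partitioning.
            return hash((key, self.salt)) % self.num_buckets


    def clear_and_collect(*maps):
        # Suggestion 2: after spilling, clear the in-memory dicts and force
        # a GC pass so the freed memory is reclaimed before the next batch
        # of records starts filling up again (the patch simply calls
        # gc.collect() after data.clear() and pdata.clear()).
        for m in maps:
            m.clear()
        gc.collect()

With 2 reduce tasks, task 0 only sees keys whose hash is even, so hash(key) % 64 touches only the even buckets, while hash((key, salt)) % 64 uses all of them.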




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-24 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-50072351
  
BTW here's a patch that adds the GC calls I talked about above: 
https://gist.github.com/mateiz/297b8618ed033e7c8005




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-50098449
  
QA tests have started for PR 1460. This patch merges cleanly. View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17152/consoleFull




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-50099568
  
QA tests have started for PR 1460. This patch merges cleanly. View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17153/consoleFull




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-50101034
  
QA results for PR 1460:
- This patch FAILED unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
  class AutoSerializer(FramedSerializer):
  class Aggregator(object):
  class SimpleAggregator(Aggregator):
  class Merger(object):
  class InMemoryMerger(Merger):
  class ExternalMerger(Merger):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17152/consoleFull




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-50102382
  
QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
  class AutoSerializer(FramedSerializer):
  class Aggregator(object):
  class SimpleAggregator(Aggregator):
  class Merger(object):
  class InMemoryMerger(Merger):
  class ExternalMerger(Merger):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17153/consoleFull




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-24 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-50110575
  
Thanks Davies. I've merged this in.




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/1460




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15274517
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+    import psutil
+
+    def get_used_memory():
+        self = psutil.Process(os.getpid())
+        return self.memory_info().rss >> 20
+
+except ImportError:
+
+    def get_used_memory():
+        if platform.system() == 'Linux':
+            for line in open('/proc/self/status'):
+                if line.startswith('VmRSS:'):
+                    return int(line.split()[1]) >> 10
+        else:
+            warnings.warn("please install psutil to get accurate memory "
+                          "usage")
+            if platform.system() == "Darwin":
+                import resource
+                return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20
+            # TODO: support windows
+        return 0
+
+
+class Aggregator(object):
+
+    def __init__(self, creator, combiner, mergeCombiner=None):
+        self.creator = creator
+        self.combiner = combiner
+        self.mergeCombiner = mergeCombiner or combiner
+
+
+class Merger(object):
+
+    """
+    merge shuffled data together by aggregator
+    """
+
+    def __init__(self, aggregator):
+        self.agg = aggregator
+
+    def combine(self, iterator):
+        """ combine the items by creator and combiner """
+        raise NotImplementedError
+
+    def merge(self, iterator):
+        """ merge the combined items by mergeCombiner """
+        raise NotImplementedError
+
+    def iteritems(self):
+        """ return the merged items as an iterator """
+        raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+    """
+    In memory merger based on in-memory dict.
+    """
+
+    def __init__(self, aggregator):
+        Merger.__init__(self, aggregator)
+        self.data = {}
+
+    def combine(self, iterator):
+        """ combine the items by creator and combiner """
+        # speed up attribute lookup
+        d, creator, comb = self.data, self.agg.creator, self.agg.combiner
+        for k, v in iterator:
+            d[k] = comb(d[k], v) if k in d else creator(v)
+
+    def merge(self, iterator):
+        """ merge the combined items by mergeCombiner """
+        # speed up attribute lookup
+        d, comb = self.data, self.agg.mergeCombiner
+        for k, v in iterator:
+            d[k] = comb(d[k], v) if k in d else v
+
+    def iteritems(self):
+        """ return the merged items as an iterator """
+        return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+    """
+    External merger will dump the aggregated data into disk when
+    memory usage goes above the limit, then merge them together.
+
+    This class works as follows:
+
+    - It repeatedly combines the items and saves them in one dict
+      in memory.
+
+    - When the used memory goes above the memory limit, it splits
+      the combined data into partitions by hash code and dumps them
+      into disk, one file per partition.
+
+    - Then it goes through the rest of the iterator, combining items
+      into different dicts by hash. Whenever the used memory goes over
+      the memory limit, it dumps all the dicts into disk, one file per
+      dict. This is repeated until all the items are combined.
+
+    - Before returning any items, it loads each partition and
+      combines them separately, yielding them before loading the next
+      partition.
+
+    - While loading a partition, if the memory goes over the limit,
+      it will partition the loaded data and dump them into disk
+      
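A rough, self-contained sketch of the spill-and-merge flow the docstring above describes (simplified and illustrative only, with an item-count threshold standing in for the real memory check; this is not the PR's implementation):

    import os
    import pickle
    import tempfile

    def external_group_by(items, num_partitions=4, limit=10000):
        # Accumulate into an in-memory dict, spill to one file per hash
        # partition when the dict grows past the limit, then merge and
        # yield one partition at a time.
        tmpdir = tempfile.mkdtemp()
        data = {}

        def spill():
            files = [open(os.path.join(tmpdir, str(i)), 'ab')
                     for i in range(num_partitions)]
            for k, vs in data.items():
                pickle.dump((k, vs), files[hash(k) % num_partitions])
            for f in files:
                f.close()
            data.clear()

        for k, v in items:
            data.setdefault(k, []).append(v)
            if len(data) > limit:
                spill()
        if data:
            spill()

        for i in range(num_partitions):
            path = os.path.join(tmpdir, str(i))
            if not os.path.exists(path):
                continue
            merged = {}
            with open(path, 'rb') as f:
                while True:
                    try:
                        k, vs = pickle.load(f)
                    except EOFError:
                        break
                    merged.setdefault(k, []).extend(vs)
            for item in merged.items():
                yield item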

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15274557
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15274753
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15274766
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15274877
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,416 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+    import psutil
+
+    def get_used_memory():
+        """ return the used memory in MB """
+        self = psutil.Process(os.getpid())
+        return self.memory_info().rss >> 20
+
+except ImportError:
+
+    def get_used_memory():
+        """ return the used memory in MB """
+        if platform.system() == 'Linux':
+            for line in open('/proc/self/status'):
+                if line.startswith('VmRSS:'):
+                    return int(line.split()[1]) >> 10
+        else:
+            warnings.warn("please install psutil to have better "
+                          "support with spilling")
+            if platform.system() == "Darwin":
+                import resource
+                rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
+                return rss >> 20
+            # TODO: support windows
+        return 0
+
+
+class Aggregator(object):
+
+    """
+    Aggregator has three functions to merge values into a combiner.
+
+    createCombiner:  (value) -> combiner
+    mergeValue:      (combiner, value) -> combiner
+    mergeCombiners:  (combiner, combiner) -> combiner
+    """
+
+    def __init__(self, createCombiner, mergeValue, mergeCombiners):
+        self.createCombiner = createCombiner
+        self.mergeValue = mergeValue
+        self.mergeCombiners = mergeCombiners
+
+
+class SimpleAggregator(Aggregator):
+
+    """
+    SimpleAggregator is useful for the cases that combiners have
+    the same type as the values
+    """
+
+    def __init__(self, combiner):
+        Aggregator.__init__(self, lambda x: x, combiner, combiner)
+
+
+class Merger(object):
+
+    """
+    merge shuffled data together by aggregator
+    """
+
+    def __init__(self, aggregator):
+        self.agg = aggregator
+
+    def mergeValues(self, iterator):
+        """ combine the items by creator and combiner """
+        raise NotImplementedError
+
+    def mergeCombiners(self, iterator):
+        """ merge the combined items by mergeCombiners """
+        raise NotImplementedError
+
+    def iteritems(self):
+        """ return the merged items as an iterator """
+        raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+    """
+    In memory merger based on in-memory dict.
+    """
+
+    def __init__(self, aggregator):
+        Merger.__init__(self, aggregator)
+        self.data = {}
+
+    def mergeValues(self, iterator):
+        """ combine the items by creator and combiner """
+        # speed up attribute lookup
+        d, creator = self.data, self.agg.createCombiner
+        comb = self.agg.mergeValue
+        for k, v in iterator:
+            d[k] = comb(d[k], v) if k in d else creator(v)
+
+    def mergeCombiners(self, iterator):
+        """ merge the combined items by mergeCombiners """
+        # speed up attribute lookup
+        d, comb = self.data, self.agg.mergeCombiners
+        for k, v in iterator:
+            d[k] = comb(d[k], v) if k in d else v
+
+    def iteritems(self):
+        """ return the merged items as an iterator """
+        return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+    """
+    External merger will dump the aggregated data into disk when
+    memory usage goes above the limit, then merge them together.
+
+    This class works as follows:
+
+    - It repeatedly combines the items and saves them in one dict
+      in memory.
+
+    - When 

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15274897
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,416 @@
+class Merger(object):
+
+    """
+    merge shuffled data together by aggregator
--- End diff --

Small style thing: capitalize the beginning of each doc comment to match 
the rest of our code. (This applies to a bunch of places.)




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread mattf
Github user mattf commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15286548
  
--- Diff: python/pyspark/rdd.py ---
@@ -1209,18 +1227,44 @@ def partitionBy(self, numPartitions, partitionFunc=portable_hash):
 
         # Transferring O(n) objects to Java is too expensive.  Instead, we'll
         # form the hash buckets in Python, transferring O(numPartitions) objects
-        # to Java.  Each object is a (splitNumber, [objects]) pair.
+        # to Java. Each object is a (splitNumber, [objects]) pair.
+        # In order to avoid too-large objects, the objects are grouped into chunks.
         outputSerializer = self.ctx._unbatched_serializer
 
+        limit = (_parse_memory(self.ctx._conf.get("spark.python.worker.memory")
+                               or "512m") / 2)
--- End diff --

instead of 'conf.get(xyz) or default' why not use 'conf.get(xyz, 
default)'?
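For illustration, the two patterns being contrasted (sketched with a plain dict standing in for the SparkConf object):

    conf = {}

    # pattern used in the PR: fall back with `or`
    memory = conf.get("spark.python.worker.memory") or "512m"

    # suggested pattern: let get() supply the default
    memory = conf.get("spark.python.worker.memory", "512m")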




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread mattf
Github user mattf commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15286571
  
--- Diff: python/pyspark/rdd.py ---
@@ -1265,26 +1309,26 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
         if numPartitions is None:
             numPartitions = self._defaultReducePartitions()
 
+        serializer = self.ctx.serializer
+        spill = (self.ctx._conf.get("spark.shuffle.spill") or 'True').lower() == 'true'
+        memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory") or "512m")
+        agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
+
         def combineLocally(iterator):
-            combiners = {}
-            for x in iterator:
-                (k, v) = x
-                if k not in combiners:
-                    combiners[k] = createCombiner(v)
-                else:
-                    combiners[k] = mergeValue(combiners[k], v)
-            return combiners.iteritems()
+            merger = ExternalMerger(agg, memory, serializer) \
+                if spill else InMemoryMerger(agg)
+            merger.mergeValues(iterator)
+            return merger.iteritems()
+
         locally_combined = self.mapPartitions(combineLocally)
         shuffled = locally_combined.partitionBy(numPartitions)
-
+ 
--- End diff --

stray whitespace




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread mattf
Github user mattf commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15286886
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,416 @@

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15299925
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,416 @@

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15299960
  
--- Diff: python/pyspark/rdd.py ---
@@ -1209,18 +1227,44 @@ def partitionBy(self, numPartitions, partitionFunc=portable_hash):
 
         # Transferring O(n) objects to Java is too expensive.  Instead, we'll
         # form the hash buckets in Python, transferring O(numPartitions) objects
-        # to Java.  Each object is a (splitNumber, [objects]) pair.
+        # to Java. Each object is a (splitNumber, [objects]) pair.
+        # In order to avoid too-large objects, the objects are grouped into chunks.
        outputSerializer = self.ctx._unbatched_serializer
 
+        limit = (_parse_memory(self.ctx._conf.get("spark.python.worker.memory")
+                               or "512m") / 2)
--- End diff --

I didn't know about that one, sorry.




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15300037
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,416 @@

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15300412
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15300459
  
--- Diff: python/pyspark/shuffle.py ---

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15300603
  
--- Diff: python/pyspark/shuffle.py ---

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49904887
  
QA tests have started for PR 1460. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17043/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15302973
  
--- Diff: python/pyspark/rdd.py ---
@@ -1207,20 +1225,49 @@ def partitionBy(self, numPartitions, 
partitionFunc=portable_hash):
 if numPartitions is None:
 numPartitions = self._defaultReducePartitions()
 
-# Transferring O(n) objects to Java is too expensive.  Instead, 
we'll
-# form the hash buckets in Python, transferring O(numPartitions) 
objects
-# to Java.  Each object is a (splitNumber, [objects]) pair.
+# Transferring O(n) objects to Java is too expensive.
+# Instead, we'll form the hash buckets in Python,
+# transferring O(numPartitions) objects to Java.
+# Each object is a (splitNumber, [objects]) pair.
+# In order to void too huge objects, the objects are
--- End diff --

Small typo: void -> avoid


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15303224
  
--- Diff: python/pyspark/rdd.py ---
@@ -1207,20 +1225,49 @@ def partitionBy(self, numPartitions, 
partitionFunc=portable_hash):
 if numPartitions is None:
 numPartitions = self._defaultReducePartitions()
 
-# Transferring O(n) objects to Java is too expensive.  Instead, 
we'll
-# form the hash buckets in Python, transferring O(numPartitions) 
objects
-# to Java.  Each object is a (splitNumber, [objects]) pair.
+# Transferring O(n) objects to Java is too expensive.
+# Instead, we'll form the hash buckets in Python,
+# transferring O(numPartitions) objects to Java.
+# Each object is a (splitNumber, [objects]) pair.
+# In order to void too huge objects, the objects are
+# grouped into chunks.
 outputSerializer = self.ctx._unbatched_serializer
 
+limit = (_parse_memory(self.ctx._conf.get(
+"spark.python.worker.memory", "512m") / 2)
--- End diff --

Could you get the same problem of increasing the limit here that you have 
in ExternalMerger? (If Python doesn't free memory right away)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15303331
  
--- Diff: python/pyspark/rdd.py ---
@@ -1265,26 +1312,28 @@ def combineByKey(self, createCombiner, mergeValue, 
mergeCombiners,
 if numPartitions is None:
 numPartitions = self._defaultReducePartitions()
 
+serializer = self.ctx.serializer
+spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower()
+ == 'true')
+memory = (_parse_memory(self.ctx._conf.get(
+"spark.python.worker.memory","512m")
--- End diff --

Space after ,


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15304302
  
--- Diff: python/pyspark/rdd.py ---
@@ -1207,20 +1225,49 @@ def partitionBy(self, numPartitions, 
partitionFunc=portable_hash):
 if numPartitions is None:
 numPartitions = self._defaultReducePartitions()
 
-# Transferring O(n) objects to Java is too expensive.  Instead, 
we'll
-# form the hash buckets in Python, transferring O(numPartitions) 
objects
-# to Java.  Each object is a (splitNumber, [objects]) pair.
+# Transferring O(n) objects to Java is too expensive.
+# Instead, we'll form the hash buckets in Python,
+# transferring O(numPartitions) objects to Java.
+# Each object is a (splitNumber, [objects]) pair.
+# In order to void too huge objects, the objects are
+# grouped into chunks.
 outputSerializer = self.ctx._unbatched_serializer
 
+limit = (_parse_memory(self.ctx._conf.get(
+"spark.python.worker.memory", "512m") / 2)
--- End diff --

The overhead of splitting the items into small chunks is MUCH lower than 
that in ExternalMerger, so this limit is smaller than ExternalMerger (0.5 < 0.9).
In the worst case, it will yield all the items after every `batch` 
number of items.

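The trade-off described here is easier to see in a small sketch of the bucketing loop: items go into per-partition buckets and, every `batch` items, all buckets are flushed if memory is over the limit, so in the worst case a chunk is emitted for every `batch` input items. The function and parameter names below are assumptions, not the code from the pull request.

```python
def bucket_in_chunks(iterator, num_partitions, partition_func, batch, over_limit):
    """Yield (split, [items]) chunks, flushing whenever memory is over the limit."""
    buckets = {p: [] for p in range(num_partitions)}
    count = 0
    for key, value in iterator:
        buckets[partition_func(key) % num_partitions].append((key, value))
        count += 1
        # check at most once per `batch` items; in the worst case everything
        # collected so far is yielded here and the buckets start over
        if count % batch == 0 and over_limit():
            for split, items in buckets.items():
                if items:
                    yield (split, items)
                    buckets[split] = []
    # flush whatever is left at the end
    for split, items in buckets.items():
        if items:
            yield (split, items)
```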

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15306941
  
--- Diff: python/pyspark/shuffle.py ---

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49917537
  
QA tests have started for PR 1460. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17053/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49917693
  
QA results for PR 1460:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class AutoSerializer(FramedSerializer):
class Aggregator(object):
class SimpleAggregator(Aggregator):
class Merger(object):
class InMemoryMerger(Merger):
class ExternalMerger(Merger):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17043/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread davies
Github user davies commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49927437
  
The last commit has fixed the tests, should run it again.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49928790
  
Ah never mind.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49928696
  
Looks like the latest tested code has an error in the test suite:
```
Running PySpark tests. Output is in python/unit-tests.log.
Running test: pyspark/rdd.py
  File "pyspark/rdd.py", line 1239
def add_shuffle_key(split, iterator):
  ^
SyntaxError: invalid syntax
Had test failures; see logs.
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49928744
  
Ah NM. Jenkins, test this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49930734
  
QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class AutoSerializer(FramedSerializer):
class Aggregator(object):
class SimpleAggregator(Aggregator):
class Merger(object):
class InMemoryMerger(Merger):
class ExternalMerger(Merger):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17053/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15325695
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,433 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+ Return the used memory in MB 
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
--- End diff --

Unfortunately memory_info only works in psutil 2.0. I tried the Anaconda 
Python distribution on Mac, which has psutil 1.2.1, and it doesn't work there. 
In there you have to use get_memory_info() instead.

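One way to bridge the two psutil APIs mentioned here is a `hasattr` check; the snippet below is a sketch of that idea, not necessarily what the patch ended up doing.

```python
import os
import psutil

def get_used_memory():
    """Resident set size of the current process in MB."""
    process = psutil.Process(os.getpid())
    # psutil >= 2.0 renamed get_memory_info() to memory_info()
    if hasattr(process, "memory_info"):
        info = process.memory_info()
    else:
        info = process.get_memory_info()
    return info.rss >> 20
```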

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15325703
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,433 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+ Return the used memory in MB 
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
--- End diff --

Also, maybe don't call the process self, it's kind of confusing since it 
sounds like a "this" object


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49967247
  
QA tests have started for PR 1460. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17094/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49969141
  
QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class AutoSerializer(FramedSerializer):
class Aggregator(object):
class SimpleAggregator(Aggregator):
class Merger(object):
class InMemoryMerger(Merger):
class ExternalMerger(Merger):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17094/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49709494
  
QA tests have started for PR 1460. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16957/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49718092
  
QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class AutoSerializer(FramedSerializer):
class Aggregator(object):
class Merger(object):
class InMemoryMerger(Merger):
class ExternalMerger(Merger):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16957/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49774956
  
QA tests have started for PR 1460. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16975/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49788328
  
QA results for PR 1460:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class AutoSerializer(FramedSerializer):
class Aggregator(object):
class Merger(object):
class InMemoryMerger(Merger):
class ExternalMerger(Merger):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16975/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49805468
  
QA tests have started for PR 1460. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16991/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49808272
  
QA tests have started for PR 1460. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16993/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49814922
  
QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class AutoSerializer(FramedSerializer):
class Aggregator(object):
class Merger(object):
class InMemoryMerger(Merger):
class ExternalMerger(Merger):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16991/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15263569
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@
+#
--- End diff --

Since this is a new internal file, also add it to the exclude section of 
python/epydoc.conf so it does not show up in the docs


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15263605
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,258 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, AutoSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20
+# TODO: support windows
--- End diff --

Alright, then you should open a JIRA saying that this is not supported on 
Windows, and maybe print a warning in spill() if the OS is Windows.

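A warning of the kind suggested here could be as small as the following sketch; the function name and message are assumptions.

```python
import platform
import warnings

def _warn_unsupported_platform():
    """Spilling relies on measuring used memory, which the fallback above
    cannot do on Windows, so warn instead of silently never spilling."""
    if platform.system() == "Windows":
        warnings.warn("Memory usage cannot be measured on Windows; "
                      "data will not be spilled to disk.")
```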

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15263661
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20
+# TODO: support windows
+return 0
+
+
+class Aggregator(object):
+
+def __init__(self, creator, combiner, mergeCombiner=None):
+self.creator = creator
+self.combiner = combiner
+self.mergeCombiner = mergeCombiner or combiner
--- End diff --

Call the fields of this createCombiner, mergeValue and mergeCombiners, same 
as in Scala. Also explain what it means for mergeCombiners to be None.

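A sketch of the suggested renaming, keeping the behaviour of the quoted code and spelling out what a `None` mergeCombiners means:

```python
class Aggregator(object):
    """createCombiner: value -> combiner
    mergeValue:       (combiner, value) -> combiner
    mergeCombiners:   (combiner, combiner) -> combiner

    If mergeCombiners is None, mergeValue is reused for merging combiners,
    which is only valid when combiners and values have the same type.
    """

    def __init__(self, createCombiner, mergeValue, mergeCombiners=None):
        self.createCombiner = createCombiner
        self.mergeValue = mergeValue
        self.mergeCombiners = mergeCombiners or mergeValue
```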

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15264180
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20
+# TODO: support windows
+return 0
+
+
+class Aggregator(object):
+
+def __init__(self, creator, combiner, mergeCombiner=None):
+self.creator = creator
+self.combiner = combiner
+self.mergeCombiner = mergeCombiner or combiner
+
+
+class Merger(object):
+
+
+merge shuffled data together by aggregator
+
+
+def __init__(self, aggregator):
+self.agg = aggregator
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+raise NotImplementedError
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+raise NotImplementedError
+
+def iteritems(self):
+ return the merged items ad iterator 
+raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+
+In memory merger based on in-memory dict.
+
+
+def __init__(self, aggregator):
+Merger.__init__(self, aggregator)
+self.data = {}
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+# speed up attributes lookup
+d, creator, comb = self.data, self.agg.creator, self.agg.combiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else creator(v)
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+# speed up attributes lookup
+d, comb = self.data, self.agg.mergeCombiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else v
+
+def iteritems(self):
+ return the merged items ad iterator 
+return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+
+External merger will dump the aggregated data into disks when
+memory usage goes above the limit, then merge them together.
+
+This class works as follows:
+
+- It repeatedly combine the items and save them in one dict in 
+  memory.
+
+- When the used memory goes above memory limit, it will split
+  the combined data into partitions by hash code, dump them
+  into disk, one file per partition.
+
+- Then it goes through the rest of the iterator, combine items
+  into different dict by hash. Until the used memory goes over
+  memory limit, it dump all the dicts into disks, one file per
+  dict. Repeat this again until combine all the items.
--- End diff --

Just to be clear, this recursing happens at most one time, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not 

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15264236
  
--- Diff: python/pyspark/shuffle.py ---

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49816795
  
QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class AutoSerializer(FramedSerializer):
class Aggregator(object):
class Merger(object):
class InMemoryMerger(Merger):
class ExternalMerger(Merger):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16993/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15264473
  
--- Diff: python/pyspark/shuffle.py ---

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15264487
  
--- Diff: python/pyspark/tests.py ---
@@ -47,6 +48,64 @@
 SPARK_HOME = os.environ[SPARK_HOME]
 
 
+class TestMerger(unittest.TestCase):
+
+def setUp(self):
+self.N = 1 << 16
+self.l = [i for i in xrange(self.N)]
+self.data = zip(self.l, self.l)
+self.agg = Aggregator(lambda x: [x], 
+lambda x, y: x.append(y) or x,
+lambda x, y: x.extend(y) or x)
+ExternalMerger.PARTITIONS = 8
+ExternalMerger.BATCH = 1 << 14
--- End diff --

These are never cleared, so they might affect future tests. It's better to 
make them arguments to the constructor of ExternalMerger.

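The point here is to make the tuning knobs per-instance rather than class-level; a minimal sketch, with assumed attribute names:

```python
class ExternalMergerSketch(object):
    """Per-instance tuning knobs, so tests do not mutate shared class state."""

    def __init__(self, aggregator, partitions=64, batch=10000):
        self.agg = aggregator
        self.partitions = partitions
        self.batch = batch
```

A test can then construct `ExternalMergerSketch(agg, partitions=8, batch=1 << 14)` and nothing leaks into later tests.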

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15263760
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20
+# TODO: support windows
+return 0
+
+
+class Aggregator(object):
+
+def __init__(self, creator, combiner, mergeCombiner=None):
+self.creator = creator
+self.combiner = combiner
+self.mergeCombiner = mergeCombiner or combiner
+
+
+class Merger(object):
+
+
+merge shuffled data together by aggregator
+
+
+def __init__(self, aggregator):
+self.agg = aggregator
+
+def combine(self, iterator):
--- End diff --

Call this combineValues


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15263824
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory 
usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss 
>> 20
+# TODO: support windows
+return 0
+
+
+class Aggregator(object):
+
+def __init__(self, creator, combiner, mergeCombiner=None):
+self.creator = creator
+self.combiner = combiner
+self.mergeCombiner = mergeCombiner or combiner
+
+
+class Merger(object):
+
+
+merge shuffled data together by aggregator
+
+
+def __init__(self, aggregator):
+self.agg = aggregator
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+raise NotImplementedError
+
+def merge(self, iterator):
--- End diff --

Call this combineCombiners




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15264608
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory 
usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss 
>> 20
+# TODO: support windows
+return 0
+
+
+class Aggregator(object):
+
+def __init__(self, creator, combiner, mergeCombiner=None):
+self.creator = creator
+self.combiner = combiner
+self.mergeCombiner = mergeCombiner or combiner
+
+
+class Merger(object):
+
+
+merge shuffled data together by aggregator
+
+
+def __init__(self, aggregator):
+self.agg = aggregator
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+raise NotImplementedError
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+raise NotImplementedError
+
+def iteritems(self):
+ return the merged items ad iterator 
+raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+
+In memory merger based on in-memory dict.
+
+
+def __init__(self, aggregator):
+Merger.__init__(self, aggregator)
+self.data = {}
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+# speed up attributes lookup
+d, creator, comb = self.data, self.agg.creator, self.agg.combiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else creator(v)
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+# speed up attributes lookup
+d, comb = self.data, self.agg.mergeCombiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else v
+
+def iteritems(self):
+ return the merged items ad iterator 
+return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+
+External merger will dump the aggregated data into disks when
+memory usage goes above the limit, then merge them together.
+
+This class works as follows:
+
+- It repeatedly combine the items and save them in one dict in 
+  memory.
+
+- When the used memory goes above memory limit, it will split
+  the combined data into partitions by hash code, dump them
+  into disk, one file per partition.
+
+- Then it goes through the rest of the iterator, combine items
+  into different dict by hash. Until the used memory goes over
+  memory limit, it dump all the dicts into disks, one file per
+  dict. Repeat this again until combine all the items.
+
+- Before return any items, it will load each partition and
+  combine them seperately. Yield them before loading next
+  partition.
+
+- During loading a partition, if the memory goes over limit,
+  it will partition the loaded data and dump them into disks
+  
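
The docstring quoted above (truncated here) describes the spill-and-merge flow
in prose. Below is a minimal standalone sketch of that idea, not the PR's
actual ExternalMerger: the names external_reduce, max_items and partitions are
made up for illustration, and the "memory limit" is a simple item count
rather than a measured RSS.

    import os
    import pickle
    import shutil
    import tempfile

    def external_reduce(pairs, combine, max_items=1000, partitions=8):
        """Reduce (k, v) pairs by key, spilling hash partitions to disk."""
        spill_dir = tempfile.mkdtemp()
        files = [open(os.path.join(spill_dir, str(p)), "ab")
                 for p in range(partitions)]
        data = {}

        def spill():
            # split the combined dict by hash code, one batch per partition file
            buckets = [[] for _ in range(partitions)]
            for k, v in data.items():
                buckets[hash(k) % partitions].append((k, v))
            for p, bucket in enumerate(buckets):
                pickle.dump(bucket, files[p])
            data.clear()

        for k, v in pairs:
            data[k] = combine(data[k], v) if k in data else v
            if len(data) >= max_items:   # stand-in for the memory check
                spill()
        spill()
        for f in files:
            f.close()

        # merge the spilled batches one partition at a time, so only a single
        # partition's keys are ever held in memory while yielding results
        for p in range(partitions):
            merged = {}
            with open(os.path.join(spill_dir, str(p)), "rb") as f:
                while True:
                    try:
                        batch = pickle.load(f)
                    except EOFError:
                        break
                    for k, v in batch:
                        merged[k] = combine(merged[k], v) if k in merged else v
            for item in merged.items():
                yield item
        shutil.rmtree(spill_dir)

    # usage: per-key sums come out right even though every dict stays small
    counts = dict(external_reduce(((i % 100, 1) for i in range(100000)),
                                  lambda a, b: a + b, max_items=50))
    assert counts[0] == 1000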

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15264616
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory 
usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss 
>> 20
+# TODO: support windows
+return 0
+
+
+class Aggregator(object):
+
+def __init__(self, creator, combiner, mergeCombiner=None):
+self.creator = creator
+self.combiner = combiner
+self.mergeCombiner = mergeCombiner or combiner
+
+
+class Merger(object):
+
+
+merge shuffled data together by aggregator
+
+
+def __init__(self, aggregator):
+self.agg = aggregator
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+raise NotImplementedError
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+raise NotImplementedError
+
+def iteritems(self):
+ return the merged items ad iterator 
+raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+
+In memory merger based on in-memory dict.
+
+
+def __init__(self, aggregator):
+Merger.__init__(self, aggregator)
+self.data = {}
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+# speed up attributes lookup
+d, creator, comb = self.data, self.agg.creator, self.agg.combiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else creator(v)
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+# speed up attributes lookup
+d, comb = self.data, self.agg.mergeCombiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else v
+
+def iteritems(self):
+ return the merged items ad iterator 
+return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+
+External merger will dump the aggregated data into disks when
+memory usage goes above the limit, then merge them together.
+
+This class works as follows:
+
+- It repeatedly combine the items and save them in one dict in 
+  memory.
+
+- When the used memory goes above memory limit, it will split
+  the combined data into partitions by hash code, dump them
+  into disk, one file per partition.
+
+- Then it goes through the rest of the iterator, combine items
+  into different dict by hash. Until the used memory goes over
+  memory limit, it dump all the dicts into disks, one file per
+  dict. Repeat this again until combine all the items.
+
+- Before return any items, it will load each partition and
+  combine them seperately. Yield them before loading next
+  partition.
+
+- During loading a partition, if the memory goes over limit,
+  it will partition the loaded data and dump them into disks
+  

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15264650
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory 
usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss 
>> 20
+# TODO: support windows
+return 0
+
+
+class Aggregator(object):
+
+def __init__(self, creator, combiner, mergeCombiner=None):
+self.creator = creator
+self.combiner = combiner
+self.mergeCombiner = mergeCombiner or combiner
+
+
+class Merger(object):
+
+
+merge shuffled data together by aggregator
+
+
+def __init__(self, aggregator):
+self.agg = aggregator
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+raise NotImplementedError
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+raise NotImplementedError
+
+def iteritems(self):
+ return the merged items ad iterator 
+raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+
+In memory merger based on in-memory dict.
+
+
+def __init__(self, aggregator):
+Merger.__init__(self, aggregator)
+self.data = {}
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+# speed up attributes lookup
+d, creator, comb = self.data, self.agg.creator, self.agg.combiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else creator(v)
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+# speed up attributes lookup
+d, comb = self.data, self.agg.mergeCombiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else v
+
+def iteritems(self):
+ return the merged items ad iterator 
+return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+
+External merger will dump the aggregated data into disks when
+memory usage goes above the limit, then merge them together.
+
+This class works as follows:
+
+- It repeatedly combine the items and save them in one dict in 
+  memory.
+
+- When the used memory goes above memory limit, it will split
+  the combined data into partitions by hash code, dump them
+  into disk, one file per partition.
+
+- Then it goes through the rest of the iterator, combine items
+  into different dict by hash. Until the used memory goes over
+  memory limit, it dump all the dicts into disks, one file per
+  dict. Repeat this again until combine all the items.
+
+- Before return any items, it will load each partition and
+  combine them seperately. Yield them before loading next
+  partition.
+
+- During loading a partition, if the memory goes over limit,
+  it will partition the loaded data and dump them into disks
+  

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15266298
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,258 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, AutoSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory 
usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss 
>> 20
+# TODO: support windows
--- End diff --

We already have a warning telling people that they should install psutil 
when they hit spilling.




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15266361
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory 
usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss 
>> 20
+# TODO: support windows
+return 0
+
+
+class Aggregator(object):
+
+def __init__(self, creator, combiner, mergeCombiner=None):
+self.creator = creator
+self.combiner = combiner
+self.mergeCombiner = mergeCombiner or combiner
+
+
+class Merger(object):
+
+
+merge shuffled data together by aggregator
+
+
+def __init__(self, aggregator):
+self.agg = aggregator
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+raise NotImplementedError
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+raise NotImplementedError
+
+def iteritems(self):
+ return the merged items ad iterator 
+raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+
+In memory merger based on in-memory dict.
+
+
+def __init__(self, aggregator):
+Merger.__init__(self, aggregator)
+self.data = {}
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+# speed up attributes lookup
+d, creator, comb = self.data, self.agg.creator, self.agg.combiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else creator(v)
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+# speed up attributes lookup
+d, comb = self.data, self.agg.mergeCombiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else v
+
+def iteritems(self):
+ return the merged items ad iterator 
+return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+
+External merger will dump the aggregated data into disks when
+memory usage goes above the limit, then merge them together.
+
+This class works as follows:
+
+- It repeatedly combine the items and save them in one dict in 
+  memory.
+
+- When the used memory goes above memory limit, it will split
+  the combined data into partitions by hash code, dump them
+  into disk, one file per partition.
+
+- Then it goes through the rest of the iterator, combine items
+  into different dict by hash. Until the used memory goes over
+  memory limit, it dump all the dicts into disks, one file per
+  dict. Repeat this again until combine all the items.
+
+- Before return any items, it will load each partition and
+  combine them seperately. Yield them before loading next
+  partition.
+
+- During loading a partition, if the memory goes over limit,
+  it will partition the loaded data and dump them into disks
+  

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15266513
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory 
usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss 
>> 20
+# TODO: support windows
+return 0
+
+
+class Aggregator(object):
+
+def __init__(self, creator, combiner, mergeCombiner=None):
+self.creator = creator
+self.combiner = combiner
+self.mergeCombiner = mergeCombiner or combiner
+
+
+class Merger(object):
+
+
+merge shuffled data together by aggregator
+
+
+def __init__(self, aggregator):
+self.agg = aggregator
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+raise NotImplementedError
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+raise NotImplementedError
+
+def iteritems(self):
+ return the merged items ad iterator 
+raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+
+In memory merger based on in-memory dict.
+
+
+def __init__(self, aggregator):
+Merger.__init__(self, aggregator)
+self.data = {}
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+# speed up attributes lookup
+d, creator, comb = self.data, self.agg.creator, self.agg.combiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else creator(v)
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+# speed up attributes lookup
+d, comb = self.data, self.agg.mergeCombiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else v
+
+def iteritems(self):
+ return the merged items ad iterator 
+return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+
+External merger will dump the aggregated data into disks when
+memory usage goes above the limit, then merge them together.
+
+This class works as follows:
+
+- It repeatedly combine the items and save them in one dict in 
+  memory.
+
+- When the used memory goes above memory limit, it will split
+  the combined data into partitions by hash code, dump them
+  into disk, one file per partition.
+
+- Then it goes through the rest of the iterator, combine items
+  into different dict by hash. Until the used memory goes over
+  memory limit, it dump all the dicts into disks, one file per
+  dict. Repeat this again until combine all the items.
+
+- Before return any items, it will load each partition and
+  combine them seperately. Yield them before loading next
+  partition.
+
+- During loading a partition, if the memory goes over limit,
+  it will partition the loaded data and dump them into disks
+  

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15266539
  
--- Diff: python/pyspark/tests.py ---
@@ -47,6 +48,64 @@
 SPARK_HOME = os.environ["SPARK_HOME"]
 
 
+class TestMerger(unittest.TestCase):
+
+def setUp(self):
+self.N = 1 << 16
+self.l = [i for i in xrange(self.N)]
+self.data = zip(self.l, self.l)
+self.agg = Aggregator(lambda x: [x], 
+lambda x, y: x.append(y) or x,
+lambda x, y: x.extend(y) or x)
+ExternalMerger.PARTITIONS = 8
+ExternalMerger.BATCH = 1 << 14
--- End diff --

Good point!




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15266679
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,258 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, AutoSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory 
usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss 
>> 20
+# TODO: support windows
+return 0
+
+
+class Merger(object):
+
+merge shuffled data together by combinator
+
+
+def merge(self, iterator):
+raise NotImplementedError
+
+def iteritems(self):
+raise NotImplementedError
+
+
+class MapMerger(Merger):
+
+In memory merger based on map
+
+
+def __init__(self, combiner):
+self.combiner = combiner
+self.data = {}
+
+def merge(self, iterator):
+d, comb = self.data, self.combiner
+for k, v in iter(iterator):
+d[k] = comb(d[k], v) if k in d else v
+
+def iteritems(self):
+return self.data.iteritems()
+
+
+class ExternalHashMapMerger(Merger):
+
+
+External merger will dump the aggregated data into disks when memory 
usage
+is above the limit, then merge them together.
+
+ >>> combiner = lambda x, y:x+y
+ >>> merger = ExternalHashMapMerger(combiner, 10)
+ >>> N = 1
+ >>> merger.merge(zip(xrange(N), xrange(N)) * 10)
+ >>> assert merger.spills > 0
+ >>> sum(v for k,v in merger.iteritems())
+49995
+
+
+PARTITIONS = 64
+BATCH = 1
+
+def __init__(self, combiner, memory_limit=512, serializer=None,
+localdirs=None, scale=1):
+self.combiner = combiner
+self.memory_limit = memory_limit
+self.serializer = serializer or\
+BatchedSerializer(AutoSerializer(), 1024)
+self.localdirs = localdirs or self._get_dirs()
+self.scale = scale
+self.data = {}
+self.pdata = []
+self.spills = 0
+
+def _get_dirs(self):
+path = os.environ.get("SPARK_LOCAL_DIR", "/tmp/spark")
+dirs = path.split(",")
+localdirs = []
+for d in dirs:
+d = os.path.join(d, "merge", str(os.getpid()))
+try:
+os.makedirs(d)
+localdirs.append(d)
+except IOError:
+pass
+if not localdirs:
+raise IOError("no writable directories: " + path)
+return localdirs
+
+def _get_spill_dir(self, n):
+return os.path.join(self.localdirs[n % len(self.localdirs)], 
str(n))
+
+@property
+def used_memory(self):
+return get_used_memory()
+
+@property
+def next_limit(self):
+return max(self.memory_limit, self.used_memory * 1.05)
+
+def merge(self, iterator, check=True):
+ merge (K,V) pair by combiner 
+iterator = iter(iterator)
+# speedup attribute lookup
+d, comb, batch = self.data, self.combiner, self.BATCH
+c = 0
+for k, v in iterator:
+

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15267205
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory 
usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss 
>> 20
+# TODO: support windows
+return 0
+
+
+class Aggregator(object):
+
+def __init__(self, creator, combiner, mergeCombiner=None):
+self.creator = creator
+self.combiner = combiner
+self.mergeCombiner = mergeCombiner or combiner
+
+
+class Merger(object):
+
+
+merge shuffled data together by aggregator
+
+
+def __init__(self, aggregator):
+self.agg = aggregator
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+raise NotImplementedError
+
+def merge(self, iterator):
--- End diff --

Is mergeCombiners better?




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15267225
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory 
usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss 
>> 20
+# TODO: support windows
+return 0
+
+
+class Aggregator(object):
+
+def __init__(self, creator, combiner, mergeCombiner=None):
+self.creator = creator
+self.combiner = combiner
+self.mergeCombiner = mergeCombiner or combiner
+
+
+class Merger(object):
+
+
+merge shuffled data together by aggregator
+
+
+def __init__(self, aggregator):
+self.agg = aggregator
+
+def combine(self, iterator):
--- End diff --

Is mergeValues better? Because this class is called Merger, 
merger.mergeValues() will be more readable.
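
For reference, a sketch of how the interface reads with the names proposed in
this thread (mergeValues/mergeCombiners); the class body is simplified and is
not the PR code itself:

    class Merger(object):
        def __init__(self, aggregator):
            self.agg = aggregator

        def mergeValues(self, iterator):
            # was combine(): fold raw (k, v) pairs into the combiners
            raise NotImplementedError

        def mergeCombiners(self, iterator):
            # was merge(): fold already-combined (k, c) pairs together
            raise NotImplementedError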




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15268717
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory 
usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss 
>> 20
+# TODO: support windows
+return 0
+
+
+class Aggregator(object):
+
+def __init__(self, creator, combiner, mergeCombiner=None):
+self.creator = creator
+self.combiner = combiner
+self.mergeCombiner = mergeCombiner or combiner
+
+
+class Merger(object):
+
+
+merge shuffled data together by aggregator
+
+
+def __init__(self, aggregator):
+self.agg = aggregator
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+raise NotImplementedError
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+raise NotImplementedError
+
+def iteritems(self):
+ return the merged items ad iterator 
+raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+
+In memory merger based on in-memory dict.
+
+
+def __init__(self, aggregator):
+Merger.__init__(self, aggregator)
+self.data = {}
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+# speed up attributes lookup
+d, creator, comb = self.data, self.agg.creator, self.agg.combiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else creator(v)
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+# speed up attributes lookup
+d, comb = self.data, self.agg.mergeCombiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else v
+
+def iteritems(self):
+ return the merged items ad iterator 
+return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+
+External merger will dump the aggregated data into disks when
+memory usage goes above the limit, then merge them together.
+
+This class works as follows:
+
+- It repeatedly combine the items and save them in one dict in 
+  memory.
+
+- When the used memory goes above memory limit, it will split
+  the combined data into partitions by hash code, dump them
+  into disk, one file per partition.
+
+- Then it goes through the rest of the iterator, combine items
+  into different dict by hash. Until the used memory goes over
+  memory limit, it dump all the dicts into disks, one file per
+  dict. Repeat this again until combine all the items.
+
+- Before return any items, it will load each partition and
+  combine them seperately. Yield them before loading next
+  partition.
+
+- During loading a partition, if the memory goes over limit,
+  it will partition the loaded data and dump them into disks
+  

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15268733
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory 
usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss 
>> 20
+# TODO: support windows
+return 0
+
+
+class Aggregator(object):
+
+def __init__(self, creator, combiner, mergeCombiner=None):
+self.creator = creator
+self.combiner = combiner
+self.mergeCombiner = mergeCombiner or combiner
+
+
+class Merger(object):
+
+
+merge shuffled data together by aggregator
+
+
+def __init__(self, aggregator):
+self.agg = aggregator
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+raise NotImplementedError
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+raise NotImplementedError
+
+def iteritems(self):
+ return the merged items ad iterator 
+raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+
+In memory merger based on in-memory dict.
+
+
+def __init__(self, aggregator):
+Merger.__init__(self, aggregator)
+self.data = {}
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+# speed up attributes lookup
+d, creator, comb = self.data, self.agg.creator, self.agg.combiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else creator(v)
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+# speed up attributes lookup
+d, comb = self.data, self.agg.mergeCombiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else v
+
+def iteritems(self):
+ return the merged items ad iterator 
+return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+
+External merger will dump the aggregated data into disks when
+memory usage goes above the limit, then merge them together.
+
+This class works as follows:
+
+- It repeatedly combine the items and save them in one dict in 
+  memory.
+
+- When the used memory goes above memory limit, it will split
+  the combined data into partitions by hash code, dump them
+  into disk, one file per partition.
+
+- Then it goes through the rest of the iterator, combine items
+  into different dict by hash. Until the used memory goes over
+  memory limit, it dump all the dicts into disks, one file per
+  dict. Repeat this again until combine all the items.
+
+- Before return any items, it will load each partition and
+  combine them seperately. Yield them before loading next
+  partition.
+
+- During loading a partition, if the memory goes over limit,
+  it will partition the loaded data and dump them into disks
+  

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15268727
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory 
usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss 
>> 20
+# TODO: support windows
+return 0
+
+
+class Aggregator(object):
+
+def __init__(self, creator, combiner, mergeCombiner=None):
+self.creator = creator
+self.combiner = combiner
+self.mergeCombiner = mergeCombiner or combiner
+
+
+class Merger(object):
+
+
+merge shuffled data together by aggregator
+
+
+def __init__(self, aggregator):
+self.agg = aggregator
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+raise NotImplementedError
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+raise NotImplementedError
+
+def iteritems(self):
+ return the merged items ad iterator 
+raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+
+In memory merger based on in-memory dict.
+
+
+def __init__(self, aggregator):
+Merger.__init__(self, aggregator)
+self.data = {}
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+# speed up attributes lookup
+d, creator, comb = self.data, self.agg.creator, self.agg.combiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else creator(v)
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+# speed up attributes lookup
+d, comb = self.data, self.agg.mergeCombiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else v
+
+def iteritems(self):
+ return the merged items ad iterator 
+return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+
+External merger will dump the aggregated data into disks when
+memory usage goes above the limit, then merge them together.
+
+This class works as follows:
+
+- It repeatedly combine the items and save them in one dict in 
+  memory.
+
+- When the used memory goes above memory limit, it will split
+  the combined data into partitions by hash code, dump them
+  into disk, one file per partition.
+
+- Then it goes through the rest of the iterator, combine items
+  into different dict by hash. Until the used memory goes over
+  memory limit, it dump all the dicts into disks, one file per
+  dict. Repeat this again until combine all the items.
+
+- Before return any items, it will load each partition and
+  combine them seperately. Yield them before loading next
+  partition.
+
+- During loading a partition, if the memory goes over limit,
+  it will partition the loaded data and dump them into disks
+  

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15268748
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory 
usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss 
>> 20
+# TODO: support windows
+return 0
+
+
+class Aggregator(object):
+
+def __init__(self, creator, combiner, mergeCombiner=None):
+self.creator = creator
+self.combiner = combiner
+self.mergeCombiner = mergeCombiner or combiner
+
+
+class Merger(object):
+
+
+merge shuffled data together by aggregator
+
+
+def __init__(self, aggregator):
+self.agg = aggregator
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+raise NotImplementedError
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+raise NotImplementedError
+
+def iteritems(self):
+ return the merged items ad iterator 
+raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+
+In memory merger based on in-memory dict.
+
+
+def __init__(self, aggregator):
+Merger.__init__(self, aggregator)
+self.data = {}
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+# speed up attributes lookup
+d, creator, comb = self.data, self.agg.creator, self.agg.combiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else creator(v)
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+# speed up attributes lookup
+d, comb = self.data, self.agg.mergeCombiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else v
+
+def iteritems(self):
+ return the merged items ad iterator 
+return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+
+External merger will dump the aggregated data into disks when
+memory usage goes above the limit, then merge them together.
+
+This class works as follows:
+
+- It repeatedly combine the items and save them in one dict in 
+  memory.
+
+- When the used memory goes above memory limit, it will split
+  the combined data into partitions by hash code, dump them
+  into disk, one file per partition.
+
+- Then it goes through the rest of the iterator, combine items
+  into different dict by hash. Until the used memory goes over
+  memory limit, it dump all the dicts into disks, one file per
+  dict. Repeat this again until combine all the items.
+
+- Before return any items, it will load each partition and
+  combine them seperately. Yield them before loading next
+  partition.
+
+- During loading a partition, if the memory goes over limit,
+  it will partition the loaded data and dump them into disks
+  

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15268757
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,416 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+ return the used memory in MB 
+self = psutil.Process(os.getpid())
+return self.memory_info().rss  20
+
+except ImportError:
+
+def get_used_memory():
+ return the used memory in MB 
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1])  10
+else:
+warnings.warn(please install psutil to have better 
+support with spilling)
+if platform.system() == Darwin:
+import resource
+rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
+return rss  20
+# TODO: support windows
+return 0
+
+
+class Aggregator(object):
+
+
+Aggregator has tree functions to merge values into combiner.
+
+createCombiner:  (value) -> combiner
+mergeValue:  (combine, value) -> combiner
+mergeCombiners:  (combiner, combiner) -> combiner
+
+
+def __init__(self, createCombiner, mergeValue, mergeCombiners):
+self.createCombiner = createCombiner
+self.mergeValue = mergeValue
+self.mergeCombiners = mergeCombiners
+
+
+class SimpleAggregator(Aggregator):
+
+
+SimpleAggregator is useful for the cases that combiners have
+same type with values
+
+
+def __init__(self, combiner):
+Aggregator.__init__(self, lambda x: x, combiner, combiner)
+
+
+class Merger(object):
+
+
+merge shuffled data together by aggregator
+
+
+def __init__(self, aggregator):
+self.agg = aggregator
+
+def mergeValues(self, iterator):
+ combine the items by creator and combiner 
+raise NotImplementedError
+
+def mergeCombiners(self, iterator):
+ merge the combined items by mergeCombiner 
+raise NotImplementedError
+
+def iteritems(self):
+ return the merged items ad iterator 
+raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+
+In memory merger based on in-memory dict.
+
+
+def __init__(self, aggregator):
+Merger.__init__(self, aggregator)
+self.data = {}
+
+def mergeValues(self, iterator):
+ combine the items by creator and combiner 
+# speed up attributes lookup
+d, creator = self.data, self.agg.createCombiner
+comb = self.agg.mergeValue
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else creator(v)
+
+def mergeCombiners(self, iterator):
+ merge the combined items by mergeCombiner 
+# speed up attributes lookup
+d, comb = self.data, self.agg.mergeCombiners
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else v
+
+def iteritems(self):
+ return the merged items ad iterator 
+return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+
+External merger will dump the aggregated data into disks when
+memory usage goes above the limit, then merge them together.
+
+This class works as follows:
+
+- It repeatedly combine the items and save them in one dict in 
+  memory.
+
+- When 
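
As an illustration of the Aggregator quoted above (three functions: create a
combiner from a value, merge a value into a combiner, merge two combiners),
here is a group-into-lists aggregator in the style of the PR's TestMerger,
assuming the mergeValues/iteritems names used in this revision:

    from pyspark.shuffle import Aggregator, InMemoryMerger

    agg = Aggregator(lambda v: [v],                       # createCombiner
                     lambda c, v: c.append(v) or c,       # mergeValue
                     lambda c1, c2: c1.extend(c2) or c1)  # mergeCombiners
    m = InMemoryMerger(agg)
    m.mergeValues([("a", 1), ("b", 2), ("a", 3)])
    assert dict(m.iteritems()) == {"a": [1, 3], "b": [2]}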

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15271189
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,416 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+ return the used memory in MB 
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+ return the used memory in MB 
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to have better 
+support with spilling")
+if platform.system() == "Darwin":
+import resource
+rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
+return rss >> 20
+# TODO: support windows
+return 0
+
+
+class Aggregator(object):
+
+
+Aggregator has three functions to merge values into a combiner.
+
+createCombiner:  (value) -> combiner
+mergeValue:  (combiner, value) -> combiner
+mergeCombiners:  (combiner, combiner) -> combiner
+
+
+def __init__(self, createCombiner, mergeValue, mergeCombiners):
+self.createCombiner = createCombiner
+self.mergeValue = mergeValue
+self.mergeCombiners = mergeCombiners
+
+
+class SimpleAggregator(Aggregator):
+
+
+SimpleAggregator is useful for the cases that combiners have
+same type with values
+
+
+def __init__(self, combiner):
+Aggregator.__init__(self, lambda x: x, combiner, combiner)
+
+
+class Merger(object):
+
+
+merge shuffled data together by aggregator
+
+
+def __init__(self, aggregator):
+self.agg = aggregator
+
+def mergeValues(self, iterator):
+ combine the items by creator and combiner 
+raise NotImplementedError
+
+def mergeCombiners(self, iterator):
+ merge the combined items by mergeCombiner 
+raise NotImplementedError
+
+def iteritems(self):
+ return the merged items as an iterator 
+raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+
+In memory merger based on in-memory dict.
+
+
+def __init__(self, aggregator):
+Merger.__init__(self, aggregator)
+self.data = {}
+
+def mergeValues(self, iterator):
+ combine the items by creator and combiner 
+# speed up attributes lookup
+d, creator = self.data, self.agg.createCombiner
+comb = self.agg.mergeValue
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else creator(v)
+
+def mergeCombiners(self, iterator):
+ merge the combined items by mergeCombiner 
+# speed up attributes lookup
+d, comb = self.data, self.agg.mergeCombiners
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else v
+
+def iteritems(self):
+ return the merged items as an iterator 
+return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+
+External merger will dump the aggregated data into disks when
+memory usage goes above the limit, then merge them together.
+
+This class works as follows:
+
+- It repeatedly combines the items and saves them in one dict in
+  memory.
+
+- When 
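
For readers skimming the quoted diff above, here is a minimal usage sketch of the
Aggregator / InMemoryMerger interface it introduces. The grouping functions are
illustrative (they mimic a groupByKey-style combine), not taken from the patch;
only the class and method names come from the quoted code.

    from pyspark.shuffle import Aggregator, InMemoryMerger

    # three functions: create a combiner from a value, fold a value into a
    # combiner, and merge two combiners (here: build lists of values)
    agg = Aggregator(createCombiner=lambda v: [v],
                     mergeValue=lambda c, v: c + [v],
                     mergeCombiners=lambda a, b: a + b)

    merger = InMemoryMerger(agg)
    merger.mergeValues([("a", 1), ("b", 2), ("a", 3)])   # raw (key, value) pairs
    print dict(merger.iteritems())                       # {'a': [1, 3], 'b': [2]}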

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15271358
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory 
usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20
+# TODO: support windows
+return 0
+
+
+class Aggregator(object):
+
+def __init__(self, creator, combiner, mergeCombiner=None):
+self.creator = creator
+self.combiner = combiner
+self.mergeCombiner = mergeCombiner or combiner
+
+
+class Merger(object):
+
+
+merge shuffled data together by aggregator
+
+
+def __init__(self, aggregator):
+self.agg = aggregator
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+raise NotImplementedError
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+raise NotImplementedError
+
+def iteritems(self):
+ return the merged items as an iterator 
+raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+
+In memory merger based on in-memory dict.
+
+
+def __init__(self, aggregator):
+Merger.__init__(self, aggregator)
+self.data = {}
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+# speed up attributes lookup
+d, creator, comb = self.data, self.agg.creator, self.agg.combiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else creator(v)
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+# speed up attributes lookup
+d, comb = self.data, self.agg.mergeCombiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else v
+
+def iteritems(self):
+ return the merged items as an iterator 
+return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+
+External merger will dump the aggregated data into disks when
+memory usage goes above the limit, then merge them together.
+
+This class works as follows:
+
+- It repeatedly combines the items and saves them in one dict in
+  memory.
+
+- When the used memory goes above the memory limit, it splits
+  the combined data into partitions by hash code and dumps them
+  to disk, one file per partition.
+
+- Then it goes through the rest of the iterator, combining items
+  into different dicts by hash. Once the used memory goes over
+  the memory limit again, it dumps all the dicts to disk, one
+  file per dict. This repeats until all the items are combined.
+
+- Before returning any items, it loads each partition and
+  combines it separately, yielding the results before loading
+  the next partition.
+
+- While loading a partition, if the memory goes over the limit,
+  it partitions the loaded data and dumps it to disk
+  
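
A minimal sketch of the spill step described in that docstring, using cPickle in
place of the patch's BatchedSerializer; the function and argument names here are
illustrative, not the PR's actual implementation.

    import os
    import cPickle

    def spill_by_hash(data, spill_dir, partitions=64):
        # write the in-memory dict out as one file per hash partition, then
        # drop it from memory (the "dump them into disk" step above)
        streams = [open(os.path.join(spill_dir, str(i)), 'wb')
                   for i in range(partitions)]
        for k, v in data.iteritems():
            cPickle.dump((k, v), streams[hash(k) % partitions], 2)
        for s in streams:
            s.close()
        data.clear()

    def merge_partition(spill_dir, i, merge_combiners):
        # reload one partition file and merge combiners per key (the
        # "load each partition and combine them separately" step above)
        result = {}
        with open(os.path.join(spill_dir, str(i)), 'rb') as f:
            while True:
                try:
                    k, v = cPickle.load(f)
                except EOFError:
                    break
                result[k] = merge_combiners(result[k], v) if k in result else v
        return result.iteritems()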

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15271374
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory 
usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20
+# TODO: support windows
+return 0
+
+
+class Aggregator(object):
+
+def __init__(self, creator, combiner, mergeCombiner=None):
+self.creator = creator
+self.combiner = combiner
+self.mergeCombiner = mergeCombiner or combiner
+
+
+class Merger(object):
+
+
+merge shuffled data together by aggregator
+
+
+def __init__(self, aggregator):
+self.agg = aggregator
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+raise NotImplementedError
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+raise NotImplementedError
+
+def iteritems(self):
+ return the merged items as an iterator 
+raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+
+In memory merger based on in-memory dict.
+
+
+def __init__(self, aggregator):
+Merger.__init__(self, aggregator)
+self.data = {}
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+# speed up attributes lookup
+d, creator, comb = self.data, self.agg.creator, self.agg.combiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else creator(v)
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+# speed up attributes lookup
+d, comb = self.data, self.agg.mergeCombiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else v
+
+def iteritems(self):
+ return the merged items as an iterator 
+return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+
+External merger will dump the aggregated data into disks when
+memory usage goes above the limit, then merge them together.
+
+This class works as follows:
+
+- It repeatedly combines the items and saves them in one dict in
+  memory.
+
+- When the used memory goes above the memory limit, it splits
+  the combined data into partitions by hash code and dumps them
+  to disk, one file per partition.
+
+- Then it goes through the rest of the iterator, combining items
+  into different dicts by hash. Once the used memory goes over
+  the memory limit again, it dumps all the dicts to disk, one
+  file per dict. This repeats until all the items are combined.
+
+- Before returning any items, it loads each partition and
+  combines it separately, yielding the results before loading
+  the next partition.
+
+- While loading a partition, if the memory goes over the limit,
+  it partitions the loaded data and dumps it to disk
+  

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-22 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15271462
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,378 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory 
usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20
+# TODO: support windows
+return 0
+
+
+class Aggregator(object):
+
+def __init__(self, creator, combiner, mergeCombiner=None):
+self.creator = creator
+self.combiner = combiner
+self.mergeCombiner = mergeCombiner or combiner
+
+
+class Merger(object):
+
+
+merge shuffled data together by aggregator
+
+
+def __init__(self, aggregator):
+self.agg = aggregator
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+raise NotImplementedError
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+raise NotImplementedError
+
+def iteritems(self):
+ return the merged items as an iterator 
+raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+
+In memory merger based on in-memory dict.
+
+
+def __init__(self, aggregator):
+Merger.__init__(self, aggregator)
+self.data = {}
+
+def combine(self, iterator):
+ combine the items by creator and combiner 
+# speed up attributes lookup
+d, creator, comb = self.data, self.agg.creator, self.agg.combiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else creator(v)
+
+def merge(self, iterator):
+ merge the combined items by mergeCombiner 
+# speed up attributes lookup
+d, comb = self.data, self.agg.mergeCombiner
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else v
+
+def iteritems(self):
+ return the merged items as an iterator 
+return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+
+External merger will dump the aggregated data into disks when
+memory usage goes above the limit, then merge them together.
+
+This class works as follows:
+
+- It repeatedly combines the items and saves them in one dict in
+  memory.
+
+- When the used memory goes above the memory limit, it splits
+  the combined data into partitions by hash code and dumps them
+  to disk, one file per partition.
+
+- Then it goes through the rest of the iterator, combining items
+  into different dicts by hash. Once the used memory goes over
+  the memory limit again, it dumps all the dicts to disk, one
+  file per dict. This repeats until all the items are combined.
+
+- Before returning any items, it loads each partition and
+  combines it separately, yielding the results before loading
+  the next partition.
+
+- While loading a partition, if the memory goes over the limit,
+  it partitions the loaded data and dumps it to disk
+  

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49647944
  
QA tests have started for PR 1460. This patch merges cleanly.
View progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16918/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-21 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15188798
  
--- Diff: python/pyspark/rdd.py ---
@@ -168,6 +169,18 @@ def _replaceRoot(self, value):
 self._sink(1)
 
 
+def _parse_memory(s):
--- End diff --

Add a comment to this saying it returns a number in MB


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49658257
  
QA tests have started for PR 1460. This patch merges cleanly.
View progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16919/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49660273
  
QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
  class AutoSerializer(FramedSerializer):
  class Merger(object):
  class MapMerger(Merger):
  class ExternalHashMapMerger(Merger):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16918/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49670702
  
QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
  class AutoSerializer(FramedSerializer):
  class Merger(object):
  class MapMerger(Merger):
  class ExternalHashMapMerger(Merger):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16919/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49671323
  
QA tests have started for PR 1460. This patch merges cleanly.
View progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16929/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49680209
  
QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
  class AutoSerializer(FramedSerializer):
  class Merger(object):
  class MapMerger(Merger):
  class ExternalHashMapMerger(Merger):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16929/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-21 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15207593
  
--- Diff: docs/configuration.md ---
@@ -195,6 +195,15 @@ Apart from these, the following properties are also 
available, and may be useful
 Spark's dependencies and user dependencies. It is currently an 
experimental feature.
  </td>
</tr>
+<tr>
+  <td><code>spark.python.worker.memory</code></td>
+  <td>512m</td>
+  <td>
+    Amount of memory to use per python worker process during aggregation, in the same
+    format as JVM memory strings (e.g. <code>512m</code>, <code>2g</code>). If the memory
+    used during aggregation go above this amount, it will spill the data into disks.
--- End diff --

Small typo: go -> goes
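
For anyone trying the feature out, the setting is passed like any other Spark
conf; a small sketch (the app name and values are illustrative):

    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .set("spark.python.worker.memory", "512m")   # per-worker aggregation budget
            .set("spark.shuffle.spill", "true"))         # allow spilling to disk
    sc = SparkContext(appName="spill-demo", conf=conf)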


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-21 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15207614
  
--- Diff: python/pyspark/rdd.py ---
@@ -1247,15 +1262,16 @@ def combineLocally(iterator):
 return combiners.iteritems()
 locally_combined = self.mapPartitions(combineLocally)
 shuffled = locally_combined.partitionBy(numPartitions)
-
+ 
+serializer = self.ctx.serializer
+spill = ((self.ctx._conf.get("spark.shuffle.spill") or 
'True').lower()
+in ('true', '1', 'yes'))
--- End diff --

Probably just want to allow "true" to be similar to the way it works in 
Java and Scala. So just check `self.ctx._conf.get("spark.shuffle.spill", 
"true") == "true"`
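
Spelled out with both settings read the same way, the suggestion would look
roughly like this inside combineByKey (a sketch of the reviewer's proposal, not
the merged code):

    spill = self.ctx._conf.get("spark.shuffle.spill", "true") == "true"
    memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))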


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-21 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15207622
  
--- Diff: python/pyspark/rdd.py ---
@@ -168,6 +169,20 @@ def _replaceRoot(self, value):
 self._sink(1)
 
 
+def _parse_memory(s):
+
+It returns a number in MB
--- End diff --

Say something slightly longer, e.g. "Parse a memory string in the format 
supported by Java (e.g. 1g, 200m) and return the value in MB"
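
A sketch of such a helper, handling only the suffixes mentioned here (k, m, g, t);
the exact implementation in the patch may differ:

    def _parse_memory(s):
        """
        Parse a memory string in the format supported by Java (e.g. 1g, 200m)
        and return the value in MB.
        """
        units = {'g': 1024, 'm': 1, 't': 1 << 20, 'k': 1.0 / 1024}
        if s[-1].lower() not in units:
            raise ValueError("invalid format: " + s)
        return int(float(s[:-1]) * units[s[-1].lower()])

    _parse_memory("512m")   # 512
    _parse_memory("2g")     # 2048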


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-21 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15207630
  
--- Diff: python/pyspark/serializers.py ---
@@ -297,6 +297,33 @@ class MarshalSerializer(FramedSerializer):
 loads = marshal.loads
 
 
+class AutoSerializer(FramedSerializer):
--- End diff --

Can we actually use this by default yet or will it fail for NumPy arrays? 
If it won't work by default, we should use PickleSerializer instead and wait to 
fix this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-21 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15207645
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,258 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, AutoSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory 
usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20
+# TODO: support windows
--- End diff --

Have you looked into what it would take to support Windows? I guess right 
now it will keep returning 0 there and never spill?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-21 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15207668
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,258 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, AutoSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory 
usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20
+# TODO: support windows
+return 0
+
+
+class Merger(object):
+
+merge shuffled data together by combinator
+
+
+def merge(self, iterator):
+raise NotImplementedError
+
+def iteritems(self):
+raise NotImplementedError
+
+
+class MapMerger(Merger):
+
+In memory merger based on map
+
+
+def __init__(self, combiner):
+self.combiner = combiner
+self.data = {}
+
+def merge(self, iterator):
+d, comb = self.data, self.combiner
+for k, v in iter(iterator):
+d[k] = comb(d[k], v) if k in d else v
+
+def iteritems(self):
+return self.data.iteritems()
+
+
+class ExternalHashMapMerger(Merger):
--- End diff --

Better to call this ExternalMerger


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-21 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15207662
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,258 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, AutoSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory 
usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20
+# TODO: support windows
+return 0
+
+
+class Merger(object):
+
+merge shuffled data together by combinator
+
+
+def merge(self, iterator):
+raise NotImplementedError
+
+def iteritems(self):
+raise NotImplementedError
+
+
+class MapMerger(Merger):
--- End diff --

Better to call this InMemoryMerger


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-21 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15207683
  
--- Diff: python/pyspark/tests.py ---
@@ -47,6 +48,40 @@
 SPARK_HOME = os.environ[SPARK_HOME]
 
 
+class TestMerger(unittest.TestCase):
+
+def setUp(self):
+self.N = 118
+self.l = [i for i in xrange(self.N)]
+self.data = zip(self.l, self.l)
+ExternalHashMapMerger.PARTITIONS = 8
+ExternalHashMapMerger.BATCH = 114
+
+def test_in_memory(self):
+m = MapMerger(lambda x,y: x+y)
+m.merge(self.data)
+self.assertEqual(sum(v for k,v in m.iteritems()), 
sum(xrange(self.N)))
--- End diff --

Put spaces around the operators (e.g. `,`, `+`, etc)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-21 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15207731
  
--- Diff: python/pyspark/rdd.py ---
@@ -1247,15 +1262,16 @@ def combineLocally(iterator):
 return combiners.iteritems()
 locally_combined = self.mapPartitions(combineLocally)
 shuffled = locally_combined.partitionBy(numPartitions)
-
+ 
+serializer = self.ctx.serializer
+spill = ((self.ctx._conf.get("spark.shuffle.spill") or 
'True').lower()
+in ('true', '1', 'yes'))
+memory = 
_parse_memory(self.ctx._conf.get("spark.python.worker.memory") or "512m")
 def _mergeCombiners(iterator):
-combiners = {}
-for (k, v) in iterator:
-if k not in combiners:
-combiners[k] = v
-else:
-combiners[k] = mergeCombiners(combiners[k], v)
-return combiners.iteritems()
+merger = ExternalHashMapMerger(mergeCombiners, memory, 
serializer)\
+ if spill else MapMerger(mergeCombiners)
--- End diff --

Add a space before the `\` to match our code style in other places


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-21 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15207721
  
--- Diff: python/pyspark/rdd.py ---
@@ -1247,15 +1262,16 @@ def combineLocally(iterator):
 return combiners.iteritems()
 locally_combined = self.mapPartitions(combineLocally)
 shuffled = locally_combined.partitionBy(numPartitions)
-
+ 
+serializer = self.ctx.serializer
+spill = ((self.ctx._conf.get("spark.shuffle.spill") or 
'True').lower()
+in ('true', '1', 'yes'))
+memory = 
_parse_memory(self.ctx._conf.get("spark.python.worker.memory") or "512m")
 def _mergeCombiners(iterator):
-combiners = {}
-for (k, v) in iterator:
-if k not in combiners:
-combiners[k] = v
-else:
-combiners[k] = mergeCombiners(combiners[k], v)
-return combiners.iteritems()
+merger = ExternalHashMapMerger(mergeCombiners, memory, 
serializer)\
+ if spill else MapMerger(mergeCombiners)
+merger.merge(iterator)
+return merger.iteritems()
 return shuffled.mapPartitions(_mergeCombiners)
--- End diff --

This only implements external merging in the reduce tasks, but we need it 
in the map tasks too. For that you'll need to modify the Merger interface to 
take `createCombiner`, `mergeValue` and `mergeCombiners` together. Please add 
those there and add tests for the richer interface.
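
Sketched against the class names that the later revision of shuffle.py (quoted
earlier in this thread) ends up using, the richer map-side path could look
roughly like this; the helper name and the ExternalMerger constructor shape
(aggregator, memory limit, serializer) are assumptions based on the older
ExternalHashMapMerger:

    from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger

    def _combine_locally(iterator, createCombiner, mergeValue, mergeCombiners,
                         spill, memory, serializer):
        # map-side combine now needs all three functions, not just mergeCombiners
        agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
        merger = ExternalMerger(agg, memory, serializer) if spill \
            else InMemoryMerger(agg)
        merger.mergeValues(iterator)      # combine raw (k, v) pairs
        return merger.iteritems()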


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-21 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15207775
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,258 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, AutoSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory 
usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20
+# TODO: support windows
+return 0
+
+
+class Merger(object):
+
+merge shuffled data together by combinator
+
+
+def merge(self, iterator):
+raise NotImplementedError
+
+def iteritems(self):
+raise NotImplementedError
+
+
+class MapMerger(Merger):
+
+In memory merger based on map
+
+
+def __init__(self, combiner):
+self.combiner = combiner
+self.data = {}
+
+def merge(self, iterator):
+d, comb = self.data, self.combiner
+for k, v in iter(iterator):
+d[k] = comb(d[k], v) if k in d else v
+
+def iteritems(self):
+return self.data.iteritems()
+
+
+class ExternalHashMapMerger(Merger):
+
+
+External merger will dump the aggregated data into disks when memory 
usage
+is above the limit, then merge them together.
+
+ combiner = lambda x, y:x+y
+ merger = ExternalHashMapMerger(combiner, 10)
+ N = 1
+ merger.merge(zip(xrange(N), xrange(N)) * 10)
+ assert merger.spills > 0
+ sum(v for k,v in merger.iteritems())
+49995
+
+
+PARTITIONS = 64
+BATCH = 1
+
+def __init__(self, combiner, memory_limit=512, serializer=None,
+localdirs=None, scale=1):
+self.combiner = combiner
+self.memory_limit = memory_limit
+self.serializer = serializer or\
+BatchedSerializer(AutoSerializer(), 1024)
+self.localdirs = localdirs or self._get_dirs()
+self.scale = scale
+self.data = {}
+self.pdata = []
+self.spills = 0
+
+def _get_dirs(self):
+path = os.environ.get(SPARK_LOCAL_DIR, /tmp/spark)
+dirs = path.split(,)
+localdirs = []
+for d in dirs:
+d = os.path.join(d, merge, str(os.getpid()))
+try:
+os.makedirs(d)
+localdirs.append(d)
+except IOError:
+pass
+if not localdirs:
+raise IOError(no writable directories:  + path)
+return localdirs
+
+def _get_spill_dir(self, n):
+return os.path.join(self.localdirs[n % len(self.localdirs)], 
str(n))
+
+@property
+def used_memory(self):
+return get_used_memory()
--- End diff --

Probably better to just call `get_used_memory()` all the time instead of 
making this a property, to indicate that it's expensive.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at 

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-21 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15207812
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,258 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, AutoSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory 
usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20
+# TODO: support windows
+return 0
+
+
+class Merger(object):
+
+merge shuffled data together by combinator
+
+
+def merge(self, iterator):
+raise NotImplementedError
+
+def iteritems(self):
+raise NotImplementedError
+
+
+class MapMerger(Merger):
+
+In memory merger based on map
+
+
+def __init__(self, combiner):
+self.combiner = combiner
+self.data = {}
+
+def merge(self, iterator):
+d, comb = self.data, self.combiner
+for k, v in iter(iterator):
+d[k] = comb(d[k], v) if k in d else v
+
+def iteritems(self):
+return self.data.iteritems()
+
+
+class ExternalHashMapMerger(Merger):
+
+
+External merger will dump the aggregated data into disks when memory 
usage
+is above the limit, then merge them together.
+
+ combiner = lambda x, y:x+y
+ merger = ExternalHashMapMerger(combiner, 10)
+ N = 1
+ merger.merge(zip(xrange(N), xrange(N)) * 10)
+ assert merger.spills > 0
+ sum(v for k,v in merger.iteritems())
+49995
+
+
+PARTITIONS = 64
+BATCH = 1
+
+def __init__(self, combiner, memory_limit=512, serializer=None,
+localdirs=None, scale=1):
+self.combiner = combiner
+self.memory_limit = memory_limit
+self.serializer = serializer or\
+BatchedSerializer(AutoSerializer(), 1024)
+self.localdirs = localdirs or self._get_dirs()
+self.scale = scale
+self.data = {}
+self.pdata = []
+self.spills = 0
+
+def _get_dirs(self):
+path = os.environ.get(SPARK_LOCAL_DIR, /tmp/spark)
+dirs = path.split(,)
+localdirs = []
+for d in dirs:
+d = os.path.join(d, merge, str(os.getpid()))
+try:
+os.makedirs(d)
+localdirs.append(d)
+except IOError:
+pass
+if not localdirs:
+raise IOError(no writable directories:  + path)
+return localdirs
+
+def _get_spill_dir(self, n):
+return os.path.join(self.localdirs[n % len(self.localdirs)], 
str(n))
+
+@property
+def used_memory(self):
+return get_used_memory()
+
+@property
+def next_limit(self):
+return max(self.memory_limit, self.used_memory * 1.05)
--- End diff --

Same here: make this a method; it's confusing for it to be a property


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but 

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-21 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15207837
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,258 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, AutoSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+self = psutil.Process(os.getpid())
+return self.memory_info().rss >> 20
+
+except ImportError:
+
+def get_used_memory():
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("please install psutil to get accurate memory 
usage")
+if platform.system() == "Darwin":
+import resource
+return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20
+# TODO: support windows
+return 0
+
+
+class Merger(object):
+
+merge shuffled data together by combinator
+
+
+def merge(self, iterator):
+raise NotImplementedError
+
+def iteritems(self):
+raise NotImplementedError
+
+
+class MapMerger(Merger):
+
+In memory merger based on map
+
+
+def __init__(self, combiner):
+self.combiner = combiner
+self.data = {}
+
+def merge(self, iterator):
+d, comb = self.data, self.combiner
+for k, v in iter(iterator):
+d[k] = comb(d[k], v) if k in d else v
+
+def iteritems(self):
+return self.data.iteritems()
+
+
+class ExternalHashMapMerger(Merger):
+
+
+External merger will dump the aggregated data into disks when memory 
usage
+is above the limit, then merge them together.
+
+ combiner = lambda x, y:x+y
+ merger = ExternalHashMapMerger(combiner, 10)
+ N = 1
+ merger.merge(zip(xrange(N), xrange(N)) * 10)
+ assert merger.spills > 0
+ sum(v for k,v in merger.iteritems())
+49995
+
+
+PARTITIONS = 64
+BATCH = 1
+
+def __init__(self, combiner, memory_limit=512, serializer=None,
+localdirs=None, scale=1):
+self.combiner = combiner
+self.memory_limit = memory_limit
+self.serializer = serializer or\
+BatchedSerializer(AutoSerializer(), 1024)
+self.localdirs = localdirs or self._get_dirs()
+self.scale = scale
+self.data = {}
+self.pdata = []
+self.spills = 0
+
+def _get_dirs(self):
+path = os.environ.get(SPARK_LOCAL_DIR, /tmp/spark)
+dirs = path.split(,)
+localdirs = []
+for d in dirs:
+d = os.path.join(d, merge, str(os.getpid()))
+try:
+os.makedirs(d)
+localdirs.append(d)
+except IOError:
+pass
+if not localdirs:
+raise IOError(no writable directories:  + path)
+return localdirs
+
+def _get_spill_dir(self, n):
+return os.path.join(self.localdirs[n % len(self.localdirs)], 
str(n))
+
+@property
+def used_memory(self):
+return get_used_memory()
+
+@property
+def next_limit(self):
+return max(self.memory_limit, self.used_memory * 1.05)
--- End diff --

Also, I don't really understand the purpose of this: why don't we always go 
up to the same limit?
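
For concreteness, with the 512 MB default and, say, 600 MB still resident after a
spill, the quoted property evaluates as follows (illustrative numbers):

    >>> memory_limit, used_memory = 512, 600
    >>> max(memory_limit, used_memory * 1.05)   # the quoted next_limit
    630.0

so the threshold ratchets upward with actual usage rather than staying at the
configured limit, which is the behavior being questioned here.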


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if 
