lost executor due to large shuffle spill memory

2016-04-05 Thread lllll
I have a task that remaps the indices in ALS prediction results back to their actual UUIDs,
but it consistently fails due to lost executors. I noticed a large shuffle spill (memory)
in the UI, but I don't know how to reduce it.

 

I've tried reducing the number of executors while giving each one more memory,
but it still doesn't seem to be enough. I'm not sure what else to try.
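
For reference, the sizing I've been experimenting with looks roughly like the sketch
below when set through SparkConf. The numbers are placeholders, and the memoryOverhead
key assumes a YARN deployment, which may or may not match my setup.

from pyspark import SparkConf, SparkContext

# Illustrative sizing only -- placeholder values, not my actual configuration.
conf = (SparkConf()
        .set("spark.executor.instances", "10")    # fewer, larger executors
        .set("spark.executor.memory", "16g")
        .set("spark.executor.cores", "4")
        .set("spark.yarn.executor.memoryOverhead", "4096"))  # off-heap headroom in MB (YARN only)
sc = SparkContext(conf=conf)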

Below is my code:
# user and product are RDDs of (uuid, index) tuples
user = load_user()
product = load_product()
user.cache()
product.cache()

model = load_model(model_path)

# score every (user index, product index) pair
all_pairs = user.map(lambda x: x[1]).cartesian(product.map(lambda x: x[1]))
all_prediction = model.predictAll(all_pairs)

# (index, uuid) lookups for joining back
user_reverse = user.map(lambda r: (r[1], r[0]))
product_reverse = product.map(lambda r: (r[1], r[0]))

# replace the user index with its uuid, then the product index with its uuid
user_reversed = all_prediction.map(lambda u: (u[0], (u[1], u[2]))) \
    .join(user_reverse) \
    .map(lambda r: (r[1][0][0], (r[1][1], r[1][0][1])))
both_reversed = user_reversed.join(product_reverse) \
    .map(lambda r: (r[1][0][0], r[1][1], r[1][0][1]))

both_reversed.map(lambda x: '{}|{}|{}'.format(x[0], x[1], x[2])) \
    .saveAsTextFile(recommendation_path)

Both user and product are RDDs of (uuid, index) tuples.
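
For what it's worth, since each lookup table is just index -> uuid, one alternative I'm
considering is broadcasting the two maps and skipping the joins entirely, so the only
large shuffle left would be the one inside predictAll. A rough sketch, assuming both maps
fit comfortably in memory (to_line is just an illustrative helper name):

# Sketch only: broadcast the small index -> uuid maps instead of joining them
# against the full prediction RDD.
user_map = sc.broadcast(user.map(lambda r: (r[1], r[0])).collectAsMap())
product_map = sc.broadcast(product.map(lambda r: (r[1], r[0])).collectAsMap())

def to_line(p):
    # p is a Rating(user, product, rating) from predictAll
    return '{}|{}|{}'.format(user_map.value[p[0]], product_map.value[p[1]], p[2])

all_prediction.map(to_line).saveAsTextFile(recommendation_path)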



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/lost-executor-due-to-large-shuffle-spill-memory-tp26683.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




zip two RDD in pyspark

2014-07-28 Thread lllll
I have a file in S3, and I want to map each line to an index. Here is my code:

 input_data = sc.textFile('s3n:/myinput', minPartitions=6).cache()
 N = input_data.count()
 index = sc.parallelize(range(N), 6)
 index.zip(input_data).collect()

...
14/07/28 19:49:31 INFO DAGScheduler: Completed ResultTask(18, 4)
14/07/28 19:49:31 INFO DAGScheduler: Stage 18 (collect at <stdin>:1) finished in 0.031 s
14/07/28 19:49:31 INFO SparkContext: Job finished: collect at <stdin>:1, took 0.03707 s
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/root/spark/python/pyspark/rdd.py", line 584, in collect
    return list(self._collect_iterator_through_file(bytesInJava))
  File "/root/spark/python/pyspark/rdd.py", line 592, in _collect_iterator_through_file
    self.ctx._writeToFile(iterator, tempFile.name)
  File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
  File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.writeToFile.
: java.lang.ClassCastException: java.lang.String cannot be cast to [B
at
org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:312)
at
org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:309)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:309)
at 
org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:342)
at 
org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:337)
at org.apache.spark.api.python.PythonRDD.writeToFile(PythonRDD.scala)
at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:744)

As I see it, the job itself completed, but I don't understand the 'String cannot be cast
to [B' error. I tried zipping two ParallelCollectionRDDs and that works fine, but here
one side is a MappedRDD from textFile. I'm not sure what's going on.

Also, why doesn't PySpark have zipWithIndex()?
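
What I'm hoping for is something like the sketch below, assuming a PySpark version that
exposes RDD.zipWithIndex() the way the Scala API does. It returns (element, index) pairs
built from a single RDD, so there is no second RDD to zip against.

 # Sketch only -- assumes RDD.zipWithIndex() is available in this PySpark version.
 indexed = input_data.zipWithIndex().map(lambda kv: (kv[1], kv[0]))
 indexed.take(5)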

Thanks for any help. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/zip-two-RDD-in-pyspark-tp10806.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.