Use case: I have a dataset, want to run several different algorithms on it,
and fetch the results.

To do this, I think I should distribute the algorithms and run them on the
dataset at the same time. Is that right?

But it seems that Spark cannot parallelize/serialize algorithms/functions,
so how can I do this?


*Here is the test code:*

------------------------------------------------------------------------------------------------
# sc is an already-created SparkContext
def test():
    pass

function_list = [test] * 10

sc.parallelize(function_list).take(1)
------------------------------------------------------------------------------------------------


*Error message:*
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 9.0 failed 4 times, most recent failure: Lost task 2.3 in stage 9.0 (TID 105, sh-demo-hadoop-07): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/datayes/spark_process/spark-1.6.0-bin-cdh4/python/pyspark/rdd.py", line 1293, in takeUpToNumLeft
  File "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
    yield self._read_with_length(stream)
  File "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
    return pickle.loads(obj)
AttributeError: 'module' object has no attribute 'test'

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
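
The innermost Python frame in the traceback above is pickle.loads. Plain
pickle stores a function as a reference (module name plus function name), not
as code, so a process that does not define that name cannot load it. Below is
a minimal sketch of that behaviour without Spark. The name `algo` is a
hypothetical stand-in for `test`, and treating the Spark worker as "a process
where the name is missing" is an assumption, not something verified on the
cluster.

```python
import pickle
import sys


def algo():
    """Hypothetical stand-in for the `test` function in the code above."""
    return "ok"


# Plain pickle records only the module name and the function name,
# not the function body.
payload = pickle.dumps(algo)

# In the process that defines the function, the name resolves,
# so the round trip succeeds and gives back the same object.
restored = pickle.loads(payload)

# Simulate a process that never defined the function: remove the name
# from its module, then try to unpickle the same bytes.
module = sys.modules[algo.__module__]
saved = module.__dict__.pop("algo")
try:
    pickle.loads(payload)
    worker_error = None
except AttributeError as exc:
    # Same kind of failure as in the traceback: the attribute lookup
    # on the module fails.
    worker_error = exc
finally:
    module.__dict__["algo"] = saved  # put the function back
```

If this is what happens on the executors, it would explain why the elements
cannot be deserialized there even though the driver has no trouble with them.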


What's interesting is that sc.parallelize([test] * 10).collect() works fine
and returns:

[<function __main__.test>,
 <function __main__.test>,
 <function __main__.test>,
 <function __main__.test>,
 <function __main__.test>,
 <function __main__.test>,
 <function __main__.test>,
 <function __main__.test>,
 <function __main__.test>,
 <function __main__.test>]
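
One possible workaround, sketched here without Spark: distribute the names of
the algorithms instead of the function objects, and look each function up
where it runs. The functions `mean` and `spread`, the `ALGORITHMS` registry,
and `run_by_name` are all hypothetical names made up for illustration. The
sketch assumes the registry can be made available wherever the lookup happens.

```python
import pickle


def mean(values):
    """Hypothetical algorithm 1: arithmetic mean."""
    return sum(values) / len(values)


def spread(values):
    """Hypothetical algorithm 2: max minus min."""
    return max(values) - min(values)


# Hypothetical registry mapping an algorithm name to its function.
ALGORITHMS = {"mean": mean, "spread": spread}


def run_by_name(name, values):
    """Look the algorithm up by name and apply it to the dataset."""
    return name, ALGORITHMS[name](values)


dataset = [1.0, 2.0, 3.0, 6.0]

# Names are plain strings, so they survive a pickle round trip
# in any process, unlike the function objects themselves.
names = pickle.loads(pickle.dumps(sorted(ALGORITHMS)))

# Apply every algorithm to the same dataset and gather the results.
results = dict(run_by_name(name, dataset) for name in names)

# With Spark, the last step would presumably look something like
#   sc.parallelize(names).map(lambda n: run_by_name(n, dataset)).collect()
# which is an untested assumption here.
```

The elements of the RDD are then only strings, and the functions travel as
part of the function passed to map rather than as data.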




-- 
--------------------------------------
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao
