Re: Spark Implementation of XGBoost
There's an XGBoost exploration JIRA, SPARK-8547. Could it be a good starting point?

2015-10-27 7:07 GMT+08:00 DB Tsai <dbt...@dbtsai.com>:
> Also, does it support categorical features?
>
> Sincerely,
>
> DB Tsai
> --
> Web: https://www.dbtsai.com
> PGP Key ID: 0xAF08DF8D
>
> On Mon, Oct 26, 2015 at 4:06 PM, DB Tsai <dbt...@dbtsai.com> wrote:
>> Interesting. For feature sub-sampling, is it per-node or per-tree? Do
>> you think you can implement a generic GBM and have it merged as part of
>> the Spark codebase?
>>
>> Sincerely,
>>
>> DB Tsai
>> --
>> Web: https://www.dbtsai.com
>> PGP Key ID: 0xAF08DF8D
>>
>> On Mon, Oct 26, 2015 at 11:42 AM, Meihua Wu
>> <rotationsymmetr...@gmail.com> wrote:
>>> Hi Spark User/Dev,
>>>
>>> Inspired by the success of XGBoost, I have created a Spark package for
>>> gradient boosting trees with a 2nd order approximation of arbitrary
>>> user-defined loss functions.
>>>
>>> https://github.com/rotationsymmetry/SparkXGBoost
>>>
>>> Currently linear (normal) regression, binary classification, and Poisson
>>> regression are supported. You can extend it with other loss functions as
>>> well.
>>>
>>> L1, L2, bagging, and feature sub-sampling are also employed to avoid
>>> overfitting.
>>>
>>> Thank you for testing. I am looking forward to your comments and
>>> suggestions. Bugs or improvements can be reported through GitHub.
>>>
>>> Many thanks!
>>>
>>> Meihua
>>>
>>> -----
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org

--
Yizhi Liu
Senior Software Engineer / Data Mining
www.mvad.com, Shanghai, China
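For readers unfamiliar with the "2nd order approximation" mentioned in the quoted announcement, here is a minimal Python sketch of the general idea as XGBoost formulates it: each loss supplies its first and second derivatives with respect to the current prediction, and a leaf's weight is the Newton step -G / (H + lambda). The names `logistic_grad_hess`, `squared_grad_hess`, `leaf_weight` and `reg_lambda` are hypothetical and are not taken from SparkXGBoost, whose internals may differ.

```python
import math


def logistic_grad_hess(y, margin):
    """First and second derivatives of the logistic loss w.r.t. the raw margin.

    y is a 0/1 label; margin is the current raw (pre-sigmoid) prediction.
    """
    p = 1.0 / (1.0 + math.exp(-margin))
    return p - y, p * (1.0 - p)


def squared_grad_hess(y, pred):
    """Derivatives of 0.5 * (pred - y)^2 w.r.t. pred."""
    return pred - y, 1.0


def leaf_weight(labels, preds, grad_hess, reg_lambda=1.0):
    """Newton-step weight for one leaf: -G / (H + lambda).

    G and H are the sums of the per-example gradients and Hessians of the
    examples that fall into the leaf; reg_lambda is the L2 penalty on the weight.
    """
    g_sum = 0.0
    h_sum = 0.0
    for y, f in zip(labels, preds):
        g, h = grad_hess(y, f)
        g_sum += g
        h_sum += h
    return -g_sum / (h_sum + reg_lambda)


# Three examples in the same leaf, all currently predicted as 0.0.
# With squared loss and no penalty the weight is the mean residual.
w = leaf_weight([1.0, 2.0, 3.0], [0.0, 0.0, 0.0], squared_grad_hess, reg_lambda=0.0)
print(w)
```

Supporting a new loss in this scheme only requires another function that returns the gradient and Hessian, which is what makes "arbitrary user-defined loss functions" practical.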
How to take user jars precedence over Spark jars
I'm trying to read a Thrift object from a SequenceFile, using elephant-bird's ThriftWritable. My code looks like this:

    val rawData = sc.sequenceFile[BooleanWritable, ThriftWritable[TrainingSample]](input)
    val samples = rawData.map { case (key, value) => {
      value.setConverter(classOf[TrainingSample])
      val conversion = if (key.get) 1 else 0
      val sample = value.get
      (conversion, sample)
    }}

When I spark-submit in local mode, it fails with:

    (Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 2, localhost): java.lang.AbstractMethodError: org.apache.thrift.TUnion.standardSchemeReadValue(Lorg/apache/thrift/protocol/TProtocol;Lorg/apache/thrift/protocol/TField;)Ljava/lang/Object;
    ... ...

I'm pretty sure this is caused by a libthrift conflict: I use thrift-0.6.1 while Spark uses 0.9.2, which requires TUnion objects to implement the abstract 'standardSchemeReadValue' method.

But when I set spark.files.userClassPathFirst=true, it fails even earlier:

    (Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost): java.lang.ClassCastException: cannot assign instance of scala.None$ to field org.apache.spark.scheduler.Task.metrics of type scala.Option in instance of org.apache.spark.scheduler.ResultTask
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

It seems I introduced more conflicts, but I couldn't figure out which one caused this failure.

Interestingly, when I run mvn test in my project, which tests the Spark job in local mode, everything works fine.

So what is the right way to make user jars take precedence over Spark jars?

--
Yizhi Liu
Senior Software Engineer / Data Mining
www.mvad.com, Shanghai, China
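One way to narrow down which jars are involved in a conflict like this is to list every jar on the classpath that bundles the class in question. The following is a minimal sketch in Python, not a Spark feature: the function names `jars_containing` and `list_jars` are hypothetical, and the paths in the usage note are placeholders. It only inspects jar contents; it does not tell you which copy the classloader actually picks.

```python
import os
import zipfile


def jars_containing(class_name, jar_paths):
    """Return the jars (in the given order) that bundle the named class."""
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for path in jar_paths:
        try:
            with zipfile.ZipFile(path) as jar:
                if entry in jar.namelist():
                    hits.append(path)
        except (zipfile.BadZipFile, OSError):
            # Skip unreadable or non-jar files instead of aborting the scan.
            continue
    return hits


def list_jars(directory):
    """Recursively collect *.jar files under a directory, sorted for stable output."""
    found = []
    for root, _dirs, files in os.walk(directory):
        for name in files:
            if name.endswith(".jar"):
                found.append(os.path.join(root, name))
    return sorted(found)
```

For example, `jars_containing("org.apache.thrift.TUnion", list_jars("/path/to/spark/lib") + ["/path/to/my-app.jar"])` would show whether both Spark's assembly and the application jar ship their own copy of TUnion. The same check with `org.apache.spark.scheduler.Task` can reveal whether the application jar accidentally bundles Spark classes.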
Re: How to take user jars precedence over Spark jars
Hi Ted,

Unfortunately, these two options cause the following failure in my environment:

    (java.lang.RuntimeException: class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not org.apache.hadoop.security.GroupMappingServiceProvider,java.lang.RuntimeException: java.lang.RuntimeException: class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not org.apache.hadoop.security.GroupMappingServiceProvider)

2015-10-19 22:23 GMT+08:00 Ted Yu <yuzhih...@gmail.com>:
> Have you tried the following options?
>
> --conf spark.driver.userClassPathFirst=true --conf
> spark.executor.userClassPathFirst=true
>
> Cheers
>
> On Mon, Oct 19, 2015 at 5:07 AM, YiZhi Liu <javeli...@gmail.com> wrote:
>> I'm trying to read a Thrift object from SequenceFile, using
>> elephant-bird's ThriftWritable. [...]

--
Yizhi Liu
Senior Software Engineer / Data Mining
www.mvad.com, Shanghai, China
Re: What is the difference between ml.classification.LogisticRegression and mllib.classification.LogisticRegressionWithLBFGS
Hi Joseph,

Thank you for clarifying the motivation for setting up a separate API for ML pipelines; it sounds great. But I still think we could extract some common parts of the training and inference procedures for ml and mllib. In ml.classification.LogisticRegression, you simply transform the DataFrame into an RDD and follow the same procedures as in mllib.optimization.{LBFGS,OWLQN}, right?

My suggestion, if I may, is that the ml package should focus on the public API and leave the underlying implementations, e.g. numerical optimization, to the mllib package.

Please let me know if I have misunderstood anything. Thank you!

2015-10-08 1:15 GMT+08:00 Joseph Bradley <jos...@databricks.com>:
> Hi YiZhi Liu,
>
> The spark.ml classes are part of the higher-level "Pipelines" API, which
> works with DataFrames. When creating this API, we decided to separate it
> from the old API to avoid confusion. You can read more about it here:
> http://spark.apache.org/docs/latest/ml-guide.html
>
> For (3): We use Breeze, but we have to modify it in order to do distributed
> optimization based on Spark.
>
> Joseph
>
> On Tue, Oct 6, 2015 at 11:47 PM, YiZhi Liu <javeli...@gmail.com> wrote:
>> Hi everyone,
>>
>> I'm curious about the difference between
>> ml.classification.LogisticRegression and
>> mllib.classification.LogisticRegressionWithLBFGS. [...]

--
Yizhi Liu
Senior Software Engineer / Data Mining
www.mvad.com, Shanghai, China
Re: What is the difference between ml.classification.LogisticRegression and mllib.classification.LogisticRegressionWithLBFGS
Hi Tsai,

Thank you for pointing out the implementation details that I missed. Yes, I saw several JIRA issues about the intercept, regularization, and standardization; I just didn't realize they made such a big impact.

Thanks again.

2015-10-13 4:32 GMT+08:00 DB Tsai <dbt...@dbtsai.com>:
> Hi Liu,
>
> In ML, even after extracting the data into an RDD, the versions between MLlib
> and ML are quite different. Due to legacy design, in MLlib, we use Updater
> for handling regularization, and this layer of abstraction also does
> adaptive step size, which is only for SGD. In order to get it working with
> LBFGS, some hacks were done here and there, and in Updater, all the
> components including the intercept are regularized, which is not desirable in
> many cases. Also, in the legacy design, it's hard for us to do in-place
> standardization to improve the convergence rate. As a result, at some point,
> we decided to ditch those abstractions and customize them for each
> algorithm. (Even LiR and LoR use different tricks to get better
> performance for numerical optimization, so it was hard to share code at that
> time. But I can see the point that we have working code now, so it's time to
> try to refactor that code to share more.)
>
> Sincerely,
>
> DB Tsai
> --
> Blog: https://www.dbtsai.com
> PGP Key ID: 0xAF08DF8D
>
> On Mon, Oct 12, 2015 at 1:24 AM, YiZhi Liu <javeli...@gmail.com> wrote:
>> Hi Joseph,
>>
>> Thank you for clarifying the motivation that you setup a different API
>> for ml pipelines, it sounds great. [...]

--
Yizhi Liu
Senior Software Engineer / Data Mining
www.mvad.com, Shanghai, China
What is the difference between ml.classification.LogisticRegression and mllib.classification.LogisticRegressionWithLBFGS
Hi everyone,

I'm curious about the difference between ml.classification.LogisticRegression and mllib.classification.LogisticRegressionWithLBFGS. Both of them are optimized using LBFGS; the only difference I see is that LogisticRegression takes a DataFrame while LogisticRegressionWithLBFGS takes an RDD.

So I wonder:

1. Why not simply add a DataFrame training interface to LogisticRegressionWithLBFGS?
2. What's the difference between the ml.classification and mllib.classification packages?
3. Why doesn't ml.classification.LogisticRegression call mllib.optimization.LBFGS / mllib.optimization.OWLQN directly? Instead, it uses breeze.optimize.LBFGS and re-implements most of the procedures in mllib.optimization.{LBFGS,OWLQN}.

Thank you.

Best,

--
Yizhi Liu
Senior Software Engineer / Data Mining
www.mvad.com, Shanghai, China
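One of the differences DB Tsai raises in this thread is that the legacy Updater regularizes every component including the intercept. To make that concrete, here is a minimal pure-Python sketch of an L2-regularized logistic loss and its gradient with a switch for penalizing the intercept. It is an illustration only and is not the code of either Spark package; the function name `logistic_loss_and_grad` and its parameters are hypothetical.

```python
import math


def logistic_loss_and_grad(weights, intercept, rows, labels, reg_param,
                           regularize_intercept=False):
    """Mean logistic loss plus an L2 penalty, with its gradient.

    rows is a list of feature lists; labels are 0/1.
    Returns (loss, grad_weights, grad_intercept).
    """
    n = len(rows)
    loss = 0.0
    grad_w = [0.0] * len(weights)
    grad_b = 0.0
    for x, y in zip(rows, labels):
        margin = intercept + sum(w * xi for w, xi in zip(weights, x))
        p = 1.0 / (1.0 + math.exp(-margin))
        # Cross-entropy written as log(1 + e^margin) - y * margin.
        loss += math.log1p(math.exp(margin)) - y * margin
        for j, xi in enumerate(x):
            grad_w[j] += (p - y) * xi
        grad_b += p - y
    loss /= n
    grad_w = [g / n for g in grad_w]
    grad_b /= n

    # L2 penalty on the feature weights.
    loss += 0.5 * reg_param * sum(w * w for w in weights)
    grad_w = [g + reg_param * w for g, w in zip(grad_w, weights)]
    if regularize_intercept:
        # Penalizing the intercept as well: the behaviour described as
        # undesirable in many cases.
        loss += 0.5 * reg_param * intercept * intercept
        grad_b += reg_param * intercept
    return loss, grad_w, grad_b


rows = [[1.0], [2.0]]
labels = [0, 1]
_, _, gb_free = logistic_loss_and_grad([0.0], 3.0, rows, labels, 0.1)
_, _, gb_pen = logistic_loss_and_grad([0.0], 3.0, rows, labels, 0.1,
                                      regularize_intercept=True)
# The difference is the extra pull toward zero on the intercept.
print(gb_pen - gb_free)
```

When the intercept is penalized, the optimizer shrinks it toward zero together with the feature weights, which biases predictions whenever the classes are imbalanced. That is a common reason to leave the intercept out of the penalty.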
Re: SparkContext._active_spark_context returns None
Hi Ted,

I think I made a mistake. I was referring to python/mllib: callJavaFunc in mllib/common.py uses SparkContext._active_spark_context because it is called from the driver. So maybe there is no explicit way to reach the JVM during RDD operations?

What I want to achieve is to take a ThriftWritable object from an LzoBlockInputFormat and deserialize it into a Java object. If possible, I would then like to transform the Thrift object into a DataFrame. I think I can implement a custom org.apache.spark.api.python.Converter and pass it to sc.hadoopFile(...,keyConverterClass,valueConverterClass...). But once I get the converted Java object, can I call its methods in Python directly, i.e. reach the JVM?

Thanks a lot!

2015-09-30 0:54 GMT+08:00 Ted Yu <yuzhih...@gmail.com>:
> bq. the right way to reach JVM in python
>
> Can you tell us more about what you want to achieve?
>
> If you want to pass some value to workers, you can use a broadcast variable.
>
> Cheers
>
> On Mon, Sep 28, 2015 at 10:31 PM, YiZhi Liu <javeli...@gmail.com> wrote:
>> Hi Ted,
>>
>> Thank you for reply. The sc works at driver, but how can I reach the
>> JVM in rdd.map ? [...]

--
Yizhi Liu
Senior Software Engineer / Data Mining
www.mvad.com, Shanghai, China
Re: SparkContext._active_spark_context returns None
Hi Ted,

Thank you for the reply. The sc works on the driver, but how can I reach the JVM in rdd.map?

2015-09-29 11:26 GMT+08:00 Ted Yu <yuzhih...@gmail.com>:
> >>> sc._jvm.java.lang.Integer.valueOf("12")
> 12
>
> FYI
>
> On Mon, Sep 28, 2015 at 8:08 PM, YiZhi Liu <javeli...@gmail.com> wrote:
>> Hi,
>>
>> I'm doing some data processing on pyspark, but I failed to reach JVM
>> in workers. [...]

--
Yizhi Liu
Senior Software Engineer / Data Mining
www.mvad.com, Shanghai, China
SparkContext._active_spark_context returns None
Hi,

I'm doing some data processing in PySpark, but I failed to reach the JVM in the workers. Here is what I did:

    $ bin/pyspark
    >>> data = sc.parallelize(["123", "234"])
    >>> numbers = data.map(lambda s:
    ...     SparkContext._active_spark_context._jvm.java.lang.Integer.valueOf(s.strip()))
    >>> numbers.collect()

I got:

    Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/mnt/hgfs/lewis/Workspace/source-codes/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
        process()
      File "/mnt/hgfs/lewis/Workspace/source-codes/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/mnt/hgfs/lewis/Workspace/source-codes/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
        vs = list(itertools.islice(iterator, batch))
      File "<stdin>", line 1, in <lambda>
    AttributeError: 'NoneType' object has no attribute '_jvm'

        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
        at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        ... 1 more

Meanwhile, _jvm on the driver side looks fine:

    >>> SparkContext._active_spark_context._jvm.java.lang.Integer.valueOf("123".strip())
    123

The program is trivial; I just wonder what the right way to reach the JVM from Python is. Any help would be appreciated.

Thanks

--
Yizhi Liu
Senior Software Engineer / Data Mining
www.mvad.com, Shanghai, China
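The traceback is consistent with `SparkContext._active_spark_context` being set only in the driver process, so a lambda shipped to the workers sees `None`. For the trivial case in this message, a minimal sketch of the usual workaround is to do the per-record work in plain Python inside the worker and keep any JVM calls on the driver. The helper names `parse_int` and `parse_all` are hypothetical, and `sc` is assumed to be a live SparkContext such as the one the pyspark shell provides.

```python
def parse_int(s):
    """Runs inside the Python worker; uses no JVM objects."""
    return int(s.strip())


def parse_all(sc, strings):
    """Distribute the strings and parse them on the workers.

    sc is assumed to be a live pyspark.SparkContext (e.g. the shell's `sc`).
    """
    return sc.parallelize(strings).map(parse_int).collect()


# In the pyspark shell:
#   parse_all(sc, ["123", "234"])
```

When the per-record work really does need Java code, such as deserializing a Thrift object, the conversion has to happen on the JVM side before the data reaches Python. This is the role of the Converter classes mentioned in the follow-up.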