Are you sure this is an issue in Spark? I have used a custom Python several times before by setting it in PYSPARK_PYTHON, and it was no problem.
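For reference, here is a minimal sketch of how I set it in a plain PySpark script (outside Zeppelin), assuming the miniconda3 path from your mail; the probe() helper is only illustrative:

    import os

    # Must be set before the SparkSession (and its SparkContext) is created;
    # the executors launch their Python workers with this interpreter.
    os.environ["PYSPARK_PYTHON"] = "/usr/local/miniconda3/bin/python"

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("custom-python-check").getOrCreate()

    # Quick probe: which interpreter do the executors use, and can it import pyarrow?
    def probe(_):
        import sys
        import pyarrow
        return [(sys.executable, pyarrow.__version__)]

    print(spark.sparkContext.parallelize(range(2), 2).mapPartitions(probe).collect())

With spark-submit, `spark.pyspark.python` (Spark 2.1+) is the configuration equivalent of the environment variable.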
On Thu, Sep 6, 2018 at 2:21 PM, mithril <twinmeg...@gmail.com> wrote:

> For better formatting, please see
> https://stackoverflow.com/questions/52178406/howto-make-pyspark-use-custom-python
>
> ----------------------
>
> I am using Zeppelin to connect to a remote Spark cluster.
>
> The remote Spark cluster is using the system Python 2.7.
>
> I want to switch to miniconda3 and install the pyarrow library.
> What I did is:
>
> 1. Download miniconda3, install some libs, and scp the miniconda3 folder to
>    the Spark master and slaves.
> 2. Add `PYSPARK_PYTHON="/usr/local/miniconda3/bin/python"` to `spark-env.sh`
>    on the Spark master and slaves.
> 3. Restart Spark and Zeppelin.
> 4. Run this code:
>
> %spark.pyspark
>
> import pandas as pd
> from pyspark.sql.functions import pandas_udf, PandasUDFType
>
>
> @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
> def process_order_items(pdf):
>
>     pdf.loc[:, 'total_price'] = pdf['price'] * pdf['count']
>
>     d = {'has_discount': 'count',
>          'clearance': 'count',
>          'count': ['count', 'sum'],
>          'price_guide': 'max',
>          'total_price': 'sum'
>          }
>
>     pdf1 = pdf.groupby('day').agg(d)
>     pdf1.columns = pdf1.columns.map('_'.join)
>     d1 = {'has_discount_count': 'discount_order_count',
>           'clearance_count': 'clearance_order_count',
>           'count_count': 'order_count',
>           'count_sum': 'sale_count',
>           'price_guide_max': 'price_guide',
>           'total_price_sum': 'total_price'
>           }
>
>     pdf2 = pdf1.rename(columns=d1)
>
>     pdf2.loc[:, 'discount_sale_count'] = pdf.loc[pdf.has_discount > 0, 'count'].resample(freq).sum()
>     pdf2.loc[:, 'clearance_sale_count'] = pdf.loc[pdf.clearance > 0, 'count'].resample(freq).sum()
>     pdf2.loc[:, 'price'] = pdf2.total_price / pdf2.sale_count
>
>     pdf2 = pdf2.drop(pdf2[pdf2.order_count == 0].index)
>
>     return pdf2
>
>
> results = df.groupby("store_id", "product_id").apply(process_order_items)
>
> results.select(['store_id', 'price']).show(5)
>
>
> I got this error:
>
> Py4JJavaError: An error occurred while calling o172.showString.
> : org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in
> stage 6.0 (TID 143, 10.104.33.18, executor 2):
> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
>     process()
>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 150, in <lambda>
>     func = lambda _, it: map(mapper, it)
>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 276, in load_stream
>     import pyarrow as pa
> ImportError: No module named pyarrow
>
>
> `10.104.33.18` is the Spark master, so I think `PYSPARK_PYTHON` is not set
> correctly.
>
> `pyspark`
>
> I logged in to the master and slaves, ran the `pyspark` interpreter on each,
> and found that `import pyarrow` does not throw an exception.
>
>
> PS: `pyarrow` is also installed on the machine running Zeppelin.
>
> --------------
>
> More info:
>
> 1. The Spark cluster is installed on A, B, and C; Zeppelin is installed on D.
> 2. `PYSPARK_PYTHON` is set in `spark-env.sh` on each of A, B, and C.
> 3. `import pyarrow` works with `/usr/local/spark/bin/pyspark` on A, B, and C.
> 4. `import pyarrow` works with the custom Python (miniconda3) on A, B, and C.
> 5. `import pyarrow` works with D's default Python (miniconda3; its path is
>    different from A, B, and C, but that doesn't matter).
>
> So I completely cannot understand why it doesn't work.
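One thing that might narrow it down (a sketch only; `executor_python` is just an illustrative name): run a paragraph like this in Zeppelin and compare the interpreter paths that the driver and the executors report. `sc` is the SparkContext the %spark.pyspark interpreter provides.

    %spark.pyspark

    import sys

    # Ask each partition which Python interpreter its worker process is running.
    def executor_python(_):
        import sys
        return [sys.executable]

    workers = set(sc.parallelize(range(4), 4).mapPartitions(executor_python).collect())

    print("driver : " + sys.executable)
    print("workers: " + ", ".join(sorted(workers)))

If the workers still report the system Python 2.7, the PYSPARK_PYTHON from spark-env.sh is not reaching the executors for jobs submitted through Zeppelin; in that case it may be worth setting the interpreter path in the Zeppelin Spark interpreter settings (or via spark.pyspark.python) rather than only in spark-env.sh.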