Re: [External Sender] Re: How to make pyspark use custom python?

2018-09-07 Thread mithril
I am sure; everything is set up exactly as described in my first post,
so this is very confusing to me.






Re: [External Sender] Re: How to make pyspark use custom python?

2018-09-06 Thread Femi Anthony
Are you sure that pyarrow is deployed on your slave hosts? If not, you will
either have to get it installed on each host or ship it along when you call
spark-submit, by zipping it up and specifying the zip file to be shipped with
the --py-files zipfile.zip option.
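A minimal sketch of that route, using the SparkContext API rather than the
spark-submit flag (the zip path below is hypothetical). One caveat: pyarrow
contains native extensions, so whether a zipped copy actually imports this way
depends on how it was built; installing it on every host is the more robust option.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Equivalent in spirit to passing `--py-files deps.zip` to spark-submit:
# the archive is shipped to every executor and added to its Python path.
spark.sparkContext.addPyFile("/path/to/deps.zip")
```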

A quick check would be to ssh to a slave host, run pyspark and try to
import pyarrow.
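Something like this, run directly on a slave host with the interpreter the
executors are supposed to use (the path comes from the earlier posts in this
thread), would confirm it quickly:

```python
# Run with: /usr/local/miniconda3/bin/python, after ssh'ing to the slave host.
import sys

print(sys.executable)       # should point at the miniconda3 interpreter
import pyarrow              # raises ImportError if pyarrow is missing here
print(pyarrow.__version__)
```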

Femi





Re: How to make pyspark use custom python?

2018-09-06 Thread mithril


The whole content of `spark-env.sh` is:

```
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=10.104.85.78:2181,10.104.114.131:2181,10.135.2.132:2181
-Dspark.deploy.zookeeper.dir=/spark"
PYSPARK_PYTHON="/usr/local/miniconda3/bin/python"
```

I ran `/usr/local/spark/sbin/stop-all.sh` and
`/usr/local/spark/sbin/start-all.sh` to restart the Spark cluster.

Is anything wrong?
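For reference, one thing that can be checked from inside the notebook session is
what the running application actually resolves for the worker Python; since Spark
2.1 this can also be pinned per application with the `spark.pyspark.python`
property instead of relying on the daemons' environment. A sketch, assuming a
live `spark` session like the one Zeppelin provides:

```python
# Hedged sketch: inspect which Python the running application thinks it should
# use for its workers. "not set" only means the property was never propagated.
import os

print("driver PYSPARK_PYTHON :", os.environ.get("PYSPARK_PYTHON"))
print("spark.pyspark.python  :",
      spark.sparkContext.getConf().get("spark.pyspark.python", "not set"))
```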






Re: How to make pyspark use custom python?

2018-09-06 Thread Patrick McCarthy
It looks like for whatever reason your cluster isn't using the python you
distributed, or said distribution doesn't contain what you think.

I've used the following with success to deploy a conda environment to my
cluster at runtime:
https://henning.kropponline.de/2016/09/24/running-pyspark-with-conda-env/



Re: How to make pyspark use custom python?

2018-09-06 Thread Hyukjin Kwon
Are you doubly sure that it is an issue in Spark? I have used a custom Python
several times by setting it in PYSPARK_PYTHON before, and it was no problem.

>


How to make pyspark use custom python?

2018-09-06 Thread mithril
For better formatting, please see
https://stackoverflow.com/questions/52178406/howto-make-pyspark-use-custom-python

--


I am using Zeppelin to connect to a remote Spark cluster.

The remote Spark is using the system Python 2.7.

I want to switch to miniconda3 and install the pyarrow library.
What I did is:

1. Download miniconda3, install some libs, and scp the miniconda3 folder to the
Spark master and slaves.
2. Add `PYSPARK_PYTHON="/usr/local/miniconda3/bin/python"` to `spark-env.sh` on
the Spark master and slaves.
3. Restart Spark and Zeppelin.
4. Run the code:

```
%spark.pyspark

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType


@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def process_order_items(pdf):

    pdf.loc[:, 'total_price'] = pdf['price'] * pdf['count']

    d = {'has_discount': 'count',
         'clearance': 'count',
         'count': ['count', 'sum'],
         'price_guide': 'max',
         'total_price': 'sum'
         }

    pdf1 = pdf.groupby('day').agg(d)
    pdf1.columns = pdf1.columns.map('_'.join)
    d1 = {'has_discount_count': 'discount_order_count',
          'clearance_count': 'clearance_order_count',
          'count_count': 'order_count',
          'count_sum': 'sale_count',
          'price_guide_max': 'price_guide',
          'total_price_sum': 'total_price'
          }

    pdf2 = pdf1.rename(columns=d1)

    pdf2.loc[:, 'discount_sale_count'] = pdf.loc[pdf.has_discount > 0,
                                                 'count'].resample(freq).sum()
    pdf2.loc[:, 'clearance_sale_count'] = pdf.loc[pdf.clearance > 0,
                                                  'count'].resample(freq).sum()
    pdf2.loc[:, 'price'] = pdf2.total_price / pdf2.sale_count

    pdf2 = pdf2.drop(pdf2[pdf2.order_count == 0].index)

    return pdf2


results = df.groupby("store_id", "product_id").apply(process_order_items)

results.select(['store_id', 'price']).show(5)
```


I got this error:

```
Py4JJavaError: An error occurred while calling o172.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in
stage 6.0 (TID 143, 10.104.33.18, executor 2):
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 150, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 276, in load_stream
    import pyarrow as pa
ImportError: No module named pyarrow
```


`10.104.33.18` is the Spark master, so I think `PYSPARK_PYTHON` is not set
correctly.

I logged in to the master and the slaves, ran the `pyspark` interpreter on each,
and found that `import pyarrow` does not throw an exception.


PS: `pyarrow` is also installed on the machine running Zeppelin.

--

More info:


1. The Spark cluster is installed on A, B, and C; Zeppelin is installed on D.
2. `PYSPARK_PYTHON` is set in `spark-env.sh` on each of A, B, and C.
3. `import pyarrow` works fine with `/usr/local/spark/bin/pyspark` on A, B, and C.
4. `import pyarrow` works fine with the custom Python (miniconda3) on A, B, and C.
5. `import pyarrow` works fine with D's default Python (miniconda3; the path is
different from A, B, and C, but that doesn't matter).



So I completely cannot understand why it doesn't work.
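One check that might narrow this down (a hedged sketch, assuming the same live
`spark` session as the notebook code above): ask the executors themselves which
interpreter they run and whether pyarrow imports there, since that is where the
ImportError is raised.

```python
# Diagnostic sketch using only standard PySpark APIs: each partition reports the
# executor's Python executable and the pyarrow version it can import (None if missing).
def probe(_):
    import sys
    try:
        import pyarrow
        version = pyarrow.__version__
    except ImportError:
        version = None
    yield (sys.executable, version)

sc = spark.sparkContext
num_slices = sc.defaultParallelism
print(sc.parallelize(range(num_slices), num_slices).mapPartitions(probe).collect())
```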






