Profiling options for PandasUDF (2.4.7 on yarn)

2021-05-28 Thread Patrick McCarthy
I'm trying to do a very large aggregation of sparse matrices in which my
source data looks like

root
 |-- device_id: string (nullable = true)
 |-- row_id: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- column_id: array (nullable = true)
 |    |-- element: integer (containsNull = true)



I assume each row reflects a sparse matrix in which every (row_id,
column_id) combination has a value of 1. I have a Pandas UDF which performs
a GROUPED_MAP: it converts each row into a scipy.sparse.csr_matrix and,
within the group, sums the matrices before returning columns of (count,
row_id, column_id).

It works at small scale but becomes unstable as I scale up. Is there a way
to profile this function inside a Spark session, or am I limited to
profiling it on pandas DataFrames outside of Spark?
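
For reference, a minimal sketch of the kind of UDF described above (the
grouping by device_id, the output schema and the fixed matrix dimensions
are assumptions for illustration, not the production code):

import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix
import pyspark.sql.functions as F
import pyspark.sql.types as T

N_ROWS, N_COLS = 10000, 10000   # assumed, fixed matrix dimensions

out_schema = T.StructType([
    T.StructField('device_id', T.StringType()),
    T.StructField('row_id', T.IntegerType()),
    T.StructField('column_id', T.IntegerType()),
    T.StructField('count', T.LongType()),
])

@F.pandas_udf(out_schema, F.PandasUDFType.GROUPED_MAP)
def sum_sparse(pdf):
    # build one csr_matrix per input row and accumulate the sum
    total = csr_matrix((N_ROWS, N_COLS), dtype=np.int64)
    for rows, cols in zip(pdf['row_id'], pdf['column_id']):
        ones = np.ones(len(rows), dtype=np.int64)
        total = total + csr_matrix((ones, (rows, cols)), shape=(N_ROWS, N_COLS))
    coo = total.tocoo()
    return pd.DataFrame({
        'device_id': pdf['device_id'].iloc[0],
        'row_id': coo.row.astype('int32'),
        'column_id': coo.col.astype('int32'),
        'count': coo.data,
    })

# summed = df.groupBy('device_id').apply(sum_sparse)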

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Issue while installing dependencies Python Spark

2020-12-18 Thread Patrick McCarthy
At the risk of repeating myself, this is what I was hoping to avoid when I
suggested deploying a full, zipped, conda venv.

What is your motivation for running an install process on the nodes and
risking the process failing, instead of pushing a validated environment
artifact and not having that risk? In either case you move about the same
number of bytes around.

On Fri, Dec 18, 2020 at 3:04 PM Sachit Murarka 
wrote:

> Hi Patrick/Users,
>
> I am exploring wheel file form packages for this , as this seems simple:-
>
>
> https://bytes.grubhub.com/managing-dependencies-and-artifacts-in-pyspark-7641aa89ddb7
>
> However, I am facing another issue:- I am using pandas , which needs
> numpy. Numpy is giving error!
>
>
> ImportError: Unable to import required dependencies:
> numpy:
>
> IMPORTANT: PLEASE READ THIS FOR ADVICE ON HOW TO SOLVE THIS ISSUE!
>
> Importing the numpy C-extensions failed. This error can happen for
> many reasons, often due to issues with your setup or how NumPy was
> installed.
>
> We have compiled some common reasons and troubleshooting tips at:
>
> https://numpy.org/devdocs/user/troubleshooting-importerror.html
>
> Please note and check the following:
>
>   * The Python version is: Python3.7 from "/usr/bin/python3"
>   * The NumPy version is: "1.19.4"
>
> and make sure that they are the versions you expect.
> Please carefully study the documentation linked above for further help.
>
> Original error was: No module named 'numpy.core._multiarray_umath'
>
>
>
> Kind Regards,
> Sachit Murarka
>
>
> On Thu, Dec 17, 2020 at 9:24 PM Patrick McCarthy 
> wrote:
>
>> I'm not very familiar with the environments on cloud clusters, but in
>> general I'd be reluctant to lean on setuptools or other python install
>> mechanisms. In the worst case, you might encounter /usr/bin/pip not having
>> permissions to install new packages, or even if you do a package might
>> require something you can't change like a libc dependency.
>>
>> Perhaps you can install the .whl and its dependencies to the virtualenv
>> on a local machine, and then *after* the install process, package that
>> venv?
>>
>> If possible, I like conda for this approach over a vanilla venv because
>> it will contain all the non-python dependencies (like libc) if they're
>> needed.
>>
>>
>> Another thing - I think there are several ways to do this, but I've had
>> the most success including the .zip containing my environment in
>> `spark.yarn.dist.archives` and then using a relative path:
>>
>> os.environ['PYSPARK_PYTHON'] = './py37minimal_env/py37minimal/bin/python'
>>
>> dist_archives =
>> 'hdfs:///user/pmccarthy/conda/py37minimal.zip#py37minimal_env'
>>
>> SparkSession.builder.
>> ...
>>  .config('spark.yarn.dist.archives', dist_archives)
>>
>>
>> On Thu, Dec 17, 2020 at 10:32 AM Sachit Murarka 
>> wrote:
>>
>>> Hi Users
>>>
>>> I have a wheel file , while creating it I have mentioned dependencies in
>>> setup.py file.
>>> Now I have 2 virtual envs, 1 was already there . another one I created
>>> just now.
>>>
>>> I have switched to new virtual env, I want spark to download the
>>> dependencies while doing spark-submit using wheel.
>>>
>>> Could you please help me on this?
>>>
>>> It is not downloading dependencies , instead it is pointing to older
>>> version of  virtual env and proceeding with the execution of spark job.
>>>
>>> Please note I have tried setting the env variables also.
>>> Also I have tried following options as well in spark submit
>>>
>>> --conf spark.pyspark.virtualenv.enabled=true  --conf
>>> spark.pyspark.virtualenv.type=native --conf
>>> spark.pyspark.virtualenv.requirements=requirements.txt  --conf
>>> spark.pyspark.python= /path/to/venv/bin/python3 --conf
>>> spark.pyspark.driver.python=/path/to/venv/bin/python3
>>>
>>> This did not help too..
>>>
>>> Kind Regards,
>>> Sachit Murarka
>>>
>>
>>
>> --
>>
>>
>> *Patrick McCarthy  *
>>
>> Senior Data Scientist, Machine Learning Engineering
>>
>> Dstillery
>>
>> 470 Park Ave South, 17th Floor, NYC 10016
>>
>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Getting error message

2020-12-17 Thread Patrick McCarthy
Possibly. In that case maybe you should step back from spark and see if
there are OS-level tools to understand what's going on, like looking for
evidence of the OOM killer -
https://docs.memset.com/other/linux-s-oom-process-killer

On Thu, Dec 17, 2020 at 1:45 PM Vikas Garg  wrote:

> I am running code in a local machine that is single node machine.
>
> Getting into logs,  it looked like the host is killed.  This is happening
> very frequently an I am unable to find the reason of this.
>
> Could low memory be the reason?
>
> On Fri, 18 Dec 2020, 00:11 Patrick McCarthy, 
> wrote:
>
>> 'Job aborted due to stage failure: Task 1 in stage 39.0 failed 1 times'
>>
>> You may want to change the number of failures to a higher number like 4.
>> A single failure on a task should be able to be tolerated, especially if
>> you're on a shared cluster where resources can be preempted.
>>
>>  It seems that a node dies or goes off the network, so perhaps you can
>> also debug the logs on the failing node to see why it disappears and
>> prevent the failures in the first place.
>>
>> On Thu, Dec 17, 2020 at 1:27 PM Vikas Garg  wrote:
>>
>>> Mydomain is named by me while pasting the logs
>>>
>>> Also,  there are multiple class files in my project, if I run any 1 or 2
>>> at a time,  then they run fine,  sometimes they too give this error. But
>>> running all the classes at the same time always give this error.
>>>
>>> Once this error come, I can't run any program and on restarting the
>>> system, program starts running fine.
>>> This error goes away on
>>>
>>> On Thu, 17 Dec 2020, 23:50 Patrick McCarthy, 
>>> wrote:
>>>
>>>> my-domain.com/192.168.166.8:63534 probably isn't a valid address on
>>>> your network, is it?
>>>>
>>>> On Thu, Dec 17, 2020 at 3:03 AM Vikas Garg  wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Since last few days, I am getting error message while running my
>>>>> project. I have searched Google for the solution but didn't got any help.
>>>>>
>>>>> Can someone help me to figure out how I could mitigate this issue?
>>>>>
>>>>>
>>>>> 20/12/17 13:26:57 ERROR RetryingBlockFetcher: Exception while
>>>>> beginning fetch of 1 outstanding blocks
>>>>>
>>>>> *java.io.IOException*: Failed to connect to
>>>>> my-domain.com/192.168.166.8:63534
>>>>>
>>>>> at
>>>>> org.apache.spark.network.client.TransportClientFactory.createClient(
>>>>> *TransportClientFactory.java:253*)
>>>>>
>>>>> at
>>>>> org.apache.spark.network.client.TransportClientFactory.createClient(
>>>>> *TransportClientFactory.java:195*)
>>>>>
>>>>> at
>>>>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(
>>>>> *NettyBlockTransferService.scala:122*)
>>>>>
>>>>> at
>>>>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(
>>>>> *RetryingBlockFetcher.java:141*)
>>>>>
>>>>> at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(
>>>>> *RetryingBlockFetcher.java:121*)
>>>>>
>>>>> at
>>>>> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(
>>>>> *NettyBlockTransferService.scala:143*)
>>>>>
>>>>> at
>>>>> org.apache.spark.network.BlockTransferService.fetchBlockSync(
>>>>> *BlockTransferService.scala:103*)
>>>>>
>>>>> at
>>>>> org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(
>>>>> *BlockManager.scala:1010*)
>>>>>
>>>>> at
>>>>> org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(
>>>>> *BlockManager.scala:954*)
>>>>>
>>>>> at scala.Option.orElse(*Option.scala:447*)
>>>>>
>>>>> at org.apache.spark.storage.BlockManager.getRemoteBlock(
>>>>> *BlockManager.scala:954*)
>>>>>
>>>>> at org.apache.spark.storage.BlockManager.getRemoteBytes(
>>>>> *BlockManager.scala:1092*)
>>>>>

Re: Getting error message

2020-12-17 Thread Patrick McCarthy
'Job aborted due to stage failure: Task 1 in stage 39.0 failed 1 times'

You may want to change the number of failures to a higher number like 4. A
single failure on a task should be able to be tolerated, especially if
you're on a shared cluster where resources can be preempted.

 It seems that a node dies or goes off the network, so perhaps you can also
debug the logs on the failing node to see why it disappears and prevent the
failures in the first place.
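
A sketch of what raising it looks like, assuming the setting meant here is
spark.task.maxFailures:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         # cluster default is 4; raise it if your environment lowers it
         .config('spark.task.maxFailures', 4)
         # in local mode the retry count comes from the master string instead,
         # e.g. .master('local[4, 4]') for 4 threads and 4 allowed failures
         .getOrCreate())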

On Thu, Dec 17, 2020 at 1:27 PM Vikas Garg  wrote:

> Mydomain is named by me while pasting the logs
>
> Also,  there are multiple class files in my project, if I run any 1 or 2
> at a time,  then they run fine,  sometimes they too give this error. But
> running all the classes at the same time always give this error.
>
> Once this error come, I can't run any program and on restarting the
> system, program starts running fine.
> This error goes away on
>
> On Thu, 17 Dec 2020, 23:50 Patrick McCarthy, 
> wrote:
>
>> my-domain.com/192.168.166.8:63534 probably isn't a valid address on your
>> network, is it?
>>
>> On Thu, Dec 17, 2020 at 3:03 AM Vikas Garg  wrote:
>>
>>> Hi,
>>>
>>> Since last few days, I am getting error message while running my
>>> project. I have searched Google for the solution but didn't got any help.
>>>
>>> Can someone help me to figure out how I could mitigate this issue?
>>>
>>>
>>> 20/12/17 13:26:57 ERROR RetryingBlockFetcher: Exception while beginning
>>> fetch of 1 outstanding blocks
>>>
>>> *java.io.IOException*: Failed to connect to
>>> my-domain.com/192.168.166.8:63534
>>>
>>> at
>>> org.apache.spark.network.client.TransportClientFactory.createClient(
>>> *TransportClientFactory.java:253*)
>>>
>>> at
>>> org.apache.spark.network.client.TransportClientFactory.createClient(
>>> *TransportClientFactory.java:195*)
>>>
>>> at
>>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(
>>> *NettyBlockTransferService.scala:122*)
>>>
>>> at
>>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(
>>> *RetryingBlockFetcher.java:141*)
>>>
>>> at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(
>>> *RetryingBlockFetcher.java:121*)
>>>
>>> at
>>> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(
>>> *NettyBlockTransferService.scala:143*)
>>>
>>> at org.apache.spark.network.BlockTransferService.fetchBlockSync(
>>> *BlockTransferService.scala:103*)
>>>
>>> at
>>> org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(
>>> *BlockManager.scala:1010*)
>>>
>>> at
>>> org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(
>>> *BlockManager.scala:954*)
>>>
>>> at scala.Option.orElse(*Option.scala:447*)
>>>
>>> at org.apache.spark.storage.BlockManager.getRemoteBlock(
>>> *BlockManager.scala:954*)
>>>
>>> at org.apache.spark.storage.BlockManager.getRemoteBytes(
>>> *BlockManager.scala:1092*)
>>>
>>> at
>>> org.apache.spark.scheduler.TaskResultGetter$$anon$3.$anonfun$run$1(
>>> *TaskResultGetter.scala:88*)
>>>
>>> at
>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>>
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(
>>> *Utils.scala:1932*)
>>>
>>> at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(
>>> *TaskResultGetter.scala:63*)
>>>
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>> *ThreadPoolExecutor.java:1149*)
>>>
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>> *ThreadPoolExecutor.java:624*)
>>>
>>> at java.lang.Thread.run(*Thread.java:748*)
>>>
>>> Caused by: *io.netty.channel.AbstractChannel$AnnotatedSocketException*:
>>> Permission denied: no further information:
>>> my-domain.com/192.168.166.8:63534
>>>
>>> Caused by: *java.net.SocketException*: Permission denied: no further
>>> information
>>>
>>> at sun.nio.ch.SocketChannelImpl.checkConnect(*Native Method*)
>>>
>>> at sun.nio.ch.SocketChannelImpl.finishConnect(
>>> *SocketChannelImpl.java:715*)
>>

Re: Issue while installing dependencies Python Spark

2020-12-17 Thread Patrick McCarthy
I'm not very familiar with the environments on cloud clusters, but in
general I'd be reluctant to lean on setuptools or other python install
mechanisms. In the worst case, you might encounter /usr/bin/pip not having
permissions to install new packages, or even if you do a package might
require something you can't change like a libc dependency.

Perhaps you can install the .whl and its dependencies to the virtualenv on
a local machine, and then *after* the install process, package that venv?

If possible, I like conda for this approach over a vanilla venv because it
will contain all the non-python dependencies (like libc) if they're needed.


Another thing - I think there are several ways to do this, but I've had the
most success including the .zip containing my environment in
`spark.yarn.dist.archives` and then using a relative path:

os.environ['PYSPARK_PYTHON'] = './py37minimal_env/py37minimal/bin/python'

dist_archives = 'hdfs:///user/pmccarthy/conda/py37minimal.zip#py37minimal_env'

SparkSession.builder.
...
 .config('spark.yarn.dist.archives', dist_archives)


On Thu, Dec 17, 2020 at 10:32 AM Sachit Murarka 
wrote:

> Hi Users
>
> I have a wheel file , while creating it I have mentioned dependencies in
> setup.py file.
> Now I have 2 virtual envs, 1 was already there . another one I created
> just now.
>
> I have switched to new virtual env, I want spark to download the
> dependencies while doing spark-submit using wheel.
>
> Could you please help me on this?
>
> It is not downloading dependencies , instead it is pointing to older
> version of  virtual env and proceeding with the execution of spark job.
>
> Please note I have tried setting the env variables also.
> Also I have tried following options as well in spark submit
>
> --conf spark.pyspark.virtualenv.enabled=true  --conf
> spark.pyspark.virtualenv.type=native --conf
> spark.pyspark.virtualenv.requirements=requirements.txt  --conf
> spark.pyspark.python= /path/to/venv/bin/python3 --conf
> spark.pyspark.driver.python=/path/to/venv/bin/python3
>
> This did not help too..
>
> Kind Regards,
> Sachit Murarka
>


-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: [Spark Core] Vectorizing very high-dimensional data sourced in long format

2020-10-30 Thread Patrick McCarthy
That's a very large vector. Is it sparse? Perhaps you'd have better luck
performing an aggregate instead of a pivot, and assembling the vector using
a UDF.
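
A rough sketch of that aggregate-and-assemble idea, using the schema from
the question (the vector size is an assumed constant, and attribute_id is
assumed to be a 0-based index into it):

import pyspark.sql.functions as F
from pyspark.ml.linalg import SparseVector, VectorUDT

VECTOR_SIZE = 130 * 1000 * 1000   # assumed: number of distinct attribute_ids

@F.udf(returnType=VectorUDT())
def to_sparse_vector(attribute_ids, event_counts):
    # SparseVector wants indices in increasing order
    pairs = sorted(zip(attribute_ids, event_counts))
    return SparseVector(VECTOR_SIZE,
                        [i for i, _ in pairs],
                        [float(c) for _, c in pairs])

vectors = (
    long_frame
    .groupBy('entity_id')
    .agg(F.collect_list('attribute_id').alias('attribute_ids'),
         F.collect_list('event_count').alias('event_counts'))
    .select('entity_id',
            to_sparse_vector('attribute_ids', 'event_counts').alias('features'))
)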

On Thu, Oct 29, 2020 at 10:19 PM Daniel Chalef
 wrote:

> Hello,
>
> I have a very large long-format dataframe (several billion rows) that I'd
> like to pivot and vectorize (using the VectorAssembler), with the aim to
> reduce dimensionality using something akin to TF-IDF. Once pivoted, the
> dataframe will have ~130 million columns.
>
> The source, long-format schema looks as follows:
>
> root
>  |-- entity_id: long (nullable = false)
>  |-- attribute_id: long (nullable = false)
>  |-- event_count: integer (nullable = true)
>
> Pivoting as per the following fails, exhausting executor and driver
> memory. I am unsure whether increasing memory limits would be successful
> here as my sense is that pivoting and then using a VectorAssembler isn't
> the right approach to solving this problem.
>
> wide_frame = (
> long_frame.groupBy("entity_id")
> .pivot("attribute_id")
> .agg(F.first("event_count"))
> )
>
> Are there other Spark patterns that I should attempt in order to achieve
> my end goal of a vector of attributes for every entity?
>
> Thanks, Daniel
>


-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Hive using Spark engine vs native spark with hive integration.

2020-10-07 Thread Patrick McCarthy
I think a lot will depend on what the scripts do. I've seen some legacy
Hive scripts which were written in an awkward way (e.g. lots of subqueries,
nested explodes) because, pre-Spark, that was the only way to express
certain logic. For fairly straightforward operations I expect Catalyst
would reduce both approaches to similar plans.

On Tue, Oct 6, 2020 at 12:07 PM Manu Jacob 
wrote:

> Hi All,
>
>
>
> Not sure if I need to ask this question on spark community or hive
> community.
>
>
>
> We have a set of hive scripts that runs on EMR (Tez engine). We would like
> to experiment by moving some of it onto Spark. We are planning to
> experiment with two options.
>
>
>
>1. Use the current code based on HQL, with engine set as spark.
>2. Write pure spark code in scala/python using SparkQL and hive
>integration.
>
>
>
> The first approach helps us to transition to Spark quickly but not sure if
> this is the best approach in terms of performance.  Could not find any
> reasonable comparisons of this two approaches.  It looks like writing pure
> Spark code, gives us more control to add logic and also control some of the
> performance features, for example things like caching/evicting etc.
>
>
>
>
>
> Any advice on this is much appreciated.
>
>
>
>
>
> Thanks,
>
> -Manu
>
>
>


-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: regexp_extract regex for extracting the columns from string

2020-08-10 Thread Patrick McCarthy
Can you simply do a string split on space, and then another on '='?
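
For instance, something like this expresses the split-on-space-then-on-'='
idea without a UDF (a sketch only; str_to_map does both splits in one call,
and the dataframe/column names `df` and `data` are assumptions):

import pyspark.sql.functions as F

parsed = (
    df
    # drop the leading "<1000> " token, then turn "k=v k=v ..." into a map
    .withColumn('kv', F.expr("str_to_map(regexp_replace(data, '^<[0-9]+> ', ''), ' ', '=')"))
    .select('*',
            F.col('kv')['date'].alias('date'),
            F.col('kv')['time'].alias('time'),
            F.col('kv')['name'].alias('name'))
)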

On Sun, Aug 9, 2020 at 12:00 PM anbutech  wrote:

> Hi All,
>
> I have a following info.in the data column.
>
> <1000> date=2020-08-01 time=20:50:04 name=processing id=123 session=new
> packt=20 orgin=null address=null dest=fgjglgl
>
> here I want to create a separate column for the above key value pairs after
> the integer <1000> separated by spaces.
> Is there any way to achieved it using regexp_extract inbuilt functions.i
> don't want to do it using udf function.
> apart from udf,is there any way to achieved it.
>
>
> Thanks
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -----
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: [Spark SQL]: Can't write DataFrame after using explode function on multiple columns.

2020-08-03 Thread Patrick McCarthy
If you use pandas_udfs in 2.4 they should be quite performant (or at least
won't suffer serialization overhead); that might be worth looking into.

I didn't run your code but one consideration is that the while loop might
be making the DAG a lot bigger than it has to be. You might see if defining
those columns with list comprehensions forming a single select() statement
makes for a smaller DAG.
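
For example, a loop of withColumn() calls grows the plan one projection per
iteration, while a single select() built from a list comprehension keeps it
flat (column names here are invented for illustration):

import pyspark.sql.functions as F

cols_to_derive = ['col_a', 'col_b', 'col_c']

# loop form: one projection per iteration
# for c in cols_to_derive:
#     df = df.withColumn(c + '_len', F.length(c))

# single-select form: one projection total
df = df.select('*', *[F.length(c).alias(c + '_len') for c in cols_to_derive])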

On Mon, Aug 3, 2020 at 10:06 AM Henrique Oliveira  wrote:

> Hi Patrick, thank you for your quick response.
> That's exactly what I think. Actually, the result of this processing is an
> intermediate table that is going to be used for other views generation.
> Another approach I'm trying now, is to move the "explosion" step for this
> "view generation" step, this way I don't need to explode every column but
> just those used for the final client.
>
> ps.I was avoiding UDFs for now because I'm still on Spark 2.4 and the
> python udfs I tried had very bad performance, but I will give it a try in
> this case. It can't be worse.
> Thanks again!
>
> Em seg., 3 de ago. de 2020 às 10:53, Patrick McCarthy <
> pmccar...@dstillery.com> escreveu:
>
>> This seems like a very expensive operation. Why do you want to write out
>> all the exploded values? If you just want all combinations of values, could
>> you instead do it at read-time with a UDF or something?
>>
>> On Sat, Aug 1, 2020 at 8:34 PM hesouol  wrote:
>>
>>> I forgot to add an information. By "can't write" I mean it keeps
>>> processing
>>> and nothing happens. The job runs for hours even with a very small file
>>> and
>>> I have to force the stoppage.
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>> --
>>
>>
>> *Patrick McCarthy  *
>>
>> Senior Data Scientist, Machine Learning Engineering
>>
>> Dstillery
>>
>> 470 Park Ave South, 17th Floor, NYC 10016
>>
>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: [Spark SQL]: Can't write DataFrame after using explode function on multiple columns.

2020-08-03 Thread Patrick McCarthy
This seems like a very expensive operation. Why do you want to write out
all the exploded values? If you just want all combinations of values, could
you instead do it at read-time with a UDF or something?

On Sat, Aug 1, 2020 at 8:34 PM hesouol  wrote:

> I forgot to add an information. By "can't write" I mean it keeps processing
> and nothing happens. The job runs for hours even with a very small file and
> I have to force the stoppage.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Issue in parallelization of CNN model using spark

2020-07-14 Thread Patrick McCarthy
Please don't advocate for piracy, this book is not freely available.

I own it and it's wonderful, Mr. Géron deserves to benefit from it.

On Mon, Jul 13, 2020 at 9:59 PM Anwar AliKhan 
wrote:

>  link to a free book  which may be useful.
>
> Hands-On Machine Learning with Scikit-Learn, Keras, and Tensorflow
> Concepts, Tools, and Techniques to Build Intelligent Systems by Aurélien
> Géron
>
> https://bit.ly/2zxueGt
>
>
>
>
>
>  13 Jul 2020, 15:18 Sean Owen,  wrote:
>
>> There is a multilayer perceptron implementation in Spark ML, but
>> that's not what you're looking for.
>> To parallelize model training developed using standard libraries like
>> Keras, use Horovod from Uber.
>> https://horovod.readthedocs.io/en/stable/spark_include.html
>>
>> On Mon, Jul 13, 2020 at 6:59 AM Mukhtaj Khan  wrote:
>> >
>> > Dear Spark User
>> >
>> > I am trying to parallelize the CNN (convolutional neural network) model
>> using spark. I have developed the model using python and Keras library. The
>> model works fine on a single machine but when we try on multiple machines,
>> the execution time remains the same as sequential.
>> > Could you please tell me that there is any built-in library for CNN to
>> parallelize in spark framework. Moreover, MLLIB does not have any support
>> for CNN.
>> > Best regards
>> > Mukhtaj
>> >
>> >
>> >
>> >
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Building Spark 3.0.0 for Hive 1.2

2020-07-10 Thread Patrick McCarthy
I'm trying to build Spark 3.0.0 for my Yarn cluster, with Hadoop 2.7.3 and
Hive 1.2.1. I downloaded the source and created a runnable dist with

./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr
-Phive-1.2 -Phadoop-2.7 -Pyarn

We're running Spark 2.4.0 in production so I copied the hive-site.xml,
spark-env.sh and spark-defaults.conf from there.

When I try to create a SparkSession in a normal Python REPL, I get the
following uninformative error. How can I debug this? I can run the
spark-shell and get to a scala prompt with Hive access seemingly without
error.

Python 3.6.3 (default, Apr 10 2018, 16:07:04)
[GCC 4.8.3 20140911 (Red Hat 4.8.3-9)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import os
>>> import sys
>>> os.environ['SPARK_HOME'] = '/home/pmccarthy/custom-spark-3'
>>> sys.path.insert(0,os.path.join(os.environ['SPARK_HOME'],'python','lib','py4j-src.zip'))
>>> sys.path.append(os.path.join(os.environ['SPARK_HOME'],'python'))
>>> import pyspark
>>> from pyspark.sql import SparkSession
>>> spark = (SparkSession.builder.enableHiveSupport().config('spark.master','local').getOrCreate())

Traceback (most recent call last):
  File "", line 1, in 
  File "/home/pmccarthy/custom-spark-3/python/pyspark/sql/session.py",
line 191, in getOrCreate
session._jsparkSession.sessionState().conf().setConfString(key, value)
  File 
"/home/pmccarthy/custom-spark-3/python/lib/py4j-src.zip/py4j/java_gateway.py",
line 1305, in __call__
  File "/home/pmccarthy/custom-spark-3/python/pyspark/sql/utils.py",
line 137, in deco
raise_from(converted)
  File "", line 3, in raise_from
pyspark.sql.utils.IllegalArgumentException: 


-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Reading TB of JSON file

2020-06-18 Thread Patrick McCarthy
Assuming that the file can be easily split, I would divide it into a number
of pieces and move those pieces to HDFS before using spark at all, using
`hdfs dfs` or similar. At that point you can use your executors to perform
the reading instead of the driver.
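
Once the pieces are sitting in an HDFS directory, the read itself happens
on the executors (the paths below are just an example):

# e.g. after something like: hdfs dfs -put part_*.json /data/big_json/
df = spark.read.json('hdfs:///data/big_json/')   # executors read the pieces in parallel
df.write.mode('overwrite').parquet('hdfs:///data/big_json_parquet/')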

On Thu, Jun 18, 2020 at 9:12 AM Chetan Khatri 
wrote:

> Hi Spark Users,
>
> I have a 50GB of JSON file, I would like to read and persist at HDFS so it
> can be taken into next transformation. I am trying to read as
> spark.read.json(path) but this is giving Out of memory error on driver.
> Obviously, I can't afford having 50 GB on driver memory. In general, what
> is the best practice to read large JSON file like 50 GB?
>
> Thanks
>


-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Add python library

2020-06-08 Thread Patrick McCarthy
I've found Anaconda encapsulates modules and dependencies and such nicely,
and you can deploy all the needed .so files and such by deploying a whole
conda environment.

I've used this method with success:
https://community.cloudera.com/t5/Community-Articles/Running-PySpark-with-Conda-Env/ta-p/247551

On Sat, Jun 6, 2020 at 4:16 PM Anwar AliKhan 
wrote:

>  " > Have you looked into this article?
> https://medium.com/@SSKahani/pyspark-applications-dependencies-99415e0df987
>  "
>
> This is weird !
> I was hanging out here https://machinelearningmastery.com/start-here/.
> When I came across this post.
>
> The weird part is I was just wondering  how I can take one of the
> projects(Open AI GYM taxi-vt2 in Python), a project I want to develop
> further.
>
> I want to run on Spark using Spark's parallelism features and GPU
> capabilities,  when I am using bigger datasets . While installing the
> workers (slaves)  doing the sliced dataset computations on the new 8GB RAM
> Raspberry Pi (Linux).
>
> Are any other documents on official website which shows how to do that,
> or any other location  , preferably showing full self contained examples?
>
>
>
> On Fri, 5 Jun 2020, 09:02 Dark Crusader, 
> wrote:
>
>> Hi Stone,
>>
>>
>> I haven't tried it with .so files however I did use the approach he
>> recommends to install my other dependencies.
>> I Hope it helps.
>>
>> On Fri, Jun 5, 2020 at 1:12 PM Stone Zhong  wrote:
>>
>>> Hi,
>>>
>>> So my pyspark app depends on some python libraries, it is not a problem,
>>> I pack all the dependencies into a file libs.zip, and then call
>>> *sc.addPyFile("libs.zip")* and it works pretty well for a while.
>>>
>>> Then I encountered a problem, if any of my library has any binary file
>>> dependency (like .so files), this approach does not work. Mainly because
>>> when you set PYTHONPATH to a zip file, python does not look up needed
>>> binary library (e.g. a .so file) inside the zip file, this is a python
>>> *limitation*. So I got a workaround:
>>>
>>> 1) Do not call sc.addPyFile, instead extract the libs.zip into current
>>> directory
>>> 2) When my python code starts, manually call *sys.path.insert(0,
>>> f"{os.getcwd()}/libs")* to set PYTHONPATH
>>>
>>> This workaround works well for me. Then I got another problem: what if
>>> my code in executor need python library that has binary code? Below is am
>>> example:
>>>
>>> def do_something(p):
>>> ...
>>>
>>> rdd = sc.parallelize([
>>> {"x": 1, "y": 2},
>>> {"x": 2, "y": 3},
>>> {"x": 3, "y": 4},
>>> ])
>>> a = rdd.map(do_something)
>>>
>>> What if the function "do_something" need a python library that has
>>> binary code? My current solution is, extract libs.zip into a NFS share (or
>>> a SMB share) and manually do *sys.path.insert(0,
>>> f"share_mount_dir/libs") *in my "do_something" function, but adding
>>> such code in each function looks ugly, is there any better/elegant solution?
>>>
>>> Thanks,
>>> Stone
>>>
>>>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Save Spark dataframe as dynamic partitioned table in Hive

2020-04-16 Thread Patrick McCarthy
>
> org.apache.spark.sql.catalyst.parser.ParseException:
> missing STRING at ','(line 2, pos 85)
>
> == SQL ==
>
>   INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
> broadcastValue, brand = dummy)
>
> -^^^
>   SELECT
>   ocis_party_id AS partyId
> , target_mobile_no AS phoneNumber
>   FROM tmp
>
> It fails passing partition values
>
>
> Thanks,
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Design pattern to invert a large map

2020-03-31 Thread Patrick McCarthy
I'm not a software engineer by training and I hope that there's an existing
best practice for the problem I'm trying to solve. I'm using Spark 2.4.5,
Hadoop 2.7, Hive 1.2.

I have a large table (terabytes) from an external source (which is beyond
my control) where the data is stored in a key-value format with an array of
values:

| id | val
+ - +---
| k1 | 
| k2 | 
| k3 | 

I want to invert the map so that I have a collection of keys for each value
(let's assume I don't care about uniqueness):

| id | val
+ - + --
| v1 | 
| v2 | 
| v3 | 
| v5 | 

It seems like a lot of shuffle is required somehow, but I'm not sure what
the best approach is. I've written solutions using DataFrame (with
explode(), groupBy() and collect_set()) and with RDD but it's always very
expensive.
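
For concreteness, the DataFrame version is essentially this (using the toy
schema above):

import pyspark.sql.functions as F

inverted = (
    df
    .select('id', F.explode('val').alias('v'))
    .groupBy('v')
    .agg(F.collect_set('id').alias('ids'))
)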

Is there a best practice technique for this kind of operation? My leading
thought so far is to restage the data in a partitioned, bucketed flat table
as an intermediary step but that too is costly in terms of disk space and
transform time.

Thanks,
Patrick


Re: Using Percentile in Spark SQL

2019-11-11 Thread Patrick McCarthy
Depending on your tolerance for error you could also use
percentile_approx().
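
For example (column names and the accuracy argument are placeholders):

import pyspark.sql.functions as F

medians = df.groupBy('group_col').agg(
    F.expr('percentile_approx(value_col, 0.5, 10000)').alias('approx_median')
)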

On Mon, Nov 11, 2019 at 10:14 AM Jerry Vinokurov 
wrote:

> Do you mean that you are trying to compute the percent rank of some data?
> You can use the SparkSQL percent_rank function for that, but I don't think
> that's going to give you any improvement over calling the percentRank
> function on the data frame. Are you currently using a user-defined function
> for this task? Because I bet that's what's slowing you down.
>
> On Mon, Nov 11, 2019 at 9:46 AM Tzahi File  wrote:
>
>> Hi,
>>
>> Currently, I'm using hive huge cluster(m5.24xl * 40 workers) to run a
>> percentile function. I'm trying to improve this job by moving it to run
>> with spark SQL.
>>
>> Any suggestions on how to use a percentile function in Spark?
>>
>>
>> Thanks,
>> --
>> Tzahi File
>> Data Engineer
>> [image: ironSource] <http://www.ironsrc.com/>
>>
>> email tzahi.f...@ironsrc.com
>> mobile +972-546864835
>> fax +972-77-5448273
>> ironSource HQ - 121 Derech Menachem Begin st. Tel Aviv
>> ironsrc.com <http://www.ironsrc.com/>
>> [image: linkedin] <https://www.linkedin.com/company/ironsource>[image:
>> twitter] <https://twitter.com/ironsource>[image: facebook]
>> <https://www.facebook.com/ironSource>[image: googleplus]
>> <https://plus.google.com/+ironsrc>
>> This email (including any attachments) is for the sole use of the
>> intended recipient and may contain confidential information which may be
>> protected by legal privilege. If you are not the intended recipient, or the
>> employee or agent responsible for delivering it to the intended recipient,
>> you are hereby notified that any use, dissemination, distribution or
>> copying of this communication and/or its content is strictly prohibited. If
>> you are not the intended recipient, please immediately notify us by reply
>> email or by telephone, delete this email and destroy any copies. Thank you.
>>
>
>
> --
> http://www.google.com/profiles/grapesmoker
>


-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Best practices for data like file storage

2019-11-01 Thread Patrick McCarthy
Hi List,

I'm looking for resources to learn about how to store data on disk for
later access.

For a while my team has been using Spark on top of our existing hdfs/Hive
cluster without much agency as far as what format is used to store the
data. I'd like to learn more about how to re-stage my data to speed up my
own analyses, and to start building expertise to define new data stores.

One example of a problem I'm facing is data which is written to Hive using
a customized protobuf serde. The data contains many very complex types
(arrays of structs of arrays of... ) and I often need very few elements of
any particular record, yet the format requires Spark to deserialize the
entire object.

The sorts of information I'm looking for:

   - Do's and Dont's of laying out a parquet schema
   - Measuring / debugging read speed
   - How to bucket, index, etc.

Thanks!


Re: [Spark SQL]: Does Union operation followed by drop duplicate follows "keep first"

2019-09-13 Thread Patrick McCarthy
If you only care that you're deduping on one of the fields you could add an
index and count like so:

from pyspark.sql.functions import lit, col, collect_set, size

df3 = (df1.withColumn('idx', lit(1))
       .union(df2.withColumn('idx', lit(2))))

remove_df = (df3
             .groupBy('id')
             .agg(collect_set('idx').alias('idx_set'))
             .filter(size(col('idx_set')) > 1)
             .select('id', lit(2).alias('idx')))

# the duplicated ids in remove_df are coded for df2, so only those rows will
# be dropped

df3.join(remove_df, on=['id', 'idx'], how='leftanti').drop('idx')

On Fri, Sep 13, 2019 at 11:44 AM Abhinesh Hada 
wrote:

> Hi,
>
> I am trying to take union of 2 dataframes and then drop duplicate based on
> the value of a specific column. But, I want to make sure that while
> dropping duplicates, the rows from first data frame are kept.
>
> Example:
> df1 = df1.union(df2).dropDuplicates(['id'])
>
>
>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: script running in jupyter 6-7x faster than spark submit

2019-09-11 Thread Patrick McCarthy
Are you running in cluster mode? A large virtualenv zip for the driver sent
into the cluster on a slow pipe could account for much of that eight
minutes.

On Wed, Sep 11, 2019 at 3:17 AM Dhrubajyoti Hati 
wrote:

> Hi,
>
> I just ran the same script in a shell in jupyter notebook and find the
> performance to be similar. So I can confirm this is because the libraries
> used jupyter notebook python is different than the spark-submit python this
> is happening.
>
> But now I have a following question. Are the dependent libraries in a
> python script also transferred to the worker machines when executing a
> python script in spark. Because though the driver python versions are
> different, the workers machines will use their same python environment to
> run the code. If anyone can explain this part, it would be helpful.
>
>
>
>
> *Regards,Dhrubajyoti Hati.Mob No: 9886428028/9652029028*
>
>
> On Wed, Sep 11, 2019 at 9:45 AM Dhrubajyoti Hati 
> wrote:
>
>> Just checked from where the script is submitted i.e. wrt Driver, the
>> python env are different. Jupyter one is running within a the virtual
>> environment which is Python 2.7.5 and the spark-submit one uses 2.6.6. But
>> the executors have the same python version right? I tried doing a
>> spark-submit from jupyter shell, it fails to find python 2.7  which is not
>> there hence throws error.
>>
>> Here is the udf which might take time:
>>
>> import base64
>> import zlib
>>
>> def decompress(data):
>>
>> bytecode = base64.b64decode(data)
>> d = zlib.decompressobj(32 + zlib.MAX_WBITS)
>> decompressed_data = d.decompress(bytecode )
>> return(decompressed_data.decode('utf-8'))
>>
>>
>> Could this because of the two python environment mismatch from Driver side? 
>> But the processing
>>
>> happens in the executor side?
>>
>>
>>
>>
>> *Regards,Dhrub*
>>
>> On Wed, Sep 11, 2019 at 8:59 AM Abdeali Kothari 
>> wrote:
>>
>>> Maybe you can try running it in a python shell or
>>> jupyter-console/ipython instead of a spark-submit and check how much time
>>> it takes too.
>>>
>>> Compare the env variables to check that no additional env configuration
>>> is present in either environment.
>>>
>>> Also is the python environment for both the exact same? I ask because it
>>> looks like you're using a UDF and if the Jupyter python has (let's say)
>>> numpy compiled with blas it would be faster than a numpy without it. Etc.
>>> I.E. Some library you use may be using pure python and another may be using
>>> a faster C extension...
>>>
>>> What python libraries are you using in the UDFs? It you don't use UDFs
>>> at all and use some very simple pure spark functions does the time
>>> difference still exist?
>>>
>>> Also are you using dynamic allocation or some similar spark config which
>>> could vary performance between runs because the same resources we're not
>>> utilized on Jupyter / spark-submit?
>>>
>>>
>>> On Wed, Sep 11, 2019, 08:43 Stephen Boesch  wrote:
>>>
>>>> Sounds like you have done your homework to properly compare .   I'm
>>>> guessing the answer to the following is yes .. but in any case:  are they
>>>> both running against the same spark cluster with the same configuration
>>>> parameters especially executor memory and number of workers?
>>>>
>>>> Am Di., 10. Sept. 2019 um 20:05 Uhr schrieb Dhrubajyoti Hati <
>>>> dhruba.w...@gmail.com>:
>>>>
>>>>> No, i checked for that, hence written "brand new" jupyter notebook.
>>>>> Also the time taken by both are 30 mins and ~3hrs as i am reading a 500
>>>>> gigs compressed base64 encoded text data from a hive table and
>>>>> decompressing and decoding in one of the udfs. Also the time compared is
>>>>> from Spark UI not  how long the job actually takes after submission. Its
>>>>> just the running time i am comparing/mentioning.
>>>>>
>>>>> As mentioned earlier, all the spark conf params even match in two
>>>>> scripts and that's why i am puzzled what going on.
>>>>>
>>>>> On Wed, 11 Sep, 2019, 12:44 AM Patrick McCarthy, <
>>>>> pmccar...@dstillery.com> wrote:
>>>>>
>>>>>> It's not obvious from what you pasted, but perhaps the juypter
>>>>>> notebook already is connected to a running spark

Re: script running in jupyter 6-7x faster than spark submit

2019-09-10 Thread Patrick McCarthy
It's not obvious from what you pasted, but perhaps the juypter notebook
already is connected to a running spark context, while spark-submit needs
to get a new spot in the (YARN?) queue.

I would check the cluster job IDs for both to ensure you're getting new
cluster tasks for each.

On Tue, Sep 10, 2019 at 2:33 PM Dhrubajyoti Hati 
wrote:

> Hi,
>
> I am facing a weird behaviour while running a python script. Here is what
> the code looks like mostly:
>
> def fn1(ip):
>some code...
> ...
>
> def fn2(row):
> ...
> some operations
> ...
> return row1
>
>
> udf_fn1 = udf(fn1)
> cdf = spark.read.table("") //hive table is of size > 500 Gigs with
> ~4500 partitions
> ddf = cdf.withColumn("coly", udf_fn1(cdf.colz)) \
> .drop("colz") \
> .withColumnRenamed("colz", "coly")
>
> edf = ddf \
> .filter(ddf.colp == 'some_value') \
> .rdd.map(lambda row: fn2(row)) \
> .toDF()
>
> print edf.count() // simple way for the performance test in both platforms
>
> Now when I run the same code in a brand new jupyter notebook it runs 6x
> faster than when I run this python script using spark-submit. The
> configurations are printed and  compared from both the platforms and they
> are exact same. I even tried to run this script in a single cell of jupyter
> notebook and still have the same performance. I need to understand if I am
> missing something in the spark-submit which is causing the issue.  I tried
> to minimise the script to reproduce the same error without much code.
>
> Both are run in client mode on a yarn based spark cluster. The machines
> from which both are executed are also the same and from same user.
>
> What i found is the  the quantile values for median for one ran with
> jupyter was 1.3 mins and one ran with spark-submit was ~8.5 mins.  I am not
> able to figure out why this is happening.
>
> Any one faced this kind of issue before or know how to resolve this?
>
> *Regards,*
> *Dhrub*
>


-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Hive external table not working in sparkSQL when subdirectories are present

2019-08-07 Thread Patrick McCarthy
Do the permissions on the hive table files on HDFS correspond with what the
spark user is able to read? This might arise from spark being run as
different users.

On Wed, Aug 7, 2019 at 3:15 PM Rishikesh Gawade 
wrote:

> Hi,
> I did not explicitly create a Hive Context. I have been using the
> spark.sqlContext that gets created upon launching the spark-shell.
> Isn't this sqlContext same as the hiveContext?
> Thanks,
> Rishikesh
>
> On Wed, Aug 7, 2019 at 12:43 PM Jörn Franke  wrote:
>
>> Do you use the HiveContext in Spark? Do you configure the same options
>> there? Can you share some code?
>>
>> Am 07.08.2019 um 08:50 schrieb Rishikesh Gawade > >:
>>
>> Hi.
>> I am using Spark 2.3.2 and Hive 3.1.0.
>> Even if i use parquet files the result would be same, because after all
>> sparkSQL isn't able to descend into the subdirectories over which the table
>> is created. Could there be any other way?
>> Thanks,
>> Rishikesh
>>
>> On Tue, Aug 6, 2019, 1:03 PM Mich Talebzadeh 
>> wrote:
>>
>>> which versions of Spark and Hive are you using.
>>>
>>> what will happen if you use parquet tables instead?
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Tue, 6 Aug 2019 at 07:58, Rishikesh Gawade 
>>> wrote:
>>>
>>>> Hi.
>>>> I have built a Hive external table on top of a directory 'A' which has
>>>> data stored in ORC format. This directory has several subdirectories inside
>>>> it, each of which contains the actual ORC files.
>>>> These subdirectories are actually created by spark jobs which ingest
>>>> data from other sources and write it into this directory.
>>>> I tried creating a table and setting the table properties of the same
>>>> as *hive.mapred.supports.subdirectories=TRUE* and
>>>> *mapred.input.dir.recursive**=TRUE*.
>>>> As a result of this, when i fire the simplest query of *select
>>>> count(*) from ExtTable* via the Hive CLI, it successfully gives me the
>>>> expected count of records in the table.
>>>> However, when i fire the same query via sparkSQL, i get count = 0.
>>>>
>>>> I think the sparkSQL isn't able to descend into the subdirectories for
>>>> getting the data while hive is able to do so.
>>>> Are there any configurations needed to be set on the spark side so that
>>>> this works as it does via hive cli?
>>>> I am using Spark on YARN.
>>>>
>>>> Thanks,
>>>> Rishikesh
>>>>
>>>> Tags: subdirectories, subdirectory, recursive, recursion, hive external
>>>> table, orc, sparksql, yarn
>>>>
>>>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Spark Image resizing

2019-07-31 Thread Patrick McCarthy
It won't be very efficient but you could write a python UDF using
PythonMagick - https://wiki.python.org/moin/ImageMagick

If you have PyArrow > 0.10 then you might be able to get a boost by saving
images in a column as BinaryType and writing a PandasUDF.
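
A sketch of that BinaryType + pandas UDF idea, using Pillow here purely for
illustration (the 224x224 target and the assumption that `df` has a binary
column named 'content' are mine):

import io
import pyspark.sql.functions as F
import pyspark.sql.types as T
from PIL import Image

@F.pandas_udf(T.BinaryType(), F.PandasUDFType.SCALAR)
def resize_image(content):
    def resize_one(img_bytes):
        img = Image.open(io.BytesIO(img_bytes))
        buf = io.BytesIO()
        img.resize((224, 224)).save(buf, format='PNG')   # arbitrary target size
        return buf.getvalue()
    return content.apply(resize_one)

# resized = df.withColumn('resized', resize_image('content'))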

On Wed, Jul 31, 2019 at 6:22 AM Nick Dawes  wrote:

> Any other way of resizing the image before creating the DataFrame in
> Spark? I know opencv does it. But I don't have opencv on my cluster. I have
> Anaconda python packages installed on my cluster.
>
> Any ideas will be appreciated.  Thank you!
>
> On Tue, Jul 30, 2019, 4:17 PM Nick Dawes  wrote:
>
>> Hi
>>
>> I'm new to spark image data source.
>>
>> After creating a dataframe using Spark's image data source, I would like
>> to resize the images in PySpark.
>>
>> df = spark.read.format("image").load(imageDir)
>>
>> Can you please help me with this?
>>
>> Nick
>>
>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: [Beginner] Run compute on large matrices and return the result in seconds?

2019-07-17 Thread Patrick McCarthy
> On Thu, Jul 11, 2019 at 9:24 AM Gautham Acharya <
> gauth...@alleninstitute.org> wrote:
>
> Ping? I would really appreciate advice on this! Thank you!
>
>
>
> *From:* Gautham Acharya
> *Sent:* Tuesday, July 9, 2019 4:22 PM
> *To:* user@spark.apache.org
> *Subject:* [Beginner] Run compute on large matrices and return the result
> in seconds?
>
>
>
> This is my first email to this mailing list, so I apologize if I made any
> errors.
>
>
>
> My team's going to be building an application and I'm investigating some
> options for distributed compute systems. We want to be performing computes
> on large matrices.
>
>
>
> The requirements are as follows:
>
>
>
> 1. The matrices can be expected to be up to 50,000 columns x 3
> million rows. The values are all integers (except for the row/column
> headers).
>
> 2. The application needs to select a specific row, and calculate the
> correlation coefficient (
> https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.corr.html
> <https://nam05.safelinks.protection.outlook.com/?url=https%3A%2F%2Fpandas.pydata.org%2Fpandas-docs%2Fstable%2Freference%2Fapi%2Fpandas.DataFrame.corr.html=02%7C01%7C%7C7d44353d2dd5420bc35108d70abff11d%7C32669cd6737f4b398bddd6951120d3fc%7C0%7C1%7C636989691818868018=e5blX8ItE1JDJRx9D3FnmsXp4TnOKvyH6fA6%2Fw2QTbI%3D=0>
>  )
> against every other row. This means up to 3 million different calculations.
>
> 3. A sorted list of the correlation coefficients and their
> corresponding row keys need to be returned in under 5 seconds.
>
> 4. Users will eventually request random row/column subsets to run
> calculations on, so precomputing our coefficients is not an option. This
> needs to be done on request.
>
>
>
> I've been looking at many compute solutions, but I'd consider Spark first
> due to the widespread use and community. I currently have my data loaded
> into Apache Hbase for a different scenario (random access of rows/columns).
> I’ve naively tired loading a dataframe from the CSV using a Spark instance
> hosted on AWS EMR, but getting the results for even a single correlation
> takes over 20 seconds.
>
>
>
> Thank you!
>
>
>
>
>
> --gautham
>
>
>
>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: spark python script importError problem

2019-07-16 Thread Patrick McCarthy
Your module 'feature' isn't available to the yarn workers, so you'll need
to either install it on them if you have access, or else upload to the
workers at runtime using --py-files or similar.
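
For example (the zip name and path are assumptions), either pass it on the
command line with --py-files, or ship it from inside the script:

# equivalent to: spark-submit --py-files feature.zip your_script.py
spark.sparkContext.addPyFile('hdfs:///user/me/feature.zip')
# after this, `import feature.user.user_feature` works inside functions
# that run on the executors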

On Tue, Jul 16, 2019 at 7:16 AM zenglong chen 
wrote:

> Hi,all,
>   When i run a run a python script on spark submit,it done well in
> local[*] mode,but not in standalone mode or yarn mode.The error like below:
>
> Caused by: org.apache.spark.api.python.PythonException: Traceback (most
> recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/pyspark/worker.py", line
> 364, in main
> func, profiler, deserializer, serializer = read_command(pickleSer,
> infile)
>   File "/usr/local/lib/python2.7/dist-packages/pyspark/worker.py", line
> 69, in read_command
> command = serializer._read_with_length(file)
>   File "/usr/local/lib/python2.7/dist-packages/pyspark/serializers.py",
> line 172, in _read_with_length
> return self.loads(obj)
>   File "/usr/local/lib/python2.7/dist-packages/pyspark/serializers.py",
> line 583, in loads
> return pickle.loads(obj)
> ImportError: No module named feature.user.user_feature
>
> The script also run well in "sbin/start-master.sh sbin/start-slave.sh",but
> it has the same importError problem in "sbin/start-master.sh
> sbin/start-slaves.sh".The conf/slaves contents is 'localhost'.
>
> What should i do to solve this import problem?Thanks!!!
>


-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Anaconda installation with Pyspark/Pyarrow (2.3.0+) on cloudera managed server

2019-05-06 Thread Patrick McCarthy
Thanks Gourav.

Incidentally, since the regular UDF is row-wise, we could optimize that a
bit by taking the convert() closure and simply making that the UDF.

Since there's that MGRS object that we have to create too, we could
probably optimize it further by applying the UDF via rdd.mapPartitions,
which would allow the UDF to instantiate objects once per-partition instead
of per-row and then iterate element-wise through the rows of the partition.
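
A sketch of that mapPartitions variant, reusing the column name and toMGRS
call from the code quoted below:

def convert_partition(rows, precision_level=2):
    import mgrs
    m = mgrs.MGRS()   # instantiated once per partition
    for row in rows:
        lat, lon = row['lat_lon_str'].split('_')
        yield (row['lat_lon_str'], m.toMGRS(lat, lon, MGRSPrecision=precision_level))

# converted = df.select('lat_lon_str').rdd.mapPartitions(convert_partition)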

All that said, having done the above on prior projects I find the pandas
abstractions to be very elegant and friendly to the end-user so I haven't
looked back :)

(The common memory model via Arrow is a nice boost too!)

On Mon, May 6, 2019 at 11:13 AM Gourav Sengupta 
wrote:

> The proof is in the pudding
>
> :)
>
>
>
> On Mon, May 6, 2019 at 2:46 PM Gourav Sengupta 
> wrote:
>
>> Hi Patrick,
>>
>> super duper, thanks a ton for sharing the code. Can you please confirm
>> that this runs faster than the regular UDF's?
>>
>> Interestingly I am also running same transformations using another geo
>> spatial library in Python, where I am passing two fields and getting back
>> an array.
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Mon, May 6, 2019 at 2:00 PM Patrick McCarthy 
>> wrote:
>>
>>> Human time is considerably more expensive than computer time, so in that
>>> regard, yes :)
>>>
>>> This took me one minute to write and ran fast enough for my needs. If
>>> you're willing to provide a comparable scala implementation I'd be happy to
>>> compare them.
>>>
>>> @F.pandas_udf(T.StringType(), F.PandasUDFType.SCALAR)
>>>
>>> def generate_mgrs_series(lat_lon_str, level):
>>>
>>> import mgrs
>>>
>>> m = mgrs.MGRS()
>>>
>>> precision_level = 0
>>>
>>> levelval = level[0]
>>>
>>> if levelval == 1000:
>>>
>>>precision_level = 2
>>>
>>> if levelval == 100:
>>>
>>>precision_level = 3
>>>
>>> def convert(ll_str):
>>>
>>>   lat, lon = ll_str.split('_')
>>>
>>>   return m.toMGRS(lat, lon,
>>>
>>>   MGRSPrecision = precision_level)
>>>
>>> return lat_lon_str.apply(lambda x: convert(x))
>>>
>>> On Mon, May 6, 2019 at 8:23 AM Gourav Sengupta <
>>> gourav.sengu...@gmail.com> wrote:
>>>
>>>> And you found the PANDAS UDF more performant ? Can you share your code
>>>> and prove it?
>>>>
>>>> On Sun, May 5, 2019 at 9:24 PM Patrick McCarthy <
>>>> pmccar...@dstillery.com> wrote:
>>>>
>>>>> I disagree that it's hype. Perhaps not 1:1 with pure scala
>>>>> performance-wise, but for python-based data scientists or others with a 
>>>>> lot
>>>>> of python expertise it allows one to do things that would otherwise be
>>>>> infeasible at scale.
>>>>>
>>>>> For instance, I recently had to convert latitude / longitude pairs to
>>>>> MGRS strings (
>>>>> https://en.wikipedia.org/wiki/Military_Grid_Reference_System).
>>>>> Writing a pandas UDF (and putting the mgrs python package into a conda
>>>>> environment) was _significantly_ easier than any alternative I found.
>>>>>
>>>>> @Rishi - depending on your network is constructed, some lag could come
>>>>> from just uploading the conda environment. If you load it from hdfs with
>>>>> --archives does it improve?
>>>>>
>>>>> On Sun, May 5, 2019 at 2:15 PM Gourav Sengupta <
>>>>> gourav.sengu...@gmail.com> wrote:
>>>>>
>>>>>> hi,
>>>>>>
>>>>>> Pandas UDF is a bit of hype. One of their blogs shows the used case
>>>>>> of adding 1 to a field using Pandas UDF which is pretty much pointless. 
>>>>>> So
>>>>>> you go beyond the blog and realise that your actual used case is more 
>>>>>> than
>>>>>> adding one :) and the reality hits you
>>>>>>
>>>>>> Pandas UDF in certain scenarios is actually slow, try using apply for
>>>>>> a custom or pandas function. In fact in certain scenarios I have found
>>>>>> general UDF's work much faster and use much less memory. Therefore test 
>>>>>> out
>>>>>> y

Re: Anaconda installation with Pyspark/Pyarrow (2.3.0+) on cloudera managed server

2019-05-06 Thread Patrick McCarthy
Human time is considerably more expensive than computer time, so in that
regard, yes :)

This took me one minute to write and ran fast enough for my needs. If
you're willing to provide a comparable scala implementation I'd be happy to
compare them.

@F.pandas_udf(T.StringType(), F.PandasUDFType.SCALAR)
def generate_mgrs_series(lat_lon_str, level):
    import mgrs

    m = mgrs.MGRS()

    precision_level = 0
    levelval = level[0]
    if levelval == 1000:
        precision_level = 2
    if levelval == 100:
        precision_level = 3

    def convert(ll_str):
        lat, lon = ll_str.split('_')
        return m.toMGRS(lat, lon, MGRSPrecision=precision_level)

    return lat_lon_str.apply(lambda x: convert(x))
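
It gets applied like any other column expression, e.g. (column names mirror
the UDF's arguments):

df = df.withColumn('mgrs', generate_mgrs_series(F.col('lat_lon_str'), F.col('level')))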

On Mon, May 6, 2019 at 8:23 AM Gourav Sengupta 
wrote:

> And you found the PANDAS UDF more performant ? Can you share your code and
> prove it?
>
> On Sun, May 5, 2019 at 9:24 PM Patrick McCarthy 
> wrote:
>
>> I disagree that it's hype. Perhaps not 1:1 with pure scala
>> performance-wise, but for python-based data scientists or others with a lot
>> of python expertise it allows one to do things that would otherwise be
>> infeasible at scale.
>>
>> For instance, I recently had to convert latitude / longitude pairs to
>> MGRS strings (
>> https://en.wikipedia.org/wiki/Military_Grid_Reference_System). Writing a
>> pandas UDF (and putting the mgrs python package into a conda environment)
>> was _significantly_ easier than any alternative I found.
>>
>> @Rishi - depending on your network is constructed, some lag could come
>> from just uploading the conda environment. If you load it from hdfs with
>> --archives does it improve?
>>
>> On Sun, May 5, 2019 at 2:15 PM Gourav Sengupta 
>> wrote:
>>
>>> hi,
>>>
>>> Pandas UDF is a bit of hype. One of their blogs shows the used case of
>>> adding 1 to a field using Pandas UDF which is pretty much pointless. So you
>>> go beyond the blog and realise that your actual used case is more than
>>> adding one :) and the reality hits you
>>>
>>> Pandas UDF in certain scenarios is actually slow, try using apply for a
>>> custom or pandas function. In fact in certain scenarios I have found
>>> general UDF's work much faster and use much less memory. Therefore test out
>>> your used case (with at least 30 million records) before trying to use the
>>> Pandas UDF option.
>>>
>>> And when you start using GroupMap then you realise after reading
>>> https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#pandas-udfs-aka-vectorized-udfs
>>> that "Oh!! now I can run into random OOM errors and the maxrecords options
>>> does not help at all"
>>>
>>> Excerpt from the above link:
>>> Note that all data for a group will be loaded into memory before the
>>> function is applied. This can lead to out of memory exceptions, especially
>>> if the group sizes are skewed. The configuration for maxRecordsPerBatch
>>> <https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#setting-arrow-batch-size>
>>>  is
>>> not applied on groups and it is up to the user to ensure that the grouped
>>> data will fit into the available memory.
>>>
>>> Let me know about your used case in case possible
>>>
>>>
>>> Regards,
>>> Gourav
>>>
>>> On Sun, May 5, 2019 at 3:59 AM Rishi Shah 
>>> wrote:
>>>
>>>> Thanks Patrick! I tried to package it according to this instructions,
>>>> it got distributed on the cluster however the same spark program that takes
>>>> 5 mins without pandas UDF has started to take 25mins...
>>>>
>>>> Have you experienced anything like this? Also is Pyarrow 0.12 supported
>>>> with Spark 2.3 (according to documentation, it should be fine)?
>>>>
>>>> On Tue, Apr 30, 2019 at 9:35 AM Patrick McCarthy <
>>>> pmccar...@dstillery.com> wrote:
>>>>
>>>>> Hi Rishi,
>>>>>
>>>>> I've had success using the approach outlined here:
>>>>> https://community.hortonworks.com/articles/58418/running-pyspark-with-conda-env.html
>>>>>
>>>>> Does this work for you?
>>>>>
>>>>> On Tue, Apr 30, 2019 at 12:32 AM Rishi Shah 
>>>>> wrote:
>>>>>
>>>>>> modified the subject & would like to clarify that I am looking to
>>>>>> create an anaconda parcel with pyarrow and other libraries, so t

Re: Anaconda installation with Pyspark/Pyarrow (2.3.0+) on cloudera managed server

2019-05-05 Thread Patrick McCarthy
I disagree that it's hype. Perhaps not 1:1 with pure scala
performance-wise, but for python-based data scientists or others with a lot
of python expertise it allows one to do things that would otherwise be
infeasible at scale.

For instance, I recently had to convert latitude / longitude pairs to MGRS
strings (https://en.wikipedia.org/wiki/Military_Grid_Reference_System).
Writing a pandas UDF (and putting the mgrs python package into a conda
environment) was _significantly_ easier than any alternative I found.

@Rishi - depending on how your network is constructed, some lag could come
from just uploading the conda environment. If you load it from hdfs with
--archives does it improve?

On Sun, May 5, 2019 at 2:15 PM Gourav Sengupta 
wrote:

> hi,
>
> Pandas UDF is a bit of hype. One of their blogs shows the used case of
> adding 1 to a field using Pandas UDF which is pretty much pointless. So you
> go beyond the blog and realise that your actual used case is more than
> adding one :) and the reality hits you
>
> Pandas UDF in certain scenarios is actually slow, try using apply for a
> custom or pandas function. In fact in certain scenarios I have found
> general UDF's work much faster and use much less memory. Therefore test out
> your used case (with at least 30 million records) before trying to use the
> Pandas UDF option.
>
> And when you start using GroupMap then you realise after reading
> https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#pandas-udfs-aka-vectorized-udfs
> that "Oh!! now I can run into random OOM errors and the maxrecords options
> does not help at all"
>
> Excerpt from the above link:
> Note that all data for a group will be loaded into memory before the
> function is applied. This can lead to out of memory exceptions, especially
> if the group sizes are skewed. The configuration for maxRecordsPerBatch
> <https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#setting-arrow-batch-size>
>  is
> not applied on groups and it is up to the user to ensure that the grouped
> data will fit into the available memory.
>
> Let me know about your used case in case possible
>
>
> Regards,
> Gourav
>
> On Sun, May 5, 2019 at 3:59 AM Rishi Shah 
> wrote:
>
>> Thanks Patrick! I tried to package it according to this instructions, it
>> got distributed on the cluster however the same spark program that takes 5
>> mins without pandas UDF has started to take 25mins...
>>
>> Have you experienced anything like this? Also is Pyarrow 0.12 supported
>> with Spark 2.3 (according to documentation, it should be fine)?
>>
>> On Tue, Apr 30, 2019 at 9:35 AM Patrick McCarthy 
>> wrote:
>>
>>> Hi Rishi,
>>>
>>> I've had success using the approach outlined here:
>>> https://community.hortonworks.com/articles/58418/running-pyspark-with-conda-env.html
>>>
>>> Does this work for you?
>>>
>>> On Tue, Apr 30, 2019 at 12:32 AM Rishi Shah 
>>> wrote:
>>>
>>>> modified the subject & would like to clarify that I am looking to
>>>> create an anaconda parcel with pyarrow and other libraries, so that I can
>>>> distribute it on the cloudera cluster..
>>>>
>>>> On Tue, Apr 30, 2019 at 12:21 AM Rishi Shah 
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have been trying to figure out a way to build anaconda parcel with
>>>>> pyarrow included for my cloudera managed server for distribution but this
>>>>> doesn't seem to work right. Could someone please help?
>>>>>
>>>>> I have tried to install anaconda on one of the management nodes on
>>>>> cloudera cluster... tarred the directory, but this directory doesn't
>>>>> include all the packages to form a proper parcel for distribution.
>>>>>
>>>>> Any help is much appreciated!
>>>>>
>>>>> --
>>>>> Regards,
>>>>>
>>>>> Rishi Shah
>>>>>
>>>>
>>>>
>>>> --
>>>> Regards,
>>>>
>>>> Rishi Shah
>>>>
>>>
>>>
>>> --
>>>
>>>
>>> *Patrick McCarthy  *
>>>
>>> Senior Data Scientist, Machine Learning Engineering
>>>
>>> Dstillery
>>>
>>> 470 Park Ave South, 17th Floor, NYC 10016
>>>
>>
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Anaconda installation with Pyspark/Pyarrow (2.3.0+) on cloudera managed server

2019-04-30 Thread Patrick McCarthy
Hi Rishi,

I've had success using the approach outlined here:
https://community.hortonworks.com/articles/58418/running-pyspark-with-conda-env.html

Does this work for you?

On Tue, Apr 30, 2019 at 12:32 AM Rishi Shah 
wrote:

> modified the subject & would like to clarify that I am looking to create
> an anaconda parcel with pyarrow and other libraries, so that I can
> distribute it on the cloudera cluster..
>
> On Tue, Apr 30, 2019 at 12:21 AM Rishi Shah 
> wrote:
>
>> Hi All,
>>
>> I have been trying to figure out a way to build anaconda parcel with
>> pyarrow included for my cloudera managed server for distribution but this
>> doesn't seem to work right. Could someone please help?
>>
>> I have tried to install anaconda on one of the management nodes on
>> cloudera cluster... tarred the directory, but this directory doesn't
>> include all the packages to form a proper parcel for distribution.
>>
>> Any help is much appreciated!
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>
>
> --
> Regards,
>
> Rishi Shah
>


-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Spark ML with null labels

2019-01-10 Thread Patrick McCarthy
I actually tried that first. I moved away from it because the algorithm
needs to evaluate all records for all models; for instance, a model trained
on (2,4) needs to be evaluated on a record whose true label is 8. I found
that if I apply the filter in the label-creation transformer, then a record
whose label is not 2 or 4 will not be scored. I'd be curious to discover if
there's a way to make that approach work, however.

On Thu, Jan 10, 2019 at 12:20 PM Xiangrui Meng  wrote:

> In your custom transformer that produces labels, can you filter null
> labels? A transformer doesn't always need to do 1:1 mapping.
>
> On Thu, Jan 10, 2019, 7:53 AM Patrick McCarthy
> 
>> I'm trying to implement an algorithm on the MNIST digits that runs like
>> so:
>>
>>
>>- for every pair of digits (0,1), (0,2), (0,3)... assign a 0/1 label
>>to the digits and build a LogisticRegression Classifier -- 45 in total
>>- Fit every classifier on the test set separately
>>- Aggregate the results per record of the test set and compute a
>>prediction from the 45 predictions
>>
>> I tried implementing this with a Pipeline, composed of
>>
>>- stringIndexer
>>- a custom transformer which accepts a lower-digit and upper-digit
>>argument, producing the 0/1 label
>>- a custom transformer to assemble the indexed strings to VectorUDT
>>- LogisticRegression
>>
>> fed by a list of paramMaps. It failed because the fit() method of
>> logistic couldn't handle cases of null labels, i.e. a case where my 0/1
>> transformer found neither the lower nor the upper digit label. I fixed this
>> by extending the LogisticRegression class and overriding the fit() method
>> to include a filter for labels in (0,1) -- I didn't want to alter the
>> transform method.
>>
>> Now, I'd like to tune these models using CrossValidator with an estimator
>> of pipeline but when I run either fitMultiple on my paramMap or I loop over
>> the paramMaps, I get arcane Scala errors.
>>
>>
>> Is there a better way to build this procedure? Thanks!
>>
>


Spark ML with null labels

2019-01-10 Thread Patrick McCarthy
I'm trying to implement an algorithm on the MNIST digits that runs like so:


   - for every pair of digits (0,1), (0,2), (0,3)... assign a 0/1 label to
   the digits and build a LogisticRegression Classifier -- 45 in total
   - Fit every classifier on the test set separately
   - Aggregate the results per record of the test set and compute a
   prediction from the 45 predictions

I tried implementing this with a Pipeline, composed of

   - stringIndexer
   - a custom transformer which accepts a lower-digit and upper-digit
   argument, producing the 0/1 label
   - a custom transformer to assemble the indexed strings to VectorUDT
   - LogisticRegression

fed by a list of paramMaps. It failed because the fit() method of logistic
couldn't handle cases of null labels, i.e. a case where my 0/1 transformer
found neither the lower nor the upper digit label. I fixed this by
extending the LogisticRegression class and overriding the fit() method to
include a filter for labels in (0,1) -- I didn't want to alter the
transform method.
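
In rough outline the override looked something like the below (a simplified
sketch only; the class name is made up):

from pyspark.ml.classification import LogisticRegression

class PairwiseLogisticRegression(LogisticRegression):
    def fit(self, dataset, params=None):
        # drop rows where the 0/1 transformer produced no usable label
        labeled = dataset.filter(dataset[self.getLabelCol()].isin([0.0, 1.0]))
        return super(PairwiseLogisticRegression, self).fit(labeled, params)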

Now, I'd like to tune these models using CrossValidator with an estimator
of pipeline but when I run either fitMultiple on my paramMap or I loop over
the paramMaps, I get arcane Scala errors.


Is there a better way to build this procedure? Thanks!


Re: Need help with SparkSQL Query

2018-12-17 Thread Patrick McCarthy
Untested, but something like the below should work:

from pyspark.sql import functions as F
from pyspark.sql import window as W

(record
 .withColumn('ts_rank',
             F.dense_rank().over(W.Window.partitionBy('id').orderBy('timestamp')))
 .filter(F.col('ts_rank') == 1)
 .drop('ts_rank')
)


On Mon, Dec 17, 2018 at 4:04 PM Nikhil Goyal  wrote:

> Hi guys,
>
> I have a dataframe of type Record (id: Long, timestamp: Long, isValid:
> Boolean,  other metrics)
>
> Schema looks like this:
> root
>  |-- id: long (nullable = true)
>  |-- timestamp: long (nullable = true)
>  |-- isValid: boolean (nullable = true)
> .
>
> I need to find the earliest valid record per id. In RDD world I can do
> groupBy 'id' and find the earliest one but I am not sure how I can do it in
> SQL. Since I am doing this in PySpark I cannot really use DataSet API for
> this.
>
> One thing I can do is groupBy 'id', find the earliest timestamp available
> and then join with the original dataframe to get the right record (all the
> metrics).
>
> Or I can create a single column with all the records and then implement a
> UDAF in scala and use it in pyspark.
>
> Both solutions don't seem to be straight forward. Is there a simpler
> solution to this?
>
> Thanks
> Nikhil
>


Re: Questions on Python support with Spark

2018-11-12 Thread Patrick McCarthy
I've never tried to run a stand-alone cluster alongside hadoop, but why not
run Spark as a yarn application? That way it can absolutely (in fact
preferably) use the distributed file system.
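
For example, something along these lines (all paths here are placeholders):

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --py-files hdfs:///libs/my_dependencies.egg \
  my_job.py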

On Fri, Nov 9, 2018 at 5:04 PM, Arijit Tarafdar  wrote:

> Hello All,
>
>
>
> We have a requirement to run PySpark in standalone cluster mode and also
> reference python libraries (egg/wheel) which are not local but placed in a
> distributed storage like HDFS. From the code it looks like none of cases
> are supported.
>
>
>
> Questions are:
>
>
>
>1. Why is PySpark supported only in standalone client mode?
>2. Why –py-files only support local files and not files stored in
>remote stores?
>
>
>
> We will like to update the Spark code to support these scenarios but just
> want to be aware of any technical difficulties that the community has faced
> while trying to support those.
>
>
>
> Thanks, Arijit
>


Re: Python Dependencies Issue on EMR

2018-09-14 Thread Patrick McCarthy
You didn't say how you're zipping the dependencies, but I'm guessing you
either include .egg files or zipped up a virtualenv. In either case, the
extra C stuff that scipy and pandas rely upon doesn't get included.

An approach like this solved the last problem I had that seemed like this -
https://community.hortonworks.com/articles/58418/running-pyspark-with-conda-env.html
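
Roughly, the idea is to build and zip the environment once, native
dependencies included, and ship that (package names and paths below are only
illustrative):

conda create -y -p ./job_env python=2.7 numpy scipy pandas psycopg2
cd job_env && zip -r ../job_env.zip . && cd ..
# job_env.zip is then shipped to the cluster (e.g. via --archives), per the article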

On Thu, Sep 13, 2018 at 10:08 PM, Jonas Shomorony 
wrote:

> Hey everyone,
>
>
> I am currently trying to run a Python Spark job (using YARN client mode)
> that uses multiple libraries, on a Spark cluster on Amazon EMR. To do that,
> I create a dependencies.zip file that contains all of the
> dependencies/libraries (installed through pip) for the job to run
> successfully, such as pandas, scipy, tqdm, psycopg2, etc. The
> dependencies.zip file is contained within an outside directory (let’s call
> it “project”) that contains all the code to run my Spark job. I then zip up
> everything within project (including dependencies.zip) into project.zip.
> Then, I call spark-submit on the master node in my EMR cluster as follows:
>
>
> `spark-submit --packages … --py-files project.zip --jars ...
> run_command.py`
>
>
> Within “run_command.py” I add dependencies.zip as follows:
>
> `self.spark.sparkContext.addPyFile("dependencies.zip”)`
>
>
> The run_command.py then uses other files within project.zip to complete
> the spark job, and within those files, I import various libraries (found in
> dependencies.zip).
>
>
> I am running into a strange issue where all of the libraries are imported
> correctly (with no problems) with the exception of scipy and pandas.
>
>
> For scipy I get the following error:
>
>
> `File "/mnt/tmp/pip-install-79wp6w/scipy/scipy/__init__.py", line 119, in
> 
>
>   File "/mnt/tmp/pip-install-79wp6w/scipy/scipy/_lib/_ccallback.py", line
> 1, in 
>
> ImportError: cannot import name _ccallback_c`
>
>
> And for pandas I get this error message:
>
>
> `File "/mnt/tmp/pip-install-79wp6w/pandas/pandas/__init__.py", line 35,
> in 
>
> ImportError: C extension: No module named tslib not built. If you want to
> import pandas from the source directory, you may need to run 'python
> setup.py build_ext --inplace --force' to build the C extensions first.`
>
>
> When I comment out the imports for these two libraries (and their use from
> within the code) everything works fine.
>
>
> Surprisingly, when I run the application locally (on master node) without
> passing in dependencies.zip, it picks and resolves the libraries from
> site-packages correctly and successfully runs to completion.
> dependencies.zip is created by zipping the contents of site-packages.
>
>
> Does anyone have any ideas as to what may be happening here? I would
> really appreciate it.
>
>
> pip version: 18.0
>
> spark version: 2.3.1
>
> python version: 2.7
>
>
> Thank you,
>
>
> Jonas
>
>


Re: How to make pyspark use custom python?

2018-09-06 Thread Patrick McCarthy
It looks like for whatever reason your cluster isn't using the python you
distributed, or said distribution doesn't contain what you think.

I've used the following with success to deploy a conda environment to my
cluster at runtime:
https://henning.kropponline.de/2016/09/24/running-pyspark-with-conda-env/
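
The gist of that approach on YARN is the below (the archive alias and paths
are placeholders, and it assumes the zip was built so that bin/python sits at
its top level):

spark-submit \
  --master yarn \
  --archives my_conda_env.zip#MYENV \
  --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./MYENV/bin/python \
  --conf spark.executorEnv.PYSPARK_PYTHON=./MYENV/bin/python \
  my_job.py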

On Thu, Sep 6, 2018 at 2:58 AM, Hyukjin Kwon  wrote:

> Are you doubly sure if it is an issue in Spark? I used custom python
> several times with setting it in PYSPARK_PYTHON before and it was no
> problem.
>
> 2018년 9월 6일 (목) 오후 2:21, mithril 님이 작성:
>
>> For better looking , please see
>> https://stackoverflow.com/questions/52178406/howto-make-
>> pyspark-use-custom-python
>> > pyspark-use-custom-python>
>>
>> --
>>
>>
>> I am using zeppelin connect remote spark cluster.
>>
>> remote spark is using system python 2.7 .
>>
>> I want to switch to miniconda3, install a lib pyarrow.
>> What I do is :
>>
>> 1. Download miniconda3, install some libs, scp miniconda3 folder to spark
>> master and slaves.
>> 2. adding `PYSPARK_PYTHON="/usr/local/miniconda3/bin/python"` to
>> `spark-env.sh` in spark master and slaves.
>> 3. restart spark and zeppelin
>> 4. Running code
>>
>> %spark.pyspark
>>
>> import pandas as pd
>> from pyspark.sql.functions import pandas_udf,PandasUDFType
>>
>>
>> @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
>> def process_order_items(pdf):
>>
>> pdf.loc[:, 'total_price'] = pdf['price'] * pdf['count']
>>
>> d = {'has_discount':'count',
>> 'clearance':'count',
>> 'count': ['count', 'sum'],
>> 'price_guide':'max',
>> 'total_price': 'sum'
>>
>> }
>>
>> pdf1 = pdf.groupby('day').agg(d)
>> pdf1.columns = pdf1.columns.map('_'.join)
>> d1 = {'has_discount_count':'discount_order_count',
>> 'clearance_count':'clearance_order_count',
>> 'count_count':'order_count',
>> 'count_sum':'sale_count',
>> 'price_guide_max':'price_guide',
>> 'total_price_sum': 'total_price'
>> }
>>
>> pdf2 = pdf1.rename(columns=d1)
>>
>> pdf2.loc[:, 'discount_sale_count'] =
>> pdf.loc[pdf.has_discount>0,
>> 'count'].resample(freq).sum()
>> pdf2.loc[:, 'clearance_sale_count'] = pdf.loc[pdf.clearance>0,
>> 'count'].resample(freq).sum()
>> pdf2.loc[:, 'price'] = pdf2.total_price / pdf2.sale_count
>>
>> pdf2 = pdf2.drop(pdf2[pdf2.order_count == 0].index)
>>
>> return pdf2
>>
>>
>> results = df.groupby("store_id",
>> "product_id").apply(process_order_items)
>>
>> results.select(['store_id', 'price']).show(5)
>>
>>
>> Got error :
>>
>> Py4JJavaError: An error occurred while calling o172.showString.
>> : org.apache.spark.SparkException: Job aborted due to stage failure:
>> Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in
>> stage 6.0 (TID 143, 10.104.33.18, executor 2):
>> org.apache.spark.api.python.PythonException: Traceback (most recent call
>> last):
>>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py",
>> line
>> 230, in main
>> process()
>>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py",
>> line
>> 225, in process
>> serializer.dump_stream(func(split_index, iterator), outfile)
>>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py",
>> line
>> 150, in 
>> func = lambda _, it: map(mapper, it)
>>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/
>> serializers.py",
>> line 276, in load_stream
>> import pyarrow as pa
>> ImportError: No module named pyarrow
>>
>>
>> `10.104.33.18` is spark master,  so I think the `PYSPARK_PYTHON` is not
>> set
>> correctly .
>>
>> `pyspark`
>>
>> I login to master and slaves, run `pyspark interpreter` in each, and found
>> `import pyarrow` do not throw exception .
>>
>>
>> PS: `pyarrow` also installed in the machine which running zeppelin.
>>
>> --
>>
>> More info:
>>
>>
>> 1. spark cluster is installed in A, B, C , zeppelin is installed in D.
>> 2. `PYSPARK_PYTHON` is set in `spark-env.sh` in each A, B, C
>> 3. `import pyarrow` is fine with `/usr/local/spark/bin/pyspark` in A, B
>> ,C /
>> 4. `import pyarrow` is fine on A, B ,C custom python(miniconda3)
>> 5. `import pyarrow` is fine on D's default python(miniconda3, path is
>> different with A, B ,C , but it is doesn't matter)
>>
>>
>>
>> So I completely coundn't understand why it doesn't work.
>>
>>
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: Pitfalls of partitioning by host?

2018-08-28 Thread Patrick McCarthy
I'm not 100% sure, but a naive repartition() seems to cause a shuffle. If
this is actually happening, it's just wasteful overhead. The ambition is to
say "divide the data into partitions, but make sure you don't move it in
doing so".



On Tue, Aug 28, 2018 at 2:06 PM, Patrick McCarthy 
wrote:

> I'm not 100% sure, but a naive repartition() seems to cause a shuffle. If
> this is actually happening, it's just wasteful overhead.
>
> On Tue, Aug 28, 2018 at 1:03 PM, Sonal Goyal 
> wrote:
>
>> Hi Patrick,
>>
>> Sorry is there something here that helps you beyond repartition(number of
>> partitons) or calling your udf on foreachPartition? If your data is on
>> disk, Spark is already partitioning it for you by rows. How is adding the
>> host info helping?
>>
>> Thanks,
>> Sonal
>> Nube Technologies <http://www.nubetech.co>
>>
>> <http://in.linkedin.com/in/sonalgoyal>
>>
>>
>>
>> On Tue, Aug 28, 2018 at 8:29 PM, Patrick McCarthy <
>> pmccar...@dstillery.com.invalid> wrote:
>>
>>> Mostly I'm guessing that it adds efficiency to a job where partitioning
>>> is required but shuffling is not.
>>>
>>> For example, if I want to apply a UDF to 1tb of records on disk, I might
>>> need to repartition(5) to get the task size down to an acceptable size
>>> for my cluster. If I don't care that it's totally balanced, then I'd hope
>>> that I could save a lot of overhead with
>>>
>>> foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
>>> 'randkey','host').apply(udf)
>>>
>>>
>>> On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz 
>>> wrote:
>>>
>>>> Well if we think of shuffling as a necessity to perform an operation,
>>>> then the problem would be that you are adding a ln aggregation stage to a
>>>> job that is going to get shuffled anyway.  Like if you need to join two
>>>> datasets, then Spark will still shuffle the data, whether they are grouped
>>>> by hostname prior to that or not.  My question is, is there anything else
>>>> that you would expect to gain, except for enforcing maybe a dataset that is
>>>> already bucketed? Like you could enforce that data is where it is supposed
>>>> to be, but what else would you avoid?
>>>>
>>>> Sent from my iPhone
>>>>
>>>> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy <
>>>> pmccar...@dstillery.com.INVALID> wrote:
>>>> >
>>>> > When debugging some behavior on my YARN cluster I wrote the following
>>>> PySpark UDF to figure out what host was operating on what row of data:
>>>> >
>>>> > @F.udf(T.StringType())
>>>> > def add_hostname(x):
>>>> >
>>>> > import socket
>>>> >
>>>> > return str(socket.gethostname())
>>>> >
>>>> > It occurred to me that I could use this to enforce node-locality for
>>>> other operations:
>>>> >
>>>> > df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
>>>> >
>>>> > When working on a big job without obvious partition keys, this seems
>>>> like a very straightforward way to avoid a shuffle, but it seems too easy.
>>>> >
>>>> > What problems would I introduce by trying to partition on hostname
>>>> like this?
>>>>
>>>> -
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>
>>>>
>>>
>>
>


Re: Pitfalls of partitioning by host?

2018-08-28 Thread Patrick McCarthy
I'm not 100% sure, but a naive repartition() seems to cause a shuffle. If
this is actually happening, it's just wasteful overhead.

On Tue, Aug 28, 2018 at 1:03 PM, Sonal Goyal  wrote:

> Hi Patrick,
>
> Sorry is there something here that helps you beyond repartition(number of
> partitons) or calling your udf on foreachPartition? If your data is on
> disk, Spark is already partitioning it for you by rows. How is adding the
> host info helping?
>
> Thanks,
> Sonal
> Nube Technologies <http://www.nubetech.co>
>
> <http://in.linkedin.com/in/sonalgoyal>
>
>
>
> On Tue, Aug 28, 2018 at 8:29 PM, Patrick McCarthy <
> pmccar...@dstillery.com.invalid> wrote:
>
>> Mostly I'm guessing that it adds efficiency to a job where partitioning
>> is required but shuffling is not.
>>
>> For example, if I want to apply a UDF to 1tb of records on disk, I might
>> need to repartition(5) to get the task size down to an acceptable size
>> for my cluster. If I don't care that it's totally balanced, then I'd hope
>> that I could save a lot of overhead with
>>
>> foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
>> 'randkey','host').apply(udf)
>>
>>
>> On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz 
>> wrote:
>>
>>> Well if we think of shuffling as a necessity to perform an operation,
>>> then the problem would be that you are adding a ln aggregation stage to a
>>> job that is going to get shuffled anyway.  Like if you need to join two
>>> datasets, then Spark will still shuffle the data, whether they are grouped
>>> by hostname prior to that or not.  My question is, is there anything else
>>> that you would expect to gain, except for enforcing maybe a dataset that is
>>> already bucketed? Like you could enforce that data is where it is supposed
>>> to be, but what else would you avoid?
>>>
>>> Sent from my iPhone
>>>
>>> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy <
>>> pmccar...@dstillery.com.INVALID> wrote:
>>> >
>>> > When debugging some behavior on my YARN cluster I wrote the following
>>> PySpark UDF to figure out what host was operating on what row of data:
>>> >
>>> > @F.udf(T.StringType())
>>> > def add_hostname(x):
>>> >
>>> > import socket
>>> >
>>> > return str(socket.gethostname())
>>> >
>>> > It occurred to me that I could use this to enforce node-locality for
>>> other operations:
>>> >
>>> > df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
>>> >
>>> > When working on a big job without obvious partition keys, this seems
>>> like a very straightforward way to avoid a shuffle, but it seems too easy.
>>> >
>>> > What problems would I introduce by trying to partition on hostname
>>> like this?
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>


Re: Pitfalls of partitioning by host?

2018-08-28 Thread Patrick McCarthy
Mostly I'm guessing that it adds efficiency to a job where partitioning is
required but shuffling is not.

For example, if I want to apply a UDF to 1tb of records on disk, I might
need to repartition(5) to get the task size down to an acceptable size
for my cluster. If I don't care that it's totally balanced, then I'd hope
that I could save a lot of overhead with

foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
'randkey','host').apply(udf)


On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz 
wrote:

> Well if we think of shuffling as a necessity to perform an operation, then
> the problem would be that you are adding a ln aggregation stage to a job
> that is going to get shuffled anyway.  Like if you need to join two
> datasets, then Spark will still shuffle the data, whether they are grouped
> by hostname prior to that or not.  My question is, is there anything else
> that you would expect to gain, except for enforcing maybe a dataset that is
> already bucketed? Like you could enforce that data is where it is supposed
> to be, but what else would you avoid?
>
> Sent from my iPhone
>
> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy 
> > 
> wrote:
> >
> > When debugging some behavior on my YARN cluster I wrote the following
> PySpark UDF to figure out what host was operating on what row of data:
> >
> > @F.udf(T.StringType())
> > def add_hostname(x):
> >
> > import socket
> >
> > return str(socket.gethostname())
> >
> > It occurred to me that I could use this to enforce node-locality for
> other operations:
> >
> > df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
> >
> > When working on a big job without obvious partition keys, this seems
> like a very straightforward way to avoid a shuffle, but it seems too easy.
> >
> > What problems would I introduce by trying to partition on hostname like
> this?
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Pitfalls of partitioning by host?

2018-08-28 Thread Patrick McCarthy
Mostly I'm guessing that it adds efficiency to a job where partitioning is
required but shuffling is not.

For example, if I want to apply a UDF to 1tb of records on disk, I might
need to repartition(5) to get the task size down to an acceptable size
for my cluster. If I don't care that it's totally balanced, then I'd hope
that I could save a lot of overhead with

foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
'randkey','host').apply(udf)


On Tue, Aug 28, 2018 at 10:28 AM, Patrick McCarthy 
wrote:

> Mostly I'm guessing that it adds efficiency to a job where partitioning is
> required but shuffling is not.
>
> For example, if I want to apply a UDF to 1tb of records on disk, I might
> need to repartition(5) to get the task size down to an acceptable size
> for my cluster. If I don't care that it's totally balanced, then I'd hope
> that I could save a lot of overhead with
>
> foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
> 'randkey','host').apply(udf)
>
> On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz 
> wrote:
>
>> Well if we think of shuffling as a necessity to perform an operation,
>> then the problem would be that you are adding a ln aggregation stage to a
>> job that is going to get shuffled anyway.  Like if you need to join two
>> datasets, then Spark will still shuffle the data, whether they are grouped
>> by hostname prior to that or not.  My question is, is there anything else
>> that you would expect to gain, except for enforcing maybe a dataset that is
>> already bucketed? Like you could enforce that data is where it is supposed
>> to be, but what else would you avoid?
>>
>> Sent from my iPhone
>>
>> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy > .INVALID> wrote:
>> >
>> > When debugging some behavior on my YARN cluster I wrote the following
>> PySpark UDF to figure out what host was operating on what row of data:
>> >
>> > @F.udf(T.StringType())
>> > def add_hostname(x):
>> >
>> > import socket
>> >
>> > return str(socket.gethostname())
>> >
>> > It occurred to me that I could use this to enforce node-locality for
>> other operations:
>> >
>> > df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
>> >
>> > When working on a big job without obvious partition keys, this seems
>> like a very straightforward way to avoid a shuffle, but it seems too easy.
>> >
>> > What problems would I introduce by trying to partition on hostname like
>> this?
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Pitfalls of partitioning by host?

2018-08-27 Thread Patrick McCarthy
When debugging some behavior on my YARN cluster I wrote the following
PySpark UDF to figure out what host was operating on what row of data:

@F.udf(T.StringType())
def add_hostname(x):
    import socket
    return str(socket.gethostname())

It occurred to me that I could use this to enforce node-locality for other
operations:

df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)

When working on a big job without obvious partition keys, this seems like a
very straightforward way to avoid a shuffle, but it seems too easy.

What problems would I introduce by trying to partition on hostname like
this?


Re: How to merge multiple rows

2018-08-22 Thread Patrick McCarthy
You didn't specify which API, but in pyspark you could do

import pyspark.sql.functions as F

df.groupBy('ID').agg(F.sort_array(F.collect_set('DETAILS')).alias('DETAILS')).show()

+---+------------+
| ID|     DETAILS|
+---+------------+
|  1|[A1, A2, A3]|
|  3|        [B2]|
|  2|        [B1]|
+---+------------+


If you want to sort by PART I think you'll need a UDF.
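
A rough sketch of that (untested, and assuming the column names above):

from pyspark.sql import functions as F, types as T

sort_by_part = F.udf(
    lambda pairs: [p['DETAILS'] for p in sorted(pairs, key=lambda p: p['PART'])],
    T.ArrayType(T.StringType()))

(df.groupBy('ID')
 .agg(F.collect_list(F.struct('PART', 'DETAILS')).alias('pairs'))
 .withColumn('DETAILS', sort_by_part('pairs'))
 .drop('pairs')
 .show())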

On Wed, Aug 22, 2018 at 4:12 PM, Jean Georges Perrin  wrote:

> How do you do it now?
>
> You could use a withColumn(“newDetails”,  details_2...>)
>
> jg
>
>
> > On Aug 22, 2018, at 16:04, msbreuer  wrote:
> >
> > A dataframe with following contents is given:
> >
> > ID PART DETAILS
> > 11 A1
> > 12 A2
> > 13 A3
> > 21 B1
> > 31 C1
> >
> > Target format should be as following:
> >
> > ID DETAILS
> > 1 A1+A2+A3
> > 2 B1
> > 3 C1
> >
> > Note, the order of A1-3 is important.
> >
> > Currently I am using this alternative:
> >
> > ID DETAIL_1 DETAIL_2 DETAIL_3
> > 1 A1   A2   A3
> > 2 B1
> > 3 C1
> >
> > What would be the best method to do such transformation an a large
> dataset?
> >
> >
> >
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Arrow type issue with Pandas UDF

2018-07-24 Thread Patrick McCarthy
Thanks Bryan. I think it was ultimately groupings that were too large -
after setting spark.sql.shuffle.partitions to a much higher number I was
able to get the UDF to execute.
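
For reference, the change was just along these lines (the value itself is
illustrative):

spark.conf.set('spark.sql.shuffle.partitions', 2000)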

On Fri, Jul 20, 2018 at 12:45 AM, Bryan Cutler  wrote:

> Hi Patrick,
>
> It looks like it's failing in Scala before it even gets to Python to
> execute your udf, which is why it doesn't seem to matter what's in your
> udf. Since you are doing a grouped map udf maybe your group sizes are too
> big or skewed? Could you try to reduce the size of your groups by adding
> more keys or sampling a fraction of the data? If the problem persists could
> you make a jira? At the very least a better exception would be nice.
>
> Bryan
>
> On Thu, Jul 19, 2018, 7:07 AM Patrick McCarthy 
> 
> wrote:
>
>> PySpark 2.3.1 on YARN, Python 3.6, PyArrow 0.8.
>>
>> I'm trying to run a pandas UDF, but I seem to get nonsensical exceptions
>> in the last stage of the job regardless of my output type.
>>
>>
>> The problem I'm trying to solve:
>> I have a column of scalar values, and each value on the same row has a
>> sorted vector. I'm trying to replace each scalar value with its closest
>> index from its vector. I'm applying the grouping arbitrarily and performing
>> a python operation row-wise because even when the same vector appears on
>> many rows it's not clear how I would get the lookup to scale.
>>
>> My input data, the product of a join of hive tables, has the following
>> schema:
>>
>> root
>>  |-- scalar_value: float (nullable = true)
>>  |-- quantilelist: array (nullable = true)
>>  ||-- element: double (containsNull = true)
>>
>>
>> My UDF is at bottom. I'm using a GROUPED_MAP UDF because I want to
>> perform an operation on two columns, and because I want to take advantage
>> of Arrow to avoid serialization.
>>
>> The schema my UDF returns is this:
>>
>> pos_schema = T.StructType([
>> T.StructField('feature_value',T.FloatType(),True),
>> T.StructField('error',T.StringType())
>> ])
>>
>> ...however when I try to apply my UDF, either with saveAsTable or show(),
>> I get the following exception:
>>
>> org.apache.arrow.vector.util.OversizedAllocationException: Unable to
>> expand the buffer
>> at org.apache.arrow.vector.BaseFixedWidthVector.
>> reallocBufferHelper(BaseFixedWidthVector.java:447)
>> at org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(
>> BaseFixedWidthVector.java:426)
>> at org.apache.arrow.vector.BaseFixedWidthVector.handleSafe(
>> BaseFixedWidthVector.java:838)
>> at org.apache.arrow.vector.Float8Vector.setSafe(
>> Float8Vector.java:221)
>> at org.apache.spark.sql.execution.arrow.DoubleWriter.
>> setValue(ArrowWriter.scala:223)
>> at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(
>> ArrowWriter.scala:122)
>> at org.apache.spark.sql.execution.arrow.ArrayWriter.
>> setValue(ArrowWriter.scala:308)
>> at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(
>> ArrowWriter.scala:122)
>> at org.apache.spark.sql.execution.arrow.ArrowWriter.
>> write(ArrowWriter.scala:87)
>> at org.apache.spark.sql.execution.python.
>> ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(
>> ArrowPythonRunner.scala:84)
>> at org.apache.spark.sql.execution.python.
>> ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$
>> 1.apply(ArrowPythonRunner.scala:75)
>> at org.apache.spark.sql.execution.python.
>> ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$
>> 1.apply(ArrowPythonRunner.scala:75)
>> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.
>> scala:1380)
>> at org.apache.spark.sql.execution.python.
>> ArrowPythonRunner$$anon$2.writeIteratorToStream(
>> ArrowPythonRunner.scala:95)
>> at org.apache.spark.api.python.BasePythonRunner$WriterThread$
>> $anonfun$run$1.apply(PythonRunner.scala:215)
>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.
>> scala:1991)
>> at org.apache.spark.api.python.BasePythonRunner$WriterThread.
>> run(PythonRunner.scala:170)
>>
>> I assumed it was the result of some bad typing on my part, until I did a
>> test with a degenerate UDF that only returns a column of 1:
>>
>> @F.pandas_udf(T.StructType([T.StructField('feature_value',T.IntegerType(),True)]),
>>
>> F.PandasUDFType.GROUPED_MAP)
>>
>> def groupedPercentileInt(df):
>>
>>

Arrow type issue with Pandas UDF

2018-07-19 Thread Patrick McCarthy
PySpark 2.3.1 on YARN, Python 3.6, PyArrow 0.8.

I'm trying to run a pandas UDF, but I seem to get nonsensical exceptions in
the last stage of the job regardless of my output type.


The problem I'm trying to solve:
I have a column of scalar values, and each value on the same row has a
sorted vector. I'm trying to replace each scalar value with its closest
index from its vector. I'm applying the grouping arbitrarily and performing
a python operation row-wise because even when the same vector appears on
many rows it's not clear how I would get the lookup to scale.

My input data, the product of a join of hive tables, has the following
schema:

root
 |-- scalar_value: float (nullable = true)
 |-- quantilelist: array (nullable = true)
 ||-- element: double (containsNull = true)


My UDF is at bottom. I'm using a GROUPED_MAP UDF because I want to perform
an operation on two columns, and because I want to take advantage of Arrow
to avoid serialization.

The schema my UDF returns is this:

pos_schema = T.StructType([
T.StructField('feature_value',T.FloatType(),True),
T.StructField('error',T.StringType())
])

...however when I try to apply my UDF, either with saveAsTable or show(), I
get the following exception:

org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand
the buffer
at
org.apache.arrow.vector.BaseFixedWidthVector.reallocBufferHelper(BaseFixedWidthVector.java:447)
at
org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(BaseFixedWidthVector.java:426)
at
org.apache.arrow.vector.BaseFixedWidthVector.handleSafe(BaseFixedWidthVector.java:838)
at
org.apache.arrow.vector.Float8Vector.setSafe(Float8Vector.java:221)
at
org.apache.spark.sql.execution.arrow.DoubleWriter.setValue(ArrowWriter.scala:223)
at
org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
at
org.apache.spark.sql.execution.arrow.ArrayWriter.setValue(ArrowWriter.scala:308)
at
org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
at
org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:87)
at
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:84)
at
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
at
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
at
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:95)
at
org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
at
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
at
org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)

I assumed it was the result of some bad typing on my part, until I did a
test with a degenerate UDF that only returns a column of 1:

@F.pandas_udf(T.StructType([T.StructField('feature_value', T.IntegerType(), True)]),
              F.PandasUDFType.GROUPED_MAP)
def groupedPercentileInt(df):
    return pd.DataFrame({'feature_value': [int(1)] * df.shape[0]}).reset_index(drop=True)


This clearly only has one return value of type int, yet I get the same
exception:

org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand
the buffer
at
org.apache.arrow.vector.BaseFixedWidthVector.reallocBufferHelper(BaseFixedWidthVector.java:447)
at
org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(BaseFixedWidthVector.java:426)
at
org.apache.arrow.vector.BaseFixedWidthVector.handleSafe(BaseFixedWidthVector.java:838)
at
org.apache.arrow.vector.Float8Vector.setSafe(Float8Vector.java:221)
at
org.apache.spark.sql.execution.arrow.DoubleWriter.setValue(ArrowWriter.scala:223)
at
org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
at
org.apache.spark.sql.execution.arrow.ArrayWriter.setValue(ArrowWriter.scala:308)
at
org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
at
org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:87)
at
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:84)
at
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
at
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
at

Re: DataTypes of an ArrayType

2018-07-11 Thread Patrick McCarthy
Arrays need to be a single type; I think you're looking for a Struct
column. See:
https://medium.com/@mrpowers/adding-structtype-columns-to-spark-dataframes-b44125409803
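
For instance, a struct holding two floats and a decimal might be declared
like this (the field names are made up):

from pyspark.sql.types import StructType, StructField, FloatType, DecimalType

schema = StructType([
    StructField("Column_Name", StructType([
        StructField("x", FloatType(), False),
        StructField("y", FloatType(), False),
        StructField("d", DecimalType(10, 2), False)
    ]), False)
])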

On Wed, Jul 11, 2018 at 6:37 AM, dimitris plakas 
wrote:

> Hello everyone,
>
> I am new to Pyspark and i would like to ask if there is any way to have a
> Dataframe column which is ArrayType and have a different DataType for each
> elemnt of the ArrayType. For example
> to have something like :
>
> StructType([StructField("Column_Name", ArrayType(ArrayType(FloatType(),
> FloatType(), DecimalType(), False),False), False)]).
>
> I want to have an ArrayType column with 2 elements as FloatType and 1
> element as DecimalType
>
> Thank you in advance
>


Re: Building SparkML vectors from long data

2018-07-03 Thread Patrick McCarthy
I'm still validating my results, but my solution for the moment looks like
the below. I'm presently dealing with one-hot encoded values, so all the
numbers in my array are 1:

from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import SparseVector, VectorUDT
from pyspark.sql import functions as F

def udfMaker(feature_len):
    return F.udf(lambda x: SparseVector(feature_len, sorted(x), [1.0]*len(x)),
                 VectorUDT())

indexer = StringIndexer(inputCol='contentStrings',
                        outputCol='indexedContent').fit(source_df)

makeVec = udfMaker(len(indexer.labels))

indexed_data = indexer.transform(source_df)

sparse_content = (indexed_data.groupBy('ID')
                  .agg(F.collect_set('indexedContent').alias('contentIdx'))
                  .withColumn('content', makeVec(F.col('contentIdx')))
                  .drop('contentIdx'))

On Tue, Jun 12, 2018 at 3:59 PM, Nathan Kronenfeld <
nkronenfeld@uncharted.software> wrote:

> I don't know if this is the best way or not, but:
>
> val indexer = new StringIndexer().setInputCol("vr").setOutputCol("vrIdx")
> val indexModel = indexer.fit(data)
> val indexedData = indexModel.transform(data)
> val variables = indexModel.labels.length
>
> val toSeq = udf((a: Double, b: Double) => Seq(a, b))
> val toVector = udf((seq: Seq[Seq[Double]]) => {
>   new SparseVector(variables, seq.map(_(0).toInt).toArray, 
> seq.map(_(1)).toArray)
> })
> val result = indexedData
>   .withColumn("val", toSeq(col("vrIdx"), col("value")))
>   .groupBy("ID")
>   .agg(collect_set(col("val")).name("collected_val"))
>   .withColumn("collected_val", 
> toVector(col("collected_val")).as[Row](Encoders.javaSerialization(classOf[Row])))
>
>
> at least works.  The indices still aren't in order in the vector - I don't
> know if this matters much, but if it does, it's easy enough to sort them in
> toVector (and to remove duplicates)
>
>
> On Tue, Jun 12, 2018 at 2:24 PM, Patrick McCarthy  > wrote:
>
>> I work with a lot of data in a long format, cases in which an ID column
>> is repeated, followed by a variable and a value column like so:
>>
>> +---+-+---+
>> |ID | var | value |
>> +---+-+---+
>> | A | v1  | 1.0   |
>> | A | v2  | 2.0   |
>> | B | v1  | 1.5   |
>> | B | v3  | -1.0  |
>> +---+-+---+
>>
>> It seems to me that Spark doesn't provide any clear native way to
>> transform data of this format into a Vector() or VectorUDT() type suitable
>> for machine learning algorithms.
>>
>> The best solution I've found so far (which isn't very good) is to group
>> by ID, perform a collect_list, and then use a UDF to translate the
>> resulting array into a vector datatype.
>>
>> I can get kind of close like so:
>>
>> indexer = MF.StringIndexer(inputCol = 'var', outputCol = 'varIdx')
>>
>> (indexed_df
>> .withColumn('val',F.concat(F.col('varIdx').astype(T.IntegerType()).astype(T.StringType()),
>> F.lit(':'),F.col('value')))
>> .groupBy('ID')
>> .agg(F.collect_set('val'))
>> )
>>
>> But the resultant 'val' vector is out of index order, and still would
>> need to be parsed.
>>
>> What's the current preferred way to solve a problem like this?
>>
>
>


Building SparkML vectors from long data

2018-06-12 Thread Patrick McCarthy
I work with a lot of data in a long format, cases in which an ID column is
repeated, followed by a variable and a value column like so:

+---+-----+-------+
|ID | var | value |
+---+-----+-------+
| A | v1  | 1.0   |
| A | v2  | 2.0   |
| B | v1  | 1.5   |
| B | v3  | -1.0  |
+---+-----+-------+

It seems to me that Spark doesn't provide any clear native way to transform
data of this format into a Vector() or VectorUDT() type suitable for
machine learning algorithms.

The best solution I've found so far (which isn't very good) is to group by
ID, perform a collect_list, and then use a UDF to translate the resulting
array into a vector datatype.

I can get kind of close like so:

indexer = MF.StringIndexer(inputCol = 'var', outputCol = 'varIdx')

(indexed_df
.withColumn('val',F.concat(F.col('varIdx').astype(T.IntegerType()).astype(T.StringType()),
F.lit(':'),F.col('value')))
.groupBy('ID')
.agg(F.collect_set('val'))
)

But the resultant 'val' vector is out of index order, and still would need
to be parsed.

What's the current preferred way to solve a problem like this?


Poor performance reading Hive table made of sequence files

2018-05-01 Thread Patrick McCarthy
I recently ran a query with the following form:

select a.*, b.*
from some_small_table a
inner join
(
  select things from some_other_table
  lateral view explode(s) ss as sss
  where a_key in (x,y,z)
) b
on a.key = b.key
where someothercriterion

On hive, this query took about five minutes. In Spark, using either the
same syntax in a spark.sql call or using the dataframe API, it appeared as
if it was going to take on the order of 10 hours. I didn't let it finish.

The data underlying the hive table are sequence files, ~30mb each, ~1000 to
a partition, and my query ran over only five partitions. A single partition
is about 25gb.

How can Spark perform so badly? Do I need to handle sequence files in a
special way?


Reservoir sampling in parallel

2018-02-23 Thread Patrick McCarthy
I have a large dataset composed of scores for several thousand segments,
and the timestamps at which time those scores occurred. I'd like to apply
some techniques like reservoir sampling[1], where for every segment I
process records in order of their timestamps, generate a sample, and then
at intervals compute the quantiles in the sample. Ideally I'd like to write
a pyspark udf to do the sampling/quantizing procedure.

It seems like something I should be doing via rdd.map, but it's not really
clear how I can enforce a function to process records in order within a
partition. Any pointers?

Thanks,
Patrick

[1] https://en.wikipedia.org/wiki/Reservoir_sampling
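
A hedged sketch of one possible approach (not from the thread): hash-partition by
segment so each segment lands in a single partition, sort within partitions by
timestamp, and run a classic Algorithm R reservoir per segment in mapPartitions.
The column names ('segment', 'ts', 'score') and reservoir size are assumptions.

import random

K = 1000  # reservoir size per segment (illustrative)

def reservoir_per_segment(rows):
    reservoirs, seen = {}, {}
    for row in rows:                      # rows arrive sorted by (segment, ts)
        seg = row['segment']
        res = reservoirs.setdefault(seg, [])
        n = seen[seg] = seen.get(seg, 0) + 1
        if len(res) < K:
            res.append(row['score'])
        else:
            j = random.randrange(n)       # Algorithm R replacement step
            if j < K:
                res[j] = row['score']
    for seg, res in reservoirs.items():
        yield seg, sorted(res)            # quantiles can be read off the sorted sample

samples = (df.repartition('segment')              # a segment never spans partitions
             .sortWithinPartitions('segment', 'ts')
             .rdd
             .mapPartitions(reservoir_per_segment))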


Re: Type Casting Error in Spark Data Frame

2018-01-29 Thread Patrick McCarthy
You can't select from an array like that, try instead using 'lateral view
explode' in the query for that element, or before the sql stage
(py)spark.sql.functions.explode.
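
A hedged PySpark sketch of that suggestion, using the schema shown in the quoted
question below; the inner messageID is aliased so it doesn't collide with the
top-level one.

from pyspark.sql import functions as F

route_event_df = (dataDF
    .select('messageID', F.explode('messageData').alias('md'))
    .select('messageID',
            F.col('md.unixTime'),
            F.col('md.packetID'),
            F.col('md.messageID').alias('innerMessageID')))

# Equivalent SQL:
#   select messageID, md.* from message lateral view explode(messageData) t as md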

On Mon, Jan 29, 2018 at 4:26 PM, Arnav kumar  wrote:

> Hello Experts,
>
> I would need your advice in resolving the below issue when I am trying to
> retrieving the data from a dataframe.
>
> Can you please let me know where I am going wrong.
>
> code :
>
>
> // create the dataframe by parsing the json
> // Message Helper describes the JSON Struct
> //data out is the json string received from Streaming Engine.
>
> val dataDF = sparkSession.createDataFrame(dataOut,
> MessageHelper.sqlMapping)
> dataDF.printSchema()
> /* -- out put of dataDF.printSchema
>
> root
>  |-- messageID: string (nullable = true)
>  |-- messageType: string (nullable = true)
>  |-- meta: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- messageParsedTimestamp: string (nullable = true)
>  |||-- ipaddress: string (nullable = true)
>  |-- messageData: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- packetID: string (nullable = true)
>  |||-- messageID: string (nullable = true)
>  |||-- unixTime: string (nullable = true)
>
>
>
> */
>
>
> dataDF.createOrReplaceTempView("message")
> val routeEventDF=sparkSession.sql("select messageId 
> ,messageData.unixTime,messageData.packetID,
> messageData.messageID from message")
> routeEventDF.show
>
>
> Error  on routeEventDF.show
> Caused by: java.lang.RuntimeException: 
> org.apache.spark.sql.catalyst.expressions.GenericRow
> is not a valid external type for schema of
> array<struct<messageParsedTimestamp:string,ipaddress:string,port:string,message:string>>
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> SpecificUnsafeProjection.evalIfFalseExpr14$(Unknown Source)
>
>
> Appreciate your help
>
> Best Regards
> Arnav Kumar.
>
>
>


Re: Spark vs Snowflake

2018-01-22 Thread Patrick McCarthy
Last I heard of them a year or two ago, they basically repackage AWS
services behind their own API/service layer for convenience. There's
probably a value-add if you're not familiar with optimizing AWS, but if you
already have that expertise I don't expect they would add much extra
performance if any.

On Mon, Jan 22, 2018 at 4:51 PM, Mich Talebzadeh 
wrote:

> Hi,
>
> Has anyone had experience of using Snowflake, which touts itself as a data
> warehouse built for the cloud? In reviews, one recommendation states
>
> "DEFINITELY AN ALTERNATIVE TO AMAZON RED SHIFT AND SPARK"
>
> Now I am not sure about inner workings of this product but I will be
> surprised if the product is actually faster than using Spark on HDFS.
>
> There is very little literature on this product except that it shouts "me
> too" among Amazon Redshift and Google BigQuery.
>
> anyone has used this product?
>
> thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: StreamingLogisticRegressionWithSGD : Multiclass Classification : Options

2018-01-19 Thread Patrick McCarthy
Rather than use a fancy purpose-built class, I was thinking that you could
generate a series of label vectors: vector A is 1 when class a is positive and
0 otherwise, vector B is 1 when class b is positive and 0 otherwise, etc.

I don't know anything about streaming in particular so I don't know if this
introduces any lag or concurrency problems, but you could perform the
logistic regression on each of these label vectors independently using the
classifier algorithm of your choice and then, concatenating the predictions
into a dataframe, take a rowmax to do your multiclass evaluation.
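
A hedged, batch (non-streaming) sketch of that idea using pyspark.ml, just to
make the label-vector and row-max steps concrete; a DataFrame df with a unique
'id', an integer 'label' and a 'features' vector is assumed, as is
LogisticRegression as the binary learner.

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.ml.classification import LogisticRegression

p_pos = F.udf(lambda v: float(v[1]), DoubleType())   # P(binary label == 1)

classes = sorted(r[0] for r in df.select('label').distinct().collect())

scored = df.select('id')
for k in classes:
    # label vector for class k: 1.0 when the row belongs to k, 0.0 otherwise
    binary = df.withColumn('bin', (F.col('label') == k).cast('double'))
    model = LogisticRegression(labelCol='bin', featuresCol='features').fit(binary)
    scored = scored.join(
        model.transform(binary).select('id', p_pos('probability').alias('p_%d' % k)),
        on='id')

# row-max: the predicted class is the one whose one-vs-all model scored highest
argmax = F.udf(lambda *ps: int(classes[max(range(len(ps)), key=lambda i: ps[i])]),
               IntegerType())
predictions = scored.withColumn('prediction', argmax(*['p_%d' % k for k in classes]))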

On Fri, Jan 19, 2018 at 11:29 AM, Sundeep Kumar Mehta <sunnyjai...@gmail.com
> wrote:

> Thanks a lot Patrick, I do see a class OneVsRest classifier which only
> takes classifier instance of ml package and not mlib package, do you see
> any alternative for using OneVsRest with StreamingLogisticRegressionWithSGD
> ?
>
> Regards
> Sundeep
>
> On Thu, Jan 18, 2018 at 8:18 PM, Patrick McCarthy <pmccar...@dstillery.com
> > wrote:
>
>> As a hack, you could perform a number of 1 vs. all classifiers and then
>> post-hoc select among the highest prediction probability to assign class.
>>
>> On Thu, Jan 18, 2018 at 12:17 AM, Sundeep Kumar Mehta <
>> sunnyjai...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I was looking for Logistic Regression with Multi Class classifier on
>>> Streaming data do we have any alternative options or library/github prj.
>>>
>>> As StreamingLogisticRegressionWithSGD only supports binary
>>> classification
>>>
>>> Regards
>>> Sundeep
>>>
>>
>>
>


Re: StreamingLogisticRegressionWithSGD : Multiclass Classification : Options

2018-01-18 Thread Patrick McCarthy
As a hack, you could perform a number of 1 vs. all classifiers and then
post-hoc select among the highest prediction probability to assign class.

On Thu, Jan 18, 2018 at 12:17 AM, Sundeep Kumar Mehta  wrote:

> Hi,
>
> I was looking for Logistic Regression with Multi Class classifier on
> Streaming data do we have any alternative options or library/github prj.
>
> As StreamingLogisticRegressionWithSGD only supports binary classification
>
> Regards
> Sundeep
>


Re: PySpark - Expand rows into dataframes via function

2017-10-03 Thread Patrick McCarthy
Thanks Sathish.

Before you responded, I came up with this solution:

# A function to take in one row and return the expanded ranges:
def processRow(x):

...
return zip(list_of_ip_ranges, list_of_registry_ids)

# and then in spark,

processed_rdds = spark_df_of_input_data.rdd.flatMap(lambda x: processRow(x))

processed_df =
(processed_rdds.toDF().withColumnRenamed('_1','ip').withColumnRenamed('_2','registryid'))

And then after that I split and subset the IP column into what I wanted.
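
A hedged variant of the same pattern: yielding Rows from the flatMap gives the
resulting DataFrame named columns directly, which avoids the _1/_2 renames.
processRow is the function sketched above, assumed to return (ip, registry_id)
pairs.

from pyspark.sql import Row

processed_df = (spark_df_of_input_data.rdd
    .flatMap(lambda x: [Row(ip=ip, registryid=rid) for ip, rid in processRow(x)])
    .toDF())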

On Mon, Oct 2, 2017 at 7:52 PM, Sathish Kumaran Vairavelu <
vsathishkuma...@gmail.com> wrote:

> It's possible with array function combined with struct construct. Below is
> a SQL example
>
> select Array(struct(ip1,hashkey), struct(ip2,hashkey))
> from (select substr(col1,1,2) as ip1, substr(col1,3,3) as ip2, etc,
> hashkey from object) a
>
> If you want dynamic ip ranges; you need to dynamically construct structs
> based on the range values. Hope this helps.
>
>
> Thanks
>
> Sathish
>
> On Mon, Oct 2, 2017 at 9:01 AM Patrick McCarthy <pmccar...@dstillery.com>
> wrote:
>
>> Hello,
>>
>> I'm trying to map ARIN registry files into more explicit IP ranges. They
>> provide a number of IPs in the range (here it's 8192) and a starting IP,
>> and I'm trying to map it into all the included /24 subnets. For example,
>>
>> Input:
>>
>> array(['arin', 'US', 'ipv4', '23.239.160.0', 8192, 20131104.0, 'allocated',
>>
>>'ff26920a408f15613096aa7fe0ddaa57'], dtype=object)
>>
>>
>> Output:
>>
>> array([['23', '239', '160', 'ff26920a408f15613096aa7fe0ddaa57'],
>>['23', '239', '161', 'ff26920a408f15613096aa7fe0ddaa57'],
>>['23', '239', '162', 'ff26920a408f15613096aa7fe0ddaa57'],
>>
>> ...
>>
>>
>> I have the input lookup table in a pyspark DF, and a python function to do 
>> the conversion into the mapped output. I think to produce the full mapping I 
>> need a UDTF but this concept doesn't seem to exist in PySpark. What's the 
>> best approach to do this mapping and recombine into a new DataFrame?
>>
>>
>> Thanks,
>>
>> Patrick
>>
>>


PySpark - Expand rows into dataframes via function

2017-10-02 Thread Patrick McCarthy
Hello,

I'm trying to map ARIN registry files into more explicit IP ranges. They
provide a number of IPs in the range (here it's 8192) and a starting IP,
and I'm trying to map it into all the included /24 subnets. For example,

Input:

array(['arin', 'US', 'ipv4', '23.239.160.0', 8192, 20131104.0, 'allocated',

   'ff26920a408f15613096aa7fe0ddaa57'], dtype=object)


Output:

array([['23', '239', '160', 'ff26920a408f15613096aa7fe0ddaa57'],
   ['23', '239', '161', 'ff26920a408f15613096aa7fe0ddaa57'],
   ['23', '239', '162', 'ff26920a408f15613096aa7fe0ddaa57'],

...


I have the input lookup table in a pyspark DF, and a python function
to do the conversion into the mapped output. I think to produce the
full mapping I need a UDTF but this concept doesn't seem to exist in
PySpark. What's the best approach to do this mapping and recombine
into a new DataFrame?


Thanks,

Patrick
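
A hedged sketch of the per-row expansion itself (not from the thread), using the
Python 3 standard-library ipaddress module. Field positions follow the sample
input above, and it assumes each allocation is aligned to whole /24 blocks.

import ipaddress

def expand_row(row):
    start, count, key = row[3], int(row[4]), row[7]
    first = ipaddress.ip_address(start)
    last = first + count - 1                       # e.g. 8192 addresses -> a /19
    for net in ipaddress.summarize_address_range(first, last):
        for subnet in net.subnets(new_prefix=24):  # every /24 inside the block
            a, b, c, _ = str(subnet.network_address).split('.')
            yield (a, b, c, key)

# Combined with the flatMap pattern from the reply above:
#   expanded = spark_df.rdd.flatMap(expand_row).toDF(['oct1', 'oct2', 'oct3', 'registryid'])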


Re: Apache Spark: Parallelization of Multiple Machine Learning ALgorithm

2017-09-05 Thread Patrick McCarthy
You might benefit from watching this JIRA issue -
https://issues.apache.org/jira/browse/SPARK-19071
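
In the meantime, a hedged sketch of one workaround (an assumption, not something
from that ticket): Spark accepts jobs from several driver threads, so independent
estimators can be fit concurrently with an ordinary thread pool. train_df and the
choice of estimators are placeholders.

from concurrent.futures import ThreadPoolExecutor
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier, LogisticRegression

estimators = [NaiveBayes(), RandomForestClassifier(), LogisticRegression()]

with ThreadPoolExecutor(max_workers=len(estimators)) as pool:
    # each fit() submits its own Spark jobs; the cluster scheduler interleaves them
    models = list(pool.map(lambda est: est.fit(train_df), estimators))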

On Sun, Sep 3, 2017 at 5:50 PM, Timsina, Prem  wrote:

> Is there a way to parallelize multiple ML algorithms in Spark. My use case
> is something like this:
>
> A) Run multiple machine learning algorithm (Naive Bayes, ANN, Random
> Forest, etc.) in parallel.
>
> 1) Validate each algorithm using 10-fold cross-validation
>
> B) Feed the output of step A) in second layer machine learning algorithm.
>
> My question is:
>
> Can we run multiple machine learning algorithm in step A in parallel?
>
> Can we do cross-validation in parallel? Like, run 10 iterations of Naive
> Bayes training in parallel?
>
>
>
> I was not able to find any way to run the different algorithm in parallel.
> And it seems cross-validation also can not be done in parallel.
>
> I appreciate any suggestion to parallelize this use case.
>
>
>
> Prem
>


Re: Memory problems with simple ETL in Pyspark

2017-04-16 Thread Patrick McCarthy
The partitions helped!

I added repartition() and my function looks like this now:

feature_df = (alldat_idx
.withColumn('label',alldat_idx['label_val'].cast('double'))
.groupBy('id','label')

.agg(sort_array(collect_list('indexedSegs')).alias('collect_list_is'))
.repartition(1000)
.withColumn('num_feat',lit(feature_vec_len))

.withColumn('features',list_to_sparse_udf('collect_list_is','num_feat'))
.drop('collect_list_is')
.drop('num_feat'))

I got a few failed containers for memory overflow, but the job was able to
finish successfully. I tried upping the repartition as high as 4000 but a
few still failed.

For posterity's sake, where would I look for the footprint you have in
mind? On the executor tab?

Since the audience part of the task finished successfully and the failure
was on a df that didn't touch it, it shouldn't've made a difference.

Thank you!

On Sat, Apr 15, 2017 at 9:07 PM, ayan guha <guha.a...@gmail.com> wrote:

> What i missed is try increasing number of partitions using repartition
>
> On Sun, 16 Apr 2017 at 11:06 am, ayan guha <guha.a...@gmail.com> wrote:
>
>> It does not look like scala vs python thing. How big is your audience
>> data store? Can it be broadcasted?
>>
>> What is the memory footprint you are seeing? At what point yarn is
>> killing? Depeneding on that you may want to tweak around number of
>> partitions of input dataset and increase number of executors
>>
>> Ayan
>>
>>
>> On Sat, 15 Apr 2017 at 2:10 am, Patrick McCarthy <pmccar...@dstillery.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I'm trying to build an ETL job which takes in 30-100gb of text data and
>>> prepares it for SparkML. I don't speak Scala so I've been trying to
>>> implement in PySpark on YARN, Spark 2.1.
>>>
>>> Despite the transformations being fairly simple, the job always fails by
>>> running out of executor memory.
>>>
>>> The input table is long (~6bn rows) but composed of three simple values:
>>>
>>> #
>>> all_data_long.printSchema()
>>>
>>> root
>>> |-- id: long (nullable = true)
>>> |-- label: short (nullable = true)
>>> |-- segment: string (nullable = true)
>>>
>>> #
>>>
>>> First I join it to a table of particular segments of interests and do an
>>> aggregation,
>>>
>>> #
>>>
>>> audiences.printSchema()
>>>
>>> root
>>>  |-- entry: integer (nullable = true)
>>>  |-- descr: string (nullable = true)
>>>
>>>
>>> print("Num in adl: {}".format(str(all_data_long.count(
>>>
>>> aud_str = audiences.select(audiences['entry'].cast('string'),
>>> audiences['descr'])
>>>
>>> alldata_aud = all_data_long.join(aud_str,
>>> all_data_long['segment']==aud_str['entry'],
>>> 'left_outer')
>>>
>>> str_idx  = StringIndexer(inputCol='segment',outputCol='indexedSegs')
>>>
>>> idx_df   = str_idx.fit(alldata_aud)
>>> label_df = idx_df.transform(alldata_aud).withColumnRenamed('label','
>>> label_val')
>>>
>>> id_seg = (label_df
>>> .filter(label_df.descr.isNotNull())
>>> .groupBy('id')
>>> .agg(collect_list('descr')))
>>>
>>> id_seg.write.saveAsTable("hive.id_seg")
>>>
>>> #
>>>
>>> Then, I use that StringIndexer again on the first data frame to
>>> featurize the segment ID
>>>
>>> #
>>>
>>> alldat_idx = idx_df.transform(all_data_long).withColumnRenamed('
>>> label','label_val')
>>>
>>> #
>>>
>>>
>>> My ultimate goal is to make a SparseVector, so I group the indexed
>>> segments by id and try to cast it into a vector
>>>
>>> #
>>>
>>> list_to_sparse_udf = udf(lambda l, maxlen: Vectors.sparse(maxlen, {v:1.0
>>> for v in l}),VectorUDT())
>>>
>>> alldat_idx.cache()
>>>
>>> feature_vec_len = (alldat_idx.select(max('indexedSegs')).first()[0] + 1)

Memory problems with simple ETL in Pyspark

2017-04-14 Thread Patrick McCarthy
Hello,

I'm trying to build an ETL job which takes in 30-100gb of text data and
prepares it for SparkML. I don't speak Scala so I've been trying to
implement in PySpark on YARN, Spark 2.1.

Despite the transformations being fairly simple, the job always fails by
running out of executor memory.

The input table is long (~6bn rows) but composed of three simple values:

#
all_data_long.printSchema()

root
|-- id: long (nullable = true)
|-- label: short (nullable = true)
|-- segment: string (nullable = true)

#

First I join it to a table of particular segments of interests and do an
aggregation,

#

audiences.printSchema()

root
 |-- entry: integer (nullable = true)
 |-- descr: string (nullable = true)


print("Num in adl: {}".format(str(all_data_long.count(

aud_str = audiences.select(audiences['entry'].cast('string'),
audiences['descr'])

alldata_aud = all_data_long.join(aud_str,
all_data_long['segment']==aud_str['entry'],
'left_outer')

str_idx  = StringIndexer(inputCol='segment',outputCol='indexedSegs')

idx_df   = str_idx.fit(alldata_aud)
label_df =
idx_df.transform(alldata_aud).withColumnRenamed('label','label_val')

id_seg = (label_df
.filter(label_df.descr.isNotNull())
.groupBy('id')
.agg(collect_list('descr')))

id_seg.write.saveAsTable("hive.id_seg")

#

Then, I use that StringIndexer again on the first data frame to featurize
the segment ID

#

alldat_idx =
idx_df.transform(all_data_long).withColumnRenamed('label','label_val')

#


My ultimate goal is to make a SparseVector, so I group the indexed segments
by id and try to cast it into a vector

#

list_to_sparse_udf = udf(lambda l, maxlen: Vectors.sparse(maxlen, {v:1.0
for v in l}),VectorUDT())

alldat_idx.cache()

feature_vec_len = (alldat_idx.select(max('indexedSegs')).first()[0] + 1)

print("alldat_dix: {}".format(str(alldat_idx.count(

feature_df = (alldat_idx
.withColumn('label',alldat_idx['label_val'].cast('double'))
.groupBy('id','label')

.agg(sort_array(collect_list('indexedSegs')).alias('collect_list_is'))
.withColumn('num_feat',lit(feature_vec_len))

.withColumn('features',list_to_sparse_udf('collect_list_is','num_feat'))
.drop('collect_list_is')
.drop('num_feat'))

feature_df.cache()
print("Num in featuredf: {}".format(str(feature_df.count(  ## <-
failure occurs here

#

Here, however, I always run out of memory on the executors (I've twiddled
driver and executor memory to check) and YARN kills off my containers. I've
gone as high as —executor-memory 15g but it still doesn't help.

Given the number of segments is at most 50,000 I'm surprised that a
smallish row-wise operation is enough to blow up the process.


Is it really the UDF that's killing me? Do I have to rewrite it in Scala?
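
One hedged aside (not an answer from the thread): the Python workers that execute
PySpark UDFs live outside the JVM heap, so raising --executor-memory alone may not
stop YARN from killing the container. In Spark 2.1 the off-heap allowance is
controlled by spark.yarn.executor.memoryOverhead (in MB), set before the
SparkContext is created, e.g.:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config('spark.yarn.executor.memoryOverhead', '4096')  # illustrative value
         .config('spark.executor.memory', '8g')
         .getOrCreate())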





Query plans for the failing stage:

#


== Parsed Logical Plan ==
Aggregate [count(1) AS count#265L]
+- Project [id#0L, label#183, features#208]
   +- Project [id#0L, label#183, num_feat#202, features#208]
  +- Project [id#0L, label#183, collect_list_is#197, num_feat#202,
<lambda>(collect_list_is#197, num_feat#202) AS features#208]
 +- Project [id#0L, label#183, collect_list_is#197, 56845.0 AS
num_feat#202]
+- Aggregate [id#0L, label#183], [id#0L, label#183,
sort_array(collect_list(indexedSegs#93, 0, 0), true) AS collect_list_is#197]
   +- Project [id#0L, label_val#99, segment#2, indexedSegs#93,
cast(label_val#99 as double) AS label#183]
  +- Project [id#0L, label#1 AS label_val#99, segment#2,
indexedSegs#93]
 +- Project [id#0L, label#1, segment#2,
UDF(cast(segment#2 as string)) AS indexedSegs#93]
+- MetastoreRelation pmccarthy, all_data_long

== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#265L]
+- Project [id#0L, label#183, features#208]
   +- Project [id#0L, label#183, num_feat#202, features#208]
  +- Project [id#0L, label#183, collect_list_is#197, num_feat#202,
(collect_list_is#197, num_feat#202) AS features#208]
 +- Project [id#0L, label#183, collect_list_is#197, 56845.0 AS
num_feat#202]
+- Aggregate [id#0L, label#183], [id#0L, label#183,
sort_array(collect_list(indexedSegs#93, 0, 0), true) AS collect_list_is#197]
   +- Project [id#0L, label_val#99, segment#2, indexedSegs#93,
cast(label_val#99 as 

Re: Twitter4J streaming question

2015-07-23 Thread Patrick McCarthy
How can I tell if it's the sample stream or full stream ?
Thanks

Sent from my iPhone

On Jul 23, 2015, at 4:17 PM, Enno Shioji <eshi...@gmail.com> wrote:

You are probably listening to the sample stream, and THEN filtering. This means
you listen to 1% of the Twitter stream and then look for the tweet by
Bloomberg, so there is a very good chance you won't see that particular tweet.

In order to get all Bloomberg related tweets, you must connect to twitter using 
the filter API and not the sample API: 
https://dev.twitter.com/streaming/reference/post/statuses/filter

On Thu, Jul 23, 2015 at 8:23 PM, pjmccarthy <pmccar...@eatonvance.com> wrote:
Hopefully this is an easy one.  I am trying to filter a twitter dstream by
user ScreenName - my code is as follows
val stream = TwitterUtils.createStream(ssc, None)
.filter(_.getUser.getScreenName.contains(markets))

however nothing gets returned, and I can see that Bloomberg has tweeted. If
I remove the filter I get tweets.
If I change the code to look for English or French tweets, that works.

Is there a better way to do it ?

Can anyone assist ?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Twitter4J-streaming-question-tp23974.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




Re: Twitter4J streaming question

2015-07-23 Thread Patrick McCarthy
Ahh
Makes sense - thanks for the help

Sent from my iPhone

On Jul 23, 2015, at 4:29 PM, Enno Shioji <eshi...@gmail.com> wrote:

You need to pay a lot of money to get the full stream, so unless you are doing 
that, it's the sample stream!

On Thu, Jul 23, 2015 at 9:26 PM, Patrick McCarthy <pmccar...@eatonvance.com> wrote:
How can I tell if it's the sample stream or full stream ?
Thanks

Sent from my iPhone

On Jul 23, 2015, at 4:17 PM, Enno Shioji <eshi...@gmail.com> wrote:

You are probably listening to the sample stream, and THEN filtering. This means
you listen to 1% of the Twitter stream and then look for the tweet by
Bloomberg, so there is a very good chance you won't see that particular tweet.

In order to get all Bloomberg related tweets, you must connect to twitter using 
the filter API and not the sample API: 
https://dev.twitter.com/streaming/reference/post/statuses/filter

On Thu, Jul 23, 2015 at 8:23 PM, pjmccarthy <pmccar...@eatonvance.com> wrote:
Hopefully this is an easy one.  I am trying to filter a twitter dstream by
user ScreenName - my code is as follows
val stream = TwitterUtils.createStream(ssc, None)
.filter(_.getUser.getScreenName.contains(markets))

however nothing gets returned, and I can see that Bloomberg has tweeted. If
I remove the filter I get tweets.
If I change the code to look for English or French tweets, that works.

Is there a better way to do it ?

Can anyone assist ?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Twitter4J-streaming-question-tp23974.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org