Re: Guys is this some form of Spam or someone has left his auto-reply loose LOL

2016-07-28 Thread Pedro Rodriguez
Same here, but maybe this is a really urgent matter we need to contact him
about... or just make a filter

On Thu, Jul 28, 2016 at 7:59 AM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

>
> -- Forwarded message --
> From: Geert Van Landeghem [Napoleon Games NV] <
> g.vanlandeg...@napoleongames.be>
> Date: 28 July 2016 at 14:38
> Subject: Re: Re: Is spark-1.6.1-bin-2.6.0 compatible with
> hive-1.1.0-cdh5.7.1
> To: Mich Talebzadeh <mich.talebza...@gmail.com>
>
>
> Hello,
>
> I am enjoying holidays until the end of August. For urgent matters,
> contact the BI department on 702 or 703 or send an email to
> b...@napoleongames.be.
>
> For really urgent matters contact me on my mobile phone: +32 477 75 95 33.
>
> kind regards
> Geert
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>



-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: performance problem when reading lots of small files created by spark streaming.

2016-07-27 Thread Pedro Rodriguez
There are a few blog posts that detail one possible/likely issue for
example:
http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219

TLDR: The Hadoop libraries Spark uses assume that their input comes from a
file system (which works for HDFS); however, S3 is a key-value store, not a
file system. Somewhere along the line this makes things very slow. Below I
describe the post's approach and a library I am working on to solve this problem.

(Much) Longer Version (with a shiny new library in development):
So far in my reading of the source code, Hadoop attempts to actually read
from S3, which can be expensive, particularly since it does so from a single
driver core (listing files is different from actually reading them; I can
find the source code and link it later if you would like). The approach
explained above is to instead use the AWS SDK to list the files, distribute
the file names as a collection with sc.parallelize, then read them in
parallel. I found this worked but was lacking in a few ways, so I started this
project: https://github.com/EntilZha/spark-s3
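
For reference, a rough sketch of that list-then-parallelize approach (bucket
and prefix names are placeholders, pagination past 1,000 keys is elided, and a
client is built per key only for brevity):

import com.amazonaws.services.s3.AmazonS3Client
import scala.collection.JavaConverters._

// List the keys on the driver with the AWS SDK (fast), instead of letting
// Hadoop treat S3 like a file system
val keys = new AmazonS3Client()
  .listObjects("my-bucket", "logs/2016/07/")
  .getObjectSummaries.asScala
  .map(_.getKey)

// Distribute the key names, then read each object on the workers
val lines = sc.parallelize(keys, 64).flatMap { key =>
  val s3 = new AmazonS3Client()
  scala.io.Source.fromInputStream(
    s3.getObject("my-bucket", key).getObjectContent).getLines()
}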

This takes that idea further by:
1. Rather than sc.parallelize, implement the RDD interface where each
partition is defined by the files it needs to read (haven't gotten to
DataFrames yet)
2. At the driver node, use the AWS SDK to list all the files with their
sizes (listing is fast), then run the Least Processing Time (LPT) algorithm to
bin the files into roughly balanced partitions by size
3. API: S3Context(sc).textFileByPrefix("bucket", "file1",
"folder2").regularRDDOperationsHere or import implicits and do
sc.s3.textFileByPrefix

At present, I am battle-testing and benchmarking it at my current job, and
the results are promising, with significant improvements to jobs dealing with
many files (especially many small files) and to jobs whose input is
unbalanced to start with. Jobs perform better because: 1) there isn't a
long stall at the driver while Hadoop decides how to split the S3 files, and
2) the partitions end up nearly perfectly balanced thanks to the LPT algorithm.

Since I hadn't intended to advertise this quite yet, the documentation is
not super polished, but it exists here:
http://spark-s3.entilzha.io/latest/api/#io.entilzha.spark.s3.S3Context

I am completing the Sonatype process for publishing artifacts on Maven
Central (this should be done by tomorrow, so referencing
"io.entilzha:spark-s3_2.10:0.0.0" should work very soon). I would love to
hear whether this library solves the problem; otherwise I hope the blog post
above is illuminating.

Pedro

On Wed, Jul 27, 2016 at 8:19 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> I have a relatively small data set; however, it is split into many small
> JSON files. Each file is between maybe 4K and 400K.
> This is probably a very common issue for anyone using Spark Streaming. My
> streaming app works fine; however, my batch application takes several hours
> to run.
>
> All I am doing is calling count(). Currently I am trying to read the files
> from s3. When I look at the app UI it looks like spark is blocked probably
> on IO? Adding additional workers and memory does not improve performance.
>
> I am able to copy the files from s3 to a worker relatively quickly. So I
> do not think s3 read time is the problem.
>
> In the past, when I had similar data sets stored on HDFS, I was able to use
> coalesce() to reduce the number of partitions from 200K to 30. This made a
> big improvement in processing time. However, when I read from S3, coalesce()
> does not improve performance.
>
> I tried copying the files to a normal file system and then using ‘hadoop
> fs put’ to copy the files to HDFS; however, this takes several hours and is
> nowhere near completion. It appears HDFS does not deal with small files
> well.
>
> I am considering copying the files from S3 to a normal file system on one
> of my workers, then concatenating the files into a few much larger files,
> then using ‘hadoop fs put’ to move them to HDFS. Do you think this would
> improve the spark count() performance issue?
>
> Does anyone know of heuristics for determining the number or size of the
> concatenated files?
>
> Thanks in advance
>
> Andy
>



-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: dynamic coalesce to pick file size

2016-07-26 Thread Pedro Rodriguez
I asked something similar; search for "Tools for Balancing Partitions By
Size" (I couldn't find the link in the archives). Unfortunately there doesn't
seem to be anything good right now other than knowing your job statistics.
I am planning on implementing the idea I explained in the last paragraph or
so of the last email I sent in this library:
https://github.com/EntilZha/spark-s3, although it could be a while before I
make my way up to DataFrames (RDDs for now).
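
In the meantime, a rough heuristic that has worked for others on this list is
to estimate the output size and divide by the target file size. All of the
numbers below are made up and need tuning for your data/format, and outputPath
is a placeholder:

val estimatedInputBytes = 12L * 1024 * 1024 * 1024   // e.g. ~12 GB of input
val compressionRatio    = 0.3                        // how much parquet + compression shrinks it
val targetFileBytes     = 256L * 1024 * 1024         // ~256 MB per output file
val numPartitions = math.max(1,
  math.ceil(estimatedInputBytes * compressionRatio / targetFileBytes).toInt)

df.coalesce(numPartitions).write.format("parquet").save(outputPath)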

On Tue, Jul 26, 2016 at 1:02 PM, Maurin Lenglart <mau...@cuberonlabs.com>
wrote:

> Hi,
>
> I am doing a SQL query that returns a DataFrame. Then I am writing the
> result of the query using “df.write”, but the result gets written in a lot
> of different small files (~100 files of ~200 KB). So now I am doing a
> “.coalesce(2)” before the write.
>
> But the number “2” that I picked is static. Is there a way of dynamically
> picking the number depending on the desired file size? (Around 256 MB per
> file would be perfect.)
>
>
>
> I am running spark 1.6 on CDH using yarn, the files are written in parquet
> format.
>
>
>
> Thanks
>
>
>



-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: dataframe.foreach VS dataframe.collect().foreach

2016-07-26 Thread Pedro Rodriguez
:)

Just realized you didn't get your original question answered though:

scala> import sqlContext.implicits._
import sqlContext.implicits._

scala> case class Person(age: Long, name: String)
defined class Person

scala> val df = Seq(Person(24, "pedro"), Person(22, "fritz")).toDF()
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.select("age")
res2: org.apache.spark.sql.DataFrame = [age: bigint]

scala> df.select("age").collect.map(_.getLong(0))
res3: Array[Long] = Array(24, 22)

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> df.collect.flatMap {
 | case Row(age: Long, name: String) => Seq(Tuple1(age))
 | case _ => Seq()
 | }
res7: Array[(Long,)] = Array((24,), (22,))

These docs are helpful
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Row
(1.6 docs, but should be similar in 2.0)

On Tue, Jul 26, 2016 at 7:08 AM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> And Pedro has made sense of a world running amok, scared, and drunken
> stupor.
>
> Regards,
> Gourav
>
> On Tue, Jul 26, 2016 at 2:01 PM, Pedro Rodriguez <ski.rodrig...@gmail.com>
> wrote:
>
>> I am not 100% sure, as I haven't tried this out, but there is a huge
>> difference between the two. Both foreach and collect are actions regardless
>> of whether or not the data frame is empty.
>>
>> Doing a collect will bring all the results back to the driver, possibly
>> forcing it to run out of memory. Foreach will apply your function to each
>> element of the DataFrame, but will do so across the cluster. This behavior
>> is useful when you need to do something custom for each element (perhaps
>> save to a db for which there is no driver, or something custom like making
>> an http request per element; be careful here though due to the overhead cost).
>>
>> In your example, I am going to assume that hrecords is something like a
>> list buffer. The reason it will be empty is that each worker gets sent an
>> empty list (it's captured in the closure for foreach) and appends to it.
>> The instance of the list at the driver doesn't know about what happened at
>> the workers, so it's empty.
>>
>> I don't know why Chanh's comment applies here since I am guessing the df
>> is not empty.
>>
>> On Tue, Jul 26, 2016 at 1:53 AM, kevin <kiss.kevin...@gmail.com> wrote:
>>
>>> thank you Chanh
>>>
>>> 2016-07-26 15:34 GMT+08:00 Chanh Le <giaosu...@gmail.com>:
>>>
>>>> Hi Ken,
>>>>
>>>> *blacklistDF -> just DataFrame *
>>>> Spark is lazy until you call something like* collect, take, write* it
>>>> will execute the hold process *like you do map or filter before you
>>>> collect*.
>>>> That mean until you call collect spark* do nothing* so you df would
>>>> not have any data -> can’t call foreach.
>>>> Call collect execute the process -> get data -> foreach is ok.
>>>>
>>>>
>>>> On Jul 26, 2016, at 2:30 PM, kevin <kiss.kevin...@gmail.com> wrote:
>>>>
>>>>  blacklistDF.collect()
>>>>
>>>>
>>>>
>>>
>>
>>
>> --
>> Pedro Rodriguez
>> PhD Student in Distributed Machine Learning | CU Boulder
>> UC Berkeley AMPLab Alumni
>>
>> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
>> Github: github.com/EntilZha | LinkedIn:
>> https://www.linkedin.com/in/pedrorodriguezscience
>>
>>
>


-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: dataframe.foreach VS dataframe.collect().foreach

2016-07-26 Thread Pedro Rodriguez
I am not 100% sure, as I haven't tried this out, but there is a huge
difference between the two. Both foreach and collect are actions regardless
of whether or not the data frame is empty.

Doing a collect will bring all the results back to the driver, possibly
forcing it to run out of memory. Foreach will apply your function to each
element of the DataFrame, but will do so across the cluster. This behavior
is useful when you need to do something custom for each element (perhaps
save to a db for which there is no driver, or something custom like making
an http request per element; be careful here though due to the overhead cost).

In your example, I am going to assume that hrecords is something like a
list buffer. The reason it will be empty is that each worker gets sent an
empty list (it's captured in the closure for foreach) and appends to it.
The instance of the list at the driver doesn't know about what happened at
the workers, so it's empty.

I don't know why Chanh's comment applies here since I am guessing the df is
not empty.
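
To make the closure point concrete, here is a minimal sketch using the
blacklistDF name from the thread (whether collecting is safe depends on how
big the data is):

val hrecords = scala.collection.mutable.ListBuffer[org.apache.spark.sql.Row]()
blacklistDF.foreach(row => hrecords += row)            // runs on the executors; the driver's buffer stays empty
blacklistDF.collect().foreach(row => hrecords += row)  // collect brings the rows to the driver first; the buffer fills up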

On Tue, Jul 26, 2016 at 1:53 AM, kevin <kiss.kevin...@gmail.com> wrote:

> thank you Chanh
>
> 2016-07-26 15:34 GMT+08:00 Chanh Le <giaosu...@gmail.com>:
>
>> Hi Ken,
>>
>> *blacklistDF -> just DataFrame *
>> Spark is lazy until you call something like* collect, take, write* it
>> will execute the hold process *like you do map or filter before you
>> collect*.
>> That mean until you call collect spark* do nothing* so you df would not
>> have any data -> can’t call foreach.
>> Call collect execute the process -> get data -> foreach is ok.
>>
>>
>> On Jul 26, 2016, at 2:30 PM, kevin <kiss.kevin...@gmail.com> wrote:
>>
>>  blacklistDF.collect()
>>
>>
>>
>


-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Spark SQL overwrite/append for partitioned tables

2016-07-25 Thread Pedro Rodriguez
Probably should have been more specific with the code we are using, which
is something like

val df = ...
df.write.mode("append or overwrite here").partitionBy("date").saveAsTable("my_table")

Unless there is something like what I described in the native API, I will
probably take the approach of making an S3 API call to wipe out that
partition before the job starts, but it would be nice not to have to
incorporate another step in the job.
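
A rough sketch of that pre-wipe step with the AWS Java SDK (bucket name, path
layout, and runDate are placeholders, and pagination past 1,000 keys is elided):

import com.amazonaws.services.s3.AmazonS3Client
import scala.collection.JavaConverters._

val s3 = new AmazonS3Client()
val partitionPrefix = s"warehouse/my_table/date=$runDate/"
s3.listObjects("my-bucket", partitionPrefix)
  .getObjectSummaries.asScala
  .foreach(summary => s3.deleteObject("my-bucket", summary.getKey))

df.write.mode("append").partitionBy("date").saveAsTable("my_table")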

Pedro

On Mon, Jul 25, 2016 at 5:23 PM, RK Aduri <rkad...@collectivei.com> wrote:

> You can have a temporary file to capture the data that you would like to
> overwrite, and swap that with the existing partition whose data you want to
> wipe away. Swapping can be done by a simple rename of the partition; then
> just repair the table to pick up the new partition.
>
> Am not sure if that addresses your scenario.
>
> On Jul 25, 2016, at 4:18 PM, Pedro Rodriguez <ski.rodrig...@gmail.com>
> wrote:
>
> What would be the best way to accomplish the following behavior:
>
> 1. There is a table which is partitioned by date
> 2. Spark job runs on a particular date, we would like it to wipe out all
> data for that date. This is to make the job idempotent and lets us rerun a
> job if it failed without fear of duplicated data
> 3. Preserve data for all other dates
>
> I am guessing that overwrite would not work here, or if it does it's not
> guaranteed to stay that way, but I am not sure. If that's the case, is there
> a good/robust way to get this behavior?
>
> --
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
>
> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
> Github: github.com/EntilZha | LinkedIn:
> https://www.linkedin.com/in/pedrorodriguezscience
>
>
>
> Collective[i] dramatically improves sales and marketing performance using
> technology, applications and a revolutionary network designed to provide
> next generation analytics and decision-support directly to business users.
> Our goal is to maximize human potential and minimize mistakes. In most
> cases, the results are astounding. We cannot, however, stop emails from
> sometimes being sent to the wrong person. If you are not the intended
> recipient, please notify us by replying to this email's sender and deleting
> it (and any attachments) permanently from your system. If you are, please
> respect the confidentiality of this communication's contents.




-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Spark SQL overwrite/append for partitioned tables

2016-07-25 Thread Pedro Rodriguez
What would be the best way to accomplish the following behavior:

1. There is a table which is partitioned by date
2. Spark job runs on a particular date, we would like it to wipe out all
data for that date. This is to make the job idempotent and lets us rerun a
job if it failed without fear of duplicated data
3. Preserve data for all other dates

I am guessing that overwrite would not work here, or if it does it's not
guaranteed to stay that way, but I am not sure. If that's the case, is there a
good/robust way to get this behavior?

-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Spark 2.0

2016-07-25 Thread Pedro Rodriguez
Spark 2.0 vote for RC5 passed last Friday night so it will probably be
released early this week if I had to guess.

On Mon, Jul 25, 2016 at 12:23 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
wrote:

> All,
>
> I had three questions:
>
> (1) Is there a timeline for stable Spark 2.0 release?  I know the
> 'preview' build is out there, but was curious what the timeline was for
> full release. Jira seems to indicate that there should be a release 7/27.
>
> (2)  For 'continuous' datasets there has been a lot of discussion. One
> item that came up in tickets was the idea that 'count()' and other
> functions do not apply to continuous datasets:
> https://github.com/apache/spark/pull/12080.  In this case what is the
> intended procedure to calculate a streaming statistic based on an interval
> (e.g. count the number of records in a 2 minute window every 2 minutes)?
>
> (3) In previous releases (1.6.1) the call to DStream / RDD repartition w/
> a number of partitions set to zero silently deletes data.  I have looked in
> Jira for a similar issue, but I do not see one.  I would like to address
> this (and would likely be willing to go fix it myself).  Should I just
> create a ticket?
>
> Thank you,
>
> Bryan Jeffrey
>
>


-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: How to generate a sequential key in rdd across executors

2016-07-24 Thread Pedro Rodriguez
If you can use a DataFrame then you could use rank plus a window function, at
the expense of an extra sort. Do you have an example of zipWithIndex not
working? That seems surprising.
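A minimal sketch of the window approach, assuming a DataFrame and Spark 2.0's
row_number (it is rowNumber in 1.6); the ordering column is a placeholder, and
the global ordering is what forces the extra sort:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val w = Window.orderBy("someExistingColumn")       // placeholder ordering column
val keyed = df.withColumn("seqKey", row_number().over(w))
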
On Jul 23, 2016 10:24 PM, "Andrew Ehrlich"  wrote:

> It’s hard to do in a distributed system. Maybe try generating a meaningful
> key using a timestamp + hashed unique key fields in the record?
>
> > On Jul 23, 2016, at 7:53 PM, yeshwanth kumar 
> wrote:
> >
> > Hi,
> >
> > I am doing a bulk load to HBase using Spark, in which I need to generate
> > a sequential key for each record; the key should be sequential across all
> > the executors.
> >
> > I tried zipWithIndex; it didn't work because zipWithIndex gives an index
> per executor, not across all executors.
> >
> > looking for some suggestions.
> >
> >
> > Thanks,
> > -Yeshwanth
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Choosing RDD/DataFrame/DataSet and Cluster Tuning

2016-07-23 Thread Pedro Rodriguez
Hi Jestin,

Spark is smart about how it does joins. In this case, if df2 is sufficiently 
small it will do a broadcast join: rather than shuffling df1/df2 for the join, 
it broadcasts df2 to all workers and joins locally. It looks like you may 
already have known that, though, based on your use of the 
spark.sql.autoBroadcastJoinThreshold setting.

It's hard to say why your job is slow without knowing more. For example, it 
could be a CPU-intensive calculation, or maybe you have an imbalance over keys 
which would cause stragglers. It's hard to know without seeing some of the 
metrics from the Spark UI.

1. If you aren't tied down by legacy code, Spark 2.0 has a nicer Dataset API 
and more improvements, so I don't see why not. The Spark 2.0 RC5 vote passed last 
night, so the official release will probably go out early next week.
2. RDDs will make it worse. The reduceByKey/groupByKey distinction is specific 
to RDDs; the DataFrame API doesn't mirror it. You hear that advice because 
reduceByKey runs the reduce locally at each node for each key, then reduces all 
those results to get the final result, while groupByKey shuffles all keys across 
the network, which is wasteful if you are just doing a reduce right after. 
DataFrames have lots of optimizations as well.
3. You shouldn't need to explicitly call broadcast (see the sketch after this list).
4. Driver memory is important if your job needs to collect results back to the driver 
for some reason. One good example is in mllib/ml, where it is common to collect 
parameters back to the driver to update a global model. For some algorithms 
(like LDA), the model can be quite large, so it requires high driver memory.
5. Hard to know without more metrics from your job. That being said, your 
number of executor instances vs. number of cores seems a bit high. I would try 5 
instances of 15 cores each or 10 of 7 cores each. You can also kick up the 
memory to use more of your cluster's memory. Lastly, if you are running on EC2, 
make sure to configure spark.local.dir to write to something that is not an EBS 
volume, preferably an attached SSD on something like an r3 machine.
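
A quick sketch for point 3, assuming Spark 1.6 and the df1/df2 names from your 
question ("key" is a placeholder join column; the threshold is in bytes, and 
broadcast() is only an explicit hint if the threshold isn't picked up):

import org.apache.spark.sql.functions.broadcast

sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (250L * 1024 * 1024).toString)
val counts = df1.groupBy("key").count()
val joined = counts.join(broadcast(df2), "key")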

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 23, 2016 at 9:31:21 AM, Jestin Ma (jestinwith.a...@gmail.com) wrote:

Hello,
Right now I'm using DataFrames to perform a df1.groupBy(key).count() on one 
DataFrame and join with another, df2.

The first, df1, is very large (many gigabytes) compared to df2 (250 Mb).

Right now I'm running this on a cluster of 5 nodes, 16 cores each, 90 GB RAM 
each.
It is taking me about 1 hour and 40 minutes to perform the groupBy, count, and 
join, which seems very slow to me.

Currently I have set the following in my spark-defaults.conf:

spark.executor.instances                             24
spark.executor.memory                               10g
spark.executor.cores                                    3
spark.driver.memory                                     5g
spark.sql.autoBroadcastJoinThreshold        200Mb


I have a couple of questions regarding tuning for performance as a beginner.
1. Right now I'm running Spark 1.6.0. Would moving to the Spark 2.0 Dataset (or even 
DataFrame) API be better?
2. What if I used RDDs instead? I know that reduceByKey is better than groupByKey, 
and DataFrames don't have that method.
3. I think I can do a broadcast join and have set a threshold. Do I need to set it 
above my second DataFrame's size? Do I need to explicitly call broadcast(df2)?
4. What's the point of driver memory?
5. Can anyone point out something wrong with my tuning numbers, or any additional 
parameters worth checking out?

Thank you a lot!
Sincerely,
Jestin

Re: Error in collecting RDD as a Map - IOException in collectAsMap

2016-07-23 Thread Pedro Rodriguez
Have you changed spark-env.sh or spark-defaults.conf from the defaults? It looks 
like Spark is trying to address local workers by a network address (e.g. 
192.168.x.x) instead of localhost (localhost, 127.0.0.1, 0.0.0.0, ...). 
Additionally, that network address doesn't resolve correctly. You might also 
check /etc/hosts to make sure that you don't have anything weird going on.

One last thing to check: are you running Spark within a VM and/or Docker? If 
networking isn't set up correctly on those you may also run into trouble.

What would be helpful is to know everything about your setup that might affect 
networking.
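
If it is purely local, one thing to try is pinning Spark to the loopback 
address so it never advertises the 192.168.x.x address. A sketch in Scala (the 
Java version is analogous), using the local[2]/"DR" setup from your reply:

val conf = new org.apache.spark.SparkConf()
  .setMaster("local[2]")
  .setAppName("DR")
  .set("spark.driver.host", "127.0.0.1")   // or export SPARK_LOCAL_IP=127.0.0.1 in spark-env.sh
val jsc = new org.apache.spark.api.java.JavaSparkContext(conf)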

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 23, 2016 at 9:10:31 AM, VG (vlin...@gmail.com) wrote:

Hi pedro,

Apologies for not adding this earlier. 

This is running on a local cluster set up as follows.
JavaSparkContext jsc = new JavaSparkContext("local[2]", "DR");

Any suggestions based on this ? 

The ports are not blocked by firewall. 

Regards,



On Sat, Jul 23, 2016 at 8:35 PM, Pedro Rodriguez <ski.rodrig...@gmail.com> 
wrote:
Make sure that you don’t have ports firewalled. You don’t really give much 
information to work from, but it looks like the master can’t access the worker 
nodes for some reason. If you give more information on the cluster, networking, 
etc, it would help.

For example, on AWS you can create a security group which allows all traffic 
to/from itself to itself. If you are using something like ufw on ubuntu then 
you probably need to know the ip addresses of the worker nodes beforehand.

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 23, 2016 at 7:38:01 AM, VG (vlin...@gmail.com) wrote:

Please suggest if I am doing something wrong or an alternative way of doing 
this. 

I have an RDD with two values as follows 
JavaPairRDD<String, Long> rdd

When I execute   rdd..collectAsMap()
it always fails with IO exceptions.   


16/07/23 19:03:58 ERROR RetryingBlockFetcher: Exception while beginning fetch 
of 1 outstanding blocks 
java.io.IOException: Failed to connect to /192.168.1.3:58179
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
at 
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:96)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
at 
org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:105)
at 
org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:92)
at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:546)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:76)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1793)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:56)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.net.ConnectException: Connection timed out: no further 
information: /192.168.1.3:58179
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
at 
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 more
16/07/23 19:03:58 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 1 
outstanding blocks after 5000 ms






Re: Error in collecting RDD as a Map - IOException in collectAsMap

2016-07-23 Thread Pedro Rodriguez
Make sure that you don’t have ports firewalled. You don’t really give much 
information to work from, but it looks like the master can’t access the worker 
nodes for some reason. If you give more information on the cluster, networking, 
etc, it would help.

For example, on AWS you can create a security group which allows all traffic 
to/from itself to itself. If you are using something like ufw on ubuntu then 
you probably need to know the ip addresses of the worker nodes beforehand.

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 23, 2016 at 7:38:01 AM, VG (vlin...@gmail.com) wrote:

Please suggest if I am doing something wrong or an alternative way of doing 
this. 

I have an RDD with two values as follows 
JavaPairRDD<String, Long> rdd

When I execute   rdd..collectAsMap()
it always fails with IO exceptions.   


16/07/23 19:03:58 ERROR RetryingBlockFetcher: Exception while beginning fetch 
of 1 outstanding blocks 
java.io.IOException: Failed to connect to /192.168.1.3:58179
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
at 
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:96)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
at 
org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:105)
at 
org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:92)
at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:546)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:76)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1793)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:56)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.net.ConnectException: Connection timed out: no further 
information: /192.168.1.3:58179
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
at 
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 more
16/07/23 19:03:58 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 1 
outstanding blocks after 5000 ms





Re: Dataset , RDD zipWithIndex -- How to use as a map .

2016-07-22 Thread Pedro Rodriguez
You could either use monotonically_increasing_id or use a window function and
rank. The first is a simple Spark SQL function; Databricks has a pretty
helpful post on how to use window functions (in this case the whole data set
is the window).
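
A minimal sketch of the first option, assuming Spark 2.0 naming (1.6 calls it
monotonicallyIncreasingId); note the ids are unique and increasing but not
consecutive, so use the window/rank route if you need a dense sequence:

import org.apache.spark.sql.functions.monotonically_increasing_id

val withId = df.withColumn("id", monotonically_increasing_id())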

On Fri, Jul 22, 2016 at 12:20 PM, Marco Mistroni <mmistr...@gmail.com>
wrote:

> Hi
> So you have a data frame; then use zipWithIndex and create a tuple.
> I'm not sure if the DF API has something useful for zip with index.
> But you can
> - get a data frame
> - convert it to an RDD (there's a toJavaRDD)
> - do a zip with index
>
> That will give you an RDD with 3 fields...
> I don't think you can update DF columns.
> Hth
> On 22 Jul 2016 5:19 pm, "VG" <vlin...@gmail.com> wrote:
>
> >
>
> > Hi All,
> >
> > Any suggestions for this
> >
> > Regards,
> > VG
> >
> > On Fri, Jul 22, 2016 at 6:40 PM, VG <vlin...@gmail.com> wrote:
>
> >>
>
> >> Hi All,
> >>
> >> I am really confused how to proceed further. Please help.
> >>
> >> I have a dataset created as follows:
> >> Dataset b = sqlContext.sql("SELECT bid, name FROM business");
> >>
> >> Now I need to map each name with a unique index and I did the following
> >> JavaPairRDD<Row, Long> indexedBId = business.javaRDD()
> >>
>  .zipWithIndex();
> >>
> >> In later part of the code I need to change a datastructure and update
> name with index value generated above .
> >> I am unable to figure out how to do a look up here..
> >>
> >> Please suggest /.
> >>
> >> If there is a better way to do this please suggest that.
> >>
> >> Regards
> >> VG
> >>
> >
>



-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: How to get the number of partitions for a SparkDataFrame in Spark 2.0-preview?

2016-07-22 Thread Pedro Rodriguez
I haven't used SparkR/R before, only the Scala/Python APIs, so I don't know
for sure.

I am guessing that if things are in a DataFrame they were read either from
some disk source (S3/HDFS/file/etc.) or they were created from parallelize. If
you are using the first, Spark will for the most part choose a reasonable
number of partitions, while for parallelize I think it depends on what your
minimum parallelism is set to.

In my brief googling it looks like dapply is an analogue of mapPartitions.
Usually the reason to use this is when your map operation has some expensive
initialization. For example, if you need to open a connection to a database,
it's better to re-use that connection for one partition's elements than to
create it for each element.

What are you trying to accomplish with dapply?

On Fri, Jul 22, 2016 at 8:05 PM, Neil Chang <iam...@gmail.com> wrote:

> Thanks Pedro,
>   so to use sparkR dapply on SparkDataFrame, don't we need partition the
> DataFrame first? the example in doc doesn't seem to do this.
> Without knowing how it partitioned, how can one write the function to
> process each partition?
>
> Neil
>
> On Fri, Jul 22, 2016 at 5:56 PM, Pedro Rodriguez <ski.rodrig...@gmail.com>
> wrote:
>
>> This should work and I don't think triggers any actions:
>>
>> df.rdd.partitions.length
>>
>> On Fri, Jul 22, 2016 at 2:20 PM, Neil Chang <iam...@gmail.com> wrote:
>>
>>> Seems no function does this in Spark 2.0 preview?
>>>
>>
>>
>>
>> --
>> Pedro Rodriguez
>> PhD Student in Distributed Machine Learning | CU Boulder
>> UC Berkeley AMPLab Alumni
>>
>> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
>> Github: github.com/EntilZha | LinkedIn:
>> https://www.linkedin.com/in/pedrorodriguezscience
>>
>>
>


-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: spark and plot data

2016-07-22 Thread Pedro Rodriguez
As of the most recent 0.6.0 release it's partially alleviated, but still not
great (compared to something like Jupyter).

Notebooks can be "downloaded", but that is only really meaningful for
importing them back into Zeppelin. It would be great if they could be exported
as HTML or PDF, but at present they can't be. I know they have some sort of
git support, but it was never clear to me how it was supposed to be used,
since the docs are sparse on that. So far what works best for us is S3 storage,
but you don't get the benefit of GitHub with that (history + commits, etc.).

There are a couple of other notebooks floating around; Apache Toree seems the
most promising for portability since it's based on Jupyter:
https://github.com/apache/incubator-toree

On Fri, Jul 22, 2016 at 3:53 PM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> The biggest stumbling block to using Zeppelin has been that we cannot
> download the notebooks, cannot export them and certainly cannot sync them
> back to Github, without mind numbing and sometimes irritating hacks. Have
> those issues been resolved?
>
>
> Regards,
> Gourav
>
>
> On Fri, Jul 22, 2016 at 2:22 PM, Pedro Rodriguez <ski.rodrig...@gmail.com>
> wrote:
>
>> Zeppelin works great. The other thing that we have done in notebooks
>> (like Zeppelin or Databricks) which support multiple types of spark session
>> is register Spark SQL temp tables in our scala code then escape hatch to
>> python for plotting with seaborn/matplotlib when the built in plots are
>> insufficient.
>>
>> —
>> Pedro Rodriguez
>> PhD Student in Large-Scale Machine Learning | CU Boulder
>> Systems Oriented Data Scientist
>> UC Berkeley AMPLab Alumni
>>
>> pedrorodriguez.io | 909-353-4423
>> github.com/EntilZha | LinkedIn
>> <https://www.linkedin.com/in/pedrorodriguezscience>
>>
>> On July 22, 2016 at 3:04:48 AM, Marco Colombo (
>> ing.marco.colo...@gmail.com) wrote:
>>
>> Take a look at zeppelin
>>
>> http://zeppelin.apache.org
>>
>> Il giovedì 21 luglio 2016, Andy Davidson <a...@santacruzintegration.com>
>> ha scritto:
>>
>>> Hi Pseudo
>>>
>>> Plotting, graphing, data visualization, report generation are common
>>> needs in scientific and enterprise computing.
>>>
>>> Can you tell me more about your use case? What is it about the current
>>> process / workflow do you think could be improved by pushing plotting (I
>>> assume you mean plotting and graphing) into spark.
>>>
>>>
>>> In my personal work all the graphing is done in the driver on summary
>>> stats calculated using spark. So for me using standard python libs has not
>>> been a problem.
>>>
>>> Andy
>>>
>>> From: pseudo oduesp <pseudo20...@gmail.com>
>>> Date: Thursday, July 21, 2016 at 8:30 AM
>>> To: "user @spark" <user@spark.apache.org>
>>> Subject: spark and plot data
>>>
>>> Hi ,
>>> i know spark  it s engine  to compute large data set but for me i work
>>> with pyspark and it s very wonderful machine
>>>
>>> my question  we  don't have tools for ploting data each time we have to
>>> switch and go back to python for using plot.
>>> but when you have large result scatter plot or roc curve  you cant use
>>> collect to take data .
>>>
>>> somone have propostion for plot .
>>>
>>> thanks
>>>
>>>
>>
>> --
>> Ing. Marco Colombo
>>
>>
>


-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: How to search on a Dataset / RDD <Row, Long >

2016-07-22 Thread Pedro Rodriguez
You might look at monotonically_increasing_id() here:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions
instead of converting to an RDD, since you pay a performance penalty for
that.

If you want to change the name you can do something like this (in Scala,
since I am not familiar with the Java API, but it should be similar in Java):

val df = sqlContext.sql("select bid, name from business")
  .withColumn("id", monotonically_increasing_id())
// some steps later on
df.withColumn("name", $"id")

I am not 100% sure what you mean by updating the data structure; I am
guessing you mean replacing the name column with the id column? Note that on
the second line the withColumn call uses $"id", which in Scala converts to a
Column. In Java maybe it's something like new Column("id"), not sure.
Pedro

On Fri, Jul 22, 2016 at 12:21 PM, VG <vlin...@gmail.com> wrote:

> Any suggestions here  please
>
> I basically need an ability to look up *name -> index* and *index -> name*
> in the code
>
> -VG
>
> On Fri, Jul 22, 2016 at 6:40 PM, VG <vlin...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am really confused how to proceed further. Please help.
>>
>> I have a dataset created as follows:
>> Dataset b = sqlContext.sql("SELECT bid, name FROM business");
>>
>> Now I need to map each name with a unique index and I did the following
>> JavaPairRDD<Row, Long> indexedBId = business.javaRDD()
>>
>>  .zipWithIndex();
>>
>> In later part of the code I need to change a datastructure and update
>> name with index value generated above .
>> I am unable to figure out how to do a look up here..
>>
>> Please suggest /.
>>
>> If there is a better way to do this please suggest that.
>>
>> Regards
>> VG
>>
>>
>


-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: How to get the number of partitions for a SparkDataFrame in Spark 2.0-preview?

2016-07-22 Thread Pedro Rodriguez
This should work and I don't think triggers any actions:

df.rdd.partitions.length

On Fri, Jul 22, 2016 at 2:20 PM, Neil Chang <iam...@gmail.com> wrote:

> Seems no function does this in Spark 2.0 preview?
>



-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: spark and plot data

2016-07-22 Thread Pedro Rodriguez
Zeppelin works great. The other thing that we have done in notebooks which 
support multiple types of Spark sessions (like Zeppelin or Databricks) is to 
register Spark SQL temp tables in our Scala code, then escape-hatch to Python 
for plotting with seaborn/matplotlib when the built-in plots are insufficient.
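
A rough sketch of that escape hatch, with made-up table/column names (the
Zeppelin paragraph boundary is shown as a comment):

// In a Scala paragraph: register the summary as a temp table
summaryDF.registerTempTable("results_summary")   // Spark 1.6; createOrReplaceTempView in 2.0

// Then in a separate %pyspark paragraph, something like:
//   pdf = sqlContext.table("results_summary").toPandas()
//   import seaborn as sns
//   sns.barplot(x="day", y="count", data=pdf)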

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 22, 2016 at 3:04:48 AM, Marco Colombo (ing.marco.colo...@gmail.com) 
wrote:

Take a look at zeppelin

http://zeppelin.apache.org

Il giovedì 21 luglio 2016, Andy Davidson <a...@santacruzintegration.com> ha 
scritto:
Hi Pseudo

Plotting, graphing, data visualization, report generation are common needs in 
scientific and enterprise computing.

Can you tell me more about your use case? What is it about the current process 
/ workflow do you think could be improved by pushing plotting (I assume you 
mean plotting and graphing) into spark.


In my personal work all the graphing is done in the driver on summary stats 
calculated using spark. So for me using standard python libs has not been a 
problem.

Andy

From: pseudo oduesp <pseudo20...@gmail.com>
Date: Thursday, July 21, 2016 at 8:30 AM
To: "user @spark" <user@spark.apache.org>
Subject: spark and plot data

Hi,
I know Spark is an engine to compute large data sets, but for me I work with 
PySpark and it is a very wonderful machine.

My question: we don't have tools for plotting data, so each time we have to 
switch and go back to Python to use plot. But when you have a large result 
(scatter plot or ROC curve) you can't use collect to take the data.

Does someone have a proposal for plotting?

Thanks


--
Ing. Marco Colombo


Re: How can we control CPU and Memory per Spark job operation..

2016-07-22 Thread Pedro Rodriguez
Sorry, I wasn't very clear (it looks like Pavan's response was dropped from the 
list for some reason as well).

I am assuming that:
1) the first map is CPU bound
2) the second map is heavily memory bound

To be specific, let's say you are using 4 m3.2xlarge instances, which have 8 CPUs 
and 30GB of RAM each, for a total of 32 cores and 120GB of RAM. Since the NLP 
model can't be distributed, every worker/core must use 4GB of RAM. If 
the cluster is fully utilized, that means that just for the NLP model you are 
consuming 32 * 4GB = 128GB of RAM. The cluster at this point is out of memory 
just for the NLP model, not considering the data set itself. My suggestion would 
be to see if r3.8xlarge instances will work (or even X1s if you have access), since 
the CPU/memory ratio is better. Here is the “hack” I proposed in more detail 
(basically n partitions < total cores):

1) Have the first map use a regular number of partitions, say 32 * 4 = 128, 
which is a reasonable starting place.
2) Repartition immediately after that map to 16 partitions (sketched below). 
At this point, Spark is not guaranteed to distribute your work evenly across 
the 4 nodes, but it probably will. The net result is that half the CPU cores 
are idle, but the NLP model is at worst using 16 * 4GB = 64GB of RAM. To be 
clear, this is a hack precisely because an even distribution of work across 
nodes is not guaranteed.
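
A minimal sketch of the above, where rawRecords, cpuIntensiveParse, and 
applyNlpModel are placeholders:

val parsed = rawRecords
  .repartition(128)        // plenty of partitions: all 32 cores stay busy on the CPU-bound map
  .map(cpuIntensiveParse)
val scored = parsed
  .repartition(16)         // fewer partitions than cores: at most 16 copies of the 4GB model are live at once
  .map(applyNlpModel)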

If you wanted to do this as not-a-hack, you could perform the first map, checkpoint 
your work, end the job, then submit a new job with a more favorable CPU/memory 
ratio which reads from the prior job's output. I am guessing this depends 
heavily on how expensive reloading the data set from disk/network is. 

Hopefully one of these helps,
—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 17, 2016 at 6:16:41 AM, Jacek Laskowski (ja...@japila.pl) wrote:

Hi,

How would that help?! Why would you do that?

Jacek


On 17 Jul 2016 7:19 a.m., "Pedro Rodriguez" <ski.rodrig...@gmail.com> wrote:
You could call map on an RDD which has “many” partitions, then call 
repartition/coalesce to drastically reduce the number of partitions so that 
your second map job has less things running.

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 16, 2016 at 4:46:04 PM, Jacek Laskowski (ja...@japila.pl) wrote:

Hi,

My understanding is that these two map functions will end up as a job
with one stage (as if you wrote the two maps as a single map) so you
really need as much vcores and memory as possible for map1 and map2. I
initially thought about dynamic allocation of executors that may or
may not help you with the case, but since there's just one stage I
don't think you can do much.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Fri, Jul 15, 2016 at 9:54 PM, Pavan Achanta <pacha...@sysomos.com> wrote:
> Hi All,
>
> Here is my use case:
>
> I have a pipeline job consisting of 2 map functions:
>
> CPU intensive map operation that does not require a lot of memory.
> Memory intensive map operation that requires upto 4 GB of memory. And this
> 4GB memory cannot be distributed since it is an NLP model.
>
> Ideally what I like to do is to use 20 nodes with 4 cores each and minimal
> memory for first map operation and then use only 3 nodes with minimal CPU
> but each having 4GB of memory for 2nd operation.
>
> While it is possible to control this parallelism for each map operation in
> spark. I am not sure how to control the resources for each operation.
> Obviously I don’t want to start off the job with 20 nodes with 4 cores and
> 4GB memory since I cannot afford that much memory.
>
> We use Yarn with Spark. Any suggestions ?
>
> Thanks and regards,
> Pavan
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How can we control CPU and Memory per Spark job operation..

2016-07-16 Thread Pedro Rodriguez
You could call map on an RDD which has “many” partitions, then call 
repartition/coalesce to drastically reduce the number of partitions so that 
your second map job has fewer things running.

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 16, 2016 at 4:46:04 PM, Jacek Laskowski (ja...@japila.pl) wrote:

Hi,  

My understanding is that these two map functions will end up as a job  
with one stage (as if you wrote the two maps as a single map) so you  
really need as much vcores and memory as possible for map1 and map2. I  
initially thought about dynamic allocation of executors that may or  
may not help you with the case, but since there's just one stage I  
don't think you can do much.  

Pozdrawiam,  
Jacek Laskowski  
  
https://medium.com/@jaceklaskowski/  
Mastering Apache Spark http://bit.ly/mastering-apache-spark  
Follow me at https://twitter.com/jaceklaskowski  


On Fri, Jul 15, 2016 at 9:54 PM, Pavan Achanta <pacha...@sysomos.com> wrote:  
> Hi All,  
>  
> Here is my use case:  
>  
> I have a pipeline job consisting of 2 map functions:  
>  
> CPU intensive map operation that does not require a lot of memory.  
> Memory intensive map operation that requires upto 4 GB of memory. And this  
> 4GB memory cannot be distributed since it is an NLP model.  
>  
> Ideally what I like to do is to use 20 nodes with 4 cores each and minimal  
> memory for first map operation and then use only 3 nodes with minimal CPU  
> but each having 4GB of memory for 2nd operation.  
>  
> While it is possible to control this parallelism for each map operation in  
> spark. I am not sure how to control the resources for each operation.  
> Obviously I don’t want to start off the job with 20 nodes with 4 cores and  
> 4GB memory since I cannot afford that much memory.  
>  
> We use Yarn with Spark. Any suggestions ?  
>  
> Thanks and regards,  
> Pavan  
>  
>  

-  
To unsubscribe e-mail: user-unsubscr...@spark.apache.org  



Re: Saving data frames on Spark Master/Driver

2016-07-14 Thread Pedro Rodriguez
Out of curiosity, is there a way to pull all the data back to the driver to 
save without collect()? That is, stream the data in chunks back to the driver 
so that the maximum memory used is comparable to a single node's data, but all 
the data is saved on one node.
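
For what it's worth, RDD.toLocalIterator looks close to this: it pulls one 
partition at a time back to the driver, at the cost of running one job per 
partition. A minimal sketch, with a made-up driver-local path and the 
joineddataframe name from the thread below:

import java.io.PrintWriter

val out = new PrintWriter("/tmp/joined_results.csv")   // driver-local file, placeholder path
joineddataframe.rdd.toLocalIterator.foreach(row => out.println(row.mkString(",")))
out.close()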

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 14, 2016 at 6:02:12 PM, Jacek Laskowski (ja...@japila.pl) wrote:

Hi,  

Please re-consider your wish since it is going to move all the  
distributed dataset to the single machine of the driver and may lead  
to OOME. It's more pro to save your result to HDFS or S3 or any other  
distributed filesystem (that is accessible by the driver and  
executors).  

If you insist...  

Use collect() after select() and work with Array[T].  

Pozdrawiam,  
Jacek Laskowski  
  
https://medium.com/@jaceklaskowski/  
Mastering Apache Spark http://bit.ly/mastering-apache-spark  
Follow me at https://twitter.com/jaceklaskowski  


On Fri, Jul 15, 2016 at 12:15 AM, vr.n. nachiappan  
<nachiappan_...@yahoo.com.invalid> wrote:  
> Hello,  
>  
> I am using data frames to join two cassandra tables.  
>  
> Currently when i invoke save on data frames as shown below it is saving the  
> join results on executor nodes.  
>  
> joineddataframe.select(,   
> ...).format("com.databricks.spark.csv").option("header",  
> "true").save()  
>  
> I would like to persist the results of the join on Spark Master/Driver node.  
> Is it possible to save the results on Spark Master/Driver and how to do it.  
>  
> I appreciate your help.  
>  
> Nachi  
>  

-  
To unsubscribe e-mail: user-unsubscr...@spark.apache.org  



Re: Call http request from within Spark

2016-07-14 Thread Pedro Rodriguez
Hi Amit,

Have you tried running a subset of the IDs locally on a single thread? It
would be useful to benchmark your getProfile function for a subset of the
data, then estimate how long the full data set would take and divide by the
number of Spark executor cores. This should at least serve as a sanity check.
If things are much slower than expected, is it possible that the service has a
rate limit per IP address that you are hitting?

If requests is more efficient at batching requests together (I don't know
much about its internal implementation and connection pools), you could do
that with mapPartitions. This is useful when the initialization of the
function in the map call is expensive (e.g. it uses a connection pool for a db
or the web), as it allows you to initialize that resource once per partition
and then reuse it for all the elements in the partition.
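
A minimal sketch of the mapPartitions pattern in Scala (the code in this
thread is PySpark, where rdd.mapPartitions works the same way); custIdRdd
mirrors your custid_rdd, and buildHttpClient/fetchProfile are placeholders for
whatever client setup and request logic you use:

val profiles = custIdRdd.mapPartitions { ids =>
  val client = buildHttpClient()            // expensive setup happens once per partition
  ids.map(id => fetchProfile(client, id))   // every id in the partition reuses the same client
}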

Pedro

On Thu, Jul 14, 2016 at 8:52 AM, Amit Dutta <amitkrdu...@outlook.com> wrote:

> Hi All,
>
>
> I have a requirement to call a rest service url for 300k customer ids.
>
> Things I have tried so far is
>
>
> custid_rdd = sc.textFile('file:Users/zzz/CustomerID_SC/Inactive User
> Hashed LCID List.csv') #getting all the customer ids and building adds
>
> profile_rdd = custid_rdd.map(lambda r: getProfile(r.split(',')[0]))
>
> profile_rdd.count()
>
>
> #getprofile is the method to do the http call
>
> def getProfile(cust_id):
>
> api_key = 'txt'
>
> api_secret = 'yuyuy'
>
> profile_uri = 'https://profile.localytics.com/x1/customers/{}'
>
> customer_id = cust_id
>
>
> if customer_id is not None:
>
> data = requests.get(profile_uri.format(customer_id),
> auth=requests.auth.HTTPBasicAuth(api_key, api_secret))
>
> # print json.dumps(data.json(), indent=4)
>
> return data
>
>
> when I print the json dump of the data i see it returning results from the
> rest call. But the count never stops.
>
>
> Is there an efficient way of dealing this? Some post says we have to
> define a batch size etc but don't know how.
>
>
> Appreciate your help
>
>
> Regards,
>
> Amit
>
>


-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Tools for Balancing Partitions by Size

2016-07-13 Thread Pedro Rodriguez
Hi Gourav,

In our case, we process raw logs into parquet tables that downstream
applications can use for other jobs. The desired outcome is that we only
need to worry about unbalanced input data at the preprocess step so that
downstream jobs can assume balanced input data.

In our specific case, this works because although the raw log rows are of
variable size, the rows in the Spark SQL table are of fixed size by parsing
primitives or chopping arrays. Due to this, in our use case it makes sense
to think in terms of balanced file size because it directly correlates to
having a balanced number of rows/partition and thus balanced partitions.

Given this setting, are there any specific issues you foresee? I agree that
file size isn't a general solution, but in the setting I don't see a reason
it should not work.

Our overall goal is to avoid two problems when we write data to S3:
- Large number of small files (Kbs) since this makes S3 listing take a long
time
- Small number of large files (GBs) since this makes reads not as efficient

Thus far, we have done this on a per-application basis with repartition and
a manually tuned number of partitions, but this is inconvenient. We are
interested in seeing if there is a way to automatically infer the number of
partitions we should use so that our files in S3 have a particular average
size (without incurring too high an overhead cost).

The solution that seems most promising right now is:

   - Define a custom write function which does two steps:
   - Write one partition to S3 and get files size and number of records
   - Use that to determine the number of partitions to repartition to, then
   write everything to S3

What seems unclear is how to compute the parent RDD (suppose it's an RDD
with wide dependencies like a join), get one partition for step (1), then
not recompute anything to do step (2) without an explicit cache. This would
make it so the additional overhead on the job is writing one partition
to S3, which seems like an acceptable level of overhead. Perhaps this could
be accomplished by saying: RDD A computes the size of one partition, RDD B
holds all partitions except for the one from A, the parents of A and B are
the original parent RDD, and RDD C has parents A and B and performs the
overall balanced write.
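
A rough sketch of what that two-step write could look like (this is not the
library code: the AWS SDK client, the s3a paths, the probe prefix, the 1%
sample, and the Parquet format are all assumptions, and it sidesteps the
recompute question by simply caching the parent):

import com.amazonaws.services.s3.AmazonS3Client
import org.apache.spark.sql.DataFrame
import scala.collection.JavaConverters._

def writeWithTargetFileSize(df: DataFrame,
                            bucket: String,
                            prefix: String,
                            targetBytes: Long = 256L * 1024 * 1024): Unit = {
  val cached = df.cache()                      // avoid recomputing the parent plan twice
  val totalRows = cached.count()
  val sampleFraction = 0.01
  val probePrefix = s"$prefix/_size_probe"

  // step 1: write a small probe and ask S3 how many bytes it produced
  cached.sample(withReplacement = false, sampleFraction)
    .coalesce(1)
    .write.parquet(s"s3a://$bucket/$probePrefix")
  val s3 = new AmazonS3Client()
  val probeBytes = s3.listObjects(bucket, probePrefix)
    .getObjectSummaries.asScala.map(_.getSize).sum
  val probeRows = math.max(1L, (totalRows * sampleFraction).toLong)

  // step 2: pick a partition count so the average output file is roughly targetBytes
  val bytesPerRow = math.max(1L, probeBytes / probeRows)
  val partitions = math.max(1, (totalRows * bytesPerRow / targetBytes).toInt)
  cached.repartition(partitions).write.parquet(s"s3a://$bucket/$prefix")
}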

Thanks,
Pedro

On Wed, Jul 13, 2016 at 9:10 AM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi,
>
> Using file size is a very bad way of managing data, provided you think that
> volume, variety, and veracity do not hold true. Actually, it's a very bad
> way of thinking about and designing data solutions; you are bound to hit
> bottlenecks, optimization issues, and manual interventions.
>
> I have found thinking about data in logical partitions helps overcome most
> of the design problems mentioned above.
>
> You can either use repartition with shuffling or coalesce with shuffle
> turned off to manage loads.
>
> If you are using HIVE just let me know.
>
>
> Regards,
> Gourav Sengupta
>
> On Wed, Jul 13, 2016 at 5:39 AM, Pedro Rodriguez <ski.rodrig...@gmail.com>
> wrote:
>
>> The primary goal for balancing partitions would be for the write to S3.
>> We would like to prevent unbalanced partitions (can do with repartition),
>> but also avoid partitions that are too small or too large.
>>
>> So for that case, getting the cache size would work Maropu if it's roughly
>> accurate, but for data ingest we aren’t caching, just writing straight
>> through to S3.
>>
>> The idea for writing to disk and checking for the size is interesting
>> Hatim. For certain jobs, it seems very doable to write a small percentage
>> of the data to S3, check the file size through the AWS API, and use that to
>> estimate the total size. Thanks for the idea.
>>
>> —
>> Pedro Rodriguez
>> PhD Student in Large-Scale Machine Learning | CU Boulder
>> Systems Oriented Data Scientist
>> UC Berkeley AMPLab Alumni
>>
>> pedrorodriguez.io | 909-353-4423
>> github.com/EntilZha | LinkedIn
>> <https://www.linkedin.com/in/pedrorodriguezscience>
>>
>> On July 12, 2016 at 7:26:17 PM, Hatim Diab (timd...@gmail.com) wrote:
>>
>> Hi,
>>
>> Since the final size depends on data types and compression, I've had to
>> first get a rough estimate of the data written to disk, then compute the
>> number of partitions.
>>
>> partitions = int(ceil(size_data * conversion_ratio / block_size))
>>
>> In my case block size 256mb, source txt & dest is snappy parquet,
>> compression_ratio .6
>>
>> df.repartition(partitions).write.parquet(output)
>>
>> Which yields files in the range of 230mb.
>>
>> Another way was to count and come up with an empirical formula.
>>
>

Re: Tools for Balancing Partitions by Size

2016-07-12 Thread Pedro Rodriguez
The primary goal for balancing partitions would be for the write to S3. We 
would like to prevent unbalanced partitions (can do with repartition), but also 
avoid partitions that are too small or too large.

So for that case, getting the cache size would work Maropu if it's roughly
accurate, but for data ingest we aren’t caching, just writing straight through 
to S3.

The idea for writing to disk and checking for the size is interesting Hatim. 
For certain jobs, it seems very doable to write a small percentage of the data 
to S3, check the file size through the AWS API, and use that to estimate the 
total size. Thanks for the idea.

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 12, 2016 at 7:26:17 PM, Hatim Diab (timd...@gmail.com) wrote:

Hi,

Since the final size depends on data types and compression, I've had to first
get a rough estimate of the data written to disk, then compute the number of
partitions.

partitions = int(ceil(size_data * conversion_ratio / block_size))

In my case block size 256mb, source txt & dest is snappy parquet, 
compression_ratio .6

df.repartition(partitions).write.parquet(output)

Which yields files in the range of 230mb.
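
The same calculation in Scala, with the ratio and block size above treated as
assumptions you would measure for your own data:

import org.apache.spark.sql.DataFrame

def repartitionForBlockSize(df: DataFrame,
                            inputBytes: Long,                  // rough size of the raw input
                            compressionRatio: Double = 0.6,    // txt -> snappy parquet, measured
                            blockSize: Long = 256L * 1024 * 1024): DataFrame = {
  val partitions = math.max(1, math.ceil(inputBytes * compressionRatio / blockSize).toInt)
  df.repartition(partitions)
}

// usage: repartitionForBlockSize(df, rawInputBytes).write.parquet(output)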

Another way was to count and come up with an empirical formula.

Cheers,
Hatim


On Jul 12, 2016, at 9:07 PM, Takeshi Yamamuro <linguin@gmail.com> wrote:

Hi,

There is no simple way to access the size on the driver side.
Since partitions of primitive-typed data (e.g., int) are compressed by
`DataFrame#cache`, the actual size is possibly a little bit different from
the size of the partitions as they are processed.

// maropu

On Wed, Jul 13, 2016 at 4:53 AM, Pedro Rodriguez <ski.rodrig...@gmail.com> 
wrote:
Hi,

Are there any tools for partitioning RDD/DataFrames by size at runtime? The 
idea would be to specify that I would like for each partition to be roughly X 
number of megabytes then write that through to S3. I haven't found anything off 
the shelf, and looking through stack overflow posts doesn't seem to yield 
anything concrete.

Is there a way to programmatically get the size or a size estimate for an 
RDD/DataFrame at runtime (eg size of one record would be sufficient)? I gave 
SizeEstimator a try, but it seems like the results varied quite a bit (tried on 
whole RDD and a sample). It would also be useful to get programmatic access to 
the size of the RDD in memory if it is cached.

Thanks,
--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn: 
https://www.linkedin.com/in/pedrorodriguezscience




--
---
Takeshi Yamamuro


Tools for Balancing Partitions by Size

2016-07-12 Thread Pedro Rodriguez
Hi,

Are there any tools for partitioning RDD/DataFrames by size at runtime? The
idea would be to specify that I would like for each partition to be roughly
X number of megabytes then write that through to S3. I haven't found
anything off the shelf, and looking through stack overflow posts doesn't
seem to yield anything concrete.

Is there a way to programmatically get the size or a size estimate for an
RDD/DataFrame at runtime (eg size of one record would be sufficient)? I
gave SizeEstimator a try, but it seems like the results varied quite a bit
(tried on whole RDD and a sample). It would also be useful to get
programmatic access to the size of the RDD in memory if it is cached.

Thanks,
-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Spark SQL: Merge Arrays/Sets

2016-07-11 Thread Pedro Rodriguez
Is it possible with Spark SQL to merge columns whose types are Arrays or
Sets?

My use case would be something like this:

DF types
id: String
words: Array[String]

I would want to do something like

df.groupBy('id).agg(merge_arrays('words)) -> list of all words
df.groupBy('id).agg(merge_sets('words)) -> list of distinct words

Thanks,
-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: question about UDAF

2016-07-11 Thread Pedro Rodriguez
I am not sure I understand your code entirely, but I worked with UDAFs
Friday and over the weekend (
https://gist.github.com/EntilZha/3951769a011389fef25e930258c20a2a).

I think what is going on is that your "update" function is not defined
correctly. Update should take a possibly initialized or in-progress buffer
and integrate new results into it. Right now, you ignore the input row:
update just copies the buffer's own value back into itself, and since
initialize set that value to "", it stays "".

Merge is responsible for taking two buffers and merging them together. In
this case both buffers are "" since initialize makes them "" and update keeps
them "", so the result is just "". I am not sure it matters, but you probably
also want to read the buffer with buffer.getString(0) rather than
getAs[Int](0), since the buffer field is a String.
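
A sketch of what a corrected version could look like, assuming the goal is to
concatenate the incoming ids into one comma-separated string (the separator
and the null check are assumptions):

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class JsonArray extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("id", IntegerType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("ids", StringType) :: Nil)
  def dataType: DataType = StringType
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = ""
  }

  // fold the incoming id into the buffer instead of copying the buffer onto itself
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      val current = buffer.getString(0)
      val id = input.getInt(0).toString
      buffer(0) = if (current.isEmpty) id else current + "," + id
    }
  }

  // combine two partial buffers, skipping whichever side is still empty
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val combined = Seq(buffer1.getString(0), buffer2.getString(0)).filter(_.nonEmpty)
    buffer1(0) = combined.mkString(",")
  }

  def evaluate(buffer: Row): Any = buffer.getString(0)
}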

Pedro

On Mon, Jul 11, 2016 at 3:04 AM, <luohui20...@sina.com> wrote:

> Hello guys:
>  I have a DF and a UDAF. This DF has 2 columns, lp_location_id and id,
> both of Int type. I want to group by id and aggregate all values of id
> into 1 string. So I used a UDAF to do this transformation: multiple Int values
> to 1 String. However, my UDAF returns empty values, as the attachment shows.
>  Here is my code for my main class:
> val hc = new org.apache.spark.sql.hive.HiveContext(sc)
> val hiveTable = hc.sql("select lp_location_id,id from
> house_id_pv_location_top50")
>
> val jsonArray = new JsonArray
> val result =
> hiveTable.groupBy("lp_location_id").agg(jsonArray(col("id")).as("jsonArray")).collect.foreach(println)
>
> --
>  Here is my code of my UDAF:
>
> class JsonArray extends UserDefinedAggregateFunction {
>   def inputSchema: org.apache.spark.sql.types.StructType =
> StructType(StructField("id", IntegerType) :: Nil)
>
>   def bufferSchema: StructType = StructType(
> StructField("id", StringType) :: Nil)
>
>   def dataType: DataType = StringType
>
>   def deterministic: Boolean = true
>
>   def initialize(buffer: MutableAggregationBuffer): Unit = {
> buffer(0) = ""
>   }
>
>   def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
> buffer(0) = buffer.getAs[Int](0)
>   }
>
>   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
> val s1 = buffer1.getAs[Int](0).toString()
> val s2 = buffer2.getAs[Int](0).toString()
> buffer1(0) = s1.concat(s2)
>   }
>
>   def evaluate(buffer: Row): Any = {
> buffer(0)
>   }
> }
>
>
> I don't quite understand why I get an empty result from my UDAF. I guess there
> may be 2 reasons:
> 1. incorrect initialization with "" in the initialize method
> 2. the buffer didn't get written successfully.
>
> Can anyone share an idea about this? Thank you.
>
>
>
>
> 
>
> ThanksBest regards!
> San.Luo
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>



-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: DataFrame Min By Column

2016-07-09 Thread Pedro Rodriguez
Thanks Michael,

That seems like the analog to sorting tuples. I am curious, is there a 
significant performance penalty to the UDAF versus that? It's certainly nicer
and more compact code at least.

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 9, 2016 at 2:19:11 PM, Michael Armbrust (mich...@databricks.com) wrote:

You can do what's called an argmax/argmin, where you take the min/max of a
couple of columns that have been grouped together as a struct.  We sort in 
column order, so you can put the timestamp first.

Here is an example.
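
A minimal sketch of that argmin pattern, with assumed column names user, time,
and site (and the usual implicits in scope for $):

import org.apache.spark.sql.functions._

val firstEvents = df
  .groupBy($"user")
  .agg(min(struct($"time", $"site")).as("first"))   // struct ordering compares time first
  .select($"user", $"first.time".as("time"), $"first.site".as("site"))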

On Sat, Jul 9, 2016 at 6:10 AM, Pedro Rodriguez <ski.rodrig...@gmail.com> wrote:
I implemented a more generic version which I posted here: 
https://gist.github.com/EntilZha/3951769a011389fef25e930258c20a2a

I think I could generalize this by pattern matching on DataType to use 
different getLong/getDouble/etc functions ( not trying to use getAs[] because 
getting T from Array[T] is hard it seems).

Is there a way to go further and make the arguments unnecessary or inferable at 
runtime, particularly for the valueType since it doesn’t matter what it is? 
DataType is abstract so I can’t instantiate it, is there a way to define the 
method so that it pulls from the user input at runtime?

Thanks,
—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 9, 2016 at 1:33:18 AM, Pedro Rodriguez (ski.rodrig...@gmail.com) wrote:

Hi Xinh,

A co-worker also found that solution but I thought it was possibly
overkill/brittle, so I looked into UDAFs (user defined aggregate functions). I
don’t have code, but Databricks has a post that has an example 
https://databricks.com/blog/2015/09/16/apache-spark-1-5-dataframe-api-highlights.html.
From that, I was able to write a MinLongByTimestamp function, but was having a
hard time writing a generic aggregate of any column by an orderable column.

Anyone know how you might go about using generics in a UDAF, or something that
would mimic union types to express that orderable Spark SQL types are allowed?

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 8, 2016 at 6:06:32 PM, Xinh Huynh (xinh.hu...@gmail.com) wrote:

Hi Pedro,

I could not think of a way using an aggregate. It's possible with a window 
function, partitioned on user and ordered by time:

// Assuming "df" holds your dataframe ...

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val wSpec = Window.partitionBy("user").orderBy("time")
df.select($"user", $"time", rank().over(wSpec).as("rank"))
  .where($"rank" === 1)

Xinh

On Fri, Jul 8, 2016 at 12:57 PM, Pedro Rodriguez <ski.rodrig...@gmail.com> 
wrote:
Is there a way on a GroupedData (from groupBy in DataFrame) to have an
aggregate that returns column A based on a min of column B? For example, I have 
a list of sites visited by a given user and I would like to find the event with 
the minimum time (first event)

Thanks,
--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn: 
https://www.linkedin.com/in/pedrorodriguezscience





Re: problem making Zeppelin 0.6 work with Spark 1.6.1, throwing jackson.databind.JsonMappingException exception

2016-07-09 Thread Pedro Rodriguez
It would be helpful if you included the relevant configuration files from each
(Zeppelin and Spark), or noted if you are using the defaults, particularly any
changes to class paths.

I worked through upgrading Zeppelin to 0.6.0 at work and at home without any
issue, so it is hard to say more without having more details.

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 9, 2016 at 8:25:30 AM, Mich Talebzadeh (mich.talebza...@gmail.com) 
wrote:

Hi,

I just installed the latest Zeppelin 0.6 as follows:

Source: zeppelin-0.6.0-bin-all

With Spark 1.6.1


Now I am getting this issue with Jackson. I did some searching that suggested
this is caused by the classpath providing a different version of Jackson than
the one Spark is expecting. However, no luck yet. With Spark 1.5.2 and the
previous version of Zeppelin, namely 0.5.6-incubating, it used to work
without problems.

Any ideas will be appreciated


com.fasterxml.jackson.databind.JsonMappingException: Could not find creator property with name 'id' (in class org.apache.spark.rdd.RDDOperationScope)
 at [Source: {"id":"14","name":"ExecutedCommand"}; line: 1, column: 1]
  at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)
  at com.fasterxml.jackson.databind.DeserializationContext.mappingException(DeserializationContext.java:843)
  at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:533)
  at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:220)
  at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:143)
  at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer2(DeserializerCache.java:409)
  at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer(DeserializerCache.java:358)
  at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:265)
  at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:245)
  at com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:143)
  at com.fasterxml.jackson.databind.DeserializationContext.findRootValueDeserializer(DeserializationContext.java:439)
  at com.fasterxml.jackson.databind.ObjectMapper._findRootDeserializer(ObjectMapper.java:3666)
  at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3558)
  at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2578)
  at org.apache.spark.rdd.RDDOperationScope$.fromJson(RDDOperationScope.scala:85)
  at org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:136)
  at org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:136)
  at scala.Option.map(Option.scala:145)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:136)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
  at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
  at org.apache.spark.SparkContext.parallelize(SparkContext.scala:728)
  at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
  at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:145)
  at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:130)
  at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
  at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
  at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
  at $iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
  at $iwC$$iwC$$iwC.<init>(<console>:47)
  at $iwC$$iwC.<init>(<console>:49)
  at $iwC.<init>(<console>:51)
  at <init>(<console>:53)
  at .<init>(<console>:57)

Dr Mich Talebzadeh
 
LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 
http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.
 

Re: DataFrame Min By Column

2016-07-09 Thread Pedro Rodriguez
I implemented a more generic version which I posted here: 
https://gist.github.com/EntilZha/3951769a011389fef25e930258c20a2a

I think I could generalize this by pattern matching on DataType to use 
different getLong/getDouble/etc functions ( not trying to use getAs[] because 
getting T from Array[T] is hard it seems).

Is there a way to go further and make the arguments unnecessary or inferable at 
runtime, particularly for the valueType since it doesn’t matter what it is? 
DataType is abstract so I can’t instantiate it, is there a way to define the 
method so that it pulls from the user input at runtime?

Thanks,
—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 9, 2016 at 1:33:18 AM, Pedro Rodriguez (ski.rodrig...@gmail.com) wrote:

Hi Xinh,

A co-worker also found that solution but I thought it was possibly
overkill/brittle, so I looked into UDAFs (user defined aggregate functions). I
don’t have code, but Databricks has a post that has an example 
https://databricks.com/blog/2015/09/16/apache-spark-1-5-dataframe-api-highlights.html.
From that, I was able to write a MinLongByTimestamp function, but was having a
hard time writing a generic aggregate of any column by an orderable column.

Anyone know how you might go about using generics in a UDAF, or something that
would mimic union types to express that orderable Spark SQL types are allowed?

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 8, 2016 at 6:06:32 PM, Xinh Huynh (xinh.hu...@gmail.com) wrote:

Hi Pedro,

I could not think of a way using an aggregate. It's possible with a window 
function, partitioned on user and ordered by time:

// Assuming "df" holds your dataframe ...

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val wSpec = Window.partitionBy("user").orderBy("time")
df.select($"user", $"time", rank().over(wSpec).as("rank"))
  .where($"rank" === 1)

Xinh

On Fri, Jul 8, 2016 at 12:57 PM, Pedro Rodriguez <ski.rodrig...@gmail.com> 
wrote:
Is there a way on a GroupedData (from groupBy in DataFrame) to have an
aggregate that returns column A based on a min of column B? For example, I have 
a list of sites visited by a given user and I would like to find the event with 
the minimum time (first event)

Thanks,
--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn: 
https://www.linkedin.com/in/pedrorodriguezscience




Re: DataFrame Min By Column

2016-07-09 Thread Pedro Rodriguez
Hi Xinh,

A co-worker also found that solution but I thought it was possibly
overkill/brittle, so I looked into UDAFs (user defined aggregate functions). I
don’t have code, but Databricks has a post that has an example 
https://databricks.com/blog/2015/09/16/apache-spark-1-5-dataframe-api-highlights.html.
From that, I was able to write a MinLongByTimestamp function, but was having a
hard time writing a generic aggregate of any column by an orderable column.

Anyone know how you might go about using generics in a UDAF, or something that
would mimic union types to express that orderable Spark SQL types are allowed?

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 8, 2016 at 6:06:32 PM, Xinh Huynh (xinh.hu...@gmail.com) wrote:

Hi Pedro,

I could not think of a way using an aggregate. It's possible with a window 
function, partitioned on user and ordered by time:

// Assuming "df" holds your dataframe ...

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val wSpec = Window.partitionBy("user").orderBy("time")
df.select($"user", $"time", rank().over(wSpec).as("rank"))
  .where($"rank" === 1)

Xinh

On Fri, Jul 8, 2016 at 12:57 PM, Pedro Rodriguez <ski.rodrig...@gmail.com> 
wrote:
Is there a way on a GroupedData (from groupBy in DataFrame) to have an
aggregate that returns column A based on a min of column B? For example, I have 
a list of sites visited by a given user and I would like to find the event with 
the minimum time (first event)

Thanks,
--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn: 
https://www.linkedin.com/in/pedrorodriguezscience




DataFrame Min By Column

2016-07-08 Thread Pedro Rodriguez
Is there a way on a GroupedData (from groupBy in DataFrame) to have an
aggregate that returns column A based on a min of column B? For example, I
have a list of sites visited by a given user and I would like to find the
event with the minimum time (first event)

Thanks,
-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Custom RDD: Report Size of Partition in Bytes to Spark

2016-07-04 Thread Pedro Rodriguez
Just realized I had been replying back to only Takeshi.

Thanks for the tip as it got me on the right track. I am running into an issue
with private[spark] methods though. It looks like the input metrics start out as
None and are not initialized (verified by throwing a new Exception on the pattern
match cases for when it is None and when it is not). It looks like NewHadoopRDD
calls getInputMetricsForReadMethod, which sets _inputMetrics if it is None, but
unfortunately it is private[spark]. Is there a way for external RDDs to access
this method or somehow initialize _inputMetrics in 1.6.x (it looks like 2.0 makes
more of this API public)?

Using reflection I was able to implement it mimicking the NewHadoopRDD code,
but if possible I would like to avoid using reflection. Below is the source code
for the method that works.

RDD code: 
https://github.com/EntilZha/spark-s3/blob/9e632f2a71fba2858df748ed43f0dbb5dae52a83/src/main/scala/io/entilzha/spark/s3/S3RDD.scala#L100-L105
Reflection code: 
https://github.com/EntilZha/spark-s3/blob/9e632f2a71fba2858df748ed43f0dbb5dae52a83/src/main/scala/io/entilzha/spark/s3/PrivateMethodUtil.scala

Thanks,
—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 3, 2016 at 10:31:30 PM, Takeshi Yamamuro (linguin@gmail.com) wrote:

How about using `SparkListener`?
You can collect IO statistics thru TaskMetrics#inputMetrics by yourself.

// maropu
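
A rough sketch of the listener approach described above, for 1.6.x where
inputMetrics is an Option (it aggregates the bytes on the driver, but does not,
by itself, make the numbers show up on the UI):

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class InputBytesListener extends SparkListener {
  @volatile var totalBytesRead = 0L

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics can be null when a task fails
    for {
      metrics <- Option(taskEnd.taskMetrics)
      input   <- metrics.inputMetrics
    } totalBytesRead += input.bytesRead
  }
}

// usage: val listener = new InputBytesListener(); sc.addSparkListener(listener)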

On Mon, Jul 4, 2016 at 11:46 AM, Pedro Rodriguez <ski.rodrig...@gmail.com> 
wrote:
Hi All,

I noticed on some Spark jobs it shows you input/output read size. I am 
implementing a custom RDD which reads files and would like to report these 
metrics to Spark since they are available to me.

I looked through the RDD source code and a couple different implementations and 
the best I could find were some Hadoop metrics. Is there a way to simply report 
the number of bytes a partition read so Spark can put it on the UI?

Thanks,
—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn



--
---
Takeshi Yamamuro


Custom RDD: Report Size of Partition in Bytes to Spark

2016-07-03 Thread Pedro Rodriguez
Hi All,

I noticed on some Spark jobs it shows you input/output read size. I am 
implementing a custom RDD which reads files and would like to report these 
metrics to Spark since they are available to me.

I looked through the RDD source code and a couple different implementations and 
the best I could find were some Hadoop metrics. Is there a way to simply report 
the number of bytes a partition read so Spark can put it on the UI?

Thanks,
—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

Re: Call Scala API from PySpark

2016-06-30 Thread Pedro Rodriguez
That was indeed the case, using UTF8Deserializer makes everything work
correctly.

Thanks for the tips!
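
For reference, a minimal sketch of the Scala side of this pattern (the body of
customTextFile below is a placeholder, not the actual library logic); exposing a
JavaRDD[String] is what makes the handle easy to wrap from Python with
RDD(jrdd, sc, UTF8Deserializer()):

import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD

class CustomContext(sc: SparkContext) {
  // stand-in for the real logic (e.g. listing S3 and parallelizing the contents)
  def customTextFile(path: String): RDD[String] =
    sc.parallelize(Seq(s"placeholder line from $path"))

  // Py4J-friendly entry point: return a JavaRDD[String] handle to Python
  def customTextFileJava(path: String): JavaRDD[String] =
    customTextFile(path).toJavaRDD()
}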

On Thu, Jun 30, 2016 at 3:32 PM, Pedro Rodriguez <ski.rodrig...@gmail.com>
wrote:

> Quick update, I was able to get most of the plumbing to work thanks to the
> code Holden posted and browsing more source code.
>
> I am running into this error which makes me think that maybe I shouldn't
> be leaving the default python RDD serializer/pickler in place and do
> something else
> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/rdd.py#L182:
> _pickle.UnpicklingError: A load persistent id instruction was encountered,
> but no persistent_load function was specified.
>
>
> On Thu, Jun 30, 2016 at 2:13 PM, Pedro Rodriguez <ski.rodrig...@gmail.com>
> wrote:
>
>> Thanks Jeff and Holden,
>>
>> A little more context here probably helps. I am working on implementing
>> the idea from this article to make reads from S3 faster:
>> http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219
>> (although my name is Pedro, I am not the author of the article). The reason
>> for wrapping SparkContext is so that the code change is from sc.textFile to
>> sc.s3TextFile in addition to configuring AWS keys correctly (seeing if we
>> can open source our library, but depends on company). Overall, it's a very
>> light wrapper and perhaps calling it a context is not quite the right name
>> because of that.
>>
>> At the end of the day I make a sc.parallelize call and return an
>> RDD[String] as described in that blog post. I found a post from Py4J
>> mailing list that reminded me that the JVM gateway needs the jars in
>> spark.driver/executor.extraClassPath in addition to the spark.jars option.
>> With that, I can see the classes now. Looks like I need to do as you
>> suggest and wrap it using Java in order to go the last mile to calling the
>> method/constructor. I don't know yet how to get the RDD back to pyspark
>> though so any pointers on that would be great.
>>
>> Thanks for the tip on code Holden, I will take a look to see if that can
>> give me some insight on how to write the Python code part.
>>
>> Thanks!
>> Pedro
>>
>> On Thu, Jun 30, 2016 at 12:23 PM, Holden Karau <hol...@pigscanfly.ca>
>> wrote:
>>
>>> So I'm a little biased - I think the best bridge between the two is using
>>> DataFrames. I've got some examples in my talk and on the high performance
>>> spark GitHub
>>> https://github.com/high-performance-spark/high-performance-spark-examples/blob/master/high_performance_pyspark/simple_perf_test.py
>>> calls some custom scala code.
>>>
>>> Using a custom context is a bit tricky though because of how the
>>> launching is done; as Jeff Zhang points out, you would need to wrap it in a
>>> JavaSparkContext and then you could override the _initialize_context
>>> function in context.py
>>>
>>> On Thu, Jun 30, 2016 at 11:06 AM, Jeff Zhang <zjf...@gmail.com> wrote:
>>>
>>>> Hi Pedro,
>>>>
>>>> Your use case is interesting.  I think launching java gateway is the
>>>> same as native SparkContext, the only difference is on creating your custom
>>>> SparkContext instead of native SparkContext. You might also need to wrap it
>>>> using java.
>>>>
>>>>
>>>> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py#L172
>>>>
>>>>
>>>>
>>>> On Thu, Jun 30, 2016 at 9:53 AM, Pedro Rodriguez <
>>>> ski.rodrig...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have written a Scala package which essentially wraps the
>>>>> SparkContext around a custom class that adds some functionality specific 
>>>>> to
>>>>> our internal use case. I am trying to figure out the best way to call this
>>>>> from PySpark.
>>>>>
>>>>> I would like to do this similarly to how Spark itself calls the JVM
>>>>> SparkContext as in:
>>>>> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py
>>>>>
>>>>> My goal would be something like this:
>>>>>
>>>>> Scala Code (this is done):
>>>>> >>> import com.company.mylibrary.CustomContext
>>>>> >>> val myContext = CustomContext(sc)
>>>>> >>> val rdd: RDD[String] = myContext.customTextFile("path")
>>>>>

Re: Call Scala API from PySpark

2016-06-30 Thread Pedro Rodriguez
Quick update, I was able to get most of the plumbing to work thanks to the
code Holden posted and browsing more source code.

I am running into this error which makes me think that maybe I shouldn't be
leaving the default python RDD serializer/pickler in place and do something
else https://github.com/apache/spark/blob/v1.6.2/python/pyspark/rdd.py#L182:
_pickle.UnpicklingError: A load persistent id instruction was encountered,
but no persistent_load function was specified.


On Thu, Jun 30, 2016 at 2:13 PM, Pedro Rodriguez <ski.rodrig...@gmail.com>
wrote:

> Thanks Jeff and Holden,
>
> A little more context here probably helps. I am working on implementing
> the idea from this article to make reads from S3 faster:
> http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219
> (although my name is Pedro, I am not the author of the article). The reason
> for wrapping SparkContext is so that the code change is from sc.textFile to
> sc.s3TextFile in addition to configuring AWS keys correctly (seeing if we
> can open source our library, but depends on company). Overall, it's a very
> light wrapper and perhaps calling it a context is not quite the right name
> because of that.
>
> At the end of the day I make a sc.parallelize call and return an
> RDD[String] as described in that blog post. I found a post from Py4J
> mailing list that reminded me that the JVM gateway needs the jars in
> spark.driver/executor.extraClassPath in addition to the spark.jars option.
> With that, I can see the classes now. Looks like I need to do as you
> suggest and wrap it using Java in order to go the last mile to calling the
> method/constructor. I don't know yet how to get the RDD back to pyspark
> though so any pointers on that would be great.
>
> Thanks for the tip on code Holden, I will take a look to see if that can
> give me some insight on how to write the Python code part.
>
> Thanks!
> Pedro
>
> On Thu, Jun 30, 2016 at 12:23 PM, Holden Karau <hol...@pigscanfly.ca>
> wrote:
>
>> So I'm a little biased - I think the best bridge between the two is using
>> DataFrames. I've got some examples in my talk and on the high performance
>> spark GitHub
>> https://github.com/high-performance-spark/high-performance-spark-examples/blob/master/high_performance_pyspark/simple_perf_test.py
>> calls some custom scala code.
>>
>> Using a custom context is a bit tricky though because of how the
>> launching is done; as Jeff Zhang points out, you would need to wrap it in a
>> JavaSparkContext and then you could override the _initialize_context
>> function in context.py
>>
>> On Thu, Jun 30, 2016 at 11:06 AM, Jeff Zhang <zjf...@gmail.com> wrote:
>>
>>> Hi Pedro,
>>>
>>> Your use case is interesting.  I think launching java gateway is the
>>> same as native SparkContext, the only difference is on creating your custom
>>> SparkContext instead of native SparkContext. You might also need to wrap it
>>> using java.
>>>
>>>
>>> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py#L172
>>>
>>>
>>>
>>> On Thu, Jun 30, 2016 at 9:53 AM, Pedro Rodriguez <
>>> ski.rodrig...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I have written a Scala package which essentially wraps the SparkContext
>>>> around a custom class that adds some functionality specific to our internal
>>>> use case. I am trying to figure out the best way to call this from PySpark.
>>>>
>>>> I would like to do this similarly to how Spark itself calls the JVM
>>>> SparkContext as in:
>>>> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py
>>>>
>>>> My goal would be something like this:
>>>>
>>>> Scala Code (this is done):
>>>> >>> import com.company.mylibrary.CustomContext
>>>> >>> val myContext = CustomContext(sc)
>>>> >>> val rdd: RDD[String] = myContext.customTextFile("path")
>>>>
>>>> Python Code (I want to be able to do this):
>>>> >>> from company.mylibrary import CustomContext
>>>> >>> myContext = CustomContext(sc)
>>>> >>> rdd = myContext.customTextFile("path")
>>>>
>>>> At the end of each code, I should be working with an ordinary
>>>> RDD[String].
>>>>
>>>> I am trying to access my Scala class through sc._jvm as below, but not
>>>> having any luck so far.
>>>>
>>>> My attempts

Re: Call Scala API from PySpark

2016-06-30 Thread Pedro Rodriguez
Thanks Jeff and Holden,

A little more context here probably helps. I am working on implementing the
idea from this article to make reads from S3 faster:
http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219
(although my name is Pedro, I am not the author of the article). The reason
for wrapping SparkContext is so that the code change is from sc.textFile to
sc.s3TextFile in addition to configuring AWS keys correctly (seeing if we
can open source our library, but depends on company). Overall, it's a very
light wrapper and perhaps calling it a context is not quite the right name
because of that.

At the end of the day I make a sc.parallelize call and return an
RDD[String] as described in that blog post. I found a post from Py4J
mailing list that reminded me that the JVM gateway needs the jars in
spark.driver/executor.extraClassPath in addition to the spark.jars option.
With that, I can see the classes now. Looks like I need to do as you
suggest and wrap it using Java in order to go the last mile to calling the
method/constructor. I don't know yet how to get the RDD back to pyspark
though so any pointers on that would be great.

Thanks for the tip on code Holden, I will take a look to see if that can
give me some insight on how to write the Python code part.

Thanks!
Pedro

On Thu, Jun 30, 2016 at 12:23 PM, Holden Karau <hol...@pigscanfly.ca> wrote:

> So I'm a little biased - I think the best bridge between the two is using
> DataFrames. I've got some examples in my talk and on the high performance
> spark GitHub
> https://github.com/high-performance-spark/high-performance-spark-examples/blob/master/high_performance_pyspark/simple_perf_test.py
> calls some custom scala code.
>
> Using a custom context is a bit tricky though because of how the launching
> is done; as Jeff Zhang points out, you would need to wrap it in a
> JavaSparkContext and then you could override the _initialize_context
> function in context.py
>
> On Thu, Jun 30, 2016 at 11:06 AM, Jeff Zhang <zjf...@gmail.com> wrote:
>
>> Hi Pedro,
>>
>> Your use case is interesting.  I think launching java gateway is the same
>> as native SparkContext, the only difference is on creating your custom
>> SparkContext instead of native SparkContext. You might also need to wrap it
>> using java.
>>
>> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py#L172
>>
>>
>>
>> On Thu, Jun 30, 2016 at 9:53 AM, Pedro Rodriguez <ski.rodrig...@gmail.com
>> > wrote:
>>
>>> Hi All,
>>>
>>> I have written a Scala package which essentially wraps the SparkContext
>>> around a custom class that adds some functionality specific to our internal
>>> use case. I am trying to figure out the best way to call this from PySpark.
>>>
>>> I would like to do this similarly to how Spark itself calls the JVM
>>> SparkContext as in:
>>> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py
>>>
>>> My goal would be something like this:
>>>
>>> Scala Code (this is done):
>>> >>> import com.company.mylibrary.CustomContext
>>> >>> val myContext = CustomContext(sc)
>>> >>> val rdd: RDD[String] = myContext.customTextFile("path")
>>>
>>> Python Code (I want to be able to do this):
>>> >>> from company.mylibrary import CustomContext
>>> >>> myContext = CustomContext(sc)
>>> >>> rdd = myContext.customTextFile("path")
>>>
>>> At the end of each code, I should be working with an ordinary
>>> RDD[String].
>>>
>>> I am trying to access my Scala class through sc._jvm as below, but not
>>> having any luck so far.
>>>
>>> My attempts:
>>> >>> a = sc._jvm.com.company.mylibrary.CustomContext
>>> >>> dir(a)
>>> ['']
>>>
>>> Example of what I want:
>>> >>> a = sc._jvm.PythonRDD
>>> >>> dir(a)
>>> ['anonfun$6', 'anonfun$8', 'collectAndServe',
>>> 'doubleRDDToDoubleRDDFunctions', 'getWorkerBroadcasts', 'hadoopFile',
>>> 'hadoopRDD', 'newAPIHadoopFile', 'newAPIHadoopRDD',
>>> 'numericRDDToDoubleRDDFunctions', 'rddToAsyncRDDActions',
>>> 'rddToOrderedRDDFunctions', 'rddToPairRDDFunctions',
>>> 'rddToPairRDDFunctions$default$4', 'rddToSequenceFileRDDFunctions',
>>> 'readBroadcastFromFile', 'readRDDFromFile', 'runJob',
>>> 'saveAsHadoopDataset', 'saveAsHadoopFile', 'saveAsNewAPIHadoopFile',
>>> 'saveAsSequenceFile', 'sequenceFile', 'serveIterator', 'valueOfPair',
>>> 'writeIteratorToStream', 'writeUTF']
>>

Call Scala API from PySpark

2016-06-30 Thread Pedro Rodriguez
Hi All,

I have written a Scala package which essentially wraps the SparkContext
around a custom class that adds some functionality specific to our internal
use case. I am trying to figure out the best way to call this from PySpark.

I would like to do this similarly to how Spark itself calls the JVM
SparkContext as in:
https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py

My goal would be something like this:

Scala Code (this is done):
>>> import com.company.mylibrary.CustomContext
>>> val myContext = CustomContext(sc)
>>> val rdd: RDD[String] = myContext.customTextFile("path")

Python Code (I want to be able to do this):
>>> from company.mylibrary import CustomContext
>>> myContext = CustomContext(sc)
>>> rdd = myContext.customTextFile("path")

At the end of each code, I should be working with an ordinary RDD[String].

I am trying to access my Scala class through sc._jvm as below, but not
having any luck so far.

My attempts:
>>> a = sc._jvm.com.company.mylibrary.CustomContext
>>> dir(a)
['']

Example of what I want:
>>> a = sc._jvm.PythonRDD
>>> dir(a)
['anonfun$6', 'anonfun$8', 'collectAndServe',
'doubleRDDToDoubleRDDFunctions', 'getWorkerBroadcasts', 'hadoopFile',
'hadoopRDD', 'newAPIHadoopFile', 'newAPIHadoopRDD',
'numericRDDToDoubleRDDFunctions', 'rddToAsyncRDDActions',
'rddToOrderedRDDFunctions', 'rddToPairRDDFunctions',
'rddToPairRDDFunctions$default$4', 'rddToSequenceFileRDDFunctions',
'readBroadcastFromFile', 'readRDDFromFile', 'runJob',
'saveAsHadoopDataset', 'saveAsHadoopFile', 'saveAsNewAPIHadoopFile',
'saveAsSequenceFile', 'sequenceFile', 'serveIterator', 'valueOfPair',
'writeIteratorToStream', 'writeUTF']

The next thing I would run into is converting the JVM RDD[String] back to a
Python RDD, what is the easiest way to do this?

Overall, is this a good approach to calling the same API in Scala and
Python?

-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Dataset Select Function after Aggregate Error

2016-06-18 Thread Pedro Rodriguez
I am curious if there is a way to call this so that it becomes a compile
error rather than runtime error:

// Note misspelled count and name
ds.groupBy($"name").count.select('nam, $"coun").show

More specifically, what are the best type safety guarantees that Datasets
provide? It seems like with Dataframes there is still the unsafety of
specifying column names by string/symbol and expecting the type to be
correct and exist, but if you do something like this then downstream code
is safer:

// This is Array[(String, Long)] instead of Array[sql.Row]
ds.groupBy($"name").count.select('name.as[String], 'count.as
[Long]).collect()

Does that seem like a correct understanding of Datasets?

On Sat, Jun 18, 2016 at 6:39 AM, Pedro Rodriguez <ski.rodrig...@gmail.com>
wrote:

> Looks like it was my own fault. I had spark 2.0 cloned/built, but had the
> spark shell in my path so somehow 1.6.1 was being used instead of 2.0.
> Thanks
>
> On Sat, Jun 18, 2016 at 1:16 AM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> which version you use?
>> I passed in 2.0-preview as follows;
>> ---
>>
>> Spark context available as 'sc' (master = local[*], app id =
>> local-1466234043659).
>>
>> Spark session available as 'spark'.
>>
>> Welcome to
>>
>>     __
>>
>>  / __/__  ___ _/ /__
>>
>> _\ \/ _ \/ _ `/ __/  '_/
>>
>>/___/ .__/\_,_/_/ /_/\_\   version 2.0.0-preview
>>
>>   /_/
>>
>>
>>
>> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
>> 1.8.0_31)
>>
>> Type in expressions to have them evaluated.
>>
>> Type :help for more information.
>>
>>
>> scala> val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS
>>
>> hive.metastore.schema.verification is not enabled so recording the schema
>> version 1.2.0
>>
>> ds: org.apache.spark.sql.Dataset[(Int, Int)] = [_1: int, _2: int]
>>
>> scala> ds.groupBy($"_1").count.select($"_1", $"count").show
>>
>> +---+-+
>>
>> | _1|count|
>>
>> +---+-+
>>
>> |  1|1|
>>
>> |  2|1|
>>
>> +---+-+
>>
>>
>>
>> On Sat, Jun 18, 2016 at 3:09 PM, Pedro Rodriguez <ski.rodrig...@gmail.com
>> > wrote:
>>
>>> I went ahead and downloaded/compiled Spark 2.0 to try your code snippet
>>> Takeshi. It unfortunately doesn't compile.
>>>
>>> scala> val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS
>>> ds: org.apache.spark.sql.Dataset[(Int, Int)] = [_1: int, _2: int]
>>>
>>> scala> ds.groupBy($"_1").count.select($"_1", $"count").show
>>> :28: error: type mismatch;
>>>  found   : org.apache.spark.sql.ColumnName
>>>  required: org.apache.spark.sql.TypedColumn[(org.apache.spark.sql.Row,
>>> Long),?]
>>>   ds.groupBy($"_1").count.select($"_1", $"count").show
>>>  ^
>>>
>>> I also gave a try to Xinh's suggestion using the code snippet below
>>> (partially from spark docs)
>>> scala> val ds = Seq(Person("Andy", 32), Person("Andy", 2),
>>> Person("Pedro", 24), Person("Bob", 42)).toDS()
>>> scala> ds.groupBy(_.name).count.select($"name".as[String]).show
>>> org.apache.spark.sql.AnalysisException: cannot resolve 'name' given
>>> input columns: [];
>>> scala> ds.groupBy(_.name).count.select($"_1".as[String]).show
>>> org.apache.spark.sql.AnalysisException: cannot resolve 'name' given
>>> input columns: [];
>>> scala> ds.groupBy($"name").count.select($"_1".as[String]).show
>>> org.apache.spark.sql.AnalysisException: cannot resolve '_1' given input
>>> columns: [];
>>>
>>> Looks like there are empty columns for some reason, the code below works
>>> fine for the simple aggregate
>>> scala> ds.groupBy(_.name).count.show
>>>
>>> Would be great to see an idiomatic example of using aggregates like
>>> these mixed with spark.sql.functions.
>>>
>>> Pedro
>>>
>>> On Fri, Jun 17, 2016 at 9:59 PM, Pedro Rodriguez <
>>> ski.rodrig...@gmail.com> wrote:
>>>
>>>> Thanks Xinh and Takeshi,
>>>>
>>>> I am trying to avoid map since my impression is that this uses a Scala
>>>> closure so is not optimized as well as column-wise operations are.

Re: Dataset Select Function after Aggregate Error

2016-06-18 Thread Pedro Rodriguez
Looks like it was my own fault. I had spark 2.0 cloned/built, but had the
spark shell in my path so somehow 1.6.1 was being used instead of 2.0.
Thanks

On Sat, Jun 18, 2016 at 1:16 AM, Takeshi Yamamuro <linguin@gmail.com>
wrote:

> which version you use?
> I passed in 2.0-preview as follows;
> ---
>
> Spark context available as 'sc' (master = local[*], app id =
> local-1466234043659).
>
> Spark session available as 'spark'.
>
> Welcome to
>
>     __
>
>  / __/__  ___ _/ /__
>
> _\ \/ _ \/ _ `/ __/  '_/
>
>/___/ .__/\_,_/_/ /_/\_\   version 2.0.0-preview
>
>   /_/
>
>
>
> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
> 1.8.0_31)
>
> Type in expressions to have them evaluated.
>
> Type :help for more information.
>
>
> scala> val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS
>
> hive.metastore.schema.verification is not enabled so recording the schema
> version 1.2.0
>
> ds: org.apache.spark.sql.Dataset[(Int, Int)] = [_1: int, _2: int]
>
> scala> ds.groupBy($"_1").count.select($"_1", $"count").show
>
> +---+-+
>
> | _1|count|
>
> +---+-+
>
> |  1|1|
>
> |  2|1|
>
> +---+-+
>
>
>
> On Sat, Jun 18, 2016 at 3:09 PM, Pedro Rodriguez <ski.rodrig...@gmail.com>
> wrote:
>
>> I went ahead and downloaded/compiled Spark 2.0 to try your code snippet
>> Takeshi. It unfortunately doesn't compile.
>>
>> scala> val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS
>> ds: org.apache.spark.sql.Dataset[(Int, Int)] = [_1: int, _2: int]
>>
>> scala> ds.groupBy($"_1").count.select($"_1", $"count").show
>> :28: error: type mismatch;
>>  found   : org.apache.spark.sql.ColumnName
>>  required: org.apache.spark.sql.TypedColumn[(org.apache.spark.sql.Row,
>> Long),?]
>>   ds.groupBy($"_1").count.select($"_1", $"count").show
>>  ^
>>
>> I also gave a try to Xinh's suggestion using the code snippet below
>> (partially from spark docs)
>> scala> val ds = Seq(Person("Andy", 32), Person("Andy", 2),
>> Person("Pedro", 24), Person("Bob", 42)).toDS()
>> scala> ds.groupBy(_.name).count.select($"name".as[String]).show
>> org.apache.spark.sql.AnalysisException: cannot resolve 'name' given input
>> columns: [];
>> scala> ds.groupBy(_.name).count.select($"_1".as[String]).show
>> org.apache.spark.sql.AnalysisException: cannot resolve 'name' given input
>> columns: [];
>> scala> ds.groupBy($"name").count.select($"_1".as[String]).show
>> org.apache.spark.sql.AnalysisException: cannot resolve '_1' given input
>> columns: [];
>>
>> Looks like there are empty columns for some reason, the code below works
>> fine for the simple aggregate
>> scala> ds.groupBy(_.name).count.show
>>
>> Would be great to see an idiomatic example of using aggregates like these
>> mixed with spark.sql.functions.
>>
>> Pedro
>>
>> On Fri, Jun 17, 2016 at 9:59 PM, Pedro Rodriguez <ski.rodrig...@gmail.com
>> > wrote:
>>
>>> Thanks Xinh and Takeshi,
>>>
>>> I am trying to avoid map since my impression is that this uses a Scala
>>> closure so is not optimized as well as column-wise operations are.
>>>
>>> Looks like the $ notation is the way to go, thanks for the help. Is
>>> there an explanation of how this works? I imagine it is a method/function
>>> with its name defined as $ in Scala?
>>>
>>> Lastly, are there prelim Spark 2.0 docs? If there isn't a good
>>> description/guide of using this syntax I would be willing to contribute
>>> some documentation.
>>>
>>> Pedro
>>>
>>> On Fri, Jun 17, 2016 at 8:53 PM, Takeshi Yamamuro <linguin@gmail.com
>>> > wrote:
>>>
>>>> Hi,
>>>>
>>>> In 2.0, you can say;
>>>> val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS
>>>> ds.groupBy($"_1").count.select($"_1", $"count").show
>>>>
>>>>
>>>> // maropu
>>>>
>>>>
>>>> On Sat, Jun 18, 2016 at 7:53 AM, Xinh Huynh <xinh.hu...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Pedro,
>>>>>
>>>>> In 1.6.1, you can do:

Re: Skew data

2016-06-17 Thread Pedro Rodriguez
I am going to take a guess that this means that your partitions within an
RDD are not balanced (one or more partitions are much larger than the
rest). This would mean a single core would need to do much more work than
the rest leading to poor performance. In general, the way to fix this is to
spread data across partitions evenly. In most cases calling repartition is
enough to solve the problem. If you have a special case you might need to
create your own custom partitioner.
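
In most cases repartition(n) is all that is needed; the sketch below shows the
custom partitioner route for the case where a handful of known hot keys cause
the skew (the key type, the partition count, and the hot-key set are assumptions):

import org.apache.spark.{HashPartitioner, Partitioner}

// give known hot keys their own partitions so they no longer funnel into one task
class SkewAwarePartitioner(partitions: Int, hotKeys: Set[String]) extends Partitioner {
  require(hotKeys.size < partitions, "need at least one partition left for normal keys")
  private val fallback = new HashPartitioner(partitions - hotKeys.size)
  private val hotIndex = hotKeys.zipWithIndex.toMap

  def numPartitions: Int = partitions
  def getPartition(key: Any): Int = hotIndex.get(key.toString) match {
    case Some(i) => fallback.numPartitions + i   // one dedicated partition per hot key
    case None    => fallback.getPartition(key)
  }
}

// usage: pairRdd.partitionBy(new SkewAwarePartitioner(200, Set("someHotKey")))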

Pedro

On Thu, Jun 16, 2016 at 6:55 PM, Selvam Raman <sel...@gmail.com> wrote:

> Hi,
>
> What is skewed data?
>
> I read that if the data is skewed while joining, it takes a long time
> to finish the job (99 percent finishes in seconds while the remaining 1
> percent of tasks takes minutes to hours).
>
> How do I handle skewed data in Spark?
>
> Thanks,
> Selvam R
> +91-97877-87724
>



-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Dataset Select Function after Aggregate Error

2016-06-17 Thread Pedro Rodriguez
Thanks Xinh and Takeshi,

I am trying to avoid map since my impression is that this uses a Scala
closure so is not optimized as well as column-wise operations are.

Looks like the $ notation is the way to go, thanks for the help. Is there
an explanation of how this works? I imagine it is a method/function with
its name defined as $ in Scala?

Lastly, are there prelim Spark 2.0 docs? If there isn't a good
description/guide of using this syntax I would be willing to contribute
some documentation.

Pedro

On Fri, Jun 17, 2016 at 8:53 PM, Takeshi Yamamuro <linguin@gmail.com>
wrote:

> Hi,
>
> In 2.0, you can say;
> val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS
> ds.groupBy($"_1").count.select($"_1", $"count").show
>
>
> // maropu
>
>
> On Sat, Jun 18, 2016 at 7:53 AM, Xinh Huynh <xinh.hu...@gmail.com> wrote:
>
>> Hi Pedro,
>>
>> In 1.6.1, you can do:
>> >> ds.groupBy(_.uid).count().map(_._1)
>> or
>> >> ds.groupBy(_.uid).count().select($"value".as[String])
>>
>> It doesn't have the exact same syntax as for DataFrame.
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset
>>
>> It might be different in 2.0.
>>
>> Xinh
>>
>> On Fri, Jun 17, 2016 at 3:33 PM, Pedro Rodriguez <ski.rodrig...@gmail.com
>> > wrote:
>>
>>> Hi All,
>>>
>>> I am working on using Datasets in 1.6.1 and eventually 2.0 when its
>>> released.
>>>
>>> I am running the aggregate code below where I have a dataset where the
>>> row has a field uid:
>>>
>>> ds.groupBy(_.uid).count()
>>> // res0: org.apache.spark.sql.Dataset[(String, Long)] = [_1: string,
>>> _2: bigint]
>>>
>>> This works as expected, however, attempts to run select statements after
>>> fails:
>>> ds.groupBy(_.uid).count().select(_._1)
>>> // error: missing parameter type for expanded function ((x$2) => x$2._1)
>>> ds.groupBy(_.uid).count().select(_._1)
>>>
>>> I have tried several variants, but nothing seems to work. Below is the
>>> equivalent Dataframe code which works as expected:
>>> df.groupBy("uid").count().select("uid")
>>>
>>> Thanks!
>>> --
>>> Pedro Rodriguez
>>> PhD Student in Distributed Machine Learning | CU Boulder
>>> UC Berkeley AMPLab Alumni
>>>
>>> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
>>> Github: github.com/EntilZha | LinkedIn:
>>> https://www.linkedin.com/in/pedrorodriguezscience
>>>
>>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>



-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Dataset Select Function after Aggregate Error

2016-06-17 Thread Pedro Rodriguez
Hi All,

I am working on using Datasets in 1.6.1 and eventually 2.0 when its
released.

I am running the aggregate code below where I have a dataset where the row
has a field uid:

ds.groupBy(_.uid).count()
// res0: org.apache.spark.sql.Dataset[(String, Long)] = [_1: string, _2:
bigint]

This works as expected, however, attempts to run select statements after
fails:
ds.groupBy(_.uid).count().select(_._1)
// error: missing parameter type for expanded function ((x$2) => x$2._1)
ds.groupBy(_.uid).count().select(_._1)

I have tried several variants, but nothing seems to work. Below is the
equivalent Dataframe code which works as expected:
df.groupBy("uid").count().select("uid")

Thanks!
-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Undestanding Spark Rebalancing

2016-01-14 Thread Pedro Rodriguez
Hi All,

I am running a Spark program where one of my parts is using Spark as a
scheduler rather than a data management framework. That is, my job can be
described as RDD[String] where the string describes an operation to perform
which may be cheap or expensive (process an object which may have a small
or large number of records associated with it).

Leaving things to default, I have bad job balancing. I am wondering which
approach I should take:
1. Write a partitioner which uses partitionBy to balance partitions ahead of
time by the number of records each string needs (see the sketch below)
2. repartition to have many small partitions (I have ~1700 strings acting
as jobs to run, so maybe 1-5 per partition). My question here is, does
Spark re-schedule/steal jobs if there are executors/worker processes that
aren't doing any work?

The second one would be easier and since I am not shuffling much data
around would work just fine for me, but I can't seem to find out for sure
if Spark does job re-scheduling/stealing.
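
A rough sketch of option 1, where the weights map (job string -> estimated
record count) is an assumption you would have to build first:

import org.apache.spark.Partitioner

// pre-assign each job to a partition with a greedy longest-processing-time packing
class CostBalancedPartitioner(weights: Map[String, Long], val numPartitions: Int)
    extends Partitioner {

  private val assignment: Map[String, Int] = {
    val loads = Array.fill(numPartitions)(0L)
    weights.toSeq.sortBy { case (_, w) => -w }.map { case (job, w) =>
      val lightest = loads.indices.minBy(i => loads(i))
      loads(lightest) += w
      job -> lightest
    }.toMap
  }

  def getPartition(key: Any): Int =
    assignment.getOrElse(key.toString,
      ((key.hashCode % numPartitions) + numPartitions) % numPartitions)
}

// usage: jobs.map(j => (j, j)).partitionBy(new CostBalancedPartitioner(weights, 64)).values

For option 2, Spark assigns pending tasks to executors as they free up, but it
does not steal or re-split tasks that are already running; the closest thing is
speculative execution (spark.speculation), which launches duplicate attempts of
slow tasks.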

Thanks
-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: How to speed up MLlib LDA?

2015-09-22 Thread Pedro Rodriguez
I helped some with the LDA and worked quite a bit on a Gibbs version. I
don't know if the Gibbs version might help, but since it is not (yet) in
MLlib, Intel Analytics kindly created a spark package with their adapted
version plus a couple other LDA algorithms:
http://spark-packages.org/package/intel-analytics/TopicModeling
https://github.com/intel-analytics/TopicModeling

It might be worth trying out. Do you know what LDA algorithm VW uses?

Pedro


On Tue, Sep 22, 2015 at 1:54 AM, Marko Asplund <marko.aspl...@gmail.com>
wrote:

> Hi,
>
> I did some profiling for my LDA prototype code that requests topic
> distributions from a model.
> According to Java Mission Control, more than 80% of the execution time
> during the sample interval is spent in the following methods:
>
> org.apache.commons.math3.util.FastMath.log(double); count: 337; 47.07%
> org.apache.commons.math3.special.Gamma.digamma(double); count: 164; 22.91%
> org.apache.commons.math3.util.FastMath.log(double, double[]); count: 50;
> 6.98%
> java.lang.Double.valueOf(double); count: 31; 4.33%
>
> Is there any way of using the API more optimally?
> Are there any opportunities for optimising the "topicDistributions" code
> path in MLlib?
>
> My code looks like this:
>
> // executed once
> val model = LocalLDAModel.load(ctx, ModelFileName)
>
> // executed four times
> val samples = Transformers.toSparseVectors(vocabularySize,
> ctx.parallelize(Seq(input))) // fast
> model.topicDistributions(samples.zipWithIndex.map(_.swap)) // <== this
> seems to take about 4 seconds to execute
>
>
> marko
>
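
The profile above is dominated by the digamma/log math, so this may not
change much, but one cheap thing to try is batching all pending inputs into
a single RDD so topicDistributions is invoked once and the documents are
scored in parallel. A sketch reusing the names from the code above (inputs
here is the hypothetical collection of the four requests):

// build one RDD for every pending input instead of one tiny RDD per request
val samples = Transformers.toSparseVectors(vocabularySize, ctx.parallelize(inputs))
val distributions = model.topicDistributions(samples.zipWithIndex.map(_.swap)).collect()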



-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 208-340-1703
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Re: How can I know currently supported functions in Spark SQL

2015-08-06 Thread Pedro Rodriguez
Worth noting that Spark 1.5 is extending that list of Spark SQL functions
quite a bit. Not sure where in the docs they would be yet, but the JIRA is
here: https://issues.apache.org/jira/browse/SPARK-8159
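
In the meantime, a quick way to sanity-check what is already available in
1.4.x is to exercise a few of the built-ins directly. A sketch, assuming a
spark-shell with sqlContext in scope and a hypothetical toy DataFrame:

import org.apache.spark.sql.functions._

// toy DataFrame just to try a few built-ins beyond sum/count/min/max
val df = sqlContext.createDataFrame(Seq((1, "a", -2.0), (2, "b", 3.5))).toDF("id", "name", "delta")
df.select(abs(col("delta")), upper(col("name"))).show()
df.agg(avg("delta"), countDistinct("name")).show()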

On Thu, Aug 6, 2015 at 7:27 PM, Netwaver wanglong_...@163.com wrote:

 Thanks for your kindly help






 At 2015-08-06 19:28:10, Todd Nist tsind...@gmail.com wrote:

 They are covered here in the docs:


 http://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.sql.functions$


 On Thu, Aug 6, 2015 at 5:52 AM, Netwaver wanglong_...@163.com wrote:

 Hi All,
  I am using Spark 1.4.1, and I want to know how I can find the
 complete list of functions supported in Spark SQL; currently I only know
 'sum', 'count', 'min', and 'max'. Thanks a lot.








-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 208-340-1703
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Spark Interview Questions

2015-07-29 Thread Pedro Rodriguez
You might look at the edX courses on Apache Spark and on ML with Spark.
There are probably some homework problems or quiz questions that would be
relevant. I haven't taken the courses myself, but that's where I would go
first.

https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x
https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1x

--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 208-340-1703
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Spark SQL Table Caching

2015-07-22 Thread Pedro Rodriguez
I would be interested in the answer to this question too, plus how those
calls relate to registerTempTable().

Pedro
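
A minimal sketch of how those calls fit together in the 1.4/1.5-era API.
This is my understanding rather than something verified against the source:
cacheTable is lazy, so the first query materializes the in-memory columnar
cache, and uncacheTable drops it again.

// hypothetical toy DataFrame registered as a temp table
val df = sqlContext.createDataFrame(Seq((1, "a"), (2, "b"))).toDF("id", "name")
df.registerTempTable("events")

sqlContext.cacheTable("events")                       // marks the table for caching (lazy)
sqlContext.sql("SELECT COUNT(*) FROM events").show()  // first query warms the cache
println(sqlContext.isCached("events"))                // true once the table is marked
sqlContext.uncacheTable("events")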

On Tue, Jul 21, 2015 at 1:59 PM, Brandon White bwwintheho...@gmail.com
wrote:

 A few questions about caching a table in Spark SQL.

 1) Is there any difference between caching the DataFrame and caching the table?

 df.cache() vs sqlContext.cacheTable("tableName")

 2) Do you need to warm up the cache before seeing the performance
 benefits? Is the cache LRU? Do you need to run some queries on the table
 before it is cached in memory?

 3) Is caching the table much faster than .saveAsTable? I am only seeing a
 10%-20% performance increase.




-- 
Pedro Rodriguez
UCBerkeley 2014 | Computer Science
SnowGeek http://SnowGeek.org
pedro-rodriguez.com
ski.rodrig...@gmail.com
208-340-1703


Re: Check for null in PySpark DataFrame

2015-07-02 Thread Pedro Rodriguez
Thanks for the tip. Any idea why the intuitive answer (!= None) doesn't
work? I inspected the Row columns and they do indeed have a None value. I
suspect that somehow Python's None is translated to something in the JVM
that doesn't compare equal to null?

I might check out the source code for a better idea as well

Pedro

On Wed, Jul 1, 2015 at 12:18 PM, Michael Armbrust mich...@databricks.com
wrote:

 There is an isNotNull function on any column.

 df._1.isNotNull()

 or

 from pyspark.sql.functions import *
 col("myColumn").isNotNull()

 On Wed, Jul 1, 2015 at 3:07 AM, Olivier Girardot ssab...@gmail.com
 wrote:

 I must admit I've been using the same back to SQL strategy for now :p
 So I'd be glad to have insights into that too.

 On Tue, 30 Jun 2015 at 23:28, pedro ski.rodrig...@gmail.com wrote:

 I am trying to find the correct way to programmatically check for null
 values in the rows of a DataFrame. For example, below is the code using
 PySpark and SQL:

 df = sqlContext.createDataFrame(sc.parallelize([(1, None), (2, 'a'),
 (3, 'b'), (4, None)]))
 df.where('_2 is not null').count()

 However, this won't work:
 df.where(df._2 != None).count()

 It seems there is no native Python way to do this with DataFrames, but I
 find that difficult to believe; more likely I am missing the right way to
 do this.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Check-for-null-in-PySpark-DataFrame-tp23553.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





-- 
Pedro Rodriguez
UCBerkeley 2014 | Computer Science
SnowGeek http://SnowGeek.org
pedro-rodriguez.com
ski.rodrig...@gmail.com
208-340-1703