--
Pedro Rodriguez
UCBerkeley 2014 | Computer Science
SnowGeek <http://SnowGeek.org>
pedro-rodriguez.com
ski.rodrig...@gmail.com
208-340-1703
the equivalent DataFrame code which works as expected:
df.groupBy("uid").count().select("uid")
Thanks!
--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni
ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.
>> In 1.6.1, you can do:
>> >> ds.groupBy(_.uid).count().map(_._1)
>> or
>> >> ds.groupBy(_.uid).count().select($"value".as[String])
>>
>> It doesn't have the exact same syntax as for DataFrame.
>> http://spark.apache.org/docs/latest
hat if the data was skewed while joining, it would take a long time
> to finish the job (99 percent of tasks finish in seconds while the last 1
> percent takes minutes to hours).
>
> How do you handle skewed data in Spark?
>
> Thanks,
> Selvam R
> +91-97877-87724
>
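A common mitigation (my suggestion, not something stated in this thread) is to salt the hot join key so the skewed key is spread across several tasks. A minimal sketch, assuming DataFrames named large (skewed on "uid") and small:

import org.apache.spark.sql.functions._

val SALT_BUCKETS = 10

// Add a random salt (0..SALT_BUCKETS-1) to each row on the skewed side.
val saltedLarge = large.withColumn("salt", (rand() * SALT_BUCKETS).cast("int"))

// Replicate the small side once per salt value so every salted key can match.
val saltedSmall = small.withColumn("salt",
  explode(array((0 until SALT_BUCKETS).map(lit): _*)))

// Join on (uid, salt): the single hot key is now split over SALT_BUCKETS tasks.
val joined = saltedLarge.join(saltedSmall, Seq("uid", "salt")).drop("salt")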
--
Pedro Rodriguez
P
e below works fine for the simple aggregate:
scala> ds.groupBy(_.name).count.show
Would be great to see an idiomatic example of using aggregates like these
mixed with spark.sql.functions.
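For reference, a sketch of one way to mix the two (assuming Spark 2.0, spark.implicits._ in scope, and a Dataset[Person] named ds; not from this thread):

import org.apache.spark.sql.functions._

case class Person(name: String, age: Long)

// Untyped groupBy exposes the spark.sql.functions aggregates; .as[...] brings
// the result back into a typed Dataset.
val stats = ds.groupBy($"name")
  .agg(count($"name").as("count"), avg($"age").as("avg_age"))
  .as[(String, Long, Double)]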
Pedro
On Fri, Jun 17, 2016 at 9:59 PM, Pedro Rodriguez
wrote:
> Thanks Xinh and Takeshi,
>
> I am t
($"_1", $"count").show
>
> +---+-----+
> | _1|count|
> +---+-----+
> |  1|    1|
> |  2|    1|
> +---+-----+
>
>
>
> On Sat, Jun 18, 2016 at 3:09 PM, Pedro Rodriguez
> wrote:
>
>> I went ahead and download
t;name").count.select('name.as[String], 'count.as
[Long]).collect()
Does that seem like a correct understanding of Datasets?
On Sat, Jun 18, 2016 at 6:39 AM, Pedro Rodriguez
wrote:
> Looks like it was my own fault. I had spark 2.0 cloned/built, but had the
> spark shell in my
'saveAsHadoopDataset', 'saveAsHadoopFile', 'saveAsNewAPIHadoopFile',
'saveAsSequenceFile', 'sequenceFile', 'serveIterator', 'valueOfPair',
'writeIteratorToStream', 'writeUTF']
The next thing I would
e', 'newAPIHadoopRDD',
>>> 'numericRDDToDoubleRDDFunctions', 'rddToAsyncRDDActions',
>>> 'rddToOrderedRDDFunctions', 'rddToPairRDDFunctions',
>>> 'rddToPairRDDFunctions$default$4', 'rddToSequ
https://github.com/apache/spark/blob/v1.6.2/python/pyspark/rdd.py#L182:
_pickle.UnpicklingError: A load persistent id instruction was encountered,
but no persistent_load function was specified.
On Thu, Jun 30, 2016 at 2:13 PM, Pedro Rodriguez
wrote:
> Thanks Jeff and Holden,
>
> A little mor
That was indeed the case, using UTF8Deserializer makes everything work
correctly.
Thanks for the tips!
On Thu, Jun 30, 2016 at 3:32 PM, Pedro Rodriguez
wrote:
> Quick update, I was able to get most of the plumbing to work thanks to the
> code Holden posted and browsing more source code.
could find were some Hadoop metrics. Is there a way to simply report
the number of bytes a partition read so Spark can put it on the UI?
Thanks,
—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni
pedrorodriguez.io
/9e632f2a71fba2858df748ed43f0dbb5dae52a83/src/main/scala/io/entilzha/spark/s3/S3RDD.scala#L100-L105
Reflection code:
https://github.com/EntilZha/spark-s3/blob/9e632f2a71fba2858df748ed43f0dbb5dae52a83/src/main/scala/io/entilzha/spark/s3/PrivateMethodUtil.scala
Thanks,
—
Pedro Rodriguez
PhD Student in Large-Scale Machine
Is there a way, on a GroupedData (from groupBy on a DataFrame), to have an
aggregate that returns column A based on the min of column B? For example, I
have a list of sites visited by a given user and I would like to find the
event with the minimum time (the first event).
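One approach (a sketch, not from the question itself, but matching the "sorting tuples" idea that comes up later in the thread) is to take the min of a struct whose first field is the ordering column, so the whole earliest row comes back; column names here are made up:

import org.apache.spark.sql.functions._

// min over a struct orders by its first field, so the earliest
// (time, site) pair per user comes back together.
val firstEvents = visits
  .groupBy($"uid")
  .agg(min(struct($"time", $"site")).as("first"))
  .select($"uid", $"first.time".as("first_time"), $"first.site".as("first_site"))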
Thanks,
--
Pedro Rodriguez
PhD
spark sql types are allowed?
—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni
pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn
On July 8, 2016 at 6:06:32 PM, Xinh Huynh (xinh.hu...@gmail.com) wrote
input at runtime?
Thanks,
—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni
pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn
On July 9, 2016 at 1:33:18 AM, Pedro Rodriguez (ski.rodrig...@gmail.com
It would be helpful if you included the relevant configuration files from each, or
noted if you are using the defaults, particularly any changes to class paths.
I worked through upgrading to Zeppelin 0.6.0 at work and at home without any issue,
so it is hard to say more without more details.
—
Pedro Rodriguez
PhD
Thanks Michael,
That seems like the analog to sorting tuples. I am curious, is there a
significant performance penalty to the UDAF versus that? It's certainly nicer
and more compact code, at least.
—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data
buffer(0)
> }
> }
>
>
> I don't quite understand why I get an empty result from my UDAF. I guess there
> may be two reasons:
> 1. incorrect initialization with "" in the initialize method
> 2. the buffer isn't being written successfully.
>
> can anyone
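For reference, a minimal working UDAF skeleton (a sketch, not the poster's code) that concatenates strings; the usual cause of an empty result is forgetting to write the updated value back into the buffer in update/merge:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class ConcatUDAF extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("value", StringType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("acc", StringType) :: Nil)
  def dataType: DataType = StringType
  def deterministic: Boolean = true

  // Start from the empty string.
  def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = ""

  // Every update must write the new value back into the buffer.
  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) buffer(0) = buffer.getString(0) + input.getString(0)

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getString(0) + buffer2.getString(0)

  def evaluate(buffer: Row): Any = buffer.getString(0)
}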
id).agg(merge_sets('words)) -> list of distinct words
Thanks,
--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni
ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience
>> I would want to do something like
>>
>> df.groupBy('id).agg(merge_arrays('words)) -> list of all words
>> df.groupBy('id).agg(merge_sets('words)) -> list of distinct words
>>
>> Thanks,
>> --
>> Pedro Rodriguez
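For what it's worth, the built-in collect_list and collect_set aggregates in org.apache.spark.sql.functions cover this without a custom UDAF (native in Spark 2.0; in 1.6 they required a HiveContext). A sketch using the column names from the question:

import org.apache.spark.sql.functions._

// collect_list keeps every word per id; collect_set keeps only distinct words.
// If "words" is itself an array column, explode it first.
val allWords      = df.groupBy($"id").agg(collect_list($"words").as("all_words"))
val distinctWords = df.groupBy($"id").agg(collect_set($"words").as("distinct_words"))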
. It would also be useful to get
programmatic access to the size of the RDD in memory if it is cached.
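One option (my suggestion, not confirmed in the thread): for a cached RDD, SparkContext.getRDDStorageInfo reports the in-memory size. A sketch, with a placeholder path:

// Materialize the cache first, then look the RDD up by id.
val rdd = sc.textFile("hdfs:///some/path").cache()
rdd.count()

val memBytes = sc.getRDDStorageInfo
  .find(_.id == rdd.id)
  .map(_.memSize)
  .getOrElse(0L)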
Thanks,
--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni
ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.co
estimate the
total size. Thanks for the idea.
—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni
pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn
On July 12, 2016 at 7:26:17 PM, Hatim Diab (timd
>
>
> Regards,
> Gourav Sengupta
>
> On Wed, Jul 13, 2016 at 5:39 AM, Pedro Rodriguez
> wrote:
>
>> The primary goal for balancing partitions would be for the write to S3.
>> We would like to prevent unbalanced partitions (can do with repartition),
>>
auth.HTTPBasicAuth(api_key, api_secret))
>
> # print json.dumps(data.json(), indent=4)
>
> return data
>
>
> When I print the JSON dump of the data I see it returning results from the
> REST call, but the count never stops.
>
>
> Is there an efficie
Out of curiosity, is there a way to pull all the data back to the driver to
save without collect()? That is, stream the data in chunks back to the driver
so that the maximum memory used is comparable to a single node's data, but all
the data ends up saved on one node.
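One way to approximate this (my suggestion, not from the thread) is RDD.toLocalIterator, which streams partitions to the driver one at a time, so roughly one partition is resident in driver memory at any moment:

import java.io.PrintWriter

val writer = new PrintWriter("/tmp/output.txt")   // hypothetical output path
try {
  // Each partition is computed and shipped to the driver on demand.
  rdd.toLocalIterator.foreach(record => writer.println(record))
} finally {
  writer.close()
}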
—
Pedro Rodriguez
PhD Student in
You could call map on an RDD which has “many” partitions, then call
repartition/coalesce to drastically reduce the number of partitions so that
your second map job has fewer tasks running.
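A minimal sketch of that shape (numbers and transforms are made up):

// First map runs as ~1000 tasks...
val wide = sc.parallelize(1 to 1000000, numSlices = 1000).map(x => x * 2)

// ...then shrink the partition count so the next stage runs far fewer tasks.
// coalesce avoids a full shuffle; repartition(50) would shuffle but rebalance.
val result = wide.coalesce(50).map(x => x + 1)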
—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
new job where the CPU/memory ratio is
more favorable, which reads from the prior job's output. I am guessing this
heavily depends on how expensive reloading the data set from disk/network is.
Hopefully one of these helps,
—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boul
insufficient.
—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni
pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn
On July 22, 2016 at 3:04:48 AM, Marco Colombo (ing.marco.colo...@gmail.com)
wrote:
Take a
This should work, and I don't think it triggers any actions:
df.rdd.partitions.length
On Fri, Jul 22, 2016 at 2:20 PM, Neil Chang wrote:
> Seems no function does this in Spark 2.0 preview?
>
--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AM
siness.javaRDD()
>>
>> .zipWithIndex();
>>
>> In a later part of the code I need to change a data structure and update the
>> name with the index value generated above.
>> I am unable to figure out how to do a lookup here.
>>
>> Please suggest.
>>
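One common pattern for the lookup (my suggestion, shown in Scala rather than the Java above; names and otherNames are hypothetical RDDs of strings) is to collect the (name, index) pairs into a map and broadcast it:

// zipWithIndex assigns each name a stable Long index.
val indexed = names.zipWithIndex()

// Collect the pairs into a driver-side map and broadcast it, so later
// transformations can look an index up by name without a join.
// Only viable when the map fits comfortably in memory.
val nameToIndex = sc.broadcast(indexed.collectAsMap())

// e.g. replace names with their indices in some other RDD of names:
val withIndices = otherNames.map(n => nameToIndex.value.getOrElse(n, -1L))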
load the notebooks, cannot export them and certainly cannot sync them
> back to Github, without mind numbing and sometimes irritating hacks. Have
> those issues been resolved?
>
>
> Regards,
> Gourav
>
>
> On Fri, Jul 22, 2016 at 2:22 PM, Pedro Rodriguez
> wrote:
>
use SparkR dapply on a SparkDataFrame, don't we need to partition the
> DataFrame first? The example in the docs doesn't seem to do this.
> Without knowing how it is partitioned, how can one write the function to
> process each partition?
>
> Neil
>
> On Fri, Jul 22, 2016 at 5:56 PM, Pedro R
");
> >>
> >> Now I need to map each name with a unique index and I did the following
> >> JavaPairRDD indexedBId = business.javaRDD()
> >>
> .zipWithIndex();
> >>
> >> In a later part of the code I need to change a data structure and update
security group which allows all traffic to/from itself. If you are using
something like ufw on Ubuntu then you probably need to know the IP addresses
of the worker nodes beforehand.
—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
your setup that might affect
networking.
—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni
pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn
On July 23, 2016 at 9:10:31 AM, VG (vlin...@gmail.com) wrote
each, or 10 executors of 7 cores each. You can also bump up the executor memory
to use more of your cluster's memory. Lastly, if you are running on EC2, make
sure to configure spark.local.dir to write to something that is not an EBS
volume, preferably an attached SSD on something like an r3 machine.
—
Pedro Rodriguez
If you can use a DataFrame then you could use rank plus a window function, at the
expense of an extra sort. Do you have an example of zipWithIndex not working?
That seems surprising.
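A sketch of the window-function route (assuming a DataFrame df with a "timestamp" column to order by; not from the thread):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// With no partitionBy the window is global, so all rows sort through a single
// task -- that is the "extra sort" cost mentioned above.
val w = Window.orderBy($"timestamp")
val indexed = df.withColumn("idx", row_number().over(w) - 1)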
On Jul 23, 2016 10:24 PM, "Andrew Ehrlich" wrote:
> It’s hard to do in a distributed system. Maybe try generating a me
illing to go fix it myself). Should I just
> create a ticket?
>
> Thank you,
>
> Bryan Jeffrey
>
>
--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni
ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience
duplicated data
3. Preserve data for all other dates
I am guessing that overwrite would not work here, or if it does it's not
guaranteed to stay that way, but I am not sure. If that's the case, is there a
good/robust way to get this behavior?
--
Pedro Rodriguez
PhD Student in Distributed Machine
uld like to
> overwrite, and swap that with the existing partition whose data you want to
> wipe away. Swapping can be done by a simple rename of the partition, followed
> by a table repair to pick up the new partition.
>
> I am not sure if that addresses your scenario.
>
>
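A rough sketch of that swap (table name, paths, and exact commands are made up and depend on the metastore setup):

// 1. Write the corrected data for the date to a staging path.
df.write.parquet("s3://bucket/staging/date=2016-07-25")

// 2. Outside Spark, move the staging directory over the old partition path:
//      hadoop fs -rm -r  s3://bucket/table/date=2016-07-25
//      hadoop fs -mv     s3://bucket/staging/date=2016-07-25 s3://bucket/table/date=2016-07-25

// 3. Have the metastore pick the partition layout up again.
sqlContext.sql("MSCK REPAIR TABLE events")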
df would not
>> have any data -> can’t call foreach.
>> Call collect to execute the process -> get the data -> then foreach is OK.
>>
>>
>> On Jul 26, 2016, at 2:30 PM, kevin wrote:
>>
>> blacklistDF.collect()
>>
>>
>>
>
--
Pedro Rod
ml#org.apache.spark.sql.Row
(1.6 docs, but should be similar in 2.0)
On Tue, Jul 26, 2016 at 7:08 AM, Gourav Sengupta
wrote:
> And Pedro has made sense of a world running amok, scared, and drunken
> stupor.
>
> Regards,
> Gourav
>
> On Tue, Jul 26, 2016 at 2:01 PM,
dynamically picking the number depending on the desired file size (around
> 256 MB would be perfect)?
>
>
>
> I am running Spark 1.6 on CDH using YARN; the files are written in Parquet
> format.
>
>
>
> Thanks
>
>
>
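One way to pick the number dynamically (a sketch, my own; paths are made up, and Parquet compression means output size only roughly tracks input size):

import org.apache.hadoop.fs.{FileSystem, Path}

val targetBytes = 256L * 1024 * 1024   // ~256 MB per output file

// Estimate the input size from the filesystem.
val fs = FileSystem.get(sc.hadoopConfiguration)
val inputBytes = fs.getContentSummary(new Path("/data/events")).getLength

val numPartitions = math.max(1, (inputBytes / targetBytes).toInt)
df.repartition(numPartitions).write.parquet("/data/events_out")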
--
Pedro Rodriguez
PhD Student in Distri
considering copying the files from S3 to a normal file system on one
> of my workers, then concatenating the files into a few much larger files,
> then using 'hadoop fs -put' to move them to HDFS. Do you think this would
> improve the Spark count() performance issue?
>
> Does anyone kn
>
>
>
--
Pedro Rodriguez
PhD Student in
would work just fine for me, but I can't seem to find out for sure
if Spark does job re-scheduling/stealing.
Thanks
--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni
ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github:
3) Is caching the table much faster than .saveAsTable? I am only seeing a
> 10%-20% performance increase.
>
--
Pedro Rodriguez
UCBerkeley 2014 | Computer Science
SnowGeek <http://SnowGeek.org>
pedro-rodriguez.com
ski.rodrig...@gmail.com
208-340-1703
rk-uc-berkeleyx-cs100-1x
https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1x
--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni
ski.rodrig...@gmail.com | pedrorodriguez.io | 208-340-1703
Github: github.com/EntilZha | LinkedIn:
> Hi All,
>> I am using Spark 1.4.1, and I want to know how I can find the complete
>> list of functions supported in Spark SQL; currently I only know
>> 'sum', 'count', 'min', and 'max'. Thanks a lot.
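Two places to look (my suggestion, not from the thread): every built-in is listed in the org.apache.spark.sql.functions object in the API docs, and with a HiveContext you can ask SQL directly (behavior varies a bit by version):

sqlContext.sql("SHOW FUNCTIONS").show(500)
sqlContext.sql("DESCRIBE FUNCTION avg").show()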
>>
>>
>>
>
>
/ fast
> model.topicDistributions(samples.zipWithIndex.map(_.swap)) // <== this
> seems to take about 4 seconds to execute
>
>
> marko
>
--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni
ski.rodrig...@gmail.com | pedrorodriguez.io | 208-340-1703
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience