Re: Guys is this some form of Spam or someone has left his auto-reply loose LOL
Same here, but maybe this is a really urgent matter we need to contact him about... or just make a filter.

On Thu, Jul 28, 2016 at 7:59 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> -- Forwarded message --
> From: Geert Van Landeghem [Napoleon Games NV] <g.vanlandeg...@napoleongames.be>
> Date: 28 July 2016 at 14:38
> Subject: Re: Re: Is spark-1.6.1-bin-2.6.0 compatible with hive-1.1.0-cdh5.7.1
> To: Mich Talebzadeh <mich.talebza...@gmail.com>
>
> Hello,
>
> I am enjoying holidays until the end of August; for urgent matters contact the BI department on 702 or 703 or send an email to b...@napoleongames.be.
>
> For really urgent matters contact me on my mobile phone: +32 477 75 95 33.
>
> Kind regards
> Geert
>
> Dr Mich Talebzadeh
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni
ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
Re: performance problem when reading lots of small files created by spark streaming.
There are a few blog posts that detail one possible/likely issue, for example: http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219

TL;DR: The Hadoop libraries Spark uses assume that their input comes from a file system (which works with HDFS), but S3 is a key-value store, not a file system. Somewhere along the line, this makes things very slow. Below I describe their approach and a library I am working on to solve this problem.

(Much) Longer Version (with a shiny new library in development):

So far in my reading of the source code, Hadoop attempts to actually read from S3, which can be expensive, particularly since it does so from a single driver core (listing files is different from actually reading them; I can find the source code and link it later if you would like). The approach explained above is to instead use the AWS SDK to list the files, distribute the file names as a collection with sc.parallelize, then read them in parallel. I found this worked, but found it lacking in a few ways, so I started this project: https://github.com/EntilZha/spark-s3

This takes that idea further by:
1. Rather than sc.parallelize, implementing the RDD interface where each partition is defined by the files it needs to read (haven't gotten to DataFrames yet)
2. At the driver node, using the AWS SDK to list all the files with their sizes (listing is fast), then running the Least Processing Time (LPT) algorithm to sift the files into roughly balanced partitions by size
3. API: S3Context(sc).textFileByPrefix("bucket", "file1", "folder2").regularRDDOperationsHere, or import the implicits and do sc.s3.textFileByPrefix

At present, I am battle testing and benchmarking it at my current job, and results are promising, with significant improvements to jobs dealing with many files (especially many small files) and to jobs whose input is unbalanced to start with.
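The list-then-parallelize idea from the blog post can be sketched roughly like this. This is an illustration only, not the library's code: the bucket and prefix are hypothetical, it assumes the AWS Java SDK v1 `AmazonS3Client`, and it skips the pagination a real listing needs (listObjects returns at most ~1000 keys per call):

```scala
import scala.collection.JavaConverters._
import scala.io.Source
import com.amazonaws.services.s3.AmazonS3Client

val bucket = "my-bucket"       // hypothetical bucket
val prefix = "events/2016/07/" // hypothetical prefix

// List keys on the driver with the AWS SDK (fast), then read the objects in
// parallel on the executors instead of letting Hadoop's S3 layer do the work.
val s3 = new AmazonS3Client()
val keys = s3.listObjects(bucket, prefix).getObjectSummaries.asScala.map(_.getKey).toSeq

val lines = sc.parallelize(keys, 64).mapPartitions { keyIter =>
  // Clients aren't serializable, so create one per partition on the executor.
  val client = new AmazonS3Client()
  keyIter.flatMap { key =>
    Source.fromInputStream(client.getObject(bucket, key).getObjectContent).getLines()
  }
}
```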
Jobs perform better because: 1) there isn't a long stall at the driver while Hadoop decides how to split S3 files, and 2) the partitions end up nearly perfectly balanced because of the LPT algorithm.

Since I hadn't intended to advertise this quite yet, the documentation is not super polished, but it exists here: http://spark-s3.entilzha.io/latest/api/#io.entilzha.spark.s3.S3Context

I am completing the Sonatype process for publishing artifacts on Maven Central (this should be done by tomorrow, so referencing "io.entilzha:spark-s3_2.10:0.0.0" should work very soon). I would love to hear if this library solution works; otherwise, I hope the blog post above is illuminating.

Pedro

On Wed, Jul 27, 2016 at 8:19 PM, Andy Davidson <a...@santacruzintegration.com> wrote:
> I have a relatively small data set, however it is split into many small JSON files. Each file is between maybe 4K and 400K.
> This is probably a very common issue for anyone using Spark Streaming. My streaming app works fine, however my batch application takes several hours to run.
>
> All I am doing is calling count(). Currently I am trying to read the files from S3. When I look at the app UI it looks like Spark is blocked, probably on IO? Adding additional workers and memory does not improve performance.
>
> I am able to copy the files from S3 to a worker relatively quickly, so I do not think S3 read time is the problem.
>
> In the past, when I had similar data sets stored on HDFS, I was able to use coalesce() to reduce the number of partitions from 200K to 30. This made a big improvement in processing time. However, when I read from S3, coalesce() does not improve performance.
>
> I tried copying the files to a normal file system and then using ‘hadoop fs put’ to copy the files to HDFS, however this takes several hours and is nowhere near completion. It appears HDFS does not deal with small files well.
> I am considering copying the files from S3 to a normal file system on one of my workers, then concatenating the files into a few much larger files, then using ‘hadoop fs put’ to move them to HDFS. Do you think this would improve the spark count() performance issue?
>
> Does anyone know of heuristics for determining the number or size of the concatenated files?
>
> Thanks in advance
>
> Andy

--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni
ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
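As an aside, the Least Processing Time heuristic mentioned above is easy to sketch on its own: sort files by size descending and always hand the next file to the least-loaded partition. A minimal illustration of the balancing idea (my own sketch, not the library's implementation):

```scala
import scala.collection.mutable.ListBuffer

// Greedy LPT: assign each file (largest first) to the least-loaded partition.
def lptPartition(sizes: Seq[(String, Long)], numPartitions: Int): Seq[Seq[String]] = {
  val buckets = Array.fill(numPartitions)(ListBuffer.empty[String])
  val loads = Array.fill(numPartitions)(0L)
  for ((file, size) <- sizes.sortBy(-_._2)) {
    val i = loads.indexOf(loads.min) // currently smallest partition
    buckets(i) += file
    loads(i) += size
  }
  buckets.map(_.toList)
}
```

Because S3's list API returns object sizes for free, the sort costs nothing extra over a plain listing.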
Re: dynamic coalesce to pick file size
I asked something similar; search for "Tools for Balancing Partitions By Size" (I couldn't find the link in the archives). Unfortunately there doesn't seem to be a good solution right now other than knowing your job statistics. I am planning on implementing the idea I explained in the last paragraph or so of the last email I sent in this library, https://github.com/EntilZha/spark-s3, although it could be a while before I make my way up to DataFrames (RDDs for now).

On Tue, Jul 26, 2016 at 1:02 PM, Maurin Lenglart <mau...@cuberonlabs.com> wrote:
> Hi,
>
> I am doing a SQL query that returns a DataFrame. Then I am writing the result of the query using “df.write”, but the result gets written in a lot of different small files (~100 of 200 KB). So now I am doing a “.coalesce(2)” before the write.
>
> But the number “2” that I picked is static. Is there a way of dynamically picking the number depending on the file size wanted? (around 256 MB would be perfect)
>
> I am running Spark 1.6 on CDH using YARN; the files are written in Parquet format.
>
> Thanks

--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni
ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
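Until something better exists, one hedged workaround is to estimate the input size up front and derive the coalesce factor from a target file size. A sketch, assuming the input lives on HDFS and that output size roughly tracks input size (Parquet compression will change the ratio, so treat the target as approximate; the paths are hypothetical):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Derive a partition count from the input bytes and a target output file size.
val targetBytes = 256L * 1024 * 1024 // ~256 MB per output file
val fs = FileSystem.get(sc.hadoopConfiguration)
val inputBytes = fs.getContentSummary(new Path("/path/to/input")).getLength
val numFiles = math.max(1, math.ceil(inputBytes.toDouble / targetBytes).toInt)

df.coalesce(numFiles).write.parquet("/path/to/output")
```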
Re: dataframe.foreach VS dataframe.collect().foreach
:) Just realized you didn't get your original question answered though:

scala> import sqlContext.implicits._
import sqlContext.implicits._

scala> case class Person(age: Long, name: String)
defined class Person

scala> val df = Seq(Person(24, "pedro"), Person(22, "fritz")).toDF()
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.select("age")
res2: org.apache.spark.sql.DataFrame = [age: bigint]

scala> df.select("age").collect.map(_.getLong(0))
res3: Array[Long] = Array(24, 22)

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> df.collect.flatMap {
     |   case Row(age: Long, name: String) => Seq(Tuple1(age))
     |   case _ => Seq()
     | }
res7: Array[Tuple1[Long]] = Array((24,), (22,))

These docs are helpful: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Row (1.6 docs, but should be similar in 2.0)

On Tue, Jul 26, 2016 at 7:08 AM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
> And Pedro has made sense of a world running amok, scared, and drunken stupor.
>
> Regards,
> Gourav
>
> On Tue, Jul 26, 2016 at 2:01 PM, Pedro Rodriguez <ski.rodrig...@gmail.com> wrote:
>
>> I am not 100% sure as I haven't tried this out, but there is a huge difference between the two. Both foreach and collect are actions, regardless of whether or not the data frame is empty.
>>
>> Doing a collect will bring all the results back to the driver, possibly forcing it to run out of memory. Foreach will apply your function to each element of the DataFrame, but will do so across the cluster. This behavior is useful when you need to do something custom for each element (perhaps save to a db for which there is no driver, or something custom like making an http request per element; be careful here though due to the overhead cost).
>>
>> In your example, I am going to assume that hrecords is something like a list buffer.
>> The reason it will be empty is that each worker gets sent an empty list (it's captured in the closure for foreach) and appends to it. The instance of the list at the driver doesn't know about what happened at the workers, so it's empty.
>>
>> I don't know why Chanh's comment applies here, since I am guessing the df is not empty.
>>
>> On Tue, Jul 26, 2016 at 1:53 AM, kevin <kiss.kevin...@gmail.com> wrote:
>>
>>> thank you Chanh
>>>
>>> 2016-07-26 15:34 GMT+08:00 Chanh Le <giaosu...@gmail.com>:
>>>
>>>> Hi Ken,
>>>>
>>>> *blacklistDF -> just a DataFrame*
>>>> Spark is lazy: until you call something like *collect, take, write*, it will not execute the whole process (*like the map or filter you do before you collect*).
>>>> That means until you call collect, Spark *does nothing*, so your df would not have any data -> can’t call foreach.
>>>> Calling collect executes the process -> gets the data -> foreach is ok.
>>>>
>>>> On Jul 26, 2016, at 2:30 PM, kevin <kiss.kevin...@gmail.com> wrote:
>>>>
>>>> blacklistDF.collect()
>>
>> --
>> Pedro Rodriguez
>> PhD Student in Distributed Machine Learning | CU Boulder
>> UC Berkeley AMPLab Alumni
>>
>> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
>> Github: github.com/EntilZha | LinkedIn:
>> https://www.linkedin.com/in/pedrorodriguezscience

--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni
ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
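To make the closure point in this thread concrete, here is a sketch contrasting the broken driver-side buffer with two approaches that do work (it assumes a small DataFrame `df` with a string column, and uses the Spark 1.x accumulator API):

```scala
import scala.collection.mutable.ListBuffer

// Broken: each executor appends to its own deserialized copy of the buffer,
// so the driver's hrecords stays empty.
val hrecords = ListBuffer[String]()
df.foreach(row => hrecords += row.getString(0))

// Works: bring the (small) result back to the driver explicitly...
val names = df.collect().map(_.getString(0))

// ...or aggregate distributedly with an accumulator, which Spark merges
// back to the driver for you.
val acc = sc.accumulator(0L)
df.foreach(_ => acc += 1L)
println(acc.value) // total row count
```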
Re: Spark SQL overwrite/append for partitioned tables
Probably should have been more specific with the code we are using, which is something like:

df.write.mode("append or overwrite here").partitionBy("date").saveAsTable("my_table")

Unless there is something like what I described in the native API, I will probably take the approach of having an S3 API call wipe out that partition before the job starts, but it would be nice to not have to incorporate another step in the job.

Pedro

On Mon, Jul 25, 2016 at 5:23 PM, RK Aduri <rkad...@collectivei.com> wrote:
> You can have a temporary file to capture the data that you would like to overwrite. And swap that with the existing partition that you would want to wipe the data away from. Swapping can be done by a simple rename of the partition, and then just repair the table to pick up the new partition.
>
> Am not sure if that addresses your scenario.
>
> On Jul 25, 2016, at 4:18 PM, Pedro Rodriguez <ski.rodrig...@gmail.com> wrote:
>
> What would be the best way to accomplish the following behavior:
>
> 1. There is a table which is partitioned by date
> 2. A Spark job runs on a particular date; we would like it to wipe out all data for that date. This is to make the job idempotent and lets us rerun a job if it failed without fear of duplicated data
> 3. Preserve data for all other dates
>
> I am guessing that overwrite would not work here, or if it does it's not guaranteed to stay that way, but am not sure. If that's the case, is there a good/robust way to get this behavior?
>
> --
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
>
> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
> Github: github.com/EntilZha | LinkedIn:
> https://www.linkedin.com/in/pedrorodriguezscience

--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni
ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
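For reference, the "wipe the partition first" step I mentioned can be sketched as below. The bucket, warehouse path, and `runDate` are hypothetical, it assumes the AWS Java SDK v1 and that the table's partitions live under date= directories on S3, and like any listObjects call it would need pagination for more than ~1000 objects:

```scala
import scala.collection.JavaConverters._
import com.amazonaws.services.s3.AmazonS3Client

val bucket = "my-bucket"                                // hypothetical bucket
val runDate = "2016-07-25"                              // the partition being recomputed
val partitionPrefix = s"warehouse/my_table/date=$runDate/" // hypothetical layout

// Delete every object under the partition's prefix, then append the new data,
// which makes the job idempotent for that date.
val s3 = new AmazonS3Client()
s3.listObjects(bucket, partitionPrefix).getObjectSummaries.asScala
  .foreach(obj => s3.deleteObject(bucket, obj.getKey))

df.write.mode("append").partitionBy("date").saveAsTable("my_table")
```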
Spark SQL overwrite/append for partitioned tables
What would be the best way to accomplish the following behavior:

1. There is a table which is partitioned by date
2. A Spark job runs on a particular date; we would like it to wipe out all data for that date. This is to make the job idempotent and lets us rerun a job if it failed without fear of duplicated data
3. Preserve data for all other dates

I am guessing that overwrite would not work here, or if it does it's not guaranteed to stay that way, but am not sure. If that's the case, is there a good/robust way to get this behavior?

--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni
ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
Re: Spark 2.0
The Spark 2.0 vote for RC5 passed last Friday night, so it will probably be released early this week if I had to guess.

On Mon, Jul 25, 2016 at 12:23 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
> All,
>
> I had three questions:
>
> (1) Is there a timeline for a stable Spark 2.0 release? I know the 'preview' build is out there, but was curious what the timeline was for the full release. Jira seems to indicate that there should be a release 7/27.
>
> (2) For 'continuous' datasets there has been a lot of discussion. One item that came up in tickets was the idea that 'count()' and other functions do not apply to continuous datasets: https://github.com/apache/spark/pull/12080. In this case what is the intended procedure to calculate a streaming statistic based on an interval (e.g. count the number of records in a 2 minute window every 2 minutes)?
>
> (3) In previous releases (1.6.1) the call to DStream / RDD repartition w/ a number of partitions set to zero silently deletes data. I have looked in Jira for a similar issue, but I do not see one. I would like to address this (and would likely be willing to go fix it myself). Should I just create a ticket?
>
> Thank you,
>
> Bryan Jeffrey

--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni
ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
Re: How to generate a sequential key in rdd across executors
If you can use a DataFrame, then you could use rank plus a window function, at the expense of an extra sort. Do you have an example of zipWithIndex not working? That seems surprising.

On Jul 23, 2016 10:24 PM, "Andrew Ehrlich" wrote:
> It’s hard to do in a distributed system. Maybe try generating a meaningful key using a timestamp + hashed unique key fields in the record?
>
> > On Jul 23, 2016, at 7:53 PM, yeshwanth kumar wrote:
> >
> > Hi,
> >
> > I am doing a bulk load to HBase using Spark, in which I need to generate a sequential key for each record; the key should be sequential across all the executors.
> >
> > I tried zipWithIndex; it didn't work because zipWithIndex gives an index per executor, not across all executors.
> >
> > looking for some suggestions.
> >
> > Thanks,
> > -Yeshwanth
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
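The rank/window suggestion can be sketched as follows. Note that a global ordering funnels all rows through a single task for the numbering step, which is the extra sort cost mentioned (a sketch; `timestamp` is a stand-in for whatever total ordering column the data has):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// row_number over a global ordering yields keys 1, 2, 3, ... that are
// sequential across all executors.
val w = Window.orderBy("timestamp")
val withKey = df.withColumn("seq_id", row_number().over(w))
```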
Re: Choosing RDD/DataFrame/DataSet and Cluster Tuning
Hi Jestin,

Spark is smart about how it does joins. In this case, if df2 is sufficiently small it will do a broadcast join: basically, rather than shuffle df1/df2 for a join, it broadcasts df2 to all workers and joins locally. Looks like you may already have known that though, based on your use of spark.sql.autoBroadcastJoinThreshold.

It's hard to say why your job is slow without knowing more. For example, it could be a CPU-intensive calculation, or maybe you have imbalance over keys, which would cause a straggler. Hard to know without seeing some of the metrics from the Spark UI.

1. If you aren’t tied down by legacy code, Spark 2.0 has a nicer Dataset API and more improvements, so I don’t see why not. The Spark 2.0 RC5 vote passed last night, so the official release will probably go out early next week.
2. RDDs will make it worse. The reduceByKey/groupByKey distinction is specific to RDDs; the DataFrame API doesn’t mirror it. You hear that advice because reduceByKey runs the reduce locally at each node for each key, then reduces all those partial results to get the final result, while groupByKey shuffles all keys across the network, which is wasteful if you are just doing a reduce right after. DataFrames have lots of optimizations as well.
3. You shouldn’t need to explicitly call broadcast.
4. Driver memory is important if your job needs to collect results back to the driver for some reason. One good example is in mllib/ml: it's common to collect parameters back to the driver to update a global model. For some algorithms (like LDA), the model can be quite large, so it requires high driver memory.
5. Hard to know without more metrics from your job. That being said, your number of executor instances vs. number of cores seems a bit high. I would try 5 instances of 15 cores each, or 10 of 7 cores each. You can also kick up the memory to use more of your cluster’s memory.
Lastly, if you are running on EC2, make sure to configure spark.local.dir to write to something that is not an EBS volume, preferably an attached SSD on something like an r3 machine.

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni
pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 23, 2016 at 9:31:21 AM, Jestin Ma (jestinwith.a...@gmail.com) wrote:

Hello,

Right now I'm using DataFrames to perform a df1.groupBy(key).count() on one DataFrame and join with another, df2. The first, df1, is very large (many gigabytes) compared to df2 (250 MB).

Right now I'm running this on a cluster of 5 nodes, 16 cores each, 90 GB RAM each. It is taking me about 1 hour and 40 minutes to perform the groupBy, count, and join, which seems very slow to me.

Currently I have set the following in my spark-defaults.conf:

spark.executor.instances 24
spark.executor.memory 10g
spark.executor.cores 3
spark.driver.memory 5g
spark.sql.autoBroadcastJoinThreshold 200Mb

I have a couple of questions regarding tuning for performance as a beginner.

1. Right now I'm running Spark 1.6.0. Would moving to Spark 2.0 DataSet (or even DataFrames) be better?
2. What if I used RDDs instead? I know that reduceByKey is better than groupByKey, and DataFrames don't have that method.
3. I think I can do a broadcast join and have set a threshold. Do I need to set it above my second DataFrame size? Do I need to explicitly call broadcast(df2)?
4. What's the point of driver memory?
5. Can anyone point out something wrong with my tuning numbers, or any additional parameters worth checking out?

Thank you a lot!
Sincerely,
Jestin
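On question 3, for what it's worth: the broadcast can also be requested explicitly as a hint via `org.apache.spark.sql.functions.broadcast`, though with the threshold set it should not be necessary. A sketch using the df1/df2 from the question:

```scala
import org.apache.spark.sql.functions.broadcast

// Explicit broadcast hint: ship the small df2 to every executor and join
// map-side, avoiding a shuffle of the large df1.
val counts = df1.groupBy("key").count()
val joined = counts.join(broadcast(df2), "key")
```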
Re: Error in collecting RDD as a Map - IOException in collectAsMap
Have you changed spark-env.sh or spark-defaults.conf from the defaults? It looks like Spark is trying to address local workers on a network address (e.g. 192.168.…) instead of on localhost (localhost, 127.0.0.1, 0.0.0.0, …), and that network address doesn’t resolve correctly. You might also check /etc/hosts to make sure that you don’t have anything weird going on.

Last thing to try: are you running Spark within a VM and/or Docker? If networking isn’t set up correctly on those, you may also run into trouble. What would be helpful is to know everything about your setup that might affect networking.

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni
pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 23, 2016 at 9:10:31 AM, VG (vlin...@gmail.com) wrote:

Hi Pedro,

Apologies for not adding this earlier. This is running on a local cluster set up as follows:

JavaSparkContext jsc = new JavaSparkContext("local[2]", "DR");

Any suggestions based on this? The ports are not blocked by a firewall.

Regards,

On Sat, Jul 23, 2016 at 8:35 PM, Pedro Rodriguez <ski.rodrig...@gmail.com> wrote:

Make sure that you don’t have ports firewalled. You don’t really give much information to work from, but it looks like the master can’t access the worker nodes for some reason. If you give more information on the cluster, networking, etc., it would help.

For example, on AWS you can create a security group which allows all traffic to/from itself to itself. If you are using something like ufw on Ubuntu, then you probably need to know the IP addresses of the worker nodes beforehand.
—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni
pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 23, 2016 at 7:38:01 AM, VG (vlin...@gmail.com) wrote:

Please suggest if I am doing something wrong or an alternative way of doing this.

I have an RDD with two values as follows:

JavaPairRDD<String, Long> rdd

When I execute rdd.collectAsMap() it always fails with IO exceptions.

16/07/23 19:03:58 ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks
java.io.IOException: Failed to connect to /192.168.1.3:58179
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
	at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:96)
	at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
	at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
	at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:105)
	at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:92)
	at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:546)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:76)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1793)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:56)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.net.ConnectException: Connection timed out: no further information: /192.168.1.3:58179
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
	... 1 more
16/07/23 19:03:58 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 1 outstanding blocks after 5000 ms
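If address resolution is indeed the culprit, pinning Spark to a resolvable address is one thing to try. This is a sketch only; 127.0.0.1 is an example value to adapt to your setup (equivalently, you can export SPARK_LOCAL_IP in conf/spark-env.sh):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext

// Pin the driver to loopback so block fetches don't target an
// unresolvable LAN address such as the 192.168.x.x seen in the trace.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("DR")
  .set("spark.driver.host", "127.0.0.1")
val jsc = new JavaSparkContext(conf)
```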
Re: Dataset, RDD zipWithIndex -- How to use as a map.
You could either do monotonically_increasing_id or use a window function and rank. The first is a simple Spark SQL function; Databricks has a pretty helpful post on how to use window functions (in this case the whole data set is the window).

On Fri, Jul 22, 2016 at 12:20 PM, Marco Mistroni <mmistr...@gmail.com> wrote:
> Hi,
> So you have a data frame, then use zipWithIndex and create a tuple.
> I'm not sure if the DF API has something useful for zipWithIndex, but you can:
> - get a data frame
> - convert it to an rdd (there's a .rdd)
> - do a zipWithIndex
>
> That will give you an rdd with 3 fields...
> I don't think you can update df columns.
> Hth
> On 22 Jul 2016 5:19 pm, "VG" <vlin...@gmail.com> wrote:
> > Hi All,
> >
> > Any suggestions for this?
> >
> > Regards,
> > VG
> >
> > On Fri, Jul 22, 2016 at 6:40 PM, VG <vlin...@gmail.com> wrote:
> >> Hi All,
> >>
> >> I am really confused how to proceed further. Please help.
> >>
> >> I have a dataset created as follows:
> >> Dataset b = sqlContext.sql("SELECT bid, name FROM business");
> >>
> >> Now I need to map each name to a unique index, and I did the following:
> >> JavaPairRDD<Row, Long> indexedBId = business.javaRDD().zipWithIndex();
> >>
> >> In a later part of the code I need to change a data structure and update name with the index value generated above.
> >> I am unable to figure out how to do a lookup here.
> >>
> >> Please suggest. If there is a better way to do this, please suggest that.
> >>
> >> Regards
> >> VG

--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni
ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
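Both options can be sketched briefly: monotonically_increasing_id gives unique but non-consecutive ids with no shuffle, while zipWithIndex gives consecutive ids at the cost of dropping to the RDD API. A sketch against the (bid, name) DataFrame from the question; the assumption that name is the second column is mine:

```scala
import org.apache.spark.sql.functions.monotonically_increasing_id

// Unique-but-sparse 64-bit ids, no shuffle: the id encodes (partition, offset).
val withId = df.withColumn("id", monotonically_increasing_id())

// Consecutive 0..n-1 ids via the RDD API, keyed by name for later lookup.
val nameToIndex = df.rdd.zipWithIndex().map { case (row, idx) =>
  (row.getString(1), idx) // assumes name is the second column
}
```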
Re: How to get the number of partitions for a SparkDataFrame in Spark 2.0-preview?
I haven't used SparkR/R before, only the Scala/Python APIs, so I don't know for sure. I am guessing if things are in a DataFrame they were read either from some disk source (S3/HDFS/file/etc) or they were created from parallelize. If you are using the first, Spark will for the most part choose a reasonable number of partitions, while for parallelize I think it depends on what your min parallelism is set to. From a brief search it looks like dapply is an analogue of mapPartitions. Usually the reason to use this is if your map operation has some expensive initialization function. For example, you need to open a connection to a database, so it's better to reuse that connection for one partition's elements than to create it for each element. What are you trying to accomplish with dapply? On Fri, Jul 22, 2016 at 8:05 PM, Neil Chang <iam...@gmail.com> wrote: > Thanks Pedro, > so to use sparkR dapply on SparkDataFrame, don't we need partition the > DataFrame first? the example in doc doesn't seem to do this. > Without knowing how it partitioned, how can one write the function to > process each partition? > > Neil > > On Fri, Jul 22, 2016 at 5:56 PM, Pedro Rodriguez <ski.rodrig...@gmail.com> > wrote: > >> This should work and I don't think triggers any actions: >> >> df.rdd.partitions.length >> >> On Fri, Jul 22, 2016 at 2:20 PM, Neil Chang <iam...@gmail.com> wrote: >> >>> Seems no function does this in Spark 2.0 preview? >>> >> >> >> >> -- >> Pedro Rodriguez >> PhD Student in Distributed Machine Learning | CU Boulder >> UC Berkeley AMPLab Alumni >> >> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423 >> Github: github.com/EntilZha | LinkedIn: >> https://www.linkedin.com/in/pedrorodriguezscience >> >> > -- Pedro Rodriguez PhD Student in Distributed Machine Learning | CU Boulder UC Berkeley AMPLab Alumni ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423 Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
Re: spark and plot data
As of the most recent 0.6.0 release it's partially alleviated, but still not great (compared to something like Jupyter). They can be "downloaded" but it's only really meaningful for importing them back into Zeppelin. It would be great if they could be exported as HTML or PDF, but at present they can't be. I know they have some sort of git support, but it was never clear to me how it was supposed to be used since the docs are sparse on that. So far what works best for us is S3 storage, but you don't get the benefit of Github using that (history + commits etc). There are a couple other notebooks floating around; Apache Toree seems the most promising for portability since it's based on Jupyter https://github.com/apache/incubator-toree On Fri, Jul 22, 2016 at 3:53 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote: > The biggest stumbling block to using Zeppelin has been that we cannot > download the notebooks, cannot export them and certainly cannot sync them > back to Github, without mind numbing and sometimes irritating hacks. Have > those issues been resolved? > > > Regards, > Gourav > > > On Fri, Jul 22, 2016 at 2:22 PM, Pedro Rodriguez <ski.rodrig...@gmail.com> > wrote: > >> Zeppelin works great. The other thing that we have done in notebooks >> (like Zeppelin or Databricks) which support multiple types of spark session >> is register Spark SQL temp tables in our scala code then escape hatch to >> python for plotting with seaborn/matplotlib when the built in plots are >> insufficient. 
>> >> — >> Pedro Rodriguez >> PhD Student in Large-Scale Machine Learning | CU Boulder >> Systems Oriented Data Scientist >> UC Berkeley AMPLab Alumni >> >> pedrorodriguez.io | 909-353-4423 >> github.com/EntilZha | LinkedIn >> <https://www.linkedin.com/in/pedrorodriguezscience> >> >> On July 22, 2016 at 3:04:48 AM, Marco Colombo ( >> ing.marco.colo...@gmail.com) wrote: >> >> Take a look at zeppelin >> >> http://zeppelin.apache.org >> >> Il giovedì 21 luglio 2016, Andy Davidson <a...@santacruzintegration.com> >> ha scritto: >> >>> Hi Pseudo >>> >>> Plotting, graphing, data visualization, report generation are common >>> needs in scientific and enterprise computing. >>> >>> Can you tell me more about your use case? What is it about the current >>> process / workflow do you think could be improved by pushing plotting (I >>> assume you mean plotting and graphing) into spark. >>> >>> >>> In my personal work all the graphing is done in the driver on summary >>> stats calculated using spark. So for me using standard python libs has not >>> been a problem. >>> >>> Andy >>> >>> From: pseudo oduesp <pseudo20...@gmail.com> >>> Date: Thursday, July 21, 2016 at 8:30 AM >>> To: "user @spark" <user@spark.apache.org> >>> Subject: spark and plot data >>> >>> Hi , >>> i know spark it s engine to compute large data set but for me i work >>> with pyspark and it s very wonderful machine >>> >>> my question we don't have tools for ploting data each time we have to >>> switch and go back to python for using plot. >>> but when you have large result scatter plot or roc curve you cant use >>> collect to take data . >>> >>> somone have propostion for plot . >>> >>> thanks >>> >>> >> >> -- >> Ing. Marco Colombo >> >> > -- Pedro Rodriguez PhD Student in Distributed Machine Learning | CU Boulder UC Berkeley AMPLab Alumni ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423 Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
Re: How to search on a Dataset / RDD <Row, Long >
You might look at monotonically_increasing_id() here http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions instead of converting it to an RDD, since you pay a performance penalty for that. If you want to change the name you can do something like this (in scala since I am not familiar with java API, but it should be similar in java) val df = sqlContext.sql("select bid, name from business").withColumn("id", monotonically_increasing_id()) // some steps later on df.withColumn("name", $"id") I am not 100% sure what you mean by updating the data structure, I am guessing you mean replace the name column with the id column? Note: on the second line the withColumn call uses $"id" which in scala converts to a Column. In java maybe it's something like new Column("id"), not sure. Pedro On Fri, Jul 22, 2016 at 12:21 PM, VG <vlin...@gmail.com> wrote: > Any suggestions here please > > I basically need an ability to look up *name -> index* and *index -> name* > in the code > > -VG > > On Fri, Jul 22, 2016 at 6:40 PM, VG <vlin...@gmail.com> wrote: > >> Hi All, >> >> I am really confused how to proceed further. Please help. >> >> I have a dataset created as follows: >> Dataset b = sqlContext.sql("SELECT bid, name FROM business"); >> >> Now I need to map each name with a unique index and I did the following >> JavaPairRDD<Row, Long> indexedBId = business.javaRDD() >> >> .zipWithIndex(); >> >> In later part of the code I need to change a datastructure and update >> name with index value generated above . >> I am unable to figure out how to do a look up here.. >> >> Please suggest /. >> >> If there is a better way to do this please suggest that. >> >> Regards >> VG >> >> > -- Pedro Rodriguez PhD Student in Distributed Machine Learning | CU Boulder UC Berkeley AMPLab Alumni ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423 Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
Re: How to get the number of partitions for a SparkDataFrame in Spark 2.0-preview?
This should work and I don't think triggers any actions: df.rdd.partitions.length On Fri, Jul 22, 2016 at 2:20 PM, Neil Chang <iam...@gmail.com> wrote: > Seems no function does this in Spark 2.0 preview? > -- Pedro Rodriguez PhD Student in Distributed Machine Learning | CU Boulder UC Berkeley AMPLab Alumni ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423 Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
Re: spark and plot data
Zeppelin works great. The other thing that we have done in notebooks (like Zeppelin or Databricks) which support multiple types of spark session is register Spark SQL temp tables in our scala code then escape hatch to python for plotting with seaborn/matplotlib when the built in plots are insufficient. — Pedro Rodriguez PhD Student in Large-Scale Machine Learning | CU Boulder Systems Oriented Data Scientist UC Berkeley AMPLab Alumni pedrorodriguez.io | 909-353-4423 github.com/EntilZha | LinkedIn On July 22, 2016 at 3:04:48 AM, Marco Colombo (ing.marco.colo...@gmail.com) wrote: Take a look at zeppelin http://zeppelin.apache.org Il giovedì 21 luglio 2016, Andy Davidson <a...@santacruzintegration.com> ha scritto: Hi Pseudo Plotting, graphing, data visualization, report generation are common needs in scientific and enterprise computing. Can you tell me more about your use case? What is it about the current process / workflow do you think could be improved by pushing plotting (I assume you mean plotting and graphing) into spark. In my personal work all the graphing is done in the driver on summary stats calculated using spark. So for me using standard python libs has not been a problem. Andy From: pseudo oduesp <pseudo20...@gmail.com> Date: Thursday, July 21, 2016 at 8:30 AM To: "user @spark" <user@spark.apache.org> Subject: spark and plot data Hi , i know spark it s engine to compute large data set but for me i work with pyspark and it s very wonderful machine my question we don't have tools for ploting data each time we have to switch and go back to python for using plot. but when you have large result scatter plot or roc curve you cant use collect to take data . somone have propostion for plot . thanks -- Ing. Marco Colombo
Re: How can we control CPU and Memory per Spark job operation..
Sorry, wasn’t very clear (looks like Pavan’s response was dropped from list for some reason as well). I am assuming that: 1) the first map is CPU bound 2) the second map is heavily memory bound To be specific, let’s say you are using 4 m3.2xlarge instances which have 8 CPUs and 30GB of ram each for a total of 32 cores and 120GB of ram. Since the NLP model can’t be distributed, that means every worker/core must hold its own 4GB copy. If the cluster is fully utilized that means that just for the NLP model you are consuming 32 * 4GB = 128GB of ram. The cluster at this point is out of memory just for the NLP model, not considering the data set itself. My suggestion would be to see if r3.8xlarge instances will work (or even X1s if you have access) since the cpu/memory fraction is better. Here is the “hack” I proposed in more detail (basically n partitions < total cores): 1) have the first map use a regular number of partitions, suppose 32 * 4 = 128 which is a reasonable starting place 2) repartition immediately after that map to 16 partitions. At this point, spark is not guaranteed to distribute your work evenly across the 4 nodes, but it probably will. The net result is that half the CPU cores are idle, but the NLP model is at worst using 16 * 4GB = 64GB of RAM. To be sure, this is a hack, since evenly distributing work across the nodes is not guaranteed. If you wanted to avoid the hack, you could perform the map, checkpoint your work, end the job, then submit a new job where the cpu/memory ratio is more favorable which reads from the prior job’s output. I am guessing this heavily depends on how expensive reloading the data set from disk/network is. Hopefully one of these helps, — Pedro Rodriguez PhD Student in Large-Scale Machine Learning | CU Boulder Systems Oriented Data Scientist UC Berkeley AMPLab Alumni pedrorodriguez.io | 909-353-4423 github.com/EntilZha | LinkedIn On July 17, 2016 at 6:16:41 AM, Jacek Laskowski (ja...@japila.pl) wrote: Hi, How would that help?! 
Why would you do that? Jacek On 17 Jul 2016 7:19 a.m., "Pedro Rodriguez" <ski.rodrig...@gmail.com> wrote: You could call map on an RDD which has “many” partitions, then call repartition/coalesce to drastically reduce the number of partitions so that your second map job has less things running. — Pedro Rodriguez PhD Student in Large-Scale Machine Learning | CU Boulder Systems Oriented Data Scientist UC Berkeley AMPLab Alumni pedrorodriguez.io | 909-353-4423 github.com/EntilZha | LinkedIn On July 16, 2016 at 4:46:04 PM, Jacek Laskowski (ja...@japila.pl) wrote: Hi, My understanding is that these two map functions will end up as a job with one stage (as if you wrote the two maps as a single map) so you really need as much vcores and memory as possible for map1 and map2. I initially thought about dynamic allocation of executors that may or may not help you with the case, but since there's just one stage I don't think you can do much. Pozdrawiam, Jacek Laskowski https://medium.com/@jaceklaskowski/ Mastering Apache Spark http://bit.ly/mastering-apache-spark Follow me at https://twitter.com/jaceklaskowski On Fri, Jul 15, 2016 at 9:54 PM, Pavan Achanta <pacha...@sysomos.com> wrote: > Hi All, > > Here is my use case: > > I have a pipeline job consisting of 2 map functions: > > CPU intensive map operation that does not require a lot of memory. > Memory intensive map operation that requires upto 4 GB of memory. And this > 4GB memory cannot be distributed since it is an NLP model. > > Ideally what I like to do is to use 20 nodes with 4 cores each and minimal > memory for first map operation and then use only 3 nodes with minimal CPU > but each having 4GB of memory for 2nd operation. > > While it is possible to control this parallelism for each map operation in > spark. I am not sure how to control the resources for each operation. > Obviously I don’t want to start off the job with 20 nodes with 4 cores and > 4GB memory since I cannot afford that much memory. 
> > We use Yarn with Spark. Any suggestions ? > > Thanks and regards, > Pavan > > - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
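The partition-count hack described in this thread can be sketched in a few lines of PySpark. Everything named here (`raw_rdd`, `cpu_heavy_parse`, `nlp_model_score`) is a hypothetical stand-in for the pipeline's actual data and functions:

```python
# Sketch: run the CPU-bound map at full parallelism, then shrink the
# number of partitions so the memory-bound map runs on fewer concurrent
# tasks (and therefore fewer live copies of the 4GB NLP model).
processed = (raw_rdd
             .repartition(128)        # CPU-bound stage: keep all cores busy
             .map(cpu_heavy_parse)
             .repartition(16)         # memory-bound stage: at most 16 tasks,
             .map(nlp_model_score))   # so at most 16 * 4GB of model memory
```

As noted above, Spark does not guarantee the 16 partitions spread evenly over the nodes, so this caps concurrency rather than pinning memory per node.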
Re: How can we control CPU and Memory per Spark job operation..
You could call map on an RDD which has “many” partitions, then call repartition/coalesce to drastically reduce the number of partitions so that your second map job has less things running. — Pedro Rodriguez PhD Student in Large-Scale Machine Learning | CU Boulder Systems Oriented Data Scientist UC Berkeley AMPLab Alumni pedrorodriguez.io | 909-353-4423 github.com/EntilZha | LinkedIn On July 16, 2016 at 4:46:04 PM, Jacek Laskowski (ja...@japila.pl) wrote: Hi, My understanding is that these two map functions will end up as a job with one stage (as if you wrote the two maps as a single map) so you really need as much vcores and memory as possible for map1 and map2. I initially thought about dynamic allocation of executors that may or may not help you with the case, but since there's just one stage I don't think you can do much. Pozdrawiam, Jacek Laskowski https://medium.com/@jaceklaskowski/ Mastering Apache Spark http://bit.ly/mastering-apache-spark Follow me at https://twitter.com/jaceklaskowski On Fri, Jul 15, 2016 at 9:54 PM, Pavan Achanta <pacha...@sysomos.com> wrote: > Hi All, > > Here is my use case: > > I have a pipeline job consisting of 2 map functions: > > CPU intensive map operation that does not require a lot of memory. > Memory intensive map operation that requires upto 4 GB of memory. And this > 4GB memory cannot be distributed since it is an NLP model. > > Ideally what I like to do is to use 20 nodes with 4 cores each and minimal > memory for first map operation and then use only 3 nodes with minimal CPU > but each having 4GB of memory for 2nd operation. > > While it is possible to control this parallelism for each map operation in > spark. I am not sure how to control the resources for each operation. > Obviously I don’t want to start off the job with 20 nodes with 4 cores and > 4GB memory since I cannot afford that much memory. > > We use Yarn with Spark. Any suggestions ? 
> > Thanks and regards, > Pavan > > - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Saving data frames on Spark Master/Driver
Out of curiosity, is there a way to pull all the data back to the driver to save without collect()? That is, stream the data in chunks back to the driver so that the maximum memory used is comparable to a single node’s data, but all the data is saved on one node. — Pedro Rodriguez PhD Student in Large-Scale Machine Learning | CU Boulder Systems Oriented Data Scientist UC Berkeley AMPLab Alumni pedrorodriguez.io | 909-353-4423 github.com/EntilZha | LinkedIn On July 14, 2016 at 6:02:12 PM, Jacek Laskowski (ja...@japila.pl) wrote: Hi, Please re-consider your wish since it is going to move all the distributed dataset to the single machine of the driver and may lead to OOME. It's more pro to save your result to HDFS or S3 or any other distributed filesystem (that is accessible by the driver and executors). If you insist... Use collect() after select() and work with Array[T]. Pozdrawiam, Jacek Laskowski https://medium.com/@jaceklaskowski/ Mastering Apache Spark http://bit.ly/mastering-apache-spark Follow me at https://twitter.com/jaceklaskowski On Fri, Jul 15, 2016 at 12:15 AM, vr.n. nachiappan <nachiappan_...@yahoo.com.invalid> wrote: > Hello, > > I am using data frames to join two cassandra tables. > > Currently when i invoke save on data frames as shown below it is saving the > join results on executor nodes. > > joineddataframe.select(, > ...).format("com.databricks.spark.csv").option("header", > "true").save() > > I would like to persist the results of the join on Spark Master/Driver node. > Is it possible to save the results on Spark Master/Driver and how to do it. > > I appreciate your help. > > Nachi > - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
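One API that matches the "stream in chunks" question is RDD.toLocalIterator, which pulls partitions to the driver one at a time rather than all at once. A hedged PySpark sketch, reusing the `joineddataframe` name from the quoted question (the output path is illustrative):

```python
# Sketch: stream results to the driver partition-by-partition and append
# them to one local file. Peak driver memory is roughly one partition's
# worth of rows, not the whole data set as with collect().
with open("/tmp/joined.csv", "w") as out:
    for row in joineddataframe.rdd.toLocalIterator():
        out.write(",".join(str(v) for v in row) + "\n")
```

The trade-off is latency: partitions are fetched sequentially, so the write is not parallel the way a distributed save to HDFS/S3 would be.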
Re: Call http request from within Spark
Hi Amit, Have you tried running a subset of the IDs locally on a single thread? It would be useful to benchmark your getProfile function for a subset of the data then estimate how long the full data set would take then divide by number of spark executor cores. This should at least serve as a sanity check. If things are much slower than expected is it possible that the service has a rate limit per ip address that you are hitting? If requests is more efficient at batching requests together (I don't know much about its internal implementation and connection pools) you could do that with mapPartitions. This is useful when the initialization time of the function in the map call is expensive (eg uses a connection pool for a db or web) as it allows you to initialize that resource once per partition then reuse it for all the elements in the partition. Pedro On Thu, Jul 14, 2016 at 8:52 AM, Amit Dutta <amitkrdu...@outlook.com> wrote: > Hi All, > > > I have a requirement to call a rest service url for 300k customer ids. > > Things I have tried so far is > > > custid_rdd = sc.textFile('file:Users/zzz/CustomerID_SC/Inactive User > Hashed LCID List.csv') #getting all the customer ids and building adds > > profile_rdd = custid_rdd.map(lambda r: getProfile(r.split(',')[0])) > > profile_rdd.count() > > > #getprofile is the method to do the http call > > def getProfile(cust_id): > > api_key = 'txt' > > api_secret = 'yuyuy' > > profile_uri = 'https://profile.localytics.com/x1/customers/{}' > > customer_id = cust_id > > > if customer_id is not None: > > data = requests.get(profile_uri.format(customer_id), > auth=requests.auth.HTTPBasicAuth(api_key, api_secret)) > > # print json.dumps(data.json(), indent=4) > > return data > > > when I print the json dump of the data i see it returning results from the > rest call. But the count never stops. > > > Is there an efficient way of dealing this? Some post says we have to > define a batch size etc but don't know how. 
> > > Appreciate your help > > > Regards, > > Amit > > -- Pedro Rodriguez PhD Student in Distributed Machine Learning | CU Boulder UC Berkeley AMPLab Alumni ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423 Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
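The mapPartitions pattern described above (initialize one expensive resource per partition, reuse it for every element) can be sketched in plain Python. `make_session` and `fetch_profile` are illustrative stand-ins for the real requests.Session setup and HTTP call:

```python
def make_session():
    # Stand-in for an expensive init, e.g. requests.Session() with auth
    return {"calls": 0}

def fetch_profile(session, cust_id):
    # Stand-in for session.get(profile_uri.format(cust_id))
    session["calls"] += 1
    return (cust_id, "profile")

def get_profiles_partition(cust_ids):
    """mapPartitions-style function: one session per partition, reused."""
    session = make_session()  # runs once per partition, not once per element
    for cust_id in cust_ids:
        yield fetch_profile(session, cust_id)

# With Spark this would be:
#   profile_rdd = custid_rdd.mapPartitions(get_profiles_partition)
# Local simulation over one "partition" of ids:
results = list(get_profiles_partition(["c1", "c2", "c3"]))
```

Because the function receives an iterator over the whole partition, the session is built once and amortized across all its elements.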
Re: Tools for Balancing Partitions by Size
Hi Gourav, In our case, we process raw logs into parquet tables that downstream applications can use for other jobs. The desired outcome is that we only need to worry about unbalanced input data at the preprocess step so that downstream jobs can assume balanced input data. In our specific case, this works because although the raw log rows are of variable size, the rows in the Spark SQL table are of fixed size by parsing primitives or chopping arrays. Due to this, in our use case it makes sense to think in terms of balanced file size because it directly correlates to having a balanced number of rows/partition and thus balanced partitions. Given this setting, are there any specific issues you foresee? I agree that file size isn't a general solution, but in the setting I don't see a reason it should not work. Our overall goal is to avoid two problems when we write data to S3: - Large number of small files (Kbs) since this makes S3 listing take a long time - Small number of large files (GBs) since this makes reads not as efficient Thus far, we have done this on a per-application basis with repartition and a manually tuned number of partitions, but this is inconvenient. We are interested in seeing if there is a way to automatically infer the number of partitions we should use so that our files in S3 have a particular average size (without incurring too high an overhead cost). The solution that seems most promising right now is: - Define a custom write function which does two steps: - Write one partition to S3 and get files size and number of records - Use that to determine the number of partitions to repartition to, then write everything to S3 What seems unclear is how to compute the parent RDD (suppose its an RDD with wide dependencies like a join), get one partition for step (1), then not recompute anything to do step (2) without an explicit cache. 
This would make it so the additional overhead on the job is on writing one partition to S3 which seems like an acceptable level of overhead. Perhaps this could be accomplished by saying: RDD A computes the size of one partition, RDD B holds all partitions except for the one from A, the parents of A and B are the original parent RDD, RDD C has parents A and B and has the overall write balanced function. Thanks, Pedro On Wed, Jul 13, 2016 at 9:10 AM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote: > Hi, > > Using file size is a very bad way of managing data provided you think that > volume, variety and veracity does not holds true. Actually its a very bad > way of thinking and designing data solutions, you are bound to hit bottle > necks, optimization issues, and manual interventions. > > I have found thinking about data in logical partitions helps overcome most > of the design problems that is mentioned above. > > You can either use reparition with shuffling or colasce with shuffle > turned off to manage loads. > > If you are using HIVE just let me know. > > > Regards, > Gourav Sengupta > > On Wed, Jul 13, 2016 at 5:39 AM, Pedro Rodriguez <ski.rodrig...@gmail.com> > wrote: > >> The primary goal for balancing partitions would be for the write to S3. >> We would like to prevent unbalanced partitions (can do with repartition), >> but also avoid partitions that are too small or too large. >> >> So for that case, getting the cache size would work Maropu if its roughly >> accurate, but for data ingest we aren’t caching, just writing straight >> through to S3. >> >> The idea for writing to disk and checking for the size is interesting >> Hatim. For certain jobs, it seems very doable to write a small percentage >> of the data to S3, check the file size through the AWS API, and use that to >> estimate the total size. Thanks for the idea. 
>> >> — >> Pedro Rodriguez >> PhD Student in Large-Scale Machine Learning | CU Boulder >> Systems Oriented Data Scientist >> UC Berkeley AMPLab Alumni >> >> pedrorodriguez.io | 909-353-4423 >> github.com/EntilZha | LinkedIn >> <https://www.linkedin.com/in/pedrorodriguezscience> >> >> On July 12, 2016 at 7:26:17 PM, Hatim Diab (timd...@gmail.com) wrote: >> >> Hi, >> >> Since the final size depends on data types and compression. I've had to >> first get a rough estimate of data, written to disk, then compute the >> number of partitions. >> >> partitions = int(ceil(size_data * conversion_ratio / block_size)) >> >> In my case block size 256mb, source txt & dest is snappy parquet, >> compression_ratio .6 >> >> df.repartition(partitions).write.parquet(output) >> >> Which yields files in the range of 230mb. >> >> Another way was to count and come up with an empirical formula. >> >
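The two-step write described above (write one partition, measure it, then pick the repartition count) reduces to a small estimator. A plain-Python sketch with made-up numbers; in practice `sample_bytes` would come from writing one partition to S3 and reading its size back via the AWS API:

```python
from math import ceil

def estimate_partitions(sample_bytes, sample_records, total_records,
                        target_bytes):
    """Estimate a repartition count so output files land near target_bytes."""
    bytes_per_record = sample_bytes / sample_records
    return max(1, int(ceil(total_records * bytes_per_record / target_bytes)))

# Illustrative: one written partition was 60 MB for 1M records, the full
# data set has 40M records, and we want ~256 MB output files.
mb = 1024 * 1024
n = estimate_partitions(60 * mb, 1_000_000, 40_000_000, 256 * mb)
# 40M records * ~63 bytes/record = 2400 MB / 256 MB -> ceil(9.375) = 10
```

The open question from the email, caching the parent RDD so the sample write does not force a recompute, is unaffected by this arithmetic.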
Re: Tools for Balancing Partitions by Size
The primary goal for balancing partitions would be for the write to S3. We would like to prevent unbalanced partitions (can do with repartition), but also avoid partitions that are too small or too large. So for that case, getting the cache size would work Maropu if it's roughly accurate, but for data ingest we aren’t caching, just writing straight through to S3. The idea for writing to disk and checking for the size is interesting Hatim. For certain jobs, it seems very doable to write a small percentage of the data to S3, check the file size through the AWS API, and use that to estimate the total size. Thanks for the idea. — Pedro Rodriguez PhD Student in Large-Scale Machine Learning | CU Boulder Systems Oriented Data Scientist UC Berkeley AMPLab Alumni pedrorodriguez.io | 909-353-4423 github.com/EntilZha | LinkedIn On July 12, 2016 at 7:26:17 PM, Hatim Diab (timd...@gmail.com) wrote: Hi, Since the final size depends on data types and compression. I've had to first get a rough estimate of data, written to disk, then compute the number of partitions. partitions = int(ceil(size_data * conversion_ratio / block_size)) In my case block size 256mb, source txt & dest is snappy parquet, compression_ratio .6 df.repartition(partitions).write.parquet(output) Which yields files in the range of 230mb. Another way was to count and come up with an empirical formula. Cheers, Hatim On Jul 12, 2016, at 9:07 PM, Takeshi Yamamuro <linguin@gmail.com> wrote: Hi, There is no simple way to access the size in a driver side. Since the partitions of primitive typed data (e.g., int) are compressed by `DataFrame#cache`, the actual size is possibly a little bit different from processing partitions size. // maropu On Wed, Jul 13, 2016 at 4:53 AM, Pedro Rodriguez <ski.rodrig...@gmail.com> wrote: Hi, Are there any tools for partitioning RDD/DataFrames by size at runtime? 
The idea would be to specify that I would like for each partition to be roughly X number of megabytes then write that through to S3. I haven't found anything off the shelf, and looking through stack overflow posts doesn't seem to yield anything concrete. Is there a way to programmatically get the size or a size estimate for an RDD/DataFrame at runtime (eg size of one record would be sufficient)? I gave SizeEstimator a try, but it seems like the results varied quite a bit (tried on whole RDD and a sample). It would also be useful to get programmatic access to the size of the RDD in memory if it is cached. Thanks, -- Pedro Rodriguez PhD Student in Distributed Machine Learning | CU Boulder UC Berkeley AMPLab Alumni ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423 Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience -- --- Takeshi Yamamuro
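Hatim's formula from this thread, written out as a small helper with his numbers (256 MB target blocks, 0.6 compression ratio) and a made-up 4 GB text source:

```python
from math import ceil

def num_partitions(size_bytes, conversion_ratio, block_bytes):
    """partitions = ceil(size_data * conversion_ratio / block_size)"""
    return int(ceil(size_bytes * conversion_ratio / block_bytes))

mb = 1024 * 1024
# 4096 MB of raw text * 0.6 compression = ~2458 MB of parquet,
# divided into ~256 MB files -> 10 partitions
parts = num_partitions(4096 * mb, 0.6, 256 * mb)
# then: df.repartition(parts).write.parquet(output)
```

As the thread notes, the conversion ratio is empirical (it depends on data types and the codec), so the resulting files land near, not exactly at, the block size.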
Tools for Balancing Partitions by Size
Hi, Are there any tools for partitioning RDD/DataFrames by size at runtime? The idea would be to specify that I would like for each partition to be roughly X number of megabytes then write that through to S3. I haven't found anything off the shelf, and looking through stack overflow posts doesn't seem to yield anything concrete. Is there a way to programmatically get the size or a size estimate for an RDD/DataFrame at runtime (eg size of one record would be sufficient)? I gave SizeEstimator a try, but it seems like the results varied quite a bit (tried on whole RDD and a sample). It would also be useful to get programmatic access to the size of the RDD in memory if it is cached. Thanks, -- Pedro Rodriguez PhD Student in Distributed Machine Learning | CU Boulder UC Berkeley AMPLab Alumni ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423 Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
Spark SQL: Merge Arrays/Sets
Is it possible with Spark SQL to merge columns whose types are Arrays or Sets? My use case would be something like this: DF types id: String words: Array[String] I would want to do something like df.groupBy('id).agg(merge_arrays('words)) -> list of all words df.groupBy('id).agg(merge_sets('words)) -> list of distinct words Thanks, -- Pedro Rodriguez PhD Student in Distributed Machine Learning | CU Boulder UC Berkeley AMPLab Alumni ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423 Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
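One way to express both merges with Spark 2.0-era built-ins (a hedged sketch, not necessarily the only approach) is to explode the arrays and re-collect them with collect_list / collect_set:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.createDataFrame(
    [("a", ["x", "y"]), ("a", ["y", "z"])], ["id", "words"])

# Flatten each array to one word per row, then regroup
exploded = df.select("id", F.explode("words").alias("word"))

# All words (duplicates kept) and distinct words per id
all_words = exploded.groupBy("id").agg(F.collect_list("word").alias("words"))
uniq_words = exploded.groupBy("id").agg(F.collect_set("word").alias("words"))
```

Note that collect_set does not guarantee any particular element order, and the explode/collect round trip shuffles the data, so it is not free on large arrays.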
Re: question about UDAF
I am not sure I understand your code entirely, but I worked with UDAFs Friday and over the weekend ( https://gist.github.com/EntilZha/3951769a011389fef25e930258c20a2a). I think what is going on is that your "update" function is not defined correctly. Update should take a possibly initialized or in-progress buffer and integrate new results into it. Right now, you ignore the input row: update just reassigns the buffer its own current value, so it never moves past the initial "". Merge is responsible for taking two buffers and merging them together. In this case, the buffers are "" since initialize makes it "" and update keeps it "", so the result is just "". I am not sure it matters, but you probably also want to do buffer.getString(0). Pedro On Mon, Jul 11, 2016 at 3:04 AM, <luohui20...@sina.com> wrote: > hello guys: > I have a DF and a UDAF. this DF has 2 columns, lp_location_id , id, > both are of Int type. I want to group by id and aggregate all value of id > into 1 string. So I used a UDAF to do this transformation: multi Int values > to 1 String. However my UDAF returns empty values as the accessory attached. 
> Here is my code for my main class: > val hc = new org.apache.spark.sql.hive.HiveContext(sc) > val hiveTable = hc.sql("select lp_location_id,id from > house_id_pv_location_top50") > > val jsonArray = new JsonArray > val result = > hiveTable.groupBy("lp_location_id").agg(jsonArray(col("id")).as("jsonArray")).collect.foreach(println) > > -- > Here is my code of my UDAF: > > class JsonArray extends UserDefinedAggregateFunction { > def inputSchema: org.apache.spark.sql.types.StructType = > StructType(StructField("id", IntegerType) :: Nil) > > def bufferSchema: StructType = StructType( > StructField("id", StringType) :: Nil) > > def dataType: DataType = StringType > > def deterministic: Boolean = true > > def initialize(buffer: MutableAggregationBuffer): Unit = { > buffer(0) = "" > } > > def update(buffer: MutableAggregationBuffer, input: Row): Unit = { > buffer(0) = buffer.getAs[Int](0) > } > > def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { > val s1 = buffer1.getAs[Int](0).toString() > val s2 = buffer2.getAs[Int](0).toString() > buffer1(0) = s1.concat(s2) > } > > def evaluate(buffer: Row): Any = { > buffer(0) > } > } > > > I don't quit understand why I get empty result from my UDAF, I guess there > may be 2 reason: > 1. error initialization with "" in code of define initialize method > 2. the buffer didn't write successfully. > > can anyone share a idea about this. thank you. > > > > > > > ThanksBest regards! > San.Luo > > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > -- Pedro Rodriguez PhD Student in Distributed Machine Learning | CU Boulder UC Berkeley AMPLab Alumni ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423 Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
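The UDAF contract the reply describes (initialize a buffer, fold each input row in via update, combine partial buffers via merge) can be modeled in plain Python. This toy class mirrors the concat-to-string logic the quoted Scala code was presumably aiming for, with update actually using the input row:

```python
class ConcatIds:
    """Toy model of a UDAF buffer that concatenates int ids into a string."""
    def initialize(self):
        return ""
    def update(self, buffer, input_value):
        # The quoted code's bug: it ignored the input row. Fold it in instead.
        return buffer + str(input_value)
    def merge(self, buffer1, buffer2):
        return buffer1 + buffer2
    def evaluate(self, buffer):
        return buffer

agg = ConcatIds()
# Simulate two partitions of ids, as Spark would run them
b1 = agg.initialize()
for i in [1, 2]:
    b1 = agg.update(b1, i)
b2 = agg.initialize()
b2 = agg.update(b2, 3)
result = agg.evaluate(agg.merge(b1, b2))  # "12" + "3"
```

In the real Scala UserDefinedAggregateFunction the buffer is a MutableAggregationBuffer rather than a return value, but the data flow is the same: empty results come from an update that never incorporates its input.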
Re: DataFrame Min By Column
Thanks Michael, That seems like the analog to sorting tuples. I am curious, is there a significant performance penalty to the UDAF versus that? It's certainly nicer and more compact code at least. — Pedro Rodriguez PhD Student in Large-Scale Machine Learning | CU Boulder Systems Oriented Data Scientist UC Berkeley AMPLab Alumni pedrorodriguez.io | 909-353-4423 github.com/EntilZha | LinkedIn On July 9, 2016 at 2:19:11 PM, Michael Armbrust (mich...@databricks.com) wrote: You can do what's called an argmax/argmin, where you take the min/max of a couple of columns that have been grouped together as a struct. We sort in column order, so you can put the timestamp first. Here is an example. On Sat, Jul 9, 2016 at 6:10 AM, Pedro Rodriguez <ski.rodrig...@gmail.com> wrote: I implemented a more generic version which I posted here: https://gist.github.com/EntilZha/3951769a011389fef25e930258c20a2a I think I could generalize this by pattern matching on DataType to use different getLong/getDouble/etc functions (not trying to use getAs[] because getting T from Array[T] is hard it seems). Is there a way to go further and make the arguments unnecessary or inferable at runtime, particularly for the valueType since it doesn’t matter what it is? DataType is abstract so I can’t instantiate it, is there a way to define the method so that it pulls from the user input at runtime? Thanks, — Pedro Rodriguez PhD Student in Large-Scale Machine Learning | CU Boulder Systems Oriented Data Scientist UC Berkeley AMPLab Alumni pedrorodriguez.io | 909-353-4423 github.com/EntilZha | LinkedIn On July 9, 2016 at 1:33:18 AM, Pedro Rodriguez (ski.rodrig...@gmail.com) wrote: Hi Xinh, A co-worker also found that solution but I thought it was possibly overkill/brittle so looked into UDAFs (user defined aggregate functions). I don’t have code, but Databricks has a post that has an example https://databricks.com/blog/2015/09/16/apache-spark-1-5-dataframe-api-highlights.html. 
From that, I was able to write a MinLongByTimestamp function, but was having a hard time writing a generic aggregate to any column by an order able column. Anyone know how you might go about using generics in a UDAF, or something that would mimic union types to express that order able spark sql types are allowed? — Pedro Rodriguez PhD Student in Large-Scale Machine Learning | CU Boulder Systems Oriented Data Scientist UC Berkeley AMPLab Alumni pedrorodriguez.io | 909-353-4423 github.com/EntilZha | LinkedIn On July 8, 2016 at 6:06:32 PM, Xinh Huynh (xinh.hu...@gmail.com) wrote: Hi Pedro, I could not think of a way using an aggregate. It's possible with a window function, partitioned on user and ordered by time: // Assuming "df" holds your dataframe ... import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions.Window val wSpec = Window.partitionBy("user").orderBy("time") df.select($"user", $"time", rank().over(wSpec).as("rank")) .where($"rank" === 1) Xinh On Fri, Jul 8, 2016 at 12:57 PM, Pedro Rodriguez <ski.rodrig...@gmail.com> wrote: Is there a way to on a GroupedData (from groupBy in DataFrame) to have an aggregate that returns column A based on a min of column B? For example, I have a list of sites visited by a given user and I would like to find the event with the minimum time (first event) Thanks, -- Pedro Rodriguez PhD Student in Distributed Machine Learning | CU Boulder UC Berkeley AMPLab Alumni ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423 Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
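Michael's struct trick works because Spark sorts structs field by field in column order, the same way Python compares tuples, so a per-group min over struct(time, site) yields the earliest event. A plain-Python sketch of the idea (the sample events are invented):

```python
# Plain-Python model of the "argmin via struct" trick: tuples compare
# field by field, so min over (time, site) per user picks the first event.

events = [
    ("alice", 3, "checkout"),
    ("alice", 1, "landing"),
    ("bob",   7, "search"),
    ("bob",   2, "landing"),
]

first_event = {}
for user, time, site in events:
    candidate = (time, site)  # analogous to struct(col("time"), col("site"))
    if user not in first_event or candidate < first_event[user]:
        first_event[user] = candidate

print(first_event)  # {'alice': (1, 'landing'), 'bob': (2, 'landing')}
```

In DataFrame terms this would look roughly like `df.groupBy("user").agg(min(struct($"time", $"site")))`, with the timestamp placed first as Michael notes.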
Re: problem making Zeppelin 0.6 work with Spark 1.6.1, throwing jackson.databind.JsonMappingException exception
It would be helpful if you included the relevant configuration files from each, or noted that you are using the defaults, particularly any changes to class paths. I upgraded Zeppelin to 0.6.0 at work and at home without any issues, so it is hard to say more without more details. — Pedro Rodriguez PhD Student in Large-Scale Machine Learning | CU Boulder Systems Oriented Data Scientist UC Berkeley AMPLab Alumni pedrorodriguez.io | 909-353-4423 github.com/EntilZha | LinkedIn On July 9, 2016 at 8:25:30 AM, Mich Talebzadeh (mich.talebza...@gmail.com) wrote: Hi, I just installed the latest Zeppelin 0.6 as follows: Source: zeppelin-0.6.0-bin-all With Spark 1.6.1 Now I am getting this issue with jackson. I did some searching that suggested this is caused by the classpath providing you with a different version of Jackson than the one Spark is expecting. However, no luck yet. With Spark 1.5.2 and the previous version of Zeppelin, namely 0.5.6-incubating, it used to work without problem. Any ideas would be appreciated com.fasterxml.jackson.databind.JsonMappingException: Could not find creator property with name 'id' (in class org.apache.spark.rdd.RDDOperationScope) at [Source: {"id":"14","name":"ExecutedCommand"}; line: 1, column: 1] at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148) at com.fasterxml.jackson.databind.DeserializationContext.mappingException(DeserializationContext.java:843) at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:533) at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:220) at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:143) at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer2(DeserializerCache.java:409) at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer(DeserializerCache.java:358) at 
com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:265) at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:245) at com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:143) at com.fasterxml.jackson.databind.DeserializationContext.findRootValueDeserializer(DeserializationContext.java:439) at com.fasterxml.jackson.databind.ObjectMapper._findRootDeserializer(ObjectMapper.java:3666) at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3558) at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2578) at org.apache.spark.rdd.RDDOperationScope$.fromJson(RDDOperationScope.scala:85) at org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:136) at org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:136) at scala.Option.map(Option.scala:145) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:136) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) at org.apache.spark.SparkContext.withScope(SparkContext.scala:714) at org.apache.spark.SparkContext.parallelize(SparkContext.scala:728) at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55) at org.apache.spark.sql.DataFrame.(DataFrame.scala:145) at org.apache.spark.sql.DataFrame.(DataFrame.scala:130) at 
org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52) at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:34) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:39) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:41) at $iwC$$iwC$$iwC$$iwC$$iwC.(:43) at $iwC$$iwC$$iwC$$iwC.(:45) at $iwC$$iwC$$iwC.(:47) at $iwC$$iwC.(:49) at $iwC.(:51) at (:53) at .(:57) Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
Re: DataFrame Min By Column
I implemented a more generic version which I posted here: https://gist.github.com/EntilZha/3951769a011389fef25e930258c20a2a I think I could generalize this by pattern matching on DataType to use different getLong/getDouble/etc functions ( not trying to use getAs[] because getting T from Array[T] is hard it seems). Is there a way to go further and make the arguments unnecessary or inferable at runtime, particularly for the valueType since it doesn’t matter what it is? DataType is abstract so I can’t instantiate it, is there a way to define the method so that it pulls from the user input at runtime? Thanks, — Pedro Rodriguez PhD Student in Large-Scale Machine Learning | CU Boulder Systems Oriented Data Scientist UC Berkeley AMPLab Alumni pedrorodriguez.io | 909-353-4423 github.com/EntilZha | LinkedIn On July 9, 2016 at 1:33:18 AM, Pedro Rodriguez (ski.rodrig...@gmail.com) wrote: Hi Xinh, A co-worker also found that solution but I thought it was possibly overkill/brittle so looks into UDAFs (user defined aggregate functions). I don’t have code, but Databricks has a post that has an example https://databricks.com/blog/2015/09/16/apache-spark-1-5-dataframe-api-highlights.html. From that, I was able to write a MinLongByTimestamp function, but was having a hard time writing a generic aggregate to any column by an order able column. Anyone know how you might go about using generics in a UDAF, or something that would mimic union types to express that order able spark sql types are allowed? — Pedro Rodriguez PhD Student in Large-Scale Machine Learning | CU Boulder Systems Oriented Data Scientist UC Berkeley AMPLab Alumni pedrorodriguez.io | 909-353-4423 github.com/EntilZha | LinkedIn On July 8, 2016 at 6:06:32 PM, Xinh Huynh (xinh.hu...@gmail.com) wrote: Hi Pedro, I could not think of a way using an aggregate. It's possible with a window function, partitioned on user and ordered by time: // Assuming "df" holds your dataframe ... 
import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions.Window val wSpec = Window.partitionBy("user").orderBy("time") df.select($"user", $"time", rank().over(wSpec).as("rank")) .where($"rank" === 1) Xinh On Fri, Jul 8, 2016 at 12:57 PM, Pedro Rodriguez <ski.rodrig...@gmail.com> wrote: Is there a way to on a GroupedData (from groupBy in DataFrame) to have an aggregate that returns column A based on a min of column B? For example, I have a list of sites visited by a given user and I would like to find the event with the minimum time (first event) Thanks, -- Pedro Rodriguez PhD Student in Distributed Machine Learning | CU Boulder UC Berkeley AMPLab Alumni ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423 Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
Re: DataFrame Min By Column
Hi Xinh, A co-worker also found that solution but I thought it was possibly overkill/brittle so looked into UDAFs (user defined aggregate functions). I don’t have code, but Databricks has a post that has an example https://databricks.com/blog/2015/09/16/apache-spark-1-5-dataframe-api-highlights.html. From that, I was able to write a MinLongByTimestamp function, but was having a hard time writing a generic aggregate of any column by an orderable column. Anyone know how you might go about using generics in a UDAF, or something that would mimic union types to express that orderable Spark SQL types are allowed? — Pedro Rodriguez PhD Student in Large-Scale Machine Learning | CU Boulder Systems Oriented Data Scientist UC Berkeley AMPLab Alumni pedrorodriguez.io | 909-353-4423 github.com/EntilZha | LinkedIn On July 8, 2016 at 6:06:32 PM, Xinh Huynh (xinh.hu...@gmail.com) wrote: Hi Pedro, I could not think of a way using an aggregate. It's possible with a window function, partitioned on user and ordered by time: // Assuming "df" holds your dataframe ... import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions.Window val wSpec = Window.partitionBy("user").orderBy("time") df.select($"user", $"time", rank().over(wSpec).as("rank")) .where($"rank" === 1) Xinh On Fri, Jul 8, 2016 at 12:57 PM, Pedro Rodriguez <ski.rodrig...@gmail.com> wrote: Is there a way to on a GroupedData (from groupBy in DataFrame) to have an aggregate that returns column A based on a min of column B? For example, I have a list of sites visited by a given user and I would like to find the event with the minimum time (first event) Thanks, -- Pedro Rodriguez PhD Student in Distributed Machine Learning | CU Boulder UC Berkeley AMPLab Alumni ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423 Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
DataFrame Min By Column
Is there a way, on a GroupedData (from groupBy in a DataFrame), to have an aggregate that returns column A based on the min of column B? For example, I have a list of sites visited by a given user and I would like to find the event with the minimum time (the first event). Thanks, -- Pedro Rodriguez PhD Student in Distributed Machine Learning | CU Boulder UC Berkeley AMPLab Alumni ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423 Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
Re: Custom RDD: Report Size of Partition in Bytes to Spark
Just realized I had been replying back to only Takeshi. Thanks for the tip, as it got me on the right track. Running into an issue with private[spark] methods though. It looks like the input metrics start out as None and are not initialized (verified by throwing new Exception on pattern match cases when it is None and when it's not). Looks like NewHadoopRDD calls getInputMetricsForReadMethod, which sets _inputMetrics if it is None, but unfortunately it is private[spark]. Is there a way for external RDDs to access this method or somehow initialize _inputMetrics in 1.6.X (looks like 2.0 makes more of this API public)? Using reflection I was able to implement it mimicking the NewHadoopRDD code, but if possible would like to avoid using reflection. Below is the source code for the method that works. RDD code: https://github.com/EntilZha/spark-s3/blob/9e632f2a71fba2858df748ed43f0dbb5dae52a83/src/main/scala/io/entilzha/spark/s3/S3RDD.scala#L100-L105 Reflection code: https://github.com/EntilZha/spark-s3/blob/9e632f2a71fba2858df748ed43f0dbb5dae52a83/src/main/scala/io/entilzha/spark/s3/PrivateMethodUtil.scala Thanks, — Pedro Rodriguez PhD Student in Large-Scale Machine Learning | CU Boulder Systems Oriented Data Scientist UC Berkeley AMPLab Alumni pedrorodriguez.io | 909-353-4423 github.com/EntilZha | LinkedIn On July 3, 2016 at 10:31:30 PM, Takeshi Yamamuro (linguin@gmail.com) wrote: How about using `SparkListener`? You can collect IO statistics thru TaskMetrics#inputMetrics by yourself. // maropu On Mon, Jul 4, 2016 at 11:46 AM, Pedro Rodriguez <ski.rodrig...@gmail.com> wrote: Hi All, I noticed on some Spark jobs it shows you input/output read size. I am implementing a custom RDD which reads files and would like to report these metrics to Spark since they are available to me. I looked through the RDD source code and a couple different implementations and the best I could find were some Hadoop metrics. 
Is there a way to simply report the number of bytes a partition read so Spark can put it on the UI? Thanks, — Pedro Rodriguez PhD Student in Large-Scale Machine Learning | CU Boulder Systems Oriented Data Scientist UC Berkeley AMPLab Alumni pedrorodriguez.io | 909-353-4423 github.com/EntilZha | LinkedIn -- --- Takeshi Yamamuro
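For what Takeshi's SparkListener suggestion amounts to in practice, here is a minimal, Spark-free sketch of collecting per-task input sizes yourself. The class and method names below are hypothetical, not part of any Spark API:

```python
# Sketch of listener-style metrics collection: accumulate bytes read per
# partition as tasks finish, and expose a total for display.

class InputMetricsCollector:
    """Accumulates bytes read, per partition and in total."""

    def __init__(self):
        self.bytes_by_partition = {}

    def on_task_end(self, partition_id, bytes_read):
        # Called once per finished task with that task's input size.
        self.bytes_by_partition[partition_id] = (
            self.bytes_by_partition.get(partition_id, 0) + bytes_read)

    @property
    def total_bytes(self):
        return sum(self.bytes_by_partition.values())

collector = InputMetricsCollector()
collector.on_task_end(0, 1024)
collector.on_task_end(1, 2048)
collector.on_task_end(0, 512)   # a retried/second task on partition 0
print(collector.total_bytes)    # -> 3584
```

In real Spark the analogous numbers arrive via TaskMetrics#inputMetrics in a SparkListener's task-end callback, as Takeshi suggests.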
Custom RDD: Report Size of Partition in Bytes to Spark
Hi All, I noticed on some Spark jobs it shows you input/output read size. I am implementing a custom RDD which reads files and would like to report these metrics to Spark since they are available to me. I looked through the RDD source code and a couple different implementations and the best I could find were some Hadoop metrics. Is there a way to simply report the number of bytes a partition read so Spark can put it on the UI? Thanks, — Pedro Rodriguez PhD Student in Large-Scale Machine Learning | CU Boulder Systems Oriented Data Scientist UC Berkeley AMPLab Alumni pedrorodriguez.io | 909-353-4423 github.com/EntilZha | LinkedIn
Re: Call Scala API from PySpark
That was indeed the case, using UTF8Deserializer makes everything work correctly. Thanks for the tips! On Thu, Jun 30, 2016 at 3:32 PM, Pedro Rodriguez <ski.rodrig...@gmail.com> wrote: > Quick update, I was able to get most of the plumbing to work thanks to the > code Holden posted and browsing more source code. > > I am running into this error which makes me think that maybe I shouldn't > be leaving the default python RDD serializer/pickler in place and do > something else > https://github.com/apache/spark/blob/v1.6.2/python/pyspark/rdd.py#L182: > _pickle.UnpicklingError: A load persistent id instruction was encountered, > but no persistent_load function was specified. > > > On Thu, Jun 30, 2016 at 2:13 PM, Pedro Rodriguez <ski.rodrig...@gmail.com> > wrote: > >> Thanks Jeff and Holden, >> >> A little more context here probably helps. I am working on implementing >> the idea from this article to make reads from S3 faster: >> http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219 >> (although my name is Pedro, I am not the author of the article). The reason >> for wrapping SparkContext is so that the code change is from sc.textFile to >> sc.s3TextFile in addition to configuring AWS keys correctly (seeing if we >> can open source our library, but depends on company). Overall, its a very >> light wrapper and perhaps calling it a context is not quite the right name >> because of that. >> >> At the end of the day I make a sc.parallelize call and return an >> RDD[String] as described in that blog post. I found a post from Py4J >> mailing list that reminded my that the JVM gateway needs the jars in >> spark.driver/executor.extraClassPath in addition to the spark.jars option. >> With that, I can see the classes now. Looks like I need to do as you >> suggest and wrap it using Java in order to go the last mile to calling the >> method/constructor. I don't know yet how to get the RDD back to pyspark >> though so any pointers on that would be great. 
>> >> Thanks for the tip on code Holden, I will take a look to see if that can >> give me some insight on how to write the Python code part. >> >> Thanks! >> Pedro >> >> On Thu, Jun 30, 2016 at 12:23 PM, Holden Karau <hol...@pigscanfly.ca> >> wrote: >> >>> So I'm a little biased - I think the bet bride between the two is using >>> DataFrames. I've got some examples in my talk and on the high performance >>> spark GitHub >>> https://github.com/high-performance-spark/high-performance-spark-examples/blob/master/high_performance_pyspark/simple_perf_test.py >>> calls some custom scala code. >>> >>> Using a custom context is a bit trixie though because of how the >>> launching is done, as Jeff Zhang points out you would need to wrap it in a >>> JavaSparkContext and then you could override the _intialize_context >>> function in context.py >>> >>> On Thu, Jun 30, 2016 at 11:06 AM, Jeff Zhang <zjf...@gmail.com> wrote: >>> >>>> Hi Pedro, >>>> >>>> Your use case is interesting. I think launching java gateway is the >>>> same as native SparkContext, the only difference is on creating your custom >>>> SparkContext instead of native SparkContext. You might also need to wrap it >>>> using java. >>>> >>>> >>>> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py#L172 >>>> >>>> >>>> >>>> On Thu, Jun 30, 2016 at 9:53 AM, Pedro Rodriguez < >>>> ski.rodrig...@gmail.com> wrote: >>>> >>>>> Hi All, >>>>> >>>>> I have written a Scala package which essentially wraps the >>>>> SparkContext around a custom class that adds some functionality specific >>>>> to >>>>> our internal use case. I am trying to figure out the best way to call this >>>>> from PySpark. 
>>>>> >>>>> I would like to do this similarly to how Spark itself calls the JVM >>>>> SparkContext as in: >>>>> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py >>>>> >>>>> My goal would be something like this: >>>>> >>>>> Scala Code (this is done): >>>>> >>> import com.company.mylibrary.CustomContext >>>>> >>> val myContext = CustomContext(sc) >>>>> >>> val rdd: RDD[String] = myContext.customTextFile("path") >>>>> >&g
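The UTF8Deserializer fix reported at the top of this thread makes sense given the error: the JVM side hands back UTF-8 encoded strings rather than Python pickles, so the default pickle deserializer fails on the raw bytes while a plain UTF-8 decode succeeds. A small illustration (the sample bytes are invented):

```python
# Why swapping the pickle deserializer for a UTF-8 one resolves the
# UnpicklingError: raw UTF-8 text is not a valid pickle stream.

import pickle

raw = "line from the JVM RDD".encode("utf-8")

try:
    pickle.loads(raw)  # what the default pickle serializer would attempt
    unpickled = True
except Exception:      # raises pickle.UnpicklingError (or similar)
    unpickled = False

print(unpickled)            # False: the bytes are not a pickle stream
print(raw.decode("utf-8"))  # a plain UTF-8 decode recovers the string
```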
Re: Call Scala API from PySpark
Quick update, I was able to get most of the plumbing to work thanks to the code Holden posted and browsing more source code. I am running into this error which makes me think that maybe I shouldn't be leaving the default python RDD serializer/pickler in place and do something else https://github.com/apache/spark/blob/v1.6.2/python/pyspark/rdd.py#L182: _pickle.UnpicklingError: A load persistent id instruction was encountered, but no persistent_load function was specified. On Thu, Jun 30, 2016 at 2:13 PM, Pedro Rodriguez <ski.rodrig...@gmail.com> wrote: > Thanks Jeff and Holden, > > A little more context here probably helps. I am working on implementing > the idea from this article to make reads from S3 faster: > http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219 > (although my name is Pedro, I am not the author of the article). The reason > for wrapping SparkContext is so that the code change is from sc.textFile to > sc.s3TextFile in addition to configuring AWS keys correctly (seeing if we > can open source our library, but depends on company). Overall, its a very > light wrapper and perhaps calling it a context is not quite the right name > because of that. > > At the end of the day I make a sc.parallelize call and return an > RDD[String] as described in that blog post. I found a post from Py4J > mailing list that reminded my that the JVM gateway needs the jars in > spark.driver/executor.extraClassPath in addition to the spark.jars option. > With that, I can see the classes now. Looks like I need to do as you > suggest and wrap it using Java in order to go the last mile to calling the > method/constructor. I don't know yet how to get the RDD back to pyspark > though so any pointers on that would be great. > > Thanks for the tip on code Holden, I will take a look to see if that can > give me some insight on how to write the Python code part. > > Thanks! 
> Pedro > > On Thu, Jun 30, 2016 at 12:23 PM, Holden Karau <hol...@pigscanfly.ca> > wrote: > >> So I'm a little biased - I think the bet bride between the two is using >> DataFrames. I've got some examples in my talk and on the high performance >> spark GitHub >> https://github.com/high-performance-spark/high-performance-spark-examples/blob/master/high_performance_pyspark/simple_perf_test.py >> calls some custom scala code. >> >> Using a custom context is a bit trixie though because of how the >> launching is done, as Jeff Zhang points out you would need to wrap it in a >> JavaSparkContext and then you could override the _intialize_context >> function in context.py >> >> On Thu, Jun 30, 2016 at 11:06 AM, Jeff Zhang <zjf...@gmail.com> wrote: >> >>> Hi Pedro, >>> >>> Your use case is interesting. I think launching java gateway is the >>> same as native SparkContext, the only difference is on creating your custom >>> SparkContext instead of native SparkContext. You might also need to wrap it >>> using java. >>> >>> >>> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py#L172 >>> >>> >>> >>> On Thu, Jun 30, 2016 at 9:53 AM, Pedro Rodriguez < >>> ski.rodrig...@gmail.com> wrote: >>> >>>> Hi All, >>>> >>>> I have written a Scala package which essentially wraps the SparkContext >>>> around a custom class that adds some functionality specific to our internal >>>> use case. I am trying to figure out the best way to call this from PySpark. 
>>>> >>>> I would like to do this similarly to how Spark itself calls the JVM >>>> SparkContext as in: >>>> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py >>>> >>>> My goal would be something like this: >>>> >>>> Scala Code (this is done): >>>> >>> import com.company.mylibrary.CustomContext >>>> >>> val myContext = CustomContext(sc) >>>> >>> val rdd: RDD[String] = myContext.customTextFile("path") >>>> >>>> Python Code (I want to be able to do this): >>>> >>> from company.mylibrary import CustomContext >>>> >>> myContext = CustomContext(sc) >>>> >>> rdd = myContext.customTextFile("path") >>>> >>>> At the end of each code, I should be working with an ordinary >>>> RDD[String]. >>>> >>>> I am trying to access my Scala class through sc._jvm as below, but not >>>> having any luck so far. >>>> >>>> My attempts
Re: Call Scala API from PySpark
Thanks Jeff and Holden, A little more context here probably helps. I am working on implementing the idea from this article to make reads from S3 faster: http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219 (although my name is Pedro, I am not the author of the article). The reason for wrapping SparkContext is so that the code change is from sc.textFile to sc.s3TextFile, in addition to configuring AWS keys correctly (seeing if we can open source our library, but it depends on the company). Overall, it's a very light wrapper, and perhaps calling it a context is not quite the right name because of that. At the end of the day I make a sc.parallelize call and return an RDD[String] as described in that blog post. I found a post from the Py4J mailing list that reminded me that the JVM gateway needs the jars in spark.driver/executor.extraClassPath in addition to the spark.jars option. With that, I can see the classes now. Looks like I need to do as you suggest and wrap it using Java in order to go the last mile to calling the method/constructor. I don't know yet how to get the RDD back to pyspark though, so any pointers on that would be great. Thanks for the tip on code Holden, I will take a look to see if that can give me some insight on how to write the Python code part. Thanks! Pedro On Thu, Jun 30, 2016 at 12:23 PM, Holden Karau <hol...@pigscanfly.ca> wrote: > So I'm a little biased - I think the best bridge between the two is using > DataFrames. I've got some examples in my talk and on the high performance > spark GitHub > https://github.com/high-performance-spark/high-performance-spark-examples/blob/master/high_performance_pyspark/simple_perf_test.py > calls some custom scala code. 
> > Using a custom context is a bit trixie though because of how the launching > is done, as Jeff Zhang points out you would need to wrap it in a > JavaSparkContext and then you could override the _intialize_context > function in context.py > > On Thu, Jun 30, 2016 at 11:06 AM, Jeff Zhang <zjf...@gmail.com> wrote: > >> Hi Pedro, >> >> Your use case is interesting. I think launching java gateway is the same >> as native SparkContext, the only difference is on creating your custom >> SparkContext instead of native SparkContext. You might also need to wrap it >> using java. >> >> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py#L172 >> >> >> >> On Thu, Jun 30, 2016 at 9:53 AM, Pedro Rodriguez <ski.rodrig...@gmail.com >> > wrote: >> >>> Hi All, >>> >>> I have written a Scala package which essentially wraps the SparkContext >>> around a custom class that adds some functionality specific to our internal >>> use case. I am trying to figure out the best way to call this from PySpark. >>> >>> I would like to do this similarly to how Spark itself calls the JVM >>> SparkContext as in: >>> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py >>> >>> My goal would be something like this: >>> >>> Scala Code (this is done): >>> >>> import com.company.mylibrary.CustomContext >>> >>> val myContext = CustomContext(sc) >>> >>> val rdd: RDD[String] = myContext.customTextFile("path") >>> >>> Python Code (I want to be able to do this): >>> >>> from company.mylibrary import CustomContext >>> >>> myContext = CustomContext(sc) >>> >>> rdd = myContext.customTextFile("path") >>> >>> At the end of each code, I should be working with an ordinary >>> RDD[String]. >>> >>> I am trying to access my Scala class through sc._jvm as below, but not >>> having any luck so far. 
>>> >>> My attempts: >>> >>> a = sc._jvm.com.company.mylibrary.CustomContext >>> >>> dir(a) >>> [''] >>> >>> Example of what I want:: >>> >>> a = sc._jvm.PythonRDD >>> >>> dir(a) >>> ['anonfun$6', 'anonfun$8', 'collectAndServe', >>> 'doubleRDDToDoubleRDDFunctions', 'getWorkerBroadcasts', 'hadoopFile', >>> 'hadoopRDD', 'newAPIHadoopFile', 'newAPIHadoopRDD', >>> 'numericRDDToDoubleRDDFunctions', 'rddToAsyncRDDActions', >>> 'rddToOrderedRDDFunctions', 'rddToPairRDDFunctions', >>> 'rddToPairRDDFunctions$default$4', 'rddToSequenceFileRDDFunctions', >>> 'readBroadcastFromFile', 'readRDDFromFile', 'runJob', >>> 'saveAsHadoopDataset', 'saveAsHadoopFile', 'saveAsNewAPIHadoopFile', >>> 'saveAsSequenceFile', 'sequenceFile', 'serveIterator', 'valueOfPair', >>> 'writeIteratorToStream', 'writeUTF'] >>
Call Scala API from PySpark
Hi All, I have written a Scala package which essentially wraps the SparkContext around a custom class that adds some functionality specific to our internal use case. I am trying to figure out the best way to call this from PySpark. I would like to do this similarly to how Spark itself calls the JVM SparkContext as in: https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py My goal would be something like this: Scala Code (this is done): >>> import com.company.mylibrary.CustomContext >>> val myContext = CustomContext(sc) >>> val rdd: RDD[String] = myContext.customTextFile("path") Python Code (I want to be able to do this): >>> from company.mylibrary import CustomContext >>> myContext = CustomContext(sc) >>> rdd = myContext.customTextFile("path") At the end of each snippet, I should be working with an ordinary RDD[String]. I am trying to access my Scala class through sc._jvm as below, but not having any luck so far. My attempts: >>> a = sc._jvm.com.company.mylibrary.CustomContext >>> dir(a) [''] Example of what I want: >>> a = sc._jvm.PythonRDD >>> dir(a) ['anonfun$6', 'anonfun$8', 'collectAndServe', 'doubleRDDToDoubleRDDFunctions', 'getWorkerBroadcasts', 'hadoopFile', 'hadoopRDD', 'newAPIHadoopFile', 'newAPIHadoopRDD', 'numericRDDToDoubleRDDFunctions', 'rddToAsyncRDDActions', 'rddToOrderedRDDFunctions', 'rddToPairRDDFunctions', 'rddToPairRDDFunctions$default$4', 'rddToSequenceFileRDDFunctions', 'readBroadcastFromFile', 'readRDDFromFile', 'runJob', 'saveAsHadoopDataset', 'saveAsHadoopFile', 'saveAsNewAPIHadoopFile', 'saveAsSequenceFile', 'sequenceFile', 'serveIterator', 'valueOfPair', 'writeIteratorToStream', 'writeUTF'] The next thing I would run into is converting the JVM RDD[String] back to a Python RDD; what is the easiest way to do this? Overall, is this a good approach to calling the same API in Scala and Python? 
-- Pedro Rodriguez PhD Student in Distributed Machine Learning | CU Boulder UC Berkeley AMPLab Alumni ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423 Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
Re: Dataset Select Function after Aggregate Error
I am curious if there is a way to call this so that it becomes a compile error rather than runtime error: // Note mispelled count and name ds.groupBy($"name").count.select('nam, $"coun").show More specifically, what are the best type safety guarantees that Datasets provide? It seems like with Dataframes there is still the unsafety of specifying column names by string/symbol and expecting the type to be correct and exist, but if you do something like this then downstream code is safer: // This is Array[(String, Long)] instead of Array[sql.Row] ds.groupBy($"name").count.select('name.as[String], 'count.as [Long]).collect() Does that seem like a correct understanding of Datasets? On Sat, Jun 18, 2016 at 6:39 AM, Pedro Rodriguez <ski.rodrig...@gmail.com> wrote: > Looks like it was my own fault. I had spark 2.0 cloned/built, but had the > spark shell in my path so somehow 1.6.1 was being used instead of 2.0. > Thanks > > On Sat, Jun 18, 2016 at 1:16 AM, Takeshi Yamamuro <linguin@gmail.com> > wrote: > >> which version you use? >> I passed in 2.0-preview as follows; >> --- >> >> Spark context available as 'sc' (master = local[*], app id = >> local-1466234043659). >> >> Spark session available as 'spark'. >> >> Welcome to >> >> __ >> >> / __/__ ___ _/ /__ >> >> _\ \/ _ \/ _ `/ __/ '_/ >> >>/___/ .__/\_,_/_/ /_/\_\ version 2.0.0-preview >> >> /_/ >> >> >> >> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java >> 1.8.0_31) >> >> Type in expressions to have them evaluated. >> >> Type :help for more information. 
>> >> >> scala> val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS >> >> hive.metastore.schema.verification is not enabled so recording the schema >> version 1.2.0 >> >> ds: org.apache.spark.sql.Dataset[(Int, Int)] = [_1: int, _2: int] >> >> scala> ds.groupBy($"_1").count.select($"_1", $"count").show >> >> +---+-+ >> >> | _1|count| >> >> +---+-+ >> >> | 1|1| >> >> | 2|1| >> >> +---+-+ >> >> >> >> On Sat, Jun 18, 2016 at 3:09 PM, Pedro Rodriguez <ski.rodrig...@gmail.com >> > wrote: >> >>> I went ahead and downloaded/compiled Spark 2.0 to try your code snippet >>> Takeshi. It unfortunately doesn't compile. >>> >>> scala> val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS >>> ds: org.apache.spark.sql.Dataset[(Int, Int)] = [_1: int, _2: int] >>> >>> scala> ds.groupBy($"_1").count.select($"_1", $"count").show >>> :28: error: type mismatch; >>> found : org.apache.spark.sql.ColumnName >>> required: org.apache.spark.sql.TypedColumn[(org.apache.spark.sql.Row, >>> Long),?] >>> ds.groupBy($"_1").count.select($"_1", $"count").show >>> ^ >>> >>> I also gave a try to Xinh's suggestion using the code snippet below >>> (partially from spark docs) >>> scala> val ds = Seq(Person("Andy", 32), Person("Andy", 2), >>> Person("Pedro", 24), Person("Bob", 42)).toDS() >>> scala> ds.groupBy(_.name).count.select($"name".as[String]).show >>> org.apache.spark.sql.AnalysisException: cannot resolve 'name' given >>> input columns: []; >>> scala> ds.groupBy(_.name).count.select($"_1".as[String]).show >>> org.apache.spark.sql.AnalysisException: cannot resolve 'name' given >>> input columns: []; >>> scala> ds.groupBy($"name").count.select($"_1".as[String]).show >>> org.apache.spark.sql.AnalysisException: cannot resolve '_1' given input >>> columns: []; >>> >>> Looks like there are empty columns for some reason, the code below works >>> fine for the simple aggregate >>> scala> ds.groupBy(_.name).count.show >>> >>> Would be great to see an idiomatic example of using aggregates like >>> these mixed 
with spark.sql.functions. >>> >>> Pedro >>> >>> On Fri, Jun 17, 2016 at 9:59 PM, Pedro Rodriguez < >>> ski.rodrig...@gmail.com> wrote: >>> >>>> Thanks Xinh and Takeshi, >>>> >>>> I am trying to avoid map since my impression is that this uses a Scala >>>> closure so is not optimized as well as doing column-wise operations is.
Re: Dataset Select Function after Aggregate Error
Looks like it was my own fault. I had spark 2.0 cloned/built, but had the spark shell in my path so somehow 1.6.1 was being used instead of 2.0. Thanks On Sat, Jun 18, 2016 at 1:16 AM, Takeshi Yamamuro <linguin@gmail.com> wrote: > which version you use? > I passed in 2.0-preview as follows; > --- > > Spark context available as 'sc' (master = local[*], app id = > local-1466234043659). > > Spark session available as 'spark'. > > Welcome to > > __ > > / __/__ ___ _/ /__ > > _\ \/ _ \/ _ `/ __/ '_/ > >/___/ .__/\_,_/_/ /_/\_\ version 2.0.0-preview > > /_/ > > > > Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java > 1.8.0_31) > > Type in expressions to have them evaluated. > > Type :help for more information. > > > scala> val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS > > hive.metastore.schema.verification is not enabled so recording the schema > version 1.2.0 > > ds: org.apache.spark.sql.Dataset[(Int, Int)] = [_1: int, _2: int] > > scala> ds.groupBy($"_1").count.select($"_1", $"count").show > > +---+-+ > > | _1|count| > > +---+-+ > > | 1|1| > > | 2|1| > > +---+-+ > > > > On Sat, Jun 18, 2016 at 3:09 PM, Pedro Rodriguez <ski.rodrig...@gmail.com> > wrote: > >> I went ahead and downloaded/compiled Spark 2.0 to try your code snippet >> Takeshi. It unfortunately doesn't compile. >> >> scala> val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS >> ds: org.apache.spark.sql.Dataset[(Int, Int)] = [_1: int, _2: int] >> >> scala> ds.groupBy($"_1").count.select($"_1", $"count").show >> :28: error: type mismatch; >> found : org.apache.spark.sql.ColumnName >> required: org.apache.spark.sql.TypedColumn[(org.apache.spark.sql.Row, >> Long),?] 
>> ds.groupBy($"_1").count.select($"_1", $"count").show >> ^ >> >> I also gave a try to Xinh's suggestion using the code snippet below >> (partially from spark docs) >> scala> val ds = Seq(Person("Andy", 32), Person("Andy", 2), >> Person("Pedro", 24), Person("Bob", 42)).toDS() >> scala> ds.groupBy(_.name).count.select($"name".as[String]).show >> org.apache.spark.sql.AnalysisException: cannot resolve 'name' given input >> columns: []; >> scala> ds.groupBy(_.name).count.select($"_1".as[String]).show >> org.apache.spark.sql.AnalysisException: cannot resolve 'name' given input >> columns: []; >> scala> ds.groupBy($"name").count.select($"_1".as[String]).show >> org.apache.spark.sql.AnalysisException: cannot resolve '_1' given input >> columns: []; >> >> Looks like there are empty columns for some reason, the code below works >> fine for the simple aggregate >> scala> ds.groupBy(_.name).count.show >> >> Would be great to see an idiomatic example of using aggregates like these >> mixed with spark.sql.functions. >> >> Pedro >> >> On Fri, Jun 17, 2016 at 9:59 PM, Pedro Rodriguez <ski.rodrig...@gmail.com >> > wrote: >> >>> Thanks Xinh and Takeshi, >>> >>> I am trying to avoid map since my impression is that this uses a Scala >>> closure so is not optimized as well as doing column-wise operations is. >>> >>> Looks like the $ notation is the way to go, thanks for the help. Is >>> there an explanation of how this works? I imagine it is a method/function >>> with its name defined as $ in Scala? >>> >>> Lastly, are there prelim Spark 2.0 docs? If there isn't a good >>> description/guide of using this syntax I would be willing to contribute >>> some documentation. 
>>> >>> Pedro >>> >>> On Fri, Jun 17, 2016 at 8:53 PM, Takeshi Yamamuro <linguin@gmail.com >>> > wrote: >>> >>>> Hi, >>>> >>>> In 2.0, you can say; >>>> val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS >>>> ds.groupBy($"_1").count.select($"_1", $"count").show >>>> >>>> >>>> // maropu >>>> >>>> >>>> On Sat, Jun 18, 2016 at 7:53 AM, Xinh Huynh <xinh.hu...@gmail.com> >>>> wrote: >>>> >>>>> Hi Pedro, >>>>> >>>>> In 1.6.1, you can do: >&g
Re: Skew data
I am going to take a guess that this means the partitions within an RDD are not balanced (one or more partitions are much larger than the rest). This would mean a single core has to do much more work than the rest, leading to poor performance.

In general, the way to fix this is to spread data across partitions evenly. In most cases calling repartition is enough to solve the problem. If you have a special case, you might need to create your own custom partitioner.

Pedro

On Thu, Jun 16, 2016 at 6:55 PM, Selvam Raman <sel...@gmail.com> wrote: > Hi, > > What is skew data. > > I read that if the data was skewed while joining it would take long time > to finish the job.(99 percent finished in seconds where 1 percent of task > taking minutes to hour). > > How to handle skewed data in spark. > > Thanks, > Selvam R > +91-97877-87724 > -- Pedro Rodriguez PhD Student in Distributed Machine Learning | CU Boulder UC Berkeley AMPLab Alumni ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423 Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
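The effect described above can be shown with a toy model (the record counts below are invented): since partitions run in parallel, one per core, a stage takes roughly as long as its largest partition, so evening out the sizes is what repartition buys you.

```python
# Toy model of why skew hurts: the slowest (largest) partition gates the
# whole stage when partitions are processed in parallel. The numbers are
# made up for illustration; "time" is just records processed per core.

skewed = [1000, 10, 10, 10]      # records per partition; one is huge

def stage_time(partitions):
    # Each core takes one partition, so wall-clock time ~ largest partition.
    return max(partitions)

total = sum(skewed)              # 1030 records overall
balanced = [total // 4] * 4      # roughly what repartition(4) gives you

print(stage_time(skewed), stage_time(balanced))  # 1000 257
```

The total work is identical in both cases; only the distribution changes, which is why the 99%-done-in-seconds, 1%-takes-hours symptom points at skew rather than at overall data volume.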
Re: Dataset Select Function after Aggregate Error
Thanks Xinh and Takeshi, I am trying to avoid map since my impression is that this uses a Scala closure so is not optimized as well as doing column-wise operations is. Looks like the $ notation is the way to go, thanks for the help. Is there an explanation of how this works? I imagine it is a method/function with its name defined as $ in Scala? Lastly, are there prelim Spark 2.0 docs? If there isn't a good description/guide of using this syntax I would be willing to contribute some documentation. Pedro On Fri, Jun 17, 2016 at 8:53 PM, Takeshi Yamamuro <linguin@gmail.com> wrote: > Hi, > > In 2.0, you can say; > val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS > ds.groupBy($"_1").count.select($"_1", $"count").show > > > // maropu > > > On Sat, Jun 18, 2016 at 7:53 AM, Xinh Huynh <xinh.hu...@gmail.com> wrote: > >> Hi Pedro, >> >> In 1.6.1, you can do: >> >> ds.groupBy(_.uid).count().map(_._1) >> or >> >> ds.groupBy(_.uid).count().select($"value".as[String]) >> >> It doesn't have the exact same syntax as for DataFrame. >> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset >> >> It might be different in 2.0. >> >> Xinh >> >> On Fri, Jun 17, 2016 at 3:33 PM, Pedro Rodriguez <ski.rodrig...@gmail.com >> > wrote: >> >>> Hi All, >>> >>> I am working on using Datasets in 1.6.1 and eventually 2.0 when its >>> released. >>> >>> I am running the aggregate code below where I have a dataset where the >>> row has a field uid: >>> >>> ds.groupBy(_.uid).count() >>> // res0: org.apache.spark.sql.Dataset[(String, Long)] = [_1: string, >>> _2: bigint] >>> >>> This works as expected, however, attempts to run select statements after >>> fails: >>> ds.groupBy(_.uid).count().select(_._1) >>> // error: missing parameter type for expanded function ((x$2) => x$2._1) >>> ds.groupBy(_.uid).count().select(_._1) >>> >>> I have tried several variants, but nothing seems to work. 
Below is the >>> equivalent Dataframe code which works as expected: >>> df.groupBy("uid").count().select("uid") >>> >>> Thanks! >>> -- >>> Pedro Rodriguez >>> PhD Student in Distributed Machine Learning | CU Boulder >>> UC Berkeley AMPLab Alumni >>> >>> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423 >>> Github: github.com/EntilZha | LinkedIn: >>> https://www.linkedin.com/in/pedrorodriguezscience >>> >>> >> > > > -- > --- > Takeshi Yamamuro > -- Pedro Rodriguez PhD Student in Distributed Machine Learning | CU Boulder UC Berkeley AMPLab Alumni ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423 Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
Dataset Select Function after Aggregate Error
Hi All,

I am working on using Datasets in 1.6.1 and eventually 2.0 when it's released.

I am running the aggregate code below, where I have a dataset whose rows have a field uid:

ds.groupBy(_.uid).count()
// res0: org.apache.spark.sql.Dataset[(String, Long)] = [_1: string, _2: bigint]

This works as expected; however, attempts to run select statements afterwards fail:

ds.groupBy(_.uid).count().select(_._1)
// error: missing parameter type for expanded function ((x$2) => x$2._1)

I have tried several variants, but nothing seems to work. Below is the equivalent DataFrame code, which works as expected:

df.groupBy("uid").count().select("uid")

Thanks!

-- Pedro Rodriguez PhD Student in Distributed Machine Learning | CU Boulder UC Berkeley AMPLab Alumni ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423 Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
Understanding Spark Rebalancing
Hi All,

I am running a Spark program where one part uses Spark as a scheduler rather than a data management framework. That is, my job can be described as an RDD[String], where each string describes an operation to perform which may be cheap or expensive (process an object which may have a small or large number of records associated with it). Leaving things at the defaults, I get bad job balancing. I am wondering which approach I should take:

1. Write a partitioner which uses partitionBy to balance partitions ahead of time by the number of records each string needs.
2. Repartition into many small partitions (I have ~1700 strings acting as jobs to run, so maybe 1-5 per partition). My question here is: does Spark re-schedule/steal jobs if there are executors/worker processes that aren't doing any work?

The second option would be easier, and since I am not shuffling much data around it would work just fine for me, but I can't seem to find out for sure whether Spark does job re-scheduling/stealing.

Thanks

-- Pedro Rodriguez PhD Student in Distributed Machine Learning | CU Boulder UC Berkeley AMPLab Alumni ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423 Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
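Option 1 above can be sketched without Spark: the balancing step is essentially a greedy longest-processing-time (LPT) assignment of jobs to partitions by cost. This is only an illustration of the idea; the job names and sizes are invented, and in a real program the output would feed a custom Partitioner's getPartition lookup.

```python
import heapq

# Greedy LPT (longest processing time) assignment: sort jobs by cost
# descending and always hand the next job to the currently least-loaded
# partition. This gives roughly balanced partition loads ahead of time.
def lpt_partitions(jobs, n):
    heap = [(0, i) for i in range(n)]   # (load, partition index)
    heapq.heapify(heap)
    parts = [[] for _ in range(n)]
    loads = [0] * n
    for name, size in sorted(jobs, key=lambda j: -j[1]):
        load, i = heapq.heappop(heap)   # least-loaded partition so far
        parts[i].append(name)
        loads[i] = load + size
        heapq.heappush(heap, (load + size, i))
    return parts, loads

# Invented job costs: two expensive jobs, three cheap ones.
jobs = [("a", 100), ("b", 90), ("c", 10), ("d", 5), ("e", 5)]
parts, loads = lpt_partitions(jobs, 2)
print(parts, loads)
```

With the toy costs above, both partitions end up with a load of 105 even though the individual jobs are very uneven, which is the property a cost-aware partitioner is after.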
Re: How to speed up MLlib LDA?
I helped some with the LDA and worked quite a bit on a Gibbs version. I don't know if the Gibbs version might help, but since it is not (yet) in MLlib, Intel Analytics kindly created a spark package with their adapted version plus a couple other LDA algorithms: http://spark-packages.org/package/intel-analytics/TopicModeling https://github.com/intel-analytics/TopicModeling It might be worth trying out. Do you know what LDA algorithm VW uses? Pedro On Tue, Sep 22, 2015 at 1:54 AM, Marko Asplund <marko.aspl...@gmail.com> wrote: > Hi, > > I did some profiling for my LDA prototype code that requests topic > distributions from a model. > According to Java Mission Control more than 80 % of execution time during > sample interval is spent in the following methods: > > org.apache.commons.math3.util.FastMath.log(double); count: 337; 47.07% > org.apache.commons.math3.special.Gamma.digamma(double); count: 164; 22.91% > org.apache.commons.math3.util.FastMath.log(double, double[]); count: 50; > 6.98% > java.lang.Double.valueOf(double); count: 31; 4.33% > > Is there any way of using the API more optimally? > Are there any opportunities for optimising the "topicDistributions" code > path in MLlib? > > My code looks like this: > > // executed once > val model = LocalLDAModel.load(ctx, ModelFileName) > > // executed four times > val samples = Transformers.toSparseVectors(vocabularySize, > ctx.parallelize(Seq(input))) // fast > model.topicDistributions(samples.zipWithIndex.map(_.swap)) // <== this > seems to take about 4 seconds to execute > > > marko > -- Pedro Rodriguez PhD Student in Distributed Machine Learning | CU Boulder UC Berkeley AMPLab Alumni ski.rodrig...@gmail.com | pedrorodriguez.io | 208-340-1703 Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
Re: Re: How can I know currently supported functions in Spark SQL
Worth noting that Spark 1.5 is extending that list of Spark SQL functions quite a bit. Not sure where in the docs they would be yet, but the JIRA is here: https://issues.apache.org/jira/browse/SPARK-8159 On Thu, Aug 6, 2015 at 7:27 PM, Netwaver wanglong_...@163.com wrote: Thanks for your kindly help At 2015-08-06 19:28:10, Todd Nist tsind...@gmail.com wrote: They are covered here in the docs: http://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.sql.functions$ On Thu, Aug 6, 2015 at 5:52 AM, Netwaver wanglong_...@163.com wrote: Hi All, I am using Spark 1.4.1, and I want to know how can I find the complete function list supported in Spark SQL, currently I only know 'sum','count','min','max'. Thanks a lot. -- Pedro Rodriguez PhD Student in Distributed Machine Learning | CU Boulder UC Berkeley AMPLab Alumni ski.rodrig...@gmail.com | pedrorodriguez.io | 208-340-1703 Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
Re: Spark Interview Questions
You might look at the edX course on Apache Spark or ML with Spark. There are probably some homework problems or quiz questions that might be relevant. I haven't looked at the courses myself, but that's where I would go first. https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1x -- Pedro Rodriguez PhD Student in Distributed Machine Learning | CU Boulder UC Berkeley AMPLab Alumni ski.rodrig...@gmail.com | pedrorodriguez.io | 208-340-1703 Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
Re: Spark SQL Table Caching
I would be interested in the answer to this question, plus the relationship between those and registerTempTable().

Pedro

On Tue, Jul 21, 2015 at 1:59 PM, Brandon White bwwintheho...@gmail.com wrote:

A few questions about caching a table in Spark SQL.

1) Is there any difference between caching the DataFrame and the table? df.cache() vs sqlContext.cacheTable(tableName)

2) Do you need to warm up the cache before seeing the performance benefits? Is the cache LRU? Do you need to run some queries on the table before it is cached in memory?

3) Is caching the table much faster than .saveAsTable? I am only seeing a 10%-20% performance increase.

-- Pedro Rodriguez UCBerkeley 2014 | Computer Science SnowGeek http://SnowGeek.org pedro-rodriguez.com ski.rodrig...@gmail.com 208-340-1703
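On question 2, the semantics in play can be sketched in plain Python: marking something cached is lazy and does no work by itself; the first action pays the materialization cost, and later actions read the stored copy. This mimics the lazy behavior of df.cache()/sqlContext.cacheTable(), not Spark's actual implementation.

```python
# Sketch of lazy caching semantics: nothing is computed at cache() time;
# the first "action" materializes the value, every later action reuses it.
# This models the behavior only -- it is not how Spark stores blocks.
class LazyCache:
    def __init__(self, compute):
        self._compute = compute
        self._value = None
        self.times_computed = 0

    def collect(self):                  # stand-in for a Spark action
        if self._value is None:         # cold cache: warm it up now
            self.times_computed += 1
            self._value = self._compute()
        return self._value

table = LazyCache(lambda: sum(range(1_000_000)))
table.collect()   # first action materializes the cache (the "warm-up")
table.collect()   # second action hits the cached value
print(table.times_computed)  # 1
```

So in this model the first query on a cached table is no faster than an uncached one; only subsequent queries benefit, which matches the "do you need to warm up the cache" framing of the question.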
Re: Check for null in PySpark DataFrame
Thanks for the tip. Any idea why the intuitive answer (!= None) doesn't work? I inspected the Row columns and they do indeed have a None value. I would suspect that somehow Python's None is translated to something in the JVM which doesn't equal null? I might check out the source code for a better idea as well.

Pedro

On Wed, Jul 1, 2015 at 12:18 PM, Michael Armbrust mich...@databricks.com wrote:

There is an isNotNull function on any column.

df._1.isNotNull

or

from pyspark.sql.functions import *
col("myColumn").isNotNull

On Wed, Jul 1, 2015 at 3:07 AM, Olivier Girardot ssab...@gmail.com wrote:

I must admit I've been using the same back-to-SQL strategy for now :p So I'd be glad to have insights into that too.

On Tue, Jun 30, 2015 at 11:28 PM, pedro ski.rodrig...@gmail.com wrote:

I am trying to find the correct way to programmatically check for null values in rows of a DataFrame. For example, below is the code using pyspark and sql:

df = sqlContext.createDataFrame(sc.parallelize([(1, None), (2, 'a'), (3, 'b'), (4, None)]))
df.where('_2 is not null').count()

However, this won't work:

df.where(df._2 != None).count()

It seems there is no native Python way with DataFrames to do this, but I find that difficult to believe and more likely that I am missing the right way to do this.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Check-for-null-in-PySpark-DataFrame-tp23553.html Sent from the Apache Spark User List mailing list archive at Nabble.com.

-- Pedro Rodriguez UCBerkeley 2014 | Computer Science SnowGeek http://SnowGeek.org pedro-rodriguez.com ski.rodrig...@gmail.com 208-340-1703
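A likely explanation for why df._2 != None matches nothing, sketched in plain Python: the Column's != builds a SQL expression, None becomes a SQL NULL literal, and SQL uses three-valued logic, so any comparison against NULL yields NULL (unknown) rather than True, and WHERE keeps only rows whose predicate is exactly true. This is a model of the SQL semantics, not of pyspark's internals; the rows mirror the example in the thread.

```python
# SQL three-valued logic: any comparison involving NULL yields NULL
# (unknown), not True or False, and a WHERE clause keeps only rows whose
# predicate is exactly true. So "col != NULL" matches no rows at all --
# which is why df.where(df._2 != None) filters everything out while
# isNotNull (or "_2 is not null") behaves as expected.
def sql_not_equal(a, b):
    if a is None or b is None:
        return None          # NULL != anything -> NULL, never True
    return a != b

rows = [(1, None), (2, "a"), (3, "b"), (4, None)]
wrong = [r for r in rows if sql_not_equal(r[1], None) is True]
right = [r for r in rows if r[1] is not None]   # what isNotNull expresses
print(len(wrong), len(right))  # 0 2
```

Under this model the None value never reaches the JVM as an ordinary Python object at all; it becomes a NULL literal inside the generated SQL expression, which is why the comparison is "unknown" for every row instead of failing loudly.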