Re: Databricks Cloud vs AWS EMR
We use both Databricks and EMR. We use Databricks for our exploratory / ad-hoc use cases because their notebook is pretty badass and better than Zeppelin IMHO. We use EMR for our production machine learning and ETL tasks. The nice thing about EMR is you can use applications other than Spark. From a "tools in the toolbox" perspective this is very important.

M

> On Jan 28, 2016, at 6:05 PM, Sourav Mazumder wrote:
>
> You can also try out IBM's Spark as a service in IBM Bluemix. You'll get all the required features there for security, multitenancy, notebooks, and integration with other big data services. You can try it out for free too.
>
> Regards,
> Sourav
>
> On Thu, Jan 28, 2016 at 2:10 PM, Rakesh Soni wrote:
>
> At its core, EMR just launches Spark applications, whereas Databricks is a higher-level platform that also includes multi-user support, an interactive UI, security, and job scheduling.
>
> Specifically, Databricks runs standard Spark applications inside a user's AWS account, similar to EMR, but it adds a variety of features to create an end-to-end environment for working with Spark. These include:
>
> - Interactive UI (includes a workspace with notebooks, dashboards, a job scheduler, point-and-click cluster management)
> - Cluster sharing (multiple users can connect to the same cluster, saving cost)
> - Security features (access controls to the whole workspace)
> - Collaboration (multi-user access to the same notebook, revision control, and IDE and GitHub integration)
> - Data management (support for connecting different data sources to Spark, caching service to speed up queries)
>
> The idea is that a lot of Spark deployments soon need to bring in multiple users, different types of jobs, etc., and we want to have these built in. But if you just want to connect to existing data and run jobs, that also works.
>
> The cluster manager in Databricks is based on Standalone mode, not YARN, but Databricks adds several features, such as allowing multiple users to run commands on the same cluster and running multiple versions of Spark. Because Databricks is also the team that initially built Spark, the service is very up to date and integrated with the newest Spark features -- e.g. you can run previews of the next release, any data in Spark can be displayed visually, etc.
>
> From: Alex Nastetsky
> Subject: Databricks Cloud vs AWS EMR
> Date: January 26, 2016 at 11:55:41 AM PST
> To: user
>
> As a user of AWS EMR (running Spark and MapReduce), I am interested in potential benefits that I may gain from Databricks Cloud. I was wondering if anyone has used both and done a comparison / contrast between the two services.
>
> In general, which resource manager(s) does Databricks Cloud use for Spark? If it's YARN, can you also run MapReduce jobs in Databricks Cloud?
>
> Thanks.
Re: fishing for help!
If you are running on Amazon, then it's always a crapshoot as well.

M

> On Dec 21, 2015, at 4:41 PM, Josh Rosen wrote:
>
> @Eran, are Server 1 and Server 2 both part of the same cluster / do they have similar positions in the network topology w.r.t. the Spark executors? If Server 1 had fast network access to the executors but Server 2 was across a WAN, then I'd expect the job to run slower from Server 2 due to the extra network latency / reduced bandwidth. This is assuming that you're running the driver in non-cluster deploy mode (so the driver process runs on the machine which submitted the job).
>
>> On Mon, Dec 21, 2015 at 1:30 PM, Igor Berman wrote:
>> look for differences: package versions, cpu/network/memory diffs, etc.
>>
>>> On 21 December 2015 at 14:53, Eran Witkon wrote:
>>> Hi,
>>> I know it is a wide question, but can you think of reasons why a pyspark job which runs from server 1 using user 1 will run faster than the same job when running on server 2 with user 1?
>>> Eran
Re: newbie best practices: is spark-ec2 intended to be used to manage long-lasting infrastructure ?
If you are running on AWS, I would recommend using S3 instead of HDFS as a general practice if you are maintaining state or data there. This way you can treat your Spark clusters as ephemeral compute resources that you can swap out easily -- e.g. if something breaks, just spin up a fresh cluster and redirect your workload rather than fighting a fire and trying to debug and fix a broken cluster. It simplifies operations once you are in prod.

M

Sent from my iPhone

> On Dec 4, 2015, at 6:42 AM, Sean Owen wrote:
>
> There is no way to upgrade a running cluster here. You can stop a cluster, and simply start a new cluster in the same way you started the original cluster. That ought to be simple; the only issue I suppose is that you have down-time since you have to shut the whole thing down, but maybe that's acceptable.
>
> If you have data, including HDFS, set up on ephemeral disks, though, then yes, that is lost. Really that's an 'ephemeral' HDFS cluster. It has nothing to do with partitions.
>
> You would want to get the data out to S3 first, and then copy it back in later. Yes it's manual, but it works fine.
>
> For more production use cases, on Amazon, you probably want to look into a distribution or product around Spark rather than manage it yourself. That could be AWS's own EMR, Databricks cloud, or even CDH running on AWS. Those would give you much more of a chance of automatically getting updates and so on, but they're fairly different products.
>
>> On Fri, Dec 4, 2015 at 3:21 AM, Divya Gehlot wrote:
>> Hello,
>> I have the same questions in mind: what are the benefits of using EC2 compared to normal servers for Spark and other big data product development?
>> Hope to get inputs from the community.
>>
>> Thanks,
>> Divya
>>
>> On Dec 4, 2015 6:05 AM, "Andy Davidson" wrote:
>>>
>>> About 2 months ago I used spark-ec2 to set up a small cluster. The cluster runs a spark streaming app 7x24 and stores the data to HDFS. I also need to run some batch analytics on the data.
>>>
>>> Now that I have a little more experience, I wonder if this was a good way to set up the cluster, given the following issues:
>>>
>>> I have not been able to find explicit directions for upgrading the Spark version:
>>> http://search-hadoop.com/m/q3RTt7E0f92v0tKh2=Re+Upgrading+Spark+in+EC2+clusters
>>>
>>> I am not sure where the data is physically stored. I think I may accidentally lose all my data. spark-ec2 makes it easy to launch a cluster with as many machines as you like, however it's not clear how I would add slaves to an existing installation.
>>>
>>> In our Java streaming app we call rdd.saveAsTextFile("hdfs://path");
>>>
>>> ephemeral-hdfs/conf/hdfs-site.xml:
>>>
>>> <property>
>>>   <name>dfs.data.dir</name>
>>>   <value>/mnt/ephemeral-hdfs/data,/mnt2/ephemeral-hdfs/data</value>
>>> </property>
>>>
>>> persistent-hdfs/conf/hdfs-site.xml
>>>
>>> $ mount
>>> /dev/xvdb on /mnt type ext3 (rw,nodiratime)
>>> /dev/xvdf on /mnt2 type ext3 (rw,nodiratime)
>>>
>>> http://spark.apache.org/docs/latest/ec2-scripts.html
>>>
>>> "The spark-ec2 script also supports pausing a cluster. In this case, the VMs are stopped but not terminated, so they lose all data on ephemeral disks but keep the data in their root partitions and their persistent-hdfs."
>>>
>>> Initially I thought using HDFS was a good idea. spark-ec2 makes HDFS easy to use. I incorrectly thought Spark somehow knew how HDFS partitioned my data.
>>>
>>> I think many people are using Amazon S3.
>>> I do not have any direct experience with S3. My concern would be that the data is not physically stored close to my slaves, i.e. high communication costs.
>>>
>>> Any suggestions would be greatly appreciated.
>>>
>>> Andy
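For what it's worth, a minimal sketch of the S3-instead-of-HDFS pattern recommended at the top of this message (my own illustration, assuming Spark 1.x-era s3n:// URIs; the bucket names, paths, and credentials are placeholders -- on EMR an IAM role would typically supply credentials instead):

import org.apache.spark.{SparkConf, SparkContext}

object EphemeralClusterJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("s3-backed-job"))

    // Explicit credentials are optional if an IAM role or core-site.xml provides them.
    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_KEY")

    // Read input from S3 rather than the cluster's ephemeral HDFS.
    val events = sc.textFile("s3n://my-bucket/input/")

    val cleaned = events.filter(_.nonEmpty)

    // Write results back to S3 so they survive cluster termination.
    cleaned.saveAsTextFile("s3n://my-bucket/output/run-001/")

    sc.stop()
  }
}

With state kept entirely in S3, replacing a broken cluster is just a matter of launching a new one and pointing the same job at the same bucket.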
Re: RDD functions
http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations

M

> On Dec 4, 2015, at 8:21 AM, Sateesh Karuturi wrote:
>
> Hello Spark experts...
> I am new to Apache Spark. Can anyone send me the proper documentation to learn RDD functions?
> Thanks in advance...
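As a very small illustration of what that page covers (my own toy example; the input path is a placeholder and an existing SparkContext `sc` is assumed): transformations are lazy and only build up the lineage, while actions actually run the job and return a result.

val lines = sc.textFile("hdfs:///data/sample.txt")

val wordCounts = lines
  .flatMap(_.split(" "))     // transformation: lazy
  .map(word => (word, 1))    // transformation: lazy
  .reduceByKey(_ + _)        // transformation: lazy

val top10 = wordCounts.sortBy(_._2, ascending = false).take(10)   // take() is an action: triggers the job
top10.foreach(println)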
Re: Is Temporary Access Credential (AccessKeyId, SecretAccessKey + SecurityToken) support by Spark?
We were looking into this as well --- the answer looks like "no". Here's the ticket: https://issues.apache.org/jira/browse/HADOOP-9680

m

On Fri, Dec 4, 2015 at 1:41 PM, Lin, Hao wrote:
> Hi,
>
> Does anyone know if Spark run in AWS supports temporary access credentials (AccessKeyId, SecretAccessKey + SecurityToken) to access S3? I only see references to specifying fs.s3.awsAccessKeyId and fs.s3.awsSecretAccessKey, without any mention of a security token. Apparently this is only for static credentials.
>
> Many thanks
Re: Low Latency SQL query
You should consider Presto for this use case. If you want fast "first query" times it is a better fit. I think Spark SQL will catch up at some point, but if you are not doing multiple queries against data cached in RDDs and need low latency, it may not be a good fit.

M

> On Dec 1, 2015, at 7:23 PM, Andrés Ivaldi wrote:
>
> Ok, so the latency problem is being generated because I'm using SQL as the source? How about csv, hive, or another source?
>
> On Tue, Dec 1, 2015 at 9:18 PM, Mark Hamstra wrote:
>>> It is not designed for interactive queries.
>>
>> You might want to ask the designers of Spark, Spark SQL, and particularly some things built on top of Spark (such as BlinkDB) about their intent with regard to interactive queries. Interactive queries are not the only designed use of Spark, but it is going too far to claim that Spark is not designed at all to handle interactive queries.
>>
>> That being said, I think that you are correct to question the wisdom of expecting lowest-latency query response from Spark using SQL (sic, presumably an RDBMS is intended) as the datastore.
>>
>>> On Tue, Dec 1, 2015 at 4:05 PM, Jörn Franke wrote:
>>> Hmm, it will never be faster than SQL if you use SQL as the underlying storage. Spark is (currently) an in-memory batch engine for iterative machine learning workloads. It is not designed for interactive queries. Currently Hive is going in the direction of interactive queries. Alternatives are HBase on Phoenix or Impala.
>>>
>>> On 01 Dec 2015, at 21:58, Andrés Ivaldi wrote:
>>> Yes. The use case would be: have Spark in a service (I didn't investigate this yet); through API calls to this service we perform some aggregations over data in SQL. We are already doing this with an internal development.
>>> Nothing complicated. For instance, a table with Product, Product Family, cost, price, etc. -- columns like dimensions and measures. I want Spark to query that table to perform a kind of rollup, with cost as the measure and Product, Product Family as dimensions.
>>> Only 3 columns, and it takes about 20s to perform that query and the aggregation, while the query directly against the database with a grouping on those columns takes about 1s.
>>> regards
>>>
>>>> On Tue, Dec 1, 2015 at 5:38 PM, Jörn Franke wrote:
>>>> Can you elaborate more on the use case?
>>>>
>>>>> On 01 Dec 2015, at 20:51, Andrés Ivaldi wrote:
>>>>> Hi,
>>>>> I'd like to use Spark to perform some transformations over data stored in SQL, but I need low latency. I'm doing some tests and I run into spark context creation and data queries over SQL taking too long.
>>>>> Any idea for speeding up the process?
>>>>> regards.
>>>>> --
>>>>> Ing. Ivaldi Andres
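To make the "data cached in RDDs" trade-off above concrete, here is a rough sketch (my own illustration against the Spark 1.5/1.6-era API; the JDBC URL, driver, and table/column names are placeholders): the RDBMS is hit once, and subsequent roll-up queries run against the cached copy.

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)   // assumes an existing SparkContext `sc`

// Pull the table over JDBC once.
val products = sqlContext.read.format("jdbc").options(Map(
  "url"     -> "jdbc:postgresql://dbhost:5432/mydb",
  "dbtable" -> "products",
  "driver"  -> "org.postgresql.Driver"
)).load()

products.registerTempTable("products")
sqlContext.cacheTable("products")   // the first query pays the load cost and materializes the cache

// Later roll-up style aggregations run against the in-memory copy, not the database.
val rollup = sqlContext.sql(
  "SELECT product_family, SUM(cost) AS total_cost FROM products GROUP BY product_family")
rollup.show()

The first query still pays the full load latency, which is why a low-latency engine like Presto remains the better fit when every query is a cold "first query".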
Re: Additional Master daemon classpath
Hi,

Thanks for the suggestion -- but those classpath config options only affect the driver and executor processes, not the standalone mode daemons (master and slave). Incidentally, we already have the extra jars we need set there. I went through the docs but couldn't find a place to set an extra classpath for the daemons.

M

> On Nov 18, 2015, at 1:19 AM, "memorypr...@gmail.com" <memorypr...@gmail.com> wrote:
>
> Have you tried using
> spark.driver.extraClassPath
> and
> spark.executor.extraClassPath
> ?
>
> AFAICT these config options replace SPARK_CLASSPATH. Further info in the docs. I've had good luck with these options, and for ease of use I just set them in the spark defaults config.
>
> https://spark.apache.org/docs/latest/configuration.html
>
>> On Tue, 17 Nov 2015 at 21:06 Michal Klos <michal.klo...@gmail.com> wrote:
>> Hi,
>>
>> We are running a Spark Standalone cluster on EMR (note: not using YARN) and are trying to use S3 w/ EmrFS as our event logging directory.
>>
>> We are having difficulties with a ClassNotFoundException on EmrFileSystem when we navigate to the event log screen. This is to be expected as the EmrFs jars are not on the classpath.
>>
>> But -- I have not been able to figure out a way to add additional classpath jars to the start-up of the Master daemon. SPARK_CLASSPATH has been deprecated, and looking around at spark-class, etc., everything seems to be pretty locked down.
>>
>> Do I have to shove everything into the assembly jar?
>>
>> Am I missing a simple way to add classpath to the daemons?
>>
>> thanks,
>> Michal
Re: Additional Master daemon classpath
We solved this by adding to the spark-class script. At the bottom, before the exec statement, we intercepted the command that was constructed and injected our additional classpath:

for ((i=0; i<${#CMD[@]}; i++)); do
  if [[ ${CMD[$i]} == *"$SPARK_ASSEMBLY_JAR"* ]]
  then
    CMD[$i]="${CMD[$i]}:/usr/lib/hadoop/*.jar:/usr/share/aws/aws-java-sdk/aws-java-sdk-cloudwatch-1.10.4.jar:/usr/share/aws/emr/emrfs/lib/*"
  fi
done

exec "${CMD[@]}"

M

> On Nov 18, 2015, at 1:19 AM, "memorypr...@gmail.com" <memorypr...@gmail.com> wrote:
>
> Have you tried using
> spark.driver.extraClassPath
> and
> spark.executor.extraClassPath
> ?
>
> AFAICT these config options replace SPARK_CLASSPATH. Further info in the docs. I've had good luck with these options, and for ease of use I just set them in the spark defaults config.
>
> https://spark.apache.org/docs/latest/configuration.html
>
>> On Tue, 17 Nov 2015 at 21:06 Michal Klos <michal.klo...@gmail.com> wrote:
>> Hi,
>>
>> We are running a Spark Standalone cluster on EMR (note: not using YARN) and are trying to use S3 w/ EmrFS as our event logging directory.
>>
>> We are having difficulties with a ClassNotFoundException on EmrFileSystem when we navigate to the event log screen. This is to be expected as the EmrFs jars are not on the classpath.
>>
>> But -- I have not been able to figure out a way to add additional classpath jars to the start-up of the Master daemon. SPARK_CLASSPATH has been deprecated, and looking around at spark-class, etc., everything seems to be pretty locked down.
>>
>> Do I have to shove everything into the assembly jar?
>>
>> Am I missing a simple way to add classpath to the daemons?
>>
>> thanks,
>> Michal
Additional Master daemon classpath
Hi,

We are running a Spark Standalone cluster on EMR (note: not using YARN) and are trying to use S3 w/ EmrFS as our event logging directory.

We are having difficulties with a ClassNotFoundException on EmrFileSystem when we navigate to the event log screen. This is to be expected as the EmrFs jars are not on the classpath.

But -- I have not been able to figure out a way to add additional classpath jars to the start-up of the Master daemon. SPARK_CLASSPATH has been deprecated, and looking around at spark-class, etc., everything seems to be pretty locked down.

Do I have to shove everything into the assembly jar?

Am I missing a simple way to add classpath to the daemons?

thanks,
Michal
Re: Partitioned Parquet based external table
You must add the partitions to the Hive table with something like "alter table your_table add if not exists partition (country='us');". If you have dynamic partitioning turned on, you can do 'msck repair table your_table' to recover the partitions. I would recommend reviewing the Hive documentation on partitions.

M

> On Nov 12, 2015, at 6:38 AM, Chandra Mohan, Ananda Vel Murugan wrote:
>
> Hi,
>
> I am using Spark 1.5.1.
>
> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java. I have slightly modified this example to create a partitioned parquet file.
>
> Instead of this line
>
> schemaPeople.write().parquet("people.parquet");
>
> I use this line
>
> schemaPeople.write().partitionBy("country").parquet("/user/Ananda/people.parquet");
>
> I have also updated the Person class and added a country attribute. I have also updated my input file accordingly.
>
> When I run this code in Spark, it seems to work. I can see the partitioned folder and the parquet file inside it in HDFS where I store this parquet file.
>
> But when I create an external table in Hive, it does not work. When I do "select * from person5", it returns no rows.
>
> This is how I create the table:
>
> CREATE EXTERNAL TABLE person5(name string, age int, city string)
> PARTITIONED BY (country string)
> STORED AS PARQUET
> LOCATION '/user/ananda/people.parquet/';
>
> When I create a non-partitioned table, it works fine.
>
> Please help if you have any idea.
>
> Regards,
> Anand.C
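For illustration, a rough sketch of what registering the partitions can look like from the Spark side (my own example against the Spark 1.5-era HiveContext; the partition value and location are placeholders, and depending on the Hive setup it may be simpler to run the same statements directly in the Hive CLI):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)   // assumes an existing SparkContext `sc`

// Register one partition directory that schemaPeople.write().partitionBy("country") produced.
hiveContext.sql(
  "ALTER TABLE person5 ADD IF NOT EXISTS PARTITION (country='us') " +
  "LOCATION '/user/ananda/people.parquet/country=us'")

// Alternatively, 'MSCK REPAIR TABLE person5' (run in Hive) discovers all
// partition directories under the table location in one go.

hiveContext.sql("SELECT * FROM person5").show()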
Add to Powered by Spark page
Hi,

We would like to be added to the Powered by Spark list:

organization name: Localytics
URL: http://eng.localytics.com/
a list of which Spark components you are using: Spark, Spark Streaming, MLlib
a short description of your use case: Batch, real-time, and predictive analytics driving our mobile app analytics and marketing automation product.

thanks,
M
Re: RDD resiliency -- does it keep state?
got it, thanks. Making sure everything is idempotent is definitely a critical piece for peace of mind.

On Sat, Mar 28, 2015 at 1:47 PM, Aaron Davidson ilike...@gmail.com wrote:

Note that speculation is off by default to avoid these kinds of unexpected issues.

On Sat, Mar 28, 2015 at 6:21 AM, Steve Loughran ste...@hortonworks.com wrote:

It's worth adding that there's no guarantee that re-evaluated work will run on the same host as before, and in the case of node failure, it is not guaranteed to be elsewhere. This means things that depend on host-local information are going to generate different numbers even if there are no other side effects. Random number generation for seeding RDD.sample() would be a case in point here.

There's also the fact that if you enable speculative execution, then operations may be repeated, even in the absence of any failure. If you are doing side-effect work, or don't have an output committer whose actions are guaranteed to be atomic, then you want to turn that option off.

On 27 Mar 2015, at 19:46, Patrick Wendell pwend...@gmail.com wrote:

If you invoke this, you will get at-least-once semantics on failure. For instance, if a machine dies in the middle of executing the foreach for a single partition, that partition will be re-executed on another machine. It could even fully complete on one machine, but the machine dies immediately before reporting the result back to the driver.

This means you need to make sure the side-effects are idempotent, or use some transactional locking. Spark's own output operations, such as saving to Hadoop, use such mechanisms. For instance, in the case of Hadoop it uses the OutputCommitter classes.

- Patrick

On Fri, Mar 27, 2015 at 12:36 PM, Michal Klos michal.klo...@gmail.com wrote:

Hi Spark group,

We haven't been able to find clear descriptions of how Spark handles the resiliency of RDDs in relationship to executing actions with side-effects. If you do an `rdd.foreach(someSideEffect)`, then you are doing a side-effect for each element in the RDD. If a partition goes down, the resiliency rebuilds the data, but did it keep track of how far it got in the partition's set of data, or will it start from the beginning again? So will it do at-least-once execution of foreach closures or at-most-once?

thanks,
Michal
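A rough sketch of what "idempotent" can look like in practice (my own illustration, not from the thread: the JDBC URL and the events table with a primary key on id are placeholders, and the upsert syntax shown is PostgreSQL's): write each record under a key derived from the record itself, so a replayed or speculatively re-executed partition overwrites instead of duplicating.

import java.sql.DriverManager
import org.apache.spark.rdd.RDD

def writeIdempotently(records: RDD[(Long, String)]): Unit =
  records.foreachPartition { partition =>
    val conn = DriverManager.getConnection("jdbc:postgresql://dbhost:5432/mydb", "user", "pass")
    val upsert = conn.prepareStatement(
      "INSERT INTO events (id, payload) VALUES (?, ?) " +
      "ON CONFLICT (id) DO UPDATE SET payload = EXCLUDED.payload")  // replay-safe: same key, same row
    try {
      partition.foreach { case (id, payload) =>
        upsert.setLong(1, id)        // key comes from the data, never from random or host-local state
        upsert.setString(2, payload)
        upsert.executeUpdate()
      }
    } finally {
      upsert.close()
      conn.close()
    }
  }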
RDD resiliency -- does it keep state?
Hi Spark group,

We haven't been able to find clear descriptions of how Spark handles the resiliency of RDDs in relationship to executing actions with side-effects. If you do an `rdd.foreach(someSideEffect)`, then you are doing a side-effect for each element in the RDD. If a partition goes down, the resiliency rebuilds the data, but did it keep track of how far it got in the partition's set of data, or will it start from the beginning again? So will it do at-least-once execution of foreach closures or at-most-once?

thanks,
Michal
Spark yarn-client submission example?
Hi,

We have a Scala application and we want it to programmatically submit Spark jobs to a Spark-on-YARN cluster in yarn-client mode. We're running into a lot of classpath issues -- e.g., once submitted, it looks for jars in our parent Scala application's local directory, jars that it shouldn't need. Our setJars in the SparkContext only mentions our fat jar, which should be all it needs. We are not sure why the other jars are being included once we submit, and we don't see a mechanism to control this.

Here's a sample error:

Diagnostics: java.io.FileNotFoundException: File file:/Users/github/spark/kindling-container/lib/spark-assembly-1.2.1-hadoop2.4.0.jar does not exist
Failing this attempt. Failing the application.

I read through the user list and there was discussion around possibly using Client.scala? Are there any code examples out there that we could use as a reference?

thanks,
Michal
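For reference, a bare-bones sketch of the programmatic yarn-client setup we are describing (my own illustration; the assembly path is a placeholder, and it assumes HADOOP_CONF_DIR / YARN_CONF_DIR are visible to the parent application's environment):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("yarn-client")
  .setAppName("embedded-spark-job")
  // Only the fat assembly should need to ship to the executors.
  .setJars(Seq("/path/to/our-fat-assembly.jar"))

val sc = new SparkContext(conf)

// From here the parent Scala application can define RDDs and run actions directly;
// each action runs on YARN, with the driver living inside this process.
val total = sc.parallelize(1 to 1000).sum()
println(total)

sc.stop()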
Re: Define exception handling on lazy elements?
Is there a way to have the exception handling go lazily along with the definition? E.g., we define it on the RDD, but then our exception handling code gets triggered on that first action, without us having to define it on the first action? (E.g., that RDD code is boilerplate and we want to just have it in many, many projects.)

m

On Wed, Mar 11, 2015 at 10:08 AM, Sean Owen so...@cloudera.com wrote:

Handling exceptions this way means handling errors on the driver side, which may or may not be what you want. You can also write functions with exception handling inside, which could make more sense in some cases (like, to ignore bad records or count them or something).

If you want to handle errors at every step on the driver side, you have to force RDDs to materialize to see if they work. You can do that with .count() or .take(1).length > 0. But to avoid recomputing the RDD then, it needs to be cached. So there is a big, non-trivial overhead to approaching it this way. If you go this way, consider materializing only a few key RDDs in your flow, not every one.

The most natural thing is indeed to handle exceptions where the action occurs.

On Wed, Mar 11, 2015 at 1:51 PM, Michal Klos michal.klo...@gmail.com wrote:

Hi Spark Community,

We would like to define exception handling behavior on RDD instantiation / build. Since the RDD is lazily evaluated, it seems like we are forced to put all exception handling in the first action call?

This is an example of something that would be nice:

def myRDD = {
  Try {
    val rdd = sc.textFile(...)
  } match {
    case Failure(e) => // handle ...
  }
}

myRDD.reduceByKey(...) // don't need to worry about that exception here

The reason being that we want to try to avoid having to copy-paste exception handling boilerplate on every first action. We would love to define this once somewhere for the RDD build code and just re-use it.

Is there a best practice for this? Are we missing something here?

thanks,
Michal
Re: Define exception handling on lazy elements?
Well, I'm thinking that this RDD would fail to build in a specific way, different from the subsequent code (e.g. S3 access denied, or a timeout connecting to a database).

So for example: define the RDD failure handling on the RDD, define the action failure handling on the action? Does this make sense? Otherwise, on that first action, we have to handle exceptions for all of the lazy elements that preceded it, and that could be a lot of stuff. If the RDD failure handling code is defined with the RDD, it just seems cleaner because it's right next to its element. Not to mention, we believe it would be easier to import it into multiple Spark jobs without a lot of copy pasta.

m

On Wed, Mar 11, 2015 at 10:45 AM, Sean Owen so...@cloudera.com wrote:

Hm, but you already only have to define it in one place, rather than on each transformation. I thought you wanted exception handling at each transformation? Or do you want it once for all actions? You can enclose all actions in a try-catch block, I suppose, to write exception handling code once.

You can easily write a Scala construct that takes a function and logs exceptions it throws, and the function you pass can invoke an RDD action. So you can refactor that way too.

On Wed, Mar 11, 2015 at 2:39 PM, Michal Klos michal.klo...@gmail.com wrote:

Is there a way to have the exception handling go lazily along with the definition? E.g., we define it on the RDD, but then our exception handling code gets triggered on that first action, without us having to define it on the first action? (E.g., that RDD code is boilerplate and we want to just have it in many, many projects.)

m

On Wed, Mar 11, 2015 at 10:08 AM, Sean Owen so...@cloudera.com wrote:

Handling exceptions this way means handling errors on the driver side, which may or may not be what you want. You can also write functions with exception handling inside, which could make more sense in some cases (like, to ignore bad records or count them or something).

If you want to handle errors at every step on the driver side, you have to force RDDs to materialize to see if they work. You can do that with .count() or .take(1).length > 0. But to avoid recomputing the RDD then, it needs to be cached. So there is a big, non-trivial overhead to approaching it this way. If you go this way, consider materializing only a few key RDDs in your flow, not every one.

The most natural thing is indeed to handle exceptions where the action occurs.

On Wed, Mar 11, 2015 at 1:51 PM, Michal Klos michal.klo...@gmail.com wrote:

Hi Spark Community,

We would like to define exception handling behavior on RDD instantiation / build. Since the RDD is lazily evaluated, it seems like we are forced to put all exception handling in the first action call?

This is an example of something that would be nice:

def myRDD = {
  Try {
    val rdd = sc.textFile(...)
  } match {
    case Failure(e) => // handle ...
  }
}

myRDD.reduceByKey(...) // don't need to worry about that exception here

The reason being that we want to try to avoid having to copy-paste exception handling boilerplate on every first action. We would love to define this once somewhere for the RDD build code and just re-use it.

Is there a best practice for this? Are we missing something here?

thanks,
Michal
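To make Sean's suggestion concrete, a minimal sketch of such a construct (my own illustration; the object name, handling body, and paths are placeholders) that can live in a shared library and be reused across jobs:

import scala.util.{Failure, Success, Try}

object SafeAction {
  // Runs a block (typically an RDD action) and applies the shared error handling in one place.
  def apply[T](description: String)(block: => T): Option[T] =
    Try(block) match {
      case Success(result) => Some(result)
      case Failure(e) =>
        // Shared handling: log, emit a metric, rethrow, etc.
        System.err.println(s"Action '$description' failed: ${e.getMessage}")
        None
    }
}

// Usage: the RDD definitions stay lazy; any failure (bad S3 path, DB timeout, ...)
// surfaces here, where the shared wrapper handles it.
val counts = SafeAction("word count") {
  sc.textFile("hdfs:///data/input")
    .flatMap(_.split(" "))
    .map((_, 1))
    .reduceByKey(_ + _)
    .collect()
}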
Define exception handling on lazy elements?
Hi Spark Community,

We would like to define exception handling behavior on RDD instantiation / build. Since the RDD is lazily evaluated, it seems like we are forced to put all exception handling in the first action call?

This is an example of something that would be nice:

def myRDD = {
  Try {
    val rdd = sc.textFile(...)
  } match {
    case Failure(e) => // handle ...
  }
}

myRDD.reduceByKey(...) // don't need to worry about that exception here

The reason being that we want to try to avoid having to copy-paste exception handling boilerplate on every first action. We would love to define this once somewhere for the RDD build code and just re-use it.

Is there a best practice for this? Are we missing something here?

thanks,
Michal
Re: Scalable JDBCRDD
Hi Cody,

Thanks for the reply. Yea, we thought of possibly doing this in a UDX in Vertica somehow to get the lower-level co-operation, but it's a bit daunting. We want to do this because there are things we want to do with the result-set in Spark that are not possible in Vertica.

The DStream receiver is a good thought. I think right now we are leaning towards eric's suggestion -- we run the big query once somewhere (getPartitions maybe) in Vertica and dump the results into a temporary table with an additional generated partition_key field. Then we have the workers issue N "select * from temp-table where partition_key = ?" queries that are hopefully lightweight. The temporary table, we are hoping, will just clean itself up so we don't need to handle that mess. It seems like the most sane approach today ;]

m

On Mon, Mar 2, 2015 at 10:51 AM, Cody Koeninger c...@koeninger.org wrote:

Have you already tried using the Vertica Hadoop input format with Spark? I don't know how it's implemented, but I'd hope that it has some notion of Vertica-specific shard locality (which JdbcRDD does not).

If you're really constrained to consuming the result set in a single thread, whatever processing you're doing of the results must be time-consuming enough to make the overhead of distributing it in a Spark job still worthwhile?

I guess you might take a look at doing a custom DStream receiver that iterates over the result set and makes micro-batches out of it.

On Sun, Mar 1, 2015 at 9:59 AM, michal.klo...@gmail.com wrote:

Yes exactly. The temp table is an approach, but then we need to manage the deletion of it, etc. I'm sure we won't be the only people with this crazy use case. If there isn't a feasible way to do this within the framework then that's okay. But if there is a way, we are happy to write the code and PR it back :)

M

On Mar 1, 2015, at 10:02 AM, eric e...@ericjbell.com wrote:

What you're saying is that, due to the intensity of the query, you need to run a single query and partition the results, versus running one query for each partition. I assume it's not viable to throw the query results into another table in your database and then query that using the normal approach?

--eric

On 3/1/15 4:28 AM, michal.klo...@gmail.com wrote:

Jorn: Vertica

Cody: I posited the limit just as an example of how JdbcRDD could be used least invasively. Let's say we used a partition on a time field -- we would still need to have N executions of those queries. The queries we have are very intense, and concurrency is an issue even if the N partitioned queries are smaller. Some queries require evaluating the whole data set first. If our use case were a simple "select * from table", then the partitions would be an easier sell, if it weren't for the concurrency problem :)

Long story short -- we need only one execution of the query and would like to just divvy out the result set.

M

On Mar 1, 2015, at 5:18 AM, Jörn Franke jornfra...@gmail.com wrote:

What database are you using?

On 28 Feb 2015 at 18:15, Michal Klos michal.klo...@gmail.com wrote:

Hi Spark community,

We have a use case where we need to pull huge amounts of data from a SQL query against a database into Spark. We need to execute the query against our huge database and not a substitute (SparkSQL, Hive, etc) because of a couple of factors including custom functions used in the queries that only our database has.
We started by looking at JdbcRDD, which utilizes a prepared statement with two parameters that are meant to be used to partition the result set to the workers, e.g.:

select * from table limit ?,?

turns into

select * from table limit 1,100 on worker 1
select * from table limit 101,200 on worker 2

This will not work for us because our database cannot support multiple executions of these queries without being crippled. But, additionally, our database doesn't support the above LIMIT syntax and we don't have a generic way of partitioning the various queries.

As a result -- we started by forking JdbcRDD and made a version that executes the SQL query once in getPartitions into a Vector and then hands each worker node an index and iterator. Here's a snippet of getPartitions and compute:

override def getPartitions: Array[Partition] = {
  // Compute the DB query once here
  val results = computeQuery
  (0 until numPartitions).map(i => {
    // TODO: would be better to do this partitioning when scrolling through the result set, if still loading into memory
    val partitionItems = results.drop(i).sliding(1, numPartitions).flatten.toVector
    new DBPartition(i, partitionItems)
  }).toArray
}

override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
  val part = thePart.asInstanceOf[DBPartition[T]]
  // Shift the result vector to our index number and then do a sliding iterator over it
  val iterator
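For what it's worth, once the expensive query has been materialized into a temp table carrying a generated partition_key, the per-worker reads can go through the stock JdbcRDD; a rough sketch (my own illustration, with placeholder connection details, table, and column names):

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

val numPartitions = 16

// JdbcRDD requires exactly two '?' bind parameters, which it fills with per-partition
// bounds over [lowerBound, upperBound] -- here, ranges of the generated partition_key.
val rows = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:vertica://dbhost:5433/mydb", "user", "pass"),
  "SELECT id, payload FROM temp_results WHERE partition_key >= ? AND partition_key <= ?",
  lowerBound = 0,
  upperBound = numPartitions - 1,
  numPartitions = numPartitions,
  mapRow = rs => (rs.getLong("id"), rs.getString("payload"))
)

rows.count()   // each task runs one cheap ranged query against the temp table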
Scalable JDBCRDD
Hi Spark community,

We have a use case where we need to pull huge amounts of data from a SQL query against a database into Spark. We need to execute the query against our huge database and not a substitute (SparkSQL, Hive, etc) because of a couple of factors including custom functions used in the queries that only our database has.

We started by looking at JdbcRDD, which utilizes a prepared statement with two parameters that are meant to be used to partition the result set to the workers, e.g.:

select * from table limit ?,?

turns into

select * from table limit 1,100 on worker 1
select * from table limit 101,200 on worker 2

This will not work for us because our database cannot support multiple executions of these queries without being crippled. But, additionally, our database doesn't support the above LIMIT syntax and we don't have a generic way of partitioning the various queries.

As a result -- we started by forking JdbcRDD and made a version that executes the SQL query once in getPartitions into a Vector and then hands each worker node an index and iterator. Here's a snippet of getPartitions and compute:

override def getPartitions: Array[Partition] = {
  // Compute the DB query once here
  val results = computeQuery
  (0 until numPartitions).map(i => {
    // TODO: would be better to do this partitioning when scrolling through the result set, if still loading into memory
    val partitionItems = results.drop(i).sliding(1, numPartitions).flatten.toVector
    new DBPartition(i, partitionItems)
  }).toArray
}

override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
  val part = thePart.asInstanceOf[DBPartition[T]]
  // Shift the result vector to our index number and then do a sliding iterator over it
  val iterator = part.items.iterator

  override def getNext(): T = {
    if (iterator.hasNext) {
      iterator.next()
    } else {
      finished = true
      null.asInstanceOf[T]
    }
  }

  override def close(): Unit = ()
}

This is a little better since we can just execute the query once. However, the result-set needs to fit in memory.

We've been trying to brainstorm a way to:
A) have that result set distribute out to the worker RDD partitions as it's streaming in from the cursor?
B) have the result set spill to disk if it exceeds memory and do something clever around the iterators?
C) something else?

We're not familiar enough yet with all of the workings of Spark to know how to proceed on this. We also thought of the work-around of having the DB query dump to HDFS/S3 and then picking it up from there, but it adds more moving parts and latency to our processing.

Does anyone have a clever suggestion? Are we missing something?

thanks,
Michal