Re: Databricks Cloud vs AWS EMR

2016-01-28 Thread Michal Klos
We use both Databricks and EMR. We use Databricks for our exploratory / ad hoc 
use cases because their notebook is pretty badass and better than Zeppelin IMHO.

We use EMR for our production machine learning and ETL tasks. The nice thing 
about EMR is that you can use applications other than Spark. From a "tools in the 
toolbox" perspective this is very important.

M

> On Jan 28, 2016, at 6:05 PM, Sourav Mazumder  
> wrote:
> 
> You can also try out IBM's Spark as a service in IBM Bluemix. There you'll get 
> all the required features for security, multitenancy, notebooks, and integration 
> with other big data services. You can try it out for free too.
> 
> Regards,
> Sourav
> 
> On Thu, Jan 28, 2016 at 2:10 PM, Rakesh Soni  wrote:
 At its core, EMR just launches Spark applications, whereas Databricks is a 
 higher-level platform that also includes multi-user support, an 
 interactive UI, security, and job scheduling.
 
 Specifically, Databricks runs standard Spark applications inside a user’s 
 AWS account, similar to EMR, but it adds a variety of features to create 
 an end-to-end environment for working with Spark. These include:
 
 - Interactive UI (includes a workspace with notebooks, dashboards, a job 
   scheduler, and point-and-click cluster management)
 - Cluster sharing (multiple users can connect to the same cluster, saving cost)
 - Security features (access controls to the whole workspace)
 - Collaboration (multi-user access to the same notebook, revision control, 
   and IDE and GitHub integration)
 - Data management (support for connecting different data sources to Spark, 
   plus a caching service to speed up queries)
 
 The idea is that a lot of Spark deployments soon need to bring in multiple 
 users, different types of jobs, etc., and we want to have these features built in. 
 But if you just want to connect to existing data and run jobs, that also 
 works.
 
 The cluster manager in Databricks is based on Standalone mode, not YARN, 
 but Databricks adds several features, such as allowing multiple users to 
 run commands on the same cluster and running multiple versions of Spark. 
 Because Databricks is also the team that initially built Spark, the 
 service is very up to date and integrated with the newest Spark features 
 -- e.g. you can run previews of the next release, any data in Spark can be 
 displayed visually, etc.
 
 From: Alex Nastetsky 
 Subject: Databricks Cloud vs AWS EMR
 Date: January 26, 2016 at 11:55:41 AM PST
 To: user 
 
 As a user of AWS EMR (running Spark and MapReduce), I am interested in 
 potential benefits that I may gain from Databricks Cloud. I was wondering 
 if anyone has used both and done comparison / contrast between the two 
 services.
 
 In general, which resource manager(s) does Databricks Cloud use for Spark? 
 If it's YARN, can you also run MapReduce jobs in Databricks Cloud?
 
 Thanks.


Re: fishing for help!

2015-12-21 Thread Michal Klos
If you are running on Amazon, then it's always a crapshoot as well.

M

> On Dec 21, 2015, at 4:41 PM, Josh Rosen  wrote:
> 
> @Eran, are Server 1 and Server 2 both part of the same cluster / do they have 
> similar positions in the network topology w.r.t the Spark executors? If 
> Server 1 had fast network access to the executors but Server 2 was across a 
> WAN then I'd expect the job to run slower from Server 2 due to the extra 
> network latency / reduced bandwidth. This is assuming that you're running the 
> driver in non-cluster deploy mode (so the driver process runs on the machine 
> which submitted the job).
> 
>> On Mon, Dec 21, 2015 at 1:30 PM, Igor Berman  wrote:
>> look for differences: packages versions, cpu/network/memory diff etc etc
>> 
>> 
>>> On 21 December 2015 at 14:53, Eran Witkon  wrote:
>>> Hi,
>>> I know it is a wide question, but can you think of reasons why a pyspark job 
>>> which runs from server 1 using user 1 will run faster than the same job 
>>> when running on server 2 with user 1?
>>> Eran
> 


Re: newbie best practices: is spark-ec2 intended to be used to manage long-lasting infrastructure ?

2015-12-04 Thread Michal Klos
If you are running on AWS, I would recommend using S3 instead of HDFS as a 
general practice if you are maintaining state or data there. This way you can 
treat your Spark clusters as ephemeral compute resources that you can swap out 
easily -- e.g. if something breaks, just spin up a fresh cluster and redirect your 
workload rather than fighting a fire and trying to debug and fix a broken 
cluster. It simplifies operations once you are in prod.
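
A minimal sketch of what that looks like in practice, assuming a SparkContext sc 
(e.g. from spark-shell); the bucket and paths are placeholders:

// Keep inputs and outputs in S3 so the cluster itself holds no state.
// "my-bucket" and the paths below are placeholders.
val events = sc.textFile("s3n://my-bucket/input/events/")
val counts = events.map(line => (line.split(",")(0), 1L)).reduceByKey(_ + _)
counts.saveAsTextFile("s3n://my-bucket/output/event-counts/")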

M

> On Dec 4, 2015, at 6:42 AM, Sean Owen  wrote:
> 
> There is no way to upgrade a running cluster here. You can stop a
> cluster, and simply start a new cluster in the same way you started
> the original cluster. That ought to be simple; the only issue I
> suppose is that you have down-time since you have to shut the whole
> thing down, but maybe that's acceptable.
> 
> If you have data, including HDFS, set up on ephemeral disks though
> then yes that is lost. Really that's an 'ephemeral' HDFS cluster. It
> has nothing to do with partitions.
> 
> You would want to get the data out to S3 first, and then copy it back
> in later. Yes it's manual, but works fine.
> 
> For more production use cases, on Amazon, you probably want to look
> into a distribution or product around Spark rather than manage it
> yourself. That could be AWS's own EMR, Databricks cloud, or even CDH
> running on AWS. Those would give you much more of a chance of
> automatically getting updates and so on, but they're fairly different
> products.
> 
>> On Fri, Dec 4, 2015 at 3:21 AM, Divya Gehlot  wrote:
>> Hello,
>> I have the same questions in mind:
>> what do we gain by using EC2, compared to normal servers, for
>> Spark and other big data product development?
>> Hope to get inputs from the community.
>> 
>> Thanks,
>> Divya
>> 
>> On Dec 4, 2015 6:05 AM, "Andy Davidson" 
>> wrote:
>>> 
>>> About 2 months ago I used spark-ec2 to set up a small cluster. The cluster
>>> runs a spark streaming app 7x24 and stores the data to hdfs. I also need to
>>> run some batch analytics on the data.
>>> 
>>> Now that I have a little more experience, I wonder if this was a good way
>>> to set up the cluster, given the following issues:
>>> 
>>> 1. I have not been able to find explicit directions for upgrading the Spark
>>> version:
>>> 
>>> http://search-hadoop.com/m/q3RTt7E0f92v0tKh2=Re+Upgrading+Spark+in+EC2+clusters
>>> 
>>> 2. I am not sure where the data is physically stored; I think I may
>>> accidentally lose all my data.
>>> 3. spark-ec2 makes it easy to launch a cluster with as many machines as you
>>> like; however, it's not clear how I would add slaves to an existing
>>> installation.
>>> 
>>> 
>>> In our Java streaming app we call rdd.saveAsTextFile("hdfs://path");
>>> 
>>> ephemeral-hdfs/conf/hdfs-site.xml:
>>> 
>>>   <property>
>>>     <name>dfs.data.dir</name>
>>>     <value>/mnt/ephemeral-hdfs/data,/mnt2/ephemeral-hdfs/data</value>
>>>   </property>
>>> 
>>> 
>>> persistent-hdfs/conf/hdfs-site.xml
>>> 
>>> 
>>> $ mount
>>> 
>>> /dev/xvdb on /mnt type ext3 (rw,nodiratime)
>>> 
>>> /dev/xvdf on /mnt2 type ext3 (rw,nodiratime)
>>> 
>>> 
>>> http://spark.apache.org/docs/latest/ec2-scripts.html
>>> 
>>> 
>>> "The spark-ec2 script also supports pausing a cluster. In this case, the
>>> VMs are stopped but not terminated, so they lose all data on ephemeral disks
>>> but keep the data in their root partitions and their persistent-hdfs."
>>> 
>>> 
>>> Initially I thought using HDFS was a good idea. spark-ec2 makes HDFS easy
>>> to use. I incorrectly thought Spark somehow knew how HDFS partitioned my
>>> data.
>>> 
>>> I think many people are using Amazon S3. I do not have any direct
>>> experience with S3. My concern would be that the data is not physically
>>> stored close to my slaves, i.e. high communication costs.
>>> 
>>> Any suggestions would be greatly appreciated
>>> 
>>> Andy
> 
> 




Re: RDD functions

2015-12-04 Thread Michal Klos
http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations

M

> On Dec 4, 2015, at 8:21 AM, Sateesh Karuturi  
> wrote:
> 
> Hello Spark experts...
> I am new to Apache Spark. Can anyone send me the proper documentation to learn 
> RDD functions?
> Thanks in advance...


Re: Is Temporary Access Credential (AccessKeyId, SecretAccessKey + SecurityToken) support by Spark?

2015-12-04 Thread Michal Klos
We were looking into this as well --- the answer looks like "no"

Here's the ticket:
https://issues.apache.org/jira/browse/HADOOP-9680
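
For reference, a minimal sketch of the static-credential configuration that does 
work today, set on the SparkContext's Hadoop configuration (key names as in the 
question below; the values and bucket are placeholders):

// Static credentials are supported via the Hadoop configuration; there is no
// corresponding property for a session/security token (see HADOOP-9680).
// The key, secret, and bucket values here are placeholders.
sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", "YOUR_ACCESS_KEY_ID")
sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")
val rdd = sc.textFile("s3://my-bucket/path/")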

m


On Fri, Dec 4, 2015 at 1:41 PM, Lin, Hao  wrote:

> Hi,
>
>
>
> Does anyone know if Spark running in AWS supports temporary access
> credentials (AccessKeyId, SecretAccessKey + SecurityToken) to access S3? I
> only see references to specifying fs.s3.awsAccessKeyId and
> fs.s3.awsSecretAccessKey, without any mention of a security token. Apparently
> this is only for static credentials.
>
>
>
> Many thanks
>


Re: Low Latency SQL query

2015-12-01 Thread Michal Klos
You should consider Presto for this use case. If you want fast "first query" 
times it is a better fit.

I think Spark SQL will catch up at some point, but if you are not doing multiple 
queries against data cached in RDDs and need low latency, it may not be a good 
fit.
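
For the repeated-query case, a minimal sketch of caching the data first, assuming 
a SQLContext; the JDBC URL, credentials, and table name are placeholders:

import java.util.Properties

// Load the table once over JDBC, cache it, and run subsequent aggregations
// against the in-memory copy. All connection details below are placeholders.
val props = new Properties()
props.setProperty("user", "...")
props.setProperty("password", "...")

val df = sqlContext.read.jdbc("jdbc:postgresql://host:5432/db", "sales", props)
df.registerTempTable("sales")
sqlContext.cacheTable("sales")   // the first query pays the load cost
sqlContext.sql("SELECT product, SUM(cost) FROM sales GROUP BY product").show()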

M

> On Dec 1, 2015, at 7:23 PM, Andrés Ivaldi  wrote:
> 
> OK, so the latency problem comes from using SQL as the source? 
> How about CSV, Hive, or another source?
> 
> On Tue, Dec 1, 2015 at 9:18 PM, Mark Hamstra  wrote:
>>> It is not designed for interactive queries.
>> 
>> You might want to ask the designers of Spark, Spark SQL, and particularly 
>> some things built on top of Spark (such as BlinkDB) about their intent with 
>> regard to interactive queries.  Interactive queries are not the only 
>> designed use of Spark, but it is going too far to claim that Spark is not 
>> designed at all to handle interactive queries.
>> 
>> That being said, I think that you are correct to question the wisdom of 
>> expecting lowest-latency query response from Spark using SQL (sic, 
>> presumably a RDBMS is intended) as the datastore.
>> 
>>> On Tue, Dec 1, 2015 at 4:05 PM, Jörn Franke  wrote:
>>> Hmm, it will never be faster than SQL if you use SQL as the underlying 
>>> storage. Spark is (currently) an in-memory batch engine for iterative 
>>> machine learning workloads. It is not designed for interactive queries. 
>>> Currently Hive is moving in the direction of interactive queries. 
>>> Alternatives are Phoenix on HBase, or Impala.
>>> 
 On 01 Dec 2015, at 21:58, Andrés Ivaldi  wrote:
 
 Yes,
 The use case would be:
 Have Spark in a service (I didn't investigate this yet); through API calls 
 to this service we perform some aggregations over data in SQL. We are 
 already doing this with an internal development.
 
 Nothing complicated -- for instance, a table with Product, Product Family, 
 cost, price, etc. Columns like dimensions and measures.
 
 I want to use Spark to query that table and perform a kind of rollup, with 
 cost as the measure and Product, Product Family as the dimensions.
 
 With only 3 columns, it takes about 20s to perform that query and the 
 aggregation, while the query directly against the database with a grouping 
 on those columns takes about 1s.
 
 regards
 
 
 
> On Tue, Dec 1, 2015 at 5:38 PM, Jörn Franke  wrote:
> can you elaborate more on the use case?
> 
> > On 01 Dec 2015, at 20:51, Andrés Ivaldi  wrote:
> >
> > Hi,
> >
> > I'd like to use Spark to perform some transformations over data stored
> > in SQL, but I need low latency. I'm doing some tests, and I find that Spark
> > context creation and data queries over SQL take too long.
> >
> > Any ideas for speeding up the process?
> >
> > regards.
> >
> > --
> > Ing. Ivaldi Andres
 
 
 
 -- 
 Ing. Ivaldi Andres
> 
> 
> 
> -- 
> Ing. Ivaldi Andres


Re: Additional Master daemon classpath

2015-11-18 Thread Michal Klos
Hi,

Thanks for the suggestion -- but those classpath config options only affect 
the driver and executor processes, not the standalone-mode daemons (master 
and slave). Incidentally, we already have the extra jars we need set there.

I went through the docs but couldn't find a place to set extra classpath for 
the daemons. 

M

> On Nov 18, 2015, at 1:19 AM, "memorypr...@gmail.com" <memorypr...@gmail.com> 
> wrote:
> 
> Have you tried using 
> spark.driver.extraClassPath
> and 
> spark.executor.extraClassPath
> 
> ?
> 
> AFAICT these config options replace SPARK_CLASSPATH. Further info in the 
> docs. I've had good luck with these options, and for ease of use I just set 
> them in the spark defaults config.
> 
> https://spark.apache.org/docs/latest/configuration.html
> 
>> On Tue, 17 Nov 2015 at 21:06 Michal Klos <michal.klo...@gmail.com> wrote:
>> Hi,
>> 
>> We are running a Spark Standalone cluster on EMR (note: not using YARN) and 
>> are trying to use S3 w/ EmrFS as our event logging directory.
>> 
>> We are having difficulties with a ClassNotFoundException on EmrFileSystem 
>> when we navigate to the event log screen. This is to be expected as the 
>> EmrFs jars are not on the classpath.
>> 
>> But -- I have not been able to figure out a way to add additional classpath 
>> jars to the start-up of the Master daemon. SPARK_CLASSPATH has been 
>> deprecated, and looking around at spark-class, etc.. everything seems to be 
>> pretty locked down. 
>> 
>> Do I have to shove everything into the assembly jar?
>> 
>> Am I missing a simple way to add classpath to the daemons?
>> 
>> thanks,
>> Michal


Re: Additional Master daemon classpath

2015-11-18 Thread Michal Klos
We solved this by adding to the spark-class script. At the bottom, before the exec 
statement, we intercepted the command that was constructed and injected our 
additional classpath:

# Find the classpath entry containing the Spark assembly jar and append the
# extra jars (Hadoop, AWS SDK, EmrFS) to it before handing off to exec.
for ((i = 0; i < ${#CMD[@]}; i++)); do
  if [[ ${CMD[$i]} == *"$SPARK_ASSEMBLY_JAR"* ]]; then
    CMD[$i]="${CMD[$i]}:/usr/lib/hadoop/*.jar:/usr/share/aws/aws-java-sdk/aws-java-sdk-cloudwatch-1.10.4.jar:/usr/share/aws/emr/emrfs/lib/*"
  fi
done

exec "${CMD[@]}"

M


> On Nov 18, 2015, at 1:19 AM, "memorypr...@gmail.com" <memorypr...@gmail.com> 
> wrote:
> 
> Have you tried using 
> spark.driver.extraClassPath
> and 
> spark.executor.extraClassPath
> 
> ?
> 
> AFAICT these config options replace SPARK_CLASSPATH. Further info in the 
> docs. I've had good luck with these options, and for ease of use I just set 
> them in the spark defaults config.
> 
> https://spark.apache.org/docs/latest/configuration.html
> 
>> On Tue, 17 Nov 2015 at 21:06 Michal Klos <michal.klo...@gmail.com> wrote:
>> Hi,
>> 
>> We are running a Spark Standalone cluster on EMR (note: not using YARN) and 
>> are trying to use S3 w/ EmrFS as our event logging directory.
>> 
>> We are having difficulties with a ClassNotFoundException on EmrFileSystem 
>> when we navigate to the event log screen. This is to be expected as the 
>> EmrFs jars are not on the classpath.
>> 
>> But -- I have not been able to figure out a way to add additional classpath 
>> jars to the start-up of the Master daemon. SPARK_CLASSPATH has been 
>> deprecated, and looking around at spark-class, etc.. everything seems to be 
>> pretty locked down. 
>> 
>> Do I have to shove everything into the assembly jar?
>> 
>> Am I missing a simple way to add classpath to the daemons?
>> 
>> thanks,
>> Michal


Additional Master daemon classpath

2015-11-17 Thread Michal Klos
Hi,

We are running a Spark Standalone cluster on EMR (note: not using YARN) and
are trying to use S3 w/ EmrFS as our event logging directory.

We are having difficulties with a ClassNotFoundException on EmrFileSystem
when we navigate to the event log screen. This is to be expected as the
EmrFs jars are not on the classpath.

But -- I have not been able to figure out a way to add additional classpath
jars to the start-up of the Master daemon. SPARK_CLASSPATH has been
deprecated, and looking around at spark-class, etc.. everything seems to be
pretty locked down.

Do I have to shove everything into the assembly jar?

Am I missing a simple way to add classpath to the daemons?

thanks,
Michal


Re: Partitioned Parquet based external table

2015-11-12 Thread Michal Klos
You must add the partitions to the Hive table with something like "alter table 
your_table add if not exists partition (country='us');".

If you have dynamic partitioning turned on,  you can do 'msck repair table 
your_table' to recover the partitions.
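
A rough sketch of both options from Spark, assuming a HiveContext and the person5 
table from the example below (the partition value here is illustrative):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// Register a single partition explicitly...
hiveContext.sql("ALTER TABLE person5 ADD IF NOT EXISTS PARTITION (country='us')")

// ...or ask Hive to discover all partition directories under the table
// location (passed through to Hive, so behavior depends on your Hive setup).
hiveContext.sql("MSCK REPAIR TABLE person5")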

I would recommend reviewing the Hive documentation on partitions.

M



> On Nov 12, 2015, at 6:38 AM, Chandra Mohan, Ananda Vel Murugan 
>  wrote:
> 
> Hi,
>  
> I am using Spark 1.5.1.
>  
> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java.
>  I have slightly modified this example to create partitioned parquet file
>  
> Instead of this line
>  
> schemaPeople.write().parquet("people.parquet");
>  
> I use this line
>  
> schemaPeople.write().partitionBy("country").parquet("/user/Ananda/people.parquet");
>  
> I have also updated the Person class and added a country attribute. I have also 
> updated my input file accordingly.
>  
> When I run this code in Spark, it seems to work. I can see the partitioned 
> folders and the parquet files inside them in HDFS, where I store this parquet file.
>  
> But when I create an external table in Hive, it does not work. When I do 
> "select * from person5", it returns no rows.
>  
> This is how I create the table
>  
> CREATE EXTERNAL TABLE person5(name string, age int,city string)
> PARTITIONED BY (country string)
> STORED AS PARQUET
> LOCATION '/user/ananda/people.parquet/';
>  
> When I create a non partitioned table, it works fine.
>  
> Please help if you have any idea.
>  
> Regards,
> Anand.C


Add to Powered by Spark page

2015-05-19 Thread Michal Klos
Hi,

We would like to be added to the Powered by Spark list:

organization name: Localytics
URL: http://eng.localytics.com/
a list of which Spark components you are using: Spark, Spark Streaming,
MLlib
a short description of your use case: Batch, real-time, and predictive
analytics driving our mobile app analytics and marketing automation product.

thanks,

M


Re: RDD resiliency -- does it keep state?

2015-03-28 Thread Michal Klos
Got it, thanks. Making sure everything is idempotent is definitely a
critical piece for peace of mind.
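
For what it's worth, a rough sketch of the shape we aim for -- the ExternalStore 
connection and upsertByKey helper are hypothetical, not a real API:

// Make the per-record side effect idempotent: writing by a deterministic key
// means an at-least-once re-execution of a partition lands in the same state.
rdd.foreachPartition { records =>
  val sink = ExternalStore.connect()        // hypothetical connection helper
  try {
    records.foreach(r => sink.upsertByKey(r.id, r))  // overwrite, don't append
  } finally {
    sink.close()
  }
}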

On Sat, Mar 28, 2015 at 1:47 PM, Aaron Davidson ilike...@gmail.com wrote:

 Note that speculation is off by default to avoid these kinds of unexpected
 issues.

 On Sat, Mar 28, 2015 at 6:21 AM, Steve Loughran ste...@hortonworks.com
 wrote:


 It's worth adding that there's no guarantee that re-evaluated work would
 be on the same host as before, and in the case of node failure, it is not
 guaranteed to be elsewhere.

 this means things that depend on host-local information are going to
 generate different numbers even if there are no other side effects. Random
 number generation for seeding RDD.sample() would be a case in point here.

 There's also the fact that if you enable speculative execution, then
 operations may be repeated —even in the absence of any failure. If you are
 doing side effect work, or don't have an output committer whose actions are
 guaranteed to be atomic then you want to turn that option off.

  On 27 Mar 2015, at 19:46, Patrick Wendell pwend...@gmail.com wrote:
 
  If you invoke this, you will get at-least-once semantics on failure.
  For instance, if a machine dies in the middle of executing the foreach
  for a single partition, that will be re-executed on another machine.
  It could even fully complete on one machine, but the machine dies
  immediately before reporting the result back to the driver.
 
  This means you need to make sure the side-effects are idempotent, or
  use some transactional locking. Spark's own output operations, such as
  saving to Hadoop, use such mechanisms. For instance, in the case of
  Hadoop it uses the OutputCommitter classes.
 
  - Patrick
 
  On Fri, Mar 27, 2015 at 12:36 PM, Michal Klos michal.klo...@gmail.com
 wrote:
  Hi Spark group,
 
  We haven't been able to find clear descriptions of how Spark handles
 the
  resiliency of RDDs in relationship to executing actions with
 side-effects.
  If you do an `rdd.foreach(someSideEffect)`, then you are doing a
 side-effect
  for each element in the RDD. If a partition goes down -- the resiliency
  rebuilds the data, but did it keep track of how far it got in the
  partition's set of data, or will it start from the beginning again? So will
  it do at-least-once execution of foreach closures or at-most-once?
 
  thanks,
  Michal
 
 







RDD resiliency -- does it keep state?

2015-03-27 Thread Michal Klos
Hi Spark group,

We haven't been able to find clear descriptions of how Spark handles the
resiliency of RDDs in relationship to executing actions with side-effects.
If you do an `rdd.foreach(someSideEffect)`, then you are doing a
side-effect for each element in the RDD. If a partition goes down -- the
resiliency rebuilds the data, but did it keep track of how far it got in
the partition's set of data, or will it start from the beginning again? So
will it do at-least-once execution of foreach closures or at-most-once?

thanks,
Michal


Spark yarn-client submission example?

2015-03-17 Thread Michal Klos
Hi,

We have a Scala application and we want it to programmatically submit Spark
jobs to a Spark-YARN cluster in yarn-client mode.

We're running into a lot of classpath issues, e.g. once submitted it looks
for jars in our parent Scala application's local directory, jars that it
shouldn't need. Our setJars in the SparkContext only mentions our fat jar,
which should be all it needs. We are not sure why the other jars are being
included once we submit and we don't see a mechanism to control what it
wants.
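
For concreteness, a minimal sketch of the kind of programmatic setup described 
above (the app name and fat-jar path are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

// Programmatic yarn-client submission from inside our Scala application.
// The app name and assembly path below are placeholders.
val conf = new SparkConf()
  .setMaster("yarn-client")
  .setAppName("our-scala-app")
  .setJars(Seq("/path/to/our-fat-assembly.jar"))

val sc = new SparkContext(conf)   // this is where the classpath errors surface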

Here's a sample error:

Diagnostics: java.io.FileNotFoundException: File
file:/Users/github/spark/kindling-container/lib/spark-assembly-1.2.1-hadoop2.4.0.jar
does not exist
Failing this attempt. Failing the application.


I read through the user list and there was discussion around possibly using
Client.scala?

Are there any code examples out there that we could use as reference?

thanks,
Michal


Re: Define exception handling on lazy elements?

2015-03-11 Thread Michal Klos
Is there a way to have the exception handling go lazily along with the
definition?

e.g... we define it on the RDD but then our exception handling code gets
triggered on that first action... without us having to define it on the
first action? (e.g. that RDD code is boilerplate and we want to just have
it in many many projects)

m

On Wed, Mar 11, 2015 at 10:08 AM, Sean Owen so...@cloudera.com wrote:

 Handling exceptions this way means handling errors on the driver side,
 which may or may not be what you want. You can also write functions
 with exception handling inside, which could make more sense in some
 cases (like, to ignore bad records or count them or something).

 If you want to handle errors at every step on the driver side, you
 have to force RDDs to materialize to see if they work. You can do
 that with .count() or .take(1).length > 0. But to avoid recomputing
 the RDD then, it needs to be cached. So there is a big non-trivial
 overhead to approaching it this way.

 If you go this way, consider materializing only a few key RDDs in your
 flow, not every one.

 The most natural thing is indeed to handle exceptions where the action
 occurs.
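
A minimal sketch of the "materialize early" pattern Sean describes above, assuming 
a SparkContext sc; the input path is a placeholder:

import scala.util.{Failure, Success, Try}

// Cache first so the materialization check below doesn't force a recompute
// later, then surface source errors here rather than at the first real action.
val rdd = sc.textFile("hdfs://path/to/input").cache()

Try(rdd.take(1)) match {
  case Success(_) =>
    // source is readable; continue with the real actions on the cached RDD
  case Failure(e) =>
    // handle the bad source here (log, fall back, rethrow, ...)
    println(s"Failed to read input: ${e.getMessage}")
}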


 On Wed, Mar 11, 2015 at 1:51 PM, Michal Klos michal.klo...@gmail.com
 wrote:
  Hi Spark Community,
 
  We would like to define exception handling behavior on RDD instantiation
 /
  build. Since the RDD is lazily evaluated, it seems like we are forced to
 put
  all exception handling in the first action call?
 
  This is an example of something that would be nice:
 
  def myRDD = {
    Try {
      val rdd = sc.textFile(...)
    } match {
      case Failure(e) => // Handle ...
    }
  }
 
  myRDD.reduceByKey(...) //don't need to worry about that exception here
 
  The reason being that we want to try to avoid having to copy paste
 exception
  handling boilerplate on every first action. We would love to define this
  once somewhere for the RDD build code and just re-use.
 
  Is there a best practice for this? Are we missing something here?
 
  thanks,
  Michal



Re: Define exception handling on lazy elements?

2015-03-11 Thread Michal Klos
Well I'm thinking that this RDD would fail to build in a specific way...
 different from the subsequent code (e.g. s3 access denied or timeout on
connecting to a database)

So for example, define the RDD failure handling on the RDD, define the
action failure handling on the action? Does this make sense.. otherwise...
on that first action, we have to handle exceptions for all of the lazy
elements that preceded it.. and that could be a lot of stuff.

If the RDD failure handling code is defined with the RDD, it just seems
cleaner because it's right next to its element. Not to mention, we believe
it would be easier to import it into multiple spark jobs without a lot of
copy pasta

m

On Wed, Mar 11, 2015 at 10:45 AM, Sean Owen so...@cloudera.com wrote:

 Hm, but you already only have to define it in one place, rather than
 on each transformation. I thought you wanted exception handling at
 each transformation?

 Or do you want it once for all actions? you can enclose all actions in
 a try-catch block, I suppose, to write exception handling code once.
 You can easily write a Scala construct that takes a function and logs
 exceptions it throws, and the function you pass can invoke an RDD
 action. So you can refactor that way too.
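
A minimal sketch of the construct Sean describes -- the name withErrorHandling and 
the logging are made up for illustration:

// Wrap any block that invokes an RDD action so the exception handling is
// written once and reused across jobs.
def withErrorHandling[T](jobName: String)(body: => T): Option[T] =
  try {
    Some(body)
  } catch {
    case e: Exception =>
      println(s"[$jobName] failed: ${e.getMessage}")   // or log / alert / rethrow
      None
  }

// Laziness is preserved: nothing runs until the action inside the block.
withErrorHandling("daily-counts") {
  sc.textFile("hdfs://path/to/input").map(_.length.toLong).reduce(_ + _)
}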

 On Wed, Mar 11, 2015 at 2:39 PM, Michal Klos michal.klo...@gmail.com
 wrote:
  Is there a way to have the exception handling go lazily along with the
  definition?
 
  e.g... we define it on the RDD but then our exception handling code gets
  triggered on that first action... without us having to define it on the
  first action? (e.g. that RDD code is boilerplate and we want to just
 have it
  in many many projects)
 
  m
 
  On Wed, Mar 11, 2015 at 10:08 AM, Sean Owen so...@cloudera.com wrote:
 
  Handling exceptions this way means handling errors on the driver side,
  which may or may not be what you want. You can also write functions
  with exception handling inside, which could make more sense in some
  cases (like, to ignore bad records or count them or something).
 
  If you want to handle errors at every step on the driver side, you
  have to force RDDs to materialize to see if they work. You can do
   that with .count() or .take(1).length > 0. But to avoid recomputing
  the RDD then, it needs to be cached. So there is a big non-trivial
  overhead to approaching it this way.
 
  If you go this way, consider materializing only a few key RDDs in your
  flow, not every one.
 
  The most natural thing is indeed to handle exceptions where the action
  occurs.
 
 
  On Wed, Mar 11, 2015 at 1:51 PM, Michal Klos michal.klo...@gmail.com
  wrote:
   Hi Spark Community,
  
   We would like to define exception handling behavior on RDD
 instantiation
   /
   build. Since the RDD is lazily evaluated, it seems like we are forced
 to
   put
   all exception handling in the first action call?
  
   This is an example of something that would be nice:
  
    def myRDD = {
      Try {
        val rdd = sc.textFile(...)
      } match {
        case Failure(e) => // Handle ...
      }
    }
  
   myRDD.reduceByKey(...) //don't need to worry about that exception here
  
   The reason being that we want to try to avoid having to copy paste
   exception
   handling boilerplate on every first action. We would love to define
 this
   once somewhere for the RDD build code and just re-use.
  
   Is there a best practice for this? Are we missing something here?
  
   thanks,
   Michal
 
 



Define exception handling on lazy elements?

2015-03-11 Thread Michal Klos
Hi Spark Community,

We would like to define exception handling behavior on RDD instantiation /
build. Since the RDD is lazily evaluated, it seems like we are forced to
put all exception handling in the first action call?

This is an example of something that would be nice:

def myRDD = {
  Try {
    val rdd = sc.textFile(...)
  } match {
    case Failure(e) => // Handle ...
  }
}

myRDD.reduceByKey(...) //don't need to worry about that exception here

The reason being that we want to try to avoid having to copy paste
exception handling boilerplate on every first action. We would love to
define this once somewhere for the RDD build code and just re-use.

Is there a best practice for this? Are we missing something here?

thanks,
Michal


Re: Scalable JDBCRDD

2015-03-02 Thread Michal Klos
Hi Cody,

Thanks for the reply. Yea, we thought of possibly doing this in a UDX in
Vertica somehow to get the lower-level cooperation, but it's a bit daunting.
We want to do this because there are things we want to do with the
result-set in Spark that are not possible in Vertica. The DStream receiver
is a good thought.

I think right now we are leaning towards eric's suggestion -- where we
run the big query once somewhere (getPartitions maybe) in Vertica and dump it
into a temporary table with an additional generated partition_key field.
Then we have the workers issue N "select * from temp_table where
partition_key = ?" queries that are hopefully lightweight. The temporary
table, we hope, will just clean itself up so we don't need to handle that
mess. It seems like the most sane approach today ;]
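
A rough sketch of that pattern with the stock JdbcRDD -- the JDBC URL, 
credentials, table, and column names are all placeholders:

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

// The heavy query has already been dumped into temp_results with a generated
// partition_key in [0, numPartitions). Each worker then runs one cheap query.
val numPartitions = 16

val rdd = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:vertica://host:5433/db", "user", "pass"),
  "SELECT id, payload FROM temp_results WHERE partition_key >= ? AND partition_key <= ?",
  0, numPartitions - 1, numPartitions,
  (rs: ResultSet) => (rs.getLong("id"), rs.getString("payload")))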

m

On Mon, Mar 2, 2015 at 10:51 AM, Cody Koeninger c...@koeninger.org wrote:

 Have you already tried using the Vertica hadoop input format with spark?
 I don't know how it's implemented, but I'd hope that it has some notion of
 vertica-specific shard locality (which JdbcRDD does not).

 If you're really constrained to consuming the result set in a single
 thread, whatever processing you're doing of the results must be
 time-consuming enough to make the overhead of distributing it in a spark
 job still worthwhile?   I guess you might take a look at doing a custom
 DStream receiver that iterates over the result set and makes micro-batches
 out of it.

 On Sun, Mar 1, 2015 at 9:59 AM, michal.klo...@gmail.com 
 michal.klo...@gmail.com wrote:

 Yes exactly.

 The temp table is an approach but then we need to manage the deletion of
 it etc.

 I'm sure we won't be the only people with this crazy use case.

 If there isn't a feasible way to do this within the framework then
 that's okay. But if there is a way we are happy to write the code and PR it
 back :)

 M



 On Mar 1, 2015, at 10:02 AM, eric e...@ericjbell.com wrote:

 What you're saying is that, due to the intensity of the query, you need
 to run a single query and partition the results, versus running one query
 for each partition.

 I assume it's not viable to throw the query results into another table in
 your database and then query that using the normal approach?

 --eric

 On 3/1/15 4:28 AM, michal.klo...@gmail.com wrote:

 Jorn: Vertica

  Cody: I posited the limit just as an example of how JdbcRDD could be
 used least invasively. Let's say we used a partition on a time field -- we
 would still need to have N executions of those queries. The queries we have
 are very intense and concurrency is an issue even if the N partitioned
 queries are smaller. Some queries require evaluating the whole data set
 first. If our use case were a simple "select * from table", then the partitions
 would be an easier sell if it wasn't for the concurrency problem :) Long
 story short -- we need only one execution of the query and would like to
 just divvy out the result set.

  M



 On Mar 1, 2015, at 5:18 AM, Jörn Franke jornfra...@gmail.com wrote:

   What database are you using?
  On 28 Feb 2015 at 18:15, Michal Klos michal.klo...@gmail.com wrote:

 Hi Spark community,

 We have a use case where we need to pull huge amounts of data from a SQL
 query against a database into Spark. We need to execute the query against
 our huge database and not a substitute (SparkSQL, Hive, etc) because of a
 couple of factors including custom functions used in the queries that only
 our database has.

  We started by looking at JDBC RDD, which utilizes a prepared statement
 with two parameters that are meant to be used to partition the result set
 to the workers... e.g.:

  select * from table limit ?,?

  turns into

  select * from table limit 1,100 on worker 1
 select * from table limit 101,200 on worker 2

  This will not work for us because our database cannot support multiple
 execution of these queries without being crippled. But, additionally, our
 database doesn't support the above LIMIT syntax and we don't have a generic
 way of partitioning the various queries.

   As a result -- we started by forking JDBCRDD and made a version that
 executes the SQL query once in getPartitions into a Vector and then hands
 each worker node an index and iterator. Here's a snippet of getPartitions
 and compute:

override def getPartitions: Array[Partition] = {
 //Compute the DB query once here
 val results = computeQuery

  (0 until numPartitions).map(i => {
   // TODO: would be better to do this partitioning when scrolling 
 through result set if still loading into memory
   val partitionItems = results.drop(i).sliding(1, 
 numPartitions).flatten.toVector
   new DBPartition(i, partitionItems)
 }).toArray
   }

   override def compute(thePart: Partition, context: TaskContext) = new 
 NextIterator[T] {
 val part = thePart.asInstanceOf[DBPartition[T]]

 //Shift the result vector to our index number and then do a sliding 
 iterator over it
 val iterator

Scalable JDBCRDD

2015-02-28 Thread Michal Klos
Hi Spark community,

We have a use case where we need to pull huge amounts of data from a SQL
query against a database into Spark. We need to execute the query against
our huge database and not a substitute (SparkSQL, Hive, etc) because of a
couple of factors including custom functions used in the queries that only
our database has.

We started by looking at JDBC RDD, which utilizes a prepared statement with
two parameters that are meant to be used to partition the result set to the
workers... e.g.:

select * from table limit ?,?

turns into

select * from table limit 1,100 on worker 1
select * from table limit 101,200 on worker 2

This will not work for us because our database cannot support multiple
execution of these queries without being crippled. But, additionally, our
database doesn't support the above LIMIT syntax and we don't have a generic
way of partitioning the various queries.

As a result -- we started by forking JDBCRDD and made a version that
executes the SQL query once in getPartitions into a Vector and then hands
each worker node an index and iterator. Here's a snippet of getPartitions
and compute:

  override def getPartitions: Array[Partition] = {
//Compute the DB query once here
val results = computeQuery

    (0 until numPartitions).map(i => {
  // TODO: would be better to do this partitioning when scrolling
through result set if still loading into memory
  val partitionItems = results.drop(i).sliding(1,
numPartitions).flatten.toVector
  new DBPartition(i, partitionItems)
}).toArray
  }

  override def compute(thePart: Partition, context: TaskContext) = new
NextIterator[T] {
val part = thePart.asInstanceOf[DBPartition[T]]

//Shift the result vector to our index number and then do a
sliding iterator over it
val iterator = part.items.iterator

override def getNext : T = {
  if (iterator.hasNext) {
iterator.next()
  } else {
finished = true
null.asInstanceOf[T]
  }
}

override def close: Unit = ()
  }

This is a little better since we can just execute the query once.
However, the result-set needs to fit in memory.

We've been trying to brainstorm a way to

A) have that result set distribute out to the worker RDD partitions as
it's streaming in from the cursor?
B) have the result set spill to disk if it exceeds memory and do
something clever around the iterators?
C) something else?

We're not familiar enough yet with all of the workings of Spark to
know how to proceed on this.

We also thought of the work-around of having the DB query dump to
HDFS/S3 and then picking it up from there, but it adds more moving parts
and latency to our processing.

Does anyone have a clever suggestion? Are we missing something?

thanks,
Michal