Re: Tools to manage workflows on Spark

2015-02-28 Thread Mayur Rustagi
Sorry, not really. Spork is a way to migrate your existing Pig scripts to
Spark or write new Pig jobs that can execute on Spark.
For orchestration you are better off using Oozie, especially if you are
using other execution engines/systems besides Spark.


Regards,
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoid.com http://www.sigmoidanalytics.com/
@mayur_rustagi http://www.twitter.com/mayur_rustagi

On Sat, Feb 28, 2015 at 6:59 PM, Qiang Cao caoqiang...@gmail.com wrote:

 Thanks Mayur! I'm looking for something that would allow me to easily
 describe and manage a workflow on Spark. A workflow in my context is a
 composition of Spark applications that may depend on one another based on
 hdfs inputs/outputs. Is Spork a good fit? The orchestration I want is on
 app level.



 On Sat, Feb 28, 2015 at 9:38 PM, Mayur Rustagi mayur.rust...@gmail.com
 wrote:

 We do maintain it, but in the Apache repo itself. However, Pig cannot do
 orchestration for you. I am not sure what you are looking for from Pig in
 this context.

 Regards,
 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoid.com http://www.sigmoidanalytics.com/
 @mayur_rustagi http://www.twitter.com/mayur_rustagi

 On Sat, Feb 28, 2015 at 6:36 PM, Ted Yu yuzhih...@gmail.com wrote:

 Here was latest modification in spork repo:
 Mon Dec 1 10:08:19 2014

 Not sure if it is being actively maintained.

 On Sat, Feb 28, 2015 at 6:26 PM, Qiang Cao caoqiang...@gmail.com
 wrote:

 Thanks for the pointer, Ashish! I was also looking at Spork
  https://github.com/sigmoidanalytics/spork (Pig-on-Spark), but wasn't
  sure if that's the right direction.

 On Sat, Feb 28, 2015 at 6:36 PM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

 You have to call spark-submit from oozie.
 I used this link to get the idea for my implementation -


 http://mail-archives.apache.org/mod_mbox/oozie-user/201404.mbox/%3CCAHCsPn-0Grq1rSXrAZu35yy_i4T=fvovdox2ugpcuhkwmjp...@mail.gmail.com%3E



 On Feb 28, 2015, at 3:25 PM, Qiang Cao caoqiang...@gmail.com wrote:

 Thanks, Ashish! Is Oozie integrated with Spark? I knew it can
 accommodate some Hadoop jobs.


 On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

 Qiang,
 Did you look at Oozie?
 We use oozie to run spark jobs in production.


 On Feb 28, 2015, at 2:45 PM, Qiang Cao caoqiang...@gmail.com wrote:

 Hi Everyone,

 We need to deal with workflows on Spark. In our scenario, each
 workflow consists of multiple processing steps. Among different steps,
 there could be dependencies.  I'm wondering if there are tools
 available that can help us schedule and manage workflows on Spark. I'm
 looking for something like pig on Hadoop, but it should fully function on
 Spark.

 Any suggestion?

 Thanks in advance!

 Qiang











Re: Can spark job server be used to visualize streaming data?

2015-02-13 Thread Mayur Rustagi
Frankly there is no good/standard way to visualize streaming data. So far I have
found HBase a good intermediate store for data from streams, and then visualize
it with a Play-based framework and d3.js.
Regards
Mayur


On Fri Feb 13 2015 at 4:22:58 PM Kevin (Sangwoo) Kim kevin...@apache.org
wrote:

 I'm not very sure for CDH 5.3,
 but now Zeppelin works for Spark 1.2 as spark-repl has been published in
 Spark 1.2.1
 Please try again!

 On Fri Feb 13 2015 at 3:55:19 PM Su She suhsheka...@gmail.com wrote:

 Thanks Kevin for the link, I have had issues trying to install zeppelin
 as I believe it is not yet supported for CDH 5.3, and Spark 1.2. Please
 correct me if I am mistaken.

 On Thu, Feb 12, 2015 at 7:33 PM, Kevin (Sangwoo) Kim kevin...@apache.org
  wrote:

 Apache Zeppelin also has a scheduler and then you can reload your chart
 periodically,
 Check it out:
 http://zeppelin.incubator.apache.org/docs/tutorial/tutorial.html




 On Fri Feb 13 2015 at 7:29:00 AM Silvio Fiorito 
 silvio.fior...@granturing.com wrote:

   One method I’ve used is to publish each batch to a message bus or
 queue with a custom UI listening on the other end, displaying the results
 in d3.js or some other app. As far as I’m aware there isn’t a tool that
 will directly take a DStream.

  Spark Notebook seems to have some support for updating graphs
 periodically. I haven’t used it myself yet so not sure how well it works.
 See here: https://github.com/andypetrella/spark-notebook

   From: Su She
 Date: Thursday, February 12, 2015 at 1:55 AM
 To: Felix C
 Cc: Kelvin Chu, user@spark.apache.org

 Subject: Re: Can spark job server be used to visualize streaming data?

   Hello Felix,

  I am already streaming in very simple data using Kafka (few messages
 / second, each record only has 3 columns...really simple, but looking to
 scale once I connect everything). I am processing it in Spark Streaming and
 am currently writing word counts to hdfs. So the part where I am confused
 is...

 Kafka Publishes Data -> Kafka Consumer/Spark Streaming Receives Data ->
 Spark Word Count -> *How do I visualize?*

  is there a viz tool that I can set up to visualize JavaPairDStreams?
 or do I have to write to hbase/hdfs first?

  Thanks!

 On Wed, Feb 11, 2015 at 10:39 PM, Felix C felixcheun...@hotmail.com
 wrote:

  What kind of data do you have? Kafka is a popular source to use with
 spark streaming.
 But, spark streaming also support reading from a file. Its called
 basic source

 https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers

 --- Original Message ---

 From: Su She suhsheka...@gmail.com
 Sent: February 11, 2015 10:23 AM
 To: Felix C felixcheun...@hotmail.com
 Cc: Kelvin Chu 2dot7kel...@gmail.com, user@spark.apache.org
 Subject: Re: Can spark job server be used to visualize streaming data?

   Thank you Felix and Kelvin. I think I'll def be using the k-means
 tools in mlib.

  It seems the best way to stream data is by storing in hbase and then
 using an api in my viz to extract data? Does anyone have any thoughts on
 this?

   Thanks!


 On Tue, Feb 10, 2015 at 11:45 PM, Felix C felixcheun...@hotmail.com
 wrote:

  Checkout

 https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html

 In there are links to how that is done.


 --- Original Message ---

 From: Kelvin Chu 2dot7kel...@gmail.com
 Sent: February 10, 2015 12:48 PM
 To: Su She suhsheka...@gmail.com
 Cc: user@spark.apache.org
 Subject: Re: Can spark job server be used to visualize streaming data?

   Hi Su,

  Out of the box, no. But, I know people integrate it with Spark
 Streaming to do real-time visualization. It will take some work though.

  Kelvin

 On Mon, Feb 9, 2015 at 5:04 PM, Su She suhsheka...@gmail.com wrote:

  Hello Everyone,

  I was reading this blog post:
 http://homes.esat.kuleuven.be/~bioiuser/blog/a-d3-visualisation-from-spark-as-a-service/

  and was wondering if this approach can be taken to visualize
 streaming data...not just historical data?

  Thank you!

  -Suh








Re: Modifying an RDD in forEach

2014-12-06 Thread Mayur Rustagi
You'll benefit from viewing Matei's talk at Yahoo on Spark internals and how
it optimizes execution of iterative jobs.
The simple answer is:
1. Spark doesn't materialize an RDD when you do an iteration but lazily
captures the transformation functions in the RDD (only the function and
closure; no data operation actually happens).
2. When you finally execute and want to cause effects (save to disk,
collect on master etc.) it views the DAG of execution and optimizes what it
can reason about (eliminating intermediate states, performing multiple
transformations in one task, leveraging partitioning where available, among
others).
Bottom line: it doesn't matter how many RDDs you have in your DAG chain as
long as Spark can optimize the functions in that DAG to create minimal
materialization on its way to the final output.

Regards
Mayur
 On 06-Dec-2014 6:12 pm, Ron Ayoub ronalday...@live.com wrote:

 This is from a separate thread with a differently named title.

 Why can't you modify the actual contents of an RDD using forEach? It
 appears to be working for me. What I'm doing is changing cluster
 assignments and distances per data item for each iteration of the
 clustering algorithm. The clustering algorithm is massive and iterates
 thousands of times. As I understand it now, you are supposed to create new
 RDDs on each pass. This is a hierachical k-means that I'm doing and hence
 it is consist of many iterations rather than large iterations.

 So I understand the restriction of why operation when aggregating and
 reducing etc, need to be associative. However, forEach operates on a single
 item. So being that Spark is advertised as being great for iterative
 algorithms since it operates in-memory, how can it be good to create
 thousands upon thousands of RDDs during the course of an iterative
 algorithm?  Does Spark have some kind of trick like reuse behind the
 scenes - fully persistent data objects or whatever? How can it possibly be
 efficient for 'iterative' algorithms when it is creating so many RDDs as
 opposed to one?

 Or is the answer that I should keep doing what I'm doing because it is
 working even though it is not theoretically sound and aligned with
 functional ideas. I personally just want it to be fast and be able to
 operate on up to 500 million data items.



Re: Joined RDD

2014-11-13 Thread Mayur Rustagi
First of all, the work is only performed when you trigger an action such as collect.
When you trigger collect, at that point it retrieves the data from disk, joins
the datasets together and delivers it to you.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


On Thu, Nov 13, 2014 at 12:26 PM, ajay garg ajay.g...@mobileum.com wrote:

 Hi,
  I have two RDDs A and B which are created from reading file from HDFS.
 I have a third RDD C which is created by taking join of A and B. All three
 RDDs (A, B and C ) are not cached.
 Now if I perform any action on C (let say collect), action is served
 without
 reading any data from the disk.
 Since no data is cached in spark how is action on C is served without
 reading data from disk.

 Thanks
 --Ajay



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Joined-RDD-tp18820.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Communication between Driver and Executors

2014-11-13 Thread Mayur Rustagi
I wonder if SparkConf is dynamically updated on all worker nodes or only
during initialization. It could be used to piggyback information.
Otherwise I guess you are stuck with Broadcast.
Primarily I have had these issues when moving legacy MR operators to Spark, where
MR piggybacks on the Hadoop conf pretty heavily; in a native Spark application
it's rarely required. Do you have a use case like that?


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


On Fri, Nov 14, 2014 at 10:28 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 (this is related to my previous question about stopping the
 StreamingContext)

 is there any way to send a message from the driver to the executors? There
 is all this Akka machinery running, so it should be easy to have something
 like

   sendToAllExecutors(message)

 on the driver and

   handleMessage {
 case _ => ...
   }

 on the executors, right? Surely at least for Broadcast.unpersist() such a
 thing must exist, so can I use it somehow (dirty way is also ok) to send a
 message to my Spark nodes?

 Thanks
 Tobias



Re: flatMap followed by mapPartitions

2014-11-12 Thread Mayur Rustagi
flatMap would have to shuffle data only if the output RDD is expected to be
partitioned by some key.
RDD[X].flatMap(x => RDD[Y])
If it has to shuffle, it should be local.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


On Thu, Nov 13, 2014 at 7:31 AM, Debasish Das debasish.da...@gmail.com
wrote:

 Hi,

 I am doing a flatMap followed by mapPartitions to do some blocked
 operation...flatMap is shuffling data but this shuffle is strictly
 shuffling to disk and not over the network right ?

 Thanks.
 Deb



Re: Using partitioning to speed up queries in Shark

2014-11-07 Thread Mayur Rustagi
-dev list, +user list
Shark is not officially supported anymore, so you are better off moving to
Spark SQL.
Shark doesn't support Hive partitioning logic anyway; it has its own version of
partitioning on in-memory blocks, but that is independent of whether you
partition your data in Hive or not.



Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


On Fri, Nov 7, 2014 at 3:31 AM, Gordon Benjamin gordon.benjami...@gmail.com
 wrote:

 Hi All,

 I'm using Spark/Shark as the foundation for some reporting that I'm doing
 and have a customers table with approximately 3 million rows that I've
 cached in memory.

 I've also created a partitioned table that I've also cached in memory on a
 per day basis

 FROM
 customers_cached
 INSERT OVERWRITE TABLE
 part_customers_cached
 PARTITION(createday)
 SELECT id,email,dt_cr, to_date(dt_cr) as createday where
 dt_cr > unix_timestamp('2013-01-01 00:00:00') and
 dt_cr < unix_timestamp('2013-12-31 23:59:59');
 set exec.dynamic.partition=true;

 set exec.dynamic.partition.mode=nonstrict;

 however when I run the following basic tests I get this type of performance

 [localhost:1] shark> select count(*) from part_customers_cached where
  createday >= '2014-08-01' and createday <= '2014-12-06';
 37204
 Time taken (including network latency): 3.131 seconds

 [localhost:1] shark> SELECT count(*) from customers_cached where
 dt_cr > unix_timestamp('2013-08-01 00:00:00') and
 dt_cr < unix_timestamp('2013-12-06 23:59:59');
 37204
 Time taken (including network latency): 1.538 seconds

 I'm running this on a cluster with one master and two slaves and was hoping
 that the partitioned table would be noticeably faster but it looks as
 though the partitioning has slowed things down... Is this the case, or is
 there some additional configuration that I need to do to speed things up?

 Best Wishes,

 Gordon



Re: Why RDD is not cached?

2014-10-28 Thread Mayur Rustagi
What is the partition count of the RDD? It's possible that you don't have
enough memory to store the whole RDD on a single machine. Can you try
forcibly repartitioning the RDD and then caching it?
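
Something like the following, based on the snippet quoted below (the partition
count of 48 is just an illustrative guess, not a recommendation):

val rdd = srdd.flatMap(mapProfile_To_Sessions).repartition(48).cache()
val count = rdd.count()   // the action forces evaluation, so the repartitioned RDD actually gets cached
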
Regards
Mayur

On Tue Oct 28 2014 at 1:19:09 AM shahab shahab.mok...@gmail.com wrote:

 I used Cache followed by a count on RDD to ensure that caching is
 performed.

 val rdd = srdd.flatMap(mapProfile_To_Sessions).cache

val count = rdd.count

  //so at this point the RDD should be cached, right?

 On Tue, Oct 28, 2014 at 8:35 AM, Sean Owen so...@cloudera.com wrote:

 Did you just call cache()? By itself it does nothing but once an action
 requires it to be computed it should become cached.
 On Oct 28, 2014 8:19 AM, shahab shahab.mok...@gmail.com wrote:

 Hi,

 I have a standalone spark , where the executor is set to have 6.3 G
 memory , as I am using two workers so in total there 12.6 G memory and 4
 cores.

 I am trying to cache a RDD with approximate size of 3.2 G, but
 apparently it is not cached as neither I can see  
 BlockManagerMasterActor: Added rdd_XX in memory  nor  the performance
 of running the tasks is improved

 But, why it is not cached when there is enough memory storage?
 I tried with smaller RDDs. 1 or 2 G and it works, at least I could see 
 BlockManagerMasterActor:
 Added rdd_0_1 in memory and improvement in results.

 Any idea what I am missing in my settings, or... ?

 thanks,
 /Shahab





Re: input split size

2014-10-18 Thread Mayur Rustagi
Does it retain the order if it's pulling from the HDFS blocks? Meaning,
if file1 = a, b, c partitions in order,
and I convert to a 2-partition read, will it map to ab, c or a, bc, or can it
also be a, cb?


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


On Sat, Oct 18, 2014 at 9:09 AM, Ilya Ganelin ilgan...@gmail.com wrote:

 Also - if you're doing a text file read you can pass the number of
 resulting partitions as the second argument.
 On Oct 17, 2014 9:05 PM, Larry Liu larryli...@gmail.com wrote:

 Thanks, Andrew. What about reading out of local?

 On Fri, Oct 17, 2014 at 5:38 PM, Andrew Ash and...@andrewash.com wrote:

 When reading out of HDFS it's the HDFS block size.

 On Fri, Oct 17, 2014 at 5:27 PM, Larry Liu larryli...@gmail.com wrote:

 What is the default input split size? How to change it?






Re: rule engine based on spark

2014-10-14 Thread Mayur Rustagi
We are developing something similar on top of Streaming. Could you detail
some rule functionality you are looking for.
We are developing a dsl for data processing on top of streaming as well as
static data enabled on Spark.


Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


On Wed, Oct 15, 2014 at 3:51 AM, salemi alireza.sal...@udo.edu wrote:

 hi,

 is there a rule engine based on Spark? I'd like to allow the business user to
 define their rules in a language, and the execution of the rules should be
 done in Spark.


 Thanks,
 Ali



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/rule-engine-based-on-spark-tp16433.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: JavaPairDStream saveAsTextFile

2014-10-09 Thread Mayur Rustagi
That's a cryptic way to say there should be a JIRA for it :)

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


On Thu, Oct 9, 2014 at 11:46 AM, Sean Owen so...@cloudera.com wrote:

 Yeah it's not there. I imagine it was simply never added, and that
 there's not a good reason it couldn't be.

 On Thu, Oct 9, 2014 at 4:53 AM, SA sadhu.a...@gmail.com wrote:
  HI,
 
  I am looking at the documentation for Java API for Streams.  The scala
  library has option to save file locally, but the Java version doesnt seem
  to.  The only option i see is saveAsHadoopFiles.
 
  Is there a reason why this option was left out from Java API?
 
 
 http://spark.apache.org/docs/1.0.0/api/java/index.html?org/apache/spark/streaming/dstream/DStream.html
 
  Thanks.
  SA

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Setup/Cleanup for RDD closures?

2014-10-03 Thread Mayur Rustagi
The current approach is to use mapPartitions: initialize the connection at the
beginning, iterate through the data, and close off the connection at the end.
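
A minimal sketch of that pattern; createConnection and process here are
hypothetical stand-ins for whatever external resource and per-row work you have:

rdd.mapPartitions { iter =>
  val conn = createConnection()                               // open once per partition
  val results = iter.map(row => process(conn, row)).toList    // materialize before closing, since iterators are lazy
  conn.close()                                                // close once per partition
  results.iterator
}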


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


On Fri, Oct 3, 2014 at 10:16 AM, Stephen Boesch java...@gmail.com wrote:


 Consider there is some connection / external resource allocation required
 to be accessed/mutated by each of the rows from within a single worker
 thread.  That connection should only  be opened/closed before the first row
 is accessed / after the last row is completed.

 It is my understanding that there is work presently underway (Reynold Xin
 and others)  on defining an external resources API to address this. What is
 the recommended approach in the meanwhile?



Re: Spark Streaming for time consuming job

2014-10-01 Thread Mayur Rustagi
Calling collect on anything is almost always a bad idea. The only
exception is if you are looking to pass that data on to another system and
never see it again :).
I would say you need to implement the outlier detection on the RDD and process it
in Spark itself rather than calling collect on it.

Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


On Tue, Sep 30, 2014 at 3:22 PM, Eko Susilo eko.harmawan.sus...@gmail.com
wrote:

 Hi All,

 I have a problem that i would like to consult about spark streaming.

 I have a spark streaming application that parse a file (which will be
 growing as time passed by)This file contains several columns containing
 lines of numbers,
 these parsing is divided into windows (each 1 minute). Each column
 represent different entity while each row within a column represent the
 same entity (for example, first column represent temprature, second column
 represent humidty, etc, while each row represent the value of each
 attribute). I use PairDStream for each column.

 Afterwards, I need to run a time consuming algorithm (outlier detection,
 for now i use box plot algorithm) for each RDD of each PairDStream.

 To run the outlier detection, currently i am thinking about to call
 collect on each of the PairDStream from method forEachRDD and then i get
 the List of the items, and then pass the each list of items to a thread.
 Each thread runs the outlier detection algorithm and process the result.

 I run the outlier detection in separate thread in order not to put too
 much burden on spark streaming task. So, I would like to ask if this model
 has a risk? or is there any alternatives provided by the framework such
 that i don't have to run a separate thread for this?

 Thank you for your attention.



 --
 Best Regards,
 Eko Susilo



Re: Processing multiple request in cluster

2014-09-25 Thread Mayur Rustagi
There are two problems you may be facing:
1. Your application is taking all the resources.
2. Inside your application, task submission is not being scheduled properly.

For 1, you can either configure your app to take fewer resources, or use a
Mesos/YARN-type scheduler to dynamically change or juggle resources.
For 2, you can use the fair scheduler so that application tasks can be
scheduled more fairly.

Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


On Thu, Sep 25, 2014 at 12:32 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 You can try spark on Mesos or Yarn since they have lot more support for
 scheduling and all

 Thanks
 Best Regards

 On Thu, Sep 25, 2014 at 4:50 AM, Subacini B subac...@gmail.com wrote:

 hi All,

 How to run concurrently multiple requests on same cluster.

 I have a program using *spark streaming context *which reads* streaming
 data* and writes it to HBase. It works fine, the problem is when
 multiple requests are submitted to cluster, only first request is processed
 as the entire cluster is used for this request. Rest of the requests are in
 waiting mode.

 i have set  spark.cores.max to 2 or less, so that it can process another
 request,but if there is only one request cluster is not utilized properly.

 Is there any way, that spark cluster can process streaming request
 concurrently at the same time effectively utitlizing cluster, something
 like sharkserver

 Thanks
 Subacini





Re: Serving data

2014-09-13 Thread Mayur Rustagi
You can cache data in memory and query it using Spark Job Server.

Most folks dump data down to a queue/db for retrieval.

You can batch up data and store it into Parquet partitions as well, and query it using
another Spark SQL shell; the JDBC driver in Spark SQL is part of 1.1, I believe.
-- 
Regards,
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi

On Fri, Sep 12, 2014 at 2:54 PM, Marius Soutier mps@gmail.com wrote:

 Hi there,
 I’m pretty new to Spark, and so far I’ve written my jobs the same way I wrote 
 Scalding jobs - one-off, read data from HDFS, count words, write counts back 
 to HDFS.
 Now I want to display these counts in a dashboard. Since Spark allows to 
 cache RDDs in-memory and you have to explicitly terminate your app (and 
 there’s even a new JDBC server in 1.1), I’m assuming it’s possible to keep an 
 app running indefinitely and query an in-memory RDD from the outside (via 
 SparkSQL for example).
 Is this how others are using Spark? Or are you just dumping job results into 
 message queues or databases?
 Thanks
 - Marius
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org

Re: single worker vs multiple workers on each machine

2014-09-12 Thread Mayur Rustagi
Another aspect to keep in mind is that a JVM above 8-10 GB of heap starts to
misbehave; it's typically better to split executors up at roughly 15 GB intervals.
If you are choosing machines, about 10 GB/core is a good approximation to maintain.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


On Fri, Sep 12, 2014 at 2:59 AM, Sean Owen so...@cloudera.com wrote:

 As I understand, there's generally not an advantage to running many
 executors per machine. Each will already use all the cores, and
 multiple executors just means splitting the available memory instead
 of having one big pool. I think there may be an argument at extremes
 of scale where one JVM with a huge heap might have excessive GC
 pauses, or too many open files, that kind of thing?

 On Thu, Sep 11, 2014 at 8:42 PM, Mike Sam mikesam...@gmail.com wrote:
  Hi There,
 
  I am new to Spark and I was wondering when you have so much memory on
 each
  machine of the cluster, is it better to run multiple workers with limited
  memory on each machine or is it better to run a single worker with
 access to
  the majority of the machine memory? If the answer is it depends, would
 you
  please elaborate?
 
  Thanks,
  Mike

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Network requirements between Driver, Master, and Slave

2014-09-12 Thread Mayur Rustagi
The driver needs a consistent connection to the master in standalone mode, as a whole
bunch of client work happens on the driver. So calls like parallelize send
data from the driver to the master, and collect sends data from the master to the driver.

If you are looking to avoid that connection you can look into the embedded driver model
in YARN, where the driver also runs inside the cluster and hence reliability and
connectivity are a given.
-- 
Regards,
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi

On Fri, Sep 12, 2014 at 6:46 PM, Jim Carroll jimfcarr...@gmail.com
wrote:

 Hi Akhil,
 Thanks! I guess in short that means the master (or slaves?) connect back to
 the driver. This seems like a really odd way to work given the driver needs
 to already connect to the master on port 7077. I would have thought that if
 the driver could initiate a connection to the master, that would be all
 that's required.
 Can you describe what it is about the architecture that requires the master
 to connect back to the driver even when the driver initiates a connection to
 the master? Just curious.
 Thanks anyway.
 Jim
  
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Network-requirements-between-Driver-Master-and-Slave-tp13997p14086.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org

Re: Spark caching questions

2014-09-10 Thread Mayur Rustagi
Cached RDDs do not survive SparkContext deletion (they are scoped on a
per-SparkContext basis).
I am not sure what you mean by disk-based cache eviction; if you cache more
RDDs than you have disk space, the result will not be very pretty :)

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


On Wed, Sep 10, 2014 at 4:43 AM, Vladimir Rodionov 
vrodio...@splicemachine.com wrote:

 Hi, users

 1. Disk based cache eviction policy? The same LRU?

 2. What is the scope of a cached RDD? Does it survive application? What
 happen if I run Java app next time? Will RRD be created or read from cache?

 If , answer is YES, then ...


 3. Is there are any way to invalidate cached RDD automatically? RDD
 partitions? Some API kind of : RDD.isValid()?

 4. HadoopRDD InputFormat - based. Some partitions (splits) may become
 invalid in cache. Can we reload only those partitions? Into cache?

 -Vladimir



Re: Spark Streaming and database access (e.g. MySQL)

2014-09-10 Thread Mayur Rustagi
I think she is checking for empty RDDs?
But if the RDD is empty then nothing will happen, no db connections etc.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


On Mon, Sep 8, 2014 at 1:32 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 On Mon, Sep 8, 2014 at 4:39 PM, Sean Owen so...@cloudera.com wrote:

  if (rdd.take(1).size == 1) {
  rdd foreachPartition { iterator =>


 I was wondering: Since take() is an output operation, isn't it computed
 twice (once for the take(1), once during the iteration)? Or will only one
 single element be computed for take(1)?

 Thanks
 Tobias





Re: Records - Input Byte

2014-09-09 Thread Mayur Rustagi
What do you mean by "control your input"? Are you trying to pace your Spark
Streaming by the number of words? If so, that is not supported as of now; you can
only control the time and consume all files within that time period.
-- 
Regards,
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi

On Tue, Sep 9, 2014 at 2:24 AM, danilopds danilob...@gmail.com wrote:

 Hi,
 I was reading the paper of Spark Streaming:
 Discretized Streams: Fault-Tolerant Streaming Computation at Scale
 So,
 I read that performance evaluation used 100-byte input records in test Grep
 and WordCount.
 I don't have much experience and I'd like to know how can I control this
 value in my records (like words in an input file)?
 Can anyone suggest me something to start?
 Thanks!
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Records-Input-Byte-tp13733.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org

Re: Spark Streaming and database access (e.g. MySQL)

2014-09-07 Thread Mayur Rustagi
The standard pattern is to initialize the MySQL JDBC connection in your
mapPartitions call, update the database and then close off the connection.
A couple of gotchas:
1. A new connection is initiated for each of your partitions.
2. If the effect (inserts and updates) is not idempotent, then if your server
crashes, Spark will replay the updates to MySQL and may cause data corruption.
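
A minimal sketch of that pattern for the INSERT case, assuming a (word, count)
pair DStream called wordCounts and a hypothetical MySQL table named counts (the
URL and credentials are placeholders):

wordCounts.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    // one connection per partition, created and closed on the worker, so nothing needs to be serialized
    val conn = java.sql.DriverManager.getConnection("jdbc:mysql://host:3306/db", "user", "pass")
    val stmt = conn.prepareStatement("INSERT INTO counts (word, cnt) VALUES (?, ?)")
    iter.foreach { case (word, cnt) =>
      stmt.setString(1, word)
      stmt.setLong(2, cnt)
      stmt.executeUpdate()
    }
    stmt.close()
    conn.close()
  }
}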


Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


On Sun, Sep 7, 2014 at 11:54 AM, jchen jc...@pivotal.io wrote:

 Hi,

 Has someone tried using Spark Streaming with MySQL (or any other
 database/data store)? I can write to MySQL at the beginning of the driver
 application. However, when I am trying to write the result of every
 streaming processing window to MySQL, it fails with the following error:

 org.apache.spark.SparkException: Job aborted due to stage failure: Task not
 serializable: java.io.NotSerializableException:
 com.mysql.jdbc.JDBC4PreparedStatement

 I think it is because the statement object should be serializable, in order
 to be executed on the worker node. Has someone tried the similar cases?
 Example code will be very helpful. My intension is to execute
 INSERT/UPDATE/DELETE/SELECT statements for each sliding window.

 Thanks,
 JC



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-database-access-e-g-MySQL-tp13644.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Q: About scenarios where driver execution flow may block...

2014-09-07 Thread Mayur Rustagi
Statements are executed only when you try to cause some effect (produce data,
collect data on the driver). At the time of execution Spark
does all the dependency resolution, truncates paths that don't go anywhere,
and optimizes execution pipelines. So you really don't have to worry
about these.

The important thing is that if you are doing certain things in your functions that
are not explicitly dependent on others, then you may start seeing errors.
For example, you may write a file to HDFS during a map operation and expect
to read it in another map operation; according to Spark, a map operation is not
expected to alter anything apart from the RDD it is created upon, hence
Spark may not realize this dependency and may try to parallelize the two
operations, causing errors. Bottom line: as long as you make all your
dependencies explicit via RDDs, Spark will take care of the magic.
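
To make the lazy vs. blocking points concrete against the snippet in the
question (f stands in for whatever function you apply):

val x = rdd01.reduceByKey(_ + _)        // lazy: only the transformation is recorded
val top20 = x.take(20)                  // action: the driver blocks here until the job finishes
val b = sc.broadcast(top20)             // local call on the driver, returns immediately
val y = rdd02.map(v => f(b.value, v))   // lazy again: nothing runs until an action is called on y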

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


On Sun, Sep 7, 2014 at 12:14 AM, didata subscripti...@didata.us wrote:

  Hello friends:

 I have a theory question about call blocking in a Spark driver.

 Consider this (admittedly contrived =:)) snippet to illustrate this
 question...

  x = rdd01.reduceByKey()  # or maybe some other 'shuffle-requiring
 action'.

  b = sc.broadcast(x. take(20)) # Or any statement that requires the
 previous statement to complete, cluster-wide.

  y = rdd02.someAction(f(b))

 Would the first or second statement above block because the second (or
 third) statement needs to wait for the previous one to complete,
 cluster-wide?

 Maybe this isn't the best example (typed on a phone), but generally I'm
 trying to understand the scenario(s) where a rdd call in the driver may
 block because the graph indicates that the next statement is dependent on
 the completion of the current one, cluster-wide (noy just lazy evaluated).

 Thank you. :)

 Sincerely yours,
 Team Dimension Data



Re: Array and RDDs

2014-09-07 Thread Mayur Rustagi
Your question is a bit confusing.
I assume you have an RDD containing nodes and some metadata (child nodes
maybe) and you are trying to attach more metadata to it (the byte array). If
it's just the same byte value for all nodes you can generate an RDD with the count
of nodes and zip the two RDDs together; you can also create a (node,
byte) pair RDD and join the two RDDs together.
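
A small sketch of the join variant, since RDDs don't guarantee row order
(assuming a SparkContext sc; the toy data just mirrors the (nodeId, children)
layout described in the question):

import org.apache.spark.SparkContext._   // pair-RDD functions (keys, join)
import org.apache.spark.rdd.RDD

// (nodeId, children) pairs, as described in the question
val nodes: RDD[(Long, Seq[Long])] =
  sc.parallelize(Seq((1L, Seq(2L, 3L)), (2L, Seq.empty[Long]), (3L, Seq.empty[Long])))

// per-node byte values keyed by the same nodeId, initialised to 0
val bytes: RDD[(Long, Byte)] = nodes.keys.map(id => (id, 0.toByte))

// join on the key instead of relying on print order
val joined: RDD[(Long, (Seq[Long], Byte))] = nodes.join(bytes)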


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


On Sat, Sep 6, 2014 at 10:51 AM, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 Hi,
 I have an input file which consists of stc_node dest_node
 I have created and RDD consisting of key-value pair where key is the node
 id and the values are the children of that node.
 Now I want to associate a byte with each node. For that I have created a
 byte array.
 Every time I print out the key-value pair in the RDD the key-value pairs
 do not come in the same order. Because of this I am finding it difficult to
 assign the byte values with each node.
 Can anyone help me out in this matter?

 I basically have the following code:
 val bitarray = Array.fill[Byte](number)(0)

 And I want to associate each byte in the array with a node.
 How should I do that?

 Thank You



Re: how to choose right DStream batch interval

2014-09-07 Thread Mayur Rustagi
Spark will simply have a backlog of tasks; it'll manage to process them
nonetheless, though if it keeps falling behind, you may run out of memory
or have unreasonable latency. For momentary spikes, Spark Streaming will
manage.
Mostly, if you are looking to do 100% processing, you'll have to go with
5-second batches; the alternative is to process data in two pipelines (0.5 and 5
seconds) in two Spark Streaming jobs and overwrite the results of one with the other.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


On Sat, Sep 6, 2014 at 12:39 AM, qihong qc...@pivotal.io wrote:

 repost since original msg was marked with This post has NOT been accepted
 by
 the mailing list yet.

 I have some questions regarding DStream batch interval:

 1. if it only take 0.5 second to process the batch 99% of time, but 1% of
 batches need 5 seconds to process (due to some random factor or failures),
 then what's the right batch interval? 5 seconds (the worst case)?

 2. What will happen to DStream processing if 1 batch took longer than batch
 interval? Can Spark recover from that?

 Thanks,
 Qihong



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/how-to-choose-right-DStream-batch-interval-tp13578p13579.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Update on Pig on Spark initiative

2014-08-27 Thread Mayur Rustagi
Hi,
We have migrated Pig functionality on top of Spark, passing 100% e2e for
success cases in the Pig test suite. That means UDFs, joins and other
functionality are working quite nicely. We are in the process of merging
with the Apache Pig trunk (something that should happen over the next 2 weeks).
Meanwhile if you are interested in giving it a go, you can try it at
https://github.com/sigmoidanalytics/spork
This contains all the major changes but may not have all the patches
required for 100% e2e; if you are trying it out let me know any issues you
face.

A whole bunch of folks contributed to this:

Julien Le Dem (Twitter),  Praveen R (Sigmoid Analytics), Akhil Das (Sigmoid
Analytics), Bill Graham (Twitter), Dmitriy Ryaboy (Twitter), Kamal Banga
(Sigmoid Analytics), Anish Haldiya (Sigmoid Analytics),  Aniket Mokashi
 (Google), Greg Owen (DataBricks), Amit Kumar Behera (Sigmoid Analytics),
Mahesh Kalakoti (Sigmoid Analytics)

Not to mention the Spark and Pig communities.

Regards
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


Re: Spark Streaming Output to DB

2014-08-26 Thread Mayur Rustagi
I would suggest you use the JDBC connection in mapPartitions instead of map,
as JDBC connections are costly and can really impact your performance.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Tue, Aug 26, 2014 at 6:45 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Yes, you can open a jdbc connection at the beginning of the map method
 then close this connection at the end of map() and in between you can use
 this connection.

 Thanks
 Best Regards


 On Tue, Aug 26, 2014 at 6:12 PM, Ravi Sharma raviprincesha...@gmail.com
 wrote:

 Hello People,

  I'm using java spark streaming. I'm just wondering, Can I make simple
 jdbc connection in JavaDStream map() method?

 Or

 Do  I need to create jdbc connection for each JavaPairDStream, after map
 task?

 Kindly give your thoughts.


 Cheers,
 Ravi Sharma






Re: DStream start a separate DStream

2014-08-22 Thread Mayur Rustagi
Why don't you directly use the DStream created as the output of the windowing
process? Any reason?
Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Thu, Aug 21, 2014 at 8:38 PM, Josh J joshjd...@gmail.com wrote:

 Hi,

 I would like to have a sliding window dstream perform a streaming
 computation and store these results. Once these results are stored, I then
 would like to process the results. Though I must wait until the final
 computation done for all tuples in the sliding window, before I begin the
 new DStream. How can I accomplish this with spark?

 Sincerely,
 Josh



Re: Mapping with extra arguments

2014-08-21 Thread Mayur Rustagi
You can add that as part of your RDD, so the output of your map operation
generates the input of your next map operation; of course the logic for
generating that data has to be in the map.
Another way is a nested def:

def factorial(number: Int): Int = {
  def factorialWithAccumulator(accumulator: Int, number: Int): Int = {
    if (number == 1)
      accumulator
    else
      factorialWithAccumulator(accumulator * number, number - 1)
  }
  factorialWithAccumulator(1, number)
}
MyRDD.map(factorial)
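
For the original question (a parameter that changes between iterations), another
common option is to capture the current value in a closure on each pass; a
minimal Scala sketch with made-up data and update logic (the PySpark version is
analogous):

val data = sc.parallelize(1 to 100).map(_.toDouble)
var param = 1.0
for (i <- 1 to 3) {
  val p = param                                  // capture the current value; p ships with the task
  val scaled = data.map(x => x * p)              // the map uses the extra argument via the closure
  param = scaled.reduce(_ + _) / scaled.count()  // update the parameter on the driver for the next pass
}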



Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Thu, Aug 21, 2014 at 12:03 PM, TJ Klein tjkl...@gmail.com wrote:

 Hi,

 I am using Spark in Python. I wonder if there is a possibility for passing
 extra arguments to the mapping function. In my scenario, after each map I
 update parameters, which I want to use in the folllowning new iteration of
 mapping. Any idea?

 Thanks in advance.

 -Tassilo



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Mapping-with-extra-arguments-tp12541.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: DStream cannot write to text file

2014-08-21 Thread Mayur Rustagi
Provide the full path of where to write (like hdfs://... etc.)

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Thu, Aug 21, 2014 at 8:29 AM, cuongpham92 cuongpha...@gmail.com wrote:

 Hi,
 I tried to write to text file from DStream in Spark Streaming, using
 DStream.saveAsTextFile(test,output), but it did not work.
 Any suggestions?
 Thanks in advance.
 Cuong



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/DStream-cannot-write-to-text-file-tp12525.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Accessing to elements in JavaDStream

2014-08-21 Thread Mayur Rustagi
Transform your way :)
MyDStream.transform(rdd => rdd.map(wordChanger))
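
Applied to the example in the question, a sketch could look like this (assuming
lines is the DStream of messages read from Kafka; shown in Scala, the Java
version uses the same transform/map calls):

val changed = lines.transform(rdd => rdd.map(msg => msg.replace("Hello world", "Hello John")))
changed.print()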

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Wed, Aug 20, 2014 at 1:25 PM, cuongpham92 cuongpha...@gmail.com wrote:

 Hi,
 I am a newbie to Spark Streaming, and I am quite confused about JavaDStream
 in SparkStreaming. In my situation, after catching a message Hello world
 from Kafka in JavaDStream, I want to access to JavaDStream and change this
 message to Hello John, but I could not figure how to do it.
 Any idea about this?
 Thanks,
 Cuong



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Accessing-to-elements-in-JavaDStream-tp12459.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: spark - reading hfds files every 5 minutes

2014-08-21 Thread Mayur Rustagi
Hi,
case class Person(name: String, age: Int)
val lines = ssc.textFileStream("blah blah")
val sqc = new SQLContext(sc)

lines.foreachRDD(rdd => {
  rdd.map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).registerAsTable("data")
  val teenagers = sqc.sql("SELECT * FROM data")
  teenagers.saveAsParquetFile("people.parquet")
})

You can also try the insertInto API instead of registerAsTable, but I haven't used
it myself.
Also, you need to dynamically change the Parquet file name for every batch; see
the sketch below.
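
One way to do that is the foreachRDD variant that also passes the batch time; a
rough sketch building on the code above (the file name pattern is just an example):

lines.foreachRDD { (rdd, time) =>
  // assumes the same implicit SQLContext conversion the snippet above relies on (import sqc.createSchemaRDD)
  val people = rdd.map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
  people.registerAsTable("data")
  sqc.sql("SELECT * FROM data").saveAsParquetFile(s"people-${time.milliseconds}.parquet")
}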


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Wed, Aug 20, 2014 at 1:01 AM, salemi alireza.sal...@udo.edu wrote:

 Thank you but how do you convert the stream to parquet file?

 Ali



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Re-spark-reading-hfds-files-every-5-minutes-tp12359p12401.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: DStream cannot write to text file

2014-08-21 Thread Mayur Rustagi
Is your HDFS running, and can Spark access it?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Thu, Aug 21, 2014 at 1:15 PM, cuongpham92 cuongpha...@gmail.com wrote:

 I'm sorry, I just forgot /data after hdfs://localhost:50075. When I
 added
 it, a new exception showed up: Call to localhost/127.0.0.1:50075 failed
 on
 local exception. How could I fix it?
 Thanks,
 Cuong.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/DStream-cannot-write-to-text-file-tp12525p12560.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Question regarding spark data partition and coalesce. Need info on my use case.

2014-08-16 Thread Mayur Rustagi
Quite a good question. I assume you know the size of the cluster going in;
then you can essentially try to partition the data into some multiple of
that and use RangePartitioner to partition the data roughly equally. Dynamic
partitions are created based on the number of blocks on the filesystem, and hence
the overhead of scheduling so many tasks mostly kills the performance.

import org.apache.spark.RangePartitioner
var file = sc.textFile("my local path")
var partitionedFile = file.map(x => (x, 1))
var data = partitionedFile.partitionBy(new RangePartitioner(3, partitionedFile))
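
If you want the partition count to track the cluster rather than a hard-coded 3,
one rough starting point (an assumption to tune, not a rule) is a small multiple
of the default parallelism:

val numPartitions = sc.defaultParallelism * 3   // roughly 2-4x the total cores is a common starting point
var data = partitionedFile.partitionBy(new RangePartitioner(numPartitions, partitionedFile))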




Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Sat, Aug 16, 2014 at 7:04 AM, abhiguruvayya sharath.abhis...@gmail.com
wrote:

 My use case as mentioned below.

 1. Read input data from local file system using sparkContext.textFile(input
 path).
 2. partition the input data(80 million records) into partitions using
 RDD.coalesce(numberOfPArtitions) before submitting it to mapper/reducer
 function. Without using coalesce() or repartition() on the input data spark
 executes really slow and fails with out of memory exception.

 The issue i am facing here is in deciding the number of partitions to be
 applied on the input data. *The input data  size varies every time and hard
 coding a particular value is not an option. And spark performs really well
 only when certain optimum partition is applied on the input data for which
 i
 have to perform lots of iteration(trial and error). Which is not an option
 in a production environment.*

 My question: Is there a thumb rule to decide the number of partitions
 required depending on the input data size and cluster resources
 available(executors,cores, etc...)? If yes please point me in that
 direction. Any help  is much appreciated.

 I am using spark 1.0 on yarn.

 Thanks,
 AG



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Question-regarding-spark-data-partition-and-coalesce-Need-info-on-my-use-case-tp12214.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Shared variable in Spark Streaming

2014-08-08 Thread Mayur Rustagi
You can also use the updateStateByKey interface to store this shared variable. As
for the count, you can use foreachRDD to run counts on each RDD and then store
that as another RDD, or put it into updateStateByKey.
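
A minimal sketch of the accumulator + foreachRDD route the original question asks
about (assuming a StreamingContext ssc and an input DStream called dstream):

import org.apache.spark.SparkContext._   // brings the implicit AccumulatorParam for Long into scope

val totalCount = ssc.sparkContext.accumulator(0L)
dstream.foreachRDD { rdd =>
  totalCount += rdd.count()   // count() is an action, so this runs once per batch
}
// totalCount.value can then be read on the driver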

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Aug 8, 2014 at 11:46 AM, Soumitra Kumar kumar.soumi...@gmail.com
wrote:

 Hello,

 I want to count the number of elements in the DStream, like RDD.count() .
 Since there is no such method in DStream, I thought of using DStream.count
 and use the accumulator.

 How do I do DStream.count() to count the number of elements in a DStream?

 How do I create a shared variable in Spark Streaming?

 -Soumitra.



Re: Low Performance of Shark over Spark.

2014-08-08 Thread Mayur Rustagi
Hi Vinay,
First of all you should probably migrate to Spark SQL, as Shark is not
actively supported anymore.
The 100x benefit entails in-memory caching and the DAG; since you are not able to
cache, the performance can be quite low.
Alternatives you can explore:
1. Use Parquet as storage, which will push down predicates smartly and hence get
better performance (similar to Impala).
2. Cache data at a partition level from Hive and operate on those instead.

Regards
Mayur


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Aug 8, 2014 at 10:44 AM, vinay.kash...@socialinfra.net wrote:

 Hi Meng,

 I cannot use cached table in this case as the data size is quite huge.

 Also, as I am trying to run adhoc queries, I cannot keep the table cached.
 I can cache the table only when my requirement is such that, type of
 queries are fixed and for specific set of data.



 Thanks and regards

 Vinay Kashyap

 
 From:Xiangrui Meng men...@gmail.com
 Sent:vinay.kash...@socialinfra.net
 Cc:user@spark.apache.org
 Date:Thu, August 7, 2014 11:06 pm
 Subject:Re: Low Performance of Shark over Spark.



  Did you cache the table? There are couple ways of caching a table in
  Shark: https://github.com/amplab/shark/wiki/Shark-User-Guide
 
  On Thu, Aug 7, 2014 at 6:51 AM, vinay.kash...@socialinfra.net wrote:
  Dear all,
 
  I am using Spark 0.9.2 in Standalone mode. Hive and HDFS in CDH 5.1.0.
 
  6 worker nodes each with memory 96GB and 32 cores.
 
  I am using Shark Shell to execute queries on Spark.
 
  I have a raw_table ( of size 3TB with replication 3 ) which is
  partitioned
  by year, month and day. I am running an adhoc query on one month data
  with
  some condition.
 
  For eg:
 
  CREATE TABLE temp_table AS SELECT field1,field2,field3 FROM raw_table
  WHERE
  year=2000 AND month=01 AND field10 > some_value;
 
  It is claimed that the same Hive queries can run 100x faster with shark,
  but
  I don't see such a significant improvement when running the above query,
 
  I am getting almost same performance as when run in Hive which is around
  45
  seconds.
 
  The same query with Impala, takes very less time, almost 7 times less
  time
  than shark which is around 6 seconds. I have tried altering the below
  parameters for the spark jobs but did not see any difference.
 
  spark.local.dir
  spark.serializer
  spark.kryoserializer.buffer.mb
  spark.storage.memoryFraction
  spark.io.compression.codec
  spark.default.parallelism
 
  Any suggestions so that I can improve the performance of the query with
  Shark over Spark and make it comparable to Impala..??
 
 
 
  Thanks and regards
 
  Vinay Kashyap
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 





Re: Configuration setup and Connection refused

2014-08-05 Thread Mayur Rustagi
Spark is not able to communicate with your Hadoop HDFS. Is your HDFS
running? If so, can you try to explicitly connect to it with the Hadoop command
line tools, giving the full hostname and port.
Or test the port using:
  telnet localhost 9000
In all likelihood your HDFS is either down, or bound to a wrong port/IP that
Spark cannot access.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Tue, Aug 5, 2014 at 11:33 PM, alamin.ishak alamin.is...@gmail.com
wrote:

 Hi,
 Anyone? Any input would be much appreciated

 Thanks,
 Amin
 On 5 Aug 2014 00:31, Al Amin [hidden email]
 http://user/SendEmail.jtp?type=nodenode=11477i=0 wrote:

 Hi all,

 Any help would be much appreciated.

 Thanks,
 Al


 On Mon, Aug 4, 2014 at 7:09 PM, Al Amin [hidden email]
 http://user/SendEmail.jtp?type=nodenode=11477i=1 wrote:

 Hi all,

 I have setup 2 nodes (master and slave1) on stand alone mode. Tried
 running SparkPi example and its working fine. However when I move on to
  wordcount its giving me below error:

 14/08/04 21:40:33 INFO storage.MemoryStore: ensureFreeSpace(32856)
 called with curMem=0, maxMem=311387750
 14/08/04 21:40:33 INFO storage.MemoryStore: Block broadcast_0 stored as
 values in memory (estimated size 32.1 KB, free 296.9 MB)
 14/08/04 21:40:34 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 0 time(s).
 14/08/04 21:40:35 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 1 time(s).
 14/08/04 21:40:36 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 2 time(s).
 14/08/04 21:40:37 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 3 time(s).
 14/08/04 21:40:38 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 4 time(s).
 14/08/04 21:40:39 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 5 time(s).
 14/08/04 21:40:40 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 6 time(s).
 14/08/04 21:40:41 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 7 time(s).
 14/08/04 21:40:42 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 8 time(s).
 14/08/04 21:40:43 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 9 time(s).
 Exception in thread main java.lang.RuntimeException:
 java.net.ConnectException: Call to master/10.0.1.27:9000 failed on
 connection exception: java.net.ConnectException: Connection refused


 1) how to fix this issue? I have configure hostname --fqdn accordingly.

 2) I could see that in my logs that my master/worker deploy
 configuration is  -Xms512m -Xmx512m. Is there any way that I can increase
 it? Or 512mb is just fine? AFAIK, spark require huge memory.

 3) I have a hadoop cluster and its working. Could anyone point me how to
 integrate Yarn with Spark? Any good tutorial would be very useful


 Thanks,
 Al



 --
 View this message in context: Re: Configuration setup and Connection
 refused
 http://apache-spark-user-list.1001560.n3.nabble.com/Re-Configuration-setup-and-Connection-refused-tp11477.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



Re: Configuration setup and Connection refused

2014-08-05 Thread Mayur Rustagi
Then don't specify hdfs:// when you read the file.
Also, the community is quite active in responding in general, just be a little
patient.
If possible, also look at the Spark training videos from Spark Summit 2014
and/or AMPLab's training on the Spark website.
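A rough sketch of reading straight from the local filesystem in standalone mode (the master URL and file path are placeholders, not from this thread; on Spark 1.x reduceByKey needs the SparkContext._ implicits):

  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._

  // Minimal standalone-mode word count over a local file instead of hdfs://.
  // The file must exist at the same path on every worker node.
  val sc = new SparkContext("spark://master:7077", "wordcount")
  val lines = sc.textFile("file:///data/input.txt")
  val counts = lines.flatMap(_.split("\\s+")).map(word => (word, 1)).reduceByKey(_ + _)
  counts.take(10).foreach(println)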

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Wed, Aug 6, 2014 at 1:58 AM, Al Amin alamin.is...@gmail.com wrote:

 Finally, someone reply. thank you, sir!
 But I am planning to deploy stand alone mode of Spark. I thought there is
 no need to use hdfs? And my spark is not being built with hadoop/yarn
 config.

 regards,
 Amin


 On Tue, Aug 5, 2014 at 10:39 PM, Mayur Rustagi mayur.rust...@gmail.com
 wrote:

 Spark is not able to communicate with your hadoop hdfs. Is your hdfs
 running, if so can you try to explicitly connect to it with hadoop command
 line tools giving full hostname  port.
 Or test port using
   telnet localhost 9000
 In all likelyhood either your hdfs is down, bound to wrong port/ip that
 spark cannot access

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Tue, Aug 5, 2014 at 11:33 PM, alamin.ishak alamin.is...@gmail.com
 wrote:

 Hi,
 Anyone? Any input would be much appreciated

 Thanks,
 Amin
 On 5 Aug 2014 00:31, Al Amin [hidden email] wrote:

 Hi all,

 Any help would be much appreciated.

 Thanks,
 Al


 On Mon, Aug 4, 2014 at 7:09 PM, Al Amin [hidden email] wrote:

 Hi all,

 I have setup 2 nodes (master and slave1) on stand alone mode. Tried
 running SparkPi example and its working fine. However when I move on to
  wordcount its giving me below error:

 14/08/04 21:40:33 INFO storage.MemoryStore: ensureFreeSpace(32856)
 called with curMem=0, maxMem=311387750
 14/08/04 21:40:33 INFO storage.MemoryStore: Block broadcast_0 stored
 as values in memory (estimated size 32.1 KB, free 296.9 MB)
 14/08/04 21:40:34 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 0 time(s).
 14/08/04 21:40:35 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 1 time(s).
 14/08/04 21:40:36 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 2 time(s).
 14/08/04 21:40:37 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 3 time(s).
 14/08/04 21:40:38 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 4 time(s).
 14/08/04 21:40:39 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 5 time(s).
 14/08/04 21:40:40 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 6 time(s).
 14/08/04 21:40:41 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 7 time(s).
 14/08/04 21:40:42 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 8 time(s).
 14/08/04 21:40:43 INFO ipc.Client: Retrying connect to server: master/
 10.0.1.27:9000. Already tried 9 time(s).
 Exception in thread main java.lang.RuntimeException:
 java.net.ConnectException: Call to master/10.0.1.27:9000 failed on
 connection exception: java.net.ConnectException: Connection refused


 1) how to fix this issue? I have configure hostname --fqdn
 accordingly.

 2) I could see that in my logs that my master/worker deploy
 configuration is  -Xms512m -Xmx512m. Is there any way that I can increase
 it? Or 512mb is just fine? AFAIK, spark require huge memory.

 3) I have a hadoop cluster and its working. Could anyone point me how
 to integrate Yarn with Spark? Any good tutorial would be very useful


 Thanks,
 Al



 --
 View this message in context: Re: Configuration setup and Connection
 refused
 http://apache-spark-user-list.1001560.n3.nabble.com/Re-Configuration-setup-and-Connection-refused-tp11477.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.






Re: Installing Spark 0.9.1 on EMR Cluster

2014-08-01 Thread Mayur Rustagi
Have you tried
https://aws.amazon.com/articles/Elastic-MapReduce/4926593393724923 ?
There is also a 0.9.1 version they talked about in one of the meetups.
Check out the S3 bucket in the guide; it should have a 0.9.1 version as
well.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Thu, Jul 31, 2014 at 4:58 PM, nit nitinp...@gmail.com wrote:

 Have you tried flag  --spark-version of spark-ec2 ?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Installing-Spark-0-9-1-on-EMR-Cluster-tp11084p11096.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: How to read from OpenTSDB using PySpark (or Scala Spark)?

2014-08-01 Thread Mayur Rustagi
What is the use case you are looking at?

TSDB is not designed for you to query data directly from HBase. Ideally you
should use the REST API if you are looking to do lightweight analysis. Are you
looking to do a whole reprocessing of the TSDB data?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Aug 1, 2014 at 2:39 PM, bumble123 tc1...@att.com wrote:

 Hi,

 I've seen many threads about reading from HBase into Spark, but none about
 how to read from OpenTSDB into Spark. Does anyone know anything about this?
 I tried looking into it, but I think OpenTSDB saves its information into
 HBase using hex and I'm not sure how to interpret the data. If you could
 show me some examples of how to extract the information from OpenTSDB,
 that'd be great! Thanks in advance!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-from-OpenTSDB-using-PySpark-or-Scala-Spark-tp11211.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: RDD to DStream

2014-08-01 Thread Mayur Rustagi
Nice question :)
Ideally you should use the queueStream interface to push RDDs into a queue &
then Spark Streaming can handle the rest.
Though why are you looking to convert an RDD to a DStream? Another workaround
folks use is to source the DStream from folders & move files that they need
reprocessed back into the folder. It's a hack, but much less headache.
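For example, a minimal queueStream sketch (assuming an existing SparkContext `sc` and an RDD[String] `historicalRdd` to replay; the batch interval and number of splits are arbitrary):

  import scala.collection.mutable.Queue
  import org.apache.spark.rdd.RDD
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val ssc = new StreamingContext(sc, Seconds(1))
  val rddQueue = new Queue[RDD[String]]()
  val stream = ssc.queueStream(rddQueue, oneAtATime = true)
  stream.foreachRDD(rdd => println("batch size: " + rdd.count()))
  ssc.start()
  // Feed chunks of the historical RDD into the queue, spaced by the batch interval.
  historicalRdd.randomSplit(Array.fill(10)(1.0)).foreach { chunk =>
    rddQueue.synchronized { rddQueue += chunk }
    Thread.sleep(1000)
  }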

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Aug 1, 2014 at 10:21 AM, Aniket Bhatnagar 
aniket.bhatna...@gmail.com wrote:

 Hi everyone

 I haven't been receiving replies to my queries in the distribution list.
 Not pissed but I am actually curious to know if my messages are actually
 going through or not. Can someone please confirm that my msgs are getting
 delivered via this distribution list?

 Thanks,
 Aniket


 On 1 August 2014 13:55, Aniket Bhatnagar aniket.bhatna...@gmail.com
 wrote:

 Sometimes it is useful to convert a RDD into a DStream for testing
 purposes (generating DStreams from historical data, etc). Is there an easy
 way to do this?

 I could come up with the following inefficient way but no sure if there
 is a better way to achieve this. Thoughts?

  class RDDExtension[T](rdd: RDD[T]) {

    def chunked(chunkSize: Int): RDD[Seq[T]] = {
      rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
    }

    def skipFirst(): RDD[T] = {
      rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1)
    }

    def toStream(streamingContext: StreamingContext, chunkSize: Int,
                 slideDurationMilli: Option[Long] = None): DStream[T] = {
      new InputDStream[T](streamingContext) {

        @volatile private var currentRDD: RDD[Seq[T]] = rdd.chunked(chunkSize)

        override def start(): Unit = {}

        override def stop(): Unit = {}

        override def compute(validTime: Time): Option[RDD[T]] = {
          val chunk = currentRDD.take(1)
          currentRDD = currentRDD.skipFirst()
          // flatten the single Seq[T] chunk so the batch RDD has element type T
          Some(rdd.sparkContext.parallelize(chunk.flatten))
        }

        override def slideDuration = {
          slideDurationMilli.map(duration => new Duration(duration))
            .getOrElse(super.slideDuration)
        }
      }
    }
  }





Re: Accumulator and Accumulable vs classic MR

2014-08-01 Thread Mayur Rustagi
The only blocker is that an accumulator can only be added to from the slaves &
only read on the master. If that constraint fits you well, you can fire away.
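A minimal sketch with the Spark 1.x accumulator API (assuming an existing SparkContext `sc`):

  val acc = sc.accumulator(0)
  sc.parallelize(1 to 100).foreach(x => acc += 1)  // tasks on the executors can only add
  println(acc.value)                               // the value can only be read on the driver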

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Aug 1, 2014 at 7:38 AM, Julien Naour julna...@gmail.com wrote:

 Hi,

 My question is simple: could it be some performance issue using
 Accumulable/Accumulator instead of method like map() reduce()... ?

 My use case : implementation of a clustering algorithm like k-means.
 At the begining I used two steps, one to asign data to cluster and another
 to calculate new centroids.
 After some research I use now an accumulable with an Array to calculate
 new centroid during the assigment of data. It's easier to unterstand and
 for the moment it gives better performance.
 It's probably because I used 2 steps before and now only one thanks to
 accumulable.

 So any indications against it ?

 Cheers,

 Julien




Re: How to read from OpenTSDB using PySpark (or Scala Spark)?

2014-08-01 Thread Mayur Rustagi
The HTTP API would be the best bet. I assume by graph you mean the charts
created by the TSDB frontends.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Aug 1, 2014 at 4:48 PM, bumble123 tc1...@att.com wrote:

 I'm trying to get metrics out of TSDB so I can use Spark to do anomaly
 detection on graphs.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-from-OpenTSDB-using-PySpark-or-Scala-Spark-tp11211p11232.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: The function of ClosureCleaner.clean

2014-07-28 Thread Mayur Rustagi
I am not sure about the specific purpose of this function, but Spark needs to
remove elements from the closure that may be included by default but are not
really needed, so that it can serialize the closure & send it to the executors
to operate on the RDD. For example, a function passed to an RDD's map() may
reference objects inside the enclosing class, so you may want to send across
those objects but not the whole parent class.
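A small illustrative sketch of that difference (the class and names are made up):

  class Lookup(val factor: Int) extends Serializable {
    def scaleCapturesThis(rdd: org.apache.spark.rdd.RDD[Int]) =
      rdd.map(_ * factor)      // the closure captures `this` just to read `factor`
    def scaleCapturesOnlyInt(rdd: org.apache.spark.rdd.RDD[Int]) = {
      val f = factor           // copy the field into a local value first
      rdd.map(_ * f)           // now only the Int travels with the task
    }
  }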


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Mon, Jul 28, 2014 at 8:28 PM, Wang, Jensen jensen.w...@sap.com wrote:

  Hi, All

   Before sc.runJob invokes dagScheduler.runJob, the func
 performed on the rdd is “cleaned” by ClosureCleaner.clearn.

  Why  spark has to do this? What’s the purpose?



Spark as an application library vs infra

2014-07-27 Thread Mayur Rustagi
Based on some discussions with my application users, I have been trying to come
up with a standard way to deploy applications built on Spark:

1. Bundle the version of Spark with your application and ask users to store it in
HDFS before referring to it in YARN to boot your application.
2. Provide ways to manage dependencies in your app across the various versions of
Spark bundled with Hadoop distributions.

Option 1 provides greater control and reliability, as I am only working against YARN
versions and dependencies; I assume option 2 gives me some benefits of the
distributions' versions of Spark (easier management, common sysops tools?).
I was wondering if anyone has thoughts around both and any reasons to prefer
one over the other.

Sent from my iPad

Re: persistent HDFS instance for cluster restarts/destroys

2014-07-23 Thread Mayur Rustagi
Yes, you lose the data.
You can add machines, but that will require you to restart the cluster. Also,
adding nodes is manual: you add the nodes yourself.
Regards
Mayur

On Wednesday, July 23, 2014, durga durgak...@gmail.com wrote:

 Hi All,
 I have a question,

 For my company , we are planning to use spark-ec2 scripts to create cluster
 for us.

 I understand that , persistent HDFS will make the hdfs available for
 cluster
 restarts.

 Question is:

 1) What happens , If I destroy and re-create , do I loose the data.
 a) If I loose the data , is there only way is to copy to s3 and recopy
 after launching the cluster(it seems costly data transfer from and to s3?)
 2) How would I add/remove some machines in the cluster?. I mean I am asking
 for cluster management.
 Is there any place amazon allows to see the machines , and do the operation
 of adding and removing?

 Thanks,
 D.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/persistent-HDFS-instance-for-cluster-restarts-destroys-tp10551.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



-- 
Sent from Gmail Mobile


Re: What if there are large, read-only variables shared by all map functions?

2014-07-23 Thread Mayur Rustagi
Have a look at broadcast variables.
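A minimal sketch of the pattern (`loadLookupTable` and `doMapTask` are hypothetical placeholders for your own code):

  val table = loadLookupTable()       // build the read-only structure once, on the driver
  val tableBc = sc.broadcast(table)   // shipped to each executor once, not once per task
  val result = rdd.map(record => doMapTask(record, tableBc.value))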


On Tuesday, July 22, 2014, Parthus peng.wei@gmail.com wrote:

 Hi there,

 I was wondering if anybody could help me find an efficient way to make a
 MapReduce program like this:

 1) For each map function, it need access some huge files, which is around
 6GB

 2) These files are READ-ONLY. Actually they are like some huge look-up
 table, which will not change during 2~3 years.

 I tried two ways to make the program work, but neither of them is
 efficient:

 1) The first approach I tried is to let each map function load those files
 independently, like this:

 map (...) { load(files); DoMapTask(...)}

 2) The second approach I tried is to load the files before RDD.map(...) and
 broadcast the files. However, because the files are too large, the
 broadcasting overhead is 30min ~ 1 hour.

 Could anybody help me find an efficient way to solve it?

 Thanks very much.










 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/What-if-there-are-large-read-only-variables-shared-by-all-map-functions-tp10435.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



-- 
Sent from Gmail Mobile


Re: Filtering data during the read

2014-07-09 Thread Mayur Rustagi
Hi,
Spark does that out of the box for you :)
It pipelines the execution steps together as much as possible, so the filter is
applied as the data is read rather than after everything is loaded.
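A small sketch of what happens with the example from the question:

  val file = sc.textFile("hdfs://...")                      // nothing is read yet
  val errors = file.filter(line => line.contains("ERROR"))  // still nothing read
  println(errors.count())  // only now is each partition streamed through,
                           // with the filter applied line by line as it is read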
Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Wed, Jul 9, 2014 at 3:15 PM, Konstantin Kudryavtsev 
kudryavtsev.konstan...@gmail.com wrote:

 Hi all,

 I wondered if you could help me to clarify the next situation:
 in the classic example

 val file = spark.textFile(hdfs://...)
 val errors = file.filter(line = line.contains(ERROR))

 As I understand, the data is read in memory in first, and after that
 filtering is applying. Is it any way to apply filtering during the read
 step? and don't put all objects into memory?

 Thank you,
 Konstantin Kudryavtsev



Re: Spark job tracker.

2014-07-09 Thread Mayur Rustagi
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

var sem = 0   // must be a var so the listener callback can increment it
sc.addSparkListener(new SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart) {
    sem += 1
  }
})
// sc = your SparkContext



Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Wed, Jul 9, 2014 at 4:34 AM, abhiguruvayya sharath.abhis...@gmail.com
wrote:

 Hello Mayur,

 How can I implement these methods mentioned below. Do u you have any clue
 on
 this pls et me know.


 public void onJobStart(SparkListenerJobStart arg0) {
 }

 @Override
 public void onStageCompleted(SparkListenerStageCompleted arg0) {
 }

 @Override
 public void onStageSubmitted(SparkListenerStageSubmitted arg0) {

 }



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p9104.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Pig 0.13, Spark, Spork

2014-07-07 Thread Mayur Rustagi
Hi,
We have fixed many major issues around Spork & are deploying it with some
customers. We would be happy to provide a working version for you to try out;
we are looking for more folks to try it out & submit bugs.

Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Mon, Jul 7, 2014 at 8:21 PM, Bertrand Dechoux decho...@gmail.com wrote:

 Hi,

 I was wondering what was the state of the Pig+Spark initiative now that
 the execution engine of Pig is pluggable? Granted, it was done in order to
 use Tez but could it be used by Spark? I know about a 'theoretical' project
 called Spork but I don't know any stable and maintained version of it.

 Regards

 Bertrand Dechoux



Re: Pig 0.13, Spark, Spork

2014-07-07 Thread Mayur Rustagi
That version is old :).
We are not forking Pig but cleanly separating out the Pig execution engine. Let
me know if you are willing to give it a go.

Also, I would love to know what features of Pig you are using.

Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Mon, Jul 7, 2014 at 8:46 PM, Bertrand Dechoux decho...@gmail.com wrote:

 I saw a wiki page from your company but with an old version of Spark.

 http://docs.sigmoidanalytics.com/index.php/Setting_up_spork_with_spark_0.8.1

 I have no reason to use it yet but I am interested in the state of the
 initiative.
 What's your point of view (personal and/or professional) about the Pig
 0.13 release?
 Is the pluggable execution engine flexible enough in order to avoid having
 Spork as a fork of Pig? Pig + Spark + Fork = Spork :D

 As a (for now) external observer, I am glad to see competition in that
 space. It can only be good for the community in the end.

 Bertrand Dechoux


 On Mon, Jul 7, 2014 at 5:00 PM, Mayur Rustagi mayur.rust...@gmail.com
 wrote:

 Hi,
 We have fixed many major issues around Spork  deploying it with some
 customers. Would be happy to provide a working version to you to try out.
 We are looking for more folks to try it out  submit bugs.

 Regards
 Mayur

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Mon, Jul 7, 2014 at 8:21 PM, Bertrand Dechoux decho...@gmail.com
 wrote:

 Hi,

 I was wondering what was the state of the Pig+Spark initiative now that
 the execution engine of Pig is pluggable? Granted, it was done in order to
 use Tez but could it be used by Spark? I know about a 'theoretical' project
 called Spork but I don't know any stable and maintained version of it.

 Regards

 Bertrand Dechoux






Re: Is the order of messages guaranteed in a DStream?

2014-07-07 Thread Mayur Rustagi
If you receive data through multiple receivers across the cluster, I don't
think any order can be guaranteed. Order in distributed systems is tough.

On Tuesday, July 8, 2014, Yan Fang yanfang...@gmail.com wrote:

 I know the order of processing DStream is guaranteed. Wondering if the
 order of messages in one DStream is guaranteed. My gut feeling is yes for
 the question because RDD is immutable. Some simple tests prove this. Want
 to hear from authority to persuade myself. Thank you.

 Best,

 Fang, Yan
 yanfang...@gmail.com
 javascript:_e(%7B%7D,'cvml','yanfang...@gmail.com');
 +1 (206) 849-4108



-- 
Sent from Gmail Mobile


Re: window analysis with Spark and Spark streaming

2014-07-05 Thread Mayur Rustagi
The key idea is to simulate your application time as you enter data. So you can
connect Spark Streaming to a queue and insert data into it spaced by time.
Easier said than done :). What are the parallelism issues you are hitting
with your static approach?

On Friday, July 4, 2014, alessandro finamore alessandro.finam...@polito.it
wrote:

 Thanks for the replies

 What is not completely clear to me is how time is managed.
 I can create a DStream from file.
 But if I set the window property that will be bounded to the application
 time, right?

 If I got it right, with a receiver I can control the way DStream are
 created.
 But, how can apply then the windowing already shipped with the framework if
 this is bounded to the application time?
 I would like to do define a window of N files but the window() function
 requires a duration as input...




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/window-analysis-with-Spark-and-Spark-streaming-tp8806p8824.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



-- 
Sent from Gmail Mobile


Re: Spark memory optimization

2014-07-04 Thread Mayur Rustagi
I would go with Spark only if you are certain that you are going to scale
out in the near future.
You can change the default storage level of the RDD to DISK_ONLY; that might
remove issues around any RDD hogging memory. There are some functions,
particularly sortByKey, that require data to fit in memory to work, so you
may be hitting some of those walls too.
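For example, a hedged sketch of persisting an intermediate RDD to disk only (`parsed` is a placeholder for your own RDD):

  import org.apache.spark.storage.StorageLevel

  parsed.persist(StorageLevel.DISK_ONLY)   // keep partitions on local disk instead of the heap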
Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Jul 4, 2014 at 2:36 PM, Igor Pernek i...@pernek.net wrote:

  Hi all!

 I have a folder with 150 G of txt files (around 700 files, on average each
 200 MB).

 I'm using scala to process the files and calculate some aggregate
 statistics in the end. I see two possible approaches to do that: - manually
 loop through all the files, do the calculations per file and merge the
 results in the end - read the whole folder to one RDD, do all the
 operations on this single RDD and let spark do all the parallelization

 I'm leaning towards the second approach as it seems cleaner (no need for
 parallelization specific code), but I'm wondering if my scenario will fit
 the constraints imposed by my hardware and data. I have one workstation
 with 16 threads and 64 GB of RAM available (so the parallelization will be
 strictly local between different processor cores). I might scale the
 infrastructure with more machines later on, but for now I would just like
 to focus on tunning the settings for this one workstation scenario.

 The code I'm using: - reads TSV files, and extracts meaningful data to
 (String, String, String) triplets - afterwards some filtering, mapping and
 grouping is performed - finally, the data is reduced and some aggregates
 are calculated

 I've been able to run this code with a single file (~200 MB of data),
 however I get a java.lang.OutOfMemoryError: GC overhead limit exceeded
 and/or a Java out of heap exception when adding more data (the application
 breaks with 6GB of data but I would like to use it with 150 GB of data).

 I guess I would have to tune some parameters to make this work. I would
 appreciate any tips on how to approach this problem (how to debug for
 memory demands). I've tried increasing the 'spark.executor.memory' and
 using a smaller number of cores (the rational being that each core needs
 some heap space), but this didn't solve my problems.

 I don't need the solution to be very fast (it can easily run for a few
 hours even days if needed). I'm also not caching any data, but just saving
 them to the file system in the end. If you think it would be more feasible
 to just go with the manual parallelization approach, I could do that as
 well.

 Thanks,

 Igor



Re: LIMIT with offset in SQL queries

2014-07-04 Thread Mayur Rustagi
What I typically do is use row_number & a subquery to filter based on that.
It works out pretty well and reduces the iteration. I think an offset solution
based on windowing directly would be useful.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Jul 4, 2014 at 2:00 AM, Michael Armbrust mich...@databricks.com
wrote:

 Doing an offset is actually pretty expensive in a distributed query
 engine, so in many cases it probably makes sense to just collect and then
 perform the offset as you are doing now.  This is unless the offset is very
 large.

 Another limitation here is that HiveQL does not support OFFSET.  That said
 if you want to open a JIRA we would consider implementing it.


 On Wed, Jul 2, 2014 at 1:37 PM, durin m...@simon-schaefer.net wrote:

 Hi,

 in many SQL-DBMS like MySQL, you can set an offset for the LIMIT clause,
 s.t. /LIMIT 5, 10/ will return 10 rows, starting from row 5.

 As far as I can see, this is not possible in Spark-SQL.
 The best solution I have to imitate that (using Scala) is converting the
 RDD
 into an Array via collect() and then using a for-loop to return certain
 elements from that Array.




 Is there a better solution regarding performance and are there plans to
 implement an offset for LIMIT?


 Kind regards,
 Simon



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/LIMIT-with-offset-in-SQL-queries-tp8673.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Visualize task distribution in cluster

2014-07-04 Thread Mayur Rustagi
You'll get most of that information from the Mesos interface. You may not get
the data-transfer information in particular, though.


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Thu, Jul 3, 2014 at 6:28 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 I am using Mesos to run my Spark tasks. I would be interested to see how
 Spark distributes the tasks in the cluster (nodes, partitions) and which
 nodes are more or less active and do what kind of tasks, and how long the
 transfer of data and jobs takes. Is there any way to get this information
 from Spark?

 Thanks
 Tobias



Re: Spark job tracker.

2014-07-04 Thread Mayur Rustagi
The application UI (port 4040) doesn't provide a JSON API, unlike the cluster
interface (port 8080).
If you are okay with patching Spark, you can use our patch to create a JSON API,
or you can use the SparkListener interface in your application to get that info
out.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Thu, Jul 3, 2014 at 6:05 AM, abhiguruvayya sharath.abhis...@gmail.com
wrote:

 Spark displays job status information on port 4040 using
 JobProgressListener,
 any one knows how to hook into this port and read the details?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p8697.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Error: UnionPartition cannot be cast to org.apache.spark.rdd.HadoopPartition

2014-07-02 Thread Mayur Rustagi
Ideally you should be converting the RDD to a SchemaRDD, right?
Are you creating a UnionRDD to join across DStream RDDs?


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Tue, Jul 1, 2014 at 3:11 PM, Honey Joshi honeyjo...@ideata-analytics.com
 wrote:

 Hi,
 I am trying to run a project which takes data as a DStream and dumps the
 data in the Shark table after various operations. I am getting the
 following error :

 Exception in thread main org.apache.spark.SparkException: Job aborted:
 Task 0.0:0 failed 1 times (most recent failure: Exception failure:
 java.lang.ClassCastException: org.apache.spark.rdd.UnionPartition cannot
 be cast to org.apache.spark.rdd.HadoopPartition)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
 at

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
 at scala.Option.foreach(Option.scala:236)
 at

 org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at

 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at

 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 Can someone please explain the cause of this error, I am also using a
 Spark Context with the existing Streaming Context.



Re: spark streaming counter metrics

2014-07-02 Thread Mayur Rustagi
You may be able to mix StreamingListener & SparkListener to get meaningful
information about your tasks. However, you need to connect a lot of pieces to
make sense of the flow.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Mon, Jun 30, 2014 at 9:58 PM, Chen Song chen.song...@gmail.com wrote:

 I am new to spark streaming and wondering if spark streaming tracks
 counters (e.g., how many rows in each consumer, how many rows routed to an
 individual reduce task, etc.) in any form so I can get an idea of how data
 is skewed? I checked spark job page but don't seem to find any.



 --
 Chen Song




Re: Callbacks on freeing up of RDDs

2014-07-02 Thread Mayur Rustagi
A lot of the RDDs that you create in code may not even be constructed, as the
task layer is optimized in the DAG scheduler. The closest hook is onUnpersistRDD
in SparkListener.
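A rough sketch of hooking that callback (`dropTempTableFor` is a hypothetical placeholder for your own cleanup code):

  import org.apache.spark.scheduler.{SparkListener, SparkListenerUnpersistRDD}

  sc.addSparkListener(new SparkListener {
    override def onUnpersistRDD(event: SparkListenerUnpersistRDD) {
      dropTempTableFor(event.rddId)   // clean up whatever backs the unpersisted RDD
    }
  })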

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Mon, Jun 30, 2014 at 4:48 PM, Jaideep Dhok jaideep.d...@inmobi.com
wrote:

 Hi all,
 I am trying to create a custom RDD class for result set of queries
 supported in InMobi Grill (http://inmobi.github.io/grill/)

 Each result set has a schema (similar to Hive's TableSchema) and a path in
 HDFS containing the result set data.

 An easy way of doing this would be to create a temp table in Hive, and use
 HCatInputFormat to create an RDD using the newAPIHadoopRDD call. I've
 already done this and it works.

 However, I also want to *delete* the temp table when the RDD is
 unpersisted, or when the SparkContext is gone. How could I do that in Spark?

 Does Spark allow users to register code to be executed when an RDD is
 freed? Something like the OutputCommitter in Hadoop?

 Thanks,
 Jaideep

 _
 The information contained in this communication is intended solely for the
 use of the individual or entity to whom it is addressed and others
 authorized to receive it. It may contain confidential or legally privileged
 information. If you are not the intended recipient you are hereby notified
 that any disclosure, copying, distribution or taking any action in reliance
 on the contents of this information is strictly prohibited and may be
 unlawful. If you have received this communication in error, please notify
 us immediately by responding to this email and then delete it from your
 system. The firm is neither liable for the proper and complete transmission
 of the information contained in this communication nor for any delay in its
 receipt.


Re: Error: UnionPartition cannot be cast to org.apache.spark.rdd.HadoopPartition

2014-07-02 Thread Mayur Rustagi
Two job contexts cannot share data. Are you collecting the data to the
master & then sending it to the other context?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Wed, Jul 2, 2014 at 11:57 AM, Honey Joshi 
honeyjo...@ideata-analytics.com wrote:

 On Wed, July 2, 2014 1:11 am, Mayur Rustagi wrote:
  Ideally you should be converting RDD to schemardd ?
  You are creating UnionRDD to join across dstream rdd?
 
 
 
  Mayur Rustagi
  Ph: +1 (760) 203 3257
  http://www.sigmoidanalytics.com
  @mayur_rustagi https://twitter.com/mayur_rustagi
 
 
 
 
  On Tue, Jul 1, 2014 at 3:11 PM, Honey Joshi
  honeyjo...@ideata-analytics.com
 
  wrote:
 
 
  Hi,
  I am trying to run a project which takes data as a DStream and dumps the
   data in the Shark table after various operations. I am getting the
  following error :
 
  Exception in thread main org.apache.spark.SparkException: Job
  aborted:
  Task 0.0:0 failed 1 times (most recent failure: Exception failure:
  java.lang.ClassCastException: org.apache.spark.rdd.UnionPartition cannot
   be cast to org.apache.spark.rdd.HadoopPartition) at
 
  org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$sched
  uler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
  at
 
  org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$sched
  uler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
  at
 
  scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.sc
  ala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.org
  $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:102
  6)
  at
 
  org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(
  DAGScheduler.scala:619)
  at
 
  org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(
  DAGScheduler.scala:619)
  at scala.Option.foreach(Option.scala:236) at
 
  org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala
  :619)
  at
 
  org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonf
  un$receive$1.applyOrElse(DAGScheduler.scala:207)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at
  akka.actor.ActorCell.invoke(ActorCell.scala:456)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at
  akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at
 
  akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(Abstra
  ctDispatcher.scala:386)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at
 
  scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.j
  ava:1339)
  at
  scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979
  )
  at
 
  scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread
  .java:107)
 
 
  Can someone please explain the cause of this error, I am also using a
  Spark Context with the existing Streaming Context.
 
 
 

 I am using spark 0.9.0-Incubating, so it doesnt have anything to do with
 schemaRDD.This error is probably coming when I am trying to use one spark
 context and one shark context in the same job.Is there any way to
 incorporate two context in one job?
 Regards

 Honey Joshi
 Ideata-Analytics




Re: Serializer or Out-of-Memory issues?

2014-07-02 Thread Mayur Rustagi
Your executors are going out of memory, & then subsequent tasks scheduled on
the scheduler are also failing, hence the lost TID (task ID) messages.


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Mon, Jun 30, 2014 at 7:47 PM, Sguj tpcome...@yahoo.com wrote:

 I'm trying to perform operations on a large RDD, that ends up being about
 1.3
 GB in memory when loaded in. It's being cached in memory during the first
 operation, but when another task begins that uses the RDD, I'm getting this
 error that says the RDD was lost:

 14/06/30 09:48:17 INFO TaskSetManager: Serialized task 1.0:4 as 8245 bytes
 in 0 ms
 14/06/30 09:48:17 WARN TaskSetManager: Lost TID 15611 (task 1.0:3)
 14/06/30 09:48:17 WARN TaskSetManager: Loss was due to
 org.apache.spark.api.python.PythonException
 org.apache.spark.api.python.PythonException: Traceback (most recent call
 last):
   File /Users/me/Desktop/spark-1.0.0/python/pyspark/worker.py, line 73,
 in
 main
 command = pickleSer._read_with_length(infile)
   File /Users/me/Desktop/spark-1.0.0/python/pyspark/serializers.py, line
 142, in _read_with_length
 length = read_int(stream)
   File /Users/me/Desktop/spark-1.0.0/python/pyspark/serializers.py, line
 337, in read_int
 raise EOFError
 EOFError

 at
 org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
 at
 org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:145)
 at
 org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 at org.apache.spark.scheduler.Task.run(Task.scala:51)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 14/06/30 09:48:18 INFO AppClient$ClientActor: Executor updated:
 app-20140630090515-/0 is now FAILED (Command exited with code 52)
 14/06/30 09:48:18 INFO SparkDeploySchedulerBackend: Executor
 app-20140630090515-/0 removed: Command exited with code 52
 14/06/30 09:48:18 INFO SparkDeploySchedulerBackend: Executor 0
 disconnected,
 so removing it
 14/06/30 09:48:18 ERROR TaskSchedulerImpl: Lost executor 0 on localhost:
 OutOfMemoryError
 14/06/30 09:48:18 INFO TaskSetManager: Re-queueing tasks for 0 from TaskSet
 1.0
 14/06/30 09:48:18 WARN TaskSetManager: Lost TID 15610 (task 1.0:2)
 14/06/30 09:48:18 WARN TaskSetManager: Lost TID 15609 (task 1.0:1)
 14/06/30 09:48:18 WARN TaskSetManager: Lost TID 15612 (task 1.0:4)
 14/06/30 09:48:18 WARN TaskSetManager: Lost TID 15608 (task 1.0:0)


 The operation it fails on is a ReduceByKey(), and the RDD before the
 operation is split into several thousand partitions (I'm doing term
 weighting that requires a different partition initially for each document),
 and the system has 6 GB of memory for the executor, so I'm not sure if it's
 actually a memory error, as is mentioned 5 lines from the end of the error.
 The serializer error portion is what's really confusing me, and I can't
 find
 references to this particular error with Spark anywhere.

 Does anyone have a clue as to what the actual error might be here, and what
 a possible solution would be?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Serializer-or-Out-of-Memory-issues-tp8533.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Lost TID: Loss was due to fetch failure from BlockManagerId

2014-07-01 Thread Mayur Rustagi
It could be because you are out of memory on the worker nodes & blocks are
not getting registered.
An older issue with 0.6.0 was with dead nodes causing loss of a task & then
resubmission of data in an infinite loop... It was fixed in 0.7.0 though.
Are you seeing a crash log in this log, or in the worker log @ 192.168.222.164,
or on any of the machines where the crash log is displayed?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Wed, Jul 2, 2014 at 7:51 AM, Yana Kadiyska yana.kadiy...@gmail.com
wrote:

 A lot of things can get funny when you run distributed as opposed to
 local -- e.g. some jar not making it over. Do you see anything of
 interest in the log on the executor machines -- I'm guessing
 192.168.222.152/192.168.222.164. From here

 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
 seems like the warning message is logged after the task fails -- but I
 wonder if you might see something more useful as to why it failed to
 begin with. As an example we've had cases in Hdfs where a small
 example would work, but on a larger example we'd hit a bad file. But
 the executor log is usually pretty explicit as to what happened...

 On Tue, Jul 1, 2014 at 8:57 PM, Mohammed Guller moham...@glassbeam.com
 wrote:
  I am running Spark 1.0 on a 4-node standalone spark cluster (1 master + 3
  worker). Our app is fetching data from Cassandra and doing a basic
 filter,
  map, and countByKey on that data. I have run into a strange problem.
 Even if
  the number of rows in Cassandra is just 1M, the Spark job goes seems to
 go
  into an infinite loop and runs for hours. With a small amount of data
 (less
  than 100 rows), the job does finish, but takes almost 30-40 seconds and
 we
  frequently see the messages shown below. If we run the same application
 on a
  single node Spark (--master local[4]), then we don’t see these warnings
 and
  the task finishes in less than 6-7 seconds. Any idea what could be the
 cause
  for these problems when we run our application on a standalone 4-node
 spark
  cluster?
 
 
 
  14/06/30 19:30:16 WARN TaskSetManager: Lost TID 25036 (task 6.0:90)
 
  14/06/30 19:30:16 WARN TaskSetManager: Loss was due to fetch failure from
  BlockManagerId(2, 192.168.222.164, 57185, 0)
 
  14/06/30 19:30:18 WARN TaskSetManager: Lost TID 25310 (task 6.1:0)
 
  14/06/30 19:30:18 WARN TaskSetManager: Loss was due to fetch failure from
  BlockManagerId(2, 192.168.222.164, 57185, 0)
 
  14/06/30 19:30:19 WARN TaskSetManager: Lost TID 25582 (task 6.2:0)
 
  14/06/30 19:30:19 WARN TaskSetManager: Loss was due to fetch failure from
  BlockManagerId(2, 192.168.222.164, 57185, 0)
 
  14/06/30 19:30:21 WARN TaskSetManager: Lost TID 25882 (task 6.3:34)
 
  14/06/30 19:30:21 WARN TaskSetManager: Loss was due to fetch failure from
  BlockManagerId(0, 192.168.222.142, 39342, 0)
 
  14/06/30 19:30:22 WARN TaskSetManager: Lost TID 26152 (task 6.4:0)
 
  14/06/30 19:30:22 WARN TaskSetManager: Loss was due to fetch failure from
  BlockManagerId(0, 192.168.222.142, 39342, 0)
 
  14/06/30 19:30:23 WARN TaskSetManager: Lost TID 26427 (task 6.5:4)
 
  14/06/30 19:30:23 WARN TaskSetManager: Loss was due to fetch failure from
  BlockManagerId(2, 192.168.222.164, 57185, 0)
 
  14/06/30 19:30:25 WARN TaskSetManager: Lost TID 26690 (task 6.6:0)
 
  14/06/30 19:30:25 WARN TaskSetManager: Loss was due to fetch failure from
  BlockManagerId(2, 192.168.222.164, 57185, 0)
 
  14/06/30 19:30:26 WARN TaskSetManager: Lost TID 26959 (task 6.7:0)
 
  14/06/30 19:30:26 WARN TaskSetManager: Loss was due to fetch failure from
  BlockManagerId(2, 192.168.222.164, 57185, 0)
 
  14/06/30 19:30:28 WARN TaskSetManager: Lost TID 27449 (task 6.8:218)
 
  14/06/30 19:30:28 WARN TaskSetManager: Loss was due to fetch failure from
  BlockManagerId(2, 192.168.222.164, 57185, 0)
 
  14/06/30 19:30:30 WARN TaskSetManager: Lost TID 27718 (task 6.9:0)
 
  14/06/30 19:30:30 WARN TaskSetManager: Loss was due to fetch failure from
  BlockManagerId(2, 192.168.222.164, 57185, 0)
 
  14/06/30 19:30:30 WARN TaskSetManager: Loss was due to fetch failure from
  BlockManagerId(2, 192.168.222.164, 57185, 0)
 
  14/06/30 19:30:31 WARN TaskSetManager: Lost TID 27991 (task 6.10:1)
 
  14/06/30 19:30:31 WARN TaskSetManager: Loss was due to fetch failure from
  BlockManagerId(2, 192.168.222.164, 57185, 0)
 
  14/06/30 19:30:33 WARN TaskSetManager: Lost TID 28265 (task 6.11:0)
 
  14/06/30 19:30:33 WARN TaskSetManager: Loss was due to fetch failure from
  BlockManagerId(2, 192.168.222.164, 57185, 0)
 
  14/06/30 19:30:34 WARN TaskSetManager: Lost TID 28550 (task 6.12:0)
 
  14/06/30 19:30:34 WARN TaskSetManager: Loss was due to fetch failure from
  BlockManagerId(2, 192.168.222.164, 57185, 0)
 
  14/06/30 19:30:36 WARN TaskSetManager: Lost TID 28822 (task 6.13:0)
 
  14/06/30 19:30:36 WARN TaskSetManager: Loss was due to fetch failure from

Re: Distribute data from Kafka evenly on cluster

2014-06-28 Thread Mayur Rustagi
How about this?
https://groups.google.com/forum/#!topic/spark-users/ntPQUZFJt4M


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Sat, Jun 28, 2014 at 10:19 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 I have a number of questions using the Kafka receiver of Spark
 Streaming. Maybe someone has some more experience with that and can
 help me out.

 I have set up an environment for getting to know Spark, consisting of
 - a Mesos cluster with 3 only-slaves and 3 master-and-slaves,
 - 2 Kafka nodes,
 - 3 Zookeeper nodes providing service to both Kafka and Mesos.

 My Kafka cluster has only one topic with one partition (replicated to
 both nodes). When I start my Kafka receiver, it successfully connects
 to Kafka and does the processing, but it seems as if the (expensive)
 function in the final foreachRDD(...) is only executed on one node of
 my cluster, which is not what I had in mind when setting up the
 cluster ;-)

 So first, I was wondering about the parameter `topics: Map[String,
 Int]` to KafkaUtils.createStream(). Apparently it controls how many
 connections are made from my cluster nodes to Kafka. The Kafka doc at
 https://kafka.apache.org/documentation.html#introduction says each
 message published to a topic is delivered to one consumer instance
 within each subscribing consumer group and If all the consumer
 instances have the same consumer group, then this works just like a
 traditional queue balancing load over the consumers.

 The Kafka docs *also* say: Note however that there cannot be more
 consumer instances than partitions. This seems to imply that with
 only one partition, increasing the number in my Map should have no
 effect.

 However, if I increase the number of streams for my one topic in my
 `topics` Map, I actually *do* see that the task in my foreachRDD(...)
 call is now executed on multiple nodes. Maybe it's more of a Kafka
 question than a Spark one, but can anyone explain this to me? Should I
 always have more Kafka partitions than Mesos cluster nodes?

 So, assuming that changing the number in that Map is not what I want
 (although I don't know if it is), I tried to use
 .repartition(numOfClusterNodes) (which doesn't seem right if I want to
 add and remove Mesos nodes on demand). This *also* did spread the
 foreachRDD(...) action evenly – however, the function never seems to
 terminate, so I never get to process the next interval in the stream.
 A similar behavior can be observed when running locally, not on the
 cluster, then the program will not exit but instead hang after
 everything else has shut down. Any hints concerning this issue?

 Thanks
 Tobias



Re: Map with filter on JavaRdd

2014-06-27 Thread Mayur Rustagi
It happens in a single pass anyway. You may write them separately, but the
stages are performed together when possible. You will see only one set of
tasks in the output of your application.
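A small illustrative sketch (numbers are arbitrary):

  val nums = sc.parallelize(1 to 1000)
  val result = nums.map(_ * 2).filter(_ % 3 == 0)  // no job runs yet; the two steps are pipelined
  println(result.count())  // one set of tasks applies map and filter in a single pass per partition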

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Jun 27, 2014 at 12:12 PM, ajay garg ajay.g...@mobileum.com wrote:

 Hi All,
  Is it possible to map and filter a javardd in a single operation?

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Map-with-filter-on-JavaRdd-tp8401.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Google Cloud Engine adds out of the box Spark/Shark support

2014-06-26 Thread Mayur Rustagi
https://groups.google.com/forum/#!topic/gcp-hadoop-announce/EfQms8tK5cE

I suspect they are using their own builds. Has anybody had a chance to look
at it?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


Re: Spark job tracker.

2014-06-26 Thread Mayur Rustagi
You can use the SparkListener interface to track the tasks. Another option is to
use the JSON patch (https://github.com/apache/spark/pull/882) & track tasks with
the JSON API.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Jun 27, 2014 at 2:31 AM, abhiguruvayya sharath.abhis...@gmail.com
wrote:

 I don't want to track it  on the cluster UI. Once i launch the job i would
 to
 like to print the status.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p8370.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Persistent Local Node variables

2014-06-24 Thread Mayur Rustagi
Are you trying to process data as part of the same job (i.e. within the same
SparkContext)? Then all you have to do is cache the output RDD of your
processing. It'll run your processing once & cache the results for future
tasks, unless the node caching the RDD goes down.
If you are trying to retain it for quite a long time you can:

   - simply store it in HDFS & load it each time,
   - store it in a table & try to pull it with Spark SQL every
   time (experimental), or
   - use Ooyala's JobServer to cache the data & do all processing using that.

Regards
Mayur


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Mon, Jun 23, 2014 at 11:14 AM, Daedalus tushar.nagara...@gmail.com
wrote:

 Will using mapPartitions and creating a new RDD of ParsedData objects avoid
 multiple parsing?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Persistent-Local-Node-variables-tp8104p8107.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Kafka Streaming - Error Could not compute split

2014-06-24 Thread Mayur Rustagi
I have seen this when I prevent spilling of shuffle data to disk. Can you
change the shuffle memory fraction? Is your data spilling to disk?
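For example, a hedged config sketch (the 0.4 value is arbitrary; tune it for your workload):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.shuffle.spill", "true")          // allow shuffle data to spill to disk
    .set("spark.shuffle.memoryFraction", "0.4")  // fraction of heap for shuffle aggregation buffers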

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Mon, Jun 23, 2014 at 12:09 PM, Kanwaldeep kanwal...@gmail.com wrote:

 We are using Spark 1.0.0 deployed on Spark Standalone cluster and I'm
 getting
 the following exception. With previous version I've seen this error occur
 along with OutOfMemory errors which I'm not seeing with Sparks 1.0.

 Any suggestions?

 Job aborted due to stage failure: Task 3748.0:20 failed 4 times, most
 recent
 failure: Exception failure in TID 225792 on host
 hslave32106.sjc9.service-now.com: java.lang.Exception: Could not compute
 split, block input-0-1403458929600 not found
 org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
 org.apache.spark.scheduler.Task.run(Task.scala:51)
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)

 java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
 java.lang.Thread.run(Thread.java:662) Driver stacktrace:



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Streaming-Error-Could-not-compute-split-tp8112.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Problems running Spark job on mesos in fine-grained mode

2014-06-24 Thread Mayur Rustagi
Hi Sebastien,
Are you using PySpark by any chance? Is it working for you (post the
patch)?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Mon, Jun 23, 2014 at 1:51 PM, Fedechicco fedechi...@gmail.com wrote:

 I'm getting the same behavior and it's crucial I get it fixed for an
 evaluation of Spark + Mesos within my company.

 I'm bumping +1 for the request of putting this fix in the 1.0.1 if
 possible!

 thanks,
 Federico


 2014-06-20 20:51 GMT+02:00 Sébastien Rainville 
 sebastienrainvi...@gmail.com:

 Hi,

 this is just a follow-up regarding this issue. Turns out that it's caused
 by a bug in Spark. I created a case for it:
 https://issues.apache.org/jira/browse/SPARK-2204 and submitted a patch.

 Any chance this could be included in the 1.0.1 release?

 Thanks,

 - Sebastien



 On Tue, Jun 17, 2014 at 2:57 PM, Sébastien Rainville 
 sebastienrainvi...@gmail.com wrote:

 Hi,

 I'm having trouble running spark on mesos in fine-grained mode. I'm
 running spark 1.0.0 and mesos 0.18.0. The tasks are failing randomly, which
 most of the time, but not always, cause the job to fail. The same code is
 running fine in coarse-grained mode. I see the following exceptions in the
 logs of the spark driver:

 W0617 10:57:36.774382  8735 sched.cpp:901] Attempting to launch task 21
 with an unknown offer 20140416-011500-1369465866-5050-26096-52332715
 W0617 10:57:36.774433  8735 sched.cpp:901] Attempting to launch task 22
 with an unknown offer 20140416-011500-1369465866-5050-26096-52332715
 14/06/17 10:57:36 INFO TaskSetManager: Re-queueing tasks for
 201311011608-1369465866-5050-9189-46 from TaskSet 0.0
 14/06/17 10:57:36 WARN TaskSetManager: Lost TID 22 (task 0.0:2)
 14/06/17 10:57:36 WARN TaskSetManager: Lost TID 19 (task 0.0:0)
 14/06/17 10:57:36 WARN TaskSetManager: Lost TID 21 (task 0.0:1)
 14/06/17 10:57:36 INFO DAGScheduler: Executor lost:
 201311011608-1369465866-5050-9189-46 (epoch 0)
 14/06/17 10:57:36 INFO BlockManagerMasterActor: Trying to remove
 executor 201311011608-1369465866-5050-9189-46 from BlockManagerMaster.
 14/06/17 10:57:36 INFO BlockManagerMaster: Removed
 201311011608-1369465866-5050-9189-46 successfully in removeExecutor
 14/06/17 10:57:36 DEBUG MapOutputTrackerMaster: Increasing epoch to 1
 14/06/17 10:57:36 INFO DAGScheduler: Host added was in lost list
 earlier: ca1-dcc1-0065.lab.mtl

 I don't see any exceptions in the spark executor logs. The only error
 message I found in mesos itself is warnings in the mesos master:

 W0617 10:57:36.816748 26100 master.cpp:1615] Failed to validate task 21
 : Task 21 attempted to use cpus(*):1 combined with already used cpus(*):1;
 mem(*):2048 is greater than offered mem(*):3216; disk(*):98304;
 ports(*):[11900-11919, 1192
 1-11995, 11997-11999]; cpus(*):1
 W0617 10:57:36.819807 26100 master.cpp:1615] Failed to validate task 22
 : Task 22 attempted to use cpus(*):1 combined with already used cpus(*):1;
 mem(*):2048 is greater than offered mem(*):3216; disk(*):98304;
 ports(*):[11900-11919, 1192
 1-11995, 11997-11999]; cpus(*):1
 W0617 10:57:36.932287 26102 master.cpp:1615] Failed to validate task 28
 : Task 28 attempted to use cpus(*):1 combined with already used cpus(*):1;
 mem(*):2048 is greater than offered cpus(*):1; mem(*):3216; disk(*):98304;
 ports(*):[11900-
 11960, 11962-11978, 11980-11999]
 W0617 11:05:52.783133 26098 master.cpp:2106] Ignoring unknown exited
 executor 201311011608-1369465866-5050-9189-46 on slave
 201311011608-1369465866-5050-9189-46 (ca1-dcc1-0065.lab.mtl)
 W0617 11:05:52.787739 26103 master.cpp:2106] Ignoring unknown exited
 executor 201311011608-1369465866-5050-9189-34 on slave
 201311011608-1369465866-5050-9189-34 (ca1-dcc1-0053.lab.mtl)
 W0617 11:05:52.790292 26102 master.cpp:2106] Ignoring unknown exited
 executor 201311011608-1369465866-5050-9189-59 on slave
 201311011608-1369465866-5050-9189-59 (ca1-dcc1-0079.lab.mtl)
 W0617 11:05:52.800649 26099 master.cpp:2106] Ignoring unknown exited
 executor 201311011608-1369465866-5050-9189-18 on slave
 201311011608-1369465866-5050-9189-18 (ca1-dcc1-0027.lab.mtl)
 ... (more of those Ignoring unknown exited executor)


 I analyzed the difference in between the execution of the same job in
 coarse-grained mode and fine-grained mode, and I noticed that in the
 fine-grained mode the tasks get executed on executors different than the
 ones reported in spark, as if spark and mesos get out of sync as to which
 executor is responsible for which task. See the following:


 Coarse-grained mode:

  Spark:                                     Mesos:
  Task Index  Task ID  Executor  Status      Task ID (UI)  Task Name  Task ID (logs)  Executor  State
  0           0        66        SUCCESS     4             Task 4     0               66        RUNNING
  1           1        59        SUCCESS     0             Task 0     1               59        RUNNING
  2           2        54        SUCCESS     10            Task 10    2               54        RUNNING
  3           3        128       SUCCESS     6             Task 6     3               128       RUNNING
  ...

 Fine-grained mode:

  Spark:                                     Mesos:
  Task Index  Task ID  Executor  Status      Task ID (UI)  Task Name   Task ID (logs)  Executor  State
  0           23       108       SUCCESS     23            task 0.0:0  23              27        FINISHED
  0           19       65        FAILED      19            task 0.0:0  19              86

Re: Serialization problem in Spark

2014-06-24 Thread Mayur Rustagi
Did you try to register the class with the Kryo serializer?
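For reference, a minimal sketch of what that registration could look like (this assumes the Spark 1.0-era spark.kryo.registrator hook; the registrator name is arbitrary and the HBase class is just the one from the error quoted below):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // register every class Kryo will have to (de)serialize
    kryo.register(classOf[ImmutableBytesWritable])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "MyRegistrator")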

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Mon, Jun 23, 2014 at 7:00 PM, rrussell25 rrussel...@gmail.com wrote:

 Thanks for pointer...tried Kryo and ran into a strange error:

 org.apache.spark.SparkException: Job aborted due to stage failure:
 Exception
 while deserializing and fetching task:
 com.esotericsoftware.kryo.KryoException: Unable to find class:
 rg.apache.hadoop.hbase.io.ImmutableBytesWritable

 It is strange in that the complaint is for rg.apache...   (missing o is
 not a typo).





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-problem-in-Spark-tp7049p8123.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Efficiently doing an analysis with Cartesian product (pyspark)

2014-06-24 Thread Mayur Rustagi
How about this: map the input to key/value pairs, then reduceByKey using a max
operation. Then you can join the resulting RDD with your lookup data and reduce
(if you only want to look up two values you can use lookup() directly as well).
PS: these are operations in the Scala API; I am not sure how far the PySpark
API has caught up on them.
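A rough sketch of that flow in the Scala API, using the sample numbers from the message below (the per-key maxima are 200 and 300, so the pair sums come out to 400/500/600 as expected):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._   // pair-RDD operations such as reduceByKey

def keyPairMaxSums(sc: SparkContext) = {
  val input = sc.parallelize(Seq((1, 100), (1, 200), (2, 100), (2, 300)))
  val maxPerKey = input.reduceByKey((a, b) => math.max(a, b))     // (1,200), (2,300)
  maxPerKey.cartesian(maxPerKey)                                  // every pair of keys
    .map { case ((k1, v1), (k2, v2)) => ((k1, k2), v1 + v2) }     // (1,1)->400, (1,2)->500, (2,2)->600
}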

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Tue, Jun 24, 2014 at 3:33 AM, Aaron aaron.doss...@target.com wrote:

 Sorry, I got my sample outputs wrong

 (1,1) -> 400
 (1,2) -> 500
 (2,2) -> 600

 On Jun 23, 2014, at 4:29 PM, Aaron Dossett [via Apache Spark User List] wrote:

  I am relatively new to Spark and am getting stuck trying to do the
 following:

 - My input is integer key, value pairs where the key is not unique.  I'm
 interested in information about all possible distinct key combinations,
 thus the Cartesian product.
 - My first attempt was to create a separate RDD of this cartesian product
 and then use map() to calculate the data.  However, I was trying to pass
 another RDD to the function map was calling, which I eventually figured out
 was causing a run time error, even if the function I called with map did
 nothing.  Here's a simple code example:

 ---
 def somefunc(x, y, RDD):
   return 0

 input = sc.parallelize([(1,100), (1,200), (2, 100), (2,300)])

 #Create all pairs of keys, including self-pairs
 itemPairs = input.map(lambda x: x[0]).distinct()
 itemPairs = itemPairs.cartesian(itemPairs)

 print itemPairs.collect()

 TC = itemPairs.map(lambda x: (x, somefunc(x[0], x[1], input)))

 print TC.collect()
 --

 I'm assuming this isn't working because it isn't a very Spark-like way to
 do things and I could imagine that passing RDDs into other RDD's map
 functions might not make sense.  Could someone suggest to me a way to apply
 transformations and actions to input that would produce a mapping of key
 pairs to some information related to the values.

 For example, I might want to (1, 2) to map to the sum of the maximum
 values found for each key in the input (500 in my sample data above).
  Extending that example (1,1) would map to 300 and (2,2) to 400.

 Please let me know if I should provide more details or a more robust
 example.

 Thank you, Aaron



 --
 View this message in context: Re: Efficiently doing an analysis with
 Cartesian product (pyspark)
 http://apache-spark-user-list.1001560.n3.nabble.com/Efficiently-doing-an-analysis-with-Cartesian-product-pyspark-tp8144p8145.html

 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



Re: balancing RDDs

2014-06-24 Thread Mayur Rustagi
This would be really useful, especially for Shark, where a shift of
partitioning affects all subsequent queries unless the task scheduling time
beats spark.locality.wait. It can cause overall low performance for all
subsequent tasks.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Tue, Jun 24, 2014 at 4:10 AM, Sean McNamara sean.mcnam...@webtrends.com
wrote:

 We have a use case where we’d like something to execute once on each node
 and I thought it would be good to ask here.

 Currently we achieve this by setting the parallelism to the number of
 nodes and use a mod partitioner:

 val balancedRdd = sc.parallelize(
 (0 until Settings.parallelism)
 .map(id => id -> Settings.settings)
   ).partitionBy(new ModPartitioner(Settings.parallelism))
   .cache()


 This works great except in two instances where it can become unbalanced:

 1. if a worker is restarted or dies, the partition will move to a
 different node (one of the nodes will run two tasks).  When the worker
 rejoins, is there a way to have a partition move back over to the newly
 restarted worker so that it’s balanced again?

 2. drivers need to be started in a staggered fashion, otherwise one driver
 can launch two tasks on one set of workers, and the other driver will do
 the same with the other set.  Are there any scheduler/config semantics so
 that each driver will take one (and only one) core from *each* node?


 Thanks

 Sean









Re: How to Reload Spark Configuration Files

2014-06-24 Thread Mayur Rustagi
Not really. You are better off using a cluster manager like Mesos or Yarn
for this.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Tue, Jun 24, 2014 at 11:35 AM, Sirisha Devineni 
sirisha_devin...@persistent.co.in wrote:

  Hi All,



 I am working with Spark to add new slaves automatically when there is more
 data to be processed by the cluster. During this process a question has
 arisen: after adding/removing a slave node to/from the Spark cluster, do
 we need to restart the master and the other existing slaves in the cluster?



 From my observations:

 1.   If a new slave node's details are added to the configuration file
 (/root/spark/conf/slaves) on the master node, running the start-slaves.sh
 script will add the new slave to the cluster without affecting the existing
 slaves or the master.

 2.   If a slave's details are removed from the configuration file, the
 master needs to be restarted using the stop-master.sh and start-master.sh
 scripts for the change to take effect.



 Is there any reload option available in Spark to load the changed
 configuration files without stopping the services? Here, stopping the
 service of the master or existing slaves may lead to an outage.

 You can find the options available to start/stop the services of spark at
 http://spark.apache.org/docs/latest/spark-standalone.html





 Thanks  Regards,

 Sirisha Devineni.




Re: Questions regarding different spark pre-built packages

2014-06-24 Thread Mayur Rustagi
The HDFS driver keeps changing and breaking compatibility, hence all the build
versions. If you don't use HDFS/YARN then you can safely ignore it.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Tue, Jun 24, 2014 at 12:16 PM, Sourav Chandra 
sourav.chan...@livestream.com wrote:

 Hi,

 I am just curious to know what the differences are between the prebuilt
 packages for Hadoop 1, Hadoop 2, CDH, etc.

 I am using a Spark standalone cluster and we don't use Hadoop at all.

 Can we use any one of the pre-built packages, or do we have to run the
 make-distribution.sh script from the code?

 Thanks,
 --

 Sourav Chandra

 Senior Software Engineer

 · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

 sourav.chan...@livestream.com

 o: +91 80 4121 8723

 m: +91 988 699 3746

 skype: sourav.chandra

 Livestream

 Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
 Block, Koramangala Industrial Area,

 Bangalore 560034

 www.livestream.com



Re: partitions, coalesce() and parallelism

2014-06-24 Thread Mayur Rustagi
To be clear, the number of map tasks is determined by the number of partitions
inside the RDD, hence the suggestion by Nicholas.
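For what it's worth, one way to get that behaviour out of the pseudo-code further down the thread is to make the final coalesce a shuffle, so the map stays at 100 partitions and only the save runs at 2. A sketch, assuming the coalesce(numPartitions, shuffle) overload; the paths are placeholders and the map is a stand-in:

import org.apache.spark.SparkContext._   // sequenceFile / pair-RDD implicits outside the shell

val rdd1 = sc.sequenceFile[String, String]("s3n://bucket/in")       // placeholder path; > 100 partitions
val rdd3 = rdd1.coalesce(100).map { case (k, v) => k + "\t" + v }   // stand-in transform; runs as ~100 tasks
rdd3.coalesce(2, shuffle = true)                                    // the shuffle keeps the map stage at 100 tasks
    .saveAsTextFile("s3n://bucket/out")                             // only the post-shuffle save runs as 2 tasks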

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Wed, Jun 25, 2014 at 4:17 AM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 So do you get 2171 as the output for that command? That command tells you
 how many partitions your RDD has, so it’s good to first confirm that rdd1
 has as many partitions as you think it has.
 ​


 On Tue, Jun 24, 2014 at 4:22 PM, Alex Boisvert alex.boisv...@gmail.com
 wrote:

 It's actually a set of 2171 S3 files, with an average size of about 18MB.


 On Tue, Jun 24, 2014 at 1:13 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 What do you get for rdd1._jrdd.splits().size()? You might think you’re
 getting > 100 partitions, but it may not be happening.
 ​


 On Tue, Jun 24, 2014 at 3:50 PM, Alex Boisvert alex.boisv...@gmail.com
 wrote:

 With the following pseudo-code,

 val rdd1 = sc.sequenceFile(...) // has > 100 partitions
 val rdd2 = rdd1.coalesce(100)
 val rdd3 = rdd2.map { ... }
 val rdd4 = rdd3.coalesce(2)
 rdd4.saveAsTextFile(...) // want only two output files

 I would expect the parallelism of the map() operation to be 100
 concurrent tasks, and the parallelism of the save() operation to be 2.

 However, it appears the parallelism of the entire chain is 2 -- I only
 see two tasks created for the save() operation and those tasks appear to
 execute the map() operation as well.

 Assuming what I'm seeing is as-specified (meaning, how things are meant
 to be), what's the recommended way to force a parallelism of 100 on the
 map() operation?

 thanks!








Re: ElasticSearch enrich

2014-06-24 Thread Mayur Rustagi
Most likely the ES client is not serializable for you. There are three workarounds:
1. Switch to Kryo serialization and register the client with Kryo; that might
solve your serialization issue.
2. Use mapPartitions for your data and initialize the client inside the
mapPartitions code. This creates one client per partition, which reduces some
parallelism and adds some client-creation overhead, but it avoids serializing
the ES client and shipping it to the workers (a sketch of this option is below).
3. Use a serializable wrapper to serialize your ES client manually, send it
across, and deserialize it manually. This may or may not work depending on
whether your class is safely serializable.
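A rough sketch of option 2, with a hypothetical EsClient type standing in for whichever client library you use (its constructor and geoLookup call are made up for illustration):

import org.apache.spark.rdd.RDD

// Hypothetical stand-in for the real ES client; swap in elastic4s, the REST
// client, etc. The point is only where it gets constructed.
class EsClient(host: String, port: Int) {
  def geoLookup(doc: String): String = ???   // placeholder for the real geo query
}

def enrich(records: RDD[String]): RDD[(String, String)] =
  records.mapPartitions { iter =>
    // created inside each partition, so the client itself is never serialized
    // or shipped from the driver to the workers
    val client = new EsClient("es-host", 9200)   // hypothetical host/port
    iter.map(doc => (doc, client.geoLookup(doc)))
  }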

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Wed, Jun 25, 2014 at 4:12 AM, boci boci.b...@gmail.com wrote:

 Hi guys,

 I have a small question. I want to create a Worker class which using
 ElasticClient to make query to elasticsearch. (I want to enrich my data
 with geo search result).

 How can I do that? I tried to create a worker instance with ES host/port
 parameters, but Spark throws an exception (my class is not serializable).

 Any idea?

 Thanks
 b0c1




Re: ElasticSearch enrich

2014-06-24 Thread Mayur Rustagi
It's not used as the default serializer because of some compatibility issues and
the requirement to register the classes.

Which part are you getting as non-serializable? You need to serialize that
class if you are sending it to the Spark workers inside a map, reduce,
mapPartitions or any of the other operations on an RDD.


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng pc...@uow.edu.au wrote:

 I'm afraid persisting a connection across two tasks is a dangerous act, as
 they can't be guaranteed to be executed on the same machine. Your ES server
 may think it's a man-in-the-middle attack!

 I think it's possible to invoke a static method that gives you a connection
 from a local 'pool', so nothing will sneak into your closure, but that's too
 complex and there should be a better option.

 I've never used Kryo before; if it's that good perhaps we should use it as the
 default serializer



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Running Spark alongside Hadoop

2014-06-20 Thread Mayur Rustagi
The ideal way to do that is to use a cluster manager like YARN or Mesos; you
can control how many resources to give to which node, etc.
You should be able to run both together in standalone mode, however you may
experience varying latency and performance in the cluster as both MapReduce
and Spark demand resources from the same machines.


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Jun 20, 2014 at 3:41 PM, Sameer Tilak ssti...@live.com wrote:

 Dear Spark users,

 I have a small 4 node Hadoop cluster. Each node is a VM -- 4 virtual
 cores, 8GB memory and 500GB disk. I am currently running Hadoop on it. I
 would like to run Spark (in standalone mode) along side Hadoop on the same
 nodes. Given the configuration of my nodes, will that work? Does anyone have
 any experience in terms of stability and performance of running Spark and
 Hadoop on somewhat resource-constrained nodes?  I was looking at the Spark
 documentation and there is a way to configure memory and cores for the
 worker nodes and memory for the master node: SPARK_WORKER_CORES,
 SPARK_WORKER_MEMORY, SPARK_DAEMON_MEMORY. Any recommendations on how to
 share resources between Hadoop and Spark?






Re: Spark and RDF

2014-06-20 Thread Mayur Rustagi
Are you looking to create Shark operators for RDF? Since the Shark backend is
shifting to Spark SQL that would be slightly hard; a much better effort would
be to shift Gremlin to Spark (though a much beefier one :) )

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Jun 20, 2014 at 3:39 PM, andy petrella andy.petre...@gmail.com
wrote:

 For RDF, might GraphX be particularly appropriate?

  aℕdy ℙetrella
 about.me/noootsab
 [image: aℕdy ℙetrella on about.me]

 http://about.me/noootsab


 On Thu, Jun 19, 2014 at 4:49 PM, Flavio Pompermaier pomperma...@okkam.it
 wrote:

 Hi guys,

 I'm analyzing the possibility to use Spark to analyze RDF files and
 define reusable Shark operators on them (custom filtering, transforming,
 aggregating, etc). Is that possible? Any hint?

 Best,
 Flavio





Re: Spark and RDF

2014-06-20 Thread Mayur Rustagi
Or a separate RDD for SPARQL operations, a la SchemaRDD... operators for
SPARQL could be defined there. Not a bad idea :)

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Jun 20, 2014 at 3:56 PM, andy petrella andy.petre...@gmail.com
wrote:

 Maybe some SPARQL features in Shark, then ?

  aℕdy ℙetrella
 about.me/noootsab
 [image: aℕdy ℙetrella on about.me]

 http://about.me/noootsab


 On Fri, Jun 20, 2014 at 9:45 PM, Mayur Rustagi mayur.rust...@gmail.com
 wrote:

 Are you looking to create Shark operators for RDF? Since the Shark backend is
 shifting to Spark SQL that would be slightly hard; a much better effort would
 be to shift Gremlin to Spark (though a much beefier one :) )

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Fri, Jun 20, 2014 at 3:39 PM, andy petrella andy.petre...@gmail.com
 wrote:

 For RDF, might GraphX be particularly appropriate?

  aℕdy ℙetrella
 about.me/noootsab
 [image: aℕdy ℙetrella on about.me]

 http://about.me/noootsab


 On Thu, Jun 19, 2014 at 4:49 PM, Flavio Pompermaier 
 pomperma...@okkam.it wrote:

 Hi guys,

 I'm analyzing the possibility to use Spark to analyze RDF files and
 define reusable Shark operators on them (custom filtering, transforming,
 aggregating, etc). Is that possible? Any hint?

 Best,
 Flavio







Re: Set the number/memory of workers under mesos

2014-06-20 Thread Mayur Rustagi
You should be able to configure this on the SparkContext used by the Spark shell:
spark.cores.max and the executor memory.
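A sketch of what that looks like when you build the context yourself (property names as documented for the 0.9/1.0 releases; the Mesos master URL and values are placeholders). For spark-shell itself these would normally be passed as system properties, e.g. via SPARK_JAVA_OPTS, before the shell creates its context:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("mesos://zk://zk-host:2181/mesos")   // placeholder Mesos master URL
  .setAppName("capped-shell")
  .set("spark.executor.memory", "4g")             // memory per executor
  .set("spark.cores.max", "8")                    // total cores the app may take from Mesos
val sc = new SparkContext(conf)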
Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Jun 20, 2014 at 4:30 PM, Shuo Xiang shuoxiang...@gmail.com wrote:

 Hi, just wondering anybody knows how to set up the number of workers (and
 the amount of memory) in mesos, while lauching spark-shell? I was trying to
 edit conf/spark-env.sh and it looks like that the environment variables are
 for YARN of standalone. Thanks!






Re: list of persisted rdds

2014-06-13 Thread Mayur Rustagi
val myRdds = sc.getPersistentRDDs
assert(myRdds.size == 1)


It'll return a map. It's pretty old, available from 0.8.0 onwards.
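A minimal sketch of it end to end in the Scala shell (the method is on the Scala SparkContext; as far as I know PySpark's SparkContext does not expose it directly, which is presumably the AttributeError mentioned below):

val cached = sc.parallelize(1 to 100).cache()
cached.count()                                     // materialize the cached RDD
sc.getPersistentRDDs.foreach { case (id, rdd) =>   // returns a Map[Int, RDD[_]]
  println("RDD " + id + " persisted at " + rdd.getStorageLevel)
}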


Regards
Mayur


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Jun 13, 2014 at 9:42 AM, mrm ma...@skimlinks.com wrote:

 Hi Daniel,

 Thank you for your help! This is the sort of thing I was looking for.
 However, when I type sc.getPersistentRDDs, i get the error
 AttributeError: 'SparkContext' object has no attribute
 'getPersistentRDDs'.

 I don't get any error when I type sc.defaultParallelism for example.

 I would appreciate it if you could help me with this, I have tried
 different
 ways and googling it! I suspect it might be a silly error but I can't
 figure
 it out.

 Maria



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/list-of-persisted-rdds-tp7564p7569.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Master not seeing recovered nodes(Got heartbeat from unregistered worker ....)

2014-06-13 Thread Mayur Rustagi
I have also had trouble with workers rejoining the working set. I have typically
moved to a Mesos-based setup. Frankly, for high availability you are better
off using a cluster manager.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Jun 13, 2014 at 8:57 AM, Yana Kadiyska yana.kadiy...@gmail.com
wrote:

 Hi, I see this has been asked before but has not gotten any satisfactory
 answer so I'll try again:

 (here is the original thread I found:
 http://mail-archives.apache.org/mod_mbox/spark-user/201403.mbox/%3c1394044078706-2312.p...@n3.nabble.com%3E
 )

 I have a set of workers dying and coming back again. The master prints the
 following warning:

 Got heartbeat from unregistered worker 

 What is the solution to this -- rolling the master is very undesirable to
 me as I have a Shark context sitting on top of it (it's meant to be highly
 available).

 Insights appreciated -- I don't think an executor going down is very
 unexpected but it does seem odd that it won't be able to rejoin the working
 set.

 I'm running Spark 0.9.1 on CDH





Re: multiple passes in mapPartitions

2014-06-13 Thread Mayur Rustagi
Sorry if this is a dumb question, but why not make several calls to
mapPartitions sequentially? Are you looking to avoid function
serialization, or is your function damaging partitions?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Jun 13, 2014 at 1:30 AM, zhen z...@latrobe.edu.au wrote:

 I want to take multiple passes through my data in mapPartitions. However,
 the
 iterator only allows you to take one pass through the data. If I
 transformed
 the iterator into an array using iter.toArray, it is too slow, since it
 copies all the data into a new scala array. Also it takes twice the memory.
 Which is also bad in terms of more GC.

 Is there a faster/better way of taking multiple passes without copying all
 the data?

 Thank you,

 Zhen



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/multiple-passes-in-mapPartitions-tp7555.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: specifying fields for join()

2014-06-13 Thread Mayur Rustagi
You can resolve the columns to create keys from them, then join. Is that
what you did?


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Thu, Jun 12, 2014 at 9:24 PM, SK skrishna...@gmail.com wrote:

 This issue is resolved.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/specifying-fields-for-join-tp7528p7544.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Writing data to HBase using Spark

2014-06-12 Thread Mayur Rustagi
Are you able to use the Hadoop input/output reader for HBase via the new
Hadoop API reader?


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Thu, Jun 12, 2014 at 7:49 AM, gaurav.dasgupta gaurav.d...@gmail.com
wrote:

 Is there anyone else who is facing this problem of writing to HBase when
 running Spark on YARN mode or Standalone mode using this example?

 If not, then do I need to explicitly, specify something in the classpath?

 Regards,
 Gaurav


 On Wed, Jun 11, 2014 at 1:53 PM, Gaurav Dasgupta wrote:

 Hi Kanwaldeep,

 I have tried your code but arrived into a problem. The code is working
 fine in *local* mode. But if I run the same code in Spark stand alone
 mode or YARN mode, then it is continuously executing, but not saving
 anything in the HBase table. I guess, it is stopping data streaming once
 the *saveToHBase* method is called for the first time.

 This is strange. I just want to know whether you have tested the code on
 all Spark execution modes?

 Thanks,
 Gaurav


 On Tue, Jun 10, 2014 at 12:20 PM, Kanwaldeep [via Apache Spark User List]
 wrote:

 Please see sample code attached at
 https://issues.apache.org/jira/browse/SPARK-944.






 --
 View this message in context: Re: Writing data to HBase using Spark
 http://apache-spark-user-list.1001560.n3.nabble.com/Writing-data-to-HBase-using-Spark-tp7304p7474.html
  Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



Re: Spark Streaming, download a s3 file to run a script shell on it

2014-06-07 Thread Mayur Rustagi
So you can run a job / Spark job to get the data to disk/HDFS, then run a
DStream on an HDFS folder. As you move your files in, the DStream will kick
in.
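A bare-bones sketch of that pattern (directory and batch interval are placeholders, assuming a 0.9/1.0-era StreamingContext); files moved into the monitored folder after the context starts show up as new batches:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("s3-landing-stream")
val ssc = new StreamingContext(conf, Seconds(30))            // placeholder batch interval
val lines = ssc.textFileStream("hdfs:///landing/processed")  // folder your download+script job moves files into
lines.print()
ssc.start()
ssc.awaitTermination()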
Regards
Mayur
On 6 Jun 2014 21:13, Gianluca Privitera 
gianluca.privite...@studio.unibo.it wrote:

  Where are the API for QueueStream and RddQueue?
 In my solution I cannot open a DStream with S3 location because I need to
 run a script on the file (a script that unluckily doesn't accept stdin as
 input), so I have to download it to my disk somehow and then handle it from
 there before creating the stream.

 Thanks
 Gianluca

 On 06/06/2014 02:19, Mayur Rustagi wrote:

 You can look at creating a DStream directly from the S3 location using a file
 stream. If you want to use any specific logic you can rely on QueueStream:
 read the data yourself from S3, process it and push it into the RDD queue.

  Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
  @mayur_rustagi https://twitter.com/mayur_rustagi



 On Fri, Jun 6, 2014 at 3:00 AM, Gianluca Privitera 
 gianluca.privite...@studio.unibo.it wrote:

 Hi,
 I've got a weird question but maybe someone has already dealt with it.
 My Spark Streaming application needs to
 - download a file from a S3 bucket,
 - run a script with the file as input,
 - create a DStream from this script output.
 I've already got the second part done with the rdd.pipe() API that really
 fits my request, but I have no idea how to manage the first part.
 How can I manage to download a file and run a script on them inside a
 Spark Streaming Application?
 Should I use process() from Scala or it won't work?

 Thanks
 Gianluca






Re: Spark Streaming, download a s3 file to run a script shell on it

2014-06-07 Thread Mayur Rustagi
QueueStream example is in Spark Streaming examples:
http://www.boyunjian.com/javasrc/org.spark-project/spark-examples_2.9.3/0.7.2/_/spark/streaming/examples/QueueStream.scala


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Sat, Jun 7, 2014 at 6:41 PM, Mayur Rustagi mayur.rust...@gmail.com
wrote:

 So you can run a job / spark job to get data to disk/hdfs. Then run a
 dstream from a hdfs folder. As you move your files, the dstream will kick
 in.
 Regards
 Mayur
 On 6 Jun 2014 21:13, Gianluca Privitera 
 gianluca.privite...@studio.unibo.it wrote:

  Where are the API for QueueStream and RddQueue?
 In my solution I cannot open a DStream with S3 location because I need to
 run a script on the file (a script that unluckily doesn't accept stdin as
 input), so I have to download it to my disk somehow and then handle it from
 there before creating the stream.

 Thanks
 Gianluca

 On 06/06/2014 02:19, Mayur Rustagi wrote:

  You can look at creating a DStream directly from the S3 location using a file
  stream. If you want to use any specific logic you can rely on QueueStream:
  read the data yourself from S3, process it and push it into the RDD queue.

  Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
  @mayur_rustagi https://twitter.com/mayur_rustagi



 On Fri, Jun 6, 2014 at 3:00 AM, Gianluca Privitera 
 gianluca.privite...@studio.unibo.it wrote:

 Hi,
 I've got a weird question but maybe someone has already dealt with it.
 My Spark Streaming application needs to
 - download a file from a S3 bucket,
 - run a script with the file as input,
 - create a DStream from this script output.
 I've already got the second part done with the rdd.pipe() API that
 really fits my request, but I have no idea how to manage the first part.
 How can I manage to download a file and run a script on them inside a
 Spark Streaming Application?
 Should I use process() from Scala or it won't work?

 Thanks
 Gianluca






Re: Spark Streaming, download a s3 file to run a script shell on it

2014-06-06 Thread Mayur Rustagi
You can look at creating a DStream directly from the S3 location using a file
stream. If you want to use any specific logic you can rely on QueueStream:
read the data yourself from S3, process it and push it into the RDD queue.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Jun 6, 2014 at 3:00 AM, Gianluca Privitera 
gianluca.privite...@studio.unibo.it wrote:

 Hi,
 I've got a weird question but maybe someone has already dealt with it.
 My Spark Streaming application needs to
 - download a file from a S3 bucket,
 - run a script with the file as input,
 - create a DStream from this script output.
 I've already got the second part done with the rdd.pipe() API that really
 fits my request, but I have no idea how to manage the first part.
 How can I manage to download a file and run a script on them inside a
 Spark Streaming Application?
 Should I use process() from Scala or it won't work?

 Thanks
 Gianluca




Re: Serialization problem in Spark

2014-06-06 Thread Mayur Rustagi
Where are you getting the serialization error? It's likely to be a different
problem. Which class is not getting serialized?


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Thu, Jun 5, 2014 at 6:32 PM, Vibhor Banga vibhorba...@gmail.com wrote:

 Any inputs on this will be helpful.

 Thanks,
 -Vibhor


 On Thu, Jun 5, 2014 at 3:41 PM, Vibhor Banga vibhorba...@gmail.com
 wrote:

 Hi,

 I am trying to do something like following in Spark:

 JavaPairRDDbyte[], MyObject eventRDD = hBaseRDD.map(new
 PairFunctionTuple2ImmutableBytesWritable, Result, byte[], MyObject () {
 @Override
 public Tuple2byte[], MyObject 
 call(Tuple2ImmutableBytesWritable, Result
 immutableBytesWritableResultTuple2) throws Exception {
 return new
 Tuple2byte[], MyObject (immutableBytesWritableResultTuple2._1.get(),
 MyClass.get(immutableBytesWritableResultTuple2._2));
 }
 });

 eventRDD.foreach(new VoidFunctionTuple2byte[], Event() {
 @Override
 public void call(Tuple2byte[], Event eventTuple2) throws
 Exception {

 processForEvent(eventTuple2._2);
 }
 });


 processForEvent() function flow contains some processing and ultimately
  writing to an HBase table. But I am getting serialisation issues with Hadoop
  and HBase built-in classes. How do I solve this? Does using Kryo
  serialisation help in this case?

 Thanks,
 -Vibhor




 --
 Vibhor Banga
 Software Development Engineer
 Flipkart Internet Pvt. Ltd., Bangalore




Re: Using mongo with PySpark

2014-06-06 Thread Mayur Rustagi
Yes, initialization on each call is expensive. You seem to be using Python.
Another, riskier thing you can try is to serialize the MongoClient object using
some serializer (like Kryo wrappers in Python) and pass it on to the mappers;
then in each mapper you just have to deserialize it and use it directly. This
may or may not work for you depending on the internals of the MongoDB client.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Wed, Jun 4, 2014 at 10:27 PM, Samarth Mailinglist 
mailinglistsama...@gmail.com wrote:

 Thanks a lot, sorry for the really late reply! (Didn't have my laptop)

 This is working, but it's dreadfully slow and seems to not run in
 parallel?


 On Mon, May 19, 2014 at 2:54 PM, Nick Pentreath nick.pentre...@gmail.com
 wrote:

 You need to use mapPartitions (or foreachPartition) to instantiate your
 client in each partition as it is not serializable by the pickle library.
 Something like

 def mapper(iter):
 db = MongoClient()['spark_test_db']
 *collec = db['programs']*
 *for val in iter:*
 asc = val.encode('ascii','ignore')
 json = convertToJSON(asc, indexMap)
 yield collec.insert(json)



 def convertToJSON(string, indexMap):
  values = string.strip().split(",")
 json = {}
 for i in range(len(values)):
 json[indexMap[i]] = values[i]
 return json

 *doc_ids = data.mapPartitions(mapper)*




 On Mon, May 19, 2014 at 8:00 AM, Samarth Mailinglist 
 mailinglistsama...@gmail.com wrote:

 db = MongoClient()['spark_test_db']
 *collec = db['programs']*

 def mapper(val):
 asc = val.encode('ascii','ignore')
 json = convertToJSON(asc, indexMap)
 collec.insert(json) # *this is not working*

 def convertToJSON(string, indexMap):
  values = string.strip().split(",")
  json = {}
 for i in range(len(values)):
 json[indexMap[i]] = values[i]
 return json

 *jsons = data.map(mapper)*



  The last line does the mapping. I am very new to Spark; can you explain
  what explicit serialization, etc. is in the context of Spark? The error I
  am getting:
  Traceback (most recent call last):
    File "<stdin>", line 1, in <module>
    File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 712, in saveAsTextFile
      keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
    File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 1178, in _jrdd
      pickled_command = CloudPickleSerializer().dumps(command)
    File "/usr/local/spark-0.9.1/python/pyspark/serializers.py", line 275, in dumps
      def dumps(self, obj): return cloudpickle.dumps(obj, 2)
    File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 801, in dumps
      cp.dump(obj)
    File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 140, in dump
      return pickle.Pickler.dump(self, obj)
    File "/usr/lib/python2.7/pickle.py", line 224, in dump
      self.save(obj)
    File "/usr/lib/python2.7/pickle.py", line 286, in save
      f(self, obj) # Call unbound method with explicit self
    File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
      save(element)
    File "/usr/lib/python2.7/pickle.py", line 286, in save
      f(self, obj) # Call unbound method with explicit self
    File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
      self.save_function_tuple(obj, [themodule])
    File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
      save(closure)
    File "/usr/lib/python2.7/pickle.py", line 286, in save
      f(self, obj) # Call unbound method with explicit self
    File "/usr/lib/python2.7/pickle.py", line 600, in save_list
      self._batch_appends(iter(obj))
    File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
      save(x)
    File "/usr/lib/python2.7/pickle.py", line 286, in save
      f(self, obj) # Call unbound method with explicit self
    File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
      self.save_function_tuple(obj, [themodule])
    File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
      save(closure)
    File "/usr/lib/python2.7/pickle.py", line 286, in save
      f(self, obj) # Call unbound method with explicit self
    File "/usr/lib/python2.7/pickle.py", line 600, in save_list
      self._batch_appends(iter(obj))
    File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
      save(tmp[0])
    File "/usr/lib/python2.7/pickle.py", line 286, in save
      f(self, obj) # Call unbound method with explicit self
    File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 254, in save_function
      self.save_function_tuple(obj, modList)
    File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 314, in save_function_tuple
      save(f_globals)
    File "/usr/lib/python2.7/pickle.py", line 286, in save
      f(self, obj) # Call unbound method with explicit self
    File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 181, in save_dict
      pickle.Pickler.save_dict(self, obj)

Re: stage kill link is awfully close to the stage name

2014-06-06 Thread Mayur Rustagi
And then an "Are you sure?" after that :)
On 7 Jun 2014 06:59, Mikhail Strebkov streb...@gmail.com wrote:

 Nick Chammas wrote
  I think it would be better to have the kill link flush right, leaving a
  large amount of space between it and the stage detail link.

 I think even better would be to have a pop-up confirmation Do you really
 want to kill this stage? :)





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/stage-kill-link-is-awfully-close-to-the-stage-name-tp7153p7154.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Error related to serialisation in spark streaming

2014-06-04 Thread Mayur Rustagi
I have had issues around embedded functions; here's what I have figured out.
Every (non-static) inner class actually contains a field referencing the outer
class. The anonymous class actually has a this$0 field referencing the outer
class, and that is why Spark is trying to serialize the outer class.

In the Scala API, the closure (which is really just implemented
as anonymous classes) has a field called $outer, and Spark uses a
closure cleaner that goes into the anonymous class to remove the $outer
field if it is not used in the closure itself. In Java, the compiler
generates a field called this$0, and thus the closure cleaner doesn't
find it and can't clean it properly.



Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Wed, Jun 4, 2014 at 4:18 PM, nilmish nilmish@gmail.com wrote:

 The error is resolved. I was using a comparator which was not serialised
 because of which it was throwing the error.

 I have now switched to the Kryo serializer as it is faster than the Java serializer.
 I have set the required config

 conf.set(spark.serializer, org.apache.spark.serializer.KryoSerializer);
 conf.set(spark.kryo.registrator, MyRegistrator);

 and also in MyRegistrator class I have registered all the classes I am
 serialising.

 How can I confirm that my code is actually using kryo serialiser and not
 java serialiser now ?

 PS : It seems like my code is still not using kryo serialiser.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Error-related-to-serialisation-in-spark-streaming-tp6801p6904.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Error related to serialisation in spark streaming

2014-06-03 Thread Mayur Rustagi
So are you using Java 7 or 8?
Java 7 doesn't clean closures properly, so you need to define a static class as
the function and then call that in your operations. Otherwise it will try to
send the whole enclosing class along with the function.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Tue, Jun 3, 2014 at 7:19 PM, Sean Owen so...@cloudera.com wrote:

 Sorry if I'm dense but is OptimisingSort your class? it's saying you
 have included something from it in a function that is shipped off to
 remote workers but something in it is not java.io.Serializable.
 OptimisingSort$6$1 needs to be Serializable.

 On Tue, Jun 3, 2014 at 2:23 PM, nilmish nilmish@gmail.com wrote:
  I am using the following code segment :
 
   countPerWindow.foreachRDD(new Function<JavaPairRDD<String, Long>, Void>()
   {
       @Override
       public Void call(JavaPairRDD<String, Long> rdd) throws Exception
       {
           Comparator<Tuple2<String, Long>> comp = new Comparator<Tuple2<String, Long>>()
           {
               public int compare(Tuple2<String, Long> tupleA, Tuple2<String, Long> tupleB)
               {
                   return 1 - tupleA._2.compareTo(tupleB._2);
               }
           };

           List<scala.Tuple2<String, Long>> top = rdd.top(5, comp);   // creating error

           System.out.println("Top 5 are : ");
           for (int i = 0; i < top.size(); ++i)
           {
               System.out.println(top.get(i)._2 + " " + top.get(i)._1);
           }
           return null;
       }
   });
   }
 
 
 
 
  I am getting the following error related to serialisation  :
 
  org.apache.spark.SparkException: Job aborted: Task not serializable:
  java.io.NotSerializableException
 
  Detailed Error :
 
   INFO  org.apache.spark.scheduler.DAGScheduler - Failed to run top at
  OptimisingSort.java:173
  2014-06-03 13:10:57,180 [spark-akka.actor.default-dispatcher-14] ERROR
  org.apache.spark.streaming.scheduler.JobScheduler - Error running job
  streaming job 1401801057000 ms.2
  org.apache.spark.SparkException: Job aborted: Task not serializable:
  java.io.NotSerializableException: OptimisingSort$6$1
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
  at
 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at
  org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
  at
  org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
  at
  org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
 
  How can I remove this error ?
 
 
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Error-related-to-serialisation-in-spark-streaming-tp6801.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Reg: Add/Remove slave nodes spark-ec2

2014-06-03 Thread Mayur Rustagi
You'll have to restart the cluster: create a copy of your existing slave,
add it to the slaves file on the master, and restart the cluster.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Tue, Jun 3, 2014 at 4:30 PM, Sirisha Devineni 
sirisha_devin...@persistent.co.in wrote:

  Hi All,



 I have created a spark cluster on EC2 using spark-ec2 script. Whenever
 more data is there to be processed I would like to add new slaves to
 existing cluster and would like to remove slave node when the data to be
 processed is low.



 It seems currently spark-ec2 doesn’t have option to add/remove slaves to
 existing cluster. Could you please suggest how can we achieve this?



 *One liner problem statement*: How to add/remove slaves to/from Spark
 cluster on EC2 using spark-ec2?



 Suggestions on when to add/remove slaves are much appreciated.



 Thanks  Regards,

 Sirisha Devineni




Re: WebUI's Application count doesn't get updated

2014-06-03 Thread Mayur Rustagi
Did you use Docker or plain LXC specifically?


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Tue, Jun 3, 2014 at 1:40 PM, MrAsanjar . afsan...@gmail.com wrote:

 thanks guys, that fixed my problem. As you might have noticed, I am VERY
 new to spark. Building a spark cluster using LXC has been a challenge.


 On Tue, Jun 3, 2014 at 2:49 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 ​As Andrew said, your application is running on Standalone mode. You need
 to pass

 MASTER=spark://sanjar-local-machine-1:7077

 before running your sparkPi example.


 Thanks
 Best Regards


 On Tue, Jun 3, 2014 at 1:12 PM, MrAsanjar . afsan...@gmail.com wrote:

 Thanks for your reply Andrew. I am running  applications directly on the
 master node. My cluster also contain three worker nodes, all are visible
 on WebUI.
 Spark Master at spark://sanjar-local-machine-1:7077

- *URL:* spark://sanjar-local-machine-1:7077
- *Workers:* 3
- *Cores:* 24 Total, 0 Used
- *Memory:* 43.7 GB Total, 0.0 B Used
- *Applications:* 0 Running, 0 Completed
- *Drivers:* 0 Running, 0 Completed
- *Status:* ALIVE

  Workers:
  Worker Id                                            Address                        State  Cores       Memory                Web UI
  worker-20140603013834-sanjar-local-machine-2-43334   sanjar-local-machine-2:43334   ALIVE  8 (0 Used)  14.6 GB (0.0 B Used)  http://sanjar-local-machine-2:8081/
  worker-20140603015921-sanjar-local-machine-3-51926   sanjar-local-machine-3:51926   ALIVE  8 (0 Used)  14.6 GB (0.0 B Used)  http://sanjar-local-machine-3:8081/
  worker-20140603020250-sanjar-local-machine-4-43167   sanjar-local-machine-4:43167   ALIVE  8 (0 Used)  14.6 GB (0.0 B Used)  http://sanjar-local-machine-4:8081/

  Running Applications (ID, Name, Cores, Memory per Node, Submitted Time, User, State, Duration): none
  Completed Applications (ID, Name, Cores, Memory per Node, Submitted Time, User, State, Duration): none



 On Tue, Jun 3, 2014 at 2:33 AM, Andrew Ash and...@andrewash.com wrote:

 Your applications are probably not connecting to your existing cluster
 and instead running in local mode.  Are you passing the master URL to the
 SparkPi application?

 Andrew


 On Tue, Jun 3, 2014 at 12:30 AM, MrAsanjar . afsan...@gmail.com
 wrote:


- Hi all,
- The Application running and completed counts do not get updated; they are
always zero. I have run the SparkPi application at least 10 times. Please help.
-
- *Workers:* 3
- *Cores:* 24 Total, 0 Used
- *Memory:* 43.7 GB Total, 0.0 B Used
- *Applications:* 0 Running, 0 Completed
- *Drivers:* 0 Running, 0 Completed
- *Status:* ALIVE








Re: Using Spark on Data size larger than Memory size

2014-05-31 Thread Mayur Rustagi
Clearly there will be an impact on performance, but frankly it depends on what
you are trying to achieve with the dataset.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Sat, May 31, 2014 at 11:45 AM, Vibhor Banga vibhorba...@gmail.com
wrote:

 Some inputs will be really helpful.

 Thanks,
 -Vibhor


 On Fri, May 30, 2014 at 7:51 PM, Vibhor Banga vibhorba...@gmail.com
 wrote:

 Hi all,

 I am planning to use spark with HBase, where I generate RDD by reading
 data from HBase Table.

 I want to know that in the case when the size of HBase Table grows larger
 than the size of RAM available in the cluster, will the application fail,
 or will there be an impact in performance ?

 Any thoughts in this direction will be helpful and are welcome.

 Thanks,
 -Vibhor




 --
 Vibhor Banga
 Software Development Engineer
 Flipkart Internet Pvt. Ltd., Bangalore




Re: Failed to remove RDD error

2014-05-31 Thread Mayur Rustagi
You can increase your Akka timeout; that should give you some more life. Are
you running out of memory by any chance?
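For example (a sketch; the property names are the Akka settings from the 0.9-era configuration page, so double-check them against your version, and the values are arbitrary):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.akka.askTimeout", "120")   // seconds; ask timeout used for calls like the RDD removal in the error below
  .set("spark.akka.timeout", "300")      // seconds; general Akka communication timeout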


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Sat, May 31, 2014 at 6:52 AM, Michael Chang m...@tellapart.com wrote:

 I'm running some Kafka streaming Spark contexts (on 0.9.1), and they
 seem to be dying after 10 or so minutes with a lot of these errors.  I
 can't really tell what's going on here, except that maybe the driver is
 unresponsive somehow?  Has anyone seen this before?

 14/05/31 01:13:30 ERROR BlockManagerMaster: Failed to remove RDD 12635

 akka.pattern.AskTimeoutException: Timed out

 at
 akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)

 at akka.actor.Scheduler$$anon$11.run(Scheduler.scala:118)

 at
 scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:691)

 at
 scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:688)

 at
 akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:455)

 at
 akka.actor.LightArrayRevolverScheduler$$anon$12.executeBucket$1(Scheduler.scala:407)

 at
 akka.actor.LightArrayRevolverScheduler$$anon$12.nextTick(Scheduler.scala:411)

 at
 akka.actor.LightArrayRevolverScheduler$$anon$12.run(Scheduler.scala:363)

 at java.lang.Thread.run(Thread.java:744)

 Thanks,

 Mike





Re: Reading bz2 files that do not end with .bz2

2014-05-28 Thread Mayur Rustagi
You can use the Hadoop API and provide an input/output reader and a Hadoop
configuration file to read the data.
Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Wed, May 28, 2014 at 7:22 PM, Laurent T laurent.thou...@ldmobile.netwrote:

 Hi,

 I have a bunch of files that are bz2 compressed but do not have the
 extension .bz2
 Is there any way to force Spark to read them as bz2 files using sc.textFile?

 FYI, if I add the .bz2 extension to the file it works fine, but the process
 that creates those files can't do that and I'd like to find another way to
 make this work than renaming all the files before executing my Spark job.

 Thanks
 Regards
 Laurent



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Reading-bz2-files-that-do-not-end-with-bz2-tp6473.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


