Re: Tools to manage workflows on Spark
Sorry not really. Spork is a way to migrate your existing pig scripts to Spark or write new pig jobs that can execute on Spark. For orchestration you are better off using Oozie, especially if you are using other execution engines/systems besides Spark. Regards, Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoid.com http://www.sigmoidanalytics.com/ @mayur_rustagi http://www.twitter.com/mayur_rustagi On Sat, Feb 28, 2015 at 6:59 PM, Qiang Cao caoqiang...@gmail.com wrote: Thanks Mayur! I'm looking for something that would allow me to easily describe and manage a workflow on Spark. A workflow in my context is a composition of Spark applications that may depend on one another based on hdfs inputs/outputs. Is Spork a good fit? The orchestration I want is on app level. On Sat, Feb 28, 2015 at 9:38 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: We do maintain it but in the apache repo itself. However Pig cannot do orchestration for you. I am not sure what you are looking at from Pig in this context. Regards, Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoid.com http://www.sigmoidanalytics.com/ @mayur_rustagi http://www.twitter.com/mayur_rustagi On Sat, Feb 28, 2015 at 6:36 PM, Ted Yu yuzhih...@gmail.com wrote: Here was the latest modification in the spork repo: Mon Dec 1 10:08:19 2014. Not sure if it is being actively maintained. On Sat, Feb 28, 2015 at 6:26 PM, Qiang Cao caoqiang...@gmail.com wrote: Thanks for the pointer, Ashish! I was also looking at Spork (Pig-on-Spark, https://github.com/sigmoidanalytics/spork), but wasn't sure if that's the right direction. On Sat, Feb 28, 2015 at 6:36 PM, Ashish Nigam ashnigamt...@gmail.com wrote: You have to call spark-submit from oozie. I used this link to get the idea for my implementation - http://mail-archives.apache.org/mod_mbox/oozie-user/201404.mbox/%3CCAHCsPn-0Grq1rSXrAZu35yy_i4T=fvovdox2ugpcuhkwmjp...@mail.gmail.com%3E On Feb 28, 2015, at 3:25 PM, Qiang Cao caoqiang...@gmail.com wrote: Thanks, Ashish! Is Oozie integrated with Spark? I knew it can accommodate some Hadoop jobs. On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam ashnigamt...@gmail.com wrote: Qiang, Did you look at Oozie? We use oozie to run spark jobs in production. On Feb 28, 2015, at 2:45 PM, Qiang Cao caoqiang...@gmail.com wrote: Hi Everyone, We need to deal with workflows on Spark. In our scenario, each workflow consists of multiple processing steps. Among different steps, there could be dependencies. I'm wondering if there are tools available that can help us schedule and manage workflows on Spark. I'm looking for something like Pig on Hadoop, but it should fully function on Spark. Any suggestion? Thanks in advance! Qiang
Re: Tools to manage workflows on Spark
We do maintain it but in apache repo itself. However Pig cannot do orchestration for you. I am not sure what you are looking at from Pig in this context. Regards, Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoid.com http://www.sigmoidanalytics.com/ @mayur_rustagi http://www.twitter.com/mayur_rustagi On Sat, Feb 28, 2015 at 6:36 PM, Ted Yu yuzhih...@gmail.com wrote: Here was latest modification in spork repo: Mon Dec 1 10:08:19 2014 Not sure if it is being actively maintained. On Sat, Feb 28, 2015 at 6:26 PM, Qiang Cao caoqiang...@gmail.com wrote: Thanks for the pointer, Ashish! I was also looking at Spork https://github.com/sigmoidanalytics/spork Pig-on-Spark), but wasn't sure if that's the right direction. On Sat, Feb 28, 2015 at 6:36 PM, Ashish Nigam ashnigamt...@gmail.com wrote: You have to call spark-submit from oozie. I used this link to get the idea for my implementation - http://mail-archives.apache.org/mod_mbox/oozie-user/201404.mbox/%3CCAHCsPn-0Grq1rSXrAZu35yy_i4T=fvovdox2ugpcuhkwmjp...@mail.gmail.com%3E On Feb 28, 2015, at 3:25 PM, Qiang Cao caoqiang...@gmail.com wrote: Thanks, Ashish! Is Oozie integrated with Spark? I knew it can accommodate some Hadoop jobs. On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam ashnigamt...@gmail.com wrote: Qiang, Did you look at Oozie? We use oozie to run spark jobs in production. On Feb 28, 2015, at 2:45 PM, Qiang Cao caoqiang...@gmail.com wrote: Hi Everyone, We need to deal with workflows on Spark. In our scenario, each workflow consists of multiple processing steps. Among different steps, there could be dependencies. I'm wondering if there are tools available that can help us schedule and manage workflows on Spark. I'm looking for something like pig on Hadoop, but it should fully function on Spark. Any suggestion? Thanks in advance! Qiang
Re: Can spark job server be used to visualize streaming data?
Frankly no good/standard way to visualize streaming data. So far I have found HBase as good intermediate store to store data from streams visualize it by a play based framework d3.js. Regards Mayur On Fri Feb 13 2015 at 4:22:58 PM Kevin (Sangwoo) Kim kevin...@apache.org wrote: I'm not very sure for CDH 5.3, but now Zeppelin works for Spark 1.2 as spark-repl has been published in Spark 1.2.1 Please try again! On Fri Feb 13 2015 at 3:55:19 PM Su She suhsheka...@gmail.com wrote: Thanks Kevin for the link, I have had issues trying to install zeppelin as I believe it is not yet supported for CDH 5.3, and Spark 1.2. Please correct me if I am mistaken. On Thu, Feb 12, 2015 at 7:33 PM, Kevin (Sangwoo) Kim kevin...@apache.org wrote: Apache Zeppelin also has a scheduler and then you can reload your chart periodically, Check it out: http://zeppelin.incubator.apache.org/docs/tutorial/tutorial.html On Fri Feb 13 2015 at 7:29:00 AM Silvio Fiorito silvio.fior...@granturing.com wrote: One method I’ve used is to publish each batch to a message bus or queue with a custom UI listening on the other end, displaying the results in d3.js or some other app. As far as I’m aware there isn’t a tool that will directly take a DStream. Spark Notebook seems to have some support for updating graphs periodically. I haven’t used it myself yet so not sure how well it works. See here: https://github.com/andypetrella/spark-notebook From: Su She Date: Thursday, February 12, 2015 at 1:55 AM To: Felix C Cc: Kelvin Chu, user@spark.apache.org Subject: Re: Can spark job server be used to visualize streaming data? Hello Felix, I am already streaming in very simple data using Kafka (few messages / second, each record only has 3 columns...really simple, but looking to scale once I connect everything). I am processing it in Spark Streaming and am currently writing word counts to hdfs. So the part where I am confused is... Kafka Publishes Data - Kafka Consumer/Spark Streaming Receives Data - Spark Word Count - *How do I visualize?* is there a viz tool that I can set up to visualize JavaPairDStreams? or do I have to write to hbase/hdfs first? Thanks! On Wed, Feb 11, 2015 at 10:39 PM, Felix C felixcheun...@hotmail.com wrote: What kind of data do you have? Kafka is a popular source to use with spark streaming. But, spark streaming also support reading from a file. Its called basic source https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers --- Original Message --- From: Su She suhsheka...@gmail.com Sent: February 11, 2015 10:23 AM To: Felix C felixcheun...@hotmail.com Cc: Kelvin Chu 2dot7kel...@gmail.com, user@spark.apache.org Subject: Re: Can spark job server be used to visualize streaming data? Thank you Felix and Kelvin. I think I'll def be using the k-means tools in mlib. It seems the best way to stream data is by storing in hbase and then using an api in my viz to extract data? Does anyone have any thoughts on this? Thanks! On Tue, Feb 10, 2015 at 11:45 PM, Felix C felixcheun...@hotmail.com wrote: Checkout https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html In there are links to how that is done. --- Original Message --- From: Kelvin Chu 2dot7kel...@gmail.com Sent: February 10, 2015 12:48 PM To: Su She suhsheka...@gmail.com Cc: user@spark.apache.org Subject: Re: Can spark job server be used to visualize streaming data? Hi Su, Out of the box, no. But, I know people integrate it with Spark Streaming to do real-time visualization. 
It will take some work though. Kelvin On Mon, Feb 9, 2015 at 5:04 PM, Su She suhsheka...@gmail.com wrote: Hello Everyone, I was reading this blog post: http://homes.esat.kuleuven.be/~bioiuser/blog/a-d3-visualisation-from-spark-as-a-service/ and was wondering if this approach can be taken to visualize streaming data...not just historical data? Thank you! -Suh
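To make the pipeline discussed above concrete, here is a hedged sketch (in Scala) of the last two stages: a streaming word count whose per-batch summary is handed to a placeholder publisher that a dashboard could read from. The input path and the pushToDashboard function are assumptions, not a real API.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object StreamingWordCountPublisher {
  // Placeholder: swap in your HBase/queue/REST write that the d3.js dashboard reads from.
  def pushToDashboard(batchTime: Long, counts: Array[(String, Long)]): Unit =
    counts.foreach { case (word, n) => println(s"$batchTime $word $n") }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StreamingWordCountPublisher").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    val lines = ssc.textFileStream("hdfs:///tmp/streaming-input")   // hypothetical source
    val counts = lines.flatMap(_.split("\\s+")).map((_, 1L)).reduceByKey(_ + _)

    // Per batch, only a small bounded summary is brought back to the driver and published.
    counts.foreachRDD { (rdd, time) =>
      val top = rdd.top(20)(Ordering.by[(String, Long), Long](_._2))
      pushToDashboard(time.milliseconds, top)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}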
Re: Modifying an RDD in forEach
You'll benefit by viewing Matei's talk at Yahoo on Spark internals and how it optimizes execution of iterative jobs. The simple answer is: 1. Spark doesn't materialize an RDD when you do an iteration but lazily captures the transformation functions in the RDD (only the function and closure; no data operation actually happens). 2. When you finally execute and want to cause effects (save to disk, collect on master etc.) it views the DAG of execution and optimizes what it can reason about (eliminating intermediate states, performing multiple transformations in one task, leveraging partitioning where available, among others). Bottom line: it doesn't matter how many RDDs you have in your DAG chain as long as Spark can optimize the functions in that DAG to create minimal materialization on its way to the final output. Regards Mayur On 06-Dec-2014 6:12 pm, Ron Ayoub ronalday...@live.com wrote: This is from a separate thread with a differently named title. Why can't you modify the actual contents of an RDD using forEach? It appears to be working for me. What I'm doing is changing cluster assignments and distances per data item for each iteration of the clustering algorithm. The clustering algorithm is massive and iterates thousands of times. As I understand it now, you are supposed to create new RDDs on each pass. This is a hierarchical k-means that I'm doing and hence it consists of many iterations rather than large iterations. So I understand the restriction of why operations, when aggregating and reducing etc., need to be associative. However, forEach operates on a single item. So being that Spark is advertised as being great for iterative algorithms since it operates in-memory, how can it be good to create thousands upon thousands of RDDs during the course of an iterative algorithm? Does Spark have some kind of trick like reuse behind the scenes - fully persistent data objects or whatever? How can it possibly be efficient for 'iterative' algorithms when it is creating so many RDDs as opposed to one? Or is the answer that I should keep doing what I'm doing because it is working even though it is not theoretically sound and aligned with functional ideas. I personally just want it to be fast and be able to operate on up to 500 million data items.
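To make the lazy-evaluation point concrete, here is a hedged toy sketch (synthetic data, made-up update rule): each pass defines a new RDD, but Spark only records the functions and pipelines all of them into a single scan when the final action runs.

import org.apache.spark.{SparkConf, SparkContext}

object IterativeLineage {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("IterativeLineage").setMaster("local[2]"))

    // Toy stand-in for per-item cluster state: (value, assignment).
    var assignments = sc.parallelize(1 to 1000000).map(v => (v.toDouble, 0))
    assignments.cache()   // keep the starting data in memory across iterations

    for (i <- 1 to 100) {
      // Defines a new RDD each pass; no computation happens here, only the closure is captured.
      assignments = assignments.map { case (v, a) => (v, if (v > i) a + 1 else a) }
    }

    // The action triggers execution; the 100 chained maps are narrow and run as one pass.
    println(assignments.filter(_._2 > 50).count())

    sc.stop()
  }
}

For very long-running loops it is common to checkpoint or re-cache every few hundred iterations so the lineage does not grow without bound.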
Re: Joined RDD
First of all, any action is only performed when you trigger a collect. When you trigger collect, at that point it retrieves data from disk, joins the datasets together and delivers it to you. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Nov 13, 2014 at 12:26 PM, ajay garg ajay.g...@mobileum.com wrote: Hi, I have two RDDs A and B which are created from reading files from HDFS. I have a third RDD C which is created by taking the join of A and B. All three RDDs (A, B and C) are not cached. Now if I perform any action on C (let's say collect), the action is served without reading any data from the disk. Since no data is cached in Spark, how is the action on C served without reading data from disk? Thanks --Ajay -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Joined-RDD-tp18820.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
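A small illustration of that laziness, assuming an existing SparkContext sc (e.g. the spark-shell) and hypothetical HDFS paths:

import org.apache.spark.SparkContext._   // pair-RDD functions such as join (older Spark versions)

// Nothing below touches the files yet; A, B and C are only lineage descriptions.
val A = sc.textFile("hdfs:///data/a.txt").map(line => (line.split(",")(0), line))
val B = sc.textFile("hdfs:///data/b.txt").map(line => (line.split(",")(0), line))
val C = A.join(B)

// Only now are the files read, shuffled, joined and the result shipped to the driver.
val result = C.collect()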
Re: Communication between Driver and Executors
I wonder if SparkConf is dynamically updated on all worker nodes or only during initialization. It can be used to piggyback information. Otherwise I guess you are stuck with Broadcast. Primarily I have had these issues moving legacy MR operators to Spark where MR piggybacks on Hadoop conf pretty heavily, in spark Native application its rarely required. Do you have a usecase like that? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Nov 14, 2014 at 10:28 AM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, (this is related to my previous question about stopping the StreamingContext) is there any way to send a message from the driver to the executors? There is all this Akka machinery running, so it should be easy to have something like sendToAllExecutors(message) on the driver and handleMessage { case _ = ... } on the executors, right? Surely at least for Broadcast.unpersist() such a thing must exist, so can I use it somehow (dirty way is also ok) to send a message to my Spark nodes? Thanks Tobias
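As the thread suggests, the usual one-way channel from the driver to executors is a broadcast variable read inside tasks. A minimal sketch (the settings map is made up):

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("BroadcastExample").setMaster("local[2]"))

    // Driver-side state shipped once to every executor.
    val settings = sc.broadcast(Map("threshold" -> 10, "mode" -> 1))

    // Tasks read the value on the executors; they never mutate it.
    val filtered = sc.parallelize(1 to 100).filter(x => x > settings.value("threshold"))
    println(filtered.count())

    settings.unpersist()   // remove the copies from the executors when no longer needed
    sc.stop()
  }
}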
Re: flatMap followed by mapPartitions
flatMap would have to shuffle data only if the output RDD is expected to be partitioned by some key: RDD[X].flatMap(X => RDD[Y]). If it has to shuffle, it should be local. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Nov 13, 2014 at 7:31 AM, Debasish Das debasish.da...@gmail.com wrote: Hi, I am doing a flatMap followed by mapPartitions to do some blocked operation...flatMap is shuffling data but this shuffle is strictly shuffling to disk and not over the network right ? Thanks. Deb
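For reference, a small sketch (hypothetical path, assuming an existing SparkContext sc) showing that the flatMap to mapPartitions chain is made of narrow transformations, so nothing moves over the network until a wide operation such as reduceByKey appears:

val lines = sc.textFile("hdfs:///data/docs.txt")

// flatMap: each input partition produces the same output partition, no shuffle.
val tokens = lines.flatMap(_.split("\\s+"))

// mapPartitions: runs once per partition, still no shuffle; here a "blocked" operation
// that looks at the whole partition at once.
val blockSizes = tokens.mapPartitions { iter =>
  val block = iter.toArray
  Iterator(block.length)
}

println(blockSizes.collect().toSeq)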
Re: Using partitioning to speed up queries in Shark
- dev list + user list Shark is not officially supported anymore so you are better off moving to Spark SQL. Shark doesn't support Hive partitioning logic anyway; it has its own version of partitioning on in-memory blocks, but that is independent of whether you partition your data in Hive or not. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Nov 7, 2014 at 3:31 AM, Gordon Benjamin gordon.benjami...@gmail.com wrote: Hi All, I'm using Spark/Shark as the foundation for some reporting that I'm doing and have a customers table with approximately 3 million rows that I've cached in memory. I've also created a partitioned table that I've also cached in memory on a per day basis FROM customers_cached INSERT OVERWRITE TABLE part_customers_cached PARTITION(createday) SELECT id,email,dt_cr, to_date(dt_cr) as createday where dt_cr > unix_timestamp('2013-01-01 00:00:00') and dt_cr < unix_timestamp('2013-12-31 23:59:59'); set exec.dynamic.partition=true; set exec.dynamic.partition.mode=nonstrict; however when I run the following basic tests I get this type of performance [localhost:1] shark select count(*) from part_customers_cached where createday >= '2014-08-01' and createday <= '2014-12-06'; 37204 Time taken (including network latency): 3.131 seconds [localhost:1] shark SELECT count(*) from customers_cached where dt_cr > unix_timestamp('2013-08-01 00:00:00') and dt_cr < unix_timestamp('2013-12-06 23:59:59'); 37204 Time taken (including network latency): 1.538 seconds I'm running this on a cluster with one master and two slaves and was hoping that the partitioned table would be noticeably faster but it looks as though the partitioning has slowed things down... Is this the case, or is there some additional configuration that I need to do to speed things up? Best Wishes, Gordon
Re: Why RDD is not cached?
What is the partition count of the RDD, its possible that you dont have enough memory to store the whole RDD on a single machine. Can you try forcibly repartitioning the RDD then cacheing. Regards Mayur On Tue Oct 28 2014 at 1:19:09 AM shahab shahab.mok...@gmail.com wrote: I used Cache followed by a count on RDD to ensure that caching is performed. val rdd = srdd.flatMap(mapProfile_To_Sessions).cache val count = rdd.count //so at this point RDD should be cahed ? right? On Tue, Oct 28, 2014 at 8:35 AM, Sean Owen so...@cloudera.com wrote: Did you just call cache()? By itself it does nothing but once an action requires it to be computed it should become cached. On Oct 28, 2014 8:19 AM, shahab shahab.mok...@gmail.com wrote: Hi, I have a standalone spark , where the executor is set to have 6.3 G memory , as I am using two workers so in total there 12.6 G memory and 4 cores. I am trying to cache a RDD with approximate size of 3.2 G, but apparently it is not cached as neither I can see BlockManagerMasterActor: Added rdd_XX in memory nor the performance of running the tasks is improved But, why it is not cached when there is enough memory storage? I tried with smaller RDDs. 1 or 2 G and it works, at least I could see BlockManagerMasterActor: Added rdd_0_1 in memory and improvement in results. Any idea what I am missing in my settings, or... ? thanks, /Shahab
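A hedged sketch of the suggestion above, with made-up paths and partition count; the Storage tab of the web UI then shows how many partitions actually fit in memory. Assumes an existing SparkContext sc.

import org.apache.spark.storage.StorageLevel

val raw = sc.textFile("hdfs:///data/profiles")        // hypothetical input
val sessions = raw.flatMap(_.split(";"))              // stand-in for mapProfile_To_Sessions

// More, smaller partitions so each block fits within a single executor's memory,
// then cache and force materialization with an action.
val cached = sessions.repartition(64).persist(StorageLevel.MEMORY_ONLY)
println(cached.count())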
Re: input split size
Does it retain the order if it's pulling from the hdfs blocks? Meaning if file1 = a, b, c partitions in order, and I convert to a 2-partition read, will it map to ab, c or a, bc, or can it also be a, cb? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Sat, Oct 18, 2014 at 9:09 AM, Ilya Ganelin ilgan...@gmail.com wrote: Also - if you're doing a text file read you can pass the number of resulting partitions as the second argument. On Oct 17, 2014 9:05 PM, Larry Liu larryli...@gmail.com wrote: Thanks, Andrew. What about reading out of local? On Fri, Oct 17, 2014 at 5:38 PM, Andrew Ash and...@andrewash.com wrote: When reading out of HDFS it's the HDFS block size. On Fri, Oct 17, 2014 at 5:27 PM, Larry Liu larryli...@gmail.com wrote: What is the default input split size? How to change it?
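A sketch of the two points above (block-size-driven default, and the minimum-partitions argument), assuming an existing SparkContext sc and a hypothetical file:

// Default: when reading from HDFS, one partition per HDFS block.
val byBlocks = sc.textFile("hdfs:///data/big.txt")

// The second argument is a minimum number of partitions, useful for more parallelism
// than the block count alone would give.
val finer = sc.textFile("hdfs:///data/big.txt", 64)

println(byBlocks.partitions.length)
println(finer.partitions.length)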
Re: rule engine based on spark
We are developing something similar on top of Streaming. Could you detail some rule functionality you are looking for. We are developing a dsl for data processing on top of streaming as well as static data enabled on Spark. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Oct 15, 2014 at 3:51 AM, salemi alireza.sal...@udo.edu wrote: hi, is the a rule engine based on spark? i like to allow the business user to define their rules in a language and the execution of the rules should be done in spark. Thanks, Ali -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/rule-engine-based-on-spark-tp16433.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: JavaPairDStream saveAsTextFile
Thats a cryptic way to say thr should be a Jira for it :) Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Oct 9, 2014 at 11:46 AM, Sean Owen so...@cloudera.com wrote: Yeah it's not there. I imagine it was simply never added, and that there's not a good reaosn it couldn't be. On Thu, Oct 9, 2014 at 4:53 AM, SA sadhu.a...@gmail.com wrote: HI, I am looking at the documentation for Java API for Streams. The scala library has option to save file locally, but the Java version doesnt seem to. The only option i see is saveAsHadoopFiles. Is there a reason why this option was left out from Java API? http://spark.apache.org/docs/1.0.0/api/java/index.html?org/apache/spark/streaming/dstream/DStream.html Thanks. SA - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Setup/Cleanup for RDD closures?
The current approach is to use mapPartitions: initialize the connection at the beginning of the partition, iterate through the data, and close off the connection at the end. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Oct 3, 2014 at 10:16 AM, Stephen Boesch java...@gmail.com wrote: Consider there is some connection / external resource allocation required to be accessed/mutated by each of the rows from within a single worker thread. That connection should only be opened/closed before the first row is accessed / after the last row is completed. It is my understanding that there is work presently underway (Reynold Xin and others) on defining an external resources API to address this. What is the recommended approach in the meanwhile?
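A minimal sketch of that setup/teardown pattern; ExternalClient and its methods are placeholders for whatever resource needs to be opened once per partition:

import org.apache.spark.rdd.RDD

class ExternalClient {                      // hypothetical resource
  def open(): Unit = ()
  def send(record: String): Unit = ()
  def close(): Unit = ()
}

def enrich(rdd: RDD[String]): RDD[String] = {
  rdd.mapPartitions { records =>
    val client = new ExternalClient()       // created on the executor, once per partition
    client.open()
    // materialize the partition so the client can be closed safely before returning
    val out = records.map { r => client.send(r); r }.toList
    client.close()
    out.iterator
  }
}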
Re: Spark Streaming for time consuming job
Calling collect on anything is almost always a bad idea. The only exception is if you are looking to pass that data on to some other system and never see it again :). I would say you need to implement the outlier detection on the RDD and process it in Spark itself rather than calling collect on it. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Tue, Sep 30, 2014 at 3:22 PM, Eko Susilo eko.harmawan.sus...@gmail.com wrote: Hi All, I have a problem that i would like to consult about spark streaming. I have a spark streaming application that parses a file (which will be growing as time passes by). This file contains several columns containing lines of numbers; the parsing is divided into windows (each 1 minute). Each column represents a different entity while each row within a column represents the same entity (for example, the first column represents temperature, the second column represents humidity, etc, while each row represents the value of each attribute). I use a PairDStream for each column. Afterwards, I need to run a time consuming algorithm (outlier detection, for now i use the box plot algorithm) for each RDD of each PairDStream. To run the outlier detection, currently i am thinking about calling collect on each of the PairDStreams from the method forEachRDD, and then i get the list of the items, and then pass each list of items to a thread. Each thread runs the outlier detection algorithm and processes the result. I run the outlier detection in a separate thread in order not to put too much burden on the spark streaming task. So, I would like to ask if this model has a risk? or is there any alternative provided by the framework such that i don't have to run a separate thread for this? Thank you for your attention. -- Best Regards, Eko Susilo
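As a rough illustration of keeping the box-plot (IQR) check inside Spark rather than collecting, here is a hedged sketch; the quantile computation is exact but naive (a full sort), and the sample data is made up. Assumes an existing SparkContext sc.

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

def outliers(values: RDD[Double]): RDD[Double] = {
  val n = values.count()
  val ranked = values.sortBy(identity).zipWithIndex().map(_.swap)   // (rank, value)
  val q1 = ranked.lookup((0.25 * (n - 1)).toLong).head
  val q3 = ranked.lookup((0.75 * (n - 1)).toLong).head
  val iqr = q3 - q1
  val (lo, hi) = (q1 - 1.5 * iqr, q3 + 1.5 * iqr)
  values.filter(v => v < lo || v > hi)      // the outliers stay distributed, never collected
}

val temps = sc.parallelize(Seq(21.0, 22.5, 21.7, 23.0, 80.0, 22.1))
outliers(temps).collect().foreach(println)  // tiny result, fine to bring to the driver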
Re: Processing multiple request in cluster
There are two problems you may be facing: 1. your application is taking all the resources; 2. inside your application, task submission is not being scheduled properly. For 1 you can either configure your app to take fewer resources or use a Mesos/YARN-type scheduler to dynamically change or juggle resources. For 2 you can use the fair scheduler so that application tasks can be scheduled more fairly. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Sep 25, 2014 at 12:32 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You can try spark on Mesos or Yarn since they have lot more support for scheduling and all Thanks Best Regards On Thu, Sep 25, 2014 at 4:50 AM, Subacini B subac...@gmail.com wrote: hi All, How to run concurrently multiple requests on same cluster. I have a program using *spark streaming context *which reads* streaming data* and writes it to HBase. It works fine, the problem is when multiple requests are submitted to the cluster, only the first request is processed as the entire cluster is used for this request. The rest of the requests are in waiting mode. i have set spark.cores.max to 2 or less, so that it can process another request, but if there is only one request the cluster is not utilized properly. Is there any way that the spark cluster can process streaming requests concurrently at the same time, effectively utilizing the cluster, something like sharkserver? Thanks Subacini
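A hedged sketch of the configuration side of both points; the property names (spark.cores.max, spark.scheduler.mode, spark.scheduler.pool) are real Spark settings, while the values and the pool name are made up:

import org.apache.spark.{SparkConf, SparkContext}

// Cap what this application takes from the cluster, and schedule its own jobs fairly
// so one long-running job does not starve others submitted inside the same application.
val conf = new SparkConf()
  .setAppName("shared-streaming-app")
  .set("spark.cores.max", "2")             // leave cores free for other applications
  .set("spark.scheduler.mode", "FAIR")     // fair scheduling of jobs within this application

val sc = new SparkContext(conf)

// Jobs submitted from different threads can additionally go into separate fair-scheduler pools.
sc.setLocalProperty("spark.scheduler.pool", "streaming")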
Re: Serving data
You can cache data in memory and query it using the Spark Job Server. Most folks dump data down to a queue/db for retrieval. You can batch up data and store it into Parquet partitions as well, then query it using another SparkSQL shell; the JDBC driver in SparkSQL is part of 1.1, I believe. -- Regards, Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi On Fri, Sep 12, 2014 at 2:54 PM, Marius Soutier mps@gmail.com wrote: Hi there, I’m pretty new to Spark, and so far I’ve written my jobs the same way I wrote Scalding jobs - one-off, read data from HDFS, count words, write counts back to HDFS. Now I want to display these counts in a dashboard. Since Spark allows to cache RDDs in-memory and you have to explicitly terminate your app (and there’s even a new JDBC server in 1.1), I’m assuming it’s possible to keep an app running indefinitely and query an in-memory RDD from the outside (via SparkSQL for example). Is this how others are using Spark? Or are you just dumping job results into message queues or databases? Thanks - Marius - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: single worker vs multiple workers on each machine
Another aspect to keep in mind is JVM above 8-10GB starts to misbehave. Typically better to split up ~ 15GB intervals. if you are choosing machines 10GB/Core is a approx to maintain. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Sep 12, 2014 at 2:59 AM, Sean Owen so...@cloudera.com wrote: As I understand, there's generally not an advantage to running many executors per machine. Each will already use all the cores, and multiple executors just means splitting the available memory instead of having one big pool. I think there may be an argument at extremes of scale where one JVM with a huge heap might have excessive GC pauses, or too many open files, that kind of thing? On Thu, Sep 11, 2014 at 8:42 PM, Mike Sam mikesam...@gmail.com wrote: Hi There, I am new to Spark and I was wondering when you have so much memory on each machine of the cluster, is it better to run multiple workers with limited memory on each machine or is it better to run a single worker with access to the majority of the machine memory? If the answer is it depends, would you please elaborate? Thanks, Mike - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Network requirements between Driver, Master, and Slave
The driver needs a consistent connection to the master in standalone mode, as a whole bunch of client stuff happens on the driver. So calls like parallelize send data from the driver to the master, and collect sends data from the master to the driver. If you are looking to avoid the connection you can look into the embedded driver model in YARN, where the driver will also run inside the cluster, hence reliable connectivity is a given. -- Regards, Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi On Fri, Sep 12, 2014 at 6:46 PM, Jim Carroll jimfcarr...@gmail.com wrote: Hi Akhil, Thanks! I guess in short that means the master (or slaves?) connect back to the driver. This seems like a really odd way to work given the driver needs to already connect to the master on port 7077. I would have thought that if the driver could initiate a connection to the master, that would be all that's required. Can you describe what it is about the architecture that requires the master to connect back to the driver even when the driver initiates a connection to the master? Just curious. Thanks anyway. Jim -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Network-requirements-between-Driver-Master-and-Slave-tp13997p14086.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark caching questions
Cached RDD do not survive SparkContext deletion (they are scoped on a per sparkcontext basis). I am not sure what you mean by disk based cache eviction, if you cache more RDD than disk space the result will not be very pretty :) Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Sep 10, 2014 at 4:43 AM, Vladimir Rodionov vrodio...@splicemachine.com wrote: Hi, users 1. Disk based cache eviction policy? The same LRU? 2. What is the scope of a cached RDD? Does it survive application? What happen if I run Java app next time? Will RRD be created or read from cache? If , answer is YES, then ... 3. Is there are any way to invalidate cached RDD automatically? RDD partitions? Some API kind of : RDD.isValid()? 4. HadoopRDD InputFormat - based. Some partitions (splits) may become invalid in cache. Can we reload only those partitions? Into cache? -Vladimir
Re: Spark Streaming and database access (e.g. MySQL)
I think she is checking for blanks? But if the RDD is blank then nothing will happen, no db connections etc. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Mon, Sep 8, 2014 at 1:32 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Mon, Sep 8, 2014 at 4:39 PM, Sean Owen so...@cloudera.com wrote: if (rdd.take (1).size == 1) { rdd foreachPartition { iterator = I was wondering: Since take() is an output operation, isn't it computed twice (once for the take(1), once during the iteration)? Or will only one single element be computed for take(1)? Thanks Tobias
Re: Records - Input Byte
What do you mean by “control your input”? Are you trying to pace your Spark Streaming by number of words? If so, that is not supported as of now; you can only control the time interval, and it will consume all files within that time period. -- Regards, Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi On Tue, Sep 9, 2014 at 2:24 AM, danilopds danilob...@gmail.com wrote: Hi, I was reading the paper of Spark Streaming: Discretized Streams: Fault-Tolerant Streaming Computation at Scale So, I read that the performance evaluation used 100-byte input records in the Grep and WordCount tests. I don't have much experience and I'd like to know how I can control this value in my records (like words in an input file)? Can anyone suggest me something to start? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Records-Input-Byte-tp13733.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming and database access (e.g. MySQL)
The standard pattern is to initialize the MySQL JDBC connection in your mapPartitions call, update the database, then close off the connection. A couple of gotchas: 1. A new connection is initiated for each of your partitions. 2. If the effect (inserts/updates) is not idempotent, then if your server crashes, Spark will replay updates to MySQL and may cause data corruption. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Sun, Sep 7, 2014 at 11:54 AM, jchen jc...@pivotal.io wrote: Hi, Has someone tried using Spark Streaming with MySQL (or any other database/data store)? I can write to MySQL at the beginning of the driver application. However, when I am trying to write the result of every streaming processing window to MySQL, it fails with the following error: org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: com.mysql.jdbc.JDBC4PreparedStatement I think it is because the statement object should be serializable, in order to be executed on the worker node. Has someone tried similar cases? Example code will be very helpful. My intention is to execute INSERT/UPDATE/DELETE/SELECT statements for each sliding window. Thanks, JC -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-database-access-e-g-MySQL-tp13644.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
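A minimal sketch of that pattern, with the connection created inside foreachPartition so nothing JDBC-related is serialized from the driver (that serialization is what causes the NotSerializableException above). Endpoint, credentials and table are placeholders:

import java.sql.DriverManager
import org.apache.spark.streaming.dstream.DStream

def saveToMySQL(stream: DStream[(String, Int)]): Unit = {
  stream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      // One connection per partition, opened on the executor.
      val conn = DriverManager.getConnection("jdbc:mysql://dbhost:3306/mydb", "user", "password")
      val stmt = conn.prepareStatement("INSERT INTO word_counts (word, cnt) VALUES (?, ?)")
      try {
        records.foreach { case (word, count) =>
          stmt.setString(1, word)
          stmt.setInt(2, count)
          stmt.executeUpdate()
        }
      } finally {
        stmt.close()
        conn.close()
      }
    }
  }
}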
Re: Q: About scenarios where driver execution flow may block...
Statements are executed only when you try to cause some effect on the server (produce data, collect data on the driver). At the time of execution Spark does all the dependency resolution, truncates paths that don't go anywhere, and optimizes execution pipelines. So you really don't have to worry about these. The important thing is: if you are doing certain actions in your functions that are non-explicitly dependent on others, then you may start seeing errors. For example, you may write a file to hdfs during a map operation and expect to read it in another map operation; according to Spark, a map operation is not expected to alter anything apart from the RDD it is created upon, hence Spark may not realize this dependency and may try to parallelize the two operations, causing errors. Bottom line: as long as you make all your dependencies explicit in RDDs, Spark will take care of the magic. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Sun, Sep 7, 2014 at 12:14 AM, didata subscripti...@didata.us wrote: Hello friends: I have a theory question about call blocking in a Spark driver. Consider this (admittedly contrived =:)) snippet to illustrate this question... x = rdd01.reduceByKey() # or maybe some other 'shuffle-requiring action'. b = sc.broadcast(x.take(20)) # Or any statement that requires the previous statement to complete, cluster-wide. y = rdd02.someAction(f(b)) Would the first or second statement above block because the second (or third) statement needs to wait for the previous one to complete, cluster-wide? Maybe this isn't the best example (typed on a phone), but generally I'm trying to understand the scenario(s) where an rdd call in the driver may block because the graph indicates that the next statement is dependent on the completion of the current one, cluster-wide (not just lazy evaluated). Thank you. :) Sincerely yours, Team Dimension Data
Re: Array and RDDs
Your question is a bit confusing.. I assume you have an RDD containing nodes and some metadata (child nodes maybe) and you are trying to attach another piece of metadata to it (the byte array). If it's just the same byte array for all nodes, you can generate an RDD with the count of nodes and zip the two RDDs together; you can also create a (node, bytearray) combo and join the two RDDs together. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Sat, Sep 6, 2014 at 10:51 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I have an input file which consists of stc_node dest_node I have created an RDD consisting of key-value pairs where the key is the node id and the values are the children of that node. Now I want to associate a byte with each node. For that I have created a byte array. Every time I print out the key-value pairs in the RDD, the key-value pairs do not come in the same order. Because of this I am finding it difficult to assign the byte values to each node. Can anyone help me out in this matter? I basically have the following code: val bitarray = Array.fill[Byte](number)(0) And I want to associate each byte in the array to a node. How should I do that? Thank You
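Two hedged sketches of those options with toy node ids (assuming an existing SparkContext sc); either way the byte travels with the node id instead of relying on the RDD's iteration order:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

val nodes: RDD[(Long, Seq[Long])] = sc.parallelize(Seq(
  (1L, Seq(2L, 3L)), (2L, Seq.empty[Long]), (3L, Seq.empty[Long])))   // (node id, children)

// 1. Same initial byte for every node: map it in directly.
val withByte = nodes.map { case (id, children) => (id, (children, 0: Byte)) }

// 2. A different byte per node: build a (node id, byte) RDD and join on the node id.
val bytes: RDD[(Long, Byte)] = sc.parallelize(Seq((1L, 1: Byte), (2L, 0: Byte), (3L, 0: Byte)))
val joined = nodes.join(bytes)   // RDD[(Long, (Seq[Long], Byte))]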
Re: how to choose right DStream batch interval
Spark will simply have a backlog of tasks; it'll manage to process them nonetheless, though if it keeps falling behind, you may run out of memory or have unreasonable latency. For momentary spikes, Spark Streaming will manage. Mostly, if you are looking to do 100% processing, you'll have to go with the 5 second interval; an alternative is to process data in two pipelines (0.5 and 5 seconds) in two Spark Streaming jobs and overwrite the results of one with the other. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Sat, Sep 6, 2014 at 12:39 AM, qihong qc...@pivotal.io wrote: repost since original msg was marked with This post has NOT been accepted by the mailing list yet. I have some questions regarding DStream batch interval: 1. if it only takes 0.5 second to process the batch 99% of the time, but 1% of batches need 5 seconds to process (due to some random factor or failures), then what's the right batch interval? 5 seconds (the worst case)? 2. What will happen to DStream processing if 1 batch took longer than the batch interval? Can Spark recover from that? Thanks, Qihong -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-choose-right-DStream-batch-interval-tp13578p13579.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Update on Pig on Spark initiative
Hi, We have migrated Pig functionality on top of Spark, passing 100% e2e for success cases in the Pig test suite. That means UDFs, joins and other functionality are working quite nicely. We are in the process of merging with Apache Pig trunk (something that should happen over the next 2 weeks). Meanwhile, if you are interested in giving it a go, you can try it at https://github.com/sigmoidanalytics/spork This contains all the major changes but may not have all the patches required for 100% e2e; if you are trying it out, let me know any issues you face. A whole bunch of folks contributed on this: Julien Le Dem (Twitter), Praveen R (Sigmoid Analytics), Akhil Das (Sigmoid Analytics), Bill Graham (Twitter), Dmitriy Ryaboy (Twitter), Kamal Banga (Sigmoid Analytics), Anish Haldiya (Sigmoid Analytics), Aniket Mokashi (Google), Greg Owen (DataBricks), Amit Kumar Behera (Sigmoid Analytics), Mahesh Kalakoti (Sigmoid Analytics). Not to mention the Spark and Pig communities. Regards Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi
Re: Spark Streaming Output to DB
I would suggest you use the JDBC connector in mapPartitions instead of map, as JDBC connections are costly and can really impact your performance. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Tue, Aug 26, 2014 at 6:45 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Yes, you can open a jdbc connection at the beginning of the map method, then close this connection at the end of map() and in between you can use this connection. Thanks Best Regards On Tue, Aug 26, 2014 at 6:12 PM, Ravi Sharma raviprincesha...@gmail.com wrote: Hello People, I'm using java spark streaming. I'm just wondering, Can I make a simple jdbc connection in the JavaDStream map() method? Or Do I need to create a jdbc connection for each JavaPairDStream, after the map task? Kindly give your thoughts. Cheers, Ravi Sharma
Re: DStream start a separate DStream
Why dont you directly use DStream created as output of windowing process? Any reason Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Aug 21, 2014 at 8:38 PM, Josh J joshjd...@gmail.com wrote: Hi, I would like to have a sliding window dstream perform a streaming computation and store these results. Once these results are stored, I then would like to process the results. Though I must wait until the final computation done for all tuples in the sliding window, before I begin the new DStream. How can I accomplish this with spark? Sincerely, Josh
Re: Mapping with extra arguments
You can add that as part of your RDD, so as output of your map operation you generate the input of your next map operation.. of course the obscure logic of generating that data has to be in the map.. Another way is a nested def:

def factorial(number: Int): Int = {
  def factorialWithAccumulator(accumulator: Int, number: Int): Int = {
    if (number == 1) accumulator
    else factorialWithAccumulator(accumulator * number, number - 1)
  }
  factorialWithAccumulator(1, number)
}

MyRDD.map(x => factorial(x))

Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Aug 21, 2014 at 12:03 PM, TJ Klein tjkl...@gmail.com wrote: Hi, I am using Spark in Python. I wonder if there is a possibility for passing extra arguments to the mapping function. In my scenario, after each map I update parameters, which I want to use in the following new iteration of mapping. Any idea? Thanks in advance. -Tassilo -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Mapping-with-extra-arguments-tp12541.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
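For the original question (a parameter that changes between iterations), a hedged sketch of the closure approach, assuming an existing SparkContext sc; shown in Scala, but the same idea works in PySpark with a default argument or functools.partial:

import org.apache.spark.SparkContext._

val data = sc.parallelize(1 to 10).map(_.toDouble).cache()

def scale(factor: Double)(x: Double): Double = x * factor   // curried: extra argument + element

var factor = 1.0
for (i <- 1 to 5) {
  val scaled = data.map(scale(factor))       // the current factor is captured in the closure
  factor = scaled.sum() / scaled.count()     // update the parameter on the driver for the next pass
}
println(factor)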
Re: DStream cannot write to text file
provide the fullpath of where to write( like hdfs:// etc) Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Aug 21, 2014 at 8:29 AM, cuongpham92 cuongpha...@gmail.com wrote: Hi, I tried to write to text file from DStream in Spark Streaming, using DStream.saveAsTextFile(test,output), but it did not work. Any suggestions? Thanks in advance. Cuong -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/DStream-cannot-write-to-text-file-tp12525.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Accessing to elements in JavaDStream
transform your way :) MyDStream.transform(RDD => RDD.map(wordChanger)) Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Aug 20, 2014 at 1:25 PM, cuongpham92 cuongpha...@gmail.com wrote: Hi, I am a newbie to Spark Streaming, and I am quite confused about JavaDStream in Spark Streaming. In my situation, after catching a message Hello world from Kafka in a JavaDStream, I want to access the JavaDStream and change this message to Hello John, but I could not figure out how to do it. Any idea about this? Thanks, Cuong -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Accessing-to-elements-in-JavaDStream-tp12459.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark - reading hfds files every 5 minutes
Hi,

case class Person(name: String, age: Int)
val lines = ssc.textFileStream("blah blah")
val sqc = new SQLContext(sc)
lines.foreachRDD(rdd => {
  rdd.map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).registerAsTable("data")
  val teenagers = sqc.sql("SELECT * FROM data")
  teenagers.saveAsParquetFile("people.parquet")
})

You can also try the insertInto API instead of registerAsTable.. but haven't used it myself.. also you need to dynamically change the parquet file name for every DStream batch... Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Aug 20, 2014 at 1:01 AM, salemi alireza.sal...@udo.edu wrote: Thank you but how do you convert the stream to parquet file? Ali -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Re-spark-reading-hfds-files-every-5-minutes-tp12359p12401.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: DStream cannot write to text file
is your hdfs running, can spark access it? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Aug 21, 2014 at 1:15 PM, cuongpham92 cuongpha...@gmail.com wrote: I'm sorry, I just forgot /data after hdfs://localhost:50075. When I added it, a new exception showed up: Call to localhost/127.0.0.1:50075 failed on local exception. How could I fix it? Thanks, Cuong. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/DStream-cannot-write-to-text-file-tp12525p12560.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Question regarding spark data partition and coalesce. Need info on my use case.
Quite a good question. I assume you know the size of the cluster going in; then you can essentially try to partition the data in some multiple of that and use RangePartitioner to partition the data roughly equally. Dynamic partitions are created based on the number of blocks on the filesystem, hence the task overhead of scheduling so many tasks mostly kills the performance.

import org.apache.spark.RangePartitioner
var file = sc.textFile("my local path")
var partitionedFile = file.map(x => (x, 1))
var data = partitionedFile.partitionBy(new RangePartitioner(3, partitionedFile))

Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Sat, Aug 16, 2014 at 7:04 AM, abhiguruvayya sharath.abhis...@gmail.com wrote: My use case as mentioned below. 1. Read input data from local file system using sparkContext.textFile(input path). 2. partition the input data (80 million records) into partitions using RDD.coalesce(numberOfPArtitions) before submitting it to mapper/reducer function. Without using coalesce() or repartition() on the input data spark executes really slow and fails with out of memory exception. The issue i am facing here is in deciding the number of partitions to be applied on the input data. *The input data size varies every time and hard coding a particular value is not an option. And spark performs really well only when certain optimum partition is applied on the input data for which i have to perform lots of iteration(trial and error). Which is not an option in a production environment.* My question: Is there a thumb rule to decide the number of partitions required depending on the input data size and cluster resources available(executors,cores, etc...)? If yes please point me in that direction. Any help is much appreciated. I am using spark 1.0 on yarn. Thanks, AG -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Question-regarding-spark-data-partition-and-coalesce-Need-info-on-my-use-case-tp12214.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Shared variable in Spark Streaming
You can also use the updateStateByKey interface to store this shared variable. As for the count, you can use foreachRDD to run counts on the RDD and then store that as another RDD, or put it in updateStateByKey. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Aug 8, 2014 at 11:46 AM, Soumitra Kumar kumar.soumi...@gmail.com wrote: Hello, I want to count the number of elements in the DStream, like RDD.count(). Since there is no such method in DStream, I thought of using DStream.count and use the accumulator. How do I do DStream.count() to count the number of elements in a DStream? How do I create a shared variable in Spark Streaming? -Soumitra.
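If your Spark version exposes DStream.count(), that already gives a per-batch count as a stream of Longs; for a single running total across batches, foreachRDD plus an accumulator works. A hedged sketch, assuming an existing StreamingContext ssc and a DStream[String] named lines:

import org.apache.spark.SparkContext._   // accumulator implicits on older Spark versions

// 1. Per-batch counts, as a DStream with one Long per interval.
val perBatch = lines.count()
perBatch.print()

// 2. A running total kept on the driver, updated as each batch is processed.
val total = ssc.sparkContext.accumulator(0L)
lines.foreachRDD { rdd => total += rdd.count() }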
Re: Low Performance of Shark over Spark.
Hi Vinay, First of all you should probably migrate to Spark SQL as Shark is not actively supported anymore. The 100x benefit entails in-memory caching and the DAG; since you are not able to cache, the performance can be quite low.. Alternatives you can explore: 1. Use Parquet as storage, which will push down predicates smartly and hence get better performance (similar to Impala). 2. Cache data at a partition level from Hive and operate on those instead. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Aug 8, 2014 at 10:44 AM, vinay.kash...@socialinfra.net wrote: Hi Meng, I cannot use a cached table in this case as the data size is quite huge. Also, as I am trying to run adhoc queries, I cannot keep the table cached. I can cache the table only when my requirement is such that the type of queries is fixed and for a specific set of data. Thanks and regards Vinay Kashyap From: Xiangrui Meng men...@gmail.com Sent: vinay.kash...@socialinfra.net Cc: user@spark.apache.org Date: Thu, August 7, 2014 11:06 pm Subject: Re: Low Performance of Shark over Spark. Did you cache the table? There are a couple of ways of caching a table in Shark: https://github.com/amplab/shark/wiki/Shark-User-Guide On Thu, Aug 7, 2014 at 6:51 AM, vinay.kash...@socialinfra.net wrote: Dear all, I am using Spark 0.9.2 in Standalone mode. Hive and HDFS in CDH 5.1.0. 6 worker nodes each with memory 96GB and 32 cores. I am using the Shark Shell to execute queries on Spark. I have a raw_table (of size 3TB with replication 3) which is partitioned by year, month and day. I am running an adhoc query on one month of data with some condition. For eg: CREATE TABLE temp_table AS SELECT field1,field2,field3 FROM raw_table WHERE year=2000 AND month=01 AND field10 > some_value; It is claimed that the same Hive queries can run 100x faster with Shark, but I don't see such a significant improvement when running the above query; I am getting almost the same performance as when run in Hive, which is around 45 seconds. The same query with Impala takes very little time, almost 7 times less than Shark, which is around 6 seconds. I have tried altering the below parameters for the spark jobs but did not see any difference. spark.local.dir spark.serializer spark.kryoserializer.buffer.mb spark.storage.memoryFraction spark.io.compression.codec spark.default.parallelism Any suggestions so that I can improve the performance of the query with Shark over Spark and make it comparable to Impala..?? Thanks and regards Vinay Kashyap - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Configuration setup and Connection refused
Spark is not able to communicate with your hadoop hdfs. Is your hdfs running, if so can you try to explicitly connect to it with hadoop command line tools giving full hostname port. Or test port using telnet localhost 9000 In all likelyhood either your hdfs is down, bound to wrong port/ip that spark cannot access Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Tue, Aug 5, 2014 at 11:33 PM, alamin.ishak alamin.is...@gmail.com wrote: Hi, Anyone? Any input would be much appreciated Thanks, Amin On 5 Aug 2014 00:31, Al Amin [hidden email] http://user/SendEmail.jtp?type=nodenode=11477i=0 wrote: Hi all, Any help would be much appreciated. Thanks, Al On Mon, Aug 4, 2014 at 7:09 PM, Al Amin [hidden email] http://user/SendEmail.jtp?type=nodenode=11477i=1 wrote: Hi all, I have setup 2 nodes (master and slave1) on stand alone mode. Tried running SparkPi example and its working fine. However when I move on to wordcount its giving me below error: 14/08/04 21:40:33 INFO storage.MemoryStore: ensureFreeSpace(32856) called with curMem=0, maxMem=311387750 14/08/04 21:40:33 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.1 KB, free 296.9 MB) 14/08/04 21:40:34 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 0 time(s). 14/08/04 21:40:35 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 1 time(s). 14/08/04 21:40:36 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 2 time(s). 14/08/04 21:40:37 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 3 time(s). 14/08/04 21:40:38 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 4 time(s). 14/08/04 21:40:39 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 5 time(s). 14/08/04 21:40:40 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 6 time(s). 14/08/04 21:40:41 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 7 time(s). 14/08/04 21:40:42 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 8 time(s). 14/08/04 21:40:43 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 9 time(s). Exception in thread main java.lang.RuntimeException: java.net.ConnectException: Call to master/10.0.1.27:9000 failed on connection exception: java.net.ConnectException: Connection refused 1) how to fix this issue? I have configure hostname --fqdn accordingly. 2) I could see that in my logs that my master/worker deploy configuration is -Xms512m -Xmx512m. Is there any way that I can increase it? Or 512mb is just fine? AFAIK, spark require huge memory. 3) I have a hadoop cluster and its working. Could anyone point me how to integrate Yarn with Spark? Any good tutorial would be very useful Thanks, Al -- View this message in context: Re: Configuration setup and Connection refused http://apache-spark-user-list.1001560.n3.nabble.com/Re-Configuration-setup-and-Connection-refused-tp11477.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Re: Configuration setup and Connection refused
Then dont specify hdfs when you read file. Also the community is quite active in response in general, just be a little patient. Also if possible look at spark training as part of spark summit 2014 vids and/or amplabs training on spark website. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Aug 6, 2014 at 1:58 AM, Al Amin alamin.is...@gmail.com wrote: Finally, someone reply. thank you, sir! But I am planning to deploy stand alone mode of Spark. I thought there is no need to use hdfs? And my spark is not being built with hadoop/yarn config. regards, Amin On Tue, Aug 5, 2014 at 10:39 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: Spark is not able to communicate with your hadoop hdfs. Is your hdfs running, if so can you try to explicitly connect to it with hadoop command line tools giving full hostname port. Or test port using telnet localhost 9000 In all likelyhood either your hdfs is down, bound to wrong port/ip that spark cannot access Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Tue, Aug 5, 2014 at 11:33 PM, alamin.ishak alamin.is...@gmail.com wrote: Hi, Anyone? Any input would be much appreciated Thanks, Amin On 5 Aug 2014 00:31, Al Amin [hidden email] http://user/SendEmail.jtp?type=nodenode=11477i=0 wrote: Hi all, Any help would be much appreciated. Thanks, Al On Mon, Aug 4, 2014 at 7:09 PM, Al Amin [hidden email] http://user/SendEmail.jtp?type=nodenode=11477i=1 wrote: Hi all, I have setup 2 nodes (master and slave1) on stand alone mode. Tried running SparkPi example and its working fine. However when I move on to wordcount its giving me below error: 14/08/04 21:40:33 INFO storage.MemoryStore: ensureFreeSpace(32856) called with curMem=0, maxMem=311387750 14/08/04 21:40:33 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.1 KB, free 296.9 MB) 14/08/04 21:40:34 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 0 time(s). 14/08/04 21:40:35 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 1 time(s). 14/08/04 21:40:36 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 2 time(s). 14/08/04 21:40:37 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 3 time(s). 14/08/04 21:40:38 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 4 time(s). 14/08/04 21:40:39 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 5 time(s). 14/08/04 21:40:40 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 6 time(s). 14/08/04 21:40:41 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 7 time(s). 14/08/04 21:40:42 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 8 time(s). 14/08/04 21:40:43 INFO ipc.Client: Retrying connect to server: master/ 10.0.1.27:9000. Already tried 9 time(s). Exception in thread main java.lang.RuntimeException: java.net.ConnectException: Call to master/10.0.1.27:9000 failed on connection exception: java.net.ConnectException: Connection refused 1) how to fix this issue? I have configure hostname --fqdn accordingly. 2) I could see that in my logs that my master/worker deploy configuration is -Xms512m -Xmx512m. Is there any way that I can increase it? Or 512mb is just fine? AFAIK, spark require huge memory. 
3) I have a hadoop cluster and its working. Could anyone point me how to integrate Yarn with Spark? Any good tutorial would be very useful Thanks, Al -- View this message in context: Re: Configuration setup and Connection refused http://apache-spark-user-list.1001560.n3.nabble.com/Re-Configuration-setup-and-Connection-refused-tp11477.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Re: Installing Spark 0.9.1 on EMR Cluster
Have you tried https://aws.amazon.com/articles/Elastic-MapReduce/4926593393724923 ? There is also a 0.9.1 version they talked about in one of the meetups. Check out the s3 bucket in the guide; it should have a 0.9.1 version as well. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Jul 31, 2014 at 4:58 PM, nit nitinp...@gmail.com wrote: Have you tried the flag --spark-version of spark-ec2? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Installing-Spark-0-9-1-on-EMR-Cluster-tp11084p11096.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to read from OpenTSDB using PySpark (or Scala Spark)?
What is the use case you are looking at? TSDB is not designed for you to query its data directly from HBase; ideally you should use the REST API if you are looking to do thin analysis. Are you looking to do a whole reprocessing of TSDB? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Aug 1, 2014 at 2:39 PM, bumble123 tc1...@att.com wrote: Hi, I've seen many threads about reading from HBase into Spark, but none about how to read from OpenTSDB into Spark. Does anyone know anything about this? I tried looking into it, but I think OpenTSDB saves its information into HBase using hex and I'm not sure how to interpret the data. If you could show me some examples of how to extract the information from OpenTSDB, that'd be great! Thanks in advance! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-from-OpenTSDB-using-PySpark-or-Scala-Spark-tp11211.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: RDD to DStream
Nice question :) Ideally you should use the QueueStream interface to push RDDs into a queue; Spark Streaming can then handle the rest. Though why are you looking to convert an RDD to a DStream? Another workaround folks use is to source the DStream from a folder and move the files that need reprocessing back into the folder. It's a hack, but much less headache. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Aug 1, 2014 at 10:21 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Hi everyone, I haven't been receiving replies to my queries on the distribution list. Not pissed, but I am actually curious to know if my messages are actually going through or not. Can someone please confirm that my messages are getting delivered via this distribution list? Thanks, Aniket On 1 August 2014 13:55, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Sometimes it is useful to convert an RDD into a DStream for testing purposes (generating DStreams from historical data, etc). Is there an easy way to do this? I could come up with the following inefficient way, but I am not sure if there is a better way to achieve this. Thoughts?

class RDDExtension[T](rdd: RDD[T]) {

  def chunked(chunkSize: Int): RDD[Seq[T]] = {
    rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
  }

  def skipFirst(): RDD[T] = {
    rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1)
  }

  def toStream(streamingContext: StreamingContext, chunkSize: Int, slideDurationMilli: Option[Long] = None): DStream[T] = {
    new InputDStream[T](streamingContext) {

      @volatile private var currentRDD: RDD[Seq[T]] = rdd.chunked(chunkSize)

      override def start(): Unit = {}
      override def stop(): Unit = {}

      override def compute(validTime: Time): Option[RDD[T]] = {
        val chunk = currentRDD.take(1)
        currentRDD = currentRDD.skipFirst()
        Some(rdd.sparkContext.parallelize(chunk))
      }

      override def slideDuration = {
        slideDurationMilli.map(duration => new Duration(duration)).getOrElse(super.slideDuration)
      }
    }
  }
}
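A minimal sketch of the QueueStream suggestion above, assuming an existing SparkContext sc and historical data that has already been split into one RDD per intended batch (the chunking itself is left out here):

import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// each RDD pushed into the queue becomes one batch of the stream
val ssc = new StreamingContext(sc, Seconds(1))
val rddQueue = new Queue[RDD[String]]()

val historicalStream = ssc.queueStream(rddQueue, oneAtATime = true)
historicalStream.foreachRDD(rdd => println("batch size: " + rdd.count()))

ssc.start()
rddQueue += sc.parallelize(Seq("a", "b"))   // first batch
rddQueue += sc.parallelize(Seq("c"))        // second batch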
Re: Accumulator and Accumulable vs classic MR
The only blocker is that an accumulator can only be added to from the slaves and only read on the master. If that constraint fits you well, you can fire away. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Aug 1, 2014 at 7:38 AM, Julien Naour julna...@gmail.com wrote: Hi, My question is simple: could there be some performance issue using Accumulable/Accumulator instead of methods like map(), reduce()...? My use case: implementation of a clustering algorithm like k-means. At the beginning I used two steps, one to assign data to a cluster and another to calculate the new centroids. After some research I now use an accumulable with an Array to calculate the new centroids during the assignment of the data. It's easier to understand and for the moment it gives better performance. It's probably because I used 2 steps before and now only one thanks to the accumulable. So, any indications against it? Cheers, Julien
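A minimal sketch of that add-on-workers, read-on-driver constraint, assuming an RDD named data and a hypothetical predicate isGoodPoint:

// created on the driver; tasks may only add to it
val goodPoints = sc.accumulator(0)

data.foreach { p =>
  if (isGoodPoint(p)) goodPoints += 1   // adding inside a task is fine
  // reading goodPoints.value here, on an executor, would not be reliable
}

println("good points: " + goodPoints.value)   // reading is only meaningful on the driver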
Re: How to read from OpenTSDB using PySpark (or Scala Spark)?
The HTTP API would be the best bet; I assume by graphs you mean the charts created by the TSDB frontends. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Aug 1, 2014 at 4:48 PM, bumble123 tc1...@att.com wrote: I'm trying to get metrics out of TSDB so I can use Spark to do anomaly detection on graphs. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-from-OpenTSDB-using-PySpark-or-Scala-Spark-tp11211p11232.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: The function of ClosureCleaner.clean
I am not sure about the specific purpose of this function, but Spark needs to remove elements from the closure that may be included by default but are not really needed, so that it can serialize the closure and send it to the executors to operate on the RDD. For example, a function passed to an RDD's map may reference objects inside the enclosing class, so you may want to send across those objects but not the whole parent class. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Mon, Jul 28, 2014 at 8:28 PM, Wang, Jensen jensen.w...@sap.com wrote: Hi All, Before sc.runJob invokes dagScheduler.runJob, the func performed on the rdd is "cleaned" by ClosureCleaner.clean. Why does Spark have to do this? What's the purpose?
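A small illustrative sketch of the parent-class problem described above; the Pipeline class and its fields are made up for the example. Referencing a field directly drags the whole enclosing object into the closure, while copying it to a local val keeps the serialized task small:

import org.apache.spark.rdd.RDD

class Pipeline(val factor: Int, val hugeLookup: Array[Byte]) extends Serializable {

  // captures `this`, so hugeLookup gets shipped with every task
  def scaleBad(rdd: RDD[Int]): RDD[Int] = rdd.map(_ * factor)

  // copy just what the closure needs into a local val first
  def scaleGood(rdd: RDD[Int]): RDD[Int] = {
    val f = factor
    rdd.map(_ * f)
  }
}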
Spark as a application library vs infra
Based on some discussions with my application users, I have been trying to come up with a standard way to deploy applications built on Spark:
1. Bundle the version of Spark with your application and ask users to store it in HDFS before referring to it in YARN to boot your application.
2. Provide ways to manage dependencies in your app across the various versions of Spark bundled in with Hadoop distributions.
Option 1 provides greater control and reliability, as I am only working against YARN versions and dependencies. I assume option 2 gives me some benefits of the distribution versions of Spark (easier management, common sysops tools??). I was wondering if anyone has thoughts around both and any reasons to prefer one over the other. Sent from my iPad
Re: persistent HDFS instance for cluster restarts/destroys
Yes, you lose the data. You can add machines, but that will require you to restart the cluster. Also, adding is manual; you add the nodes yourself. Regards Mayur On Wednesday, July 23, 2014, durga durgak...@gmail.com wrote: Hi All, I have a question. For my company, we are planning to use the spark-ec2 scripts to create the cluster for us. I understand that persistent HDFS will keep the HDFS data available across cluster restarts. The questions are: 1) What happens if I destroy and re-create the cluster, do I lose the data? a) If I lose the data, is the only way to copy it to S3 and recopy it after launching the cluster (it seems costly to transfer data to and from S3)? 2) How would I add/remove some machines in the cluster? I mean, I am asking about cluster management. Is there any place Amazon allows me to see the machines, and do the operation of adding and removing? Thanks, D. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/persistent-HDFS-instance-for-cluster-restarts-destroys-tp10551.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Sent from Gmail Mobile
Re: What if there are large, read-only variables shared by all map functions?
Have a look at broadcast variables. On Tuesday, July 22, 2014, Parthus peng.wei@gmail.com wrote: Hi there, I was wondering if anybody could help me find an efficient way to write a MapReduce program like this: 1) Each map function needs access to some huge files, which are around 6GB. 2) These files are READ-ONLY. Actually they are like a huge look-up table, which will not change for 2~3 years. I tried two ways to make the program work, but neither of them is efficient: 1) The first approach I tried is to let each map function load those files independently, like this: map (...) { load(files); DoMapTask(...)} 2) The second approach I tried is to load the files before RDD.map(...) and broadcast the files. However, because the files are too large, the broadcasting overhead is 30min ~ 1 hour. Could anybody help me find an efficient way to solve it? Thanks very much. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-if-there-are-large-read-only-variables-shared-by-all-map-functions-tp10435.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Sent from Gmail Mobile
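For the mechanics of the broadcast-variable suggestion, a minimal sketch; loadLookupTable is a hypothetical loader and records a hypothetical RDD of keys, and whether a table of this size is practical to broadcast is exactly the open question in the thread:

// build the lookup structure once on the driver
val table: Map[String, Int] = loadLookupTable()      // hypothetical helper
val bcTable = sc.broadcast(table)

// each executor receives the table once and reuses it across its tasks
val enriched = records.map { key => (key, bcTable.value.getOrElse(key, -1)) }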
Re: Filtering data during the read
Hi, Spark does that out of the box for you :) It pipelines the execution steps together as much as possible. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Jul 9, 2014 at 3:15 PM, Konstantin Kudryavtsev kudryavtsev.konstan...@gmail.com wrote: Hi all, I wondered if you could help me to clarify the following situation: in the classic example val file = spark.textFile("hdfs://...") val errors = file.filter(line => line.contains("ERROR")) As I understand it, the data is read into memory first, and after that the filtering is applied. Is there any way to apply the filtering during the read step, and not put all objects into memory? Thank you, Konstantin Kudryavtsev
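A small sketch of why this works, with a made-up HDFS path: transformations are lazy, so the read and the filter run together in one pass over each partition when an action finally fires, and nothing is retained unless you explicitly cache it.

val file = sc.textFile("hdfs://namenode:9000/logs/app.log")   // nothing is read yet
val errors = file.filter(line => line.contains("ERROR"))      // still nothing is read

// the action triggers one streaming pass: each line is read and filtered on the fly
println("error lines: " + errors.count())

// only an explicit cache keeps anything around, and then only the filtered lines
errors.cache()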
Re: Spark job tracker.
var sem = 0
sc.addSparkListener(new SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart) {
    sem += 1
  }
})
(sc is the Spark context) Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Jul 9, 2014 at 4:34 AM, abhiguruvayya sharath.abhis...@gmail.com wrote: Hello Mayur, How can I implement the methods mentioned below? Do you have any clue on this? Please let me know. public void onJobStart(SparkListenerJobStart arg0) { } @Override public void onStageCompleted(SparkListenerStageCompleted arg0) { } @Override public void onStageSubmitted(SparkListenerStageSubmitted arg0) { } -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p9104.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
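A minimal sketch of the same pattern extended to the three callbacks asked about, written against the Scala SparkListener interface (the Java flavour in the question is analogous); the println bodies are placeholders for real bookkeeping:

import org.apache.spark.scheduler._

sc.addSparkListener(new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart) {
    println("job started: " + jobStart.jobId)
  }
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
    println("stage submitted: " + stageSubmitted.stageInfo.name)
  }
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
    println("stage completed: " + stageCompleted.stageInfo.name)
  }
})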
Re: Pig 0.13, Spark, Spork
Hi, We have fixed many major issues around Spork and are deploying it with some customers. We would be happy to provide a working version for you to try out. We are looking for more folks to try it out and submit bugs. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Mon, Jul 7, 2014 at 8:21 PM, Bertrand Dechoux decho...@gmail.com wrote: Hi, I was wondering what the state of the Pig+Spark initiative is now that the execution engine of Pig is pluggable? Granted, it was done in order to use Tez, but could it be used by Spark? I know about a 'theoretical' project called Spork but I don't know of any stable and maintained version of it. Regards Bertrand Dechoux
Re: Pig 0.13, Spark, Spork
That version is old :). We are not forking pig but cleanly separating out pig execution engine. Let me know if you are willing to give it a go. Also would love to know what features of pig you are using ? Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Mon, Jul 7, 2014 at 8:46 PM, Bertrand Dechoux decho...@gmail.com wrote: I saw a wiki page from your company but with an old version of Spark. http://docs.sigmoidanalytics.com/index.php/Setting_up_spork_with_spark_0.8.1 I have no reason to use it yet but I am interested in the state of the initiative. What's your point of view (personal and/or professional) about the Pig 0.13 release? Is the pluggable execution engine flexible enough in order to avoid having Spork as a fork of Pig? Pig + Spark + Fork = Spork :D As a (for now) external observer, I am glad to see competition in that space. It can only be good for the community in the end. Bertrand Dechoux On Mon, Jul 7, 2014 at 5:00 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: Hi, We have fixed many major issues around Spork deploying it with some customers. Would be happy to provide a working version to you to try out. We are looking for more folks to try it out submit bugs. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Mon, Jul 7, 2014 at 8:21 PM, Bertrand Dechoux decho...@gmail.com wrote: Hi, I was wondering what was the state of the Pig+Spark initiative now that the execution engine of Pig is pluggable? Granted, it was done in order to use Tez but could it be used by Spark? I know about a 'theoretical' project called Spork but I don't know any stable and maintained version of it. Regards Bertrand Dechoux
Re: Is the order of messages guaranteed in a DStream?
If you receive data through multiple receivers across the cluster, I don't think any order can be guaranteed. Order in distributed systems is tough. On Tuesday, July 8, 2014, Yan Fang yanfang...@gmail.com wrote: I know the order of processing DStreams is guaranteed. I am wondering if the order of messages within one DStream is guaranteed. My gut feeling is yes, because an RDD is immutable. Some simple tests prove this. I want to hear from an authority to persuade myself. Thank you. Best, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108 -- Sent from Gmail Mobile
Re: window analysis with Spark and Spark streaming
The key idea is to simulate your app time as you enter data. So you can connect Spark Streaming to a queue and insert data into it spaced by time. Easier said than done :). What are the parallelism issues you are hitting with your static approach? On Friday, July 4, 2014, alessandro finamore alessandro.finam...@polito.it wrote: Thanks for the replies. What is not completely clear to me is how time is managed. I can create a DStream from files. But if I set the window property, that will be bound to the application time, right? If I got it right, with a receiver I can control the way DStreams are created. But how can I then apply the windowing already shipped with the framework if this is bound to the application time? I would like to define a window of N files, but the window() function requires a duration as input... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/window-analysis-with-Spark-and-Spark-streaming-tp8806p8824.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Sent from Gmail Mobile
Re: Spark memory optimization
I would go with Spark only if you are certain that you are going to scale out in the near future. You can change the default storage of an RDD to DISK_ONLY; that might remove issues around any RDD leveraging memory. There are some functions, particularly sortByKey, that require the data to fit in memory to work, so you may be hitting some of those walls too. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jul 4, 2014 at 2:36 PM, Igor Pernek i...@pernek.net wrote: Hi all! I have a folder with 150 G of txt files (around 700 files, on average each 200 MB). I'm using scala to process the files and calculate some aggregate statistics in the end. I see two possible approaches to do that: - manually loop through all the files, do the calculations per file and merge the results in the end - read the whole folder to one RDD, do all the operations on this single RDD and let spark do all the parallelization I'm leaning towards the second approach as it seems cleaner (no need for parallelization-specific code), but I'm wondering if my scenario will fit the constraints imposed by my hardware and data. I have one workstation with 16 threads and 64 GB of RAM available (so the parallelization will be strictly local between different processor cores). I might scale the infrastructure with more machines later on, but for now I would just like to focus on tuning the settings for this one-workstation scenario. The code I'm using: - reads TSV files, and extracts meaningful data to (String, String, String) triplets - afterwards some filtering, mapping and grouping is performed - finally, the data is reduced and some aggregates are calculated I've been able to run this code with a single file (~200 MB of data), however I get a java.lang.OutOfMemoryError: GC overhead limit exceeded and/or a Java out of heap exception when adding more data (the application breaks with 6GB of data but I would like to use it with 150 GB of data). I guess I would have to tune some parameters to make this work. I would appreciate any tips on how to approach this problem (how to debug for memory demands). I've tried increasing the 'spark.executor.memory' and using a smaller number of cores (the rationale being that each core needs some heap space), but this didn't solve my problems. I don't need the solution to be very fast (it can easily run for a few hours even days if needed). I'm also not caching any data, but just saving them to the file system in the end. If you think it would be more feasible to just go with the manual parallelization approach, I could do that as well. Thanks, Igor
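A minimal sketch of the DISK_ONLY suggestion, assuming the TSV files sit in one folder and parseTriplet is a hypothetical parser; persisting on disk keeps the intermediate RDD from competing for heap space:

import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel

val triplets = sc.textFile("/data/tsv-folder/*.tsv")   // one RDD over the whole folder
  .map(parseTriplet)                                   // hypothetical (String, String, String) parser
  .persist(StorageLevel.DISK_ONLY)                     // blocks go to local disk, not the heap

// downstream filtering / grouping / aggregation then reads the persisted blocks
val counts = triplets.map { case (a, _, _) => (a, 1) }.reduceByKey(_ + _)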
Re: LIMIT with offset in SQL queries
What I typically do is use a row_number subquery and filter based on that. It works out pretty well and reduces the iteration. I think an offset solution based on windowing directly would be useful. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jul 4, 2014 at 2:00 AM, Michael Armbrust mich...@databricks.com wrote: Doing an offset is actually pretty expensive in a distributed query engine, so in many cases it probably makes sense to just collect and then perform the offset as you are doing now. This is unless the offset is very large. Another limitation here is that HiveQL does not support OFFSET. That said, if you want to open a JIRA we would consider implementing it. On Wed, Jul 2, 2014 at 1:37 PM, durin m...@simon-schaefer.net wrote: Hi, in many SQL DBMSs like MySQL, you can set an offset for the LIMIT clause, s.t. LIMIT 5, 10 will return 10 rows, starting from row 5. As far as I can see, this is not possible in Spark SQL. The best solution I have to imitate that (using Scala) is converting the RDD into an Array via collect() and then using a for-loop to return certain elements from that Array. Is there a better solution regarding performance, and are there plans to implement an offset for LIMIT? Kind regards, Simon -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/LIMIT-with-offset-in-SQL-queries-tp8673.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
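A minimal sketch of the row_number approach, assuming a HiveContext backed by a Hive version whose ROW_NUMBER() window function is available (the plain SQLContext parser of this era would not accept it); my_table and its columns are placeholders:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// rows 6..15, i.e. the equivalent of LIMIT 10 OFFSET 5
val page = hiveContext.hql("""
  SELECT col1, col2 FROM (
    SELECT col1, col2, ROW_NUMBER() OVER (ORDER BY col1) AS rn
    FROM my_table
  ) t
  WHERE rn > 5 AND rn <= 15
""")
page.collect().foreach(println)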
Re: Visualize task distribution in cluster
You'll get most of that information from the Mesos interface. You may not get the data-transfer information in particular. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Jul 3, 2014 at 6:28 AM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, I am using Mesos to run my Spark tasks. I would be interested to see how Spark distributes the tasks in the cluster (nodes, partitions), which nodes are more or less active and do what kind of tasks, and how long the transfer of data and jobs takes. Is there any way to get this information from Spark? Thanks Tobias
Re: Spark job tracker.
The application server doesn't provide a JSON API, unlike the cluster interface (port 8080). If you are okay with patching Spark, you can use our patch to create a JSON API, or you can use the SparkListener interface in your application to get that information out. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Jul 3, 2014 at 6:05 AM, abhiguruvayya sharath.abhis...@gmail.com wrote: Spark displays job status information on port 4040 using JobProgressListener; does anyone know how to hook into this port and read the details? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p8697.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Error: UnionPartition cannot be cast to org.apache.spark.rdd.HadoopPartition
Ideally you should be converting RDD to schemardd ? You are creating UnionRDD to join across dstream rdd? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Tue, Jul 1, 2014 at 3:11 PM, Honey Joshi honeyjo...@ideata-analytics.com wrote: Hi, I am trying to run a project which takes data as a DStream and dumps the data in the Shark table after various operations. I am getting the following error : Exception in thread main org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed 1 times (most recent failure: Exception failure: java.lang.ClassCastException: org.apache.spark.rdd.UnionPartition cannot be cast to org.apache.spark.rdd.HadoopPartition) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Can someone please explain the cause of this error, I am also using a Spark Context with the existing Streaming Context.
Re: spark streaming counter metrics
You may be able to mix StreamingListener and SparkListener to get meaningful information about your tasks. However, you need to connect a lot of pieces to make sense of the flow. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Mon, Jun 30, 2014 at 9:58 PM, Chen Song chen.song...@gmail.com wrote: I am new to Spark Streaming and am wondering if Spark Streaming tracks counters (e.g., how many rows in each consumer, how many rows routed to an individual reduce task, etc.) in any form so I can get an idea of how data is skewed? I checked the Spark job page but don't seem to find any. -- Chen Song
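A minimal sketch of the StreamingListener half of that suggestion, assuming an existing StreamingContext ssc; it only surfaces per-batch timing, so per-consumer row counts would still have to come from your own accumulators or a SparkListener:

import org.apache.spark.streaming.scheduler._

ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val info = batchCompleted.batchInfo
    println("batch " + info.batchTime +
      " processing delay: " + info.processingDelay.getOrElse(-1L) + " ms")
  }
})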
Re: Callbacks on freeing up of RDDs
A lot of the RDDs that you create in code may not even be constructed, as the task layer is optimized in the DAG scheduler. The closest is onUnpersistRDD in SparkListener. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Mon, Jun 30, 2014 at 4:48 PM, Jaideep Dhok jaideep.d...@inmobi.com wrote: Hi all, I am trying to create a custom RDD class for the result set of queries supported in InMobi Grill (http://inmobi.github.io/grill/). Each result set has a schema (similar to Hive's TableSchema) and a path in HDFS containing the result set data. An easy way of doing this would be to create a temp table in Hive, and use HCatInputFormat to create an RDD using the newAPIHadoopRDD call. I've already done this and it works. However, I also want to *delete* the temp table when the RDD is unpersisted, or when the SparkContext is gone. How could I do that in Spark? Does Spark allow users to register code to be executed when an RDD is freed? Something like the OutputCommitter in Hadoop? Thanks, Jaideep
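A minimal sketch of the onUnpersistRDD hook mentioned above; dropTempTableFor is a hypothetical cleanup helper, and the listener only fires for RDDs that are explicitly unpersisted, not for every RDD that goes out of scope:

import org.apache.spark.scheduler.{SparkListener, SparkListenerUnpersistRDD}

sc.addSparkListener(new SparkListener {
  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) {
    // map the Spark RDD id back to the Hive temp table that backs it
    dropTempTableFor(unpersistRDD.rddId)   // hypothetical helper
  }
})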
Re: Error: UnionPartition cannot be cast to org.apache.spark.rdd.HadoopPartition
two job context cannot share data, are you collecting the data to the master then sending it to the other context? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Jul 2, 2014 at 11:57 AM, Honey Joshi honeyjo...@ideata-analytics.com wrote: On Wed, July 2, 2014 1:11 am, Mayur Rustagi wrote: Ideally you should be converting RDD to schemardd ? You are creating UnionRDD to join across dstream rdd? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Tue, Jul 1, 2014 at 3:11 PM, Honey Joshi honeyjo...@ideata-analytics.com wrote: Hi, I am trying to run a project which takes data as a DStream and dumps the data in the Shark table after various operations. I am getting the following error : Exception in thread main org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed 1 times (most recent failure: Exception failure: java.lang.ClassCastException: org.apache.spark.rdd.UnionPartition cannot be cast to org.apache.spark.rdd.HadoopPartition) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$sched uler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$sched uler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.sc ala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:102 6) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply( DAGScheduler.scala:619) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply( DAGScheduler.scala:619) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala :619) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonf un$receive$1.applyOrElse(DAGScheduler.scala:207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(Abstra ctDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.j ava:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979 ) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread .java:107) Can someone please explain the cause of this error, I am also using a Spark Context with the existing Streaming Context. I am using spark 0.9.0-Incubating, so it doesnt have anything to do with schemaRDD.This error is probably coming when I am trying to use one spark context and one shark context in the same job.Is there any way to incorporate two context in one job? Regards Honey Joshi Ideata-Analytics
Re: Serializer or Out-of-Memory issues?
Your executors are going out of memory then subsequent tasks scheduled on the scheduler are also failing, hence the lost tid(task id). Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Mon, Jun 30, 2014 at 7:47 PM, Sguj tpcome...@yahoo.com wrote: I'm trying to perform operations on a large RDD, that ends up being about 1.3 GB in memory when loaded in. It's being cached in memory during the first operation, but when another task begins that uses the RDD, I'm getting this error that says the RDD was lost: 14/06/30 09:48:17 INFO TaskSetManager: Serialized task 1.0:4 as 8245 bytes in 0 ms 14/06/30 09:48:17 WARN TaskSetManager: Lost TID 15611 (task 1.0:3) 14/06/30 09:48:17 WARN TaskSetManager: Loss was due to org.apache.spark.api.python.PythonException org.apache.spark.api.python.PythonException: Traceback (most recent call last): File /Users/me/Desktop/spark-1.0.0/python/pyspark/worker.py, line 73, in main command = pickleSer._read_with_length(infile) File /Users/me/Desktop/spark-1.0.0/python/pyspark/serializers.py, line 142, in _read_with_length length = read_int(stream) File /Users/me/Desktop/spark-1.0.0/python/pyspark/serializers.py, line 337, in read_int raise EOFError EOFError at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115) at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:145) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 14/06/30 09:48:18 INFO AppClient$ClientActor: Executor updated: app-20140630090515-/0 is now FAILED (Command exited with code 52) 14/06/30 09:48:18 INFO SparkDeploySchedulerBackend: Executor app-20140630090515-/0 removed: Command exited with code 52 14/06/30 09:48:18 INFO SparkDeploySchedulerBackend: Executor 0 disconnected, so removing it 14/06/30 09:48:18 ERROR TaskSchedulerImpl: Lost executor 0 on localhost: OutOfMemoryError 14/06/30 09:48:18 INFO TaskSetManager: Re-queueing tasks for 0 from TaskSet 1.0 14/06/30 09:48:18 WARN TaskSetManager: Lost TID 15610 (task 1.0:2) 14/06/30 09:48:18 WARN TaskSetManager: Lost TID 15609 (task 1.0:1) 14/06/30 09:48:18 WARN TaskSetManager: Lost TID 15612 (task 1.0:4) 14/06/30 09:48:18 WARN TaskSetManager: Lost TID 15608 (task 1.0:0) The operation it fails on is a ReduceByKey(), and the RDD before the operation is split into several thousand partitions (I'm doing term weighting that requires a different partition initially for each document), and the system has 6 GB of memory for the executor, so I'm not sure if it's actually a memory error, as is mentioned 5 lines from the end of the error. 
The serializer error portion is what's really confusing me, and I can't find references to this particular error with Spark anywhere. Does anyone have a clue as to what the actual error might be here, and what a possible solution would be? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Serializer-or-Out-of-Memory-issues-tp8533.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Lost TID: Loss was due to fetch failure from BlockManagerId
It could be cause you are out of memory on the worker nodes blocks are not getting registered.. A older issue with 0.6.0 was with dead nodes causing loss of task then resubmission of data in an infinite loop... It was fixed in 0.7.0 though. Are you seeing a crash log in this log.. or in the worker log @ 192.168.222.164 or any of the machines where the crash log is displayed. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Jul 2, 2014 at 7:51 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote: A lot of things can get funny when you run distributed as opposed to local -- e.g. some jar not making it over. Do you see anything of interest in the log on the executor machines -- I'm guessing 192.168.222.152/192.168.222.164. From here https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala seems like the warning message is logged after the task fails -- but I wonder if you might see something more useful as to why it failed to begin with. As an example we've had cases in Hdfs where a small example would work, but on a larger example we'd hit a bad file. But the executor log is usually pretty explicit as to what happened... On Tue, Jul 1, 2014 at 8:57 PM, Mohammed Guller moham...@glassbeam.com wrote: I am running Spark 1.0 on a 4-node standalone spark cluster (1 master + 3 worker). Our app is fetching data from Cassandra and doing a basic filter, map, and countByKey on that data. I have run into a strange problem. Even if the number of rows in Cassandra is just 1M, the Spark job goes seems to go into an infinite loop and runs for hours. With a small amount of data (less than 100 rows), the job does finish, but takes almost 30-40 seconds and we frequently see the messages shown below. If we run the same application on a single node Spark (--master local[4]), then we don’t see these warnings and the task finishes in less than 6-7 seconds. Any idea what could be the cause for these problems when we run our application on a standalone 4-node spark cluster? 
14/06/30 19:30:16 WARN TaskSetManager: Lost TID 25036 (task 6.0:90) 14/06/30 19:30:16 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:18 WARN TaskSetManager: Lost TID 25310 (task 6.1:0) 14/06/30 19:30:18 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:19 WARN TaskSetManager: Lost TID 25582 (task 6.2:0) 14/06/30 19:30:19 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:21 WARN TaskSetManager: Lost TID 25882 (task 6.3:34) 14/06/30 19:30:21 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(0, 192.168.222.142, 39342, 0) 14/06/30 19:30:22 WARN TaskSetManager: Lost TID 26152 (task 6.4:0) 14/06/30 19:30:22 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(0, 192.168.222.142, 39342, 0) 14/06/30 19:30:23 WARN TaskSetManager: Lost TID 26427 (task 6.5:4) 14/06/30 19:30:23 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:25 WARN TaskSetManager: Lost TID 26690 (task 6.6:0) 14/06/30 19:30:25 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:26 WARN TaskSetManager: Lost TID 26959 (task 6.7:0) 14/06/30 19:30:26 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:28 WARN TaskSetManager: Lost TID 27449 (task 6.8:218) 14/06/30 19:30:28 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:30 WARN TaskSetManager: Lost TID 27718 (task 6.9:0) 14/06/30 19:30:30 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:30 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:31 WARN TaskSetManager: Lost TID 27991 (task 6.10:1) 14/06/30 19:30:31 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:33 WARN TaskSetManager: Lost TID 28265 (task 6.11:0) 14/06/30 19:30:33 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:34 WARN TaskSetManager: Lost TID 28550 (task 6.12:0) 14/06/30 19:30:34 WARN TaskSetManager: Loss was due to fetch failure from BlockManagerId(2, 192.168.222.164, 57185, 0) 14/06/30 19:30:36 WARN TaskSetManager: Lost TID 28822 (task 6.13:0) 14/06/30 19:30:36 WARN TaskSetManager: Loss was due to fetch failure from
Re: Distribute data from Kafka evenly on cluster
how abou this? https://groups.google.com/forum/#!topic/spark-users/ntPQUZFJt4M Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Sat, Jun 28, 2014 at 10:19 AM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, I have a number of questions using the Kafka receiver of Spark Streaming. Maybe someone has some more experience with that and can help me out. I have set up an environment for getting to know Spark, consisting of - a Mesos cluster with 3 only-slaves and 3 master-and-slaves, - 2 Kafka nodes, - 3 Zookeeper nodes providing service to both Kafka and Mesos. My Kafka cluster has only one topic with one partition (replicated to both nodes). When I start my Kafka receiver, it successfully connects to Kafka and does the processing, but it seems as if the (expensive) function in the final foreachRDD(...) is only executed on one node of my cluster, which is not what I had in mind when setting up the cluster ;-) So first, I was wondering about the parameter `topics: Map[String, Int]` to KafkaUtils.createStream(). Apparently it controls how many connections are made from my cluster nodes to Kafka. The Kafka doc at https://kafka.apache.org/documentation.html#introduction says each message published to a topic is delivered to one consumer instance within each subscribing consumer group and If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers. The Kafka docs *also* say: Note however that there cannot be more consumer instances than partitions. This seems to imply that with only one partition, increasing the number in my Map should have no effect. However, if I increase the number of streams for my one topic in my `topics` Map, I actually *do* see that the task in my foreachRDD(...) call is now executed on multiple nodes. Maybe it's more of a Kafka question than a Spark one, but can anyone explain this to me? Should I always have more Kafka partitions than Mesos cluster nodes? So, assuming that changing the number in that Map is not what I want (although I don't know if it is), I tried to use .repartition(numOfClusterNodes) (which doesn't seem right if I want to add and remove Mesos nodes on demand). This *also* did spread the foreachRDD(...) action evenly – however, the function never seems to terminate, so I never get to process the next interval in the stream. A similar behavior can be observed when running locally, not on the cluster, then the program will not exit but instead hang after everything else has shut down. Any hints concerning this issue? Thanks Tobias
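One commonly suggested pattern for spreading Kafka ingestion (possibly what the link above describes) is to create several receivers and union them; a minimal sketch, assuming a StreamingContext ssc, a zkQuorum string, and enough Kafka partitions to feed the extra receivers (with a single-partition topic the extra receivers would sit idle):

import org.apache.spark.streaming.kafka.KafkaUtils

val numReceivers = 3   // example value
val streams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, "my-group", Map("my-topic" -> 1))
}

// one unified stream, repartitioned so downstream work spreads across the cluster
val unified = ssc.union(streams).repartition(6)   // example partition count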
Re: Map with filter on JavaRdd
It happens in a single operation anyway. You may write them separately, but the stages are performed together when possible. You will see only one task in the output of your application. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jun 27, 2014 at 12:12 PM, ajay garg ajay.g...@mobileum.com wrote: Hi All, Is it possible to map and filter a JavaRDD in a single operation? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Map-with-filter-on-JavaRdd-tp8401.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
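A small sketch of that pipelining, shown with the Scala RDD API (the Java API behaves the same way); lines, parse and isValid are hypothetical:

// written as two calls, but both run inside the same task over each partition;
// no intermediate collection is materialized between map and filter
val cleaned = lines.map(parse).filter(record => record.isValid)
cleaned.count()   // one stage, one set of tasks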
Google Cloud Engine adds out of the box Spark/Shark support
https://groups.google.com/forum/#!topic/gcp-hadoop-announce/EfQms8tK5cE I suspect they are using their own builds. Has anybody had a chance to look at it? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi
Re: Spark job tracker.
You can use the SparkListener interface to track the tasks. Another option is to use the JSON patch (https://github.com/apache/spark/pull/882) and track tasks with the JSON API. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jun 27, 2014 at 2:31 AM, abhiguruvayya sharath.abhis...@gmail.com wrote: I don't want to track it on the cluster UI. Once I launch the job I would like to print the status. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p8370.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Persistent Local Node variables
If you are trying to process the data as part of the same job (i.e. within the same Spark context), then all you have to do is cache the output RDD of your processing. It'll run your processing once and cache the results for future tasks, unless the node caching the RDD goes down. If you are trying to retain it for quite a long time, you can:
- simply store it in HDFS and load it each time;
- store it in a table and pull it with Spark SQL every time (experimental);
- use the Ooyala Jobserver to cache the data and do all processing using that.
Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Mon, Jun 23, 2014 at 11:14 AM, Daedalus tushar.nagara...@gmail.com wrote: Will using mapPartitions and creating a new RDD of ParsedData objects avoid multiple parsing? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Persistent-Local-Node-variables-tp8104p8107.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
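A minimal sketch of the cache-the-parsed-RDD suggestion, with rawLines and parseRecord standing in for whatever input and parsing the application actually uses:

val parsed = rawLines.map(parseRecord).cache()   // parseRecord is hypothetical

val bad = parsed.filter(_.isInvalid).count()     // first action: parses once and caches
val all = parsed.count()                         // later actions reuse the cached blocks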
Re: Kafka Streaming - Error Could not compute split
I have seen this when I prevent spilling of shuffle data on disk. Can you change shuffle memory fraction. Is your data spilling to disk? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Mon, Jun 23, 2014 at 12:09 PM, Kanwaldeep kanwal...@gmail.com wrote: We are using Spark 1.0.0 deployed on Spark Standalone cluster and I'm getting the following exception. With previous version I've seen this error occur along with OutOfMemory errors which I'm not seeing with Sparks 1.0. Any suggestions? Job aborted due to stage failure: Task 3748.0:20 failed 4 times, most recent failure: Exception failure in TID 225792 on host hslave32106.sjc9.service-now.com: java.lang.Exception: Could not compute split, block input-0-1403458929600 not found org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77) org.apache.spark.rdd.RDD.iterator(RDD.scala:227) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) org.apache.spark.scheduler.Task.run(Task.scala:51) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) java.lang.Thread.run(Thread.java:662) Driver stacktrace: -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Streaming-Error-Could-not-compute-split-tp8112.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
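If shuffle spilling is the issue raised above, the relevant knobs in this Spark generation are the spill switch and the shuffle memory fraction; a small sketch, with 0.4 picked only as an example value to tune:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("kafka-streaming-job")
  .set("spark.shuffle.spill", "true")             // allow shuffle data to spill to disk
  .set("spark.shuffle.memoryFraction", "0.4")     // example value; tune for your workload

val sc = new SparkContext(conf)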
Re: Problems running Spark job on mesos in fine-grained mode
Hi Sebastien, Are you using Pyspark by any chance, is that working for you (post the patch?) Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Mon, Jun 23, 2014 at 1:51 PM, Fedechicco fedechi...@gmail.com wrote: I'm getting the same behavior and it's crucial I get it fixed for an evaluation of Spark + Mesos within my company. I'm bumping +1 for the request of putting this fix in the 1.0.1 if possible! thanks, Federico 2014-06-20 20:51 GMT+02:00 Sébastien Rainville sebastienrainvi...@gmail.com: Hi, this is just a follow-up regarding this issue. Turns out that it's caused by a bug in Spark. I created a case for it: https://issues.apache.org/jira/browse/SPARK-2204 and submitted a patch. Any chance this could be included in the 1.0.1 release? Thanks, - Sebastien On Tue, Jun 17, 2014 at 2:57 PM, Sébastien Rainville sebastienrainvi...@gmail.com wrote: Hi, I'm having trouble running spark on mesos in fine-grained mode. I'm running spark 1.0.0 and mesos 0.18.0. The tasks are failing randomly, which most of the time, but not always, cause the job to fail. The same code is running fine in coarse-grained mode. I see the following exceptions in the logs of the spark driver: W0617 10:57:36.774382 8735 sched.cpp:901] Attempting to launch task 21 with an unknown offer 20140416-011500-1369465866-5050-26096-52332715 W0617 10:57:36.774433 8735 sched.cpp:901] Attempting to launch task 22 with an unknown offer 20140416-011500-1369465866-5050-26096-52332715 14/06/17 10:57:36 INFO TaskSetManager: Re-queueing tasks for 201311011608-1369465866-5050-9189-46 from TaskSet 0.0 14/06/17 10:57:36 WARN TaskSetManager: Lost TID 22 (task 0.0:2) 14/06/17 10:57:36 WARN TaskSetManager: Lost TID 19 (task 0.0:0) 14/06/17 10:57:36 WARN TaskSetManager: Lost TID 21 (task 0.0:1) 14/06/17 10:57:36 INFO DAGScheduler: Executor lost: 201311011608-1369465866-5050-9189-46 (epoch 0) 14/06/17 10:57:36 INFO BlockManagerMasterActor: Trying to remove executor 201311011608-1369465866-5050-9189-46 from BlockManagerMaster. 14/06/17 10:57:36 INFO BlockManagerMaster: Removed 201311011608-1369465866-5050-9189-46 successfully in removeExecutor 14/06/17 10:57:36 DEBUG MapOutputTrackerMaster: Increasing epoch to 1 14/06/17 10:57:36 INFO DAGScheduler: Host added was in lost list earlier: ca1-dcc1-0065.lab.mtl I don't see any exceptions in the spark executor logs. 
The only error message I found in mesos itself is warnings in the mesos master: W0617 10:57:36.816748 26100 master.cpp:1615] Failed to validate task 21 : Task 21 attempted to use cpus(*):1 combined with already used cpus(*):1; mem(*):2048 is greater than offered mem(*):3216; disk(*):98304; ports(*):[11900-11919, 1192 1-11995, 11997-11999]; cpus(*):1 W0617 10:57:36.819807 26100 master.cpp:1615] Failed to validate task 22 : Task 22 attempted to use cpus(*):1 combined with already used cpus(*):1; mem(*):2048 is greater than offered mem(*):3216; disk(*):98304; ports(*):[11900-11919, 1192 1-11995, 11997-11999]; cpus(*):1 W0617 10:57:36.932287 26102 master.cpp:1615] Failed to validate task 28 : Task 28 attempted to use cpus(*):1 combined with already used cpus(*):1; mem(*):2048 is greater than offered cpus(*):1; mem(*):3216; disk(*):98304; ports(*):[11900- 11960, 11962-11978, 11980-11999] W0617 11:05:52.783133 26098 master.cpp:2106] Ignoring unknown exited executor 201311011608-1369465866-5050-9189-46 on slave 201311011608-1369465866-5050-9189-46 (ca1-dcc1-0065.lab.mtl) W0617 11:05:52.787739 26103 master.cpp:2106] Ignoring unknown exited executor 201311011608-1369465866-5050-9189-34 on slave 201311011608-1369465866-5050-9189-34 (ca1-dcc1-0053.lab.mtl) W0617 11:05:52.790292 26102 master.cpp:2106] Ignoring unknown exited executor 201311011608-1369465866-5050-9189-59 on slave 201311011608-1369465866-5050-9189-59 (ca1-dcc1-0079.lab.mtl) W0617 11:05:52.800649 26099 master.cpp:2106] Ignoring unknown exited executor 201311011608-1369465866-5050-9189-18 on slave 201311011608-1369465866-5050-9189-18 (ca1-dcc1-0027.lab.mtl) ... (more of those Ignoring unknown exited executor) I analyzed the difference in between the execution of the same job in coarse-grained mode and fine-grained mode, and I noticed that in the fine-grained mode the tasks get executed on executors different than the ones reported in spark, as if spark and mesos get out of sync as to which executor is responsible for which task. See the following: Coarse-grained mode: Spark Mesos Task IndexTask ID ExecutorStatusTask ID (UI)Task Name Task ID (logs)ExecutorState 0066SUCCESS 4Task 40 66RUNNING1 159SUCCESS0 Task 0159 RUNNING22 54SUCCESS10Task 10 254RUNNING 33128 SUCCESS6Task 6 3 128RUNNING ... Fine-grained mode: Spark Mesos Task IndexTask ID ExecutorTask ID (UI)Task NameTask ID (logs) ExecutorState0 23108SUCCESS 23task 0.0:023 27FINISHED0 1965 FAILED19 task 0.0:01986
Re: Serialization problem in Spark
Did you try to register the class with the Kryo serializer? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Mon, Jun 23, 2014 at 7:00 PM, rrussell25 rrussel...@gmail.com wrote: Thanks for the pointer... I tried Kryo and ran into a strange error: org.apache.spark.SparkException: Job aborted due to stage failure: Exception while deserializing and fetching task: com.esotericsoftware.kryo.KryoException: Unable to find class: rg.apache.hadoop.hbase.io.ImmutableBytesWritable It is strange in that the complaint is for rg.apache... (the missing o is not a typo). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-problem-in-Spark-tp7049p8123.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
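A minimal sketch of registering the HBase class with Kryo in this Spark generation, via a KryoRegistrator; the package name mypackage is a placeholder:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[ImmutableBytesWritable])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "mypackage.MyRegistrator")   // placeholder package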
Re: Efficiently doing an analysis with Cartesian product (pyspark)
How about this: map it to a key,value pair, then reduceByKey using a max operation. Then on that RDD you can do a join with your lookup data and reduce (if you only want to look up 2 values then you can use lookup directly as well). PS: these are operations in Scala; I am not aware how far the pyspark API is on those. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Tue, Jun 24, 2014 at 3:33 AM, Aaron aaron.doss...@target.com wrote: Sorry, I got my sample outputs wrong: (1,1) -> 400, (1,2) -> 500, (2,2) -> 600 On Jun 23, 2014, at 4:29 PM, Aaron Dossett [via Apache Spark User List] [hidden email] wrote: I am relatively new to Spark and am getting stuck trying to do the following: - My input is integer key, value pairs where the key is not unique. I'm interested in information about all possible distinct key combinations, thus the Cartesian product. - My first attempt was to create a separate RDD of this cartesian product and then use map() to calculate the data. However, I was trying to pass another RDD to the function map was calling, which I eventually figured out was causing a run time error, even if the function I called with map did nothing. Here's a simple code example: --- def somefunc(x, y, RDD): return 0 input = sc.parallelize([(1,100), (1,200), (2, 100), (2,300)]) #Create all pairs of keys, including self-pairs itemPairs = input.map(lambda x: x[0]).distinct() itemPairs = itemPairs.cartesian(itemPairs) print itemPairs.collect() TC = itemPairs.map(lambda x: (x, somefunc(x[0], x[1], input))) print TC.collect() -- I'm assuming this isn't working because it isn't a very Spark-like way to do things, and I could imagine that passing RDDs into other RDDs' map functions might not make sense. Could someone suggest to me a way to apply transformations and actions to the input that would produce a mapping of key pairs to some information related to the values? For example, I might want (1, 2) to map to the sum of the maximum values found for each key in the input (500 in my sample data above). Extending that example, (1,1) would map to 300 and (2,2) to 400. Please let me know if I should provide more details or a more robust example. Thank you, Aaron -- View this message in context: Re: Efficiently doing an analysis with Cartesian product (pyspark) http://apache-spark-user-list.1001560.n3.nabble.com/Efficiently-doing-an-analysis-with-Cartesian-product-pyspark-tp8144p8145.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
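A small Scala sketch of the reduceByKey-then-combine idea, using the sample data from the thread; the Cartesian product here runs over the already-reduced per-key maxima, so it stays tiny even when the raw input is large:

import org.apache.spark.SparkContext._

val input = sc.parallelize(Seq((1, 100), (1, 200), (2, 100), (2, 300)))

val maxPerKey = input.reduceByKey((a, b) => math.max(a, b))   // (1,200), (2,300)

// every key pair mapped to the sum of the two keys' maxima
val pairSums = maxPerKey.cartesian(maxPerKey)
  .map { case ((k1, v1), (k2, v2)) => ((k1, k2), v1 + v2) }

pairSums.collect().foreach(println)   // ((1,1),400), ((1,2),500), ((2,2),600), ...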
Re: balancing RDDs
This would be really useful. Especially for Shark where shift of partitioning effects all subsequent queries unless task scheduling time beats spark.locality.wait. Can cause overall low performance for all subsequent tasks. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Tue, Jun 24, 2014 at 4:10 AM, Sean McNamara sean.mcnam...@webtrends.com wrote: We have a use case where we’d like something to execute once on each node and I thought it would be good to ask here. Currently we achieve this by setting the parallelism to the number of nodes and use a mod partitioner: val balancedRdd = sc.parallelize( (0 until Settings.parallelism) .map(id = id - Settings.settings) ).partitionBy(new ModPartitioner(Settings.parallelism)) .cache() This works great except in two instances where it can become unbalanced: 1. if a worker is restarted or dies, the partition will move to a different node (one of the nodes will run two tasks). When the worker rejoins, is there a way to have a partition move back over to the newly restarted worker so that it’s balanced again? 2. drivers need to be started in a staggered fashion, otherwise one driver can launch two tasks on one set of workers, and the other driver will do the same with the other set. Are there any scheduler/config semantics so that each driver will take one (and only one) core from *each* node? Thanks Sean
Re: How to Reload Spark Configuration Files
Not really. You are better off using a cluster manager like Mesos or Yarn for this. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Tue, Jun 24, 2014 at 11:35 AM, Sirisha Devineni sirisha_devin...@persistent.co.in wrote: Hi All, I am working with Spark to add new slaves automatically when there is more data to be processed by the cluster. During this process a question arose: after adding/removing a slave node to/from the Spark cluster, do we need to restart the master and the other existing slaves in the cluster? From my observations: 1. If a new slave node's details are added to the configuration file (/root/spark/conf/slaves) on the master node, running the "start-slaves.sh" script will add the new slave to the cluster without affecting existing slaves or the master. 2. If a slave's details are removed from the configuration file, one needs to restart the master using the stop-master.sh and start-master.sh scripts for it to take effect. Is there any reload option available in Spark to load the changed configuration files without stopping the services? Here, stopping the service of the master or existing slaves may lead to an outage of services. You can find the options available to start/stop the services of Spark at http://spark.apache.org/docs/latest/spark-standalone.html Thanks Regards, Sirisha Devineni.
Re: Questions regarding different spark pre-built packages
The HDFS driver keeps changing and breaking compatibility, hence all the build versions. If you don't use HDFS/YARN then you can safely ignore it. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Tue, Jun 24, 2014 at 12:16 PM, Sourav Chandra sourav.chan...@livestream.com wrote: Hi, I am just curious to know what the differences are between the prebuilt packages for Hadoop1, 2, CDH etc. I am using a Spark standalone cluster and we don't use Hadoop at all. Can we use any one of the pre-built packages, or do we have to run the make-distribution.sh script from the code? Thanks, -- Sourav Chandra Senior Software Engineer sourav.chan...@livestream.com o: +91 80 4121 8723 m: +91 988 699 3746 skype: sourav.chandra Livestream Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034 www.livestream.com
Re: partitions, coalesce() and parallelism
To be clear, the number of map tasks is determined by the number of partitions inside the RDD, hence the suggestion by Nicholas. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Jun 25, 2014 at 4:17 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: So do you get 2171 as the output for that command? That command tells you how many partitions your RDD has, so it’s good to first confirm that rdd1 has as many partitions as you think it has. On Tue, Jun 24, 2014 at 4:22 PM, Alex Boisvert alex.boisv...@gmail.com wrote: It's actually a set of 2171 S3 files, with an average size of about 18MB. On Tue, Jun 24, 2014 at 1:13 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: What do you get for rdd1._jrdd.splits().size()? You might think you’re getting 100 partitions, but it may not be happening. On Tue, Jun 24, 2014 at 3:50 PM, Alex Boisvert alex.boisv...@gmail.com wrote: With the following pseudo-code, val rdd1 = sc.sequenceFile(...) // has 100 partitions val rdd2 = rdd1.coalesce(100) val rdd3 = rdd2 map { ... } val rdd4 = rdd3.coalesce(2) val rdd5 = rdd4.saveAsTextFile(...) // want only two output files I would expect the parallelism of the map() operation to be 100 concurrent tasks, and the parallelism of the save() operation to be 2. However, it appears the parallelism of the entire chain is 2 -- I only see two tasks created for the save() operation and those tasks appear to execute the map() operation as well. Assuming what I'm seeing is as-specified (meaning, how things are meant to be), what's the recommended way to force a parallelism of 100 on the map() operation? thanks!
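If the goal is to keep the map() at full parallelism while still producing two output files, one option (a sketch, reusing the thread's variable names) is to pass shuffle = true to the final coalesce so it does not collapse the upstream stage:

val rdd3 = rdd1.map { x => /* ... */ x }
// shuffle = true inserts a shuffle boundary, so the map runs with rdd1's
// partitioning and only the write happens with 2 partitions
rdd3.coalesce(2, shuffle = true).saveAsTextFile("...")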
Re: ElasticSearch enrich
Most likely the ES client is not serializable for you. You can do 3 workarounds: 1. Switch to Kryo serialization and register the client in Kryo; this might solve your serialization issue. 2. Use mapPartitions for all your data and initialize your client inside the mapPartitions code; this will create a client for each partition, reducing some parallelism and adding some overhead of client creation, but it prevents serializing the ES client and transferring it to the workers. 3. Use a serializable wrapper to serialize your ES client manually, send it across, and deserialize it manually; this may or may not work depending on whether your class is safely serializable. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Jun 25, 2014 at 4:12 AM, boci boci.b...@gmail.com wrote: Hi guys, I have a small question. I want to create a Worker class which uses ElasticClient to make queries to elasticsearch. (I want to enrich my data with geo search results). How can I do that? I tried to create a worker instance with ES host/port parameters but spark throws an exception (my class is not serializable). Any idea? Thanks b0c1
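A small sketch of option 2: construct the client inside mapPartitions so it is created on the worker and never shipped from the driver. MyEsClient and lookupGeo stand in for whatever ElasticSearch client and query you actually use:

val enriched = records.mapPartitions { iter =>
  val client = new MyEsClient("es-host", 9300)   // hypothetical client, one per partition
  iter.map { rec =>
    (rec, client.lookupGeo(rec))                 // hypothetical geo enrichment call
  }
}

Closing the client cleanly once the partition's iterator is exhausted is left out here for brevity.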
Re: ElasticSearch enrich
It's not used as the default serializer because of some compatibility issues and the requirement to register the classes. Which part are you getting as non-serializable? You need to serialize that class if you are sending it to Spark workers inside a map, reduce, mapPartitions or any of the other operations on an RDD. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng pc...@uow.edu.au wrote: I'm afraid persisting a connection across two tasks is a dangerous act as they can't be guaranteed to be executed on the same machine. Your ES server may think it's a man-in-the-middle attack! I think it's possible to invoke a static method that gives you a connection from a local 'pool', so nothing will sneak into your closure, but it's too complex and there should be a better option. Never used Kryo before; if it's that good perhaps we should use it as the default serializer. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
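For reference, switching to Kryo is just configuration plus an optional registrator; whether Kryo can actually serialize your particular ES client depends on the client class. A sketch with illustrative class names:

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator
import com.esotericsoftware.kryo.Kryo

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyEsClient])   // hypothetical class to register
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.MyKryoRegistrator")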
Re: Running Spark alongside Hadoop
The ideal way to do that is to use a cluster manager like Yarn or Mesos. You can control how many resources to give to which node, etc. You should be able to run both together in standalone mode, however you may experience varying latency and performance in the cluster as both MR and Spark demand resources from the same machines. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jun 20, 2014 at 3:41 PM, Sameer Tilak ssti...@live.com wrote: Dear Spark users, I have a small 4 node Hadoop cluster. Each node is a VM -- 4 virtual cores, 8GB memory and 500GB disk. I am currently running Hadoop on it. I would like to run Spark (in standalone mode) alongside Hadoop on the same nodes. Given the configuration of my nodes, will that work? Does anyone have any experience in terms of stability and performance of running Spark and Hadoop on somewhat resource-constrained nodes? I was looking at the Spark documentation and there is a way to configure memory and cores for the worker nodes and memory for the master node: SPARK_WORKER_CORES, SPARK_WORKER_MEMORY, SPARK_DAEMON_MEMORY. Any recommendations on how to share resources between Hadoop and Spark?
Re: Spark and RDF
You are looking to create Shark operators for RDF? Since the Shark backend is shifting to SparkSQL that would be slightly hard; a much better effort would be to shift Gremlin to Spark (though a much beefier one :) ) Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jun 20, 2014 at 3:39 PM, andy petrella andy.petre...@gmail.com wrote: For RDF, may GraphX be particularly appropriate? aℕdy ℙetrella about.me/noootsab http://about.me/noootsab On Thu, Jun 19, 2014 at 4:49 PM, Flavio Pompermaier pomperma...@okkam.it wrote: Hi guys, I'm analyzing the possibility to use Spark to analyze RDF files and define reusable Shark operators on them (custom filtering, transforming, aggregating, etc). Is that possible? Any hint? Best, Flavio
Re: Spark and RDF
Or a separate RDD for SPARQL operations a la SchemaRDD; operators for SPARQL can be defined there. Not a bad idea :) Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jun 20, 2014 at 3:56 PM, andy petrella andy.petre...@gmail.com wrote: Maybe some SPARQL features in Shark, then? aℕdy ℙetrella about.me/noootsab http://about.me/noootsab On Fri, Jun 20, 2014 at 9:45 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: You are looking to create Shark operators for RDF? Since the Shark backend is shifting to SparkSQL that would be slightly hard; a much better effort would be to shift Gremlin to Spark (though a much beefier one :) ) Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jun 20, 2014 at 3:39 PM, andy petrella andy.petre...@gmail.com wrote: For RDF, may GraphX be particularly appropriate? aℕdy ℙetrella about.me/noootsab http://about.me/noootsab On Thu, Jun 19, 2014 at 4:49 PM, Flavio Pompermaier pomperma...@okkam.it wrote: Hi guys, I'm analyzing the possibility to use Spark to analyze RDF files and define reusable Shark operators on them (custom filtering, transforming, aggregating, etc). Is that possible? Any hint? Best, Flavio
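A rough sketch of the "RDD of triples" idea: hold RDF as an RDD of (subject, predicate, object) and express operators as ordinary transformations. The N-Triples parsing here is deliberately naive and the path is made up:

case class Triple(s: String, p: String, o: String)

val triples = sc.textFile("hdfs:///data/graph.nt")
  .map(_.split("\\s+", 3))
  .collect { case Array(s, p, o) => Triple(s, p, o.stripSuffix(" .")) }

// e.g. a simple filter "operator" on the predicate
val labels = triples.filter(_.p == "<http://www.w3.org/2000/01/rdf-schema#label>")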
Re: Set the number/memory of workers under mesos
You should be able to configure this on the Spark context in the Spark shell: spark.cores.max and executor memory. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jun 20, 2014 at 4:30 PM, Shuo Xiang shuoxiang...@gmail.com wrote: Hi, just wondering if anybody knows how to set up the number of workers (and the amount of memory) in Mesos while launching spark-shell? I was trying to edit conf/spark-env.sh and it looks like the environment variables are for YARN or standalone. Thanks!
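A sketch of the relevant settings when you build the context yourself (the master URL and values are placeholders; the property names are the standard ones, though how you pass them to spark-shell depends on your Spark version):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("mesos://host:5050")
  .setAppName("shell-like-app")
  .set("spark.cores.max", "16")        // cap on total cores taken from the Mesos cluster
  .set("spark.executor.memory", "4g")  // memory per executor
val sc = new SparkContext(conf)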
Re: list of persisted rdds
val myRdds = sc.getPersistentRDDs; assert(myRdds.size === 1). It'll return a map. It's been around for a while, from 0.8.0 onwards. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jun 13, 2014 at 9:42 AM, mrm ma...@skimlinks.com wrote: Hi Daniel, Thank you for your help! This is the sort of thing I was looking for. However, when I type sc.getPersistentRDDs, I get the error AttributeError: 'SparkContext' object has no attribute 'getPersistentRDDs'. I don't get any error when I type sc.defaultParallelism for example. I would appreciate it if you could help me with this; I have tried different ways and googling it! I suspect it might be a silly error but I can't figure it out. Maria -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/list-of-persisted-rdds-tp7564p7569.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Master not seeing recovered nodes(Got heartbeat from unregistered worker ....)
I have also had trouble in worker joining the working set. I have typically moved to Mesos based setup. Frankly for high availability you are better off using a cluster manager. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jun 13, 2014 at 8:57 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi, I see this has been asked before but has not gotten any satisfactory answer so I'll try again: (here is the original thread I found: http://mail-archives.apache.org/mod_mbox/spark-user/201403.mbox/%3c1394044078706-2312.p...@n3.nabble.com%3E ) I have a set of workers dying and coming back again. The master prints the following warning: Got heartbeat from unregistered worker What is the solution to this -- rolling the master is very undesirable to me as I have a Shark context sitting on top of it (it's meant to be highly available). Insights appreciated -- I don't think an executor going down is very unexpected but it does seem odd that it won't be able to rejoin the working set. I'm running Spark 0.9.1 on CDH
Re: multiple passes in mapPartitions
Sorry if this is a dumb question but why not several calls to map-partitions sequentially. Are you looking to avoid function serialization or is your function damaging partitions? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jun 13, 2014 at 1:30 AM, zhen z...@latrobe.edu.au wrote: I want to take multiple passes through my data in mapPartitions. However, the iterator only allows you to take one pass through the data. If I transformed the iterator into an array using iter.toArray, it is too slow, since it copies all the data into a new scala array. Also it takes twice the memory. Which is also bad in terms of more GC. Is there a faster/better way of taking multiple passes without copying all the data? Thank you, Zhen -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/multiple-passes-in-mapPartitions-tp7555.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
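A sketch of that suggestion: cache the RDD and make each pass its own mapPartitions call, instead of materializing the iterator inside one call. passOne and passTwo are hypothetical per-partition functions of type Iterator[T] => Iterator[U]:

val cached = data.cache()
val firstPass  = cached.mapPartitions(passOne)   // first traversal of each partition
val secondPass = cached.mapPartitions(passTwo)   // second traversal, served from the cache

This avoids iter.toArray at the cost of caching the input (in memory, or with a disk-backed storage level if it is large).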
Re: specifying fields for join()
You can resolve the columns to create keys from them, then join. Is that what you did? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Jun 12, 2014 at 9:24 PM, SK skrishna...@gmail.com wrote: This issue is resolved. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/specifying-fields-for-join-tp7528p7544.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
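A small sketch of that idea (the record types and field names are made up): key each RDD by the join field(s) with keyBy, then join on those keys.

val ordersByUser   = orders.keyBy(o => o.userId)     // RDD[(String, Order)]
val profilesByUser = profiles.keyBy(p => p.userId)   // RDD[(String, Profile)]
val joined = ordersByUser.join(profilesByUser)       // RDD[(String, (Order, Profile))]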
Re: Writing data to HBase using Spark
Are you able to use the Hadoop input/output reader for HBase through the new Hadoop API? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Jun 12, 2014 at 7:49 AM, gaurav.dasgupta gaurav.d...@gmail.com wrote: Is there anyone else who is facing this problem of writing to HBase when running Spark in YARN mode or Standalone mode using this example? If not, then do I need to explicitly specify something in the classpath? Regards, Gaurav On Wed, Jun 11, 2014 at 1:53 PM, Gaurav Dasgupta [hidden email] wrote: Hi Kanwaldeep, I have tried your code but ran into a problem. The code is working fine in *local* mode. But if I run the same code in Spark standalone mode or YARN mode, then it keeps executing but does not save anything to the HBase table. I guess it is stopping data streaming once the *saveToHBase* method is called for the first time. This is strange. I just want to know whether you have tested the code on all Spark execution modes? Thanks, Gaurav On Tue, Jun 10, 2014 at 12:20 PM, Kanwaldeep [via Apache Spark User List] [hidden email] wrote: Please see the sample code attached at https://issues.apache.org/jira/browse/SPARK-944. -- View this message in context: Re: Writing data to HBase using Spark http://apache-spark-user-list.1001560.n3.nabble.com/Writing-data-to-HBase-using-Spark-tp7304p7474.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
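A hedged sketch of writing to HBase through the new Hadoop API, which is roughly the direction the reply points at. The table, column family and RDD contents are placeholders, the exact HBase Put API differs between versions, and older Spark releases may only expose saveAsNewAPIHadoopFile rather than saveAsNewAPIHadoopDataset:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "my_table")
val job = Job.getInstance(hbaseConf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

// rdd is assumed to be an RDD[(String, String)] of (rowKey, value)
rdd.map { case (rowKey, value) =>
  val put = new Put(Bytes.toBytes(rowKey))
  put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(value))
  (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put)
}.saveAsNewAPIHadoopDataset(job.getConfiguration)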
Re: Spark Streaming, download a s3 file to run a script shell on it
So you can run a job / spark job to get data to disk/hdfs. Then run a dstream from a hdfs folder. As you move your files, the dstream will kick in. Regards Mayur On 6 Jun 2014 21:13, Gianluca Privitera gianluca.privite...@studio.unibo.it wrote: Where are the API for QueueStream and RddQueue? In my solution I cannot open a DStream with S3 location because I need to run a script on the file (a script that unluckily doesn't accept stdin as input), so I have to download it on my disk somehow than handle it from there before creating the stream. Thanks Gianluca On 06/06/2014 02:19, Mayur Rustagi wrote: You can look to create a Dstream directly from S3 location using file stream. If you want to use any specific logic you can rely on Queuestream read data yourself from S3, process it push it into RDDQueue. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jun 6, 2014 at 3:00 AM, Gianluca Privitera gianluca.privite...@studio.unibo.it wrote: Hi, I've got a weird question but maybe someone has already dealt with it. My Spark Streaming application needs to - download a file from a S3 bucket, - run a script with the file as input, - create a DStream from this script output. I've already got the second part done with the rdd.pipe() API that really fits my request, but I have no idea how to manage the first part. How can I manage to download a file and run a script on them inside a Spark Streaming Application? Should I use process() from Scala or it won't work? Thanks Gianluca
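A sketch of that two-step layout: a separate job (not shown) downloads from S3, runs the shell script, and moves the script output into an HDFS directory; the streaming application just watches that directory. Paths and batch interval are made up:

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(30))
val processed = ssc.textFileStream("hdfs:///landing/script-output/")  // picks up newly moved files
processed.print()
ssc.start()
ssc.awaitTermination()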
Re: Spark Streaming, download a s3 file to run a script shell on it
QueueStream example is in Spark Streaming examples: http://www.boyunjian.com/javasrc/org.spark-project/spark-examples_2.9.3/0.7.2/_/spark/streaming/examples/QueueStream.scala Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Sat, Jun 7, 2014 at 6:41 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: So you can run a job / spark job to get data to disk/hdfs. Then run a dstream from a hdfs folder. As you move your files, the dstream will kick in. Regards Mayur On 6 Jun 2014 21:13, Gianluca Privitera gianluca.privite...@studio.unibo.it wrote: Where are the API for QueueStream and RddQueue? In my solution I cannot open a DStream with S3 location because I need to run a script on the file (a script that unluckily doesn't accept stdin as input), so I have to download it on my disk somehow than handle it from there before creating the stream. Thanks Gianluca On 06/06/2014 02:19, Mayur Rustagi wrote: You can look to create a Dstream directly from S3 location using file stream. If you want to use any specific logic you can rely on Queuestream read data yourself from S3, process it push it into RDDQueue. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jun 6, 2014 at 3:00 AM, Gianluca Privitera gianluca.privite...@studio.unibo.it wrote: Hi, I've got a weird question but maybe someone has already dealt with it. My Spark Streaming application needs to - download a file from a S3 bucket, - run a script with the file as input, - create a DStream from this script output. I've already got the second part done with the rdd.pipe() API that really fits my request, but I have no idea how to manage the first part. How can I manage to download a file and run a script on them inside a Spark Streaming Application? Should I use process() from Scala or it won't work? Thanks Gianluca
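In the same spirit as the linked example, a minimal queueStream sketch where the driver itself fetches and pre-processes each S3 object (s3Keys and downloadAndRunScript are hypothetical) and pushes the result onto the queue the DStream reads from:

import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(30))
val queue = new mutable.Queue[RDD[String]]()
val stream = ssc.queueStream(queue)
stream.print()
ssc.start()

for (key <- s3Keys) {                               // hypothetical list of S3 objects
  val lines = downloadAndRunScript(key)             // hypothetical: returns the script output as Seq[String]
  queue.synchronized { queue += sc.parallelize(lines) }
}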
Re: Spark Streaming, download a s3 file to run a script shell on it
You can look to create a DStream directly from the S3 location using a file stream. If you want to apply any specific logic you can rely on a queue stream: read the data yourself from S3, process it and push it into the RDD queue. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jun 6, 2014 at 3:00 AM, Gianluca Privitera gianluca.privite...@studio.unibo.it wrote: Hi, I've got a weird question but maybe someone has already dealt with it. My Spark Streaming application needs to - download a file from a S3 bucket, - run a script with the file as input, - create a DStream from this script output. I've already got the second part done with the rdd.pipe() API that really fits my request, but I have no idea how to manage the first part. How can I manage to download a file and run a script on it inside a Spark Streaming application? Should I use process() from Scala or won't it work? Thanks Gianluca
Re: Serialization problem in Spark
Where are you getting the serialization error? It's likely to be a different problem. Which class is not getting serialized? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Jun 5, 2014 at 6:32 PM, Vibhor Banga vibhorba...@gmail.com wrote: Any inputs on this will be helpful. Thanks, -Vibhor On Thu, Jun 5, 2014 at 3:41 PM, Vibhor Banga vibhorba...@gmail.com wrote: Hi, I am trying to do something like the following in Spark: JavaPairRDD<byte[], MyObject> eventRDD = hBaseRDD.map(new PairFunction<Tuple2<ImmutableBytesWritable, Result>, byte[], MyObject>() { @Override public Tuple2<byte[], MyObject> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception { return new Tuple2<byte[], MyObject>(immutableBytesWritableResultTuple2._1.get(), MyClass.get(immutableBytesWritableResultTuple2._2)); } }); eventRDD.foreach(new VoidFunction<Tuple2<byte[], Event>>() { @Override public void call(Tuple2<byte[], Event> eventTuple2) throws Exception { processForEvent(eventTuple2._2); } }); The processForEvent() function flow contains some processing and ultimately writes to an HBase table. But I am getting serialisation issues with Hadoop and HBase inbuilt classes. How do I solve this? Does using Kryo serialisation help in this case? Thanks, -Vibhor -- Vibhor Banga Software Development Engineer Flipkart Internet Pvt. Ltd., Bangalore
Re: Using mongo with PySpark
Yes initialization each turn is hard.. you seem to using python. Another risky thing you can try is to serialize the mongoclient object using any serializer (like kryo wrappers in python) pass it on to mappers.. then in each mapper you'll just have to unserialize it use it directly... This may or may not work for you depending on internals of Mongodb client. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Jun 4, 2014 at 10:27 PM, Samarth Mailinglist mailinglistsama...@gmail.com wrote: Thanks a lot, sorry for the really late reply! (Didn't have my laptop) This is working, but it's dreadfully slow and seems to not run in parallel? On Mon, May 19, 2014 at 2:54 PM, Nick Pentreath nick.pentre...@gmail.com wrote: You need to use mapPartitions (or foreachPartition) to instantiate your client in each partition as it is not serializable by the pickle library. Something like def mapper(iter): db = MongoClient()['spark_test_db'] *collec = db['programs']* *for val in iter:* asc = val.encode('ascii','ignore') json = convertToJSON(asc, indexMap) yield collec.insert(json) def convertToJSON(string, indexMap): values = string.strip().split(,) json = {} for i in range(len(values)): json[indexMap[i]] = values[i] return json *doc_ids = data.mapPartitions(mapper)* On Mon, May 19, 2014 at 8:00 AM, Samarth Mailinglist mailinglistsama...@gmail.com wrote: db = MongoClient()['spark_test_db'] *collec = db['programs']* def mapper(val): asc = val.encode('ascii','ignore') json = convertToJSON(asc, indexMap) collec.insert(json) # *this is not working* def convertToJSON(string, indexMap): values = string.strip().split(,) json = {} for i in range(len(values)): json[indexMap[i]] = values[i] return json *jsons = data.map(mapper)* *The last line does the mapping. I am very new to Spark, can you explain what explicit serialization, etc is in the context of spark? 
The error I am getting:* *Traceback (most recent call last): File stdin, line 1, in module File /usr/local/spark-0.9.1/python/pyspark/rdd.py, line 712, in saveAsTextFile keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path) File /usr/local/spark-0.9.1/python/pyspark/rdd.py, line 1178, in _jrdd pickled_command = CloudPickleSerializer().dumps(command) File /usr/local/spark-0.9.1/python/pyspark/serializers.py, line 275, in dumps def dumps(self, obj): return cloudpickle.dumps(obj, 2) File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 801, in dumps cp.dump(obj) File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 140, in dump return pickle.Pickler.dump(self, obj) File /usr/lib/python2.7/pickle.py, line 224, in dump self.save(obj) File /usr/lib/python2.7/pickle.py, line 286, in savef(self, obj) # Call unbound method with explicit self File /usr/lib/python2.7/pickle.py, line 548, in save_tuple save(element) File /usr/lib/python2.7/pickle.py, line 286, in savef(self, obj) # Call unbound method with explicit self File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 259, in save_function self.save_function_tuple(obj, [themodule]) File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 316, in save_function_tuplesave(closure) File /usr/lib/python2.7/pickle.py, line 286, in save f(self, obj) # Call unbound method with explicit self File /usr/lib/python2.7/pickle.py, line 600, in save_list self._batch_appends(iter(obj)) File /usr/lib/python2.7/pickle.py, line 633, in _batch_appends save(x) File /usr/lib/python2.7/pickle.py, line 286, in savef(self, obj) # Call unbound method with explicit self File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 259, in save_function self.save_function_tuple(obj, [themodule]) File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 316, in save_function_tuplesave(closure) File /usr/lib/python2.7/pickle.py, line 286, in save f(self, obj) # Call unbound method with explicit self File /usr/lib/python2.7/pickle.py, line 600, in save_list self._batch_appends(iter(obj)) File /usr/lib/python2.7/pickle.py, line 636, in _batch_appends save(tmp[0]) File /usr/lib/python2.7/pickle.py, line 286, in savef(self, obj) # Call unbound method with explicit self File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 254, in save_function self.save_function_tuple(obj, modList) File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 314, in save_function_tuplesave(f_globals) File /usr/lib/python2.7/pickle.py, line 286, in save f(self, obj) # Call unbound method with explicit self File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 181, in save_dictpickle.Pickler.save_dict(self, obj) File
Re: stage kill link is awfully close to the stage name
And then an "are you sure?" after that :) On 7 Jun 2014 06:59, Mikhail Strebkov streb...@gmail.com wrote: Nick Chammas wrote: I think it would be better to have the kill link flush right, leaving a large amount of space between it and the stage detail link. I think even better would be to have a pop-up confirmation "Do you really want to kill this stage?" :) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/stage-kill-link-is-awfully-close-to-the-stage-name-tp7153p7154.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Error related to serialisation in spark streaming
I had issues around embedded functions; here's what I have figured out. Every inner class actually contains a field referencing the outer class. The anonymous class has a this$0 field referencing the outer class, and that is why Spark is trying to serialize the outer class. In the Scala API, the closure (which is really just implemented as an anonymous class) has a field called $outer, and Spark uses a closure cleaner that goes into the anonymous class to remove the $outer field if it is not used in the closure itself. In Java, the compiler generates a field called this$0 instead, so the closure cleaner doesn't find it and can't clean it properly. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Jun 4, 2014 at 4:18 PM, nilmish nilmish@gmail.com wrote: The error is resolved. I was using a comparator which was not serialised, because of which it was throwing the error. I have now switched to the Kryo serializer as it is faster than the Java serialiser. I have set the required config conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.kryo.registrator", "MyRegistrator"); and also in the MyRegistrator class I have registered all the classes I am serialising. How can I confirm that my code is actually using the Kryo serialiser and not the Java serialiser now? PS: It seems like my code is still not using the Kryo serialiser. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-related-to-serialisation-in-spark-streaming-tp6801p6904.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
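The usual Scala-side workaround for what is described above is to copy the field you need into a local val, so the closure captures only that value and not the enclosing object. A sketch (the class and the non-serializable client are made up):

class Enricher(val threshold: Int, val client: SomeNonSerializableClient) {
  def run(data: org.apache.spark.rdd.RDD[Int]) = {
    val t = threshold            // local copy: only t is captured by the closure
    data.filter(x => x > t)      // referencing threshold directly would drag this
  }                              // (and the client) into the serialized task
}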
Re: Error related to serialisation in spark streaming
So are you using Java 7 or 8. 7 doesnt clean closures properly. So you need to define a static class as a function then call that in your operations. Else it'll try to send the whole class along with the function. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Tue, Jun 3, 2014 at 7:19 PM, Sean Owen so...@cloudera.com wrote: Sorry if I'm dense but is OptimisingSort your class? it's saying you have included something from it in a function that is shipped off to remote workers but something in it is not java.io.Serializable. OptimisingSort$6$1 needs to be Serializable. On Tue, Jun 3, 2014 at 2:23 PM, nilmish nilmish@gmail.com wrote: I am using the following code segment : countPerWindow.foreachRDD(new FunctionJavaPairRDDlt;String, Long, Void() { @Override public Void call(JavaPairRDDString, Long rdd) throws Exception { ComparatorTuple2lt;String,Long comp = new ComparatorTuple2lt;String,Long () { public int compare(Tuple2String,Long tupleA, Tuple2String,Long tupleB) { return 1-tupleA._2.compareTo(tupleB._2); } }; Listscala.Tuple2lt;String,Long top = rdd.top(5,comp); // creating error System.out.println(Top 5 are : ); for(int i=0;itop.size();++i) { System.out.println(top.get(i)._2 + + top.get(i)._1); } return null; } }); } I am getting the following error related to serialisation : org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException Detailed Error : INFO org.apache.spark.scheduler.DAGScheduler - Failed to run top at OptimisingSort.java:173 2014-06-03 13:10:57,180 [spark-akka.actor.default-dispatcher-14] ERROR org.apache.spark.streaming.scheduler.JobScheduler - Error running job streaming job 1401801057000 ms.2 org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: OptimisingSort$6$1 at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737) How can I remove this error ? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-related-to-serialisation-in-spark-streaming-tp6801.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Reg: Add/Remove slave nodes spark-ec2
You'll have to restart the cluster: create a copy of your existing slave, add it to the slaves file on the master, and restart the cluster. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Tue, Jun 3, 2014 at 4:30 PM, Sirisha Devineni sirisha_devin...@persistent.co.in wrote: Hi All, I have created a Spark cluster on EC2 using the spark-ec2 script. Whenever there is more data to be processed I would like to add new slaves to the existing cluster, and I would like to remove slave nodes when the data to be processed is low. It seems currently spark-ec2 doesn't have an option to add/remove slaves to an existing cluster. Could you please suggest how we can achieve this? *One liner problem statement*: How to add/remove slaves to/from a Spark cluster on EC2 using spark-ec2? Suggestions on when to add/remove slaves are much appreciated. Thanks Regards, Sirisha Devineni
Re: WebUI's Application count doesn't get updated
Did you use docker or plain lxc specifically? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Tue, Jun 3, 2014 at 1:40 PM, MrAsanjar . afsan...@gmail.com wrote: thanks guys, that fixed my problem. As you might have noticed, I am VERY new to spark. Building a spark cluster using LXC has been a challenge. On Tue, Jun 3, 2014 at 2:49 AM, Akhil Das ak...@sigmoidanalytics.com wrote: As Andrew said, your application is running on Standalone mode. You need to pass MASTER=spark://sanjar-local-machine-1:7077 before running your sparkPi example. Thanks Best Regards On Tue, Jun 3, 2014 at 1:12 PM, MrAsanjar . afsan...@gmail.com wrote: Thanks for your reply Andrew. I am running applications directly on the master node. My cluster also contain three worker nodes, all are visible on WebUI. Spark Master at spark://sanjar-local-machine-1:7077 - *URL:* spark://sanjar-local-machine-1:7077 - *Workers:* 3 - *Cores:* 24 Total, 0 Used - *Memory:* 43.7 GB Total, 0.0 B Used - *Applications:* 0 Running, 0 Completed - *Drivers:* 0 Running, 0 Completed - *Status:* ALIVE Workers Id AddressState CoresMemory worker-20140603013834-sanjar-local-machine-2-43334 http://sanjar-local-machine-2:8081/ sanjar-local-machine-2:43334 ALIVE 8 (0 Used)14.6 GB (0.0 B Used) worker-20140603015921-sanjar-local-machine-3-51926 http://sanjar-local-machine-3:8081/ sanjar-local-machine-3:51926 ALIVE8 (0 Used) 14.6 GB (0.0 B Used) worker-20140603020250-sanjar-local-machine-4-43167 http://sanjar-local-machine-4:8081/ sanjar-local-machine-4:43167 ALIVE8 (0 Used) 14.6 GB (0.0 B Used) Running Applications ID NameCores Memory per NodeSubmitted Time User State Duration Completed Applications ID NameCores Memory per NodeSubmitted Time User State Duration On Tue, Jun 3, 2014 at 2:33 AM, Andrew Ash and...@andrewash.com wrote: Your applications are probably not connecting to your existing cluster and instead running in local mode. Are you passing the master URL to the SparkPi application? Andrew On Tue, Jun 3, 2014 at 12:30 AM, MrAsanjar . afsan...@gmail.com wrote: - HI all, - Application running and completed count does not get updated, it is always zero. I have ran - SparkPi application at least 10 times. please help - - *Workers:* 3 - *Cores:* 24 Total, 0 Used - *Memory:* 43.7 GB Total, 0.0 B Used - *Applications:* 0 Running, 0 Completed - *Drivers:* 0 Running, 0 Completed - *Status:* ALIVE
Re: Using Spark on Data size larger than Memory size
Clearly there will be an impact on performance, but frankly it depends on what you are trying to achieve with the dataset. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Sat, May 31, 2014 at 11:45 AM, Vibhor Banga vibhorba...@gmail.com wrote: Some inputs will be really helpful. Thanks, -Vibhor On Fri, May 30, 2014 at 7:51 PM, Vibhor Banga vibhorba...@gmail.com wrote: Hi all, I am planning to use Spark with HBase, where I generate an RDD by reading data from an HBase table. I want to know: in the case when the size of the HBase table grows larger than the size of the RAM available in the cluster, will the application fail, or will there be an impact on performance? Any thoughts in this direction will be helpful and are welcome. Thanks, -Vibhor -- Vibhor Banga Software Development Engineer Flipkart Internet Pvt. Ltd., Bangalore
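For what it's worth, if the RDD is only read through once and never cached, data larger than cluster RAM is generally processed partition by partition rather than loaded all at once. If you do want to cache a table that may not fit in memory, one option is a disk-backed storage level, sketched here with a hypothetical variable name:

import org.apache.spark.storage.StorageLevel

val table = hbaseRdd.persist(StorageLevel.MEMORY_AND_DISK)  // partitions that don't fit in RAM spill to local disk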
Re: Failed to remove RDD error
You can increase your akka timeout, should give you some more life.. are you running out of memory by any chance? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Sat, May 31, 2014 at 6:52 AM, Michael Chang m...@tellapart.com wrote: I'm running a some kafka streaming spark contexts (on 0.9.1), and they seem to be dying after 10 or so minutes with a lot of these errors. I can't really tell what's going on here, except that maybe the driver is unresponsive somehow? Has anyone seen this before? 14/05/31 01:13:30 ERROR BlockManagerMaster: Failed to remove RDD 12635 akka.pattern.AskTimeoutException: Timed out at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334) at akka.actor.Scheduler$$anon$11.run(Scheduler.scala:118) at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:691) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:688) at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:455) at akka.actor.LightArrayRevolverScheduler$$anon$12.executeBucket$1(Scheduler.scala:407) at akka.actor.LightArrayRevolverScheduler$$anon$12.nextTick(Scheduler.scala:411) at akka.actor.LightArrayRevolverScheduler$$anon$12.run(Scheduler.scala:363) at java.lang.Thread.run(Thread.java:744) Thanks, Mike
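A sketch of raising those timeouts on a 0.9.x-era build (the property names are the Akka-related ones of that time; the values are illustrative and in seconds):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.akka.askTimeout", "120")   // timeout for ask-style messages such as block/RDD removal
  .set("spark.akka.timeout", "200")      // general Akka communication timeout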
Re: Reading bz2 files that do not end with .bz2
You can use the Hadoop API-provided input/output reader with a Hadoop configuration to read the data. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, May 28, 2014 at 7:22 PM, Laurent T laurent.thou...@ldmobile.net wrote: Hi, I have a bunch of files that are bz2 compressed but do not have the .bz2 extension. Is there any way to force Spark to read them as bz2 files using sc.textFile? FYI, if I add the .bz2 extension to the files it works fine, but the process that creates those files can't do that and I'd like to find another way to make this work than renaming all the files before executing my Spark job. Thanks Regards Laurent -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Reading-bz2-files-that-do-not-end-with-bz2-tp6473.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
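One hedged workaround sketch: since the usual text input path picks a compression codec from the file extension, decompress the files explicitly with Hadoop's BZip2Codec inside the job. This assumes each file is small enough to be read within a single task, and the paths are made up:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.BZip2Codec
import org.apache.hadoop.util.ReflectionUtils
import scala.io.Source

val paths = sc.parallelize(Seq("hdfs:///data/part-00000", "hdfs:///data/part-00001"))

val lines = paths.flatMap { p =>
  val conf = new Configuration()
  val path = new Path(p)
  val fs = FileSystem.get(path.toUri, conf)
  val codec = ReflectionUtils.newInstance(classOf[BZip2Codec], conf)  // codec wired up with the conf
  val in = codec.createInputStream(fs.open(path))
  try Source.fromInputStream(in).getLines().toList
  finally in.close()
}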