Re: Need Help in Spark Hive Data Processing
It depends on how you fetch the single row. Does your query complex ? On Thu, Jan 7, 2016 at 12:47 PM, Balaraju.Kagidala Kagidala < balaraju.kagid...@gmail.com> wrote: > Hi , > > I am new user to spark. I am trying to use Spark to process huge Hive > data using Spark DataFrames. > > > I have 5 node Spark cluster each with 30 GB memory. i am want to process > hive table with 450GB data using DataFrames. To fetch single row from Hive > table its taking 36 mins. Pls suggest me what wrong here and any help is > appreciated. > > > Thanks > Bala > > > -- Best Regards Jeff Zhang
Can spark.scheduler.pool be applied globally ?
It seems currently spark.scheduler.pool must be set as localProperties (associate with thread). Any reason why spark.scheduler.pool can not be used globally. My scenario is that I want my thriftserver started with fair scheduler as the default pool without using set command to set the pool. Is there anyway to do that ? Or do I miss anything here ? -- Best Regards Jeff Zhang
Re: Can spark.scheduler.pool be applied globally ?
Thanks Mark, custom configuration file would be better for me. Changing code will make it affect all the applications, this is too risky for me. On Wed, Jan 6, 2016 at 10:50 AM, Mark Hamstra <m...@clearstorydata.com> wrote: > The other way to do it is to build a custom version of Spark where you > have changed the value of DEFAULT_SCHEDULING_MODE -- and if you were > paying close attention, I accidentally let it slip that that is what I've > done. I previously wrote "schedulingMode = DEFAULT_SCHEDULING_MODE -- > i.e. SchedulingMode.FAIR", but that should actually be SchedulingMode.FIFO > if you haven't changed the code: > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala#L65 > > On Tue, Jan 5, 2016 at 5:29 PM, Jeff Zhang <zjf...@gmail.com> wrote: > >> Right, I can override the root pool in configuration file, Thanks Mark. >> >> On Wed, Jan 6, 2016 at 8:45 AM, Mark Hamstra <m...@clearstorydata.com> >> wrote: >> >>> Just configure with >>> FAIR in fairscheduler.xml (or >>> in spark.scheduler.allocation.file if you have over-riden the default name >>> for the config file.) `buildDefaultPool()` will only build the pool named >>> "default" with the default properties (such as schedulingMode = >>> DEFAULT_SCHEDULING_MODE -- i.e. SchedulingMode.FAIR) if that pool name is >>> not already built ( >>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala#L90 >>> ). >>> >>> >>> On Tue, Jan 5, 2016 at 4:15 PM, Jeff Zhang <zjf...@gmail.com> wrote: >>> >>>> Sorry, I don't make it clearly. What I want is the default pool is fair >>>> scheduling. But seems if I want to use fair scheduling now, I have to set >>>> spark.scheduler.pool explicitly. >>>> >>>> On Wed, Jan 6, 2016 at 2:03 AM, Mark Hamstra <m...@clearstorydata.com> >>>> wrote: >>>> >>>>> I don't understand. If you're using fair scheduling and don't set a >>>>> pool, the default pool will be used. >>>>> >>>>> On Tue, Jan 5, 2016 at 1:57 AM, Jeff Zhang <zjf...@gmail.com> wrote: >>>>> >>>>>> >>>>>> It seems currently spark.scheduler.pool must be set as >>>>>> localProperties (associate with thread). Any reason why >>>>>> spark.scheduler.pool can not be used globally. My scenario is that I >>>>>> want >>>>>> my thriftserver started with fair scheduler as the default pool without >>>>>> using set command to set the pool. Is there anyway to do that ? Or do I >>>>>> miss anything here ? >>>>>> >>>>>> -- >>>>>> Best Regards >>>>>> >>>>>> Jeff Zhang >>>>>> >>>>> >>>>> >>>> >>>> >>>> -- >>>> Best Regards >>>> >>>> Jeff Zhang >>>> >>> >>> >> >> >> -- >> Best Regards >> >> Jeff Zhang >> > > -- Best Regards Jeff Zhang
Re: Can spark.scheduler.pool be applied globally ?
Right, I can override the root pool in configuration file, Thanks Mark. On Wed, Jan 6, 2016 at 8:45 AM, Mark Hamstra <m...@clearstorydata.com> wrote: > Just configure with > FAIR in fairscheduler.xml (or > in spark.scheduler.allocation.file if you have over-riden the default name > for the config file.) `buildDefaultPool()` will only build the pool named > "default" with the default properties (such as schedulingMode = > DEFAULT_SCHEDULING_MODE -- i.e. SchedulingMode.FAIR) if that pool name is > not already built ( > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala#L90 > ). > > > On Tue, Jan 5, 2016 at 4:15 PM, Jeff Zhang <zjf...@gmail.com> wrote: > >> Sorry, I don't make it clearly. What I want is the default pool is fair >> scheduling. But seems if I want to use fair scheduling now, I have to set >> spark.scheduler.pool explicitly. >> >> On Wed, Jan 6, 2016 at 2:03 AM, Mark Hamstra <m...@clearstorydata.com> >> wrote: >> >>> I don't understand. If you're using fair scheduling and don't set a >>> pool, the default pool will be used. >>> >>> On Tue, Jan 5, 2016 at 1:57 AM, Jeff Zhang <zjf...@gmail.com> wrote: >>> >>>> >>>> It seems currently spark.scheduler.pool must be set as localProperties >>>> (associate with thread). Any reason why spark.scheduler.pool can not be >>>> used globally. My scenario is that I want my thriftserver started with >>>> fair scheduler as the default pool without using set command to set the >>>> pool. Is there anyway to do that ? Or do I miss anything here ? >>>> >>>> -- >>>> Best Regards >>>> >>>> Jeff Zhang >>>> >>> >>> >> >> >> -- >> Best Regards >> >> Jeff Zhang >> > > -- Best Regards Jeff Zhang
Re: [discuss] dropping Python 2.6 support
>>>>>> On Tue, Jan 5, 2016 at 5:19 PM, Nicholas Chammas < >>>>>>>>>> nicholas.cham...@gmail.com> wrote: >>>>>>>>>> >>>>>>>>>>> As I pointed out in my earlier email, RHEL will support Python >>>>>>>>>>> 2.6 until 2020. So I'm assuming these large companies will have the >>>>>>>>>>> option >>>>>>>>>>> of riding out Python 2.6 until then. >>>>>>>>>>> >>>>>>>>>>> Are we seriously saying that Spark should likewise support >>>>>>>>>>> Python 2.6 for the next several years? Even though the core Python >>>>>>>>>>> devs >>>>>>>>>>> stopped supporting it in 2013? >>>>>>>>>>> >>>>>>>>>>> If that's not what we're suggesting, then when, roughly, can we >>>>>>>>>>> drop support? What are the criteria? >>>>>>>>>>> >>>>>>>>>>> I understand the practical concern here. If companies are stuck >>>>>>>>>>> using 2.6, it doesn't matter to them that it is deprecated. But >>>>>>>>>>> balancing >>>>>>>>>>> that concern against the maintenance burden on this project, I >>>>>>>>>>> would say >>>>>>>>>>> that "upgrade to Python 2.7 or stay on Spark 1.6.x" is a reasonable >>>>>>>>>>> position to take. There are many tiny annoyances one has to put up >>>>>>>>>>> with to >>>>>>>>>>> support 2.6. >>>>>>>>>>> >>>>>>>>>>> I suppose if our main PySpark contributors are fine putting up >>>>>>>>>>> with those annoyances, then maybe we don't need to drop support >>>>>>>>>>> just yet... >>>>>>>>>>> >>>>>>>>>>> Nick >>>>>>>>>>> 2016년 1월 5일 (화) 오후 2:27, Julio Antonio Soto de Vicente < >>>>>>>>>>> ju...@esbet.es>님이 작성: >>>>>>>>>>> >>>>>>>>>>>> Unfortunately, Koert is right. >>>>>>>>>>>> >>>>>>>>>>>> I've been in a couple of projects using Spark (banking >>>>>>>>>>>> industry) where CentOS + Python 2.6 is the toolbox available. >>>>>>>>>>>> >>>>>>>>>>>> That said, I believe it should not be a concern for Spark. >>>>>>>>>>>> Python 2.6 is old and busted, which is totally opposite to the >>>>>>>>>>>> Spark >>>>>>>>>>>> philosophy IMO. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> El 5 ene 2016, a las 20:07, Koert Kuipers <ko...@tresata.com> >>>>>>>>>>>> escribió: >>>>>>>>>>>> >>>>>>>>>>>> rhel/centos 6 ships with python 2.6, doesnt it? >>>>>>>>>>>> >>>>>>>>>>>> if so, i still know plenty of large companies where python 2.6 >>>>>>>>>>>> is the only option. asking them for python 2.7 is not going to work >>>>>>>>>>>> >>>>>>>>>>>> so i think its a bad idea >>>>>>>>>>>> >>>>>>>>>>>> On Tue, Jan 5, 2016 at 1:52 PM, Juliet Hougland < >>>>>>>>>>>> juliet.hougl...@gmail.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> I don't see a reason Spark 2.0 would need to support Python >>>>>>>>>>>>> 2.6. At this point, Python 3 should be the default that is >>>>>>>>>>>>> encouraged. >>>>>>>>>>>>> Most organizations acknowledge the 2.7 is common, but lagging >>>>>>>>>>>>> behind the version they should theoretically use. Dropping python >>>>>>>>>>>>> 2.6 >>>>>>>>>>>>> support sounds very reasonable to me. >>>>>>>>>>>>> >>>>>>>>>>>>> On Tue, Jan 5, 2016 at 5:45 AM, Nicholas Chammas < >>>>>>>>>>>>> nicholas.cham...@gmail.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> +1 >>>>>>>>>>>>>> >>>>>>>>>>>>>> Red Hat supports Python 2.6 on REHL 5 until 2020 >>>>>>>>>>>>>> <https://alexgaynor.net/2015/mar/30/red-hat-open-source-community/>, >>>>>>>>>>>>>> but otherwise yes, Python 2.6 is ancient history and the core >>>>>>>>>>>>>> Python >>>>>>>>>>>>>> developers stopped supporting it in 2013. REHL 5 is not a good >>>>>>>>>>>>>> enough >>>>>>>>>>>>>> reason to continue support for Python 2.6 IMO. >>>>>>>>>>>>>> >>>>>>>>>>>>>> We should aim to support Python 2.7 and Python 3.3+ (which I >>>>>>>>>>>>>> believe we currently do). >>>>>>>>>>>>>> >>>>>>>>>>>>>> Nick >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Tue, Jan 5, 2016 at 8:01 AM Allen Zhang < >>>>>>>>>>>>>> allenzhang...@126.com> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> plus 1, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> we are currently using python 2.7.2 in production >>>>>>>>>>>>>>> environment. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 在 2016-01-05 18:11:45,"Meethu Mathew" < >>>>>>>>>>>>>>> meethu.mat...@flytxt.com> 写道: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> +1 >>>>>>>>>>>>>>> We use Python 2.7 >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Meethu Mathew >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Tue, Jan 5, 2016 at 12:47 PM, Reynold Xin < >>>>>>>>>>>>>>> r...@databricks.com> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Does anybody here care about us dropping support for Python >>>>>>>>>>>>>>>> 2.6 in Spark 2.0? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Python 2.6 is ancient, and is pretty slow in many aspects >>>>>>>>>>>>>>>> (e.g. json parsing) when compared with Python 2.7. Some >>>>>>>>>>>>>>>> libraries that >>>>>>>>>>>>>>>> Spark depend on stopped supporting 2.6. We can still convince >>>>>>>>>>>>>>>> the library >>>>>>>>>>>>>>>> maintainers to support 2.6, but it will be extra work. I'm >>>>>>>>>>>>>>>> curious if >>>>>>>>>>>>>>>> anybody still uses Python 2.6 to run Spark. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thanks. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>> >>>> >>> >>> >> > -- Best Regards Jeff Zhang
Re: Can spark.scheduler.pool be applied globally ?
Sorry, I don't make it clearly. What I want is the default pool is fair scheduling. But seems if I want to use fair scheduling now, I have to set spark.scheduler.pool explicitly. On Wed, Jan 6, 2016 at 2:03 AM, Mark Hamstra <m...@clearstorydata.com> wrote: > I don't understand. If you're using fair scheduling and don't set a pool, > the default pool will be used. > > On Tue, Jan 5, 2016 at 1:57 AM, Jeff Zhang <zjf...@gmail.com> wrote: > >> >> It seems currently spark.scheduler.pool must be set as localProperties >> (associate with thread). Any reason why spark.scheduler.pool can not be >> used globally. My scenario is that I want my thriftserver started with >> fair scheduler as the default pool without using set command to set the >> pool. Is there anyway to do that ? Or do I miss anything here ? >> >> -- >> Best Regards >> >> Jeff Zhang >> > > -- Best Regards Jeff Zhang
Re: sql:Exception in thread "main" scala.MatchError: StringType
Spark only support one json object per line. You need to reformat your file. On Mon, Jan 4, 2016 at 11:26 AM, Bonsen <hengbohe...@126.com> wrote: > (sbt) scala: > import org.apache.spark.SparkContext > import org.apache.spark.SparkConf > import org.apache.spark.sql > object SimpleApp { > def main(args: Array[String]) { > val conf = new SparkConf() > conf.setAppName("mytest").setMaster("spark://Master:7077") > val sc = new SparkContext(conf) > val sqlContext = new sql.SQLContext(sc) > val > > d=sqlContext.read.json("/home/hadoop/2015data_test/Data/Data/100808cb11e9898816ef15fcdde4e1d74cbc0b/Db6Jh2XeQ.json") > sc.stop() > } > } > > __ > after sbt package : > ./spark-submit --class "SimpleApp" > > /home/hadoop/Downloads/sbt/bin/target/scala-2.10/simple-project_2.10-1.0.jar > > ___ > json fIle: > { > "programmers": [ > { > "firstName": "Brett", > "lastName": "McLaughlin", > "email": "" > }, > { > "firstName": "Jason", > "lastName": "Hunter", > "email": "" > }, > { > "firstName": "Elliotte", > "lastName": "Harold", > "email": "" > } > ], > "authors": [ > { > "firstName": "Isaac", > "lastName": "Asimov", > "genre": "sciencefiction" > }, > { > "firstName": "Tad", > "lastName": "Williams", > "genre": "fantasy" > }, > { > "firstName": "Frank", > "lastName": "Peretti", > "genre": "christianfiction" > } > ], > "musicians": [ > { > "firstName": "Eric", > "lastName": "Clapton", > "instrument": "guitar" > }, > { > "firstName": "Sergei", > "lastName": "Rachmaninoff", > "instrument": "piano" > } > ] > } > > ___ > Exception in thread "main" scala.MatchError: StringType (of class > org.apache.spark.sql.types.StringType$) > at > org.apache.spark.sql.json.InferSchema$.apply(InferSchema.scala:58) > at > > org.apache.spark.sql.json.JSONRelation$$anonfun$schema$1.apply(JSONRelation.scala:139) > > ___ > why > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/sql-Exception-in-thread-main-scala-MatchError-StringType-tp25868.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- Best Regards Jeff Zhang
Re: Cannot get repartitioning to work
You are using the wrong RDD, use the returned RDD as following. val repartitionedRDD = results.repartition(20) println(repartitionedRDD.partitions.size) On Sat, Jan 2, 2016 at 10:38 AM, jimitkr <ji...@softpath.net> wrote: > Hi, > > I'm trying to test some custom parallelism and repartitioning in spark. > > First, i reduce my RDD (forcing creation of 10 partitions for the same). > > I then repartition the data to 20 partitions and print out the number of > partitions, but i always get 10. Looks like the repartition command is > getting ignored. > > How do i get repartitioning to work? See code below: > > val > results=input.reduceByKey((x,y)=>x+y,10).persist(StorageLevel.DISK_ONLY) > results.repartition(20) > println(results.partitions.size) > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-get-repartitioning-to-work-tp25852.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- Best Regards Jeff Zhang
Re: Is there anyway to log properties from a Spark application
set spark.logConf as true in spark-default.conf will log the property in driver side. But it would only log the property you set, not including the properties with default value. On Mon, Dec 28, 2015 at 8:18 PM, alvarobrandon <alvarobran...@gmail.com> wrote: > Hello: > > I was wondering if its possible to log properties from Spark Applications > like spark.yarn.am.memory, spark.driver.cores, > spark.reducer.maxSizeInFlight > without having to access the SparkConf object programmatically. I'm trying > to find some kind of log file that has traces of the execution of Spark > apps > and its parameters. > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-anyway-to-log-properties-from-a-Spark-application-tp25820.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- Best Regards Jeff Zhang
Re: Is there anyway to log properties from a Spark application
If you run it as yarn-client mode, it will be client side log. If it is yarn-cluster mode, it will be logged in the AM container (the first container) On Mon, Dec 28, 2015 at 8:30 PM, Alvaro Brandon <alvarobran...@gmail.com> wrote: > Thanks for the swift response. > > I'm launching my applications through YARN. Where will these properties be > logged?. I guess they wont be part of YARN logs > > 2015-12-28 13:22 GMT+01:00 Jeff Zhang <zjf...@gmail.com>: > >> set spark.logConf as true in spark-default.conf will log the property in >> driver side. But it would only log the property you set, not including the >> properties with default value. >> >> >> On Mon, Dec 28, 2015 at 8:18 PM, alvarobrandon <alvarobran...@gmail.com> >> wrote: >> >>> Hello: >>> >>> I was wondering if its possible to log properties from Spark Applications >>> like spark.yarn.am.memory, spark.driver.cores, >>> spark.reducer.maxSizeInFlight >>> without having to access the SparkConf object programmatically. I'm >>> trying >>> to find some kind of log file that has traces of the execution of Spark >>> apps >>> and its parameters. >>> >>> >>> >>> -- >>> View this message in context: >>> http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-anyway-to-log-properties-from-a-Spark-application-tp25820.html >>> Sent from the Apache Spark User List mailing list archive at Nabble.com. >>> >>> ----- >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>> For additional commands, e-mail: user-h...@spark.apache.org >>> >>> >> >> >> -- >> Best Regards >> >> Jeff Zhang >> > > -- Best Regards Jeff Zhang
Re: Passing parameters to spark SQL
You can do it using scala string interpolation http://docs.scala-lang.org/overviews/core/string-interpolation.html On Mon, Dec 28, 2015 at 5:11 AM, Ajaxx <ajack...@pobox.com> wrote: > Given a SQLContext (or HiveContext) is it possible to pass in parameters > to a > query. There are several reasons why this makes sense, including loss of > data type during conversion to string, SQL injection, etc. > > But currently, it appears that SQLContext.sql() only takes a single > parameter which is a string. > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Passing-parameters-to-spark-SQL-tp25806.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- Best Regards Jeff Zhang
Re: Opening Dynamic Scaling Executors on Yarn
See http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation On Mon, Dec 28, 2015 at 2:00 PM, 顾亮亮 <guliangli...@qiyi.com> wrote: > Hi all, > > > > SPARK-3174 (https://issues.apache.org/jira/browse/SPARK-3174) is a useful > feature to save resources on yarn. > > We want to open this feature on our yarn cluster. > > I have a question about the version of shuffle service. > > > > I’m now using spark-1.5.1 (shuffle service). > > If I want to upgrade to spark-1.6.0, should I replace the shuffle service > jar and restart all the namenode on yarn ? > > > > Thanks a lot. > > > > Mars > > > -- Best Regards Jeff Zhang
Re: Can anyone explain Spark behavior for below? Kudos in Advance
Not sure what you try to do, but the result is correct. Scenario 2: Partition 1 ("12", "23") ("","12") => "0" ("0","23") => "1" Partition 2 ("","345") ("","") => "0" ("0","345") => "1" Final merge: ("1","1") => "11" On Mon, Dec 28, 2015 at 7:14 AM, Prem Spark <sparksure...@gmail.com> wrote: > Scenario1: > val z = sc.parallelize(List("12","23","345",""),2) > z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x > + y) > res143: String = 10 > > Scenario2: > val z = sc.parallelize(List("12","23","","345"),2) > z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x > + y) > res144: String = 11 > > why the result is different . I was expecting 10 for both. also for the > first Partition > -- Best Regards Jeff Zhang
Re: should I file a bug? Re: trouble implementing Transformer and calling DataFrame.withColumn()
ing df.withColumn()"); > > transformerdDF.printSchema(); > > logger.info("show() after calling df.withColumn()"); > > transformerdDF.show(); > > > logger.info("END"); > > } > > > DataFrame createData() { > > Features f1 = new Features(1, category1); > > Features f2 = new Features(2, category2); > > ArrayList data = new ArrayList(2); > > data.add(f1); > > data.add(f2); > > //JavaRDD rdd = > javaSparkContext.parallelize(Arrays.asList(f1, f2)); // does not work > > JavaRDD rdd = javaSparkContext.parallelize(data); > > DataFrame df = sqlContext.createDataFrame(rdd, Features.class); > > return df; > > } > > > class MyUDF implements UDF1<String, String> { > > @Override > > public String call(String s) throws Exception { > > logger.info("AEDWIP s:{}", s); > > String ret = s.equalsIgnoreCase(category1) ? category1 : > category3; > > return ret; > > } > > } > > > public class Features implements Serializable{ > > private static final long serialVersionUID = 1L; > > int id; > > String labelStr; > > > Features(int id, String l) { > > this.id = id; > > this.labelStr = l; > > } > > > public int getId() { > > return id; > > } > > > public void setId(int id) { > > this.id = id; > > } > > > public String getLabelStr() { > > return labelStr; > > } > > > public void setLabelStr(String labelStr) { > > this.labelStr = labelStr; > > } > > } > > > > From: Andrew Davidson <a...@santacruzintegration.com> > Date: Monday, December 21, 2015 at 7:47 PM > To: Jeff Zhang <zjf...@gmail.com> > Cc: "user @spark" <user@spark.apache.org> > Subject: Re: trouble implementing Transformer and calling > DataFrame.withColumn() > > Hi Jeff > > I took a look at Tokenizer.cal, UnaryTransformer.scala, and > Transformer.scala. How ever I can not figure out how implement > createTransformFunc() > in Java 8. > > It would be nice to be able to use this transformer in my pipe line but > not required. The real problem is I can not figure out how to create a > Column I can pass to dataFrame.withColumn() in my Java code. Here is my > original python > > binomial = udf(lambda labelStr: labelStr if (labelStr == "noise") else > “signal", StringType()) > ret = dataFrame.withColumn(newColName, binomial(dataFrame["label"])) > > > Any suggestions would be greatly appreciated. > > Andy > > public class LabelToBinaryTransformer > > extends UnaryTransformer<String, String, > LabelToBinaryTransformer> { > > private static final long serialVersionUID = 4202800448830968904L; > > private final UUID uid = UUID.randomUUID(); > > > @Override > > public String uid() { > > return uid.toString(); > > } > > > @Override > > public Function1<String, String> createTransformFunc() { > > // original python code > > // binomial = udf(lambda labelStr: labelStr if (labelStr == "noise") else > “signal", StringType()) > > Function1 interface is not easy to implement lots of functions > > ??? > > } > > > @Override > > public DataType outputDataType() { > > StringType ret = new StringType(); > > return ret; > > } > > > > } > > > From: Jeff Zhang <zjf...@gmail.com> > Date: Monday, December 21, 2015 at 6:43 PM > To: Andrew Davidson <a...@santacruzintegration.com> > Cc: "user @spark" <user@spark.apache.org> > Subject: Re: trouble implementing Transformer and calling > DataFrame.withColumn() > > In your case, I would suggest you to extends UnaryTransformer which is > much easier. > > Yeah, I have to admit that there's no document about how to write a custom > Transformer, I think we need to add that, since writing custom Transformer > is a very typical work in machine learning. > > On Tue, Dec 22, 2015 at 9:54 AM, Andy Davidson < > a...@santacruzintegration.com> wrote: > >> >> I am trying to port the following python function to Java 8. I would like >> my java implementation to implement Transform
Re: Missing dependencies when submitting scala app
It might be jar conflict issue. Spark has dependency org.json4s.jackson, do you also specify org.json4s.jackson in your sbt dependency but with a different version ? On Wed, Dec 23, 2015 at 6:15 AM, Daniel Valdivia <h...@danielvaldivia.com> wrote: > Hi, > > I'm trying to figure out how to bundle dependendies with a scala > application, so far my code was tested successfully on the spark-shell > however now that I'm trying to run it as a stand alone application which > I'm compilin with sbt is yielding me the error: > > > *java.lang.NoSuchMethodError: > org.json4s.jackson.JsonMethods$.parse$default$3()Z at > ClusterIncidents$$anonfun$1.apply(ClusterInciden* > > I'm doing "sbt clean package" and then spark-submit of the resulting jar, > however seems like either my driver or workers don't have the json4s > dependency, therefor can't find the parse method > > Any idea on how to solve this depdendency problem? > > thanks in advance > -- Best Regards Jeff Zhang
Re: spark-submit for dependent jars
Put /test/target/spark16-0.0.1-SNAPSHOT.jar as the last argument ./spark-submit --master local --class test.Main --jars /home/user/download/jar/ojdbc7.jar /test/target/spark16-0.0.1-SNAPSHOT.jar On Mon, Dec 21, 2015 at 9:15 PM, Madabhattula Rajesh Kumar < mrajaf...@gmail.com> wrote: > Hi, > > How to add dependent jars in spark-submit command. For example: Oracle. > Could you please help me to resolve this issue > > I have a standalone cluster. One Master and One slave. > > I have used below command it is not working > > ./spark-submit --master local --class test.Main > /test/target/spark16-0.0.1-SNAPSHOT.jar --jars > /home/user/download/jar/ojdbc7.jar > > *I'm getting below exception :* > > Exception in thread "main" java.sql.SQLException: No suitable driver found > for jdbc:oracle:thin:@:1521:xxx > at java.sql.DriverManager.getConnection(DriverManager.java:596) > at java.sql.DriverManager.getConnection(DriverManager.java:187) > at > org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$getConnector$1.apply(JDBCRDD.scala:188) > at > org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$getConnector$1.apply(JDBCRDD.scala:181) > at > org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:121) > at > org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.(JDBCRelation.scala:91) > at > org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:60) > at > org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:125) > at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114) > at com.cisco.ss.etl.utils.ETLHelper$class.getData(ETLHelper.scala:22) > at com.cisco.ss.etl.Main$.getData(Main.scala:9) > at com.cisco.ss.etl.Main$delayedInit$body.apply(Main.scala:13) > at scala.Function0$class.apply$mcV$sp(Function0.scala:40) > at > scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) > at scala.App$$anonfun$main$1.apply(App.scala:71) > at scala.App$$anonfun$main$1.apply(App.scala:71) > at scala.collection.immutable.List.foreach(List.scala:318) > at > scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32) > at scala.App$class.main(App.scala:71) > at com.cisco.ss.etl.Main$.main(Main.scala:9) > at com.cisco.ss.etl.Main.main(Main.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) > at > org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) > at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) > > Regards, > Rajesh > -- Best Regards Jeff Zhang
Re: spark-submit for dependent jars
ion.(JDBCRelation.scala:91) >>> at >>> org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:60) >>> at >>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:125) >>> at >>> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114) >>> at com.cisco.ss.etl.utils.ETLHelper$class.getData(ETLHelper.scala:22) >>> at com.cisco.ss.etl.Main$.getData(Main.scala:9) >>> at com.cisco.ss.etl.Main$delayedInit$body.apply(Main.scala:13) >>> at scala.Function0$class.apply$mcV$sp(Function0.scala:40) >>> at >>> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) >>> at scala.App$$anonfun$main$1.apply(App.scala:71) >>> at scala.App$$anonfun$main$1.apply(App.scala:71) >>> at scala.collection.immutable.List.foreach(List.scala:318) >>> at >>> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32) >>> at scala.App$class.main(App.scala:71) >>> at com.cisco.ss.etl.Main$.main(Main.scala:9) >>> at com.cisco.ss.etl.Main.main(Main.scala) >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>> at >>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) >>> at >>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>> at java.lang.reflect.Method.invoke(Method.java:606) >>> at >>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) >>> at >>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) >>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) >>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) >>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) >>> >>> Regards, >>> Rajesh >>> >> >> > -- Best Regards Jeff Zhang
Re: trouble implementing Transformer and calling DataFrame.withColumn()
In your case, I would suggest you to extends UnaryTransformer which is much easier. Yeah, I have to admit that there's no document about how to write a custom Transformer, I think we need to add that, since writing custom Transformer is a very typical work in machine learning. On Tue, Dec 22, 2015 at 9:54 AM, Andy Davidson < a...@santacruzintegration.com> wrote: > > I am trying to port the following python function to Java 8. I would like > my java implementation to implement Transformer so I can use it in a > pipeline. > > I am having a heck of a time trying to figure out how to create a Column > variable I can pass to DataFrame.withColumn(). As far as I know > withColumn() the only way to append a column to a data frame. > > Any comments or suggestions would be greatly appreciated > > Andy > > > def convertMultinomialLabelToBinary(dataFrame): > newColName = "binomialLabel" > binomial = udf(lambda labelStr: labelStr if (labelStr == "noise") else > “signal", StringType()) > ret = dataFrame.withColumn(newColName, binomial(dataFrame["label"])) > return ret > trainingDF2 = convertMultinomialLabelToBinary(trainingDF1) > > > > public class LabelToBinaryTransformer extends Transformer { > > private static final long serialVersionUID = 4202800448830968904L; > > private final UUID uid = UUID.randomUUID(); > > public String inputCol; > > public String outputCol; > > > > @Override > > public String uid() { > > return uid.toString(); > > } > > > @Override > > public Transformer copy(ParamMap pm) { > > Params xx = defaultCopy(pm); > > return ???; > > } > > > @Override > > public DataFrame transform(DataFrame df) { > > MyUDF myUDF = new MyUDF(myUDF, null, null); > > Column c = df.col(inputCol); > > ??? UDF apply does not take a col > > Column col = myUDF.apply(df.col(inputCol)); > > DataFrame ret = df.withColumn(outputCol, col); > > return ret; > > } > > > @Override > > public StructType transformSchema(StructType arg0) { > >*??? What is this function supposed to do???* > > ???Is this the type of the new output column > > } > > > > class MyUDF extends UserDefinedFunction { > > public MyUDF(Object f, DataType dataType, Seq inputTypes) > { > > super(f, dataType, inputTypes); > > ??? Why do I have to implement this constructor ??? > > ??? What are the arguments ??? > > } > > > > @Override > > public > > Column apply(scala.collection.Seq exprs) { > > What do you do with a scala seq? > > return ???; > > } > > } > > } > > > -- Best Regards Jeff Zhang
Re: get parameters of spark-submit
don't understand your question. These parameter are passed to your program as args of the main function. On Mon, Dec 21, 2015 at 9:09 PM, Bonsen <hengbohe...@126.com> wrote: > 1.I code my scala class and pack.(not input the hdfs files' paths,just use > the paths from "spark-submit"'s parameters) > 2.Then,If I input like this: > ${SPARK_HOME/bin}/spark-submit \ > --master \ > \ > hdfs:// \ > hdfs:// \ > > what should I do to get the two hdfs files' paths in my scala class's > code(before pack the jar file)? > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/get-parameters-of-spark-submit-tp25749.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- Best Regards Jeff Zhang
Re: DataFrame operations
If it does not return a column you expect, then what does this return ? Do you will have 2 columns with the same column name ? On Sun, Dec 20, 2015 at 7:40 PM, Eran Witkon <eranwit...@gmail.com> wrote: > Hi, > > I am a bit confused with dataframe operations. > I have a function which takes a string and returns a string > I want to apply this functions on all rows on a single column in my > dataframe > > I was thinking of the following: > jsonData.withColumn("computedField",computeString(jsonData("hse"))) > > BUT jsonData("hse") return a column not the row data > What am I missing here? > -- Best Regards Jeff Zhang
Re: Spark batch getting hung up
>>> Would the driver not wait till all the stuff related to test1 is completed before calling test2 as test2 is dependent on test1? >>> val test1 =RDD1.mapPartitions.() >>> val test2 = test1.mapPartititions() On the driver side, actually these 2 lines of code will be executed but the real computation on the executor won't get executed until you call action on the rdd (I suppose you call that after these code) You might need to check on the executor side why the hang issue happens. On Mon, Dec 21, 2015 at 11:04 AM, swetha kasireddy < swethakasire...@gmail.com> wrote: > I see this happens when there is a deadlock situation. The RDD test1 has a > Couchbase call and it seems to be having threads hanging there. Eventhough > all the connections are closed I see the threads related to Couchbase > causing the job to hang for sometime before it gets cleared up. > > Would the driver not wait till all the stuff related to test1 is completed > before calling test2 as test2 is dependent on test1? > > val test1 =RDD1.mapPartitions.() > > val test2 = test1.mapPartititions() > > On Sat, Dec 19, 2015 at 12:24 AM, Jeff Zhang <zjf...@gmail.com> wrote: > >> First you need to know where the hang happens (driver or executor), >> checking log would be helpful >> >> On Sat, Dec 19, 2015 at 12:25 AM, SRK <swethakasire...@gmail.com> wrote: >> >>> Hi, >>> >>> My Spark Batch job seems to hung up sometimes for a long time before it >>> starts the next stage/exits. Basically it happens when it has >>> mapPartition/foreachPartition in a stage. Any idea as to why this is >>> happening? >>> >>> Thanks, >>> Swetha >>> >>> >>> >>> -- >>> View this message in context: >>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-batch-getting-hung-up-tp25735.html >>> Sent from the Apache Spark User List mailing list archive at Nabble.com. >>> >>> ----- >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>> For additional commands, e-mail: user-h...@spark.apache.org >>> >>> >> >> >> -- >> Best Regards >> >> Jeff Zhang >> > > -- Best Regards Jeff Zhang
Re: Spark batch getting hung up
First you need to know where the hang happens (driver or executor), checking log would be helpful On Sat, Dec 19, 2015 at 12:25 AM, SRK <swethakasire...@gmail.com> wrote: > Hi, > > My Spark Batch job seems to hung up sometimes for a long time before it > starts the next stage/exits. Basically it happens when it has > mapPartition/foreachPartition in a stage. Any idea as to why this is > happening? > > Thanks, > Swetha > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-batch-getting-hung-up-tp25735.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- Best Regards Jeff Zhang
Re: Dynamic jar loading
Actually I would say yes and no. Yes means the jar will be fetched by executor and added to classpath, No means it would not be added to classpath of driver. That means you can not invoke the class in the jar explicitly. But you can call them indirectly. like following (or if the jar is only dependency, won't be called directly ) >>> rdd.map(e=>{Class.forName("com.zjffdu.tutorial.spark.java.MyStack"); e}).collect() On Sat, Dec 19, 2015 at 5:47 AM, Jim Lohse <j...@megalearningllc.com> wrote: > I am going to say no, but have not actually tested this. Just going on > this line in the docs: > > http://spark.apache.org/docs/latest/configuration.html > > spark.driver.extraClassPath (none) Extra classpath entries to prepend to > the classpath of the driver. > *Note:* In client mode, this config must not be set through the SparkConf > directly in your application, because the driver JVM has already started at > that point. Instead, please set this through the --driver-class-path > command line option or in your default properties file. > > > > On 12/17/2015 07:53 AM, amarouni wrote: > > Hello guys, > > Do you know if the method SparkContext.addJar("file:///...") can be used > on a running context (an already started spark-shell) ? > And if so, does it add the jar to the class-path of the Spark workers > (Yarn containers in case of yarn-client) ? > > Thanks, > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > > -- Best Regards Jeff Zhang
Re: Base ERROR
I believe this is hbase issue, you'd better to ask on hbase mail list. On Fri, Dec 18, 2015 at 9:57 AM, censj <ce...@lotuseed.com> wrote: > hi,all: > I wirte data to hbase,but Hbase arise this ERROR,Could you help me? > > > r.KeeperException$SessionExpiredException: KeeperErrorCode = Session > expired for /hbase-unsecure/rs/byd0157,16020,1449106975377 > 2015-12-17 21:24:29,854 WARN [regionserver/byd0157/192.168.0.157:16020] > zookeeper.RecoverableZooKeeper: Possibly transient ZooKeeper, > quorum=byd0151:2181,byd0150:2181,byd0152:2181, > exception=org.apache.zookeeper.KeeperException$SessionExpiredException: > KeeperErrorCode = Session expired for > /hbase-unsecure/rs/byd0157,16020,1449106975377 > 2015-12-17 21:24:29,854 ERROR [regionserver/byd0157/192.168.0.157:16020] > zookeeper.RecoverableZooKeeper: ZooKeeper delete failed after 4 attempts > 2015-12-17 21:24:29,854 WARN [regionserver/byd0157/192.168.0.157:16020] > regionserver.HRegionServer: Failed deleting my ephemeral node > org.apache.zookeeper.KeeperException$SessionExpiredException: > KeeperErrorCode = Session expired for > /hbase-unsecure/rs/byd0157,16020,1449106975377 >at > org.apache.zookeeper.KeeperException.create(KeeperException.java:127) >at > org.apache.zookeeper.KeeperException.create(KeeperException.java:51) >at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:873) >at > org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.delete(RecoverableZooKeeper.java:179) >at > org.apache.hadoop.hbase.zookeeper.ZKUtil.deleteNode(ZKUtil.java:1345) >at > org.apache.hadoop.hbase.zookeeper.ZKUtil.deleteNode(ZKUtil.java:1334) >at > org.apache.hadoop.hbase.regionserver.HRegionServer.deleteMyEphemeralNode(HRegionServer.java:1393) >at > org.apache.hadoop.hbase.regionserver.HRegionServer.run(HRegionServer.java:1076) >at java.lang.Thread.run(Thread.java:745) > 2015-12-17 21:24:29,855 INFO [regionserver/byd0157/192.168.0.157:16020] > regionserver.HRegionServer: stopping server byd0157,16020,1449106975377; > zookeeper connection closed. > 2015-12-17 21:24:29,855 INFO [regionserver/byd0157/192.168.0.157:16020] > regionserver.HRegionServer: regionserver/byd0157/192.168.0.157:16020 > exiting > 2015-12-17 21:24:29,858 ERROR [main] > regionserver.HRegionServerCommandLine: Region server exiting > java.lang.RuntimeException: HRegionServer Aborted >at > org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine.start(HRegionServerCommandLine.java:68) >at > org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine.run(HRegionServerCommandLine.java:87) >at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70) >at > org.apache.hadoop.hbase.util.ServerCommandLine.doMain(ServerCommandLine.java:126) >at > org.apache.hadoop.hbase.regionserver.HRegionServer.main(HRegionServer.java:2641) > 2015-12-17 21:24:29,940 INFO [Thread-6] regionserver.ShutdownHook: > Shutdown hook starting; hbase.shutdown.hook=true; > fsShutdownHook=org.apache.hadoop.fs.FileSystem$Cache$ClientFinalizer@6de54b40 > 2015-12-17 21:24:29,942 INFO [Thread-6] regionserver.ShutdownHook: > Starting fs shutdown hook thread. > 2015-12-17 21:24:29,953 INFO [Thread-6] regionserver.ShutdownHook: > Shutdown hook finished. > > > -- Best Regards Jeff Zhang
Re: Access row column by field name
use Row.getAs[String](fieldname) On Thu, Dec 17, 2015 at 10:58 AM, Daniel Valdivia <h...@danielvaldivia.com> wrote: > Hi, > > I'm processing the json I have in a text file using DataFrames, however > right now I'm trying to figure out a way to access a certain value within > the rows of my data frame if I only know the field name and not the > respective field position in the schema. > > I noticed that row.schema and row.dtypes give me information about the > auto-generate schema, but I cannot see a straigh forward patch for this, > I'm trying to create a PairRdd out of this > > Is there any easy way to figure out the field position by it's field name > (the key it had in the json)? > > so this > > val sqlContext = new SQLContext(sc) > val rawIncRdd = sc.textFile(" > hdfs://1.2.3.4:8020/user/hadoop/incidents/unstructured/inc-0-500.txt") > val df = sqlContext.jsonRDD(rawIncRdd) > df.foreach(line => println(line.getString(0))) > > > would turn into something like this > > val sqlContext = new SQLContext(sc) > val rawIncRdd = sc.textFile(" > hdfs://1.2.3.4:8020/user/hadoop/incidents/unstructured/inc-0-500.txt") > val df = sqlContext.jsonRDD(rawIncRdd) > df.foreach(line => println(line.getString(*"field_name"*))) > > thanks for the advice > -- Best Regards Jeff Zhang
Re: hiveContext: storing lookup of partitions
oh, you are using S3. As I remember, S3 has performance issue when processing large amount of files. On Wed, Dec 16, 2015 at 7:58 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote: > The HIVE table has very large number of partitions around 365 * 5 * 10 and > when I say hivemetastore to start running queries on it (the one with > .count() or .show()) then it takes around 2 hours before the job starts in > SPARK. > > On the pyspark screen I can see that it is parsing the S3 locations for > these 2 hours. > > Regards, > Gourav > > On Wed, Dec 16, 2015 at 3:38 AM, Jeff Zhang <zjf...@gmail.com> wrote: > >> >>> Currently it takes around 1.5 hours for me just to cache in the >> partition information and after that I can see that the job gets queued in >> the SPARK UI. >> I guess you mean the stage of getting the split info. I suspect it might >> be your cluster issue (or metadata store), unusually it won't take such >> long time for splitting. >> >> On Wed, Dec 16, 2015 at 8:06 AM, Gourav Sengupta < >> gourav.sengu...@gmail.com> wrote: >> >>> Hi, >>> >>> I have a HIVE table with few thousand partitions (based on date and >>> time). It takes a long time to run if for the first time and then >>> subsequently it is fast. >>> >>> Is there a way to store the cache of partition lookups so that every >>> time I start a new SPARK instance (cannot keep my personal server running >>> continuously), I can immediately restore back the temptable in hiveContext >>> without asking it go again and cache the partition lookups? >>> >>> Currently it takes around 1.5 hours for me just to cache in the >>> partition information and after that I can see that the job gets queued in >>> the SPARK UI. >>> >>> Regards, >>> Gourav >>> >> >> >> >> -- >> Best Regards >> >> Jeff Zhang >> > > -- Best Regards Jeff Zhang
Re: YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
>>> *15/12/16 10:22:01 WARN cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources* That means you don't have resources for your application, please check your hadoop web ui. On Wed, Dec 16, 2015 at 10:32 AM, zml张明磊 <mingleizh...@ctrip.com> wrote: > Yesterday night, I run the jar on my pseudo-distributed mode without WARN > and ERROR. However, Today, Getting the WARN and directly leading to the > ERROR below. My computer memory is 8GB and I think it’s not the issue as > the LOG WARN describe. What ‘s wrong ? The code haven’t change yet. And the > environment haven’t change too. So Strange. Can anybody help me ? Why ……. > > > > Thanks. > > Minglei. > > > > Here is the submit job script > > > > /bin/spark-submit --master local[*] --driver-memory 8g --executor-memory > 8g --class com.ctrip.ml.client.Client > /root/di-ml-tool/target/di-ml-tool-1.0-SNAPSHOT.jar > > > > Error below > > *15/12/16 10:22:01 WARN cluster.YarnScheduler: Initial job has not > accepted any resources; check your cluster UI to ensure that workers are > registered and have sufficient resources* > > 15/12/16 10:22:04 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: > ApplicationMaster has disassociated: 10.32.3.21:48311 > > 15/12/16 10:22:04 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: > ApplicationMaster has disassociated: 10.32.3.21:48311 > > 15/12/16 10:22:04 WARN remote.ReliableDeliverySupervisor: Association with > remote system [akka.tcp://sparkYarnAM@10.32.3.21:48311] has failed, > address is now gated for [5000] ms. Reason is: [Disassociated]. > > *15/12/16 10:22:04 ERROR cluster.YarnClientSchedulerBackend: Yarn > application has already exited with state FINISHED!* > > > > Exception in thread "main" 15/12/16 10:22:04 INFO > cluster.YarnClientSchedulerBackend: Shutting down all executors > > Exception in thread "Yarn application state monitor" > org.apache.spark.SparkException: Error asking standalone scheduler to shut > down executors > > at > org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:261) > > at > org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stop(CoarseGrainedSchedulerBackend.scala:266) > > at > org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.stop(YarnClientSchedulerBackend.scala:158) > > at > org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:416) > > at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1411) > > at org.apache.spark.SparkContext.stop(SparkContext.scala:1644) > > at > org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:139) > > Caused by: java.lang.InterruptedException > > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325) > > at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208) > > at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218) > > at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) > > at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) > > at > scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) > > at scala.concurrent.Await$.result(package.scala:107) > > at > org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102) > > at > org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78) > > at > org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:257) > > > -- Best Regards Jeff Zhang
Re: Can't create UDF through thriftserver, no error reported
It should be resolved by this ticket https://issues.apache.org/jira/browse/SPARK-11191 On Wed, Dec 16, 2015 at 3:14 AM, Antonio Piccolboni <anto...@piccolboni.info > wrote: > Hi, > I am trying to create a UDF using the thiftserver. I followed this example > <https://gist.github.com/airawat/7461612>, which is originally for hive. > My understanding is that the thriftserver creates a hivecontext and Hive > UDFs should be supported. I then sent this query to the thriftserver (I use > the RJDBC module for R but I doubt any other JDBC client would be any > different): > > > CREATE TEMPORARY FUNCTION NVL2 AS 'khanolkar.HiveUDFs.NVL2GenericUDF' > > I only changed some name wrt the posted examples, but I think the class > was found just right because 1)There's no errors in the log or console 2)I > can generate a class not found error mistyping the class name, and I see it > in the logs 3) I can use the reflect builtin to invoke a different function > that I wrote and supplied to spark in the same way (--jars option to > start-thriftserver) > > After this, I can't use the NVL2 function in a query and I can't even do a > DESCRIBE query on it, nor does it list with SHOW FUNCTIONS. I tried both > 1.5.1 and 1.6.0-rc2 built with thriftserver support for Hadoop 2.6 > > I know the HiveContext is slightly behind the latest Hive as far as > features, I believe one or two revs, so that may be one potential problem, > but all these feature I believe are present in Hive 0.11 and should have > made it into Spark. At the very least, I would like to see some message in > the logs and console so that I can find the error of my ways, repent and > fix my code. Any suggestions? Anything I should post to support > troubleshooting? Is this JIRA-worthy? Thanks > > Antonio > > > > -- Best Regards Jeff Zhang
Re: hiveContext: storing lookup of partitions
>>> Currently it takes around 1.5 hours for me just to cache in the partition information and after that I can see that the job gets queued in the SPARK UI. I guess you mean the stage of getting the split info. I suspect it might be your cluster issue (or metadata store), unusually it won't take such long time for splitting. On Wed, Dec 16, 2015 at 8:06 AM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote: > Hi, > > I have a HIVE table with few thousand partitions (based on date and time). > It takes a long time to run if for the first time and then subsequently it > is fast. > > Is there a way to store the cache of partition lookups so that every time > I start a new SPARK instance (cannot keep my personal server running > continuously), I can immediately restore back the temptable in hiveContext > without asking it go again and cache the partition lookups? > > Currently it takes around 1.5 hours for me just to cache in the partition > information and after that I can see that the job gets queued in the SPARK > UI. > > Regards, > Gourav > -- Best Regards Jeff Zhang
Re: [SparkR] Is rdd in SparkR deprecated ?
Thanks Felix, Just curious when I read the code. On Tue, Dec 15, 2015 at 1:32 AM, Felix Cheung <felixcheun...@hotmail.com> wrote: > RDD API in SparkR is not officially supported. You could still access them > with the SparkR::: prefix though. > > May I ask what uses you have for them? Would the DataFrame API sufficient? > > > > > > On Mon, Dec 14, 2015 at 4:26 AM -0800, "Jeff Zhang" <zjf...@gmail.com> > wrote: > > From the source code of SparkR, seems SparkR support rdd api. But there's > no documentation on that. ( > http://spark.apache.org/docs/latest/sparkr.html ) So I guess it is > deprecated, is that right ? > > -- > Best Regards > > Jeff Zhang > -- Best Regards Jeff Zhang
[SparkR] Is rdd in SparkR deprecated ?
>From the source code of SparkR, seems SparkR support rdd api. But there's no documentation on that. ( http://spark.apache.org/docs/latest/sparkr.html ) So I guess it is deprecated, is that right ? -- Best Regards Jeff Zhang
Re: how to make a dataframe of Array[Doubles] ?
Please use tuple instead of array. ( the element must implement trait Product if you want to convert RDD to DF) val testvec = Array( (1.0, 2.0, 3.0, 4.0), (5.0, 6.0, 7.0, 8.0)) On Tue, Dec 15, 2015 at 1:12 PM, AlexG <swift...@gmail.com> wrote: > My attempts to create a dataframe of Array[Doubles], I get an error about > RDD[Array[Double]] not having a toDF function: > > import sqlContext.implicits._ > val testvec = Array( Array(1.0, 2.0, 3.0, 4.0), Array(5.0, 6.0, 7.0, 8.0)) > val testrdd = sc.parallelize(testvec) > testrdd.toDF > > gives > > :29: error: value toDF is not a member of > org.apache.spark.rdd.RDD[Array[Double]] > testrdd.toD > > on the other hand, if I make the dataframe more complicated, e.g. > Tuple2[String, Array[Double]], the transformation goes through: > > val testvec = Array( ("row 1", Array(1.0, 2.0, 3.0, 4.0)), ("row 2", > Array(5.0, 6.0, 7.0, 8.0)) ) > val testrdd = sc.parallelize(testvec) > testrdd.toDF > > gives > testrdd: org.apache.spark.rdd.RDD[(String, Array[Double])] = > ParallelCollectionRDD[1] at parallelize at :29 > res3: org.apache.spark.sql.DataFrame = [_1: string, _2: array] > > What's the cause of this, and how can I get around it to create a dataframe > of Array[Double]? My end goal is to store that dataframe in Parquet (yes, I > do want to store all the values in a single column, not individual columns) > > I am using Spark 1.5.2 > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/how-to-make-a-dataframe-of-Array-Doubles-tp25704.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- Best Regards Jeff Zhang
Re: Database does not exist: (Spark-SQL ===> Hive)
Do you put hive-site.xml on the classpath ? On Tue, Dec 15, 2015 at 11:14 AM, Gokula Krishnan D <email2...@gmail.com> wrote: > Hello All - > > > I tried to execute a Spark-Scala Program in order to create a table in > HIVE and faced couple of error so I just tried to execute the "show tables" > and "show databases" > > And I have already created a database named "test_db".But I have > encountered the error "Database does not exist" > > *Note: I do see couple of posts related to this error but nothing was > helpful for me.* > > > = > name := "ExploreSBT_V1" > > version := "1.0" > > scalaVersion := "2.11.5" > > libraryDependencies > ++=Seq("org.apache.spark"%%"spark-core"%"1.3.0","org.apache.spark"%%"spark-sql"%"1.3.0") > libraryDependencies += "org.apache.spark"%%"spark-hive"%"1.3.0" > > = > [image: Inline image 1] > > Error: Encountered the following exceptions > :org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution > Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Database > does not exist: test_db > 15/12/14 18:49:57 ERROR HiveContext: > == > HIVE FAILURE OUTPUT > == > > > > >OK > FAILED: Execution Error, return code 1 from > org.apache.hadoop.hive.ql.exec.DDLTask. Database does not exist: test_db > > == > END HIVE FAILURE OUTPUT > == > > > Process finished with exit code 0 > > Thanks & Regards, > Gokula Krishnan* (Gokul)* > -- Best Regards Jeff Zhang
Re: [SparkR] Any reason why saveDF's mode is append by default ?
Thanks Shivaram, created https://issues.apache.org/jira/browse/SPARK-12318 I will work on it. On Mon, Dec 14, 2015 at 4:13 PM, Shivaram Venkataraman < shiva...@eecs.berkeley.edu> wrote: > I think its just a bug -- I think we originally followed the Python > API (in the original PR [1]) but the Python API seems to have been > changed to match Scala / Java in > https://issues.apache.org/jira/browse/SPARK-6366 > > Feel free to open a JIRA / PR for this. > > Thanks > Shivaram > > [1] https://github.com/amplab-extras/SparkR-pkg/pull/199/files > > On Sun, Dec 13, 2015 at 11:58 PM, Jeff Zhang <zjf...@gmail.com> wrote: > > It is inconsistent with scala api which is error by default. Any reason > for > > that ? Thanks > > > > > > > > -- > > Best Regards > > > > Jeff Zhang > -- Best Regards Jeff Zhang
[SparkR] Any reason why saveDF's mode is append by default ?
It is inconsistent with scala api which is error by default. Any reason for that ? Thanks -- Best Regards Jeff Zhang
Re: Spark assembly in Maven repo?
I don't think make the assembly jar as dependency a good practice. You may meet jar hell issue in that case. On Fri, Dec 11, 2015 at 2:46 PM, Xiaoyong Zhu <xiaoy...@microsoft.com> wrote: > Hi Experts, > > > > We have a project which has a dependency for the following jar > > > > spark-assembly--hadoop.jar > > for example: > > spark-assembly-1.4.1.2.3.3.0-2983-hadoop2.7.1.2.3.3.0-2983.jar > > > > since this assembly might be updated in the future, I am not sure if there > is a Maven repo that has the above spark assembly jar? Or should we create > & upload it to Maven central? > > > > Thanks! > > > > Xiaoyong > > > -- Best Regards Jeff Zhang
Re: Can't create UDF's in spark 1.5 while running using the hive thrift service
leAdapter.call(Executors.java:511) >> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >> at java.lang.Thread.run(Thread.java:745) >> >> >> >> When I ran the same against 1.4 it worked. >> >> I've also changed the spark.sql.hive.metastore.version version to be 0.13 >> (similar to what it was in 1.4) and 0.14 but I still get the same errors. >> >> >> Any suggestions? >> >> Thanks, >> Trystan >> >> > -- Best Regards Jeff Zhang
Re: sparkSQL Load multiple tables
Do you want to load multiple tables by using sql ? JdbcRelation now only can load single table. It doesn't accept sql as loading command. On Wed, Dec 2, 2015 at 4:33 PM, censj <ce...@lotuseed.com> wrote: > hi Fengdong Yu: > I want to use sqlContext.read.format('jdbc').options( ... ).load() > but this function only load a table so i want to know through some > operations load multiple tables? > > > 在 2015年12月2日,16:28,Fengdong Yu <fengdo...@everstring.com> 写道: > > It cannot read multiple tables, > > but if your tables have the same columns, you can read them one by one, > then unionAll them, such as: > > val df1 = sqlContext.table(“table1”) > val df2 = sqlContext.table(“table2”) > > val df = df1.unionAll(df2) > > > > > > On Dec 2, 2015, at 4:06 PM, censj <ce...@lotuseed.com> wrote: > > Dear all, > Can you tell me how did get past SQLContext load function read multiple > tables? > > > > -- Best Regards Jeff Zhang
Re: how to skip headers when reading multiple files
Are you read csv file ? If so you can use spark-csv which support skip header http://spark-packages.org/package/databricks/spark-csv On Thu, Dec 3, 2015 at 10:52 AM, Divya Gehlot <divya.htco...@gmail.com> wrote: > Hi, > I am new bee to Spark and Scala . > As one of my requirement to read and process multiple text files with > headers using DataFrame API . > How can I skip headers when processing data with DataFrame API > > Thanks in advance . > Regards, > Divya > > -- Best Regards Jeff Zhang
Re: SparkSQL API to insert DataFrame into a static partition?
I don't think there's api for that, but think it is reasonable and helpful for ETL. As a workaround you can first register your dataframe as temp table, and use sql to insert to the static partition. On Wed, Dec 2, 2015 at 10:50 AM, Isabelle Phan <nlip...@gmail.com> wrote: > Hello, > > Is there any API to insert data into a single partition of a table? > > Let's say I have a table with 2 columns (col_a, col_b) and a partition by > date. > After doing some computation for a specific date, I have a DataFrame with > 2 columns (col_a, col_b) which I would like to insert into a specific date > partition. What is the best way to achieve this? > > It seems that if I add a date column to my DataFrame, and turn on dynamic > partitioning, I can do: > df.write.partitionBy("date").insertInto("my_table") > But it seems overkill to use dynamic partitioning function for such a case. > > > Thanks for any pointers! > > Isabelle > > > -- Best Regards Jeff Zhang
No documentation for how to write custom Transformer in ml pipeline ?
Although writing a custom UnaryTransformer is not difficult, but writing a non-UnaryTransformer is a little tricky (have to check the source code). And I don't find any document about how to write custom Transformer in ml pipeline, but writing custom Transformer is a very basic requirement. Is this because the interface is still unstable now ? -- Best Regards Jeff Zhang
Re: General question on using StringIndexer in SparkML
StringIndexer is an estimator which would train a model to be used both in training & prediction. So it is consistent between training & prediction. You may want to read this section of spark ml doc http://spark.apache.org/docs/latest/ml-guide.html#how-it-works On Mon, Nov 30, 2015 at 12:52 AM, Vishnu Viswanath < vishnu.viswanat...@gmail.com> wrote: > Thanks for the reply Yanbo. > > I understand that the model will be trained using the indexer map created > during the training stage. > > But since I am getting a new set of data during prediction, and I have to > do StringIndexing on the new data also, > Right now I am using a new StringIndexer for this purpose, or is there any > way that I can reuse the Indexer used for training stage. > > Note: I am having a pipeline with StringIndexer in it, and I am fitting my > train data in it and building the model. Then later when i get the new data > for prediction, I am using the same pipeline to fit the data again and do > the prediction. > > Thanks and Regards, > Vishnu Viswanath > > > On Sun, Nov 29, 2015 at 8:14 AM, Yanbo Liang <yblia...@gmail.com> wrote: > >> Hi Vishnu, >> >> The string and indexer map is generated at model training step and >> used at model prediction step. >> It means that the string and indexer map will not changed when >> prediction. You will use the original trained model when you do >> prediction. >> >> 2015-11-29 4:33 GMT+08:00 Vishnu Viswanath <vishnu.viswanat...@gmail.com >> >: >> > Hi All, >> > >> > I have a general question on using StringIndexer. >> > StringIndexer gives an index to each label in the feature starting from >> 0 ( >> > 0 for least frequent word). >> > >> > Suppose I am building a model, and I use StringIndexer for transforming >> on >> > of my column. >> > e.g., suppose A was most frequent word followed by B and C. >> > >> > So the StringIndexer will generate >> > >> > A 0.0 >> > B 1.0 >> > C 2.0 >> > >> > After building the model, I am going to do some prediction using this >> model, >> > So I do the same transformation on my new data which I need to predict. >> And >> > suppose the new dataset has C as the most frequent word, followed by B >> and >> > A. So the StringIndexer will assign index as >> > >> > C 0.0 >> > B 1.0 >> > A 2.0 >> > >> > These indexes are different from what we used for modeling. So won’t >> this >> > give me a wrong prediction if I use StringIndexer? >> > >> > -- >> > Thanks and Regards, >> > Vishnu Viswanath, >> > www.vishnuviswanath.com >> > > > > -- > Thanks and Regards, > Vishnu Viswanath, > *www.vishnuviswanath.com <http://www.vishnuviswanath.com>* > -- Best Regards Jeff Zhang
Re: Millions of entities in custom Hadoop InputFormat and broadcast variable
Where do you load all IDs of your dataset ? In your custom InputFormat#getSplits ? getSplits will be invoked in driver side to build the Partition which will be serialized to executor as part of the task. Do you put all the ids in the InputSplit ? That would make it pretty large. In your case, I think you can load the ids directly rather than creating custom Hadoop InputFormat. e.g. sc.textFile(id_file, 100).map(load data using the id) Please make sure use a high partition number ( I use 100 here) in sc.textFile to get high parallelism. On Fri, Nov 27, 2015 at 2:06 PM, Anfernee Xu <anfernee...@gmail.com> wrote: > Hi Spark experts, > > First of all, happy Thanksgiving! > > The comes to my question, I have implemented custom Hadoop InputFormat to > load millions of entities from my data source to Spark(as JavaRDD and > transform to DataFrame). The approach I took in implementing the custom > Hadoop RDD is loading all ID's of my data entity(each entity has an unique > ID: Long) and split the ID list(contains 3 millions of Long number for > example) into configured splits, each split contains a sub-set of ID's, in > turn my custom RecordReader will load the full entity(a plain Java Bean) > from my data source for each ID in the specific split. > > My first observation is some Spark tasks were timeout, and looks like > Spark broadcast variable is being used to distribute my splits, is that > correct? If so, from performance perspective, what enhancement I can make > to make it better? > > Thanks > > -- > --Anfernee > -- Best Regards Jeff Zhang
Re: Stop Spark yarn-client job
Could you attach the yarn AM log ? On Fri, Nov 27, 2015 at 8:10 AM, Jagat Singh <jagatsi...@gmail.com> wrote: > Hi, > > What is the correct way to stop fully the Spark job which is running as > yarn-client using spark-submit. > > We are using sc.stop in the code and can see the job still running (in > yarn resource manager) after final hive insert is complete. > > The code flow is > > start context > do somework > insert to hive > sc.stop > > This is sparkling water job is that matters. > > Is there anything else needed ? > > Thanks, > > J > > > -- Best Regards Jeff Zhang
Re: Spark on yarn vs spark standalone
If your cluster is a dedicated spark cluster (only running spark job, no other jobs like hive/pig/mr), then spark standalone would be fine. Otherwise I think yarn would be a better option. On Fri, Nov 27, 2015 at 3:36 PM, cs user <acldstk...@gmail.com> wrote: > Hi All, > > Apologies if this question has been asked before. I'd like to know if > there are any downsides to running spark over yarn with the --master > yarn-cluster option vs having a separate spark standalone cluster to > execute jobs? > > We're looking at installing a hdfs/hadoop cluster with Ambari and > submitting jobs to the cluster using yarn, or having an Ambari cluster and > a separate standalone spark cluster, which will run the spark jobs on data > within hdfs. > > With yarn, will we still get all the benefits of spark? > > Will it be possible to process streaming data? > > Many thanks in advance for any responses. > > Cheers! > -- Best Regards Jeff Zhang
Re: Optimizing large collect operations
For such large output, I would suggest you to do the following processing in cluster rather than in driver (use RDD api to do that). If you really want to pull it to driver, then you can first save it in hdfs and then read it using hdfs api to avoid the akka issue On Fri, Nov 27, 2015 at 2:41 PM, Gylfi <gy...@berkeley.edu> wrote: > Hi. > > I am doing very large collectAsMap() operations, about 10,000,000 records, > and I am getting > "org.apache.spark.SparkException: Error communicating with > MapOutputTracker" > errors.. > > details: > "org.apache.spark.SparkException: Error communicating with MapOutputTracker > at > org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:117) > at > > org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:164) > at > > org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42) > at > > org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40) > at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) > at > org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) > at org.apache.spark.scheduler.Task.run(Task.scala:64) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) > at > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.spark.SparkException: Error sending message [message > = > GetMapOutputStatuses(1)] > at > org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209) > at > org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113) > ... 12 more > Caused by: java.util.concurrent.TimeoutException: Futures timed out after > [300 seconds] > at > scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) > at > scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) > at > scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) > at > > scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) > at scala.concurrent.Await$.result(package.scala:107) > at > org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195) > ... 13 more" > > I have already set set the akka.timeout to 300 etc. > Anyone have any ideas on what the problem could be ? > > Regares, > Gylfi. > > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Optimizing-large-collect-operations-tp25498.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- Best Regards Jeff Zhang
Re: Adding new column to Dataframe
>>> I tried to use df.withColumn but I am getting below exception. What is rowNumber here ? UDF ? You can use monotonicallyIncreasingId for generating id >>> Also, is it possible to add a column from one dataframe to another? You can't, because how can you add one dataframe to another if they have different number of rows. You'd better to use join to correlate 2 data frames. On Thu, Nov 26, 2015 at 6:39 AM, Vishnu Viswanath < vishnu.viswanat...@gmail.com> wrote: > Hi, > > I am trying to add the row number to a spark dataframe. > This is my dataframe: > > scala> df.printSchema > root > |-- line: string (nullable = true) > > I tried to use df.withColumn but I am getting below exception. > > scala> df.withColumn("row",rowNumber) > org.apache.spark.sql.AnalysisException: unresolved operator 'Project > [line#2326,'row_number() AS row#2327]; > at > org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:37) > at > org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44) > at > org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:174) > at > org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:49) > > Also, is it possible to add a column from one dataframe to another? > something like > > scala> df.withColumn("line2",df2("line")) > > org.apache.spark.sql.AnalysisException: resolved attribute(s) line#2330 > missing from line#2326 in operator !Project [line#2326,line#2330 AS > line2#2331]; > > > > Thanks and Regards, > Vishnu Viswanath > *www.vishnuviswanath.com <http://www.vishnuviswanath.com>* > -- Best Regards Jeff Zhang
Re: SparkR DataFrame , Out of memory exception for very small file.
>>> Do I need to create a new DataFrame for every update to the DataFrame like addition of new column or need to update the original sales DataFrame. Yes, DataFrame is immutable, and every mutation of DataFrame will produce a new DataFrame. On Mon, Nov 23, 2015 at 4:44 PM, Vipul Rai <vipulrai8...@gmail.com> wrote: > Hello Rui, > > Sorry , What I meant was the resultant of the original dataframe to which > a new column was added gives a new DataFrame. > > Please check this for more > > https://spark.apache.org/docs/1.5.1/api/R/index.html > > Check for > WithColumn > > > Thanks, > Vipul > > > On 23 November 2015 at 12:42, Sun, Rui <rui@intel.com> wrote: > >> Vipul, >> >> Not sure if I understand your question. DataFrame is immutable. You can't >> update a DataFrame. >> >> Could you paste some log info for the OOM error? >> >> -Original Message- >> From: vipulrai [mailto:vipulrai8...@gmail.com] >> Sent: Friday, November 20, 2015 12:11 PM >> To: user@spark.apache.org >> Subject: SparkR DataFrame , Out of memory exception for very small file. >> >> Hi Users, >> >> I have a general doubt regarding DataFrames in SparkR. >> >> I am trying to read a file from Hive and it gets created as DataFrame. >> >> sqlContext <- sparkRHive.init(sc) >> >> #DF >> sales <- read.df(sqlContext, "hdfs://sample.csv", header ='true', >> source = "com.databricks.spark.csv", inferSchema='true') >> >> registerTempTable(sales,"Sales") >> >> Do I need to create a new DataFrame for every update to the DataFrame >> like addition of new column or need to update the original sales DataFrame. >> >> sales1<- SparkR::sql(sqlContext,"Select a.* , 607 as C1 from Sales as a") >> >> >> Please help me with this , as the orignal file is only 20MB but it throws >> out of memory exception on a cluster of 4GB Master and Two workers of 4GB >> each. >> >> Also, what is the logic with DataFrame do I need to register and drop >> tempTable after every update?? >> >> Thanks, >> Vipul >> >> >> >> -- >> View this message in context: >> http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-DataFrame-Out-of-memory-exception-for-very-small-file-tp25435.html >> Sent from the Apache Spark User List mailing list archive at Nabble.com. >> >> - >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional >> commands, e-mail: user-h...@spark.apache.org >> >> > > > -- > Regards, > Vipul Rai > www.vipulrai.me > +91-8892598819 > <http://in.linkedin.com/in/vipulrai/> > -- Best Regards Jeff Zhang
Re: SparkR DataFrame , Out of memory exception for very small file.
If possible, could you share your code ? What kind of operation are you doing on the dataframe ? On Mon, Nov 23, 2015 at 5:10 PM, Vipul Rai <vipulrai8...@gmail.com> wrote: > Hi Zeff, > > Thanks for the reply, but could you tell me why is it taking so much time. > What could be wrong , also when I remove the DataFrame from memory using > rm(). > It does not clear the memory but the object is deleted. > > Also , What about the R functions which are not supported in SparkR. > Like ddply ?? > > How to access the nth ROW of SparkR DataFrame. > > Regards, > Vipul > > On 23 November 2015 at 14:25, Jeff Zhang <zjf...@gmail.com> wrote: > >> >>> Do I need to create a new DataFrame for every update to the >> DataFrame like >> addition of new column or need to update the original sales DataFrame. >> >> Yes, DataFrame is immutable, and every mutation of DataFrame will produce >> a new DataFrame. >> >> >> >> On Mon, Nov 23, 2015 at 4:44 PM, Vipul Rai <vipulrai8...@gmail.com> >> wrote: >> >>> Hello Rui, >>> >>> Sorry , What I meant was the resultant of the original dataframe to >>> which a new column was added gives a new DataFrame. >>> >>> Please check this for more >>> >>> https://spark.apache.org/docs/1.5.1/api/R/index.html >>> >>> Check for >>> WithColumn >>> >>> >>> Thanks, >>> Vipul >>> >>> >>> On 23 November 2015 at 12:42, Sun, Rui <rui@intel.com> wrote: >>> >>>> Vipul, >>>> >>>> Not sure if I understand your question. DataFrame is immutable. You >>>> can't update a DataFrame. >>>> >>>> Could you paste some log info for the OOM error? >>>> >>>> -Original Message- >>>> From: vipulrai [mailto:vipulrai8...@gmail.com] >>>> Sent: Friday, November 20, 2015 12:11 PM >>>> To: user@spark.apache.org >>>> Subject: SparkR DataFrame , Out of memory exception for very small file. >>>> >>>> Hi Users, >>>> >>>> I have a general doubt regarding DataFrames in SparkR. >>>> >>>> I am trying to read a file from Hive and it gets created as DataFrame. >>>> >>>> sqlContext <- sparkRHive.init(sc) >>>> >>>> #DF >>>> sales <- read.df(sqlContext, "hdfs://sample.csv", header ='true', >>>> source = "com.databricks.spark.csv", >>>> inferSchema='true') >>>> >>>> registerTempTable(sales,"Sales") >>>> >>>> Do I need to create a new DataFrame for every update to the DataFrame >>>> like addition of new column or need to update the original sales >>>> DataFrame. >>>> >>>> sales1<- SparkR::sql(sqlContext,"Select a.* , 607 as C1 from Sales as >>>> a") >>>> >>>> >>>> Please help me with this , as the orignal file is only 20MB but it >>>> throws out of memory exception on a cluster of 4GB Master and Two workers >>>> of 4GB each. >>>> >>>> Also, what is the logic with DataFrame do I need to register and drop >>>> tempTable after every update?? >>>> >>>> Thanks, >>>> Vipul >>>> >>>> >>>> >>>> -- >>>> View this message in context: >>>> http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-DataFrame-Out-of-memory-exception-for-very-small-file-tp25435.html >>>> Sent from the Apache Spark User List mailing list archive at Nabble.com. >>>> >>>> - >>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For >>>> additional commands, e-mail: user-h...@spark.apache.org >>>> >>>> >>> >>> >>> -- >>> Regards, >>> Vipul Rai >>> www.vipulrai.me >>> +91-8892598819 >>> <http://in.linkedin.com/in/vipulrai/> >>> >> >> >> >> -- >> Best Regards >> >> Jeff Zhang >> > > > > -- > Regards, > Vipul Rai > www.vipulrai.me > +91-8892598819 > <http://in.linkedin.com/in/vipulrai/> > -- Best Regards Jeff Zhang
Re: Re: driver ClassNotFoundException when MySQL JDBC exceptions are thrown on executor
duler.scala:874 > 15/10/16 17:42:41 INFO DAGScheduler: Submitting 1 missing tasks from > ResultStage 2 (MapPartitionsRDD[5] at map at repro.scala:48) > 15/10/16 17:42:41 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks > 15/10/16 17:42:41 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID > 2, localhost, PROCESS_LOCAL, 1444 bytes) > 15/10/16 17:42:41 INFO Executor: Running task 0.0 in stage 2.0 (TID 2) > 15/10/16 17:42:41 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID > 2) > com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException > at repro.Repro$$anonfun$main$2.apply$mcZI$sp(repro.scala:49) > at repro.Repro$$anonfun$main$2.apply(repro.scala:48) > at repro.Repro$$anonfun$main$2.apply(repro.scala:48) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) > at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) > at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) > at scala.collection.AbstractIterator.to > <http://scala.collection.abstractiterator.to/>(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) > at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) > at > org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885) > at > org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) > at org.apache.spark.scheduler.Task.run(Task.scala:70) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > 15/10/16 17:42:41 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, > localhost): com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException > at repro.Repro$$anonfun$main$2.apply$mcZI$sp(repro.scala:49) > at repro.Repro$$anonfun$main$2.apply
Re: has any spark write orc document
It should be very similar with parquet in the api perspective, Please refer this doc http://hortonworks.com/hadoop-tutorial/using-hive-with-orc-from-apache-spark/ On Fri, Nov 20, 2015 at 2:59 PM, zhangjp <592426...@qq.com> wrote: > Hi, > has any spark write orc document which like the parquet document. > > http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files > > Thanks > -- Best Regards Jeff Zhang
Re: Spark build error
ql.test.ExamplePointUDT in >> class ExamplePointUDT >> >> "" >> [] >> List(Nil) >> // tree.tpe=org.apache.spark.sql.test.ExamplePointUDT >> Block( // tree.tpe=Unit >> Apply( // def (): org.apache.spark.sql.types.UserDefinedType >> in class UserDefinedType, >> tree.tpe=org.apache.spark.sql.types.UserDefinedType >> ExamplePointUDT.super."" // def (): >> org.apache.spark.sql.types.UserDefinedType in class UserDefinedType, >> tree.tpe=()org.apache.spark.sql.types.UserDefinedType >> Nil >> ) >> () >> ) >> ) >> ) >> == Expanded type of tree == >> *ConstantType(* >> * value = Constant(org.apache.spark.sql.test.ExamplePoint)* >> *)* >> *uncaught exception during compilation: java.lang.AssertionError* >> >> *Error:scala: Error: assertion failed: List(object package$DebugNode, >> object package$DebugNode)* >> *java.lang.AssertionError: assertion failed: List(object >> package$DebugNode, object package$DebugNode)* >> at scala.reflect.internal.Symbols$Symbol.suchThat(Symbols.scala:1678) >> at >> scala.reflect.internal.Symbols$ClassSymbol.companionModule0(Symbols.scala:2988) >> at >> scala.reflect.internal.Symbols$ClassSymbol.companionModule(Symbols.scala:2991) >> at >> scala.tools.nsc.backend.jvm.GenASM$JPlainBuilder.genClass(GenASM.scala:1371) >> at scala.tools.nsc.backend.jvm.GenASM$AsmPhase.run(GenASM.scala:120) >> at scala.tools.nsc.Global$Run.compileUnitsInternal(Global.scala:1583) >> at scala.tools.nsc.Global$Run.compileUnits(Global.scala:1557) >> at scala.tools.nsc.Global$Run.compileSources(Global.scala:1553) >> at scala.tools.nsc.Global$Run.compile(Global.scala:1662) >> at xsbt.CachedCompiler0.run(CompilerInterface.scala:126) >> at xsbt.CachedCompiler0.run(CompilerInterface.scala:102) >> at xsbt.CompilerInterface.run(CompilerInterface.scala:27) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:606) >> at sbt.compiler.AnalyzingCompiler.call(AnalyzingCompiler.scala:102) >> at sbt.compiler.AnalyzingCompiler.compile(AnalyzingCompiler.scala:48) >> at sbt.compiler.AnalyzingCompiler.compile(AnalyzingCompiler.scala:41) >> at >> sbt.compiler.AggressiveCompile$$anonfun$6$$anonfun$compileScala$1$1$$anonfun$apply$3$$anonfun$apply$1.apply$mcV$sp(AggressiveCompile.scala:106) >> at >> sbt.compiler.AggressiveCompile$$anonfun$6$$anonfun$compileScala$1$1$$anonfun$apply$3$$anonfun$apply$1.apply(AggressiveCompile.scala:106) >> at >> sbt.compiler.AggressiveCompile$$anonfun$6$$anonfun$compileScala$1$1$$anonfun$apply$3$$anonfun$apply$1.apply(AggressiveCompile.scala:106) >> at >> sbt.compiler.AggressiveCompile.sbt$compiler$AggressiveCompile$$timed(AggressiveCompile.scala:179) >> at >> sbt.compiler.AggressiveCompile$$anonfun$6$$anonfun$compileScala$1$1$$anonfun$apply$3.apply(AggressiveCompile.scala:105) >> at >> sbt.compiler.AggressiveCompile$$anonfun$6$$anonfun$compileScala$1$1$$anonfun$apply$3.apply(AggressiveCompile.scala:102) >> at scala.Option.foreach(Option.scala:245) >> at >> sbt.compiler.AggressiveCompile$$anonfun$6$$anonfun$compileScala$1$1.apply(AggressiveCompile.scala:102) >> at >> sbt.compiler.AggressiveCompile$$anonfun$6$$anonfun$compileScala$1$1.apply(AggressiveCompile.scala:102) >> at scala.Option.foreach(Option.scala:245) >> at >> sbt.compiler.AggressiveCompile$$anonfun$6.compileScala$1(AggressiveCompile.scala:102) >> at >> sbt.compiler.AggressiveCompile$$anonfun$6.apply(AggressiveCompile.scala:151) >> at >> sbt.compiler.AggressiveCompile$$anonfun$6.apply(AggressiveCompile.scala:89) >> at sbt.inc.IncrementalCompile$$anonfun$doCompile$1.apply(Compile.scala:40) >> at sbt.inc.IncrementalCompile$$anonfun$doCompile$1.apply(Compile.scala:38) >> at sbt.inc.IncrementalCommon.cycle(Incremental.scala:103) >> at sbt.inc.Incremental$$anonfun$1.apply(Incremental.scala:39) >> at sbt.inc.Incremental$$anonfun$1.apply(Incremental.scala:38) >> at sbt.inc.Incremental$.manageClassfiles(Incremental.scala:69) >> at sbt.inc.Incremental$.compile(Incremental.scala:38) >> at sbt.inc.IncrementalCompile$.apply(Compile.scala:28) >> at sbt.compiler.AggressiveCompile.compile2(AggressiveCompile.scala:170) >> at sbt.compiler.AggressiveCompile.compile1(AggressiveCompile.scala:73) >> at >> org.jetbrains.jps.incremental.scala.local.SbtCompiler.compile(SbtCompiler.scala:66) >> at >> org.jetbrains.jps.incremental.scala.local.LocalServer.compile(LocalServer.scala:26) >> at org.jetbrains.jps.incremental.scala.remote.Main$.make(Main.scala:62) >> at >> org.jetbrains.jps.incremental.scala.remote.Main$.nailMain(Main.scala:20) >> at org.jetbrains.jps.incremental.scala.remote.Main.nailMain(Main.scala) >> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:606) >> at com.martiansoftware.nailgun.NGSession.run(NGSession.java:319) >> >> I just highlighted some error message that I think important as *bold >> and red.* >> >> This really bothered me for several days, I don't know how to get >> through. Any suggestions? Thanks. >> > > -- Best Regards Jeff Zhang
Re: No spark examples jar in maven repository after 1.1.1 ?
But it may be useful for user to check the example source code in IDE just by adding it to maven dependency. Otherwise user have to either download the source code or check it in github. On Mon, Nov 16, 2015 at 5:32 PM, Sean Owen <so...@cloudera.com> wrote: > I think because they're not a library? they're example code, not > something you build an app on. > > On Mon, Nov 16, 2015 at 9:27 AM, Jeff Zhang <zjf...@gmail.com> wrote: > > I don't find spark examples jar in maven repository after 1.1.1. Any > reason > > for that ? > > > > http://mvnrepository.com/artifact/org.apache.spark/spark-examples_2.10 > > > > > > -- > > Best Regards > > > > Jeff Zhang > -- Best Regards Jeff Zhang
No spark examples jar in maven repository after 1.1.1 ?
I don't find spark examples jar in maven repository after 1.1.1. Any reason for that ? http://mvnrepository.com/artifact/org.apache.spark/spark-examples_2.10 -- Best Regards Jeff Zhang
Re: Why there's no api for SparkContext#textFiles to support multiple inputs ?
Didn't notice that I can pass comma separated path in the existing API (SparkContext#textFile). So no necessary for new api. Thanks all. On Thu, Nov 12, 2015 at 10:24 AM, Jeff Zhang <zjf...@gmail.com> wrote: > Hi Pradeep > > ≥≥≥ Looks like what I was suggesting doesn't work. :/ > I guess you mean put comma separated path into one string and pass it > to existing API (SparkContext#textFile). It should not work. I suggest to > create new api SparkContext#textFiles to accept an array of string. I have > already implemented a simple patch and it works. > > > > > On Thu, Nov 12, 2015 at 10:17 AM, Pradeep Gollakota <pradeep...@gmail.com> > wrote: > >> Looks like what I was suggesting doesn't work. :/ >> >> On Wed, Nov 11, 2015 at 4:49 PM, Jeff Zhang <zjf...@gmail.com> wrote: >> >>> Yes, that's what I suggest. TextInputFormat support multiple inputs. So >>> in spark side, we just need to provide API to for that. >>> >>> On Thu, Nov 12, 2015 at 8:45 AM, Pradeep Gollakota <pradeep...@gmail.com >>> > wrote: >>> >>>> IIRC, TextInputFormat supports an input path that is a comma separated >>>> list. I haven't tried this, but I think you should just be able to do >>>> sc.textFile("file1,file2,...") >>>> >>>> On Wed, Nov 11, 2015 at 4:30 PM, Jeff Zhang <zjf...@gmail.com> wrote: >>>> >>>>> I know these workaround, but wouldn't it be more convenient and >>>>> straightforward to use SparkContext#textFiles ? >>>>> >>>>> On Thu, Nov 12, 2015 at 2:27 AM, Mark Hamstra <m...@clearstorydata.com >>>>> > wrote: >>>>> >>>>>> For more than a small number of files, you'd be better off using >>>>>> SparkContext#union instead of RDD#union. That will avoid building up a >>>>>> lengthy lineage. >>>>>> >>>>>> On Wed, Nov 11, 2015 at 10:21 AM, Jakob Odersky <joder...@gmail.com> >>>>>> wrote: >>>>>> >>>>>>> Hey Jeff, >>>>>>> Do you mean reading from multiple text files? In that case, as a >>>>>>> workaround, you can use the RDD#union() (or ++) method to concatenate >>>>>>> multiple rdds. For example: >>>>>>> >>>>>>> val lines1 = sc.textFile("file1") >>>>>>> val lines2 = sc.textFile("file2") >>>>>>> >>>>>>> val rdd = lines1 union lines2 >>>>>>> >>>>>>> regards, >>>>>>> --Jakob >>>>>>> >>>>>>> On 11 November 2015 at 01:20, Jeff Zhang <zjf...@gmail.com> wrote: >>>>>>> >>>>>>>> Although user can use the hdfs glob syntax to support multiple >>>>>>>> inputs. But sometimes, it is not convenient to do that. Not sure why >>>>>>>> there's no api of SparkContext#textFiles. It should be easy to >>>>>>>> implement >>>>>>>> that. I'd love to create a ticket and contribute for that if there's no >>>>>>>> other consideration that I don't know. >>>>>>>> >>>>>>>> -- >>>>>>>> Best Regards >>>>>>>> >>>>>>>> Jeff Zhang >>>>>>>> >>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>>> -- >>>>> Best Regards >>>>> >>>>> Jeff Zhang >>>>> >>>> >>>> >>> >>> >>> -- >>> Best Regards >>> >>> Jeff Zhang >>> >> >> > > > -- > Best Regards > > Jeff Zhang > -- Best Regards Jeff Zhang
Why there's no api for SparkContext#textFiles to support multiple inputs ?
Although user can use the hdfs glob syntax to support multiple inputs. But sometimes, it is not convenient to do that. Not sure why there's no api of SparkContext#textFiles. It should be easy to implement that. I'd love to create a ticket and contribute for that if there's no other consideration that I don't know. -- Best Regards Jeff Zhang
Re: Why there's no api for SparkContext#textFiles to support multiple inputs ?
I know these workaround, but wouldn't it be more convenient and straightforward to use SparkContext#textFiles ? On Thu, Nov 12, 2015 at 2:27 AM, Mark Hamstra <m...@clearstorydata.com> wrote: > For more than a small number of files, you'd be better off using > SparkContext#union instead of RDD#union. That will avoid building up a > lengthy lineage. > > On Wed, Nov 11, 2015 at 10:21 AM, Jakob Odersky <joder...@gmail.com> > wrote: > >> Hey Jeff, >> Do you mean reading from multiple text files? In that case, as a >> workaround, you can use the RDD#union() (or ++) method to concatenate >> multiple rdds. For example: >> >> val lines1 = sc.textFile("file1") >> val lines2 = sc.textFile("file2") >> >> val rdd = lines1 union lines2 >> >> regards, >> --Jakob >> >> On 11 November 2015 at 01:20, Jeff Zhang <zjf...@gmail.com> wrote: >> >>> Although user can use the hdfs glob syntax to support multiple inputs. >>> But sometimes, it is not convenient to do that. Not sure why there's no api >>> of SparkContext#textFiles. It should be easy to implement that. I'd love to >>> create a ticket and contribute for that if there's no other consideration >>> that I don't know. >>> >>> -- >>> Best Regards >>> >>> Jeff Zhang >>> >> >> > -- Best Regards Jeff Zhang
Re: Why there's no api for SparkContext#textFiles to support multiple inputs ?
Yes, that's what I suggest. TextInputFormat support multiple inputs. So in spark side, we just need to provide API to for that. On Thu, Nov 12, 2015 at 8:45 AM, Pradeep Gollakota <pradeep...@gmail.com> wrote: > IIRC, TextInputFormat supports an input path that is a comma separated > list. I haven't tried this, but I think you should just be able to do > sc.textFile("file1,file2,...") > > On Wed, Nov 11, 2015 at 4:30 PM, Jeff Zhang <zjf...@gmail.com> wrote: > >> I know these workaround, but wouldn't it be more convenient and >> straightforward to use SparkContext#textFiles ? >> >> On Thu, Nov 12, 2015 at 2:27 AM, Mark Hamstra <m...@clearstorydata.com> >> wrote: >> >>> For more than a small number of files, you'd be better off using >>> SparkContext#union instead of RDD#union. That will avoid building up a >>> lengthy lineage. >>> >>> On Wed, Nov 11, 2015 at 10:21 AM, Jakob Odersky <joder...@gmail.com> >>> wrote: >>> >>>> Hey Jeff, >>>> Do you mean reading from multiple text files? In that case, as a >>>> workaround, you can use the RDD#union() (or ++) method to concatenate >>>> multiple rdds. For example: >>>> >>>> val lines1 = sc.textFile("file1") >>>> val lines2 = sc.textFile("file2") >>>> >>>> val rdd = lines1 union lines2 >>>> >>>> regards, >>>> --Jakob >>>> >>>> On 11 November 2015 at 01:20, Jeff Zhang <zjf...@gmail.com> wrote: >>>> >>>>> Although user can use the hdfs glob syntax to support multiple inputs. >>>>> But sometimes, it is not convenient to do that. Not sure why there's no >>>>> api >>>>> of SparkContext#textFiles. It should be easy to implement that. I'd love >>>>> to >>>>> create a ticket and contribute for that if there's no other consideration >>>>> that I don't know. >>>>> >>>>> -- >>>>> Best Regards >>>>> >>>>> Jeff Zhang >>>>> >>>> >>>> >>> >> >> >> -- >> Best Regards >> >> Jeff Zhang >> > > -- Best Regards Jeff Zhang
Re: ResultStage's parent stages only ShuffleMapStages?
Right, there're only 2 kinds of stage: ResultStage & ShuffleMapStage. ShuffleMapStage will shuffle its data for downstream consumption, but ResultStage don't need to do that. I guess you may be confused these concepts with Map/Reduce. Actually ShuffleMapStage could be represented as either Map or Reduce as long as it produce intermediate data for downstream consumption. On Fri, Nov 6, 2015 at 4:15 PM, Jacek Laskowski <ja...@japila.pl> wrote: > Hi, > > Just to make sure that what I see in the code and think I understand > is indeed correct... > > When a job is submitted to DAGScheduler, it creates a new ResultStage > that in turn queries for the parent stages of itself given the RDD > (using `getParentStagesAndId` in `newResultStage`). > > Are a ResultStage's parent stages only ShuffleMapStages? > > Pozdrawiam, > Jacek > > -- > Jacek Laskowski | http://blog.japila.pl | http://blog.jaceklaskowski.pl > Follow me at https://twitter.com/jaceklaskowski > Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- Best Regards Jeff Zhang
How to use And Operator in filter (PySpark)
I can do it in scala api, but not sure what's the syntax in pyspark. (Didn't find it in python api) Here's what I tried, both failed >>> df.filter(df.age>3 & df.name=="Andy").collect() >>> df.filter(df.age>3 and df.name=="Andy").collect() -- Best Regards Jeff Zhang
Re: Location preferences in pyspark?
Yes, I don't think there is. You can use SparkContext.parallelize() to make a RDD from a list. But no location preferences supported yet. On Sat, Oct 17, 2015 at 8:42 AM, Philip Weaver <philip.wea...@gmail.com> wrote: > I believe what I want is the exact functionality provided by > SparkContext.makeRDD in Scala. For each element in the RDD, I want specify > a list of preferred hosts for processing that element. > > It looks like this method only exists in Scala, and as far as I can tell > there is no similar functionality available in python. Is this true? > > - Philip > > -- Best Regards Jeff Zhang
Re: Reading JSON in Pyspark throws scala.MatchError
BTW, I think Json Parser should verify the json format at least when inferring the schema of json. On Wed, Oct 21, 2015 at 12:59 PM, Jeff Zhang <zjf...@gmail.com> wrote: > I think this is due to the json file format. DataFrame can only accept > json file with one valid record per line. Multiple line per record is > invalid for DataFrame. > > > On Tue, Oct 6, 2015 at 2:48 AM, Davies Liu <dav...@databricks.com> wrote: > >> Could you create a JIRA to track this bug? >> >> On Fri, Oct 2, 2015 at 1:42 PM, balajikvijayan >> <balaji.k.vija...@gmail.com> wrote: >> > Running Windows 8.1, Python 2.7.x, Scala 2.10.5, Spark 1.4.1. >> > >> > I'm trying to read in a large quantity of json data in a couple of >> files and >> > I receive a scala.MatchError when I do so. Json, Python and stack trace >> all >> > shown below. >> > >> > Json: >> > >> > { >> > "dataunit": { >> > "page_view": { >> > "nonce": 438058072, >> > "person": { >> > "user_id": 5846 >> > }, >> > "page": { >> > "url": "http://mysite.com/blog; >> > } >> > } >> > }, >> > "pedigree": { >> > "true_as_of_secs": 1438627992 >> > } >> > } >> > >> > Python: >> > >> > import pyspark >> > sc = pyspark.SparkContext() >> > sqlContext = pyspark.SQLContext(sc) >> > pageviews = sqlContext.read.json("[Path to folder containing file with >> above >> > json]") >> > pageviews.collect() >> > >> > Stack Trace: >> > Py4JJavaError: An error occurred while calling >> > z:org.apache.spark.api.python.PythonRDD.collectAndServe. >> > : org.apache.spark.SparkException: Job aborted due to stage failure: >> Task 1 >> > in stage 32.0 failed 1 times, most recent failure: Lost task 1.0 in >> stage >> > 32.0 (TID 133, localhost): scala.MatchError: >> > (VALUE_STRING,ArrayType(StructType(),true)) (of class scala.Tuple2) >> > at >> > >> org.apache.spark.sql.json.JacksonParser$.convertField(JacksonParser.scala:49) >> > at >> > >> org.apache.spark.sql.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:201) >> > at >> > >> org.apache.spark.sql.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:193) >> > at >> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) >> > at >> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) >> > at >> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) >> > at >> > >> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:116) >> > at scala.collection.Iterator$class.foreach(Iterator.scala:727) >> > at >> > >> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:111) >> > at >> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) >> > at >> > >> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) >> > at >> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) >> > at scala.collection.TraversableOnce$class.to >> (TraversableOnce.scala:273) >> > at >> > >> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:111) >> > at >> > >> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) >> > at >> > >> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:111) >> > at >> > >> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) >> > at >> > >> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:111) >> > at >> > >> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885) >> > at >> > >> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885) >> > at >> > >> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkCo
Re: Reading JSON in Pyspark throws scala.MatchError
hread.run(Thread.java:745) > > > > Driver stacktrace: > > at > > org.apache.spark.scheduler.DAGScheduler.org > $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273) > > at > > > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264) > > at > > > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263) > > at > > > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > > at > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > > at > > > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263) > > at > > > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) > > at > > > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) > > at scala.Option.foreach(Option.scala:236) > > at > > > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) > > at > > > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457) > > at > > > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418) > > at > org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > > > > It seems like this issue has been resolved in scala per SPARK-3390 > > <https://issues.apache.org/jira/browse/SPARK-3390> ; any thoughts on > the > > root cause of this in pyspark? > > > > > > > > -- > > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Reading-JSON-in-Pyspark-throws-scala-MatchError-tp24911.html > > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > > > - > > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > > For additional commands, e-mail: user-h...@spark.apache.org > > > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- Best Regards Jeff Zhang
Re: pyspark groupbykey throwing error: unpack requires a string argument of length 4
Stacktrace would be helpful if you can provide that. On Mon, Oct 19, 2015 at 1:42 PM, fahad shah <sfaha...@gmail.com> wrote: > Hi > > I am trying to do pair rdd's, group by the key assign id based on key. > I am using Pyspark with spark 1.3, for some reason, I am getting this > error that I am unable to figure out - any help much appreciated. > > Things I tried (but to no effect), > > 1. make sure I am not doing any conversions on the strings > 2. make sure that the fields used in the key are all there and not > empty string (or else I toss the row out) > > My code is along following lines (split is using stringio to parse > csv, header removes the header row and parse_train is putting the 54 > fields into named tuple after whitespace/quote removal): > > #Error for string argument is thrown on the BB.take(1) where the > groupbykey is evaluated > > A = sc.textFile("train.csv").filter(lambda x:not > isHeader(x)).map(split).map(parse_train).filter(lambda x: not x is > None) > > A.count() > > B = A.map(lambda k: > > ((k.srch_destination_id,k.srch_length_of_stay,k.srch_booking_window,k.srch_adults_count, > k.srch_children_count,k.srch_room_count), > (k[0:54]))) > BB = B.groupByKey() > BB.take(1) > > > best fahad > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- Best Regards Jeff Zhang
Re: Setting executors per worker - Standalone
use "--executor-cores 1" you will get 4 executors per worker since you have 4 cores per worker On Tue, Sep 29, 2015 at 8:24 AM, James Pirz <james.p...@gmail.com> wrote: > Hi, > > I am using speak 1.5 (standalone mode) on a cluster with 10 nodes while > each machine has 12GB of RAM and 4 cores. On each machine I have one worker > which is running one executor that grabs all 4 cores. I am interested to > check the performance with "one worker but 4 executors per machine - each > with one core". > > I can see that "running multiple executors per worker in Standalone mode" > is possible based on the closed issue: > > https://issues.apache.org/jira/browse/SPARK-1706 > > But I can not find a way to do that. "SPARK_EXECUTOR_INSTANCES" is only > available for the Yarn mode, and in the standalone mode I can just set > "SPARK_WORKER_INSTANCES" and "SPARK_WORKER_CORES" and "SPARK_WORKER_MEMORY". > > Any hint or suggestion would be great. > > -- Best Regards Jeff Zhang
Task serialization error for mllib.MovieLensALS
I run the MovieLensALS, but meet the following error. The weird thing is that this issue only appear under openjdk. And this is based on the 1.5, I found several related tickets, not sure has anyone else meet the same issue and know the solution ? Thanks https://issues.apache.org/jira/browse/SPARK-4672 https://issues.apache.org/jira/browse/SPARK-4838 Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1841) java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1534) java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) scala.collection.immutable.$colon$colon.writeObject(List.scala:379) -- Best Regards Jeff Zhang
Re: Event logging not working when worker machine terminated
What cluster mode do you use ? Standalone/Yarn/Mesos ? On Wed, Sep 9, 2015 at 11:15 AM, David Rosenstrauch <dar...@darose.net> wrote: > Our Spark cluster is configured to write application history event logging > to a directory on HDFS. This all works fine. (I've tested it with Spark > shell.) > > However, on a large, long-running job that we ran tonight, one of our > machines at the cloud provider had issues and had to be terminated and > replaced in the middle of the job. > > The job completed correctly, and shows in state FINISHED in the "Completed > Applications" section of the Spark GUI. However, when I try to look at the > application's history, the GUI says "Application history not found" and > "Application ... is still in progress". > > The reason appears to be the machine that was terminated. When I click on > the executor list for that job, Spark is showing the executor from the > terminated machine as still in state RUNNING. > > Any solution/workaround for this? BTW, I'm running Spark v1.3.0. > > Thanks, > > DR > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- Best Regards Jeff Zhang
Re: Submitted applications does not run.
This is master log. There's no worker registration info in the log. That means the worker may not start properly. Please check the log file with apache.spark.deploy.worker in file name. On Tue, Sep 1, 2015 at 2:55 PM, Madawa Soysa <madawa...@cse.mrt.ac.lk> wrote: > I cannot see anything abnormal in logs. What would be the reason for not > availability of executors? > > On 1 September 2015 at 12:24, Madawa Soysa <madawa...@cse.mrt.ac.lk> > wrote: > >> Following are the logs available. Please find the attached. >> >> On 1 September 2015 at 12:18, Jeff Zhang <zjf...@gmail.com> wrote: >> >>> It's in SPARK_HOME/logs >>> >>> Or you can check the spark web ui. http://[master-machine]:8080 >>> >>> >>> On Tue, Sep 1, 2015 at 2:44 PM, Madawa Soysa <madawa...@cse.mrt.ac.lk> >>> wrote: >>> >>>> How do I check worker logs? SPARK_HOME/work folder does not exist. I am >>>> using the spark standalone mode. >>>> >>>> On 1 September 2015 at 12:05, Jeff Zhang <zjf...@gmail.com> wrote: >>>> >>>>> No executors ? Please check the worker logs if you are using spark >>>>> standalone mode. >>>>> >>>>> On Tue, Sep 1, 2015 at 2:17 PM, Madawa Soysa <madawa...@cse.mrt.ac.lk> >>>>> wrote: >>>>> >>>>>> Hi All, >>>>>> >>>>>> I have successfully submitted some jobs to spark master. But the jobs >>>>>> won't progress and not finishing. Please see the attached screenshot. >>>>>> These >>>>>> are fairly very small jobs and this shouldn't take more than a minute to >>>>>> finish. >>>>>> >>>>>> I'm new to spark and any help would be appreciated. >>>>>> >>>>>> Thanks, >>>>>> Madawa. >>>>>> >>>>>> >>>>>> - >>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>>>>> For additional commands, e-mail: user-h...@spark.apache.org >>>>>> >>>>> >>>>> >>>>> >>>>> -- >>>>> Best Regards >>>>> >>>>> Jeff Zhang >>>>> >>>> >>>> >>>> >>>> -- >>>> >>>> *_**Madawa Soysa* >>>> >>>> Undergraduate, >>>> >>>> Department of Computer Science and Engineering, >>>> >>>> University of Moratuwa. >>>> >>>> >>>> Mobile: +94 71 461 6050 <%2B94%2075%20812%200726> | Email: >>>> madawa...@cse.mrt.ac.lk >>>> LinkedIn <http://lk.linkedin.com/in/madawasoysa> | Twitter >>>> <https://twitter.com/madawa_rc> | Tumblr <http://madawas.tumblr.com/> >>>> >>> >>> >>> >>> -- >>> Best Regards >>> >>> Jeff Zhang >>> >> >> >> >> -- >> >> *_**Madawa Soysa* >> >> Undergraduate, >> >> Department of Computer Science and Engineering, >> >> University of Moratuwa. >> >> >> Mobile: +94 71 461 6050 <%2B94%2075%20812%200726> | Email: >> madawa...@cse.mrt.ac.lk >> LinkedIn <http://lk.linkedin.com/in/madawasoysa> | Twitter >> <https://twitter.com/madawa_rc> | Tumblr <http://madawas.tumblr.com/> >> > > > > -- > > *_**Madawa Soysa* > > Undergraduate, > > Department of Computer Science and Engineering, > > University of Moratuwa. > > > Mobile: +94 71 461 6050 <%2B94%2075%20812%200726> | Email: > madawa...@cse.mrt.ac.lk > LinkedIn <http://lk.linkedin.com/in/madawasoysa> | Twitter > <https://twitter.com/madawa_rc> | Tumblr <http://madawas.tumblr.com/> > -- Best Regards Jeff Zhang
Re: Submitted applications does not run.
It's in SPARK_HOME/logs Or you can check the spark web ui. http://[master-machine]:8080 On Tue, Sep 1, 2015 at 2:44 PM, Madawa Soysa <madawa...@cse.mrt.ac.lk> wrote: > How do I check worker logs? SPARK_HOME/work folder does not exist. I am > using the spark standalone mode. > > On 1 September 2015 at 12:05, Jeff Zhang <zjf...@gmail.com> wrote: > >> No executors ? Please check the worker logs if you are using spark >> standalone mode. >> >> On Tue, Sep 1, 2015 at 2:17 PM, Madawa Soysa <madawa...@cse.mrt.ac.lk> >> wrote: >> >>> Hi All, >>> >>> I have successfully submitted some jobs to spark master. But the jobs >>> won't progress and not finishing. Please see the attached screenshot. These >>> are fairly very small jobs and this shouldn't take more than a minute to >>> finish. >>> >>> I'm new to spark and any help would be appreciated. >>> >>> Thanks, >>> Madawa. >>> >>> >>> ----- >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>> For additional commands, e-mail: user-h...@spark.apache.org >>> >> >> >> >> -- >> Best Regards >> >> Jeff Zhang >> > > > > -- > > *_**Madawa Soysa* > > Undergraduate, > > Department of Computer Science and Engineering, > > University of Moratuwa. > > > Mobile: +94 71 461 6050 <%2B94%2075%20812%200726> | Email: > madawa...@cse.mrt.ac.lk > LinkedIn <http://lk.linkedin.com/in/madawasoysa> | Twitter > <https://twitter.com/madawa_rc> | Tumblr <http://madawas.tumblr.com/> > -- Best Regards Jeff Zhang
Re: Submitted applications does not run.
No executors ? Please check the worker logs if you are using spark standalone mode. On Tue, Sep 1, 2015 at 2:17 PM, Madawa Soysa <madawa...@cse.mrt.ac.lk> wrote: > Hi All, > > I have successfully submitted some jobs to spark master. But the jobs > won't progress and not finishing. Please see the attached screenshot. These > are fairly very small jobs and this shouldn't take more than a minute to > finish. > > I'm new to spark and any help would be appreciated. > > Thanks, > Madawa. > > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > -- Best Regards Jeff Zhang
Re: cached data between jobs
Hi Eric, If the 2 jobs share the same parent stages. these stages can be skipped for the second job. Here's one simple example: val rdd1 = sc.parallelize(1 to 10).map(e=>(e,e)) val rdd2 = rdd1.groupByKey() rdd2.map(e=>e._1).collect() foreach println rdd2.map(e=> (e._1, e._2.size)).collect foreach println Obviously, there are 2 jobs and both of them have 2 stages. Luckily here these 2 jobs share the same stage (the first stage of each job), although you doesn't cache these data explicitly, once one stage is completed, it is marked as available and can used for other jobs. so for the second job, it only needs to run one stage. You should be able to see the skipped stage in the spark job ui. [image: Inline image 1] On Wed, Sep 2, 2015 at 12:53 AM, Eric Walker <eric.wal...@gmail.com> wrote: > Hi, > > I'm noticing that a 30 minute job that was initially IO-bound may not be > during subsequent runs. Is there some kind of between-job caching that > happens in Spark or in Linux that outlives jobs and that might be making > subsequent runs faster? If so, is there a way to avoid the caching in > order to get a better sense of the worst-case scenario? > > (It's also possible that I've simply changed something that made things > faster.) > > Eric > > -- Best Regards Jeff Zhang
Re: Submitted applications does not run.
Did you start spark cluster using command sbin/start-all.sh ? You should have 2 log files under folder if it is single-node cluster. Like the following spark-jzhang-org.apache.spark.deploy.master.Master-1-jzhangMBPr.local.out spark-jzhang-org.apache.spark.deploy.worker.Worker-1-jzhangMBPr.local.out On Tue, Sep 1, 2015 at 4:01 PM, Madawa Soysa <madawa...@cse.mrt.ac.lk> wrote: > There are no logs which includes apache.spark.deploy.worker in file name > in the SPARK_HOME/logs folder. > > On 1 September 2015 at 13:00, Jeff Zhang <zjf...@gmail.com> wrote: > >> This is master log. There's no worker registration info in the log. That >> means the worker may not start properly. Please check the log file >> with apache.spark.deploy.worker in file name. >> >> >> >> On Tue, Sep 1, 2015 at 2:55 PM, Madawa Soysa <madawa...@cse.mrt.ac.lk> >> wrote: >> >>> I cannot see anything abnormal in logs. What would be the reason for not >>> availability of executors? >>> >>> On 1 September 2015 at 12:24, Madawa Soysa <madawa...@cse.mrt.ac.lk> >>> wrote: >>> >>>> Following are the logs available. Please find the attached. >>>> >>>> On 1 September 2015 at 12:18, Jeff Zhang <zjf...@gmail.com> wrote: >>>> >>>>> It's in SPARK_HOME/logs >>>>> >>>>> Or you can check the spark web ui. http://[master-machine]:8080 >>>>> >>>>> >>>>> On Tue, Sep 1, 2015 at 2:44 PM, Madawa Soysa <madawa...@cse.mrt.ac.lk> >>>>> wrote: >>>>> >>>>>> How do I check worker logs? SPARK_HOME/work folder does not exist. I >>>>>> am using the spark standalone mode. >>>>>> >>>>>> On 1 September 2015 at 12:05, Jeff Zhang <zjf...@gmail.com> wrote: >>>>>> >>>>>>> No executors ? Please check the worker logs if you are using spark >>>>>>> standalone mode. >>>>>>> >>>>>>> On Tue, Sep 1, 2015 at 2:17 PM, Madawa Soysa < >>>>>>> madawa...@cse.mrt.ac.lk> wrote: >>>>>>> >>>>>>>> Hi All, >>>>>>>> >>>>>>>> I have successfully submitted some jobs to spark master. But the >>>>>>>> jobs won't progress and not finishing. Please see the attached >>>>>>>> screenshot. >>>>>>>> These are fairly very small jobs and this shouldn't take more than a >>>>>>>> minute >>>>>>>> to finish. >>>>>>>> >>>>>>>> I'm new to spark and any help would be appreciated. >>>>>>>> >>>>>>>> Thanks, >>>>>>>> Madawa. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> - >>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org >>>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> -- >>>>>>> Best Regards >>>>>>> >>>>>>> Jeff Zhang >>>>>>> >>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> >>>>>> *_**Madawa Soysa* >>>>>> >>>>>> Undergraduate, >>>>>> >>>>>> Department of Computer Science and Engineering, >>>>>> >>>>>> University of Moratuwa. >>>>>> >>>>>> >>>>>> Mobile: +94 71 461 6050 <%2B94%2075%20812%200726> | Email: >>>>>> madawa...@cse.mrt.ac.lk >>>>>> LinkedIn <http://lk.linkedin.com/in/madawasoysa> | Twitter >>>>>> <https://twitter.com/madawa_rc> | Tumblr <http://madawas.tumblr.com/> >>>>>> >>>>> >>>>> >>>>> >>>>> -- >>>>> Best Regards >>>>> >>>>> Jeff Zhang >>>>> >>>> >>>> >>>> >>>> -- >>>> >>>> *_**Madawa Soysa* >>>> >>>> Undergraduate, >>>> >>>> Department of Computer Science and Engineering, >>>> >>>> University of Moratuwa. >>>> >>>> >>>> Mobile: +94 71 461 6050 <%2B94%2075%20812%200726> | Email: >>>> madawa...@cse.mrt.ac.lk >>>> LinkedIn <http://lk.linkedin.com/in/madawasoysa> | Twitter >>>> <https://twitter.com/madawa_rc> | Tumblr <http://madawas.tumblr.com/> >>>> >>> >>> >>> >>> -- >>> >>> *_**Madawa Soysa* >>> >>> Undergraduate, >>> >>> Department of Computer Science and Engineering, >>> >>> University of Moratuwa. >>> >>> >>> Mobile: +94 71 461 6050 <%2B94%2075%20812%200726> | Email: >>> madawa...@cse.mrt.ac.lk >>> LinkedIn <http://lk.linkedin.com/in/madawasoysa> | Twitter >>> <https://twitter.com/madawa_rc> | Tumblr <http://madawas.tumblr.com/> >>> >> >> >> >> -- >> Best Regards >> >> Jeff Zhang >> > > > > -- > > *_**Madawa Soysa* > > Undergraduate, > > Department of Computer Science and Engineering, > > University of Moratuwa. > > > Mobile: +94 71 461 6050 <%2B94%2075%20812%200726> | Email: > madawa...@cse.mrt.ac.lk > LinkedIn <http://lk.linkedin.com/in/madawasoysa> | Twitter > <https://twitter.com/madawa_rc> | Tumblr <http://madawas.tumblr.com/> > -- Best Regards Jeff Zhang
Re: Submitted applications does not run.
You need to make yourself able to ssh to localhost without password, please check this blog. http://hortonworks.com/kb/generating-ssh-keys-for-passwordless-login/ On Tue, Sep 1, 2015 at 4:31 PM, Madawa Soysa <madawa...@cse.mrt.ac.lk> wrote: > I used ./sbin/start-master.sh > > When I used ./sbin/start-all.sh the start fails. I get the following error. > > failed to launch org.apache.spark.deploy.master.Master: > localhost: ssh: connect to host localhost port 22: Connection refused > > On 1 September 2015 at 13:41, Jeff Zhang <zjf...@gmail.com> wrote: > >> Did you start spark cluster using command sbin/start-all.sh ? >> You should have 2 log files under folder if it is single-node cluster. >> Like the following >> >> spark-jzhang-org.apache.spark.deploy.master.Master-1-jzhangMBPr.local.out >> spark-jzhang-org.apache.spark.deploy.worker.Worker-1-jzhangMBPr.local.out >> >> >> >> On Tue, Sep 1, 2015 at 4:01 PM, Madawa Soysa <madawa...@cse.mrt.ac.lk> >> wrote: >> >>> There are no logs which includes apache.spark.deploy.worker in file >>> name in the SPARK_HOME/logs folder. >>> >>> On 1 September 2015 at 13:00, Jeff Zhang <zjf...@gmail.com> wrote: >>> >>>> This is master log. There's no worker registration info in the log. >>>> That means the worker may not start properly. Please check the log file >>>> with apache.spark.deploy.worker in file name. >>>> >>>> >>>> >>>> On Tue, Sep 1, 2015 at 2:55 PM, Madawa Soysa <madawa...@cse.mrt.ac.lk> >>>> wrote: >>>> >>>>> I cannot see anything abnormal in logs. What would be the reason for >>>>> not availability of executors? >>>>> >>>>> On 1 September 2015 at 12:24, Madawa Soysa <madawa...@cse.mrt.ac.lk> >>>>> wrote: >>>>> >>>>>> Following are the logs available. Please find the attached. >>>>>> >>>>>> On 1 September 2015 at 12:18, Jeff Zhang <zjf...@gmail.com> wrote: >>>>>> >>>>>>> It's in SPARK_HOME/logs >>>>>>> >>>>>>> Or you can check the spark web ui. http://[master-machine]:8080 >>>>>>> >>>>>>> >>>>>>> On Tue, Sep 1, 2015 at 2:44 PM, Madawa Soysa < >>>>>>> madawa...@cse.mrt.ac.lk> wrote: >>>>>>> >>>>>>>> How do I check worker logs? SPARK_HOME/work folder does not exist. >>>>>>>> I am using the spark standalone mode. >>>>>>>> >>>>>>>> On 1 September 2015 at 12:05, Jeff Zhang <zjf...@gmail.com> wrote: >>>>>>>> >>>>>>>>> No executors ? Please check the worker logs if you are using spark >>>>>>>>> standalone mode. >>>>>>>>> >>>>>>>>> On Tue, Sep 1, 2015 at 2:17 PM, Madawa Soysa < >>>>>>>>> madawa...@cse.mrt.ac.lk> wrote: >>>>>>>>> >>>>>>>>>> Hi All, >>>>>>>>>> >>>>>>>>>> I have successfully submitted some jobs to spark master. But the >>>>>>>>>> jobs won't progress and not finishing. Please see the attached >>>>>>>>>> screenshot. >>>>>>>>>> These are fairly very small jobs and this shouldn't take more than a >>>>>>>>>> minute >>>>>>>>>> to finish. >>>>>>>>>> >>>>>>>>>> I'm new to spark and any help would be appreciated. >>>>>>>>>> >>>>>>>>>> Thanks, >>>>>>>>>> Madawa. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> - >>>>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>>>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org >>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> -- >>>>>>>>> Best Regards >>>>&
Re: Exception throws when running spark pi in Intellij Idea that scala.collection.Seq is not found
As I remember, you also need to change guava and jetty related dependency to compile if you run to run SparkPi in intellij. On Tue, Aug 25, 2015 at 3:15 PM, Hemant Bhanawat hemant9...@gmail.com wrote: Go to the module settings of the project and in the dependencies section check the scope of scala jars. It would be either Test or Provided. Change it to compile and it should work. Check the following link to understand more about scope of modules: https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html On Tue, Aug 25, 2015 at 12:18 PM, Todd bit1...@163.com wrote: I cloned the code from https://github.com/apache/spark to my machine. It can compile successfully, But when I run the sparkpi, it throws an exception below complaining the scala.collection.Seq is not found. I have installed scala2.10.4 in my machine, and use the default profiles: window,scala2.10,maven-3,test-java-home. In Idea, I can find that the Seq class is on my classpath: Exception in thread main java.lang.NoClassDefFoundError: scala/collection/Seq at org.apache.spark.examples.SparkPi.main(SparkPi.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) Caused by: java.lang.ClassNotFoundException: scala.collection.Seq at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 6 more -- Best Regards Jeff Zhang
DataFrame#show cost 2 Spark Jobs ?
It's weird to me that the simple show function will cost 2 spark jobs. DataFrame#explain shows it is a very simple operation, not sure why need 2 jobs. == Parsed Logical Plan == Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Analyzed Logical Plan == age: bigint, name: string Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Optimized Logical Plan == Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Physical Plan == Scan JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1] -- Best Regards Jeff Zhang
Re: DataFrame#show cost 2 Spark Jobs ?
Hi Cheng, I know that sqlContext.read will trigger one spark job to infer the schema. What I mean is DataFrame#show cost 2 spark jobs. So overall it would cost 3 jobs. Here's the command I use: val df = sqlContext.read.json(file:///Users/hadoop/github/spark/examples/src/main/resources/people.json) // trigger one spark job to infer schema df.show()// trigger 2 spark jobs which is weird On Mon, Aug 24, 2015 at 10:56 PM, Cheng, Hao hao.ch...@intel.com wrote: The first job is to infer the json schema, and the second one is what you mean of the query. You can provide the schema while loading the json file, like below: sqlContext.read.schema(xxx).json(“…”)? Hao *From:* Jeff Zhang [mailto:zjf...@gmail.com] *Sent:* Monday, August 24, 2015 6:20 PM *To:* user@spark.apache.org *Subject:* DataFrame#show cost 2 Spark Jobs ? It's weird to me that the simple show function will cost 2 spark jobs. DataFrame#explain shows it is a very simple operation, not sure why need 2 jobs. == Parsed Logical Plan == Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Analyzed Logical Plan == age: bigint, name: string Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Optimized Logical Plan == Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Physical Plan == Scan JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1] -- Best Regards Jeff Zhang -- Best Regards Jeff Zhang
Re: Transformation not happening for reduceByKey or GroupByKey
Hi Satish, I don't see where spark support -i, so suspect it is provided by DSE. In that case, it might be bug of DSE. On Fri, Aug 21, 2015 at 6:02 PM, satish chandra j jsatishchan...@gmail.com wrote: HI Robin, Yes, it is DSE but issue is related to Spark only Regards, Satish Chandra On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote: Not sure, never used dse - it’s part of DataStax Enterprise right? On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com wrote: HI Robin, Yes, below mentioned piece or code works fine in Spark Shell but the same when place in Script File and executed with -i file name it creating an empty RDD scala val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40))) pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at console:28 scala pairs.reduceByKey((x,y) = x + y).collect res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) Command: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile I understand, I am missing something here due to which my final RDD does not have as required output Regards, Satish Chandra On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote: This works for me: scala val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40))) pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at console:28 scala pairs.reduceByKey((x,y) = x + y).collect res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on Values for each key Code: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at console:73 res:Array[(Int,Int)] = Array() Command as mentioned dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish -- Best Regards Jeff Zhang
Re: SparkPi is geting java.lang.NoClassDefFoundError: scala/collection/Seq
Check module example's dependency (right click examples and click Open Modules Settings), by default scala-library is provided, you need to change it to compile to run SparkPi in Intellij. As I remember, you also need to change guava and jetty related library to compile too. On Mon, Aug 17, 2015 at 2:14 AM, xiaohe lan zombiexco...@gmail.com wrote: Hi, I am trying to run SparkPi in Intellij and getting NoClassDefFoundError. Anyone else saw this issue before ? Exception in thread main java.lang.NoClassDefFoundError: scala/collection/Seq at org.apache.spark.examples.SparkPi.main(SparkPi.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) Caused by: java.lang.ClassNotFoundException: scala.collection.Seq at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 6 more Process finished with exit code 1 Thanks, Xiaohe -- Best Regards Jeff Zhang
Re: Spark Master HA on YARN
To make it clear, Spark Standalone is similar to Yarn as a simple cluster management system. Spark Master --- Yarn Resource Manager Spark Worker --- Yarn Node Manager On Mon, Aug 17, 2015 at 4:59 AM, Ruslan Dautkhanov dautkha...@gmail.com wrote: There is no Spark master in YARN mode. It's standalone mode terminology. In YARN cluster mode, Spark's Application Master (Spark Driver runs in it) will be restarted automatically by RM up to yarn.resourcemanager.am.max-retries times (default is 2). -- Ruslan Dautkhanov On Fri, Jul 17, 2015 at 1:29 AM, Bhaskar Dutta bhas...@gmail.com wrote: Hi, Is Spark master high availability supported on YARN (yarn-client mode) analogous to https://spark.apache.org/docs/1.4.0/spark-standalone.html#high-availability ? Thanks Bhaskie -- Best Regards Jeff Zhang
Re: Always two tasks slower than others, and then job fails
Data skew ? May your partition key has some special value like null or empty string On Fri, Aug 14, 2015 at 11:01 AM, randylu randyl...@gmail.com wrote: It is strange that there are always two tasks slower than others, and the corresponding partitions's data are larger, no matter how many partitions? Executor ID Address Task Time Shuffle Read Size / Records 1 slave129.vsvs.com:56691 16 s1 99.5 MB / 18865432 *10 slave317.vsvs.com:59281 0 ms0 413.5 MB / 311001318* 100 slave290.vsvs.com:60241 19 s1 110.8 MB / 27075926 101 slave323.vsvs.com:36246 14 s1 126.1 MB / 25052808 Task time and records of Executor 10 seems strange, and the cpus on the node are all 100% busy. Anyone meets the same problem, Thanks in advance for any answer! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Always-two-tasks-slower-than-others-and-then-job-fails-tp24257.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Best Regards Jeff Zhang
Re: Job is Failing automatically
15/08/11 12:59:34 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 71, sdldalplhdw02.suddenlink.cequel3.com): java.lang.NullPointerException at com.suddenlink.pnm.process.HBaseStoreHelper.flush(HBaseStoreHelper.java:313) It's your app error. NPE from HBaseStoreHelper On Wed, Aug 12, 2015 at 5:12 AM, Nikhil Gs gsnikhil1432...@gmail.com wrote: Hello Team, I am facing an error which I have pasted below. My job is failing when I am copying my data files into flume spool directory. Most of the time the job is getting failed. Dont know why.. Facing this issue several times. Also, for your reference I have attached the complete Yarn log file. Please suggest me whats the issue. Thanks in advance. 15/08/11 12:59:30 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on sdldalplhdw02.suddenlink.cequel3.com:35668 (size: 2.1 KB, free: 1059.7 MB) 15/08/11 12:59:31 INFO storage.BlockManagerInfo: Added rdd_5_0 in memory on sdldalplhdw02.suddenlink.cequel3.com:35668 (size: 1693.6 KB, free: 1058.0 MB) 15/08/11 12:59:32 INFO storage.BlockManagerInfo: Added rdd_7_0 in memory on sdldalplhdw02.suddenlink.cequel3.com:35668 (size: 1697.6 KB, free: 1056.4 MB) 15/08/11 12:59:34 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 71, sdldalplhdw02.suddenlink.cequel3.com): java.lang.NullPointerException at com.suddenlink.pnm.process.HBaseStoreHelper.flush(HBaseStoreHelper.java:313) at com.suddenlink.pnm.process.StoreNodeInHBase$1.call(StoreNodeInHBase.java:57) at com.suddenlink.pnm.process.StoreNodeInHBase$1.call(StoreNodeInHBase.java:31) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:304) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:304) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/08/11 12:59:34 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 3.0 (TID 72, sdldalplhdw02.suddenlink.cequel3.com, NODE_LOCAL, 1179 bytes) 15/08/11 12:59:34 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 3.0 (TID 72) on executor sdldalplhdw02.suddenlink.cequel3.com: java.lang.NullPointerException (null) [duplicate 1] 15/08/11 12:59:34 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 3.0 (TID 73, sdldalplhdw02.suddenlink.cequel3.com, NODE_LOCAL, 1179 bytes) 15/08/11 12:59:34 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 3.0 (TID 73) on executor sdldalplhdw02.suddenlink.cequel3.com: java.lang.NullPointerException (null) [duplicate 2] 15/08/11 12:59:34 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 3.0 (TID 74, sdldalplhdw02.suddenlink.cequel3.com, NODE_LOCAL, 1179 bytes) 15/08/11 12:59:34 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 3.0 (TID 74) on executor sdldalplhdw02.suddenlink.cequel3.com: java.lang.NullPointerException (null) [duplicate 3] 15/08/11 12:59:34 ERROR scheduler.TaskSetManager: Task 0 in stage 3.0 failed 4 times; aborting job 15/08/11 12:59:34 INFO cluster.YarnClusterScheduler: Removed TaskSet 3.0, whose tasks have all completed, from pool 15/08/11 12:59:34 INFO cluster.YarnClusterScheduler: Cancelling stage 3 15/08/11 12:59:34 INFO scheduler.DAGScheduler: Job 2 failed: foreachRDD at NodeProcessor.java:101, took 4.750491 s 15/08/11 12:59:34 ERROR scheduler.JobScheduler: Error running job streaming job 143931597 ms.0 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 74, sdldalplhdw02.suddenlink.cequel3.com): java.lang.NullPointerException Regards, Nik. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Best Regards Jeff Zhang
Re: Spark Job Hangs on our production cluster
(SelectorImpl.java:87) - locked 0x00067bf47710 (a io.netty.channel.nio.SelectedSelectionKeySet) - locked 0x00067bf374e8 (a java.util.Collections$UnmodifiableSet) - locked 0x00067bf373d0 (a sun.nio.ch.EPollSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98) at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:622) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:310) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at java.lang.Thread.run(Thread.java:745) Meantime, I can confirm our Hadoop/HDFS cluster works fine, as the MapReduce jobs also run without any problem, and Hadoop fs command works fine in the BigInsight. I attached the jstack output with this email, but I don't know what could be the root reason. The same Spark shell command works fine, if I point to the small dataset, instead of big dataset. The small dataset will have around 800 HDFS blocks, and Spark finishes without any problem. Here are some facts I know: 1) Since the BigInsight is running on IBM JDK, so I make the Spark run under the same JDK, same problem for BigData set. 2) I even changed --total-executor-cores to 42, which will make each executor runs with one core (as we have 42 Spark workers), to avoid any multithreads, but still no luck. 3) This problem of scanning 1T data hanging is NOT 100% for sure happening. Sometime I didn't see it, but more than 50% I will see it if I try. 4) We never met this issue on our stage cluster, but it has only (1 namenode + 1 jobtracker + 3 data/task nodes), and the same dataset is only 160G on it. 5) While the Spark java processing hanging, I didn't see any exception or issue on the HDFS data node log. Does anyone have any clue about this? Thanks Yong - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Best Regards Jeff Zhang
Re: How to control Spark Executors from getting Lost when using YARN client mode?
Please check the node manager logs to see why the container is killed. On Mon, Aug 3, 2015 at 11:59 PM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi all any help will be much appreciated my spark job runs fine but in the middle it starts loosing executors because of netafetchfailed exception saying shuffle not found at the location since executor is lost On Jul 31, 2015 11:41 PM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi thanks for the response. It looks like YARN container is getting killed but dont know why I see shuffle metafetchexception as mentioned in the following SO link. I have enough memory 8 nodes 8 cores 30 gig memory each. And because of this metafetchexpcetion YARN killing container running executor how can it over run memory I tried to give each executor 25 gig still it is not sufficient and it fails. Please guide I dont understand what is going on I am using Spark 1.4.0 I am using spark.shuffle.memory as 0.0 and spark.storage.memory as 0.5. I have almost all optimal properties like Kyro serializer I have kept 500 akka frame size 20 akka threads dont know I am trapped its been two days I am trying to recover from this issue. http://stackoverflow.com/questions/29850784/what-are-the-likely-causes-of-org-apache-spark-shuffle-metadatafetchfailedexcept On Thu, Jul 30, 2015 at 9:56 PM, Ashwin Giridharan ashwin.fo...@gmail.com wrote: What is your cluster configuration ( size and resources) ? If you do not have enough resources, then your executor will not run. Moreover allocating 8 cores to an executor is too much. If you have a cluster with four nodes running NodeManagers, each equipped with 4 cores and 8GB of memory, then an optimal configuration would be, --num-executors 8 --executor-cores 2 --executor-memory 2G Thanks, Ashwin On Thu, Jul 30, 2015 at 12:08 PM, unk1102 umesh.ka...@gmail.com wrote: Hi I have one Spark job which runs fine locally with less data but when I schedule it on YARN to execute I keep on getting the following ERROR and slowly all executors gets removed from UI and my job fails 15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 8 on myhost1.com: remote Rpc client disassociated 15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 6 on myhost2.com: remote Rpc client disassociated I use the following command to schedule spark job in yarn-client mode ./spark-submit --class com.xyz.MySpark --conf spark.executor.extraJavaOptions=-XX:MaxPermSize=512M --driver-java-options -XX:MaxPermSize=512m --driver-memory 3g --master yarn-client --executor-memory 2G --executor-cores 8 --num-executors 12 /home/myuser/myspark-1.0.jar I dont know what is the problem please guide. I am new to Spark. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-control-Spark-Executors-from-getting-Lost-when-using-YARN-client-mode-tp24084.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Thanks Regards, Ashwin Giridharan -- Best Regards Jeff Zhang
Re: Spark on YARN
:59596 15/07/30 12:13:34 INFO util.Utils: Successfully started service 'SparkUI' on port 59596. 15/07/30 12:13:34 INFO ui.SparkUI: Started SparkUI at http://10.21.1.77:59596 15/07/30 12:13:34 INFO cluster.YarnClusterScheduler: Created YarnClusterScheduler 15/07/30 12:13:34 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43354. 15/07/30 12:13:34 INFO netty.NettyBlockTransferService: Server created on 43354 15/07/30 12:13:34 INFO storage.BlockManagerMaster: Trying to register BlockManager 15/07/30 12:13:34 INFO storage.BlockManagerMasterEndpoint: Registering block manager 10.21.1.77:43354 with 246.0 MB RAM, BlockManagerId(driver, 10.21.1.77, 43354) 15/07/30 12:13:34 INFO storage.BlockManagerMaster: Registered BlockManager 15/07/30 12:13:34 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as AkkaRpcEndpointRef(Actor[akka://sparkDriver/user/YarnAM#-603094240]) 15/07/30 12:13:34 INFO client.RMProxy: Connecting to ResourceManager at hadoop-1/10.21.1.77:8030 15/07/30 12:13:34 INFO yarn.YarnRMClient: Registering the ApplicationMaster 15/07/30 12:13:34 INFO yarn.YarnAllocator: Will request 2 executor containers, each with 1 cores and 1408 MB memory including 384 MB overhead 15/07/30 12:13:34 INFO yarn.YarnAllocator: Container request (host: Any, capability: memory:1408, vCores:1) 15/07/30 12:13:34 INFO yarn.YarnAllocator: Container request (host: Any, capability: memory:1408, vCores:1) 15/07/30 12:13:35 INFO yarn.ApplicationMaster: Started progress reporter thread - sleep time : 5000 15/07/30 12:13:35 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM 15/07/30 12:13:35 INFO yarn.ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0, (reason: Shutdown hook called before final status was reported.) 15/07/30 12:13:35 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with SUCCEEDED (diag message: Shutdown hook called before final status was reported.) 15/07/30 12:13:35 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered. 15/07/30 12:13:35 INFO yarn.ApplicationMaster: Deleting staging directory .sparkStaging/application_1438090734187_0010 15/07/30 12:13:35 INFO storage.DiskBlockManager: Shutdown hook called 15/07/30 12:13:35 INFO util.Utils: Shutdown hook called 15/07/30 12:13:35 INFO util.Utils: Deleting directory /home/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1438090734187_0010/userFiles-337c9be5-569f-43ff-ba1f-ec24daab9ea5 15/07/30 12:13:35 INFO util.Utils: Deleting directory /home/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1438090734187_0010/httpd-d1232310-5aa1-44e7-a99a-cc2ae614f89c -- Best Regards Jeff Zhang
Re: help plz! how to use zipWithIndex to each subset of a RDD
This may be what you want val conf = new SparkConf().setMaster(local).setAppName(test) val sc = new SparkContext(conf) val inputRdd = sc.parallelize(Array((key_1, a), (key_1,b), (key_2,c), (key_2, d))) val result = inputRdd.groupByKey().flatMap(e={ val key= e._1 val valuesWithIndex = e._2.zipWithIndex valuesWithIndex.map(value = (key, value._2, value._1)) }) result.collect() foreach println /// output *(key_2,0,c) (key_2,1,d) (key_1,0,a) (key_1,1,b)* On Thu, Jul 30, 2015 at 10:19 AM, ayan guha guha.a...@gmail.com wrote: Is there a relationship between data and index? I.e with a,b,c to 1,2,3? On 30 Jul 2015 12:13, askformore askf0rm...@163.com wrote: I have some data like this: RDD[(String, String)] = ((*key-1*, a), ( *key-1*,b), (*key-2*,a), (*key-2*,c),(*key-3*,b),(*key-4*,d)) and I want to group the data by Key, and for each group, add index fields to the groupmember, at last I can transform the data to below : RDD[(String, *Int*, String)] = ((key-1,*1*, a), (key-1,*2,*b), (key-2,*1*,a), (key-2, *2*,b),(key-3,*1*,b),(key-4,*1*,d)) I tried to groupByKey firstly, then I got a RDD[(String, Iterable[String])], but I don't know how to use zipWithIndex function to each Iterable... thanks. -- View this message in context: help plz! how to use zipWithIndex to each subset of a RDD http://apache-spark-user-list.1001560.n3.nabble.com/help-plz-how-to-use-zipWithIndex-to-each-subset-of-a-RDD-tp24071.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com. -- Best Regards Jeff Zhang
Re: Console log file of CoarseGrainedExecutorBackend
By default it is in ${SPARK_HOME}/work/${APP_ID}/${EXECUTOR_ID} On Thu, Jul 16, 2015 at 3:43 PM, Tao Lu taolu2...@gmail.com wrote: Hi, Guys, Where can I find the console log file of CoarseGrainedExecutorBackend process? Thanks! Tao -- Best Regards Jeff Zhang
Re: The auxService:spark_shuffle does not exist
Did you enable the dynamic resource allocation ? You can refer to this page for how to configure spark shuffle service for yarn. https://spark.apache.org/docs/1.4.0/job-scheduling.html On Tue, Jul 7, 2015 at 10:55 PM, roy rp...@njit.edu wrote: we tried --master yarn-client with no different result. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/The-auxService-spark-shuffle-does-not-exist-tp23662p23689.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Best Regards Jeff Zhang
Re: Why does driver transfer application jar to executors?
TaskDescription only serialize the jar path not the jar content. Multiple tasks can run on the same executor. Executor will check whether the jar has been fetched when each time task is launched. If so, it won't fetch it again. Only serialize the jar path can prevent serialize jar multiple times which is inefficient. On Thu, Jun 18, 2015 at 10:48 AM, Shiyao Ma i...@introo.me wrote: Hi, Looking from my executor logs, the submitted application jar is transmitted to each executors? Why does spark do the above? To my understanding, the tasks to be run are already serialized with TaskDescription. Regards. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Best Regards Jeff Zhang
Re: Filter operation to return two RDDs at once.
As far as I know, spark don't support multiple outputs On Wed, Jun 3, 2015 at 2:15 PM, ayan guha guha.a...@gmail.com wrote: Why do you need to do that if filter and content of the resulting rdd are exactly same? You may as well declare them as 1 RDD. On 3 Jun 2015 15:28, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I want to do this val qtSessionsWithQt = rawQtSession.filter(_._2.qualifiedTreatmentId != NULL_VALUE) val guidUidMapSessions = rawQtSession.filter(_._2. qualifiedTreatmentId == NULL_VALUE) This will run two different stages can this be done in one stage ? val (qtSessionsWithQt, guidUidMapSessions) = rawQtSession. *magicFilter*(_._2.qualifiedTreatmentId != NULL_VALUE) -- Deepak -- Best Regards Jeff Zhang
Re: Filter operation to return two RDDs at once.
I check the RDD#randSplit, it is much more like multiple one-to-one transformation rather than a one-to-multiple transformation. I write one sample code as following, it would generate 3 stages. Although we can use cache here to make it better, If spark can support multiple outputs, only 2 stages are needed. ( This would be useful for pig's multiple query and hive's self join ) val data = sc.textFile(/Users/jzhang/a.log).flatMap(line=line.split(\\s)).map(w=(w,1)) val parts = data.randomSplit(Array(0.2,0.8)) val joinResult = parts(0).join(parts(1)) println(joinResult.toDebugString) (1) MapPartitionsRDD[8] at join at WordCount.scala:22 [] | MapPartitionsRDD[7] at join at WordCount.scala:22 [] | CoGroupedRDD[6] at join at WordCount.scala:22 [] +-(1) PartitionwiseSampledRDD[4] at randomSplit at WordCount.scala:21 [] | | MapPartitionsRDD[3] at map at WordCount.scala:20 [] | | MapPartitionsRDD[2] at flatMap at WordCount.scala:20 [] | | /Users/jzhang/a.log MapPartitionsRDD[1] at textFile at WordCount.scala:20 [] | | /Users/jzhang/a.log HadoopRDD[0] at textFile at WordCount.scala:20 [] +-(1) PartitionwiseSampledRDD[5] at randomSplit at WordCount.scala:21 [] | MapPartitionsRDD[3] at map at WordCount.scala:20 [] | MapPartitionsRDD[2] at flatMap at WordCount.scala:20 [] | /Users/jzhang/a.log MapPartitionsRDD[1] at textFile at WordCount.scala:20 [] | /Users/jzhang/a.log HadoopRDD[0] at textFile at WordCount.scala:20 [] On Wed, Jun 3, 2015 at 2:45 PM, Sean Owen so...@cloudera.com wrote: In the sense here, Spark actually does have operations that make multiple RDDs like randomSplit. However there is not an equivalent of the partition operation which gives the elements that matched and did not match at once. On Wed, Jun 3, 2015, 8:32 AM Jeff Zhang zjf...@gmail.com wrote: As far as I know, spark don't support multiple outputs On Wed, Jun 3, 2015 at 2:15 PM, ayan guha guha.a...@gmail.com wrote: Why do you need to do that if filter and content of the resulting rdd are exactly same? You may as well declare them as 1 RDD. On 3 Jun 2015 15:28, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I want to do this val qtSessionsWithQt = rawQtSession.filter(_._2. qualifiedTreatmentId != NULL_VALUE) val guidUidMapSessions = rawQtSession.filter(_._2. qualifiedTreatmentId == NULL_VALUE) This will run two different stages can this be done in one stage ? val (qtSessionsWithQt, guidUidMapSessions) = rawQtSession. *magicFilter*(_._2.qualifiedTreatmentId != NULL_VALUE) -- Deepak -- Best Regards Jeff Zhang -- Best Regards Jeff Zhang
Re: ERROR cluster.YarnScheduler: Lost executor
node down or container preempted ? You need to check the executor log / node manager log for more info. On Wed, Jun 3, 2015 at 2:31 PM, patcharee patcharee.thong...@uni.no wrote: Hi, What can be the cause of this ERROR cluster.YarnScheduler: Lost executor? How can I fix it? Best, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Best Regards Jeff Zhang
No overwrite flag for saveAsXXFile
Hi folks, I found that RDD:saveXXFile has no overwrite flag which I think is very helpful. Is there any reason for this ? -- Best Regards Jeff Zhang
Re: Is the RDD's Partitions determined before hand ?
Hi Sean, If you know a stage needs unusually high parallelism for example you can repartition further for that stage. The problem is we may don't know whether high parallelism is needed. e.g. for the join operator, high parallelism may only be necessary for some dataset that lots of data can join together while for other dataset high parallelism may not be necessary if only a few data can join together. So my question is that unable changing parallelism at runtime dynamically may not be flexible. On Wed, Mar 4, 2015 at 5:36 PM, Sean Owen so...@cloudera.com wrote: Hm, what do you mean? You can control, to some extent, the number of partitions when you read the data, and can repartition if needed. You can set the default parallelism too so that it takes effect for most ops thay create an RDD. One # of partitions is usually about right for all work (2x or so the number of execution slots). If you know a stage needs unusually high parallelism for example you can repartition further for that stage. On Mar 4, 2015 1:50 AM, Jeff Zhang zjf...@gmail.com wrote: Thanks Sean. But if the partitions of RDD is determined before hand, it would not be flexible to run the same program on the different dataset. Although for the first stage the partitions can be determined by the input data set, for the intermediate stage it is not possible. Users have to create policy to repartition or coalesce based on the data set size. On Tue, Mar 3, 2015 at 6:29 PM, Sean Owen so...@cloudera.com wrote: An RDD has a certain fixed number of partitions, yes. You can't change an RDD. You can repartition() or coalese() and RDD to make a new one with a different number of RDDs, possibly requiring a shuffle. On Tue, Mar 3, 2015 at 10:21 AM, Jeff Zhang zjf...@gmail.com wrote: I mean is it possible to change the partition number at runtime. Thanks -- Best Regards Jeff Zhang -- Best Regards Jeff Zhang -- Best Regards Jeff Zhang
Re: Is the RDD's Partitions determined before hand ?
Thanks Sean. But if the partitions of RDD is determined before hand, it would not be flexible to run the same program on the different dataset. Although for the first stage the partitions can be determined by the input data set, for the intermediate stage it is not possible. Users have to create policy to repartition or coalesce based on the data set size. On Tue, Mar 3, 2015 at 6:29 PM, Sean Owen so...@cloudera.com wrote: An RDD has a certain fixed number of partitions, yes. You can't change an RDD. You can repartition() or coalese() and RDD to make a new one with a different number of RDDs, possibly requiring a shuffle. On Tue, Mar 3, 2015 at 10:21 AM, Jeff Zhang zjf...@gmail.com wrote: I mean is it possible to change the partition number at runtime. Thanks -- Best Regards Jeff Zhang -- Best Regards Jeff Zhang