Spark launching without all of the requested YARN resources
Sometimes, when my Hortonworks YARN-enabled cluster is fairly busy, Spark (via spark-submit) begins processing even though it apparently did not get all of the requested resources, and it then runs very slowly. Is there a way to force Spark/YARN to begin only when it has the full set of resources that I request? Thanks, Arun
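One setting that may be relevant here, sketched below for Spark on YARN with illustrative values: spark.scheduler.minRegisteredResourcesRatio tells the scheduler to hold off submitting tasks until that fraction of the requested executors has registered, bounded by spark.scheduler.maxRegisteredResourcesWaitingTime. It delays the start rather than strictly guaranteeing the full allocation, and the waiting-time value is a duration string on recent 1.x releases (milliseconds on older ones).

SparkConf conf = new SparkConf()
    .set("spark.scheduler.minRegisteredResourcesRatio", "1.0")         // wait until 100% of requested executors have registered
    .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "120s"); // but stop waiting after this long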
Re: [Spark Streaming] Runtime Error in call to max function for JavaPairRDD
Thanks, will try this out and get back... On Tue, Jun 23, 2015 at 2:30 AM, Tathagata Das t...@databricks.com wrote: Try adding the provided scopes:

<dependency> <!-- Spark dependency -->
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.4.0</version>
  <scope>provided</scope>
</dependency>
<dependency> <!-- Spark Streaming dependency -->
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.4.0</version>
  <scope>provided</scope>
</dependency>

This prevents these artifacts from being included in the assembly JARs. See scope https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Scope On Mon, Jun 22, 2015 at 10:28 AM, Nipun Arora nipunarora2...@gmail.com wrote: Hi Tathagata, I am attaching a snapshot of my pom.xml. It would help immensely if I could include max and min values in my mapper phase. The question is still open at: http://stackoverflow.com/questions/30902090/adding-max-and-min-in-spark-stream-in-java/30909796#30909796 I see that there is a bug report filed for a similar error as well: https://issues.apache.org/jira/browse/SPARK-3266 Please let me know how I can get the same version of spark streaming in my assembly. I am using the following spark version: http://www.apache.org/dyn/closer.cgi/spark/spark-1.4.0/spark-1.4.0-bin-hadoop2.6.tgz .. no compilation, just an untar and use of the spark-submit script in a local install. I still get the same error. Exception in thread "JobGenerator" java.lang.NoSuchMethodError: org.apache.spark.api.java.JavaPairRDD.max(Ljava/util/Comparator;)Lscala/Tuple2;

<dependencies>
  <dependency> <!-- Spark dependency -->
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.4.0</version>
  </dependency>
  <dependency> <!-- Spark Streaming dependency -->
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.4.0</version>
  </dependency>
</dependencies>

Thanks Nipun On Thu, Jun 18, 2015 at 11:16 PM, Nipun Arora nipunarora2...@gmail.com wrote: Hi Tathagata, When you say "please mark spark-core and spark-streaming as dependencies", what do you mean? I have installed the pre-built spark-1.4 for Hadoop 2.6 from the Spark downloads. In my maven pom.xml, I am using version 1.4 as described. Please let me know how I can fix that? Thanks Nipun On Thu, Jun 18, 2015 at 4:22 PM, Tathagata Das t...@databricks.com wrote: I think you may be including a different version of Spark Streaming in your assembly. Please mark spark-core and spark-streaming as provided dependencies. Any installation of Spark will automatically provide Spark in the classpath, so you do not have to bundle it. On Thu, Jun 18, 2015 at 8:44 AM, Nipun Arora nipunarora2...@gmail.com wrote: Hi, I have the following piece of code, where I am trying to transform a spark stream and add the min and max of each RDD to it. However, I get an error at run-time saying the max call does not exist (it compiles properly).
I am using spark-1.4. I have added the question to stackoverflow as well: http://stackoverflow.com/questions/30902090/adding-max-and-min-in-spark-stream-in-java/30909796#30909796 Any help is greatly appreciated :) Thanks Nipun

JavaPairDStream<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> sortedtsStream = transformedMaxMintsStream.transformToPair(new Sort2());
sortedtsStream.foreach(
    new Function<JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>, Void>() {
        @Override
        public Void call(JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> tuple2Tuple3JavaPairRDD) throws Exception {
            List<Tuple2<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>> templist = tuple2Tuple3JavaPairRDD.collect();
            for (Tuple2<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> tuple : templist) {
                Date date = new Date(tuple._1._1);
                int pattern = tuple._1._2;
                int count = tuple._2._1();
                Date maxDate = new Date(tuple._2._2());
                Date minDate = new Date(tuple._2._2());
                System.out.println("TimeSlot: " + date.toString() + " Pattern: " + pattern + " Count: " + count + " Max: " + maxDate.toString() + " Min: " + minDate.toString());
            }
            return null;
        }
    }
);

Error: 15/06/18 11:05:06 INFO BlockManagerInfo: Added input-0-1434639906000 in memory on localhost:42829 (size: 464.0 KB, free: 264.9 MB) 15/06/18 11:05:06 INFO BlockGenerator: Pushed block input-0-1434639906000 Exception in thread "JobGenerator" java.lang.NoSuchMethodError: org.apache.spark.api.java.JavaPairRDD.max(Ljava/util/Comparator;)Lscala/Tuple2; at
Re: [Spark Streaming] Null Pointer Exception when accessing broadcast variable to store a hashmap in Java
Are you using checkpointing? I had a similar issue when recreating a streaming context from checkpoint as broadcast variables are not checkpointed. On 23 Jun 2015 5:01 pm, Nipun Arora nipunarora2...@gmail.com wrote: Hi, I have a spark streaming application where I need to access a model saved in a HashMap. I have *no problems in running the same code with broadcast variables in the local installation.* However I get a *null pointer* *exception* when I deploy it on my spark test cluster. I have stored a model in a HashMapString, FieldModel which is serializable. I use a broadcast variables declared as a global static variable to broadcast this hashmap: public static BroadcastHashMapString,FieldModel br; HashMapString,FieldModel hm = checkerObj.getModel(esserver, type); br = ssc.sparkContext().broadcast(hm); I need to access this model in my mapper phase, and do some operation based on the checkup. The following is a snippet of how I access the broadcast variable. JavaDStreamTuple3Long,Double,String split = matched.map(new GenerateType2Scores()); class GenerateType2Scores implements FunctionString, Tuple3Long, Double, String { @Override public Tuple3Long, Double, String call(String s) throws Exception{ Long time = Type2ViolationChecker.getMTS(s); HashMapString,FieldModel temphm= Type2ViolationChecker.br.value(); Double score = Type2ViolationChecker.getAnomalyScore(temphm,s); return new Tuple3Long, Double, String(time,score, s);} } The temphm should refer to the hashmap stored in the broadcast variable. Can anyone help me understand what is the correct way to access broadcast variables in JAVA? Thanks Nipun
Re: [Spark Streaming] Null Pointer Exception when accessing broadcast variable to store a hashmap in Java
btw. just for reference I have added the code in a gist: https://gist.github.com/nipunarora/ed987e45028250248edc and a stackoverflow reference here: http://stackoverflow.com/questions/31006490/broadcast-variable-null-pointer-exception-in-spark-streaming On Tue, Jun 23, 2015 at 11:01 AM, Nipun Arora nipunarora2...@gmail.com wrote: Hi, I have a spark streaming application where I need to access a model saved in a HashMap. I have *no problems in running the same code with broadcast variables in the local installation.* However I get a *null pointer* *exception* when I deploy it on my spark test cluster. I have stored a model in a HashMapString, FieldModel which is serializable. I use a broadcast variables declared as a global static variable to broadcast this hashmap: public static BroadcastHashMapString,FieldModel br; HashMapString,FieldModel hm = checkerObj.getModel(esserver, type); br = ssc.sparkContext().broadcast(hm); I need to access this model in my mapper phase, and do some operation based on the checkup. The following is a snippet of how I access the broadcast variable. JavaDStreamTuple3Long,Double,String split = matched.map(new GenerateType2Scores()); class GenerateType2Scores implements FunctionString, Tuple3Long, Double, String { @Override public Tuple3Long, Double, String call(String s) throws Exception{ Long time = Type2ViolationChecker.getMTS(s); HashMapString,FieldModel temphm= Type2ViolationChecker.br.value(); Double score = Type2ViolationChecker.getAnomalyScore(temphm,s); return new Tuple3Long, Double, String(time,score, s);} } The temphm should refer to the hashmap stored in the broadcast variable. Can anyone help me understand what is the correct way to access broadcast variables in JAVA? Thanks Nipun
Re: workaround for groupByKey
It all depends on what it is you need to do with the pages. If you’re just going to be collecting them then it’s really not much different than a groupByKey. If instead you’re looking to derive some other value from the series of pages then you could potentially partition by user id and run a mapPartitions or one of the other combineByKey APIs? From: Jianguo Li Date: Tuesday, June 23, 2015 at 9:46 AM To: Silvio Fiorito Cc: user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: workaround for groupByKey Thanks. Yes, unfortunately, they all need to be grouped. I guess I can partition the record by user id. However, I have millions of users, do you think partition by user id will help? Jianguo On Mon, Jun 22, 2015 at 6:28 PM, Silvio Fiorito silvio.fior...@granturing.commailto:silvio.fior...@granturing.com wrote: You’re right of course, I’m sorry. I was typing before thinking about what you actually asked! On a second thought, what is the ultimate outcome for what you want the sequence of pages for? Do they need to actually all be grouped? Could you instead partition by user id then use a mapPartitions perhaps? From: Jianguo Li Date: Monday, June 22, 2015 at 6:21 PM To: Silvio Fiorito Cc: user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: workaround for groupByKey Thanks for your suggestion. I guess aggregateByKey is similar to combineByKey. I read in the Learning Sparking We can disable map-side aggregation in combineByKey() if we know that our data won’t benefit from it. For example, groupByKey() disables map-side aggregation as the aggregation function (appending to a list) does not save any space. If we want to disable map-side combines, we need to specify the partitioner; for now you can just use the partitioner on the source RDD by passingrdd.partitioner It seems that when the map-side aggregation function is to append something to a list (as opposed to summing over all the numbers), then this map-side aggregation does not offer any benefit since appending to a list does not save any space. Is my understanding correct? Thanks, Jianguo On Mon, Jun 22, 2015 at 4:43 PM, Silvio Fiorito silvio.fior...@granturing.commailto:silvio.fior...@granturing.com wrote: You can use aggregateByKey as one option: val input: RDD[Int, String] = ... val test = input.aggregateByKey(ListBuffer.empty[String])((a, b) = a += b, (a, b) = a ++ b) From: Jianguo Li Date: Monday, June 22, 2015 at 5:12 PM To: user@spark.apache.orgmailto:user@spark.apache.org Subject: workaround for groupByKey Hi, I am processing an RDD of key-value pairs. The key is an user_id, and the value is an website url the user has ever visited. Since I need to know all the urls each user has visited, I am tempted to call the groupByKey on this RDD. However, since there could be millions of users and urls, the shuffling caused by groupByKey proves to be a major bottleneck to get the job done. Is there any workaround? I want to end up with an RDD of key-value pairs, where the key is an user_id, the value is a list of all the urls visited by the user. Thanks, Jianguo
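For the Java side of this thread, a rough sketch of the aggregateByKey option Silvio mentions, assuming Spark 1.x with Java 8 lambdas; the user IDs and URLs below are made-up sample data, and, as discussed above, this buys little over groupByKey when the merge function is just list-appending:

// (userId, url) pairs; sample data only
JavaPairRDD<String, String> visits = jsc.parallelizePairs(Arrays.asList(
    new Tuple2<>("u1", "/home"), new Tuple2<>("u1", "/search"), new Tuple2<>("u2", "/home")));

JavaPairRDD<String, ArrayList<String>> urlsByUser = visits.aggregateByKey(
    new ArrayList<String>(),
    (list, url) -> { list.add(url); return list; },   // fold a url into one partition's list
    (l1, l2) -> { l1.addAll(l2); return l1; });       // merge lists coming from different partitions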
Re: [Spark Streaming] Null Pointer Exception when accessing broadcast variable to store a hashmap in Java
I don't think I have explicitly check-pointed anywhere. Unless it's internal in some interface, I don't believe the application is checkpointed. Thanks for the suggestion though.. Nipun On Tue, Jun 23, 2015 at 11:05 AM, Benjamin Fradet benjamin.fra...@gmail.com wrote: Are you using checkpointing? I had a similar issue when recreating a streaming context from checkpoint as broadcast variables are not checkpointed. On 23 Jun 2015 5:01 pm, Nipun Arora nipunarora2...@gmail.com wrote: Hi, I have a spark streaming application where I need to access a model saved in a HashMap. I have *no problems in running the same code with broadcast variables in the local installation.* However I get a *null pointer* *exception* when I deploy it on my spark test cluster. I have stored a model in a HashMapString, FieldModel which is serializable. I use a broadcast variables declared as a global static variable to broadcast this hashmap: public static BroadcastHashMapString,FieldModel br; HashMapString,FieldModel hm = checkerObj.getModel(esserver, type); br = ssc.sparkContext().broadcast(hm); I need to access this model in my mapper phase, and do some operation based on the checkup. The following is a snippet of how I access the broadcast variable. JavaDStreamTuple3Long,Double,String split = matched.map(new GenerateType2Scores()); class GenerateType2Scores implements FunctionString, Tuple3Long, Double, String { @Override public Tuple3Long, Double, String call(String s) throws Exception{ Long time = Type2ViolationChecker.getMTS(s); HashMapString,FieldModel temphm= Type2ViolationChecker.br.value(); Double score = Type2ViolationChecker.getAnomalyScore(temphm,s); return new Tuple3Long, Double, String(time,score, s);} } The temphm should refer to the hashmap stored in the broadcast variable. Can anyone help me understand what is the correct way to access broadcast variables in JAVA? Thanks Nipun
[Spark Streaming] Null Pointer Exception when accessing broadcast variable to store a hashmap in Java
Hi, I have a spark streaming application where I need to access a model saved in a HashMap. I have no problems in running the same code with broadcast variables in the local installation. However I get a null pointer exception when I deploy it on my spark test cluster. I have stored a model in a HashMap<String, FieldModel> which is serializable. I use a broadcast variable declared as a global static variable to broadcast this hashmap:

public static Broadcast<HashMap<String, FieldModel>> br;
HashMap<String, FieldModel> hm = checkerObj.getModel(esserver, type);
br = ssc.sparkContext().broadcast(hm);

I need to access this model in my mapper phase, and do some operation based on the checkup. The following is a snippet of how I access the broadcast variable.

JavaDStream<Tuple3<Long, Double, String>> split = matched.map(new GenerateType2Scores());

class GenerateType2Scores implements Function<String, Tuple3<Long, Double, String>> {
    @Override
    public Tuple3<Long, Double, String> call(String s) throws Exception {
        Long time = Type2ViolationChecker.getMTS(s);
        HashMap<String, FieldModel> temphm = Type2ViolationChecker.br.value();
        Double score = Type2ViolationChecker.getAnomalyScore(temphm, s);
        return new Tuple3<Long, Double, String>(time, score, s);
    }
}

The temphm should refer to the hashmap stored in the broadcast variable. Can anyone help me understand what is the correct way to access broadcast variables in JAVA? Thanks Nipun
Re: SQL vs. DataFrame API
Thanks! The solution: https://gist.github.com/dokipen/018a1deeab668efdf455 On Mon, Jun 22, 2015 at 4:33 PM Davies Liu dav...@databricks.com wrote: Right now, we can not figure out which column you referenced in `select`, if there are multiple row with the same name in the joined DataFrame (for example, two `value`). A workaround could be: numbers2 = numbers.select(df.name, df.value.alias('other')) rows = numbers.join(numbers2, (numbers.name==numbers2.name) (numbers.value != numbers2.other), how=inner) \ .select(numbers.name, numbers.value, numbers2.other) \ .collect() On Mon, Jun 22, 2015 at 12:53 PM, Ignacio Blasco elnopin...@gmail.com wrote: Sorry thought it was scala/spark El 22/6/2015 9:49 p. m., Bob Corsaro rcors...@gmail.com escribió: That's invalid syntax. I'm pretty sure pyspark is using a DSL to create a query here and not actually doing an equality operation. On Mon, Jun 22, 2015 at 3:43 PM Ignacio Blasco elnopin...@gmail.com wrote: Probably you should use === instead of == and !== instead of != Can anyone explain why the dataframe API doesn't work as I expect it to here? It seems like the column identifiers are getting confused. https://gist.github.com/dokipen/4b324a7365ae87b7b0e5
java.lang.IllegalArgumentException: A metric named ... already exists
Hi, I'm running a program in Spark 1.4 where several Spark Streaming contexts are created from the same Spark context. As pointed out in https://spark.apache.org/docs/latest/streaming-programming-guide.html, each Spark Streaming context is stopped before creating the next Spark Streaming context. The program works ok, but I get exceptions like the following when a new Spark Streaming context is created 15/06/23 16:34:51 INFO MetricsSystem: Metrics already registered java.lang.IllegalArgumentException: A metric named local-1435070090627.driver.SampleStreamingTest.StreamingMetrics.streaming.lastReceivedBatch_processingEndTime already exists at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91) at com.codahale.metrics.MetricRegistry.registerAll(MetricRegistry.java:385) at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:85) Is this something to be concerned about, or just a minor nuisance? Thanks a lot in advance. Greetings, Juan Rodriguez Hortala
Re: workaround for groupByKey
Thanks. Yes, unfortunately, they all need to be grouped. I guess I can partition the record by user id. However, I have millions of users, do you think partition by user id will help? Jianguo On Mon, Jun 22, 2015 at 6:28 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: You’re right of course, I’m sorry. I was typing before thinking about what you actually asked! On a second thought, what is the ultimate outcome for what you want the sequence of pages for? Do they need to actually all be grouped? Could you instead partition by user id then use a mapPartitions perhaps? From: Jianguo Li Date: Monday, June 22, 2015 at 6:21 PM To: Silvio Fiorito Cc: user@spark.apache.org Subject: Re: workaround for groupByKey Thanks for your suggestion. I guess aggregateByKey is similar to combineByKey. I read in the Learning Sparking *We can disable map-side aggregation in combineByKey() if we know that our data won’t benefit from it. For example, groupByKey() disables map-side aggregation as the aggregation function (appending to a list) does not save any space. If we want to disable map-side combines, we need to specify the partitioner; for now you can just use the partitioner on the source RDD by passingrdd.partitioner* It seems that when the map-side aggregation function is to append something to a list (as opposed to summing over all the numbers), then this map-side aggregation does not offer any benefit since appending to a list does not save any space. Is my understanding correct? Thanks, Jianguo On Mon, Jun 22, 2015 at 4:43 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: You can use aggregateByKey as one option: val input: RDD[Int, String] = ... val test = input.aggregateByKey(ListBuffer.empty[String])((a, b) = a += b, (a, b) = a ++ b) From: Jianguo Li Date: Monday, June 22, 2015 at 5:12 PM To: user@spark.apache.org Subject: workaround for groupByKey Hi, I am processing an RDD of key-value pairs. The key is an user_id, and the value is an website url the user has ever visited. Since I need to know all the urls each user has visited, I am tempted to call the groupByKey on this RDD. However, since there could be millions of users and urls, the shuffling caused by groupByKey proves to be a major bottleneck to get the job done. Is there any workaround? I want to end up with an RDD of key-value pairs, where the key is an user_id, the value is a list of all the urls visited by the user. Thanks, Jianguo
Re: Shutdown with streaming driver running in cluster broke master web UI permanently
Thank you Tathagata, It is great to know about this issue, but our problem is a little bit different. We have 3 nodes in our Spark cluster, and when the Zookeeper leader dies, the Master Spark gets shut down, and remains down, but a new master gets elected and loads the UI. I think if the problem was the eventlogging, the new master would have failed as well. Or maybe i am wrong On Tue, Jun 23, 2015 at 3:00 AM, Tathagata Das t...@databricks.com wrote: Maybe this is a known issue with spark streaming and master web ui. Disable event logging, and it should be fine. https://issues.apache.org/jira/browse/SPARK-6270 On Mon, Jun 22, 2015 at 8:54 AM, scar scar scar0...@gmail.com wrote: Sorry I was on vacation for a few days. Yes, it is on. This is what I have in the logs: 15/06/22 10:44:00 INFO ClientCnxn: Unable to read additional data from server sessionid 0x14dd82e22f70ef1, likely server has closed socket, closing socket connection and attempting reconnect 15/06/22 10:44:00 INFO ClientCnxn: Unable to read additional data from server sessionid 0x24dc5a319b40090, likely server has closed socket, closing socket connection and attempting reconnect 15/06/22 10:44:01 INFO ConnectionStateManager: State change: SUSPENDED 15/06/22 10:44:01 INFO ConnectionStateManager: State change: SUSPENDED 15/06/22 10:44:01 WARN ConnectionStateManager: There are no ConnectionStateListeners registered. 15/06/22 10:44:01 INFO ZooKeeperLeaderElectionAgent: We have lost leadership 15/06/22 10:44:01 ERROR Master: Leadership has been revoked -- master shutting down. On Thu, Jun 11, 2015 at 8:59 PM, Tathagata Das t...@databricks.com wrote: Do you have the event logging enabled? TD On Thu, Jun 11, 2015 at 11:24 AM, scar0909 scar0...@gmail.com wrote: I have the same problem. i realized that the master spark becomes unresponsive when we kill the leader zookeeper (of course i assigned the leader election task to the zookeeper). please let me know if you have any devlepments. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Shutdown-with-streaming-driver-running-in-cluster-broke-master-web-UI-permanently-tp4149p23284.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
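If it turns out that the event-log path TD pointed at (SPARK-6270) is implicated after all, ruling it out is a one-line configuration change; a minimal sketch for Spark 1.x:

SparkConf conf = new SparkConf()
    .set("spark.eventLog.enabled", "false");  // event logging is what SPARK-6270 ties the broken master UI to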
Re: SQL vs. DataFrame API
That issue happens only in python dsl? El 23/6/2015 5:05 p. m., Bob Corsaro rcors...@gmail.com escribió: Thanks! The solution: https://gist.github.com/dokipen/018a1deeab668efdf455 On Mon, Jun 22, 2015 at 4:33 PM Davies Liu dav...@databricks.com wrote: Right now, we can not figure out which column you referenced in `select`, if there are multiple row with the same name in the joined DataFrame (for example, two `value`). A workaround could be: numbers2 = numbers.select(df.name, df.value.alias('other')) rows = numbers.join(numbers2, (numbers.name==numbers2.name) (numbers.value != numbers2.other), how=inner) \ .select(numbers.name, numbers.value, numbers2.other) \ .collect() On Mon, Jun 22, 2015 at 12:53 PM, Ignacio Blasco elnopin...@gmail.com wrote: Sorry thought it was scala/spark El 22/6/2015 9:49 p. m., Bob Corsaro rcors...@gmail.com escribió: That's invalid syntax. I'm pretty sure pyspark is using a DSL to create a query here and not actually doing an equality operation. On Mon, Jun 22, 2015 at 3:43 PM Ignacio Blasco elnopin...@gmail.com wrote: Probably you should use === instead of == and !== instead of != Can anyone explain why the dataframe API doesn't work as I expect it to here? It seems like the column identifiers are getting confused. https://gist.github.com/dokipen/4b324a7365ae87b7b0e5
SPARK-8566
I logged this Jira this morning: https://issues.apache.org/jira/browse/SPARK-8566 I'm curious whether any of the cognoscenti can advise as to a likely cause of the problem.
Re: [Spark Streaming] Null Pointer Exception when accessing broadcast variable to store a hashmap in Java
I found the error so just posting on the list. It seems broadcast variables cannot be declared static. If you do you get a null pointer exception. Thanks Nipun On Tue, Jun 23, 2015 at 11:08 AM, Nipun Arora nipunarora2...@gmail.com wrote: btw. just for reference I have added the code in a gist: https://gist.github.com/nipunarora/ed987e45028250248edc and a stackoverflow reference here: http://stackoverflow.com/questions/31006490/broadcast-variable-null-pointer-exception-in-spark-streaming On Tue, Jun 23, 2015 at 11:01 AM, Nipun Arora nipunarora2...@gmail.com wrote: Hi, I have a spark streaming application where I need to access a model saved in a HashMap. I have *no problems in running the same code with broadcast variables in the local installation.* However I get a *null pointer* *exception* when I deploy it on my spark test cluster. I have stored a model in a HashMapString, FieldModel which is serializable. I use a broadcast variables declared as a global static variable to broadcast this hashmap: public static BroadcastHashMapString,FieldModel br; HashMapString,FieldModel hm = checkerObj.getModel(esserver, type); br = ssc.sparkContext().broadcast(hm); I need to access this model in my mapper phase, and do some operation based on the checkup. The following is a snippet of how I access the broadcast variable. JavaDStreamTuple3Long,Double,String split = matched.map(new GenerateType2Scores()); class GenerateType2Scores implements FunctionString, Tuple3Long, Double, String { @Override public Tuple3Long, Double, String call(String s) throws Exception{ Long time = Type2ViolationChecker.getMTS(s); HashMapString,FieldModel temphm= Type2ViolationChecker.br.value(); Double score = Type2ViolationChecker.getAnomalyScore(temphm,s); return new Tuple3Long, Double, String(time,score, s);} } The temphm should refer to the hashmap stored in the broadcast variable. Can anyone help me understand what is the correct way to access broadcast variables in JAVA? Thanks Nipun
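For reference, one non-static arrangement that avoids the problem described above is to pass the Broadcast handle into the function object instead of reading it from a static field. This is only a sketch reusing the names from the thread; it assumes GenerateType2Scores is a top-level (or static nested) class so that only the broadcast handle is captured and serialized with the task:

Broadcast<HashMap<String, FieldModel>> br = ssc.sparkContext().broadcast(hm);

class GenerateType2Scores implements Function<String, Tuple3<Long, Double, String>> {
    private final Broadcast<HashMap<String, FieldModel>> model;
    GenerateType2Scores(Broadcast<HashMap<String, FieldModel>> model) { this.model = model; }
    @Override
    public Tuple3<Long, Double, String> call(String s) throws Exception {
        Long time = Type2ViolationChecker.getMTS(s);
        HashMap<String, FieldModel> temphm = model.value();   // resolved on the executor at task time
        Double score = Type2ViolationChecker.getAnomalyScore(temphm, s);
        return new Tuple3<Long, Double, String>(time, score, s);
    }
}

JavaDStream<Tuple3<Long, Double, String>> split = matched.map(new GenerateType2Scores(br));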
Should I keep memory dedicated for HDFS and Spark on cluster nodes?
I'm wondering if there is a real benefit to splitting my memory in two between the datanode and the workers. Datanodes and the OS need memory to perform their business. I suppose there could be a loss of performance if they came to compete for memory with the worker(s). Any opinion? :-) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Should-I-keep-memory-dedicated-for-HDFS-and-Spark-on-cluster-nodes-tp23451.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
org.apache.spark.sql.ScalaReflectionLock
Just a heads up: I was doing some basic coding using DataFrame, Row, StructType, etc. and I ended up with deadlocks in my sbt tests due to the usage of ScalaReflectionLock.synchronized in the Spark SQL code. The issue went away when I changed my tests to run consecutively...
Re: SQL vs. DataFrame API
I've only tried it in python On Tue, Jun 23, 2015 at 12:16 PM Ignacio Blasco elnopin...@gmail.com wrote: That issue happens only in python dsl? El 23/6/2015 5:05 p. m., Bob Corsaro rcors...@gmail.com escribió: Thanks! The solution: https://gist.github.com/dokipen/018a1deeab668efdf455 On Mon, Jun 22, 2015 at 4:33 PM Davies Liu dav...@databricks.com wrote: Right now, we can not figure out which column you referenced in `select`, if there are multiple row with the same name in the joined DataFrame (for example, two `value`). A workaround could be: numbers2 = numbers.select(df.name, df.value.alias('other')) rows = numbers.join(numbers2, (numbers.name==numbers2.name) (numbers.value != numbers2.other), how=inner) \ .select(numbers.name, numbers.value, numbers2.other) \ .collect() On Mon, Jun 22, 2015 at 12:53 PM, Ignacio Blasco elnopin...@gmail.com wrote: Sorry thought it was scala/spark El 22/6/2015 9:49 p. m., Bob Corsaro rcors...@gmail.com escribió: That's invalid syntax. I'm pretty sure pyspark is using a DSL to create a query here and not actually doing an equality operation. On Mon, Jun 22, 2015 at 3:43 PM Ignacio Blasco elnopin...@gmail.com wrote: Probably you should use === instead of == and !== instead of != Can anyone explain why the dataframe API doesn't work as I expect it to here? It seems like the column identifiers are getting confused. https://gist.github.com/dokipen/4b324a7365ae87b7b0e5
Re: Help optimising Spark SQL query
64GB in parquet could be many billions of rows because of the columnar compression. And count distinct by itself is an expensive operation. This is not just on Spark, even on Presto/Impala, you would see performance dip with count distincts. And the cluster is not that powerful either. The one issue here is that Spark has to sift through all the data to get to just a week's worth. To achieve better performance you might want to partition the data by date/week and then Spark wouldn't have to sift through all the billions of rows to get to the millions it needs to aggregate. Regards Sab On Tue, Jun 23, 2015 at 4:35 PM, James Aley james.a...@swiftkey.com wrote: Thanks for the suggestions everyone, appreciate the advice. I tried replacing DISTINCT for the nested GROUP BY, running on 1.4 instead of 1.3, replacing the date casts with a between operation on the corresponding long constants instead and changing COUNT(*) to COUNT(1). None of these seem to have made any remarkable difference in running time for the query. I'll hook up YourKit and see if we can figure out where the CPU time is going, then post back. On 22 June 2015 at 16:01, Yin Huai yh...@databricks.com wrote: Hi James, Maybe it's the DISTINCT causing the issue. I rewrote the query as follows. Maybe this one can finish faster. select sum(cnt) as uses, count(id) as users from ( select count(*) cnt, cast(id as string) as id, from usage_events where from_unixtime(cast(timestamp_millis/1000 as bigint)) between '2015-06-09' and '2015-06-16' group by cast(id as string) ) tmp Thanks, Yin On Mon, Jun 22, 2015 at 12:55 PM, Jörn Franke jornfra...@gmail.com wrote: Generally (not only spark sql specific) you should not cast in the where part of a sql query. It is also not necessary in your case. Getting rid of casts in the whole query will be also beneficial. Le lun. 22 juin 2015 à 17:29, James Aley james.a...@swiftkey.com a écrit : Hello, A colleague of mine ran the following Spark SQL query: select count(*) as uses, count (distinct cast(id as string)) as users from usage_events where from_unixtime(cast(timestamp_millis/1000 as bigint)) between '2015-06-09' and '2015-06-16' The table contains billions of rows, but totals only 64GB of data across ~30 separate files, which are stored as Parquet with LZO compression in S3. From the referenced columns: * id is Binary, which we cast to a String so that we can DISTINCT by it. (I was already told this will improve in a later release, in a separate thread.) * timestamp_millis is a long, containing a unix timestamp with millisecond resolution This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2 instances, using 20 executors, each with 4GB memory. I can see from monitoring tools that the CPU usage is at 100% on all nodes, but incoming network seems a bit low at 2.5MB/s, suggesting to me that this is CPU-bound. Does that seem slow? Can anyone offer any ideas by glancing at the query as to why this might be slow? We'll profile it meanwhile and post back if we find anything ourselves. A side issue - I've found that this query, and others, sometimes completes but doesn't return any results. There appears to be no error that I can see in the logs, and Spark reports the job as successful, but the connected JDBC client (SQLWorkbenchJ in this case), just sits there forever waiting. I did a quick Google and couldn't find anyone else having similar issues. Many thanks, James. 
-- Architect - Big Data Ph: +91 99805 99458 Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan India ICT)* +++
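A sketch of the partitioning step suggested above, assuming Spark 1.4's DataFrameWriter and a HiveContext (so from_unixtime is available in selectExpr); the S3 paths and the derived event_date column name are illustrative, not from the thread:

DataFrame events = sqlContext.read().parquet("s3n://bucket/usage_events/");
events.selectExpr("*",
        "from_unixtime(cast(timestamp_millis/1000 as bigint), 'yyyy-MM-dd') as event_date")
      .write()
      .partitionBy("event_date")          // one directory per day
      .parquet("s3n://bucket/usage_events_by_date/");
// A later query that filters on event_date then only reads the matching partition directories
// instead of sifting through all of the rows.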
Limitations using SparkContext
So the situation is following: got a spray server, with a spark context available (fair scheduling in a cluster mode, via spark-submit). There are some http urls, which calling spark rdd, and collecting information from accumulo / hdfs / etc (using rdd). Noticed, that there is a sort of limitation, on requests: wrk -t8 -c50 -d30s http://localhost:/…/; Running 30s test @ http://localhost:/…/ 8 threads and 50 connections Thread Stats Avg Stdev Max +/- Stdev Latency 1.03s 523.30ms 1.70s50.00% Req/Sec 6.05 5.4920.00 71.58% 452 requests in 30.04s, 234.39KB read Socket errors: connect 0, read 0, write 0, timeout 440 So this happens on making some calls with spark rdd (not depends on called function), and in browser you can see ERR_EMPTY_RESPONSE Now the solution was to use cache, but want to know about this limitations, or mb some settings. This error happens in local mode and in cluster mode, so guess not depends on it. P.S. logs are clear (or simply don't know where to look, but stdout of a spar-submit in a client mode is clear). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Limitations-using-SparkContext-tp23452.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Limitations using SparkContext
Hi, can you detail the symptom further? Was it that only 12 requests were services and the other 440 timed out? I don't think that Spark is well suited for this kind of workload, or at least the way it is being represented. How long does a single request take Spark to complete? Even with fair scheduling, you will only be able to have a fixed amount of tasks running on Spark at once. Usually this is bounded by the max cores setting in configuration. Since you mention local as a comparison point I get the impression you are running Spark Standalone for cluster. The implication, if this is reflective of your current setup, is that you aren't going to get much concurrency for separate spray requests. lets say your max cores is 16 and your number of tasks/partitions per stage of your spark DAG is 8. Then at any given time only 2 requests can be serviced. It may also be the case that with fair scheduling that a single request gets pre-empted after completing one stage of the DAG and has to wait to continue instead of proceeding directly to the next stage. This hypothesis would also support the observation that local is no better than cluster, because you probably have even less concurrent spark tasks available on the single local machine. spark.cores.max(not set)When running on a standalone deploy cluster https://spark.apache.org/docs/1.3.1/spark-standalone.html or a Mesos cluster in coarse-grained sharing mode https://spark.apache.org/docs/1.3.1/running-on-mesos.html#mesos-run-modes, the maximum amount of CPU cores to request for the application from across the cluster (not from each machine). If not set, the default will be spark.deploy.defaultCores on Spark's standalone cluster manager, or infinite (all available cores) on Mesos. On Tue, Jun 23, 2015 at 12:44 PM, daunnc dau...@gmail.com wrote: So the situation is following: got a spray server, with a spark context available (fair scheduling in a cluster mode, via spark-submit). There are some http urls, which calling spark rdd, and collecting information from accumulo / hdfs / etc (using rdd). Noticed, that there is a sort of limitation, on requests: wrk -t8 -c50 -d30s http://localhost:/…/; Running 30s test @ http://localhost:/…/ 8 threads and 50 connections Thread Stats Avg Stdev Max +/- Stdev Latency 1.03s 523.30ms 1.70s50.00% Req/Sec 6.05 5.4920.00 71.58% 452 requests in 30.04s, 234.39KB read Socket errors: connect 0, read 0, write 0, timeout 440 So this happens on making some calls with spark rdd (not depends on called function), and in browser you can see ERR_EMPTY_RESPONSE Now the solution was to use cache, but want to know about this limitations, or mb some settings. This error happens in local mode and in cluster mode, so guess not depends on it. P.S. logs are clear (or simply don't know where to look, but stdout of a spar-submit in a client mode is clear). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Limitations-using-SparkContext-tp23452.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
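One thing that sometimes helps in this request-serving pattern, on top of the fair scheduling already enabled, is to put each class of request into its own pool so short jobs are not stuck behind long ones. A sketch only, with an illustrative pool name and RDD; it does not by itself raise the max-cores ceiling discussed above, and it assumes spark.scheduler.mode=FAIR:

// run from the thread handling an HTTP request
sc.setLocalProperty("spark.scheduler.pool", "interactive");
long hits = someRdd.count();                        // jobs submitted from this thread go to the "interactive" pool
sc.setLocalProperty("spark.scheduler.pool", null);  // reset this thread back to the default pool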
Re: org.apache.spark.sql.ScalaReflectionLock
Mind filing a JIRA? On Tue, Jun 23, 2015 at 9:34 AM, Koert Kuipers ko...@tresata.com wrote: just a heads up, i was doing some basic coding using DataFrame, Row, StructType, etc. and i ended up with deadlocks in my sbt tests due to the usage of ScalaReflectionLock.synchronized in the spark sql code. the issue away when i changed my tests to run consecutively...
Can Spark1.4 work with CDH4.6
Hi folks, I have been using Spark against an external Metastore service which runs Hive with Cdh 4.6 In Spark 1.2, I was able to successfully connect by building with the following: ./make-distribution.sh --tgz -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -Phive-thriftserver -Phive-0.12.0 I see that in Spark 1.4 the Hive 0.12.0 profile is deprecated in favor of spark.sql.hive.metastore.version/spark.sql.hive.metastore.jars When I tried to use this setup spark-shell fails for me with the following error: 15/06/23 18:18:07 INFO hive.HiveContext: Initializing HiveMetastoreConnection version 0.12.0 using [Ljava.net.URL;@7b7a9a6c java.lang.ClassNotFoundException: java.lang.NoClassDefFoundError: com/google/common/base/Preconditions when creating Hive client using classpath: file:/hive/lib/guava-11.0.2.jar, file:/hive/lib/hive-exec-0.10.0-cdh4.6.0.jar, file:/hive/lib/hive-metastore-0.10.0-cdh4.6.0.jar, file:/hadoop/share/hadoop/mapreduce1/lib/hadoop-common-2.0.0-cdh4.6.0.jar, file:/hive/lib/commons-logging-1.0.4.jar I don't know why it's not seeing the class -- it's in the guava jar. If anyone has had success with 0.12 version please let me know what jars need to be on the classpath. I think my Hive version might be too outdated but I don't control the metastore and I had success with Spark1.2 so I'm hoping...
Re: Registering custom metrics
Hi, Not sure if this will fit your needs, but if you are trying to collect+chart some metrics specific to your app, yet want to correlate them with what's going on in Spark, maybe Spark's performance numbers, you may want to send your custom metrics to SPM, so they can be visualized/analyzed/dashboarded along with your Spark metrics. See http://sematext.com/spm/integrations/spark-monitoring.html for the Spark piece and https://sematext.atlassian.net/wiki/display/PUBSPM/Custom+Metrics for Custom Metrics. If you use Coda Hale's metrics lib, that works, too, there is a pluggable reported that will send Coda Hale metrics to SPM, too. HTH. Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/ On Mon, Jun 22, 2015 at 9:57 AM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi Gerard, Have there been any responses? Any insights as to what you ended up doing to enable custom metrics? I'm thinking of implementing a custom metrics sink, not sure how doable that is yet... Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Registering-custom-metrics-tp17765p23426.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark streaming with kafka jar missing
You must not include spark-core and spark-streaming in the assembly. They are already present in the installation and the presence of multiple versions of spark may throw off the classloaders in weird ways. So make the assembly marking the those dependencies as scope=provided. On Tue, Jun 23, 2015 at 11:56 AM, Shushant Arora shushantaror...@gmail.com wrote: hi While using spark streaming (1.2) with kafka . I am getting below error and receivers are getting killed but jobs get scheduled at each stream interval. 15/06/23 18:42:35 WARN TaskSetManager: Lost task 0.1 in stage 18.0 (TID 82, ip(XX)): java.io.IOException: Failed to connect to ip() at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43) at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) 15/06/23 18:42:36 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: org/apache/spark/util/ThreadUtils$ at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:115) at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121) at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1319) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1319) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) I created jar with include all dependencies. Which jar is missing here ?
When to use underlying data management layer versus standalone Spark?
Hi, I work at a large financial institution in New York. We're looking into Spark and trying to learn more about the deployment/use cases for real-time analytics with Spark. When would it be better to deploy standalone Spark versus Spark on top of a more comprehensive data management layer (Hadoop, Cassandra, MongoDB, etc.)? If you do deploy on top of one of these, are there different use cases where one of these database management layers are better or worse? Any color would be very helpful. Thank you in advance. Sincerely, Michael -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/When-to-use-underlying-data-management-layer-versus-standalone-Spark-tp23455.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: java.lang.IllegalArgumentException: A metric named ... already exists
Aaah this could be potentially major issue as it may prevent metrics from restarted streaming context be not published. Can you make it a JIRA. TD On Tue, Jun 23, 2015 at 7:59 AM, Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote: Hi, I'm running a program in Spark 1.4 where several Spark Streaming contexts are created from the same Spark context. As pointed in https://spark.apache.org/docs/latest/streaming-programming-guide.html each Spark Streaming context is stopped before creating the next Spark Streaming context. The program works ok, but I get exceptions like the following when a new Spark Streaming context is created 15/06/23 16:34:51 INFO MetricsSystem: Metrics already registered java.lang.IllegalArgumentException: A metric named local-1435070090627.driver.SampleStreamingTest.StreamingMetrics.streaming.lastReceivedBatch_processingEndTime already exists at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91) at com.codahale.metrics.MetricRegistry.registerAll(MetricRegistry.java:385) at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:85) Is this something to be concerned, or just a minor nuisance? Thanks a lot in advance. Greetings, Juan Rodriguez Hortala
spark streaming with kafka jar missing
hi While using spark streaming (1.2) with kafka . I am getting below error and receivers are getting killed but jobs get scheduled at each stream interval. 15/06/23 18:42:35 WARN TaskSetManager: Lost task 0.1 in stage 18.0 (TID 82, ip(XX)): java.io.IOException: Failed to connect to ip() at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43) at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) 15/06/23 18:42:36 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: org/apache/spark/util/ThreadUtils$ at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:115) at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121) at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1319) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1319) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) I created jar with include all dependencies. Which jar is missing here ?
Re: [Spark Streaming] Null Pointer Exception when accessing broadcast variable to store a hashmap in Java
Yes, this is a known behavior. Some static stuff are not serialized as part of a task. On Tue, Jun 23, 2015 at 10:24 AM, Nipun Arora nipunarora2...@gmail.com wrote: I found the error so just posting on the list. It seems broadcast variables cannot be declared static. If you do you get a null pointer exception. Thanks Nipun On Tue, Jun 23, 2015 at 11:08 AM, Nipun Arora nipunarora2...@gmail.com wrote: btw. just for reference I have added the code in a gist: https://gist.github.com/nipunarora/ed987e45028250248edc and a stackoverflow reference here: http://stackoverflow.com/questions/31006490/broadcast-variable-null-pointer-exception-in-spark-streaming On Tue, Jun 23, 2015 at 11:01 AM, Nipun Arora nipunarora2...@gmail.com wrote: Hi, I have a spark streaming application where I need to access a model saved in a HashMap. I have *no problems in running the same code with broadcast variables in the local installation.* However I get a *null pointer* *exception* when I deploy it on my spark test cluster. I have stored a model in a HashMapString, FieldModel which is serializable. I use a broadcast variables declared as a global static variable to broadcast this hashmap: public static BroadcastHashMapString,FieldModel br; HashMapString,FieldModel hm = checkerObj.getModel(esserver, type); br = ssc.sparkContext().broadcast(hm); I need to access this model in my mapper phase, and do some operation based on the checkup. The following is a snippet of how I access the broadcast variable. JavaDStreamTuple3Long,Double,String split = matched.map(new GenerateType2Scores()); class GenerateType2Scores implements FunctionString, Tuple3Long, Double, String { @Override public Tuple3Long, Double, String call(String s) throws Exception{ Long time = Type2ViolationChecker.getMTS(s); HashMapString,FieldModel temphm= Type2ViolationChecker.br.value(); Double score = Type2ViolationChecker.getAnomalyScore(temphm,s); return new Tuple3Long, Double, String(time,score, s);} } The temphm should refer to the hashmap stored in the broadcast variable. Can anyone help me understand what is the correct way to access broadcast variables in JAVA? Thanks Nipun
Re: Spark standalone cluster - resource management
Probably there are already jobs running there. In addition, memory is also a resource, so if you are running one application that takes all your memory and then try to run another application that asks for memory the cluster doesn't have, the second app won't run. So why are you specifying 22g as executor memory? How much memory do you have on each machine? On 23 June 2015 at 09:33, nizang ni...@windward.eu wrote: to give a bit more data on what I'm trying to get - I have many tasks I want to run in parallel, so I want each task to catch a small part of the cluster (only a limited part of my 20 cores in the cluster). I have important tasks that I want to get 10 cores, and I have small tasks that I want to run with only 1 or 2 cores. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-cluster-resource-management-tp23444p23445.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
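For the per-application cap asked about above, the standard knobs on a standalone cluster are spark.cores.max and the executor memory; a sketch with illustrative values (an important job might use 10, a small one 1 or 2):

SparkConf important = new SparkConf()
    .set("spark.cores.max", "10")         // total cores this application may take from the cluster
    .set("spark.executor.memory", "4g");  // keep this within what each worker machine actually has free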
Re: Calculating tuple count /input rate with time
This should give an accurate count for each batch, though for getting the rate you have to make sure that your streaming app is stable, that is, batches are processed as fast as they are received (scheduling delay in the Spark Streaming UI is approx 0). TD On Tue, Jun 23, 2015 at 2:49 AM, anshu shukla anshushuk...@gmail.com wrote: I am calculating the input rate using the following logic, and I think this foreachRDD always runs on the driver (the printlns are seen on the driver). 1- Is there any other way to do this at lower cost? 2- Will this give me the correct count for the rate? //code -

inputStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
        System.out.println(System.currentTimeMillis() + ",spoutstringJavaRDD," + stringJavaRDD.count());
        return null;
    }
});

-- Thanks Regards, Anshu Shukla
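On question 1 above: in the snippet shown, the count itself already runs on the executors and only the final number comes back to the driver, so the cost is essentially one small count job per batch. A slightly more concise equivalent is below; the rate is then the printed count divided by the batch duration.

inputStream.count().print();  // one distributed count per batch; the per-batch total is printed on the driver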
Re: spark streaming with kafka jar missing
Why are you mixing spark versions between streaming and core?? Your core is 1.2.0 and streaming is 1.4.0. On Tue, Jun 23, 2015 at 1:32 PM, Shushant Arora shushantaror...@gmail.com wrote: It throws exception for WriteAheadLogUtils after excluding core and streaming jar. Exception in thread main java.lang.NoClassDefFoundError: org/apache/spark/streaming/util/WriteAheadLogUtils$ at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:84) at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:65) at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:103) at org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala) at com.adobe.hadoop.saprk.sample.SampleSparkStreamApp.main(SampleSparkStreamApp.java:25) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) pom.xml is : project xmlns=http://maven.apache.org/POM/4.0.0; xmlns:xsi= http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation= http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd; modelVersion4.0.0/modelVersion groupId/groupId artifactIdSampleSparkStreamApp/artifactId version1.0/version dependencies dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.10/artifactId version1.2.0/version scopeprovided/scope /dependency dependency groupIdorg.apache.spark/groupId artifactIdspark-streaming-kafka_2.10/artifactId version1.4.0/version /dependency dependency groupIdorg.apache.spark/groupId artifactIdspark-streaming_2.10/artifactId scopeprovided/scope version1.4.0/version /dependency /dependencies build plugins !-- any other plugins -- plugin artifactIdmaven-assembly-plugin/artifactId executions execution phasepackage/phase goals goalsingle/goal /goals /execution /executions configuration descriptorRefs descriptorRefjar-with-dependencies/descriptorRef /descriptorRefs /configuration /plugin /plugins /build /project And when I pass streaming jar using --jar option , it threw same java.lang.NoClassDefFoundError: org/apache/spark/util/ThreadUtils$. Thanks On Wed, Jun 24, 2015 at 1:17 AM, Tathagata Das t...@databricks.com wrote: You must not include spark-core and spark-streaming in the assembly. They are already present in the installation and the presence of multiple versions of spark may throw off the classloaders in weird ways. So make the assembly marking the those dependencies as scope=provided. On Tue, Jun 23, 2015 at 11:56 AM, Shushant Arora shushantaror...@gmail.com wrote: hi While using spark streaming (1.2) with kafka . I am getting below error and receivers are getting killed but jobs get scheduled at each stream interval. 
15/06/23 18:42:35 WARN TaskSetManager: Lost task 0.1 in stage 18.0 (TID 82, ip(XX)): java.io.IOException: Failed to connect to ip() at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43) at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) 15/06/23 18:42:36 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: org/apache/spark/util/ThreadUtils$ at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:115) at
Re: Accumulators / Accumulables : thread-local, task-local, executor-local ?
Hi, So I've done this Node-centered accumulator, I've written a small piece about it : http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/ Hope it can help someone Guillaume 2015-06-18 15:17 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com mailto:guillaume.pi...@exensa.com: I was thinking exactly the same. I'm going to try it, It doesn't really matter if I lose an executor, since its sketch will be lost, but then reexecuted somewhere else. I mean that between the action that will update the sketches and the action to collect/merge them you can loose an executor. So outside of sparks control. But it's probably an acceptable risk. And anyway, it's an approximate data structure, and what matters are ratios, not exact values. I mostly need to take care of the concurrency problem for my sketch. I think you could do something like: - Have this singleton that holds N sketch instances (one for each executor core) - Inside an operation over partitions (like forEachPartition/mapPartitions) - at the begin you ask the singleton to provide you with one instance to use, in a sync. fashion and pick it out from the N available instances or mark it as in use - when the iterator over the partition doesn't have more elements then you release this sketch - Then you can do something like sc.parallelize(Seq(...)).coalesce(numExecutors).map(pickTheSketches).reduce(blabla), but you will have to make sure that this will be executed over each executor (not sure if a dataset than executor num, will trigger an action on every executor) Please let me know what you end up doing, sounds interesting :) Eugen Guillaume Yeah thats the problem. There is probably some perfect num of partitions that provides the best balance between partition size and memory and merge overhead. Though it's not an ideal solution :( There could be another way but very hacky... for example if you store one sketch in a singleton per jvm (so per executor). Do a first pass over your data and update those. Then you trigger some other dummy operation that will just retrieve the sketches. Thats kind of a hack but should work. Note that if you loose an executor in between, then that doesn't work anymore, probably you could detect it and recompute the sketches, but it would become over complicated. 2015-06-18 14:27 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com mailto:guillaume.pi...@exensa.com: Hi, Thank you for this confirmation. Coalescing is what we do now. It creates, however, very big partitions. Guillaume Hey, I am not 100% sure but from my understanding accumulators are per partition (so per task as its the same) and are sent back to the driver with the task result and merged. When a task needs to be run n times (multiple rdds depend on this one, some partition loss later in the chain etc) then the accumulator will count n times the values from that task. So in short I don't think you'd win from using an accumulator over what you are doing right now. You could maybe coalesce your rdd to num-executors without a shuffle and then update the sketches. You should endup with 1 partition per executor thus 1 sketch per executor. You could then increase the number of threads per task if you can use the sketches concurrently. Eugen 2015-06-18 13:36 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com mailto:guillaume.pi...@exensa.com: Hi, I'm trying to figure out the smartest way to implement a global count-min-sketch on accumulators. For now, we are doing that with RDDs. 
It works well, but with one sketch per partition, merging takes too long. As you probably know, a count-min sketch is a big mutable array of arrays of ints. To distribute it, all sketches must have the same size. Since it can be big, and since merging is not free, I would like to minimize the number of sketches and maximize reuse and concurrent use of the sketches. Ideally, I would like to have just one sketch per worker. I think accumulables might be the right structures for that, but it seems that they are not shared between executors, or even between tasks. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala (289) /** * This thread-local map holds per-task copies of accumulators; it is used to collect the set * of accumulator updates to send back to the driver when tasks complete. After tasks complete,
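For reference, a rough Scala sketch of the singleton-per-JVM trick discussed above. Here Sketch is an assumed count-min-sketch class with update and merge methods, data is an assumed RDD of items, and numExecutors is assumed known; as noted in the thread, this is a hack and Spark gives no hard guarantee that the second pass reaches every executor.

object NodeSketch {
  // One instance per executor JVM, created lazily on first use.
  lazy val sketch: Sketch = new Sketch()
}

// First pass: each task updates the sketch of the executor it happens to run on.
data.foreachPartition { iter =>
  val s = NodeSketch.sketch
  iter.foreach(x => s.synchronized(s.update(x)))  // guards concurrent tasks on the same executor
}

// Second, dummy pass: try to pull one sketch per executor back and merge on the driver.
// Sketch must be serializable for the reduce to ship partial results back.
val merged = sc.parallelize(1 to numExecutors, numExecutors)
  .map(_ => NodeSketch.sketch)
  .reduce(_ merge _)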
Re: How Spark Execute chaining vs no chaining statements
There should be no difference, assuming you don't use the intermediately stored rdd values you are creating for anything else (rdd1, rdd2). In the first example it is still creating these intermediate rdd objects; you are just using them implicitly and not storing the value. It's also worth pointing out that Spark is able to pipeline operations together into stages. That is, it should effectively translate something like map(f1).map(f2).map(f3) to map(f1 -> f2 -> f3) in pseudocode, if you will. Here is a more detailed explanation from one of the committers on SO: http://stackoverflow.com/questions/19340808/spark-single-pipelined-scala-command-better-than-separate-commands On Tue, Jun 23, 2015 at 5:17 PM, Ashish Soni asoni.le...@gmail.com wrote: Hi All , What is the difference between the below in terms of execution on the cluster with 1 or more worker nodes rdd.map(...).map(...)...map(..) vs val rdd1 = rdd.map(...) val rdd2 = rdd1.map(...) val rdd3 = rdd2.map(...) Thanks, Ashish
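A small Scala sketch of the two forms (rdd is an assumed RDD[Int] and f1/f2/f3 are placeholder functions); both build the same lineage, and Spark pipelines the three maps into a single stage, so no intermediate RDD is materialized either way.

val f1 = (x: Int) => x + 1   // placeholder functions
val f2 = (x: Int) => x * 2
val f3 = (x: Int) => x - 3

// Chained form: intermediate RDDs exist but are never bound to names.
val chained = rdd.map(f1).map(f2).map(f3)

// Unchained form: same DAG, the intermediate RDDs are simply given names.
val rdd1 = rdd.map(f1)
val rdd2 = rdd1.map(f2)
val unchained = rdd2.map(f3)

// chained.toDebugString and unchained.toDebugString print the same single-stage lineage.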
Re: spark streaming with kafka jar missing
It throws exception for WriteAheadLogUtils after excluding core and streaming jar. Exception in thread main java.lang.NoClassDefFoundError: org/apache/spark/streaming/util/WriteAheadLogUtils$ at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:84) at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:65) at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:103) at org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala) at com.adobe.hadoop.saprk.sample.SampleSparkStreamApp.main(SampleSparkStreamApp.java:25) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) pom.xml is : project xmlns=http://maven.apache.org/POM/4.0.0; xmlns:xsi= http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation= http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd; modelVersion4.0.0/modelVersion groupId/groupId artifactIdSampleSparkStreamApp/artifactId version1.0/version dependencies dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.10/artifactId version1.2.0/version scopeprovided/scope /dependency dependency groupIdorg.apache.spark/groupId artifactIdspark-streaming-kafka_2.10/artifactId version1.4.0/version /dependency dependency groupIdorg.apache.spark/groupId artifactIdspark-streaming_2.10/artifactId scopeprovided/scope version1.4.0/version /dependency /dependencies build plugins !-- any other plugins -- plugin artifactIdmaven-assembly-plugin/artifactId executions execution phasepackage/phase goals goalsingle/goal /goals /execution /executions configuration descriptorRefs descriptorRefjar-with-dependencies/descriptorRef /descriptorRefs /configuration /plugin /plugins /build /project And when I pass streaming jar using --jar option , it threw same java.lang.NoClassDefFoundError: org/apache/spark/util/ThreadUtils$. Thanks On Wed, Jun 24, 2015 at 1:17 AM, Tathagata Das t...@databricks.com wrote: You must not include spark-core and spark-streaming in the assembly. They are already present in the installation and the presence of multiple versions of spark may throw off the classloaders in weird ways. So make the assembly marking the those dependencies as scope=provided. On Tue, Jun 23, 2015 at 11:56 AM, Shushant Arora shushantaror...@gmail.com wrote: hi While using spark streaming (1.2) with kafka . I am getting below error and receivers are getting killed but jobs get scheduled at each stream interval. 
15/06/23 18:42:35 WARN TaskSetManager: Lost task 0.1 in stage 18.0 (TID 82, ip(XX)): java.io.IOException: Failed to connect to ip() at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43) at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) 15/06/23 18:42:36 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: org/apache/spark/util/ThreadUtils$ at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:115) at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121) at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106) at
Kafka createDirectStream issue
Hello, I am trying use the new Kafka consumer KafkaUtils.createDirectStream but I am having some issues making it work. I have tried different versions of Spark v1.4.0 and branch-1.4 #8d6e363 and I am still getting the same strange exception ClassNotFoundException: $line49.$read$$iwC$$i Has anyone else been facing this kind of problem? The following is the code and logs that I have been using to reproduce the issue: spark-shell: script -- sc.stop() import _root_.kafka.serializer.StringDecoder import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka.KafkaUtils val sparkConf = new SparkConf().setMaster(spark://localhost:7077).setAppName(KCon).set(spark.ui.port, 4041 ).set(spark.driver.allowMultipleContexts, true).setJars(Array(/opt/spark-libs/spark-streaming-kafka-assembly_2.10-1.4.2-SNAPSHOT.jar)) val ssc = new StreamingContext(sparkConf, Seconds(5)) val kafkaParams = Map[String, String](bootstrap.servers - localhost:9092, schema.registry.url - http://localhost:8081;, zookeeper.connect - localhost:2181, group.id - KCon ) val topic = Set(test) val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic) val raw = messages.map(_._2) val words = raw.flatMap(_.split( )) val wordCounts = words.map(x = (x, 1L)).reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() -- spark-shell: output -- sparkConf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@330e37b2 ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@28ec9c23 kafkaParams: scala.collection.immutable.Map[String,String] = Map(bootstrap.servers - localhost:9092, schema.registry.url - http://localhost:8081, zookeeper.connect - localhost:2181, group.id - OPC)topic: scala.collection.immutable.Set[String] = Set(test) WARN [main] kafka.utils.VerifiableProperties - Property schema.registry.url is not valid messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e71b70d raw: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@578ce232 words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@351cc4b5 wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Long)] = org.apache.spark.streaming.dstream.ShuffledDStream@ae04104 WARN [JobGenerator] kafka.utils.VerifiableProperties - Property schema.registry.url is not valid WARN [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0, 10.3.30.87): java.lang.ClassNotFoundException: $line49.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1 at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) .. .. Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256) -- Best regards and thanks in advance for any help. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-createDirectStream-issue-tp23456.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail:
Re: Spark standalone cluster - resource management
I'm having 30G per machine. This is the first (and only) job I'm trying to submit. So it's weird that for --total-executor-cores=20 it works, and for --total-executor-cores=4 it doesn't. On Tue, Jun 23, 2015 at 10:46 PM, Igor Berman igor.ber...@gmail.com wrote: Probably there are already running jobs there. In addition, memory is also a resource, so if you are running 1 application that took all your memory and then you are trying to run another application that asks for memory the cluster doesn't have, then the second app won't be running. So why are you specifying 22g as executor memory? How much memory do you have on each machine? On 23 June 2015 at 09:33, nizang ni...@windward.eu wrote: to give a bit more data on what I'm trying to get - I have many tasks I want to run in parallel, so I want each task to grab a small part of the cluster (only a limited part of my 20 cores in the cluster). I have important tasks that I want to get 10 cores, and I have small tasks that I want to run with only 1 or 2 cores.
Re: spark streaming with kafka jar missing
Thanks a lot. It worked after keeping all versions to same.1.2.0 On Wed, Jun 24, 2015 at 2:24 AM, Tathagata Das t...@databricks.com wrote: Why are you mixing spark versions between streaming and core?? Your core is 1.2.0 and streaming is 1.4.0. On Tue, Jun 23, 2015 at 1:32 PM, Shushant Arora shushantaror...@gmail.com wrote: It throws exception for WriteAheadLogUtils after excluding core and streaming jar. Exception in thread main java.lang.NoClassDefFoundError: org/apache/spark/streaming/util/WriteAheadLogUtils$ at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:84) at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:65) at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:103) at org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala) at com.adobe.hadoop.saprk.sample.SampleSparkStreamApp.main(SampleSparkStreamApp.java:25) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) pom.xml is : project xmlns=http://maven.apache.org/POM/4.0.0; xmlns:xsi= http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation= http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd; modelVersion4.0.0/modelVersion groupId/groupId artifactIdSampleSparkStreamApp/artifactId version1.0/version dependencies dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.10/artifactId version1.2.0/version scopeprovided/scope /dependency dependency groupIdorg.apache.spark/groupId artifactIdspark-streaming-kafka_2.10/artifactId version1.4.0/version /dependency dependency groupIdorg.apache.spark/groupId artifactIdspark-streaming_2.10/artifactId scopeprovided/scope version1.4.0/version /dependency /dependencies build plugins !-- any other plugins -- plugin artifactIdmaven-assembly-plugin/artifactId executions execution phasepackage/phase goals goalsingle/goal /goals /execution /executions configuration descriptorRefs descriptorRefjar-with-dependencies/descriptorRef /descriptorRefs /configuration /plugin /plugins /build /project And when I pass streaming jar using --jar option , it threw same java.lang.NoClassDefFoundError: org/apache/spark/util/ThreadUtils$. Thanks On Wed, Jun 24, 2015 at 1:17 AM, Tathagata Das t...@databricks.com wrote: You must not include spark-core and spark-streaming in the assembly. They are already present in the installation and the presence of multiple versions of spark may throw off the classloaders in weird ways. So make the assembly marking the those dependencies as scope=provided. On Tue, Jun 23, 2015 at 11:56 AM, Shushant Arora shushantaror...@gmail.com wrote: hi While using spark streaming (1.2) with kafka . I am getting below error and receivers are getting killed but jobs get scheduled at each stream interval. 
15/06/23 18:42:35 WARN TaskSetManager: Lost task 0.1 in stage 18.0 (TID 82, ip(XX)): java.io.IOException: Failed to connect to ip() at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43) at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) 15/06/23 18:42:36 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
kafka spark streaming with mesos
Hey, I'm trying to run Kafka Spark Streaming using Mesos with the following example: sc.stop import org.apache.spark.SparkConf import org.apache.spark.SparkContext._ import kafka.serializer.StringDecoder import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka._ import org.apache.spark.storage.StorageLevel val sparkConf = new SparkConf().setAppName(Summarizer).setMaster(zk://127.0.0.1:2181/mesos) val ssc = new StreamingContext(sparkConf, Seconds(10)) val kafkaParams = Map[String, String](zookeeper.connect - 127.0.0.1:2181, group.id - test) val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Map(test - 1), StorageLevel.MEMORY_ONLY_SER).map(_._2) messages.foreachRDD { pairRDD = println(sDataListener.listen() [pairRDD = ${pairRDD}]) println(sDataListener.listen() [pairRDD.count = ${pairRDD.count()}]) pairRDD.foreach(row = println(sDataListener.listen() [row = ${row}])) val msgData = pairRDD.collect() } Unfortunately println(sDataListener.listen() [pairRDD.count = ${pairRDD.count()}]) always returns 0. I tested the same example using local[2] instead of zk://127.0.0.1:2181/mesos and everything works perfectly (count returns the correct produced message count, and pairRDD.foreach(row = println(sDataListener.listen() [row = ${row}])) prints the Kafka messages). Could you help me understand this issue? What am I doing wrong? attaching: spark shell output http://pastebin.com/zdYFBj4T executor output http://pastebin.com/LDMtCjq0 thanks! bartek
Re: Kafka createDirectStream issue
The exception $line49 is referring to a line of the spark shell. Have you tried it from an actual assembled job with spark-submit ? On Tue, Jun 23, 2015 at 3:48 PM, syepes sye...@gmail.com wrote: Hello, I am trying use the new Kafka consumer KafkaUtils.createDirectStream but I am having some issues making it work. I have tried different versions of Spark v1.4.0 and branch-1.4 #8d6e363 and I am still getting the same strange exception ClassNotFoundException: $line49.$read$$iwC$$i Has anyone else been facing this kind of problem? The following is the code and logs that I have been using to reproduce the issue: spark-shell: script -- sc.stop() import _root_.kafka.serializer.StringDecoder import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka.KafkaUtils val sparkConf = new SparkConf().setMaster(spark://localhost:7077).setAppName(KCon).set(spark.ui.port, 4041 ).set(spark.driver.allowMultipleContexts, true).setJars(Array(/opt/spark-libs/spark-streaming-kafka-assembly_2.10-1.4.2-SNAPSHOT.jar)) val ssc = new StreamingContext(sparkConf, Seconds(5)) val kafkaParams = Map[String, String](bootstrap.servers - localhost:9092, schema.registry.url - http://localhost:8081;, zookeeper.connect - localhost:2181, group.id - KCon ) val topic = Set(test) val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic) val raw = messages.map(_._2) val words = raw.flatMap(_.split( )) val wordCounts = words.map(x = (x, 1L)).reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() -- spark-shell: output -- sparkConf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@330e37b2 ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@28ec9c23 kafkaParams: scala.collection.immutable.Map[String,String] = Map(bootstrap.servers - localhost:9092, schema.registry.url - http://localhost:8081, zookeeper.connect - localhost:2181, group.id - OPC)topic: scala.collection.immutable.Set[String] = Set(test) WARN [main] kafka.utils.VerifiableProperties - Property schema.registry.url is not valid messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e71b70d raw: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@578ce232 words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@351cc4b5 wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Long)] = org.apache.spark.streaming.dstream.ShuffledDStream@ae04104 WARN [JobGenerator] kafka.utils.VerifiableProperties - Property schema.registry.url is not valid WARN [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0, 10.3.30.87): java.lang.ClassNotFoundException: $line49.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1 at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) .. .. Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256) -- Best regards and thanks in advance for any help. -- View this
How Spark Execute chaining vs no chaining statements
Hi All , What is the difference between the below in terms of execution on the cluster with 1 or more worker nodes rdd.map(...).map(...)...map(..) vs val rdd1 = rdd.map(...) val rdd2 = rdd1.map(...) val rdd3 = rdd2.map(...) Thanks, Ashish
Re: SQL vs. DataFrame API
It seems that it doesn't happen in Scala API. Not exactly the same as in python, but pretty close. https://gist.github.com/elnopintan/675968d2e4be68958df8 2015-06-23 23:11 GMT+02:00 Davies Liu dav...@databricks.com: I think it also happens in DataFrames API of all languages. On Tue, Jun 23, 2015 at 9:16 AM, Ignacio Blasco elnopin...@gmail.com wrote: That issue happens only in python dsl? El 23/6/2015 5:05 p. m., Bob Corsaro rcors...@gmail.com escribió: Thanks! The solution: https://gist.github.com/dokipen/018a1deeab668efdf455 On Mon, Jun 22, 2015 at 4:33 PM Davies Liu dav...@databricks.com wrote: Right now, we can not figure out which column you referenced in `select`, if there are multiple row with the same name in the joined DataFrame (for example, two `value`). A workaround could be: numbers2 = numbers.select(df.name, df.value.alias('other')) rows = numbers.join(numbers2, (numbers.name==numbers2.name) (numbers.value != numbers2.other), how=inner) \ .select(numbers.name, numbers.value, numbers2.other) \ .collect() On Mon, Jun 22, 2015 at 12:53 PM, Ignacio Blasco elnopin...@gmail.com wrote: Sorry thought it was scala/spark El 22/6/2015 9:49 p. m., Bob Corsaro rcors...@gmail.com escribió: That's invalid syntax. I'm pretty sure pyspark is using a DSL to create a query here and not actually doing an equality operation. On Mon, Jun 22, 2015 at 3:43 PM Ignacio Blasco elnopin...@gmail.com wrote: Probably you should use === instead of == and !== instead of != Can anyone explain why the dataframe API doesn't work as I expect it to here? It seems like the column identifiers are getting confused. https://gist.github.com/dokipen/4b324a7365ae87b7b0e5
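For what it's worth, a hedged Scala sketch of an alias-based version of the same workaround (numbers is the assumed DataFrame with name and value columns); aliasing the two sides of the self-join gives the final select unambiguous, qualified column names:

import org.apache.spark.sql.functions.col

val a = numbers.as("a")
val b = numbers.as("b")
val rows = a.join(b, (col("a.name") === col("b.name")) && (col("a.value") !== col("b.value")))
  .select(col("a.name"), col("a.value"), col("b.value").as("other"))
  .collect()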
Re: SQL vs. DataFrame API
I think it also happens in DataFrames API of all languages. On Tue, Jun 23, 2015 at 9:16 AM, Ignacio Blasco elnopin...@gmail.com wrote: That issue happens only in python dsl? El 23/6/2015 5:05 p. m., Bob Corsaro rcors...@gmail.com escribió: Thanks! The solution: https://gist.github.com/dokipen/018a1deeab668efdf455 On Mon, Jun 22, 2015 at 4:33 PM Davies Liu dav...@databricks.com wrote: Right now, we can not figure out which column you referenced in `select`, if there are multiple row with the same name in the joined DataFrame (for example, two `value`). A workaround could be: numbers2 = numbers.select(df.name, df.value.alias('other')) rows = numbers.join(numbers2, (numbers.name==numbers2.name) (numbers.value != numbers2.other), how=inner) \ .select(numbers.name, numbers.value, numbers2.other) \ .collect() On Mon, Jun 22, 2015 at 12:53 PM, Ignacio Blasco elnopin...@gmail.com wrote: Sorry thought it was scala/spark El 22/6/2015 9:49 p. m., Bob Corsaro rcors...@gmail.com escribió: That's invalid syntax. I'm pretty sure pyspark is using a DSL to create a query here and not actually doing an equality operation. On Mon, Jun 22, 2015 at 3:43 PM Ignacio Blasco elnopin...@gmail.com wrote: Probably you should use === instead of == and !== instead of != Can anyone explain why the dataframe API doesn't work as I expect it to here? It seems like the column identifiers are getting confused. https://gist.github.com/dokipen/4b324a7365ae87b7b0e5 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming: limit number of nodes
I cannot. I've already limited the number of cores to 10, so it gets 5 executors with 2 cores each... On Tue, 23.06.2015 at 13:45, Akhil Das ak...@sigmoidanalytics.com wrote: Use *spark.cores.max* to limit the CPU per job, then you can easily accommodate your third job also. Thanks Best Regards On Tue, Jun 23, 2015 at 5:07 PM, Wojciech Pituła w.pit...@gmail.com wrote: I have set up a small standalone cluster: 5 nodes, every node has 5GB of memory and 8 cores. As you can see, a node doesn't have much RAM. I have 2 streaming apps; the first one is configured to use 3GB of memory per node and the second one uses 2GB per node. My problem is that the smaller app could easily run on 2 or 3 nodes instead of 5, so I could launch a third app. Is it possible to limit the number of nodes (executors) that an app will get from the standalone cluster?
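For reference, spark.cores.max can be set on the application's own SparkConf; a minimal Scala sketch (the app name and values are examples only):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("small-streaming-app")
  .set("spark.cores.max", "4")          // cap on the total cores this app takes from the cluster
  .set("spark.executor.memory", "2g")

Note that by default the standalone master spreads an application's cores across as many workers as possible; setting spark.deploy.spreadOut to false on the master consolidates executors onto fewer nodes, which is closer to what is being asked for here.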
flume sinks supported by spark streaming
Hi! I want to integrate Flume with Spark Streaming. I want to know which Flume sink types are supported by Spark Streaming. I saw one example using avroSink. Thanks
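For reference, Spark Streaming 1.x ships a Flume integration in the spark-streaming-flume artifact: a push-based receiver that a Flume Avro sink can point at (FlumeUtils.createStream), and a pull-based variant (FlumeUtils.createPollingStream) for Flume's Spark sink. A minimal Scala sketch of the push-based form, assuming an existing StreamingContext ssc and a placeholder host/port that the Avro sink is configured to send to:

import org.apache.spark.streaming.flume.FlumeUtils

// The host:port must be reachable by the Flume agent and belong to one of the executors.
val flumeStream = FlumeUtils.createStream(ssc, "executor-host", 41414)
flumeStream.map(event => new String(event.event.getBody.array())).print()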
Re: when cached RDD will unpersist its data
In the case that memory cannot hold all the cached RDDs, the BlockManager will evict some older blocks to make room for new RDD blocks. Hope that helps. 2015-06-24 13:22 GMT+08:00 bit1...@163.com: I am kind of confused about when a cached RDD will unpersist its data. I know we can explicitly unpersist it with RDD.unpersist, but can it be unpersisted automatically by the Spark framework? Thanks. -- bit1...@163.com -- 王海华
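A short Scala sketch of the explicit route (data is an assumed RDD; the storage level is just an example). Besides the LRU-style eviction described above, Spark's ContextCleaner also drops cached blocks once the RDD object is garbage-collected on the driver, so caches do eventually get released automatically.

import org.apache.spark.storage.StorageLevel

val cached = data.persist(StorageLevel.MEMORY_ONLY)
cached.count()       // materialize the cache
// ... reuse `cached` in later jobs ...
cached.unpersist()   // explicit release; blocking by default in Spark 1.x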
when cached RDD will unpersist its data
I am kind of confused about when a cached RDD will unpersist its data. I know we can explicitly unpersist it with RDD.unpersist, but can it be unpersisted automatically by the Spark framework? Thanks. bit1...@163.com
Re: Spark standalone cluster - resource management
to give a bit more data on what I'm trying to get - I have many tasks I want to run in parallel, so I want each task to grab a small part of the cluster (only a limited part of my 20 cores in the cluster). I have important tasks that I want to get 10 cores, and I have small tasks that I want to run with only 1 or 2 cores.
Re: jars are not loading from 1.3. those set via setJars to the SparkContext
I am invoking it from the java application by creating the sparkcontext On Tue, Jun 23, 2015 at 12:17 PM, Tathagata Das t...@databricks.com wrote: How are you adding that to the classpath? Through spark-submit or otherwise? On Mon, Jun 22, 2015 at 5:02 PM, Murthy Chelankuri kmurt...@gmail.com wrote: Yes I have the producer in the class path. And I am using in standalone mode. Sent from my iPhone On 23-Jun-2015, at 3:31 am, Tathagata Das t...@databricks.com wrote: Do you have Kafka producer in your classpath? If so how are adding that library? Are you running on YARN, or Mesos or Standalone or local. These details will be very useful. On Mon, Jun 22, 2015 at 8:34 AM, Murthy Chelankuri kmurt...@gmail.com wrote: I am using spark streaming. what i am trying to do is sending few messages to some kafka topic. where its failing. java.lang.ClassNotFoundException: com.abc.mq.msg.ObjectEncoder at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:264) at kafka.utils.Utils$.createObject(Utils.scala:438) at kafka.producer.Producer.init(Producer.scala:61) On Mon, Jun 22, 2015 at 8:24 PM, Murthy Chelankuri kmurt...@gmail.com wrote: I have been using the spark from the last 6 months with the version 1.2.0. I am trying to migrate to the 1.3.0 but the same problem i have written is not wokring. Its giving class not found error when i try to load some dependent jars from the main program. This use to work in 1.2.0 when set all the dependent jars array to the spark context but not working in 1.3.0 Please help me how to resolve this. Thanks, Murthy Chelankuri
Re: jars are not loading from 1.3. those set via setJars to the SparkContext
How are you adding that to the classpath? Through spark-submit or otherwise? On Mon, Jun 22, 2015 at 5:02 PM, Murthy Chelankuri kmurt...@gmail.com wrote: Yes I have the producer in the class path. And I am using in standalone mode. Sent from my iPhone On 23-Jun-2015, at 3:31 am, Tathagata Das t...@databricks.com wrote: Do you have Kafka producer in your classpath? If so how are adding that library? Are you running on YARN, or Mesos or Standalone or local. These details will be very useful. On Mon, Jun 22, 2015 at 8:34 AM, Murthy Chelankuri kmurt...@gmail.com wrote: I am using spark streaming. what i am trying to do is sending few messages to some kafka topic. where its failing. java.lang.ClassNotFoundException: com.abc.mq.msg.ObjectEncoder at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:264) at kafka.utils.Utils$.createObject(Utils.scala:438) at kafka.producer.Producer.init(Producer.scala:61) On Mon, Jun 22, 2015 at 8:24 PM, Murthy Chelankuri kmurt...@gmail.com wrote: I have been using the spark from the last 6 months with the version 1.2.0. I am trying to migrate to the 1.3.0 but the same problem i have written is not wokring. Its giving class not found error when i try to load some dependent jars from the main program. This use to work in 1.2.0 when set all the dependent jars array to the spark context but not working in 1.3.0 Please help me how to resolve this. Thanks, Murthy Chelankuri
Re: jars are not loading from 1.3. those set via setJars to the SparkContext
So you have Kafka in your classpath in you Java application, where you are creating the sparkContext with the spark standalone master URL, right? The recommended way of submitting spark applications to any cluster is using spark-submit. See https://spark.apache.org/docs/latest/submitting-applications.html. This takes care of sending all the libraries to the cluster workers so that they can be found. Please try that. On Mon, Jun 22, 2015 at 11:50 PM, Murthy Chelankuri kmurt...@gmail.com wrote: I am invoking it from the java application by creating the sparkcontext On Tue, Jun 23, 2015 at 12:17 PM, Tathagata Das t...@databricks.com wrote: How are you adding that to the classpath? Through spark-submit or otherwise? On Mon, Jun 22, 2015 at 5:02 PM, Murthy Chelankuri kmurt...@gmail.com wrote: Yes I have the producer in the class path. And I am using in standalone mode. Sent from my iPhone On 23-Jun-2015, at 3:31 am, Tathagata Das t...@databricks.com wrote: Do you have Kafka producer in your classpath? If so how are adding that library? Are you running on YARN, or Mesos or Standalone or local. These details will be very useful. On Mon, Jun 22, 2015 at 8:34 AM, Murthy Chelankuri kmurt...@gmail.com wrote: I am using spark streaming. what i am trying to do is sending few messages to some kafka topic. where its failing. java.lang.ClassNotFoundException: com.abc.mq.msg.ObjectEncoder at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:264) at kafka.utils.Utils$.createObject(Utils.scala:438) at kafka.producer.Producer.init(Producer.scala:61) On Mon, Jun 22, 2015 at 8:24 PM, Murthy Chelankuri kmurt...@gmail.com wrote: I have been using the spark from the last 6 months with the version 1.2.0. I am trying to migrate to the 1.3.0 but the same problem i have written is not wokring. Its giving class not found error when i try to load some dependent jars from the main program. This use to work in 1.2.0 when set all the dependent jars array to the spark context but not working in 1.3.0 Please help me how to resolve this. Thanks, Murthy Chelankuri
Re: [Spark Streaming 1.4.0] SPARK-5063, Checkpointing and queuestream
queue stream does not support driver checkpoint recovery since the RDDs in the queue are arbitrary generated by the user and its hard for Spark Streaming to keep track of the data in the RDDs (thats necessary for recovering from checkpoint). Anyways queue stream is meant of testing and development, not for production and hence the question of recovering the driver does not arise in that case. On Mon, Jun 22, 2015 at 8:10 AM, Shaanan Cohney shaan...@gmail.com wrote: It's a generated set of shell commands to run (written in C, highly optimized numerical computer), which is create from a set of user provided parameters. The snippet above is: task_outfiles_to_cmds = OrderedDict(run_sieving.leftover_tasks) task_outfiles_to_cmds.update(generate_sieving_task_commands(parameters, poly_path, fb_paths)) task_commands = list(task_outfiles_to_cmds.values()) task_path = os.path.join(utils.WORK_DIR, sieving_tasks) if not os.path.exists(task_path): os.makedirs(task_path) batch_size = utils.TOTAL_CORES task_batches = [task_commands[c:c+batch_size] for c in range(0, len(task_commands),batch_size)] Which does not reference the SparkContext or StreamingContext at all. -- Shaanan Cohney PhD Student University of Pennsylvania shaan...@gmail.com On Tue, Jun 23, 2015 at 1:05 AM, Benjamin Fradet benjamin.fra...@gmail.com wrote: Where does task_batches come from? On 22 Jun 2015 4:48 pm, Shaanan Cohney shaan...@gmail.com wrote: Thanks, I've updated my code to use updateStateByKey but am still getting these errors when I resume from a checkpoint. One thought of mine was that I used sc.parallelize to generate the RDDs for the queue, but perhaps on resume, it doesn't recreate the context needed? -- Shaanan Cohney PhD Student University of Pennsylvania shaan...@gmail.com On Mon, Jun 22, 2015 at 9:27 PM, Benjamin Fradet benjamin.fra...@gmail.com wrote: I would suggest you have a look at the updateStateByKey transformation in the Spark Streaming programming guide which should fit your needs better than your update_state function. On 22 Jun 2015 1:03 pm, Shaanan Cohney shaan...@gmail.com wrote: Counts is a list (counts = []) in the driver, used to collect the results. It seems like it's also not the best way to be doing things, but I'm new to spark and editing someone else's code so still learning. Thanks! def update_state(out_files, counts, curr_rdd): try: for c in curr_rdd.collect(): fnames, count = c counts.append(count) out_files |= fnames except Py4JJavaError as e: print(EXCEPTION: %s % str(e)) -- Shaanan Cohney PhD Student University of Pennsylvania shaan...@gmail.com On Mon, Jun 22, 2015 at 8:56 PM, Benjamin Fradet benjamin.fra...@gmail.com wrote: What does counts refer to? Could you also paste the code of your update_state function? On 22 Jun 2015 12:48 pm, Shaanan Cohney shaan...@gmail.com wrote: I'm receiving the SPARK-5063 error (RDD transformations and actions can only be invoked by the driver, not inside of other transformations) whenever I try and restore from a checkpoint in spark streaming on my app. I'm using python3 and my RDDs are inside a queuestream DStream. This is the little chunk of code causing issues: - p_batches = [sc.parallelize(batch) for batch in task_batches] sieving_tasks = ssc.queueStream(p_batches) sieving_tasks.checkpoint(20) relations = sieving_tasks.map(lambda s: run_sieving_command(s, poly, poly_path, fb_paths)) relations.reduce(lambda a, b: (a[0] | b[0], a[1] + b[1]) ).foreachRDD(lambda s: update_state(out_files, counts, s)) ssc.checkpoint(s3n_path) - Thanks again! 
-- Shaanan Cohney PhD Student University of Pennsylvania shaan...@gmail.com
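For reference, a minimal sketch of updateStateByKey in Scala (the thread itself is Python; relationCounts is a hypothetical DStream[(String, Long)] and the checkpoint path is a placeholder). State kept this way is checkpointed by Spark Streaming and survives driver recovery, unlike a mutable list held on the driver.

// Running total per key, maintained by Spark Streaming across batches.
def updateCount(newValues: Seq[Long], state: Option[Long]): Option[Long] =
  Some(state.getOrElse(0L) + newValues.sum)

ssc.checkpoint("hdfs:///checkpoints/sieving-app")   // placeholder path
val totals = relationCounts.updateStateByKey(updateCount _)
totals.print()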
Re: Multiple executors writing file using java filewriter
Why don't you do a normal .saveAsTextFiles? Thanks Best Regards On Mon, Jun 22, 2015 at 11:55 PM, anshu shukla anshushuk...@gmail.com wrote: Thanx for reply !! YES , Either it should write on any machine of cluster or Can you please help me ... that how to do this . Previously i was using writing using collect () , so some of my tuples are missing while writing. //previous logic that was just creating the file on master - newinputStream.foreachRDD(new Function2JavaRDDString, Time, Void() { @Override public Void call(JavaRDDString v1, Time v2) throws Exception { for(String s:v1.collect()) { //System.out.println(v1 here is + v1 + --- + s); spoutlog.batchLogwriter(System.currentTimeMillis(), spout-MSGID, + msgeditor.getMessageId(s)); //System.out.println(msgeditor.getMessageId(s)); } return null; } }); On Mon, Jun 22, 2015 at 11:31 PM, Richard Marscher rmarsc...@localytics.com wrote: Is spoutLog just a non-spark file writer? If you run that in the map call on a cluster its going to be writing in the filesystem of the executor its being run on. I'm not sure if that's what you intended. On Mon, Jun 22, 2015 at 1:35 PM, anshu shukla anshushuk...@gmail.com wrote: Running perfectly in local system but not writing to file in cluster mode .ANY suggestions please .. //msgid is long counter JavaDStreamString newinputStream=inputStream.map(new FunctionString, String() { @Override public String call(String v1) throws Exception { String s1=msgId+@+v1; System.out.println(s1); msgId++; try { *//filewriter logic spoutlog.batchLogwriter(System.currentTimeMillis(), spout-MSGID, + msgeditor.getMessageId(s1));* } catch (Exception e) { System.out.println(exeception is here); e.printStackTrace(); throw e; } System.out.println(msgid,+msgId); return msgeditor.addMessageId(v1,msgId); } }); -- Thanks Regards, Anshu Shukla On Mon, Jun 22, 2015 at 10:50 PM, anshu shukla anshushuk...@gmail.com wrote: Can not we write some data to a txt file in parallel with multiple executors running in parallel ?? -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla
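A minimal Scala sketch of the saveAsTextFiles suggestion (the original code is Java; inputStream is the assumed DStream[String] and the path prefix is a placeholder). Each executor writes its own partition's files, so nothing depends on collect()-ing to the driver and no tuples are lost that way.

// One output directory per batch, named <prefix>-<batchTime>.<suffix>, with one part file per partition.
val logged = inputStream.map(msg => System.currentTimeMillis() + "," + msg)
logged.saveAsTextFiles("hdfs:///logs/spout-msgid", "txt")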
MLLIB - Storing the Trained Model
HI All, I was trying to store a trained model to the local hard disk. i am able to save it using save() function. while i am trying to retrieve the stored model using load() function i am end up with following error. kindly help me on this. scala val sameModel = RandomForestModel.load(sc,/home/ec2-user/myModel) 15/06/23 02:04:25 INFO MemoryStore: ensureFreeSpace(255260) called with curMem=592097, maxMem=278302556 15/06/23 02:04:25 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 249.3 KB, free 264.6 MB) 15/06/23 02:04:25 INFO MemoryStore: ensureFreeSpace(36168) called with curMem=847357, maxMem=278302556 15/06/23 02:04:25 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 35.3 KB, free 264.6 MB) 15/06/23 02:04:25 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on localhost:42290 (size: 35.3 KB, free: 265.3 MB) 15/06/23 02:04:25 INFO BlockManagerMaster: Updated info of block broadcast_6_piece0 15/06/23 02:04:25 INFO SparkContext: Created broadcast 6 from textFile at modelSaveLoad.scala:125 15/06/23 02:04:25 INFO FileInputFormat: Total input paths to process : 1 15/06/23 02:04:25 INFO SparkContext: Starting job: first at modelSaveLoad.scala:125 15/06/23 02:04:25 INFO DAGScheduler: Got job 3 (first at modelSaveLoad.scala:125) with 1 output partitions (allowLocal=true) 15/06/23 02:04:25 INFO DAGScheduler: Final stage: Stage 3(first at modelSaveLoad.scala:125) 15/06/23 02:04:25 INFO DAGScheduler: Parents of final stage: List() 15/06/23 02:04:25 INFO DAGScheduler: Missing parents: List() 15/06/23 02:04:25 INFO DAGScheduler: Submitting Stage 3 (/home/ec2-user/myModel/metadata MapPartitionsRDD[7] at textFile at modelSaveLoad.scala:125), which has no missing parents 15/06/23 02:04:25 INFO MemoryStore: ensureFreeSpace(2680) called with curMem=883525, maxMem=278302556 15/06/23 02:04:25 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 2.6 KB, free 264.6 MB) 15/06/23 02:04:25 INFO MemoryStore: ensureFreeSpace(1965) called with curMem=886205, maxMem=278302556 15/06/23 02:04:25 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 1965.0 B, free 264.6 MB) 15/06/23 02:04:25 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on localhost:42290 (size: 1965.0 B, free: 265.3 MB) 15/06/23 02:04:25 INFO BlockManagerMaster: Updated info of block broadcast_7_piece0 15/06/23 02:04:25 INFO SparkContext: Created broadcast 7 from broadcast at DAGScheduler.scala:839 15/06/23 02:04:25 INFO DAGScheduler: Submitting 1 missing tasks from Stage 3 (/home/ec2-user/myModel/metadata MapPartitionsRDD[7] at textFile at modelSaveLoad.scala:125) 15/06/23 02:04:25 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks 15/06/23 02:04:25 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, localhost, PROCESS_LOCAL, 1311 bytes) 15/06/23 02:04:25 INFO Executor: Running task 0.0 in stage 3.0 (TID 3) 15/06/23 02:04:25 INFO HadoopRDD: Input split: file:/home/ec2-user/myModel/metadata/part-0:0+97 15/06/23 02:04:25 INFO Executor: Finished task 0.0 in stage 3.0 (TID 3). 
1989 bytes result sent to driver 15/06/23 02:04:25 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) in 10 ms on localhost (1/1) 15/06/23 02:04:25 INFO DAGScheduler: Stage 3 (first at modelSaveLoad.scala:125) finished in 0.010 s 15/06/23 02:04:25 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 15/06/23 02:04:25 INFO DAGScheduler: Job 3 finished: first at modelSaveLoad.scala:125, took 0.016193 s 15/06/23 02:04:25 WARN FSInputChecker: Problem opening checksum file: file:/home/ec2-user/myModel/data/_temporary/0/_temporary/attempt_201506230149_0027_r_01_0/part-r-2.parquet. Ignoring exception: java.io.EOFException at java.io.DataInputStream.readFully(DataInputStream.java:197) at java.io.DataInputStream.readFully(DataInputStream.java:169) at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.init(ChecksumFileSystem.java:149) at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:341) at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766) at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:402) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:298) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:297) at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56) at
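For reference, the save/load pair looks roughly like this in Scala (model is the assumed trained RandomForestModel and the path is a placeholder; on a cluster it should be on a filesystem all executors can reach, such as HDFS). The _temporary/attempt_... path in the warning above suggests the earlier save() never completed, leaving a partial Parquet file that load() then trips over, so re-running save() to completion into a fresh directory is worth trying first.

import org.apache.spark.mllib.tree.model.RandomForestModel

model.save(sc, "hdfs:///models/myRandomForest")   // writes metadata/ plus data/ as Parquet
val sameModel = RandomForestModel.load(sc, "hdfs:///models/myRandomForest")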
Spark standalone cluster - resource management
hi, I'm running spark standalone cluster with 5 slaves, each has 4 cores. When I run job with the following configuration: /root/spark/bin/spark-submit -v --total-executor-cores 20 --executor-memory 22g --executor-cores 4 --class com.windward.spark.apps.MyApp --name dev-app --properties-file /mnt/spark-apps/apps/dev/my-app/app.properties /mnt/spark-apps/apps/dev/my-app/my-app-1.0-SNAPSHOT-jar-with-dependencies.jar 20140101 20140103 everything runs fine (total-executor-cores=20 - I set it to all the cores I have 4*5) If I run it with the following configuration: /root/spark/bin/spark-submit -v --total-executor-cores 4 --executor-memory 22g --executor-cores 4 --class com.windward.spark.apps.MyApp --name dev-app --properties-file /mnt/spark-apps/apps/dev/my-app/app.properties /mnt/spark-apps/apps/dev/my-app/my-app-1.0-SNAPSHOT-jar-with-dependencies.jar 20140101 20140103 (I set total-executor-cores=4. because I want to use only small part of my cluster for that task), I get the following message: org.apache.spark.scheduler.TaskSchedulerImpl- Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources Can't I tell spark to use only part of my cores for specific task? I need it if I want to run many tasks in parallel thanks, nizan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-cluster-resource-management-tp23444.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Shutdown with streaming driver running in cluster broke master web UI permanently
Maybe this is a known issue with spark streaming and master web ui. Disable event logging, and it should be fine. https://issues.apache.org/jira/browse/SPARK-6270 On Mon, Jun 22, 2015 at 8:54 AM, scar scar scar0...@gmail.com wrote: Sorry I was on vacation for a few days. Yes, it is on. This is what I have in the logs: 15/06/22 10:44:00 INFO ClientCnxn: Unable to read additional data from server sessionid 0x14dd82e22f70ef1, likely server has closed socket, closing socket connection and attempting reconnect 15/06/22 10:44:00 INFO ClientCnxn: Unable to read additional data from server sessionid 0x24dc5a319b40090, likely server has closed socket, closing socket connection and attempting reconnect 15/06/22 10:44:01 INFO ConnectionStateManager: State change: SUSPENDED 15/06/22 10:44:01 INFO ConnectionStateManager: State change: SUSPENDED 15/06/22 10:44:01 WARN ConnectionStateManager: There are no ConnectionStateListeners registered. 15/06/22 10:44:01 INFO ZooKeeperLeaderElectionAgent: We have lost leadership 15/06/22 10:44:01 ERROR Master: Leadership has been revoked -- master shutting down. On Thu, Jun 11, 2015 at 8:59 PM, Tathagata Das t...@databricks.com wrote: Do you have the event logging enabled? TD On Thu, Jun 11, 2015 at 11:24 AM, scar0909 scar0...@gmail.com wrote: I have the same problem. i realized that the master spark becomes unresponsive when we kill the leader zookeeper (of course i assigned the leader election task to the zookeeper). please let me know if you have any devlepments. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Shutdown-with-streaming-driver-running-in-cluster-broke-master-web-UI-permanently-tp4149p23284.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
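A minimal sketch of the suggested workaround, disabling event logging for the streaming application until the cluster runs a release with the SPARK-6270 fix (the app name is a placeholder):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("streaming-app")
  .set("spark.eventLog.enabled", "false")   // stops the master from replaying huge streaming event logs for its UI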
Re: jars are not loading from 1.3. those set via setJars to the SparkContext
yes , in spark standalone mode witht the master URL. Jar are copying to execeutor and the application is running fine but its failing at some point when kafka is trying to load the classes using some reflection mechanisims for loading the Encoder and Partitioner classes. Here are my finding so far on this issue. But in the driver app if any module is trying to load class using the class loader (using some reflection ) its not able to find the class. This use to work in the 1.2.0 not sure why its not working with 1.3.0 Is there any way can we make the driver to use the spark executor classloader for loading the classes or some thing like that? On Tue, Jun 23, 2015 at 12:28 PM, Tathagata Das t...@databricks.com wrote: So you have Kafka in your classpath in you Java application, where you are creating the sparkContext with the spark standalone master URL, right? The recommended way of submitting spark applications to any cluster is using spark-submit. See https://spark.apache.org/docs/latest/submitting-applications.html. This takes care of sending all the libraries to the cluster workers so that they can be found. Please try that. On Mon, Jun 22, 2015 at 11:50 PM, Murthy Chelankuri kmurt...@gmail.com wrote: I am invoking it from the java application by creating the sparkcontext On Tue, Jun 23, 2015 at 12:17 PM, Tathagata Das t...@databricks.com wrote: How are you adding that to the classpath? Through spark-submit or otherwise? On Mon, Jun 22, 2015 at 5:02 PM, Murthy Chelankuri kmurt...@gmail.com wrote: Yes I have the producer in the class path. And I am using in standalone mode. Sent from my iPhone On 23-Jun-2015, at 3:31 am, Tathagata Das t...@databricks.com wrote: Do you have Kafka producer in your classpath? If so how are adding that library? Are you running on YARN, or Mesos or Standalone or local. These details will be very useful. On Mon, Jun 22, 2015 at 8:34 AM, Murthy Chelankuri kmurt...@gmail.com wrote: I am using spark streaming. what i am trying to do is sending few messages to some kafka topic. where its failing. java.lang.ClassNotFoundException: com.abc.mq.msg.ObjectEncoder at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:264) at kafka.utils.Utils$.createObject(Utils.scala:438) at kafka.producer.Producer.init(Producer.scala:61) On Mon, Jun 22, 2015 at 8:24 PM, Murthy Chelankuri kmurt...@gmail.com wrote: I have been using the spark from the last 6 months with the version 1.2.0. I am trying to migrate to the 1.3.0 but the same problem i have written is not wokring. Its giving class not found error when i try to load some dependent jars from the main program. This use to work in 1.2.0 when set all the dependent jars array to the spark context but not working in 1.3.0 Please help me how to resolve this. Thanks, Murthy Chelankuri
Re: Spark job fails silently
Looks like a hostname conflict to me. 15/06/22 17:04:45 WARN Utils: Your hostname, datasci01.dev.abc.com resolves to a loopback address: 127.0.0.1; using 10.0.3.197 instead (on interface eth0) 15/06/22 17:04:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address Can you paste your /etc/hosts here? Thanks Best Regards On Tue, Jun 23, 2015 at 2:40 AM, roy rp...@njit.edu wrote: Hi, Our spark job on yarn suddenly started failing silently without showing any error following is the trace. Using properties file: /usr/lib/spark/conf/spark-defaults.conf Adding default property: spark.serializer=org.apache.spark.serializer.KryoSerializer Adding default property: spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///etc/spark/log4j.properties Adding default property: spark.eventLog.enabled=true Adding default property: spark.shuffle.service.enabled=true Adding default property: spark.driver.extraLibraryPath=/usr/lib/hadoop/lib/native Adding default property: spark.yarn.historyServer.address=http://ds-hnn002.dev.abc.com:18088 Adding default property: spark.yarn.am.extraLibraryPath=/usr/lib/hadoop/lib/native Adding default property: spark.ui.showConsoleProgress=true Adding default property: spark.shuffle.service.port=7337 Adding default property: spark.master=yarn-client Adding default property: spark.executor.extraLibraryPath=/usr/lib/hadoop/lib/native Adding default property: spark.eventLog.dir=hdfs://magnetic-hadoop-dev/user/spark/applicationHistory Adding default property: spark.yarn.jar=local:/usr/lib/spark/assembly/lib/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar Parsed arguments: master yarn deployMode null executorMemory 3G executorCores null totalExecutorCores null propertiesFile /usr/lib/spark/conf/spark-defaults.conf driverMemory4G driverCores null driverExtraClassPathnull driverExtraLibraryPath /usr/lib/hadoop/lib/native driverExtraJavaOptions null supervise false queue null numExecutors30 files null pyFiles null archivesnull mainClass null primaryResource file:/home/jonathanarfa/code/updb/spark/updb2vw_testing.py nameupdb2vw_testing.py childArgs [--date 2015-05-20] jarsnull packagesnull repositoriesnull verbose true Spark properties used, including those specified through --conf and those from the properties file /usr/lib/spark/conf/spark-defaults.conf: spark.executor.extraLibraryPath - /usr/lib/hadoop/lib/native spark.yarn.jar - local:/usr/lib/spark/assembly/lib/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar spark.driver.extraLibraryPath - /usr/lib/hadoop/lib/native spark.yarn.historyServer.address - http://ds-hnn002.dev.abc.com:18088 spark.yarn.am.extraLibraryPath - /usr/lib/hadoop/lib/native spark.eventLog.enabled - true spark.ui.showConsoleProgress - true spark.serializer - org.apache.spark.serializer.KryoSerializer spark.executor.extraJavaOptions - -Dlog4j.configuration=file:///etc/spark/log4j.properties spark.shuffle.service.enabled - true spark.shuffle.service.port - 7337 spark.eventLog.dir - hdfs://magnetic-hadoop-dev/user/spark/applicationHistory spark.master - yarn-client Main class: org.apache.spark.deploy.PythonRunner Arguments: file:/home/jonathanarfa/code/updb/spark/updb2vw_testing.py null --date 2015-05-20 System properties: spark.executor.extraLibraryPath - /usr/lib/hadoop/lib/native spark.driver.memory - 4G spark.executor.memory - 3G spark.yarn.jar - local:/usr/lib/spark/assembly/lib/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar spark.driver.extraLibraryPath - /usr/lib/hadoop/lib/native spark.executor.instances - 30 
spark.yarn.historyServer.address - http://ds-hnn002.dev.abc.com:18088 spark.yarn.am.extraLibraryPath - /usr/lib/hadoop/lib/native spark.ui.showConsoleProgress - true spark.eventLog.enabled - true spark.yarn.dist.files - file:/home/jonathanarfa/code/updb/spark/updb2vw_testing.py SPARK_SUBMIT - true spark.serializer - org.apache.spark.serializer.KryoSerializer spark.executor.extraJavaOptions - -Dlog4j.configuration=file:///etc/spark/log4j.properties spark.shuffle.service.enabled - true spark.app.name - updb2vw_testing.py spark.shuffle.service.port - 7337 spark.eventLog.dir - hdfs://magnetic-hadoop-dev/user/spark/applicationHistory spark.master - yarn-client Classpath elements: spark.akka.frameSize=60 spark.app.name=updb2vw_2015-05-20 spark.driver.extraLibraryPath=/usr/lib/hadoop/lib/native spark.driver.maxResultSize=2G spark.driver.memory=4G spark.eventLog.dir=hdfs://magnetic-hadoop-dev/user/spark/applicationHistory
Re: Any way to retrieve time of message arrival to Kafka topic, in Spark Streaming?
Maybe while producing the messages you can make each one a KeyedMessage with the timestamp as the key; on the consumer end you can then easily read the key (which will be the timestamp) off the message. If the network is fast enough, I think there would only be a small millisecond-level lag. Thanks Best Regards On Tue, Jun 23, 2015 at 10:22 AM, dgoldenberg dgoldenberg...@gmail.com wrote: Is there any way to retrieve the time of each message's arrival into a Kafka topic, when streaming in Spark, whether with receiver-based or direct streaming? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Any-way-to-retrieve-time-of-message-arrival-to-Kafka-topic-in-Spark-Streaming-tp23442.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
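For reference, a rough sketch of the KeyedMessage idea above (the broker address, topic name, group id and the ssc variable are illustrative assumptions, not taken from the thread; Kafka 0.8's Scala producer API is assumed):

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

// Producer side: put the send time in the key so it travels with the message.
val props = new Properties()
props.put("metadata.broker.list", "localhost:9092")
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("key.serializer.class", "kafka.serializer.StringEncoder")
val producer = new Producer[String, String](new ProducerConfig(props))
producer.send(new KeyedMessage[String, String]("test", System.currentTimeMillis.toString, "some event payload"))

// Consumer side: the key comes back as the first element of each (key, value) pair.
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("zookeeper.connect" -> "localhost:2181", "group.id" -> "arrival-time-demo")
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Map("test" -> 1), StorageLevel.MEMORY_ONLY)
val withSendTime = stream.map { case (sendTimeMs, value) => (sendTimeMs.toLong, value) }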
How to figure out how many records received by individual receiver
Hi, I am using Spark 1.3.1 and have 2 receivers. On the web UI, I can only see the total records received across both receivers, but I can't figure out the records received by each individual receiver. Not sure whether that information is shown on the UI in Spark 1.4. bit1...@163.com
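One workaround not mentioned in the thread, sketched below, is to count each receiver-backed input DStream separately before combining them, since each createStream call corresponds to exactly one receiver (rpcStream and httpStream stand in for whatever the two input streams are called):

// Per-batch record count for each receiver, printed on the driver.
rpcStream.count().foreachRDD(rdd => println("RPC receiver records this batch: " + rdd.collect().mkString))
httpStream.count().foreachRDD(rdd => println("HTTP receiver records this batch: " + rdd.collect().mkString))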
Re: jars are not loading from 1.3. those set via setJars to the SparkContext
This could be because of some subtle change in the classloaders used by executors. I think there has been issues in the past with libraries that use Class.forName to find classes by reflection. Because the executors load classes dynamically using custom class loaders, libraries that use Class.forName does not use the right class loader that has the custom class loader with dynamically loaded classes. One workaround is the add the relevant library in the spark conf spark.executor.extraClasspath and see if it works. Make sure that the library should be already present in the worker machines at the given path. This should start the executor with the library already present in the initial classpath and therefore present in the system classloader. Then probably Class.forName would find it. TD On Tue, Jun 23, 2015 at 12:14 AM, Murthy Chelankuri kmurt...@gmail.com wrote: yes , in spark standalone mode witht the master URL. Jar are copying to execeutor and the application is running fine but its failing at some point when kafka is trying to load the classes using some reflection mechanisims for loading the Encoder and Partitioner classes. Here are my finding so far on this issue. But in the driver app if any module is trying to load class using the class loader (using some reflection ) its not able to find the class. This use to work in the 1.2.0 not sure why its not working with 1.3.0 Is there any way can we make the driver to use the spark executor classloader for loading the classes or some thing like that? On Tue, Jun 23, 2015 at 12:28 PM, Tathagata Das t...@databricks.com wrote: So you have Kafka in your classpath in you Java application, where you are creating the sparkContext with the spark standalone master URL, right? The recommended way of submitting spark applications to any cluster is using spark-submit. See https://spark.apache.org/docs/latest/submitting-applications.html. This takes care of sending all the libraries to the cluster workers so that they can be found. Please try that. On Mon, Jun 22, 2015 at 11:50 PM, Murthy Chelankuri kmurt...@gmail.com wrote: I am invoking it from the java application by creating the sparkcontext On Tue, Jun 23, 2015 at 12:17 PM, Tathagata Das t...@databricks.com wrote: How are you adding that to the classpath? Through spark-submit or otherwise? On Mon, Jun 22, 2015 at 5:02 PM, Murthy Chelankuri kmurt...@gmail.com wrote: Yes I have the producer in the class path. And I am using in standalone mode. Sent from my iPhone On 23-Jun-2015, at 3:31 am, Tathagata Das t...@databricks.com wrote: Do you have Kafka producer in your classpath? If so how are adding that library? Are you running on YARN, or Mesos or Standalone or local. These details will be very useful. On Mon, Jun 22, 2015 at 8:34 AM, Murthy Chelankuri kmurt...@gmail.com wrote: I am using spark streaming. what i am trying to do is sending few messages to some kafka topic. where its failing. 
java.lang.ClassNotFoundException: com.abc.mq.msg.ObjectEncoder at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:264) at kafka.utils.Utils$.createObject(Utils.scala:438) at kafka.producer.Producer.<init>(Producer.scala:61) On Mon, Jun 22, 2015 at 8:24 PM, Murthy Chelankuri kmurt...@gmail.com wrote: I have been using Spark for the last 6 months with version 1.2.0. I am trying to migrate to 1.3.0, but the same program I have written is not working. It gives a class-not-found error when I try to load some dependent jars from the main program. This used to work in 1.2.0 when I set all the dependent jars as an array on the Spark context, but it is not working in 1.3.0. Please help me resolve this. Thanks, Murthy Chelankuri
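For reference, the property TD mentions is spelled spark.executor.extraClassPath, and the jar has to already sit at the same path on every worker machine. A minimal sketch of that workaround, with a made-up jar path and master URL:

import org.apache.spark.SparkConf

// Sketch: put the jar containing com.abc.mq.msg.ObjectEncoder on the executors'
// initial classpath so Kafka's Class.forName call can resolve it.
val conf = new SparkConf()
  .setMaster("spark://master:7077")
  .setAppName("kafka-producer-app")
  .set("spark.executor.extraClassPath", "/opt/libs/my-kafka-encoders.jar")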
RE: MLLIB - Storing the Trained Model
Hi Samsudhin, If possible, can you please provide a part of the code? Or perhaps try with the ut in RandomForestSuite to see if the issue repros. Regards, yuhao -Original Message- From: samsudhin [mailto:samsud...@pigstick.com] Sent: Tuesday, June 23, 2015 2:14 PM To: user@spark.apache.org Subject: MLLIB - Storing the Trained Model HI All, I was trying to store a trained model to the local hard disk. i am able to save it using save() function. while i am trying to retrieve the stored model using load() function i am end up with following error. kindly help me on this. scala val sameModel = scala RandomForestModel.load(sc,/home/ec2-user/myModel) 15/06/23 02:04:25 INFO MemoryStore: ensureFreeSpace(255260) called with curMem=592097, maxMem=278302556 15/06/23 02:04:25 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 249.3 KB, free 264.6 MB) 15/06/23 02:04:25 INFO MemoryStore: ensureFreeSpace(36168) called with curMem=847357, maxMem=278302556 15/06/23 02:04:25 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 35.3 KB, free 264.6 MB) 15/06/23 02:04:25 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on localhost:42290 (size: 35.3 KB, free: 265.3 MB) 15/06/23 02:04:25 INFO BlockManagerMaster: Updated info of block broadcast_6_piece0 15/06/23 02:04:25 INFO SparkContext: Created broadcast 6 from textFile at modelSaveLoad.scala:125 15/06/23 02:04:25 INFO FileInputFormat: Total input paths to process : 1 15/06/23 02:04:25 INFO SparkContext: Starting job: first at modelSaveLoad.scala:125 15/06/23 02:04:25 INFO DAGScheduler: Got job 3 (first at modelSaveLoad.scala:125) with 1 output partitions (allowLocal=true) 15/06/23 02:04:25 INFO DAGScheduler: Final stage: Stage 3(first at modelSaveLoad.scala:125) 15/06/23 02:04:25 INFO DAGScheduler: Parents of final stage: List() 15/06/23 02:04:25 INFO DAGScheduler: Missing parents: List() 15/06/23 02:04:25 INFO DAGScheduler: Submitting Stage 3 (/home/ec2-user/myModel/metadata MapPartitionsRDD[7] at textFile at modelSaveLoad.scala:125), which has no missing parents 15/06/23 02:04:25 INFO MemoryStore: ensureFreeSpace(2680) called with curMem=883525, maxMem=278302556 15/06/23 02:04:25 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 2.6 KB, free 264.6 MB) 15/06/23 02:04:25 INFO MemoryStore: ensureFreeSpace(1965) called with curMem=886205, maxMem=278302556 15/06/23 02:04:25 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 1965.0 B, free 264.6 MB) 15/06/23 02:04:25 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on localhost:42290 (size: 1965.0 B, free: 265.3 MB) 15/06/23 02:04:25 INFO BlockManagerMaster: Updated info of block broadcast_7_piece0 15/06/23 02:04:25 INFO SparkContext: Created broadcast 7 from broadcast at DAGScheduler.scala:839 15/06/23 02:04:25 INFO DAGScheduler: Submitting 1 missing tasks from Stage 3 (/home/ec2-user/myModel/metadata MapPartitionsRDD[7] at textFile at modelSaveLoad.scala:125) 15/06/23 02:04:25 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks 15/06/23 02:04:25 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, localhost, PROCESS_LOCAL, 1311 bytes) 15/06/23 02:04:25 INFO Executor: Running task 0.0 in stage 3.0 (TID 3) 15/06/23 02:04:25 INFO HadoopRDD: Input split: file:/home/ec2-user/myModel/metadata/part-0:0+97 15/06/23 02:04:25 INFO Executor: Finished task 0.0 in stage 3.0 (TID 3). 
1989 bytes result sent to driver 15/06/23 02:04:25 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) in 10 ms on localhost (1/1) 15/06/23 02:04:25 INFO DAGScheduler: Stage 3 (first at modelSaveLoad.scala:125) finished in 0.010 s 15/06/23 02:04:25 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 15/06/23 02:04:25 INFO DAGScheduler: Job 3 finished: first at modelSaveLoad.scala:125, took 0.016193 s 15/06/23 02:04:25 WARN FSInputChecker: Problem opening checksum file: file:/home/ec2-user/myModel/data/_temporary/0/_temporary/attempt_201506230149_0027_r_01_0/part-r-2.parquet. Ignoring exception: java.io.EOFException at java.io.DataInputStream.readFully(DataInputStream.java:197) at java.io.DataInputStream.readFully(DataInputStream.java:169) at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.init(ChecksumFileSystem.java:149) at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:341) at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766) at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:402) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:298) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:297) at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658) at
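The _temporary directory in the stack trace above suggests the earlier save() never finished cleanly, so it may be worth re-saving and checking that only data/ and metadata/ remain under the model directory before calling load(). For comparison, the expected round trip looks roughly like this sketch (trainingData is an assumed RDD[LabeledPoint]; all parameter values are illustrative):

import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel

// Train, persist, then reload from the same path.
val model = RandomForest.trainClassifier(trainingData, 2, Map[Int, Int](), 10, "auto", "gini", 5, 32)
model.save(sc, "/home/ec2-user/myModel")
val sameModel = RandomForestModel.load(sc, "/home/ec2-user/myModel")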
Re: What does [Stage 0: (0 + 2) / 2] mean on the console
Well, you could say that (Stage information) is an ASCII representation of the WebUI (running on port 4040). Since you set local[4] you will have 4 threads for your computation, and since you have 2 receivers, you are left with 2 threads to process the data ((0 + 2) -- this 2 is your 2 threads). And the /2 means there are 2 tasks in that stage (with id 0). Thanks Best Regards On Tue, Jun 23, 2015 at 1:21 PM, bit1...@163.com bit1...@163.com wrote: Hi, I have a Spark Streaming application that runs locally with two receivers; a code snippet is as follows: conf.setMaster("local[4]") //RPC Log Streaming val rpcStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, consumerParams, topicRPC, StorageLevel.MEMORY_ONLY) RPCLogStreamProcessor.process(rpcStream, taskConfBroadcast) //HTTP Log Streaming val httpStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, consumerParams, topicHTTP, StorageLevel.MEMORY_ONLY) HttpLogStreamProcessor.process(httpStream, taskConfBroadcast) There is a log line showing on the console in red: [Stage 0: (0 + 2) / 2] It appears, then disappears, then appears and disappears again... For the above code, if I only have the RPC stream and comment out the httpStream, it disappears. I don't know why it occurs or how to suppress it -- bit1...@163.com
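On the "how to suppress it" part: the red line is the console progress bar, controlled by the spark.ui.showConsoleProgress property (the same one that appears as "true" in the spark-defaults output earlier in this digest). A sketch of turning it off before the context is created:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("streaming-app")
  .set("spark.ui.showConsoleProgress", "false")  // hides the [Stage 0: (0 + 2) / 2] bar
val ssc = new StreamingContext(conf, Seconds(5))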
Re: Programming with java on spark
Did you happened to try this? JavaPairRDDInteger, String hadoopFile = sc.hadoopFile( /sigmoid, DataInputFormat.class, LongWritable.class, Text.class) Thanks Best Regards On Tue, Jun 23, 2015 at 6:58 AM, 付雅丹 yadanfu1...@gmail.com wrote: Hello, everyone! I'm new in spark. I have already written programs in Hadoop2.5.2, where I defined my own InputFormat and OutputFormat. Now I want to move my codes to spark using java language. The first problem I encountered is how to transform big txt file in local storage to RDD, which is compatible to my program written in hadoop. I found that there are functions in SparkContext which maybe helpful. But I don't know how to use them. E.G. public K,V,F extends org.apache.hadoop.mapreduce.InputFormatK,V RDD http://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/rdd/RDD.htmlscala.Tuple2K,V newAPIHadoopFile(String path, ClassF fClass, ClassK kClass, ClassV vClass, org.apache.hadoop.conf.Configuration conf) Get an RDD for a given Hadoop file with an arbitrary new API InputFormat and extra configuration options to pass to the input format. '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each record, directly caching the returned RDD or directly passing it to an aggregation or shuffle operation will create many references to the same object. If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first copy them using a map function. In java, the following is wrong. /option one Configuration confHadoop = new Configuration(); JavaPairRDDLongWritable,Text distFile=sc.newAPIHadoopFile( hdfs://cMaster:9000/wcinput/data.txt, DataInputFormat,LongWritable,Text,confHadoop); /option two Configuration confHadoop = new Configuration(); DataInputFormat input=new DataInputFormat(); LongWritable longType=new LongWritable(); Text text=new Text(); JavaPairRDDLongWritable,Text distFile=sc.newAPIHadoopFile( hdfs://cMaster:9000/wcinput/data.txt, input,longType,text,confHadoop); Can anyone help me? Thank you so much.
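The compile problems in both options above come from passing type names where Class objects are expected; in Java the call takes DataInputFormat.class, LongWritable.class and Text.class. A sketch of the same call in Scala (sc is the SparkContext, and DataInputFormat is assumed to be the poster's own subclass of the new org.apache.hadoop.mapreduce.InputFormat):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}

val confHadoop = new Configuration()
val distFile = sc.newAPIHadoopFile(
  "hdfs://cMaster:9000/wcinput/data.txt",
  classOf[DataInputFormat],   // custom InputFormat carried over from the Hadoop job
  classOf[LongWritable],
  classOf[Text],
  confHadoop)
// Copy the re-used Writables before caching or aggregating, as the scaladoc warns.
val lines = distFile.map { case (_, text) => text.toString }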
What does [Stage 0: (0 + 2) / 2] mean on the console
Hi, I have a Spark Streaming application that runs locally with two receivers; a code snippet is as follows: conf.setMaster("local[4]") //RPC Log Streaming val rpcStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, consumerParams, topicRPC, StorageLevel.MEMORY_ONLY) RPCLogStreamProcessor.process(rpcStream, taskConfBroadcast) //HTTP Log Streaming val httpStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, consumerParams, topicHTTP, StorageLevel.MEMORY_ONLY) HttpLogStreamProcessor.process(httpStream, taskConfBroadcast) There is a log line showing on the console in red: [Stage 0: (0 + 2) / 2] It appears, then disappears, then appears and disappears again... For the above code, if I only have the RPC stream and comment out the httpStream, it disappears. I don't know why it occurs or how to suppress it bit1...@163.com
Re: Re: What does [Stage 0: (0 + 2) / 2] mean on the console
Hi, Akhil, Thank you for the explanation! bit1...@163.com From: Akhil Das Date: 2015-06-23 16:29 To: bit1...@163.com CC: user Subject: Re: What does [Stage 0: (0 + 2) / 2] mean on the console Well, you could say that (Stage information) is an ASCII representation of the WebUI (running on port 4040). Since you set local[4] you will have 4 threads for your computation, and since you have 2 receivers, you are left with 2 threads to process the data ((0 + 2) -- this 2 is your 2 threads). And the /2 means there are 2 tasks in that stage (with id 0). Thanks Best Regards On Tue, Jun 23, 2015 at 1:21 PM, bit1...@163.com bit1...@163.com wrote: Hi, I have a Spark Streaming application that runs locally with two receivers; a code snippet is as follows: conf.setMaster("local[4]") //RPC Log Streaming val rpcStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, consumerParams, topicRPC, StorageLevel.MEMORY_ONLY) RPCLogStreamProcessor.process(rpcStream, taskConfBroadcast) //HTTP Log Streaming val httpStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, consumerParams, topicHTTP, StorageLevel.MEMORY_ONLY) HttpLogStreamProcessor.process(httpStream, taskConfBroadcast) There is a log line showing on the console in red: [Stage 0: (0 + 2) / 2] It appears, then disappears, then appears and disappears again... For the above code, if I only have the RPC stream and comment out the httpStream, it disappears. I don't know why it occurs or how to suppress it bit1...@163.com
RE: Code review - Spark SQL command-line client for Cassandra
Awesome, thanks Pawan – for now I’ll give spark-notebook a go until Zeppelin catches up to Spark 1.4 (and when Zeppelin has a binary release – my PC doesn’t seem too happy about building a Node.js app from source). Thanks for the detailed instructions!! *From:* pawan kumar [mailto:pkv...@gmail.com] *Sent:* 22 June 2015 18:53 *To:* Matthew Johnson *Cc:* Silvio Fiorito; Mohammed Guller; shahid ashraf; user *Subject:* Re: Code review - Spark SQL command-line client for Cassandra Hi Matthew, you could add the dependencies yourself by using the %dep command in zeppelin ( https://zeppelin.incubator.apache.org/docs/interpreter/spark.html). I have not tried with zeppelin but have used spark-notebook https://github.com/andypetrella/spark-notebook and got Cassandra connector working. Below have provided samples. *In Zeppelin: (Not Tested)* *%*dep z.load(com.datastax.com:spark-cassandra-connector_2.11:1.4.0-M1) Note: In order for Spark and Cassandra to work the Spark , Spark-Cassandra-Connector, Spark-notebook spark version should match. In the above case it was 1.2.0 *If using spark-notebook: (Tested works)* Installed : 1. Apache Spark 1.2.0 2. Cassandra DSE - 1 node (just Cassandra and no analytics) 3. Notebook: wget https://s3.eu-central-1.amazonaws.com/spark-notebook/tgz/spark-notebook-0.4.3-scala-2.10.4-spark-1.2.0-hadoop-2.4.0.tgz Once notebook have been started : http://ec2-xx-x-xx-xxx.us-west-x.compute.amazonaws.com:9000/#clusters Select Standalone: In SparkConf : update the spark master ip to EC2 : internal DNS name. *In Spark Notebook:* :dp com.datastax.spark % spark-cassandra-connector_2.10 % 1.2.0-rc3 import com.datastax.spark.connector._ import com.datastax.spark.connector.rdd.CassandraRDD val cassandraHost:String = localhost reset(lastChanges = _.set(spark.cassandra.connection.host, cassandraHost)) val rdd = sparkContext.cassandraTable(excelsior,test) rdd.toArray.foreach(println) Note: In order for Spark and Cassandra to work the Spark , Spark-Cassandra-Connector, Spark-notebook spark version should match. In the above case it was 1.2.0 On Mon, Jun 22, 2015 at 9:52 AM, Matthew Johnson matt.john...@algomi.com wrote: Hi Pawan, Looking at the changes for that git pull request, it looks like it just pulls in the dependency (and transitives) for “spark-cassandra-connector”. Since I am having to build Zeppelin myself anyway, would it be ok to just add this myself for the connector for 1.4.0 (as found here http://search.maven.org/#artifactdetails%7Ccom.datastax.spark%7Cspark-cassandra-connector_2.11%7C1.4.0-M1%7Cjar)? What exactly is it that does not currently exist for Spark 1.4? Thanks, Matthew *From:* pawan kumar [mailto:pkv...@gmail.com] *Sent:* 22 June 2015 17:19 *To:* Silvio Fiorito *Cc:* Mohammed Guller; Matthew Johnson; shahid ashraf; user@spark.apache.org *Subject:* Re: Code review - Spark SQL command-line client for Cassandra Hi, Zeppelin has a cassandra-spark-connector built into the build. I have not tried it yet may be you could let us know. https://github.com/apache/incubator-zeppelin/pull/79 To build a Zeppelin version with the *Datastax Spark/Cassandra connector https://github.com/datastax/spark-cassandra-connector* mvn clean package *-Pcassandra-spark-1.x* -Dhadoop.version=xxx -Phadoop-x.x -DskipTests Right now the Spark/Cassandra connector is available for *Spark 1.1* and *Spark 1.2*. Support for *Spark 1.3* is not released yet (*but you can build you own Spark/Cassandra connector version **1.3.0-SNAPSHOT*). 
Support for *Spark 1.4* does not exist yet Please do not forget to add -Dspark.cassandra.connection.host=xxx to the *ZEPPELIN_JAVA_OPTS*parameter in *conf/zeppelin-env.sh* file. Alternatively you can add this parameter in the parameter list of the *Spark interpreter* on the GUI -Pawan On Mon, Jun 22, 2015 at 9:04 AM, Silvio Fiorito silvio.fior...@granturing.com wrote: Yes, just put the Cassandra connector on the Spark classpath and set the connector config properties in the interpreter settings. *From: *Mohammed Guller *Date: *Monday, June 22, 2015 at 11:56 AM *To: *Matthew Johnson, shahid ashraf *Cc: *user@spark.apache.org *Subject: *RE: Code review - Spark SQL command-line client for Cassandra I haven’t tried using Zeppelin with Spark on Cassandra, so can’t say for sure, but it should not be difficult. Mohammed *From:* Matthew Johnson [mailto:matt.john...@algomi.com matt.john...@algomi.com] *Sent:* Monday, June 22, 2015 2:15 AM *To:* Mohammed Guller; shahid ashraf *Cc:* user@spark.apache.org *Subject:* RE: Code review - Spark SQL command-line client for Cassandra Thanks Mohammed, it’s good to know I’m not alone! How easy is it to integrate Zeppelin with Spark on Cassandra? It looks like it would only support Hadoop out of the box. Is it just a case of dropping the Cassandra Connector onto the Spark classpath? Cheers, Matthew *From:*
Spark Streaming: limit number of nodes
I have set up a small standalone cluster: 5 nodes, and every node has 5GB of memory and 8 cores. As you can see, the nodes don't have much RAM. I have 2 streaming apps; the first one is configured to use 3GB of memory per node and the second one uses 2GB per node. My problem is that the smaller app could easily run on 2 or 3 nodes instead of 5, so I could launch a third app. Is it possible to limit the number of nodes (executors) that an app will get from the standalone cluster?
Re: Spark Streaming: limit number of nodes
Use *spark.cores.max* to limit the CPU cores per job; then you can easily accommodate your third job as well. Thanks Best Regards On Tue, Jun 23, 2015 at 5:07 PM, Wojciech Pituła w.pit...@gmail.com wrote: I have set up a small standalone cluster: 5 nodes, and every node has 5GB of memory and 8 cores. As you can see, the nodes don't have much RAM. I have 2 streaming apps; the first one is configured to use 3GB of memory per node and the second one uses 2GB per node. My problem is that the smaller app could easily run on 2 or 3 nodes instead of 5, so I could launch a third app. Is it possible to limit the number of nodes (executors) that an app will get from the standalone cluster?
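A sketch of what that cap looks like for the smaller app (numbers are illustrative): with 8 cores per node, spark.cores.max = 16 confines the app to roughly two nodes' worth of cores while the per-node memory setting stays at 2GB.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("spark://master:7077")
  .setAppName("small-streaming-app")
  .set("spark.cores.max", "16")        // total cores across the whole cluster for this app
  .set("spark.executor.memory", "2g")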
Calculating tuple count /input rate with time
I am calculating the input rate using the following logic, and I think this foreachRDD is always running on the driver (the printlns are seen on the driver). 1- Is there any other way to do this at lower cost? 2- Will this give me the correct count for the rate? //code - inputStream.foreachRDD(new Function<JavaRDD<String>, Void>() { @Override public Void call(JavaRDD<String> stringJavaRDD) throws Exception { System.out.println(System.currentTimeMillis() + ",spoutstringJavaRDD," + stringJavaRDD.count()); return null; } }); -- Thanks Regards, Anshu Shukla
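On question 2: the count itself runs as a distributed job on the executors; only the surrounding println executes on the driver, so the number should be correct. A sketch (in Scala, assuming inputStream is a DStream[String] and a known batch interval) of turning the per-batch count into a rate:

val batchSeconds = 1L  // assumed batch interval in seconds
inputStream.foreachRDD { (rdd, time) =>
  val count = rdd.count()
  println(time.milliseconds + ",records=" + count + ",rate=" + (count / batchSeconds) + "/s")
}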
Re: Multiple executors writing file using java filewriter
Thanks alot , Because i just want to log timestamp and unique message id and not full RDD . On Tue, Jun 23, 2015 at 12:41 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Why don't you do a normal .saveAsTextFiles? Thanks Best Regards On Mon, Jun 22, 2015 at 11:55 PM, anshu shukla anshushuk...@gmail.com wrote: Thanx for reply !! YES , Either it should write on any machine of cluster or Can you please help me ... that how to do this . Previously i was using writing using collect () , so some of my tuples are missing while writing. //previous logic that was just creating the file on master - newinputStream.foreachRDD(new Function2JavaRDDString, Time, Void() { @Override public Void call(JavaRDDString v1, Time v2) throws Exception { for(String s:v1.collect()) { //System.out.println(v1 here is + v1 + --- + s); spoutlog.batchLogwriter(System.currentTimeMillis(), spout-MSGID, + msgeditor.getMessageId(s)); //System.out.println(msgeditor.getMessageId(s)); } return null; } }); On Mon, Jun 22, 2015 at 11:31 PM, Richard Marscher rmarsc...@localytics.com wrote: Is spoutLog just a non-spark file writer? If you run that in the map call on a cluster its going to be writing in the filesystem of the executor its being run on. I'm not sure if that's what you intended. On Mon, Jun 22, 2015 at 1:35 PM, anshu shukla anshushuk...@gmail.com wrote: Running perfectly in local system but not writing to file in cluster mode .ANY suggestions please .. //msgid is long counter JavaDStreamString newinputStream=inputStream.map(new FunctionString, String() { @Override public String call(String v1) throws Exception { String s1=msgId+@+v1; System.out.println(s1); msgId++; try { *//filewriter logic spoutlog.batchLogwriter(System.currentTimeMillis(), spout-MSGID, + msgeditor.getMessageId(s1));* } catch (Exception e) { System.out.println(exeception is here); e.printStackTrace(); throw e; } System.out.println(msgid,+msgId); return msgeditor.addMessageId(v1,msgId); } }); -- Thanks Regards, Anshu Shukla On Mon, Jun 22, 2015 at 10:50 PM, anshu shukla anshushuk...@gmail.com wrote: Can not we write some data to a txt file in parallel with multiple executors running in parallel ?? -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla
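If only the timestamp and message id need to be persisted, one option (a sketch, not from the thread) is to map each record down to that pair and let Spark write it in parallel from the executors, instead of collecting to the driver or opening a FileWriter inside map. Here messages stands in for the poster's newinputStream as a DStream[String], msgeditor.getMessageId is the poster's own helper, and the output prefix is made up:

val idLog = messages.map(s => System.currentTimeMillis() + "," + msgeditor.getMessageId(s))
idLog.saveAsTextFiles("hdfs:///logs/spout-msgid")  // one directory of part files per batch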
Re: Velox Model Server
Yes, and typically needs are <100ms. Now imagine even 10 concurrent requests. My experience has been that this approach won't nearly scale. The best you could probably do is async mini-batch near-real-time scoring, pushing results to some store for retrieval, which could be entirely suitable for your use case. On Tue, Jun 23, 2015 at 8:52 AM, Nick Pentreath nick.pentre...@gmail.com wrote: If your recommendation needs are real-time (<1s) I am not sure job server and computing the refs with spark will do the trick (though those new BLAS-based methods may have given sufficient speed up). - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Help optimising Spark SQL query
Thanks for the suggestions everyone, appreciate the advice. I tried replacing DISTINCT for the nested GROUP BY, running on 1.4 instead of 1.3, replacing the date casts with a between operation on the corresponding long constants instead and changing COUNT(*) to COUNT(1). None of these seem to have made any remarkable difference in running time for the query. I'll hook up YourKit and see if we can figure out where the CPU time is going, then post back. On 22 June 2015 at 16:01, Yin Huai yh...@databricks.com wrote: Hi James, Maybe it's the DISTINCT causing the issue. I rewrote the query as follows. Maybe this one can finish faster. select sum(cnt) as uses, count(id) as users from ( select count(*) cnt, cast(id as string) as id, from usage_events where from_unixtime(cast(timestamp_millis/1000 as bigint)) between '2015-06-09' and '2015-06-16' group by cast(id as string) ) tmp Thanks, Yin On Mon, Jun 22, 2015 at 12:55 PM, Jörn Franke jornfra...@gmail.com wrote: Generally (not only spark sql specific) you should not cast in the where part of a sql query. It is also not necessary in your case. Getting rid of casts in the whole query will be also beneficial. Le lun. 22 juin 2015 à 17:29, James Aley james.a...@swiftkey.com a écrit : Hello, A colleague of mine ran the following Spark SQL query: select count(*) as uses, count (distinct cast(id as string)) as users from usage_events where from_unixtime(cast(timestamp_millis/1000 as bigint)) between '2015-06-09' and '2015-06-16' The table contains billions of rows, but totals only 64GB of data across ~30 separate files, which are stored as Parquet with LZO compression in S3. From the referenced columns: * id is Binary, which we cast to a String so that we can DISTINCT by it. (I was already told this will improve in a later release, in a separate thread.) * timestamp_millis is a long, containing a unix timestamp with millisecond resolution This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2 instances, using 20 executors, each with 4GB memory. I can see from monitoring tools that the CPU usage is at 100% on all nodes, but incoming network seems a bit low at 2.5MB/s, suggesting to me that this is CPU-bound. Does that seem slow? Can anyone offer any ideas by glancing at the query as to why this might be slow? We'll profile it meanwhile and post back if we find anything ourselves. A side issue - I've found that this query, and others, sometimes completes but doesn't return any results. There appears to be no error that I can see in the logs, and Spark reports the job as successful, but the connected JDBC client (SQLWorkbenchJ in this case), just sits there forever waiting. I did a quick Google and couldn't find anyone else having similar issues. Many thanks, James.
Re: s3 - Can't make directory for path
On 23 Jun 2015, at 00:09, Danny kont...@dannylinden.de wrote: hi, have you tested s3://ww-sandbox/name_of_path/ instead of s3://ww-sandbox/name_of_path + make sure the bucket is there already. Hadoop s3 clients don't currently handle that step Or have you tested adding your file extension with a placeholder (*), like: s3://ww-sandbox/name_of_path/*.gz or s3://ww-sandbox/name_of_path/*.csv depending on your files. If it does not work, please test with the new s3a protocol of Spark/Hadoop: https://issues.apache.org/jira/browse/HADOOP-10400 ...but don't expect performance or scalability in Hadoop 2.6; Hadoop 2.7 has the fixes needed for production use, as does CDH5.4 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Web UI vs History Server Bugs
Probably your application has crashed or was terminated without invoking the stop method of spark context - in such cases it doesn't create the empty flag file which apparently tells the history server that it can safely show the log data - simpy go to some of the other dirs of the history server to see what the name of the flag file was and then create it manually in the dirs of the missing apps - then they will appear in the history server ui From: Steve Loughran [mailto:ste...@hortonworks.com] Sent: Monday, June 22, 2015 7:22 PM To: Jonathon Cai Cc: user@spark.apache.org Subject: Re: Web UI vs History Server Bugs well, I'm afraid you've reached the limits of my knowledge ... hopefully someone else can answer On 22 Jun 2015, at 16:37, Jonathon Cai jonathon@yale.edu wrote: No, what I'm seeing is that while the cluster is running, I can't see the app info after the app is completed. That is to say, when I click on the application name on master:8080, no info is shown. However, when I examine the same file on the History Server, the application information opens fine. On Sat, Jun 20, 2015 at 6:47 AM, Steve Loughran ste...@hortonworks.com wrote: On 17 Jun 2015, at 19:10, jcai jonathon@yale.edu wrote: Hi, I am running this on Spark stand-alone mode. I find that when I examine the web UI, a couple bugs arise: 1. There is a discrepancy between the number denoting the duration of the application when I run the history server and the number given by the web UI (default address is master:8080). I checked more specific details, including task and stage durations (when clicking on the application), and these appear to be the same for both avenues. 2. Sometimes the web UI on master:8080 is unable to display more specific information for an application that has finished (when clicking on the application), even when there is a log file in the appropriate directory. But when the history server is opened, it is able to read this file and output information. There's a JIRA open on the history server caching incomplete work...if you click on the link to a job while it's in progress, you don't get any updates later. does this sound like what you are seeing?
Re: flume sinks supported by spark streaming
https://spark.apache.org/docs/latest/streaming-flume-integration.html Yep, avro sink is the correct one. -- Ruslan Dautkhanov On Tue, Jun 23, 2015 at 9:46 PM, Hafiz Mujadid hafizmujadi...@gmail.com wrote: Hi! I want to integrate flume with spark streaming. I want to know which sink type of flume are supported by spark streaming? I saw one example using avroSink. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/flume-sinks-supported-by-spark-streaming-tp23462.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark standalone cluster - resource management
Check the available resources you have (cpu cores memory ) on master web ui. The log you see means the job can't get any resources. On Wed, Jun 24, 2015 at 5:03 AM, Nizan Grauer ni...@windward.eu wrote: I'm having 30G per machine This is the first (and only) job I'm trying to submit. So it's weird that for --total-executor-cores=20 it works, and for --total-executor-cores=4 it doesn't On Tue, Jun 23, 2015 at 10:46 PM, Igor Berman igor.ber...@gmail.com wrote: probably there are already running jobs there in addition, memory is also a resource, so if you are running 1 application that took all your memory and then you are trying to run another application that asks for the memory the cluster doesn't have then the second app wont be running so why are u specifying 22g as executor memory? how much memory you have for each machine? On 23 June 2015 at 09:33, nizang ni...@windward.eu wrote: to give a bit more data on what I'm trying to get - I have many tasks I want to run in parallel, so I want each task to catch small part of the cluster (- only limited part of my 20 cores in the cluster) I have important tasks that I want them to get 10 cores, and I have small tasks that I want to run with only 1 or 2 cores) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-cluster-resource-management-tp23444p23445.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: map V mapPartitions
One example is that you'd like to set up jdbc connection for each partition and share this connection across the records. mapPartitions is much more like the paradigm of mapper in mapreduce. In the mapper of mapreduce, you have setup method to do any initialization stuff before processing the split and read and process records one by one in the map method. On Wed, Jun 24, 2015 at 8:03 AM, Holden Karau hol...@pigscanfly.ca wrote: I think one of the primary cases where mapPartitions is useful if you are going to be doing any setup work that can be re-used between processing each element, this way the setup work only needs to be done once per partition (for example creating an instance of jodatime). Both map and mapPartitions are implemented using the MapPartitionsRDD. In general if your logic is easily expressed with map, and there isn't any setup work you are doing that could be shared, using map instead of map partitions tends to result in more readable code which is valuable in and off its self. On Tue, Jun 23, 2015 at 4:57 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I know when to use a map () but when should i use mapPartitions() ? Which is faster ? -- Deepak -- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau
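A sketch of the setup-per-partition pattern being described (the JDBC URL, credentials and query are made up, and rdd is assumed to be an RDD[Long] of ids):

import java.sql.DriverManager

val enriched = rdd.mapPartitions { ids =>
  // One connection per partition, reused for every record in it.
  val conn = DriverManager.getConnection("jdbc:postgresql://db:5432/app", "user", "pass")
  val stmt = conn.prepareStatement("SELECT name FROM users WHERE id = ?")
  val resolved = ids.map { id =>
    stmt.setLong(1, id)
    val rs = stmt.executeQuery()
    val name = if (rs.next()) rs.getString(1) else "unknown"
    rs.close()
    (id, name)
  }.toList  // materialize before closing the shared connection
  stmt.close()
  conn.close()
  resolved.iterator
}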
Re: When to use underlying data management layer versus standalone Spark?
I don't think this is quite the right question. Spark can be deployed on different cluster manager frameworks such as standalone, YARN and Mesos; Spark can't run without one of these cluster managers, so Spark does depend on a cluster manager framework. The data management layer, on the other hand, is upstream of Spark and independent of it, although Spark does provide APIs to access different data management layers. Which data store to use should depend on your upstream application; it's not really a Spark question. On Wed, Jun 24, 2015 at 3:46 AM, commtech michael.leon...@opco.com wrote: Hi, I work at a large financial institution in New York. We're looking into Spark and trying to learn more about the deployment/use cases for real-time analytics with Spark. When would it be better to deploy standalone Spark versus Spark on top of a more comprehensive data management layer (Hadoop, Cassandra, MongoDB, etc.)? If you do deploy on top of one of these, are there different use cases where one of these database management layers is better or worse? Any color would be very helpful. Thank you in advance. Sincerely, Michael -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/When-to-use-underlying-data-management-layer-versus-standalone-Spark-tp23455.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark launching without all of the requested YARN resources
Why do you want it to wait until all the resources are ready? Making it start as early as possible should let it complete earlier and increase the utilization of resources. On Tue, Jun 23, 2015 at 10:34 PM, Arun Luthra arun.lut...@gmail.com wrote: Sometimes if my Hortonworks yarn-enabled cluster is fairly busy, Spark (via spark-submit) will begin its processing even though it apparently did not get all of the requested resources; it is running very slowly. Is there a way to force Spark/YARN to only begin when it has the full set of resources that I request? Thanks, Arun
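If waiting really is the desired behaviour, there are scheduler properties for it; a sketch with illustrative values (check the docs of your Spark version for the exact names and defaults):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.scheduler.minRegisteredResourcesRatio", "0.9")         // wait until ~90% of requested executors have registered
  .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "60000") // but never longer than this (milliseconds in 1.x)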
Re: Yarn application ID for Spark job on Yarn
I don't think there is YARN-related stuff to access in Spark; Spark doesn't depend on YARN. BTW, why do you want the YARN application id? On Mon, Jun 22, 2015 at 11:45 PM, roy rp...@njit.edu wrote: Hi, Is there a way to get the YARN application ID inside a Spark application when running a Spark job on YARN? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Yarn-application-ID-for-Spark-job-on-Yarn-tp23429.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Kafka createDirectStream issue
Please run it in your own application and not in the spark shell. I see that you are trying to stop the Spark context and create a new StreamingContext. That will lead to unexpected issue, that you are seeing. Please make a standalone SBT/Maven app for Spark Streaming. On Tue, Jun 23, 2015 at 3:43 PM, syepes sye...@gmail.com wrote: yes, I have two clusters one standalone an another using Mesos Sebastian YEPES http://sebastian-yepes.com On Wed, Jun 24, 2015 at 12:37 AM, drarse [via Apache Spark User List] [hidden email] http:///user/SendEmail.jtp?type=nodenode=23458i=0 wrote: Hi syepes, Are u run the application in standalone mode? Regards El 23/06/2015 22:48, syepes [via Apache Spark User List] [hidden email] http:///user/SendEmail.jtp?type=nodenode=23457i=0 escribió: Hello, I am trying use the new Kafka consumer KafkaUtils.createDirectStream but I am having some issues making it work. I have tried different versions of Spark v1.4.0 and branch-1.4 #8d6e363 and I am still getting the same strange exception ClassNotFoundException: $line49.$read$$iwC$$i Has anyone else been facing this kind of problem? The following is the code and logs that I have been using to reproduce the issue: spark-shell: script -- sc.stop() import _root_.kafka.serializer.StringDecoder import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka.KafkaUtils val sparkConf = new SparkConf().setMaster(spark://localhost:7077).setAppName(KCon).set(spark.ui.port, 4041 ).set(spark.driver.allowMultipleContexts, true).setJars(Array(/opt/spark-libs/spark-streaming-kafka-assembly_2.10-1.4.2-SNAPSHOT.jar)) val ssc = new StreamingContext(sparkConf, Seconds(5)) val kafkaParams = Map[String, String](bootstrap.servers - localhost:9092, schema.registry.url - http://localhost:8081;, zookeeper.connect - localhost:2181, group.id - KCon ) val topic = Set(test) val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic) val raw = messages.map(_._2) val words = raw.flatMap(_.split( )) val wordCounts = words.map(x = (x, 1L)).reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() -- spark-shell: output -- sparkConf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@330e37b2 ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@28ec9c23 kafkaParams: scala.collection.immutable.Map[String,String] = Map(bootstrap.servers - localhost:9092, schema.registry.url - http://localhost:8081, zookeeper.connect - localhost:2181, group.id - OPC)topic: scala.collection.immutable.Set[String] = Set(test) WARN [main] kafka.utils.VerifiableProperties - Property schema.registry.url is not valid messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e71b70d raw: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@578ce232 words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@351cc4b5 wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Long)] = org.apache.spark.streaming.dstream.ShuffledDStream@ae04104 WARN [JobGenerator] kafka.utils.VerifiableProperties - Property schema.registry.url is not valid WARN [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0, 10.3.30.87): java.lang.ClassNotFoundException: 
$line49.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1 at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) .. .. Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
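A minimal shape of what TD is suggesting, as a sketch: a standalone app with its own main method, built with SBT/Maven against spark-streaming-kafka and launched via spark-submit, so the closures are real classes rather than the $iwC REPL wrappers seen in the stack trace above (broker address and topic are illustrative):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KCon {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KCon")  // master comes from spark-submit
    val ssc = new StreamingContext(conf, Seconds(5))
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("test"))
    messages.map(_._2).flatMap(_.split(" ")).map(w => (w, 1L)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}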
Re: SQL vs. DataFrame API
If yo change to ```val numbers2 = numbers```, then it have the same problem On Tue, Jun 23, 2015 at 2:54 PM, Ignacio Blasco elnopin...@gmail.com wrote: It seems that it doesn't happen in Scala API. Not exactly the same as in python, but pretty close. https://gist.github.com/elnopintan/675968d2e4be68958df8 2015-06-23 23:11 GMT+02:00 Davies Liu dav...@databricks.com: I think it also happens in DataFrames API of all languages. On Tue, Jun 23, 2015 at 9:16 AM, Ignacio Blasco elnopin...@gmail.com wrote: That issue happens only in python dsl? El 23/6/2015 5:05 p. m., Bob Corsaro rcors...@gmail.com escribió: Thanks! The solution: https://gist.github.com/dokipen/018a1deeab668efdf455 On Mon, Jun 22, 2015 at 4:33 PM Davies Liu dav...@databricks.com wrote: Right now, we can not figure out which column you referenced in `select`, if there are multiple row with the same name in the joined DataFrame (for example, two `value`). A workaround could be: numbers2 = numbers.select(df.name, df.value.alias('other')) rows = numbers.join(numbers2, (numbers.name==numbers2.name) (numbers.value != numbers2.other), how=inner) \ .select(numbers.name, numbers.value, numbers2.other) \ .collect() On Mon, Jun 22, 2015 at 12:53 PM, Ignacio Blasco elnopin...@gmail.com wrote: Sorry thought it was scala/spark El 22/6/2015 9:49 p. m., Bob Corsaro rcors...@gmail.com escribió: That's invalid syntax. I'm pretty sure pyspark is using a DSL to create a query here and not actually doing an equality operation. On Mon, Jun 22, 2015 at 3:43 PM Ignacio Blasco elnopin...@gmail.com wrote: Probably you should use === instead of == and !== instead of != Can anyone explain why the dataframe API doesn't work as I expect it to here? It seems like the column identifiers are getting confused. https://gist.github.com/dokipen/4b324a7365ae87b7b0e5 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: map V mapPartitions
I think one of the primary cases where mapPartitions is useful is if you are going to be doing any setup work that can be re-used while processing each element; this way the setup work only needs to be done once per partition (for example, creating an instance of jodatime). Both map and mapPartitions are implemented using MapPartitionsRDD. In general, if your logic is easily expressed with map and there isn't any setup work that could be shared, using map instead of mapPartitions tends to result in more readable code, which is valuable in and of itself. On Tue, Jun 23, 2015 at 4:57 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I know when to use map(), but when should I use mapPartitions()? Which is faster? -- Deepak -- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau
Re: Settings for K-Means Clustering in Mlib for large data set
A rough estimate of the worst case memory requirement for driver is about 2 * k * runs * numFeatures * numPartitions * 8 bytes. I put 2 at the beginning because the previous centers are still in memory while receiving new center updates. -Xiangrui On Fri, Jun 19, 2015 at 9:02 AM, Rogers Jeffrey rogers.john2...@gmail.com wrote: Thanks. Setting the driver memory property worked for K=1000 . But when I increased K to1500 I get the following error: 15/06/19 09:38:44 INFO ContextCleaner: Cleaned accumulator 7 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on 172.31.3.51:45157 in memory (size: 1568.0 B, free: 10.4 GB) 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on 172.31.9.50:59356 in memory (size: 1568.0 B, free: 73.6 GB) 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on 172.31.9.50:60934 in memory (size: 1568.0 B, free: 73.6 GB) 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on 172.31.15.51:37825 in memory (size: 1568.0 B, free: 73.6 GB) 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on 172.31.15.51:60610 in memory (size: 1568.0 B, free: 73.6 GB) 15/06/19 09:38:44 INFO ContextCleaner: Cleaned shuffle 5 Exception in thread Thread-2 java.lang.OutOfMemoryError: Requested array size exceeds VM limit at java.util.Arrays.copyOf(Arrays.java:2367) at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130) at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114) at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:587) at java.lang.StringBuilder.append(StringBuilder.java:214) at py4j.Protocol.getOutputCommand(Protocol.java:305) at py4j.commands.CallCommand.execute(CallCommand.java:82) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) Exception in thread Thread-300 java.lang.OutOfMemoryError: Requested array size exceeds VM limit at java.util.Arrays.copyOf(Arrays.java:2367) at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130) at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114) at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:587) at java.lang.StringBuilder.append(StringBuilder.java:214) at py4j.Protocol.getOutputCommand(Protocol.java:305) at py4j.commands.CallCommand.execute(CallCommand.java:82) at py4j.GatewayConnection.run(GatewayConnection.java:207) Is there any method/guideline through which I can understand the memory requirement before hand and make appropriate configurations? Regards, Rogers Jeffrey L On Thu, Jun 18, 2015 at 8:14 PM, Rogers Jeffrey rogers.john2...@gmail.com wrote: I am submitting the application from a python notebook. I am launching pyspark as follows: SPARK_PUBLIC_DNS=ec2-54-165-202-17.compute-1.amazonaws.com SPARK_WORKER_CORES=8 SPARK_WORKER_MEMORY=15g SPARK_MEM=30g OUR_JAVA_MEM=30g SPARK_DAEMON_JAVA_OPTS=-XX:MaxPermSize=30g -Xms30g -Xmx30g IPYTHON=1 PYSPARK_PYTHON=/usr/bin/python SPARK_PRINT_LAUNCH_COMMAND=1 ./spark/bin/pyspark --master spark://54.165.202.17.compute-1.amazonaws.com:7077 --deploy-mode client I guess I should be adding another extra argument --conf spark.driver.memory=15g . Is that correct? Regards, Rogers Jeffrey L On Thu, Jun 18, 2015 at 7:50 PM, Xiangrui Meng men...@gmail.com wrote: With 80,000 features and 1000 clusters, you need 80,000,000 doubles to store the cluster centers. That is ~600MB. 
If there are 10 partitions, you might need 6GB on the driver to collect updates from workers. I guess the driver died. Did you specify driver memory with spark-submit? -Xiangrui On Thu, Jun 18, 2015 at 12:22 PM, Rogers Jeffrey rogers.john2...@gmail.com wrote: Hi All, I am trying to run KMeans clustering on a large data set with 12,000 points and 80,000 dimensions. I have a spark cluster in Ec2 stand alone mode with 8 workers running on 2 slaves with 160 GB Ram and 40 VCPU. My Code is as Follows: def convert_into_sparse_vector(A): non_nan_indices=np.nonzero(~np.isnan(A) ) non_nan_values=A[non_nan_indices] dictionary=dict(zip(non_nan_indices[0],non_nan_values)) return Vectors.sparse (len(A),dictionary) X=[convert_into_sparse_vector(A) for A in complete_dataframe.values ] sc=SparkContext(appName=parallel_kmeans) data=sc.parallelize(X,10) model = KMeans.train(data, 1000, initializationMode=k-means||) where complete_dataframe is a pandas data frame that has my data. I get the error: Py4JNetworkError: An error occurred while trying to connect to the Java server. The error trace is as follows: Exception happened during processing of request from ('127.0.0.1', 41360) Traceback (most recent call last): File /usr/lib64/python2.6/SocketServer.py, line
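Plugging this thread's numbers into the estimate above, as a rough back-of-the-envelope (assuming the default runs = 1 and the 10 partitions from the parallelize call):

// 2 * k * runs * numFeatures * numPartitions * 8 bytes
val bytes = 2L * 1500 * 1 * 80000 * 10 * 8
println(bytes / 1e9)  // ~19.2 GB of center updates arriving at the driver

which is well above the ~15g the driver was given, roughly consistent with k=1000 working and k=1500 failing.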
Re: Spark FP-Growth algorithm for frequent sequential patterns
This is on the wish list for Spark 1.5. Assuming that the items from the same transaction are distinct. We can still follow FP-Growth's steps: 1. find frequent items 2. filter transactions and keep only frequent items 3. do NOT order by frequency 4. use suffix to partition the transactions (whether to use prefix or suffix doesn't really matter in this case) 5. grow FP-tree locally on each partition (the data structure should be the same) 6. generate frequent sub-sequences +Feynman Best, Xiangrui On Fri, Jun 19, 2015 at 10:51 AM, ping yan sharon...@gmail.com wrote: Hi, I have a use case where I'd like to mine frequent sequential patterns (consider the clickpath scenario). Transaction A - B doesn't equal Transaction B-A.. From what I understand about FP-growth in general and the MLlib implementation of it, the orders are not preserved. Anyone can provide some insights or ideas in extending the algorithm to solve frequent sequential pattern mining problems? Thanks as always. Ping - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Issue running Spark 1.4 on Yarn
Hi Kevin I never did. I checked for free space in the root partition, don't think this was an issue. Now that 1.4 is officially out I'll probably give it another shot. On Jun 22, 2015 4:28 PM, Kevin Markey kevin.mar...@oracle.com wrote: Matt: Did you ever resolve this issue? When running on a cluster or pseudocluster with too little space for /tmp or /var files, we've seen this sort of behavior. There's enough memory, and enough HDFS space, but there's insufficient space on one or more nodes for other temporary files as logs grow and don't get cleared or deleted. Depends on your configuration. Often restarting will temporarily fix things, but for shorter and shorter periods of time until nothing works. Fix is to expand space available for logs, pruning them, a cron job to prune them periodically, and/or modifying limits on logs. Kevin On 06/09/2015 04:15 PM, Matt Kapilevich wrote: I've tried running a Hadoop app pointing to the same queue. Same thing now, the job doesn't get accepted. I've cleared out the queue and killed all the pending jobs, the queue is still unusable. It seems like an issue with YARN, but it's specifically Spark that leaves the queue in this state. I've ran a Hadoop job in a for loop 10x, while specifying the queue explicitly, just to double-check. On Tue, Jun 9, 2015 at 4:45 PM, Matt Kapilevich matve...@gmail.com wrote: From the RM scheduler, I see 3 applications currently stuck in the root.thequeue queue. Used Resources: memory:0, vCores:0 Num Active Applications: 0 Num Pending Applications: 3 Min Resources: memory:0, vCores:0 Max Resources: memory:6655, vCores:4 Steady Fair Share: memory:1664, vCores:0 Instantaneous Fair Share: memory:6655, vCores:0 On Tue, Jun 9, 2015 at 4:30 PM, Matt Kapilevich matve...@gmail.com wrote: Yes! If I either specify a different queue or don't specify a queue at all, it works. On Tue, Jun 9, 2015 at 4:25 PM, Marcelo Vanzin van...@cloudera.com wrote: Does it work if you don't specify a queue? On Tue, Jun 9, 2015 at 1:21 PM, Matt Kapilevich matve...@gmail.com wrote: Hi Marcelo, Yes, restarting YARN fixes this behavior and it again works the first few times. The only thing that's consistent is that once Spark job submissions stop working, it's broken for good. On Tue, Jun 9, 2015 at 4:12 PM, Marcelo Vanzin van...@cloudera.com wrote: Apologies, I see you already posted everything from the RM logs that mention your stuck app. Have you tried restarting the YARN cluster to see if that changes anything? Does it go back to the first few tries work behaviour? I run 1.4 on top of CDH 5.4 pretty often and haven't seen anything like this. On Tue, Jun 9, 2015 at 1:01 PM, Marcelo Vanzin van...@cloudera.com wrote: On Tue, Jun 9, 2015 at 11:31 AM, Matt Kapilevich matve...@gmail.com wrote: Like I mentioned earlier, I'm able to execute Hadoop jobs fine even now - this problem is specific to Spark. That doesn't necessarily mean anything. Spark apps have different resource requirements than Hadoop apps. Check your RM logs for any line that mentions your Spark app id. That may give you some insight into what's happening or not. -- Marcelo -- Marcelo -- Marcelo
Re: Failed stages and dropped executors when running implicit matrix factorization/ALS
It shouldn't be hard to handle 1 billion ratings in 1.3. Just need more information to guess what happened: 1. Could you share the ALS settings, e.g., number of blocks, rank and number of iterations, as well as number of users/items in your dataset? 2. If you monitor the progress in the WebUI, how much data is stored in memory and how much data is shuffled per iteration? 3. Do you have enough disk space for the shuffle files? 4. Did you set checkpointDir in SparkContext and checkpointInterval in ALS? Best, Xiangrui On Fri, Jun 19, 2015 at 11:43 AM, Ravi Mody rmody...@gmail.com wrote: Hi, I'm running implicit matrix factorization/ALS in Spark 1.3.1 on fairly large datasets (1+ billion input records). As I grow my dataset I often run into issues with a lot of failed stages and dropped executors, ultimately leading to the whole application failing. The errors are like org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 19 and org.apache.spark.shuffle.FetchFailedException: Failed to connect to These occur during flatMap, mapPartitions, and aggregate stages. I know that increasing memory fixes this issue, but most of the time my executors are only using a tiny portion of the their allocated memory (10%). Often, the stages run fine until the last iteration or two of ALS, but this could just be a coincidence. I've tried tweaking a lot of settings, but it's time-consuming to do this through guess-and-check. Right now I have these set: spark.shuffle.memoryFraction = 0.3 spark.storage.memoryFraction = 0.65 spark.executor.heartbeatInterval = 60 I'm sure these settings aren't optimal - any idea of what could be causing my errors, and what direction I can push these settings in to get more out of my memory? I'm currently using 240 GB of memory (on 7 executors) for a 1 billion record dataset, which seems like too much. Thanks! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
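On point 4, a sketch of what wiring up checkpointing for implicit ALS can look like (the checkpoint directory and all parameter values are illustrative; setCheckpointInterval is assumed to be available in the MLlib version in use):

import org.apache.spark.mllib.recommendation.{ALS, Rating}

sc.setCheckpointDir("hdfs:///tmp/als-checkpoints")  // hypothetical HDFS path
val model = new ALS()
  .setImplicitPrefs(true)
  .setRank(50)
  .setIterations(15)
  .setLambda(0.01)
  .setAlpha(40.0)
  .setBlocks(200)
  .setCheckpointInterval(5)  // truncate the lineage/shuffle chain every 5 iterations
  .run(ratings)              // ratings is an assumed RDD[Rating]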
Re: How could output the StreamingLinearRegressionWithSGD prediction result?
Please check the input path to your test data, and call `.count()` to see whether there are records in it. -Xiangrui On Sat, Jun 20, 2015 at 9:23 PM, Gavin Yue yue.yuany...@gmail.com wrote: Hey, I am testing the StreamingLinearRegressionWithSGD following the tutorial. It works, but I could not output the prediction results. I tried saveAsTextFile, but it only output _SUCCESS to the folder. I am trying to check the prediction results and use BinaryClassificationMetrics to get areaUnderROC. Any example for this? Thank you! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
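A minimal sketch of wiring up the prediction output and the requested metric, assuming a model trained with trainOn and a test DStream parsed as in the tutorial; the output path is made up, and areaUnderROC is only meaningful if the labels are binary 0/1:

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.streaming.dstream.DStream

// model: already trained via model.trainOn(trainingData); testData: DStream[LabeledPoint]
def writePredictions(model: StreamingLinearRegressionWithSGD,
                     testData: DStream[LabeledPoint]): Unit = {
  val predictions = model.predictOnValues(testData.map(lp => (lp.label, lp.features)))
  predictions.foreachRDD { (rdd, time) =>
    if (rdd.count() > 0) { // an empty micro-batch writes only _SUCCESS, matching the symptom above
      rdd.saveAsTextFile(s"hdfs:///tmp/predictions-${time.milliseconds}") // illustrative path
      val scoreAndLabels = rdd.map { case (label, score) => (score, label) }
      println(s"areaUnderROC at $time = ${new BinaryClassificationMetrics(scoreAndLabels).areaUnderROC()}")
    }
  }
}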
Re: which mllib algorithm for large multi-class classification?
We have multinomial logistic regression implemented. For your case, the model size is 500 * 300,000 = 150,000,000. MLlib's implementation might not be able to handle it efficiently; we plan to have a more scalable implementation in 1.5. However, it shouldn't give you an "array larger than MaxInt" exception. Could you paste the stack trace? -Xiangrui On Mon, Jun 22, 2015 at 4:21 PM, Danny kont...@dannylinden.de wrote: Hi, I am unfortunately not very fit in the whole MLlib stuff, so I would appreciate a little help: which multi-class classification algorithm should I use if I want to train texts (100-1000 words each) into categories? The number of categories is between 100 and 500, and the number of training documents which I have to transform to tf-idf vectors is at most ~300,000. It looks like most algorithms run into OOM or "array larger than MaxInt" exceptions with a large number of classes/categories because there are collect steps in them? Thanks a lot -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/which-mllib-algorithm-for-large-multi-class-classification-tp23439.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
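A minimal sketch of the multinomial logistic regression referred to above, via the MLlib LBFGS-based API; the training RDD of tf-idf LabeledPoints is assumed to exist, and the class count simply uses the thread's upper bound:

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// training: RDD[LabeledPoint] of tf-idf vectors, with labels in 0.0 until numClasses
def fit(training: RDD[LabeledPoint]) =
  new LogisticRegressionWithLBFGS()
    .setNumClasses(500) // 100-500 categories in the thread; using the upper bound
    .run(training)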
Re: Difference between Lasso regression in MLlib package and ML package
Please see the current version of code for better documentation. https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala Sincerely, DB Tsai -- Blog: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D On Tue, Jun 23, 2015 at 3:58 PM, DB Tsai dbt...@dbtsai.com wrote: The regularization is handled after the objective function of data is computed. See https://github.com/apache/spark/blob/6a827d5d1ec520f129e42c3818fe7d0d870dcbef/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala line 348 for L2. For L1, it's handled by OWLQN, so you don't see it explicitly, but the code is in line 128. Sincerely, DB Tsai -- Blog: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D On Tue, Jun 23, 2015 at 3:14 PM, Wei Zhou zhweisop...@gmail.com wrote: Hi DB Tsai, Thanks for your reply. I went through the source code of LinearRegression.scala. The algorithm minimizes square error L = 1/2n ||A weights - y||^2^. I cannot match this with the elasticNet loss function found here http://web.stanford.edu/~hastie/glmnet/glmnet_alpha.html, which is the sum of square error plus L1 and L2 penalty. I am able to follow the rest of the mathematical deviation in the code comment. I am hoping if you could point me to any references that can fill this knowledge gap. Best, Wei 2015-06-19 12:35 GMT-07:00 DB Tsai dbt...@dbtsai.com: Hi Wei, I don't think ML is meant for single node computation, and the algorithms in ML are designed for pipeline framework. In short, the lasso regression in ML is new algorithm implemented from scratch, and it's faster, and converged to the same solution as R's glmnet but with scalability. Here is the talk I gave in Spark summit about the new elastic-net feature in ML. I will encourage you to try the one ML. http://www.slideshare.net/dbtsai/2015-06-largescale-lasso-and-elasticnet-regularized-generalized-linear-models-at-spark-summit Sincerely, DB Tsai -- Blog: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D On Fri, Jun 19, 2015 at 11:38 AM, Wei Zhou zhweisop...@gmail.com wrote: Hi Spark experts, I see lasso regression/ elastic net implementation under both MLLib and ML, does anyone know what is the difference between the two implementation? In spark summit, one of the keynote speakers mentioned that ML is meant for single node computation, could anyone elaborate this? Thanks. Wei - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Nested DataFrame(SchemaRDD)
I wrote a brief how-to on building nested records in Spark and storing them in Parquet here: http://www.congiu.com/creating-nested-data-parquet-in-spark-sql/ 2015-06-23 16:12 GMT-07:00 Richard Catlin richard.m.cat...@gmail.com: How do I create a DataFrame(SchemaRDD) with a nested array of Rows in a column? Is there an example? Will this be stored as a nested Parquet file? Thanks. Richard Catlin
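Complementing the linked how-to, a small sketch of the question itself: build a DataFrame whose column is an array of nested Rows (structs) and write it out, which does produce a nested Parquet file. The schema, row values and output path are invented for the example, and sc is an existing SparkContext:

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._

val sqlContext = new SQLContext(sc)

// Hypothetical schema: each order carries a nested array of item structs.
val schema = StructType(Seq(
  StructField("order_id", LongType),
  StructField("items", ArrayType(StructType(Seq(
    StructField("sku", StringType),
    StructField("qty", IntegerType)))))))

val rows = sc.parallelize(Seq(
  Row(1L, Seq(Row("a", 2), Row("b", 1))),
  Row(2L, Seq(Row("c", 5)))))

val df = sqlContext.createDataFrame(rows, schema)
df.printSchema()                       // items shows up as array<struct<sku:string,qty:int>>
df.write.parquet("/tmp/nested_orders") // written as a nested Parquet file (1.4 writer API)

On Spark 1.3 the equivalent of the last call is df.saveAsParquetFile("/tmp/nested_orders").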
Re: mutable vs. pure functional implementation - StatCounter
Creating millions of temporary (immutable) objects is bad for performance. It should be simple to do a micro-benchmark locally. -Xiangrui On Mon, Jun 22, 2015 at 7:25 PM, mzeltser mzelt...@gmail.com wrote: Using StatCounter as an example, I'd like to understand if a pure functional implementation would be more or less beneficial for accumulating structures used inside RDD.map. StatCounter.merge updates mutable class variables and returns a reference to the same object. This is clearly a non-functional implementation and it mutates the existing state of the instance. (Unless I'm missing something.) Would it be preferable to have all the class variables declared as val and create a new instance to hold the merged values? The StatCounter would be used inside RDD.map to collect stats on the fly. Would mutable state present a bottleneck? Can anybody comment on why a non-functional implementation has been chosen? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mutable-vs-pure-functional-implementation-StatCounter-tp23441.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
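A minimal sketch of the trade-off under discussion, using hypothetical stand-ins for StatCounter that track only count and sum (this is not Spark's actual StatCounter code):

// Mutable accumulator, StatCounter-style: one object updated in place for the whole fold.
class MutableStats(var count: Long = 0L, var sum: Double = 0.0) {
  def merge(x: Double): MutableStats = { count += 1; sum += x; this }
}

// Purely functional alternative: every merge allocates a fresh instance.
case class ImmutableStats(count: Long = 0L, sum: Double = 0.0) {
  def merge(x: Double): ImmutableStats = ImmutableStats(count + 1, sum + x)
}

val xs = (1 to 1000000).map(_.toDouble)
val m = xs.foldLeft(new MutableStats())((acc, x) => acc.merge(x))  // one object total
val i = xs.foldLeft(ImmutableStats())((acc, x) => acc.merge(x))    // ~one million short-lived objects

The reply above is pointing at the second fold: inside a per-record loop, the extra allocations and the resulting GC pressure are the cost of the purely functional version.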
Re: Difference between Lasso regression in MLlib package and ML package
Hi DB Tsai, Thanks for your reply. I went through the source code of LinearRegression.scala. The algorithm minimizes the squared error L = 1/(2n) ||A weights - y||^2. I cannot match this with the elastic-net loss function found here http://web.stanford.edu/~hastie/glmnet/glmnet_alpha.html, which is the sum of the squared error plus the L1 and L2 penalties. I am able to follow the rest of the mathematical derivation in the code comments. I am hoping you could point me to any references that can fill this knowledge gap. Best, Wei 2015-06-19 12:35 GMT-07:00 DB Tsai dbt...@dbtsai.com: Hi Wei, I don't think ML is meant for single node computation, and the algorithms in ML are designed for the pipeline framework. In short, the lasso regression in ML is a new algorithm implemented from scratch; it's faster and converges to the same solution as R's glmnet, but with scalability. Here is the talk I gave at Spark Summit about the new elastic-net feature in ML. I would encourage you to try the one in ML. http://www.slideshare.net/dbtsai/2015-06-largescale-lasso-and-elasticnet-regularized-generalized-linear-models-at-spark-summit Sincerely, DB Tsai -- Blog: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D On Fri, Jun 19, 2015 at 11:38 AM, Wei Zhou zhweisop...@gmail.com wrote: Hi Spark experts, I see lasso regression / elastic net implementations under both MLlib and ML; does anyone know what is the difference between the two implementations? At Spark Summit, one of the keynote speakers mentioned that ML is meant for single node computation; could anyone elaborate on this? Thanks. Wei
Re: Kafka createDirectStream issue
Yes, I have two clusters, one standalone and another using Mesos. Sebastian YEPES http://sebastian-yepes.com On Wed, Jun 24, 2015 at 12:37 AM, drarse [via Apache Spark User List] ml-node+s1001560n23457...@n3.nabble.com wrote: Hi syepes, Are you running the application in standalone mode? Regards On 23/06/2015 22:48, syepes [via Apache Spark User List] wrote: Hello, I am trying to use the new Kafka consumer KafkaUtils.createDirectStream but I am having some issues making it work. I have tried different versions of Spark v1.4.0 and branch-1.4 #8d6e363 and I am still getting the same strange exception ClassNotFoundException: $line49.$read$$iwC$$i Has anyone else been facing this kind of problem? The following is the code and logs that I have been using to reproduce the issue: spark-shell: script -- sc.stop() import _root_.kafka.serializer.StringDecoder import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka.KafkaUtils val sparkConf = new SparkConf().setMaster(spark://localhost:7077).setAppName(KCon).set(spark.ui.port, 4041 ).set(spark.driver.allowMultipleContexts, true).setJars(Array(/opt/spark-libs/spark-streaming-kafka-assembly_2.10-1.4.2-SNAPSHOT.jar)) val ssc = new StreamingContext(sparkConf, Seconds(5)) val kafkaParams = Map[String, String](bootstrap.servers - localhost:9092, schema.registry.url - http://localhost:8081;, zookeeper.connect - localhost:2181, group.id - KCon ) val topic = Set(test) val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic) val raw = messages.map(_._2) val words = raw.flatMap(_.split( )) val wordCounts = words.map(x = (x, 1L)).reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() -- spark-shell: output -- sparkConf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@330e37b2 ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@28ec9c23 kafkaParams: scala.collection.immutable.Map[String,String] = Map(bootstrap.servers - localhost:9092, schema.registry.url - http://localhost:8081, zookeeper.connect - localhost:2181, group.id - OPC)topic: scala.collection.immutable.Set[String] = Set(test) WARN [main] kafka.utils.VerifiableProperties - Property schema.registry.url is not valid messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e71b70d raw: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@578ce232 words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@351cc4b5 wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Long)] = org.apache.spark.streaming.dstream.ShuffledDStream@ae04104 WARN [JobGenerator] kafka.utils.VerifiableProperties - Property schema.registry.url is not valid WARN [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0, 10.3.30.87): java.lang.ClassNotFoundException: $line49.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1 at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) .. .. Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at
RE: Nested DataFrame(SchemaRDD)
How do I create a DataFrame(SchemaRDD) with a nested array of Rows in a column? Is there an example? Will this store as a nested parquet file? Thanks. Richard Catlin
Re: Difference between Lasso regression in MLlib package and ML package
Thanks DB Tsai, it is very helpful. Cheers, Wei 2015-06-23 16:00 GMT-07:00 DB Tsai dbt...@dbtsai.com: Please see the current version of code for better documentation. https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala Sincerely, DB Tsai -- Blog: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D On Tue, Jun 23, 2015 at 3:58 PM, DB Tsai dbt...@dbtsai.com wrote: The regularization is handled after the objective function of data is computed. See https://github.com/apache/spark/blob/6a827d5d1ec520f129e42c3818fe7d0d870dcbef/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala line 348 for L2. For L1, it's handled by OWLQN, so you don't see it explicitly, but the code is in line 128. Sincerely, DB Tsai -- Blog: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D On Tue, Jun 23, 2015 at 3:14 PM, Wei Zhou zhweisop...@gmail.com wrote: Hi DB Tsai, Thanks for your reply. I went through the source code of LinearRegression.scala. The algorithm minimizes square error L = 1/2n ||A weights - y||^2^. I cannot match this with the elasticNet loss function found here http://web.stanford.edu/~hastie/glmnet/glmnet_alpha.html, which is the sum of square error plus L1 and L2 penalty. I am able to follow the rest of the mathematical deviation in the code comment. I am hoping if you could point me to any references that can fill this knowledge gap. Best, Wei 2015-06-19 12:35 GMT-07:00 DB Tsai dbt...@dbtsai.com: Hi Wei, I don't think ML is meant for single node computation, and the algorithms in ML are designed for pipeline framework. In short, the lasso regression in ML is new algorithm implemented from scratch, and it's faster, and converged to the same solution as R's glmnet but with scalability. Here is the talk I gave in Spark summit about the new elastic-net feature in ML. I will encourage you to try the one ML. http://www.slideshare.net/dbtsai/2015-06-largescale-lasso-and-elasticnet-regularized-generalized-linear-models-at-spark-summit Sincerely, DB Tsai -- Blog: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D On Fri, Jun 19, 2015 at 11:38 AM, Wei Zhou zhweisop...@gmail.com wrote: Hi Spark experts, I see lasso regression/ elastic net implementation under both MLLib and ML, does anyone know what is the difference between the two implementation? In spark summit, one of the keynote speakers mentioned that ML is meant for single node computation, could anyone elaborate this? Thanks. Wei
[no subject]
I have a Spark job that has 7 stages. The first 3 stages complete and the fourth stage begins (it joins two RDDs). This stage has multiple task failures, all with the exception below. Multiple tasks (100s of them) get the same exception, with different hosts. How can all the hosts suddenly stop responding when a few moments ago 3 stages ran successfully? If I re-run, the three stages will again run successfully. I cannot think of it being a cluster issue. Any suggestions? Spark Version: 1.3.1 Exception: org.apache.spark.shuffle.FetchFailedException: Failed to connect to HOST at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125) at org.apache.sp -- Deepak
Re: Kafka createDirectStream issue
Hi syepes, Are you running the application in standalone mode? Regards On 23/06/2015 22:48, syepes [via Apache Spark User List] ml-node+s1001560n23456...@n3.nabble.com wrote: Hello, I am trying to use the new Kafka consumer KafkaUtils.createDirectStream but I am having some issues making it work. I have tried different versions of Spark v1.4.0 and branch-1.4 #8d6e363 and I am still getting the same strange exception ClassNotFoundException: $line49.$read$$iwC$$i Has anyone else been facing this kind of problem? The following is the code and logs that I have been using to reproduce the issue: spark-shell: script -- sc.stop() import _root_.kafka.serializer.StringDecoder import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka.KafkaUtils val sparkConf = new SparkConf().setMaster(spark://localhost:7077).setAppName(KCon).set(spark.ui.port, 4041 ).set(spark.driver.allowMultipleContexts, true).setJars(Array(/opt/spark-libs/spark-streaming-kafka-assembly_2.10-1.4.2-SNAPSHOT.jar)) val ssc = new StreamingContext(sparkConf, Seconds(5)) val kafkaParams = Map[String, String](bootstrap.servers - localhost:9092, schema.registry.url - http://localhost:8081;, zookeeper.connect - localhost:2181, group.id - KCon ) val topic = Set(test) val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic) val raw = messages.map(_._2) val words = raw.flatMap(_.split( )) val wordCounts = words.map(x = (x, 1L)).reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() -- spark-shell: output -- sparkConf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@330e37b2 ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@28ec9c23 kafkaParams: scala.collection.immutable.Map[String,String] = Map(bootstrap.servers - localhost:9092, schema.registry.url - http://localhost:8081, zookeeper.connect - localhost:2181, group.id - OPC)topic: scala.collection.immutable.Set[String] = Set(test) WARN [main] kafka.utils.VerifiableProperties - Property schema.registry.url is not valid messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e71b70d raw: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@578ce232 words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@351cc4b5 wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Long)] = org.apache.spark.streaming.dstream.ShuffledDStream@ae04104 WARN [JobGenerator] kafka.utils.VerifiableProperties - Property schema.registry.url is not valid WARN [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0, 10.3.30.87): java.lang.ClassNotFoundException: $line49.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1 at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) .. .. Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256) -- Best regards and thanks in advance for any help.
Re: Difference between Lasso regression in MLlib package and ML package
The regularization is handled after the objective function of the data is computed. See https://github.com/apache/spark/blob/6a827d5d1ec520f129e42c3818fe7d0d870dcbef/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala line 348 for L2. For L1, it's handled by OWLQN, so you don't see it explicitly, but the code is in line 128. Sincerely, DB Tsai -- Blog: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D On Tue, Jun 23, 2015 at 3:14 PM, Wei Zhou zhweisop...@gmail.com wrote: Hi DB Tsai, Thanks for your reply. I went through the source code of LinearRegression.scala. The algorithm minimizes the squared error L = 1/(2n) ||A weights - y||^2. I cannot match this with the elastic-net loss function found here http://web.stanford.edu/~hastie/glmnet/glmnet_alpha.html, which is the sum of the squared error plus the L1 and L2 penalties. I am able to follow the rest of the mathematical derivation in the code comments. I am hoping you could point me to any references that can fill this knowledge gap. Best, Wei 2015-06-19 12:35 GMT-07:00 DB Tsai dbt...@dbtsai.com: Hi Wei, I don't think ML is meant for single node computation, and the algorithms in ML are designed for the pipeline framework. In short, the lasso regression in ML is a new algorithm implemented from scratch; it's faster and converges to the same solution as R's glmnet, but with scalability. Here is the talk I gave at Spark Summit about the new elastic-net feature in ML. I would encourage you to try the one in ML. http://www.slideshare.net/dbtsai/2015-06-largescale-lasso-and-elasticnet-regularized-generalized-linear-models-at-spark-summit Sincerely, DB Tsai -- Blog: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D On Fri, Jun 19, 2015 at 11:38 AM, Wei Zhou zhweisop...@gmail.com wrote: Hi Spark experts, I see lasso regression / elastic net implementations under both MLlib and ML; does anyone know what is the difference between the two implementations? At Spark Summit, one of the keynote speakers mentioned that ML is meant for single node computation; could anyone elaborate on this? Thanks. Wei - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
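Putting the two pieces together for Wei's question: the data term quoted from the code comments and the penalties from the glmnet page combine into the elastic-net objective below, where lambda is regParam and alpha is elasticNetParam; the L2 part is the term added after the data objective and the L1 part is the piece delegated to OWLQN. A sketch in standard notation, ignoring the feature-standardization details handled in the Spark code:

\min_{w} \; \frac{1}{2n} \lVert A w - y \rVert_2^2
  \;+\; \lambda \left( \alpha \lVert w \rVert_1 + \frac{1 - \alpha}{2} \lVert w \rVert_2^2 \right)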
map V mapPartitions
I know when to use map(), but when should I use mapPartitions()? Which is faster? -- Deepak
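In short, map applies the function once per element, while mapPartitions applies it once per partition to an iterator over that partition's elements; neither is inherently faster, but mapPartitions typically wins when there is per-record setup cost that can be amortized across a partition. A small sketch, with the date-parsing workload made up for illustration:

import java.text.SimpleDateFormat
import org.apache.spark.rdd.RDD

// map: the function runs once for every element.
def lineLengths(lines: RDD[String]): RDD[Int] =
  lines.map(_.length)

// mapPartitions: the function runs once per partition, so the costly
// SimpleDateFormat is constructed once per partition rather than per record.
def toEpochMillis(dates: RDD[String]): RDD[Long] =
  dates.mapPartitions { iter =>
    val fmt = new SimpleDateFormat("yyyy-MM-dd") // illustrative format
    iter.map(s => fmt.parse(s).getTime)
  }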