Re: spark streaming 1.3 doubts(force it to not consume anything)
Is there a reason not to just use Scala? It's not a lot of code... and it'll be even less code in Scala ;)
On Wed, Aug 19, 2015 at 4:14 AM, Shushant Arora shushantaror...@gmail.com wrote: To correct myself: KafkaRDD[K, V, U, T, R] is a subclass of RDD[R], but Option<KafkaRDD<K, V, U, T, R>> is not a subclass of Option<RDD<R>>. In Scala, C[T'] is a subclass of C[T] when C is covariant and T' is a subclass of T, as per https://twitter.github.io/scala_school/type-basics.html, but this is not allowed in Java. So is there any workaround to achieve this in Java for overriding DirectKafkaInputDStream?
On Wed, Aug 19, 2015 at 12:45 AM, Shushant Arora shushantaror...@gmail.com wrote: But KafkaRDD[K, V, U, T, R] is not a subclass of RDD[R]: Java does not support inheritance between generic types, so a derived class cannot return a differently parameterized generic subclass from an overridden method.
On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger c...@koeninger.org wrote: Option is covariant and KafkaRDD is a subclass of RDD.
On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora shushantaror...@gmail.com wrote: Is it that in Scala a derived class is allowed to have any return type? And since the streaming jar is written in Scala, is DirectKafkaInputDStream allowed to return Option[KafkaRDD[K, V, U, T, R]] from its compute method?
On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora shushantaror...@gmail.com wrote: Looking at the source code of org.apache.spark.streaming.kafka.DirectKafkaInputDStream:
    override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
      val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
      val rdd = KafkaRDD[K, V, U, T, R](
        context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
      currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
      Some(rdd)
    }
But in DStream it is def compute(validTime: Time): Option[RDD[T]]. So what should the return type be for a custom DStream that extends DirectKafkaInputDStream?
I want the behaviour to be the same as DirectKafkaInputDStream in normal scenarios and to return None in a specific scenario. And why did the same error not occur when DirectKafkaInputDStream extended InputDStream? Since the new return type Option[KafkaRDD[K, V, U, T, R]] is not a subclass of Option[RDD[T]], it should have failed.
On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger c...@koeninger.org wrote: The superclass method in DStream is defined as returning an Option[RDD[T]].
On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora shushantaror...@gmail.com wrote: I am getting a compilation error while overriding the compute method of DirectKafkaInputDStream.
    [ERROR] CustomDirectKafkaInputDstream.java:[51,83] compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream cannot override compute(org.apache.spark.streaming.Time) in org.apache.spark.streaming.dstream.DStream; attempting to use incompatible return type
    [ERROR] found   : scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
    [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>
The class:
    public class CustomDirectKafkaInputDstream extends DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder, kafka.serializer.DefaultDecoder, byte[][]> {
        @Override
        public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> compute(Time validTime) {
            int processed = processedCounter.value();
            int failed = failedProcessingsCounter.value();
            if ((processed == failed)) {
                System.out.println("backing off since its 100 % failure");
                return Option.empty();
            } else {
                System.out.println("starting the stream ");
                return super.compute(validTime);
            }
        }
    }
What should the return type of the compute method be? The superclass returns Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>>, but the compiler expects scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from the derived class. Is there something wrong with the code?
On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org wrote: Look at the definitions of the Java-specific KafkaUtils.createDirectStream methods (the ones that take a JavaStreamingContext).
On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora shushantaror...@gmail.com wrote: How do I create a ClassTag in Java? Also, the constructor of DirectKafkaInputDStream takes a Function1, not a Function, but KafkaUtils.createDirectStream allows a Function. Below is my overridden DirectKafkaInputDStream.
    public class CustomDirectKafkaInputDstream extends DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder, kafka.serializer.DefaultDecoder, byte[][]> {
        public CustomDirectKafkaInputDstream(
            StreamingContext ssc_,
            Map<String, String> kafkaParams,
            Map<TopicAndPartition, Object> fromOffsets,
            Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
            ClassTag<byte[]> evidence$1,
            ClassTag<byte[]> evidence$2,
Re: What's the best practice for developing new features for spark ?
I personally build with SBT and run Spark on YARN with IntelliJ. You need to connect to the remote JVMs with a remote debugger. You need to do the same if you use Python, because it launches a JVM on the driver as well.
On Wed, Aug 19, 2015 at 2:10 PM canan chen ccn...@gmail.com wrote: Thanks Ted. I noticed another thread about running Spark programmatically (client mode for standalone and YARN). Would it be much easier to debug Spark if that were possible? Hasn't anyone thought about it?
On Wed, Aug 19, 2015 at 5:50 PM, Ted Yu yuzhih...@gmail.com wrote: See this thread: http://search-hadoop.com/m/q3RTtdZv0d1btRHl/Spark+build+modulesubj=Building+Spark+Building+just+one+module+
On Aug 19, 2015, at 1:44 AM, canan chen ccn...@gmail.com wrote: I want to work on a JIRA, but it is not easy to unit test because it involves different components, especially the UI. Building Spark is pretty slow, and I don't want to rebuild it each time to test my code change. I am wondering how other people do this. Is there any experience you can share? Thanks
RE: Scala: How to match a java object????
Hi, thank you all for the assistance. It is odd: it works when creating a new java.math.BigDecimal object, but not if I work directly with Scala:
    5 match { case x: java.math.BigDecimal => 2 }
    <console>:23: error: scrutinee is incompatible with pattern type;
     found   : java.math.BigDecimal
     required: Int
    5 match { case x: java.math.BigDecimal => 2 }
I will try and see how it works for my Seq[Any]. Thanks for the workarounds. Saif
From: Sujit Pal [mailto:sujitatgt...@gmail.com] Sent: Tuesday, August 18, 2015 6:25 PM To: Ellafi, Saif A. Cc: wrbri...@gmail.com; user Subject: Re: Scala: How to match a java object
Hi Saif, Would this work?
    import scala.collection.JavaConversions._
    new java.math.BigDecimal(5) match { case x: java.math.BigDecimal => x.doubleValue }
It gives me this on the Scala console:
    res9: Double = 5.0
Assuming you had a stream of BigDecimals, you could just call map on it:
    myBigDecimals.map(_.doubleValue)
to get your Seq of Doubles. You will need the JavaConversions._ import to allow Java Doubles to be treated by Scala as Scala Doubles.
-sujit
On Tue, Aug 18, 2015 at 12:59 PM, saif.a.ell...@wellsfargo.com wrote: Hi, thank you for the further assistance. You can reproduce this by simply running
    5 match { case java.math.BigDecimal => 2 }
In my case, I am applying a map action to a Seq[Any], so the elements inside are of type Any, to which I need to apply a proper .asInstanceOf[WhoYouShouldBe]. Saif
From: William Briggs [mailto:wrbri...@gmail.com] Sent: Tuesday, August 18, 2015 4:46 PM To: Ellafi, Saif A.; user@spark.apache.org Subject: Re: Scala: How to match a java object
Could you share your pattern matching expression that is failing?
On Tue, Aug 18, 2015, 3:38 PM saif.a.ell...@wellsfargo.com wrote: Hi all, I am trying to run a Spark job in which I receive java.math.BigDecimal objects, instead of the Scala equivalents, and I am trying to convert them into Doubles. If I try to match-case this object class, I get: “error: object java.math.BigDecimal is not a value” How could I get around matching Java objects? I would like to avoid multiple try-catch blocks on ClassCastExceptions for all my checks. Thank you, Saif
RE: Scala: How to match a java object????
It is okay. This methodology works very well for mapping the objects of my Seq[Any]. It is indeed very cool :-) Saif
From: Ted Yu [mailto:yuzhih...@gmail.com] Sent: Wednesday, August 19, 2015 10:47 AM To: Ellafi, Saif A. Cc: sujitatgt...@gmail.com; wrbri...@gmail.com; user@spark.apache.org Subject: Re: Scala: How to match a java object
Saif: In your example below, the error occurred because there is no automatic conversion from Int to BigDecimal. Cheers
On Aug 19, 2015, at 6:40 AM, saif.a.ell...@wellsfargo.com wrote: Hi, thank you all for the assistance. It is odd: it works when creating a new java.math.BigDecimal object, but not if I work directly with Scala:
    5 match { case x: java.math.BigDecimal => 2 }
    <console>:23: error: scrutinee is incompatible with pattern type;
     found   : java.math.BigDecimal
     required: Int
    5 match { case x: java.math.BigDecimal => 2 }
I will try and see how it works for my Seq[Any]. Thanks for the workarounds. Saif
From: Sujit Pal [mailto:sujitatgt...@gmail.com] Sent: Tuesday, August 18, 2015 6:25 PM To: Ellafi, Saif A. Cc: wrbri...@gmail.com; user Subject: Re: Scala: How to match a java object
Hi Saif, Would this work?
    import scala.collection.JavaConversions._
    new java.math.BigDecimal(5) match { case x: java.math.BigDecimal => x.doubleValue }
It gives me this on the Scala console:
    res9: Double = 5.0
Assuming you had a stream of BigDecimals, you could just call map on it:
    myBigDecimals.map(_.doubleValue)
to get your Seq of Doubles. You will need the JavaConversions._ import to allow Java Doubles to be treated by Scala as Scala Doubles.
-sujit
On Tue, Aug 18, 2015 at 12:59 PM, saif.a.ell...@wellsfargo.com wrote: Hi, thank you for the further assistance. You can reproduce this by simply running
    5 match { case java.math.BigDecimal => 2 }
In my case, I am applying a map action to a Seq[Any], so the elements inside are of type Any, to which I need to apply a proper .asInstanceOf[WhoYouShouldBe]. Saif
From: William Briggs [mailto:wrbri...@gmail.com] Sent: Tuesday, August 18, 2015 4:46 PM To: Ellafi, Saif A.; user@spark.apache.org Subject: Re: Scala: How to match a java object
Could you share your pattern matching expression that is failing?
On Tue, Aug 18, 2015, 3:38 PM saif.a.ell...@wellsfargo.com wrote: Hi all, I am trying to run a Spark job in which I receive java.math.BigDecimal objects, instead of the Scala equivalents, and I am trying to convert them into Doubles. If I try to match-case this object class, I get: “error: object java.math.BigDecimal is not a value” How could I get around matching Java objects? I would like to avoid multiple try-catch blocks on ClassCastExceptions for all my checks. Thank you, Saif
Re: What is the reason for ExecutorLostFailure?
Hints are good. Thanks Corey. I will try to find out more from the logs.
On Aug 18, 2015, at 7:23 PM, Corey Nolet cjno...@gmail.com wrote: Usually more information about the cause will be found in your logs. I generally see this happen when an out-of-memory exception has occurred, for one reason or another, on an executor. It's possible that your memory settings per executor are too small, or that the number of concurrent tasks you are running is too large for some of the executors. Other times, RDD functions like groupBy() that collect an unbounded number of items into memory could be causing it. Either way, the executor logs should give you some insight. Have you looked at those yet?
On Tue, Aug 18, 2015 at 6:26 PM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io wrote: Hi All, Why am I getting ExecutorLostFailure, with executors completely lost for the rest of the processing? Eventually it makes the job fail. One thing is for sure: a lot of shuffling happens across executors in my program. Is there a way to understand and debug ExecutorLostFailure? Any pointers regarding “ExecutorLostFailure” would help me a lot. Thanks, Vijay
Re: What's the best practice for developing new features for spark ?
Thanks Ted. I noticed another thread about running Spark programmatically (client mode for standalone and YARN). Would it be much easier to debug Spark if that were possible? Hasn't anyone thought about it?
On Wed, Aug 19, 2015 at 5:50 PM, Ted Yu yuzhih...@gmail.com wrote: See this thread: http://search-hadoop.com/m/q3RTtdZv0d1btRHl/Spark+build+modulesubj=Building+Spark+Building+just+one+module+
On Aug 19, 2015, at 1:44 AM, canan chen ccn...@gmail.com wrote: I want to work on a JIRA, but it is not easy to unit test because it involves different components, especially the UI. Building Spark is pretty slow, and I don't want to rebuild it each time to test my code change. I am wondering how other people do this. Is there any experience you can share? Thanks
Re: Scala: How to match a java object????
Saif: In your example below, the error occurred because there is no automatic conversion from Int to BigDecimal. Cheers
On Aug 19, 2015, at 6:40 AM, saif.a.ell...@wellsfargo.com wrote: Hi, thank you all for the assistance. It is odd: it works when creating a new java.math.BigDecimal object, but not if I work directly with Scala:
    5 match { case x: java.math.BigDecimal => 2 }
    <console>:23: error: scrutinee is incompatible with pattern type;
     found   : java.math.BigDecimal
     required: Int
    5 match { case x: java.math.BigDecimal => 2 }
I will try and see how it works for my Seq[Any]. Thanks for the workarounds. Saif
From: Sujit Pal [mailto:sujitatgt...@gmail.com] Sent: Tuesday, August 18, 2015 6:25 PM To: Ellafi, Saif A. Cc: wrbri...@gmail.com; user Subject: Re: Scala: How to match a java object
Hi Saif, Would this work?
    import scala.collection.JavaConversions._
    new java.math.BigDecimal(5) match { case x: java.math.BigDecimal => x.doubleValue }
It gives me this on the Scala console:
    res9: Double = 5.0
Assuming you had a stream of BigDecimals, you could just call map on it:
    myBigDecimals.map(_.doubleValue)
to get your Seq of Doubles. You will need the JavaConversions._ import to allow Java Doubles to be treated by Scala as Scala Doubles.
-sujit
On Tue, Aug 18, 2015 at 12:59 PM, saif.a.ell...@wellsfargo.com wrote: Hi, thank you for the further assistance. You can reproduce this by simply running
    5 match { case java.math.BigDecimal => 2 }
In my case, I am applying a map action to a Seq[Any], so the elements inside are of type Any, to which I need to apply a proper .asInstanceOf[WhoYouShouldBe]. Saif
From: William Briggs [mailto:wrbri...@gmail.com] Sent: Tuesday, August 18, 2015 4:46 PM To: Ellafi, Saif A.; user@spark.apache.org Subject: Re: Scala: How to match a java object
Could you share your pattern matching expression that is failing?
On Tue, Aug 18, 2015, 3:38 PM saif.a.ell...@wellsfargo.com wrote: Hi all, I am trying to run a spark job, in which I receive java.math.BigDecimal objects, instead of the scala equivalents, and I am trying to convert them into Doubles. If I try to match-case this object class, I get: “error: object java.math.BigDecimal is not a value” How could I get around matching java objects? I would like to avoid a multiple try-catch on ClassCastExceptions for all my checks. Thank you, Saif
Re: Why use spark.history.fs.logDirectory instead of spark.eventLog.dir
Does anyone know about this? Or am I missing something here?
On Fri, Aug 7, 2015 at 4:20 PM, canan chen ccn...@gmail.com wrote: Is there any reason that the history server uses a separate property for the event log directory? Thanks
Re: Difference between Sort based and Hash based shuffle
Thanks Andrew for the detailed response. So the reason why key-value pairs with the same key are always found in a single bucket in hash-based shuffle but not in sort-based shuffle is that in sort-based shuffle each mapper writes a single partitioned file, and it is up to the reducer to fetch the correct partitions from the files?
On Wed, Aug 19, 2015 at 2:13 AM, Andrew Or and...@databricks.com wrote: Hi Muhammad, At a high level, in hash-based shuffle each mapper writes R shuffle files, one for each reducer, where R is the number of reduce partitions. With M mappers this results in M * R shuffle files. Since it is not uncommon for M and R to be O(1000), this quickly becomes expensive. An optimization for hash-based shuffle is consolidation, where all mappers that run on the same core write one file per reducer, resulting in C * R files for C cores. This is a strict improvement, but it is still relatively expensive. Instead, in sort-based shuffle each mapper writes a single partitioned file. This allows a particular reducer to request a specific portion of each mapper's single output file. In more detail, the mapper first fills up an internal buffer in memory and continually spills the contents of the buffer to disk, then finally merges all the spilled files together to form one final output file. This places much less stress on the file system and requires far fewer I/O operations, especially on the read side. -Andrew
2015-08-16 11:08 GMT-07:00 Muhammad Haseeb Javed 11besemja...@seecs.edu.pk: I did check it out, and although I got a general understanding of the various classes used to implement the sort and hash shuffles, these slides lack details on how they are implemented and why sort generally performs better than hash.
On Sun, Aug 16, 2015 at 4:31 AM, Ravi Kiran ravikiranmag...@gmail.com wrote: Have a look at this presentation: http://www.slideshare.net/colorant/spark-shuffle-introduction . It may be of help to you.
On Sat, Aug 15, 2015 at 1:42 PM, Muhammad Haseeb Javed 11besemja...@seecs.edu.pk wrote: What are the major differences between how sort-based and hash-based shuffle operate, and what causes sort shuffle to perform better than hash? Are there any talks that discuss both shuffles in detail, how they are implemented, and the performance gains?
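To make the file counts and the "single partitioned file" idea from this thread concrete, here is a toy sketch in plain Python with no Spark involved. The function names, the modulo partitioner, and the in-memory list standing in for a file are all assumptions made for illustration; Spark's actual implementation differs in detail and may write auxiliary files that are not counted here.

```python
def shuffle_file_counts(num_mappers, num_reducers, num_cores):
    """Count shuffle data files using the formulas quoted in the thread."""
    return {
        "hash": num_mappers * num_reducers,            # M * R
        "hash_consolidated": num_cores * num_reducers,  # C * R
        "sort": num_mappers,                            # one file per mapper
    }


def write_partitioned(records, num_reducers):
    """Mimic one mapper's single output: records grouped by reduce
    partition, plus offsets marking where each partition starts."""
    buckets = [[] for _ in range(num_reducers)]
    for key, value in records:
        # Toy partitioner (an assumption): integer key modulo reducer count.
        buckets[key % num_reducers].append((key, value))
    data, offsets = [], [0]
    for bucket in buckets:
        data.extend(sorted(bucket))
        offsets.append(len(data))
    return data, offsets


def read_partition(data, offsets, reducer_id):
    """A reducer fetches only its own slice of the mapper's output."""
    return data[offsets[reducer_id]:offsets[reducer_id + 1]]


counts = shuffle_file_counts(num_mappers=1000, num_reducers=1000, num_cores=16)
print(counts)

records = [(3, "c"), (0, "a"), (4, "d"), (1, "b"), (2, "e")]
data, offsets = write_partitioned(records, num_reducers=2)
print(read_partition(data, offsets, 1))
```

With M = R = 1000 the plain hash layout needs a million files, while the sort layout needs one per mapper. In both layouts every record with a given key ends up with the same reducer; the difference is only whether the mapper separates partitions into distinct files or into ranges of one file.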
Re: Strange shuffle behaviour difference between Zeppelin and Spark-shell
Any differences in number of cores or memory settings for executors?
On 19 August 2015 at 09:49, Rick Moritz rah...@gmail.com wrote: Dear list, I am observing a very strange difference in behaviour between a Spark 1.4.0-rc4 REPL (locally compiled with Java 7) and a Spark 1.4.0 Zeppelin interpreter (compiled with Java 6 and sourced from Maven Central). The workflow loads data from Hive, applies a number of transformations (including quite a lot of shuffle operations) and then presents an enriched dataset. The code (and resulting DAGs) are identical in each case. The following particularities are noted: Importing the HiveRDD and caching it yields identical results on both platforms. Applying case classes leads to a 2-2.5MB increase in dataset size per partition (excepting empty partitions). Writing shuffles shows this much more significant result:
Zeppelin:
    Total Time Across All Tasks: 2.6 min
    Input Size / Records: 2.4 GB / 7314771
    Shuffle Write: 673.5 MB / 7314771
vs. Spark-shell:
    Total Time Across All Tasks: 28 min
    Input Size / Records: 3.6 GB / 7314771
    Shuffle Write: 9.0 GB / 7314771
This is one of the early stages, which reads from a cached partition and then feeds into a join stage. The later stages show similar behaviour in producing excessive shuffle spills. Quite often the excessive shuffle volume will lead to massive shuffle spills which ultimately kill not only performance, but the actual executors as well. I have examined the Environment tab in the Spark UI and identified no notable difference besides FAIR (Zeppelin) vs FIFO (spark-shell) scheduling mode. I fail to see how this would impact shuffle writes in such a drastic way, since it should be on the inter-job level, while this happens at the inter-stage level. I was somewhat suspicious of compression or serialization playing a role, but the SparkConf points to those being set to the default. Also, Zeppelin's interpreter adds no relevant additional default parameters.
I performed a diff between rc4 (which was later released) and 1.4.0 and, as expected, there were no differences besides a single class (remarkably, a shuffle-relevant class: /org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.class ) differing in its binary representation due to being compiled with Java 7 instead of Java 6. The decompiled sources of those two are again identical. As a next step I may attempt to simply replace that file in the packaged jar, to ascertain that there is indeed no difference between the two versions, but I would consider this a major bug if a simple compiler change leads to this kind of issue. I am also open to any other ideas, in particular to verify that the same compression/serialization is indeed happening, and regarding ways to determine what exactly is written into these shuffles -- currently I only know that the tuples are bigger (or smaller) than they ought to be. The Zeppelin-obtained results do appear to be consistent at least, thus the suspicion is that there is an issue with the process launched from spark-shell. I will also attempt to build a Spark job and spark-submit it using different Spark binaries to further explore the issue. Best Regards, Rick Moritz
PS: I already tried to send this mail yesterday, but it never made it onto the list, as far as I can tell -- I apologize should anyone receive this as a second copy.
Re: Spark return key value pair
I am not 100% sure, but flatMap probably unwinds the tuples. Try with map instead.
2015-08-19 13:10 GMT+02:00 Jerry OELoo oylje...@gmail.com: Hi. I want to parse a file and return key-value pairs with PySpark, but the result is strange to me. test.sql is a big file and each line is a username and password with # between them. I use the mapper2 below to map the data, and in my understanding, i in words.take(10) should be a tuple, but the result is that i is a username or a password. This is hard for me to understand. Thanks for your help.
    def mapper2(line):
        words = line.split('#')
        return (words[0].strip(), words[1].strip())

    def main2(sc):
        lines = sc.textFile("hdfs://master:9000/spark/test.sql")
        words = lines.flatMap(mapper2)
        for i in words.take(10):
            msg = i + ":" + "\n"
-- Rejoice,I Desire!
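The difference between the two operations can be reproduced without a cluster. Below is a minimal sketch in plain Python, assuming only that map emits one output element per input and that flatMap iterates over each returned value and concatenates the pieces. local_map and local_flat_map are hypothetical stand-ins written for this illustration, not Spark APIs, and the sample lines are made up.

```python
from itertools import chain


def mapper2(line):
    """Split 'user#password' into a (user, password) tuple."""
    words = line.split('#')
    return (words[0].strip(), words[1].strip())


def local_map(func, items):
    """Stand-in for map: exactly one output element per input element."""
    return [func(x) for x in items]


def local_flat_map(func, items):
    """Stand-in for flatMap: each result is iterated and its elements
    are concatenated into one flat sequence."""
    return list(chain.from_iterable(func(x) for x in items))


lines = ["alice # secret1", "bob # secret2"]
pairs = local_map(mapper2, lines)
flat = local_flat_map(mapper2, lines)
print(pairs)
print(flat)
```

Because a tuple is itself iterable, the flattening variant yields the usernames and passwords as separate elements, which matches the behaviour described in the question. Switching to map keeps each (username, password) tuple intact.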
Re: How to automatically relaunch a Driver program after crashes?
When submitting to YARN, you can specify two different operation modes for the driver with the --master parameter: yarn-client or yarn-cluster. For more information on submitting to YARN, see this page in the Spark docs: http://spark.apache.org/docs/latest/running-on-yarn.html yarn-cluster mode will run the driver inside of the Application Master, which will be retried on failure. The number of retries is dependent on the yarn.resourcemanager.am.max-attempts configuration setting for the YARN ResourceManager. Regards, Will On Wed, Aug 19, 2015 at 2:55 AM, Spark Enthusiast sparkenthusi...@yahoo.in wrote: Folks, As I see, the Driver program is a single point of failure. Now, I have seen ways as to how to make it recover from failures on a restart (using Checkpointing) but I have not seen anything as to how to restart it automatically if it crashes. Will running the Driver as a Hadoop Yarn Application do it? Can someone educate me as to how?
Re: Strange shuffle behaviour difference between Zeppelin and Spark-shell
I would compare the Spark UI metrics for both cases and look for any differences (number of partitions, number of spills, etc.). Why can't you make the REPL consistent with the Zeppelin Spark version? The RC might have issues...
On 19 August 2015 at 14:42, Rick Moritz rah...@gmail.com wrote: No, the setup is one driver with 32g of memory, and three executors each with 8g of memory in both cases. No core number has been specified, thus it should default to single-core (though I've seen the YARN-owned JVMs wrapping the executors take up to 3 cores in top). That is, unless, as I suggested, there are different defaults for the two means of job submission that come into play in a non-transparent fashion (i.e. not visible in SparkConf).
On Wed, Aug 19, 2015 at 1:36 PM, Igor Berman igor.ber...@gmail.com wrote: Any differences in number of cores or memory settings for executors?
On 19 August 2015 at 09:49, Rick Moritz rah...@gmail.com wrote: Dear list, I am observing a very strange difference in behaviour between a Spark 1.4.0-rc4 REPL (locally compiled with Java 7) and a Spark 1.4.0 Zeppelin interpreter (compiled with Java 6 and sourced from Maven Central). The workflow loads data from Hive, applies a number of transformations (including quite a lot of shuffle operations) and then presents an enriched dataset. The code (and resulting DAGs) are identical in each case. The following particularities are noted: Importing the HiveRDD and caching it yields identical results on both platforms. Applying case classes leads to a 2-2.5MB increase in dataset size per partition (excepting empty partitions).
Writing shuffles shows this much more significant result:
Zeppelin:
    Total Time Across All Tasks: 2.6 min
    Input Size / Records: 2.4 GB / 7314771
    Shuffle Write: 673.5 MB / 7314771
vs. Spark-shell:
    Total Time Across All Tasks: 28 min
    Input Size / Records: 3.6 GB / 7314771
    Shuffle Write: 9.0 GB / 7314771
This is one of the early stages, which reads from a cached partition and then feeds into a join stage. The later stages show similar behaviour in producing excessive shuffle spills. Quite often the excessive shuffle volume will lead to massive shuffle spills which ultimately kill not only performance, but the actual executors as well. I have examined the Environment tab in the Spark UI and identified no notable difference besides FAIR (Zeppelin) vs FIFO (spark-shell) scheduling mode. I fail to see how this would impact shuffle writes in such a drastic way, since it should be on the inter-job level, while this happens at the inter-stage level. I was somewhat suspicious of compression or serialization playing a role, but the SparkConf points to those being set to the default. Also, Zeppelin's interpreter adds no relevant additional default parameters. I performed a diff between rc4 (which was later released) and 1.4.0 and, as expected, there were no differences besides a single class (remarkably, a shuffle-relevant class: /org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.class ) differing in its binary representation due to being compiled with Java 7 instead of Java 6. The decompiled sources of those two are again identical. As a next step I may attempt to simply replace that file in the packaged jar, to ascertain that there is indeed no difference between the two versions, but I would consider this a major bug if a simple compiler change leads to this kind of issue.
I am also open to any other ideas, in particular to verify that the same compression/serialization is indeed happening, and regarding ways to determine what exactly is written into these shuffles -- currently I only know that the tuples are bigger (or smaller) than they ought to be. The Zeppelin-obtained results do appear to be consistent at least, thus the suspicion is that there is an issue with the process launched from spark-shell. I will also attempt to build a Spark job and spark-submit it using different Spark binaries to further explore the issue. Best Regards, Rick Moritz
PS: I already tried to send this mail yesterday, but it never made it onto the list, as far as I can tell -- I apologize should anyone receive this as a second copy.
Fwd: Strange shuffle behaviour difference between Zeppelin and Spark-shell
Oops, forgot to reply-all on this thread.
-- Forwarded message -- From: Rick Moritz rah...@gmail.com Date: Wed, Aug 19, 2015 at 2:46 PM Subject: Re: Strange shuffle behaviour difference between Zeppelin and Spark-shell To: Igor Berman igor.ber...@gmail.com
Those values are not explicitly set, and attempting to read their values results in 'java.util.NoSuchElementException: spark.shuffle.spill.compress'. What I mean by the volume per element being larger is illustrated in my original post: for each case the number of elements is identical, but the volume of data required to obtain/manage these elements is many times greater. The only difference used to be that Zeppelin had FAIR scheduling versus FIFO scheduling for spark-shell. I just verified that spark-shell with FAIR scheduling makes no difference. The only other difference in the environment lies in some class-path variables which should only affect method availability, not actual usage. Another fact to note: the Spark assembly (1.4.0-rc4) was built with provided Hadoop dependencies (build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phadoop-provided -Phive -Phive-thriftserver -Psparkr -DskipTests clean package) for 2.6.0 from Hortonworks, while Zeppelin was built with dependencies against 2.6.0 from Maven Central.
On Wed, Aug 19, 2015 at 2:08 PM, Igor Berman igor.ber...@gmail.com wrote: So what is your case for version differences? What do you mean by "in spark-shell the volume per element is much larger"? Can you verify that the configuration in the Spark UI (under the Environment tab) is the same? If you suspect compression, then check the following properties: spark.shuffle.compress, spark.shuffle.spill.compress, spark.io.compression.codec, spark.rdd.compress
On 19 August 2015 at 15:03, Rick Moritz rah...@gmail.com wrote: Number of partitions and even size look relatively similar - except in spark-shell the volume per element is much larger, especially in later stages. That's when shuffles start to spill.
Zeppelin creates almost no spills at all. The number of elements per partition is the same for both setups, but with very different data volume in/out. It is almost as though compression was used in one case and not in the other, or as though shuffling is somehow less specific, and more nodes get data that they ultimately don't process at all. The same shuffling algorithm appears to be at work in each case, if the partitioning of the number of elements is anything to go by.
On Wed, Aug 19, 2015 at 1:58 PM, Igor Berman igor.ber...@gmail.com wrote: I would compare the Spark UI metrics for both cases and look for any differences (number of partitions, number of spills, etc.). Why can't you make the REPL consistent with the Zeppelin Spark version? The RC might have issues...
On 19 August 2015 at 14:42, Rick Moritz rah...@gmail.com wrote: No, the setup is one driver with 32g of memory, and three executors each with 8g of memory in both cases. No core number has been specified, thus it should default to single-core (though I've seen the YARN-owned JVMs wrapping the executors take up to 3 cores in top). That is, unless, as I suggested, there are different defaults for the two means of job submission that come into play in a non-transparent fashion (i.e. not visible in SparkConf).
On Wed, Aug 19, 2015 at 1:36 PM, Igor Berman igor.ber...@gmail.com wrote: Any differences in number of cores or memory settings for executors?
On 19 August 2015 at 09:49, Rick Moritz rah...@gmail.com wrote: Dear list, I am observing a very strange difference in behaviour between a Spark 1.4.0-rc4 REPL (locally compiled with Java 7) and a Spark 1.4.0 Zeppelin interpreter (compiled with Java 6 and sourced from Maven Central). The workflow loads data from Hive, applies a number of transformations (including quite a lot of shuffle operations) and then presents an enriched dataset. The code (and resulting DAGs) are identical in each case.
The following particularities are noted: Importing the HiveRDD and caching it yields identical results on both platforms. Applying case classes leads to a 2-2.5MB increase in dataset size per partition (excepting empty partitions). Writing shuffles shows this much more significant result:
Zeppelin:
    Total Time Across All Tasks: 2.6 min
    Input Size / Records: 2.4 GB / 7314771
    Shuffle Write: 673.5 MB / 7314771
vs. Spark-shell:
    Total Time Across All Tasks: 28 min
    Input Size / Records: 3.6 GB / 7314771
    Shuffle Write: 9.0 GB / 7314771
This is one of the early stages, which reads from a cached partition and then feeds into a join stage. The later stages show similar behaviour in producing excessive shuffle spills. Quite often the excessive shuffle volume will lead to massive shuffle spills which ultimately kill not only performance, but the actual executors as well. I have examined the Environment tab in the Spark UI and identified no notable difference besides FAIR
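One way to compare the compression-related properties listed in this thread, without hitting NoSuchElementException for keys that were never set, is to read each one with an explicit fallback and then diff the two environments. This is a minimal sketch, assuming a configuration object that exposes get(key, default). A plain dict is used here as a stand-in; PySpark's SparkConf.get should accept the same two arguments, but that has not been verified against 1.4.0. The helper names, the "<not set>" marker, and the sample values are hypothetical.

```python
# Property names taken from the thread above.
SHUFFLE_KEYS = [
    "spark.shuffle.compress",
    "spark.shuffle.spill.compress",
    "spark.io.compression.codec",
    "spark.rdd.compress",
]
NOT_SET = "<not set>"  # hypothetical marker for keys that were never set


def read_settings(conf, keys=SHUFFLE_KEYS):
    """Read each key with a fallback instead of raising on a missing key."""
    return {key: conf.get(key, NOT_SET) for key in keys}


def diff_settings(left, right):
    """Return only the keys whose values differ between two environments."""
    return {k: (left[k], right[k]) for k in left if left[k] != right[k]}


# Made-up example values; real ones would come from each environment.
zeppelin_conf = {"spark.shuffle.compress": "true"}
shell_conf = {}

differences = diff_settings(read_settings(zeppelin_conf), read_settings(shell_conf))
print(differences)
```

A key reported as "<not set>" on both sides only shows that neither environment overrides it explicitly. It does not prove that the same default is applied at runtime.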
RE: Failed to fetch block error
From the log, it looks like the OS user running Spark cannot open any more files. Check the ulimit setting for that user:

    ulimit -a
    open files (-n) 65536

Date: Tue, 18 Aug 2015 22:06:04 -0700
From: swethakasire...@gmail.com
To: user@spark.apache.org
Subject: Failed to fetch block error

Hi, I see the following error in my Spark job even after using around 100 cores and 16G of memory. Did any of you experience the same problem earlier?

    15/08/18 21:51:23 ERROR shuffle.RetryingBlockFetcher: Failed to fetch block input-0-1439959114400, and will not retry (0 retries)
    java.lang.RuntimeException: java.io.FileNotFoundException: /data1/spark/spark-aed30958-2ee1-4eb7-984e-6402fb0a0503/blockmgr-ded36b52-ccc7-48dc-ba05-65bb21fc4136/34/input-0-1439959114400 (Too many open files)
        at java.io.RandomAccessFile.open(Native Method)
        at java.io.RandomAccessFile.<init>(RandomAccessFile.java:241)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:110)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
        at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
        at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
        at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
        at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Failed-to-fetch-block-error-tp24335.html Sent from the Apache Spark User
List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
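
The "Too many open files" diagnosis above can also be checked from inside a JVM, which may help when it is unclear which limit the executor process actually inherited. The following is only a sketch: the class name FdLimits is made up for illustration, and it relies on the com.sun.management.UnixOperatingSystemMXBean interface, which is available on typical Unix JVMs but not guaranteed everywhere (the method returns -1 values when it is missing).

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

// Hypothetical helper, not part of Spark: reports file descriptor usage of the current JVM.
final class FdLimits {
    /** Returns {open, max} descriptor counts, or {-1, -1} when the JVM does not expose them. */
    static long[] fileDescriptorUsage() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            com.sun.management.UnixOperatingSystemMXBean unix =
                (com.sun.management.UnixOperatingSystemMXBean) os;
            return new long[] {
                unix.getOpenFileDescriptorCount(), // descriptors currently open in this process
                unix.getMaxFileDescriptorCount()   // the limit this process runs under
            };
        }
        return new long[] {-1L, -1L};
    }

    public static void main(String[] args) {
        long[] usage = fileDescriptorUsage();
        System.out.println("open=" + usage[0] + " max=" + usage[1]);
    }
}
```

If the reported maximum is far below the value shown by ulimit -n in an interactive shell, the executor was probably started under a different user or with different limits.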
Re: Strange shuffle behaviour difference between Zeppelin and Spark-shell
No, the setup is one driver with 32g of memory, and three executors each with 8g of memory in both cases. No core-number has been specified, thus it should default to single-core (though I've seen the yarn-owned jvms wrapping the executors take up to 3 cores in top). That is, unless, as I suggested, there are different defaults for the two means of job submission that come into play in a non-transparent fashion (i.e. not visible in SparkConf). On Wed, Aug 19, 2015 at 1:36 PM, Igor Berman igor.ber...@gmail.com wrote: any differences in number of cores, memory settings for executors? On 19 August 2015 at 09:49, Rick Moritz rah...@gmail.com wrote: Dear list, I am observing a very strange difference in behaviour between a Spark 1.4.0-rc4 REPL (locally compiled with Java 7) and a Spark 1.4.0 zeppelin interpreter (compiled with Java 6 and sourced from maven central). The workflow loads data from Hive, applies a number of transformations (including quite a lot of shuffle operations) and then presents an enriched dataset. The code (an resulting DAGs) are identical in each case. The following particularities are noted: Importing the HiveRDD and caching it yields identical results on both platforms. Applying case classes, leads to a 2-2.5MB increase in dataset size per partition (excepting empty partitions). Writing shuffles shows this much more significant result: Zeppelin: *Total Time Across All Tasks: * 2,6 min *Input Size / Records: * 2.4 GB / 7314771 *Shuffle Write: * 673.5 MB / 7314771 vs Spark-shell: *Total Time Across All Tasks: * 28 min *Input Size / Records: * 3.6 GB / 7314771 *Shuffle Write: * 9.0 GB / 7314771 This is one of the early stages, which reads from a cached partition and then feeds into a join-stage. The latter stages show similar behaviour in producing excessive shuffle spills. Quite often the excessive shuffle volume will lead to massive shuffle spills which ultimately kill not only performance, but the actual executors as well. 
I have examined the Environment tab in the Spark UI and identified no notable difference besides FAIR (Zeppelin) vs FIFO (spark-shell) scheduling mode. I fail to see how this would impact shuffle writes in such a drastic way, since it should apply at the inter-job level, while this happens at the inter-stage level. I was somewhat suspicious of compression or serialization playing a role, but the SparkConf points to those being set to the default. Also, Zeppelin's interpreter adds no relevant additional default parameters. I performed a diff between rc4 (which was later released) and 1.4.0 and, as expected, there were no differences besides a single class (remarkably, a shuffle-relevant class: /org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.class) differing in its binary representation due to being compiled with Java 7 instead of Java 6. The decompiled sources of those two are again identical. As a next step I may attempt to simply replace that file in the packaged jar, to ascertain that there is indeed no difference between the two versions, but I would consider it a major bug if a simple compiler change led to this kind of issue. I am also open to any other ideas, in particular to verify that the same compression/serialization is indeed happening, and regarding ways to determine what exactly is written into these shuffles -- currently I only know that the tuples are bigger (or smaller) than they ought to be. The Zeppelin-obtained results do appear to be consistent at least, thus the suspicion is that there is an issue with the process launched from spark-shell. I will also attempt to build a Spark job and spark-submit it using different Spark binaries to further explore the issue. Best Regards, Rick Moritz PS: I already tried to send this mail yesterday, but it never made it onto the list, as far as I can tell -- I apologize should anyone receive this as a second copy.
blogs/articles/videos on how to analyse spark performance
Hi, I would like to ask whether there are any blogs, articles, or videos on how to analyse Spark performance at runtime, e.g. tools that can be used or anything related.
Re: how do I execute a job on a single worker node in standalone mode
That worked great, thanks Andrew.

On Tue, Aug 18, 2015 at 1:39 PM, Andrew Or and...@databricks.com wrote:
Hi Axel, You can try setting `spark.deploy.spreadOut` to false (through your conf/spark-defaults.conf file). What this does is essentially try to schedule as many cores on one worker as possible before spilling over to other workers. Note that you *must* restart the cluster through the sbin scripts. For more information see: http://spark.apache.org/docs/latest/spark-standalone.html. Feel free to let me know whether it works, -Andrew

2015-08-18 4:49 GMT-07:00 Igor Berman igor.ber...@gmail.com:
By default, standalone creates 1 executor on every worker machine per application. The overall number of cores is configured with --total-executor-cores, so in general if you specify --total-executor-cores=1 then there would be only 1 core on some executor and you'll get what you want. On the other hand, if your application needs all cores of your cluster and only some specific job should run on a single executor, there are a few methods to achieve this, e.g. coalesce(1) or dummyRddWithOnePartitionOnly.foreachPartition.

On 18 August 2015 at 01:36, Axel Dahl a...@whisperstream.com wrote:
I have a 4 node cluster and have been playing around with the num-executors, executor-memory and executor-cores parameters. I set the following: --executor-memory=10G --num-executors=1 --executor-cores=8 But when I run the job, I see that each worker is running one executor which has 2 cores and 2.5G memory. What I'd like to do instead is have Spark just allocate the job to a single worker node. Is that possible in standalone mode or do I need a job/resource scheduler like Yarn to do that? Thanks in advance, -Axel
Re: SQLContext Create Table Problem
Can you try to use HiveContext instead of SQLContext? Your query is trying to create a table and persist the metadata of the table in metastore, which is only supported by HiveContext.

On Wed, Aug 19, 2015 at 8:44 AM, Yusuf Can Gürkan yu...@useinsider.com wrote:
Hello, I'm trying to create a table with sqlContext.sql method as below:

    val sc = new SparkContext()
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    sqlContext.sql(s"""
      create table if not exists landing (
        date string,
        referrer string
      )
      partitioned by (partnerid string,dt string)
      row format delimited fields terminated by '\t' lines terminated by '\n'
      STORED AS TEXTFILE LOCATION 's3n://...'
    """)

It gives error on spark-submit:

    Exception in thread "main" java.lang.RuntimeException: [2.1] failure: ``with'' expected but identifier create found
    create external table if not exists landing (
    ^
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36)
        at org.apache.spark.sql.catalyst.DefaultParserDialect.parse(ParserDialect.scala:67)

What can be the reason??
Spark Streaming: Some issues (Could not compute split, block —— not found) and questions
Some background on what we're trying to do: We have four Kinesis receivers with varying amounts of data coming through them. Ultimately we work on a unioned stream that is getting about 11 MB/second of data. We use a batch size of 5 seconds. We create four distinct DStreams from this data that have different aggregation computations (various combinations of map/flatMap/reduceByKeyAndWindow and then finishing by serializing the records to JSON strings and writing them to S3). We want to do 30 minute windows of computations on this data, to get a better compression rate for the aggregates (there are a lot of repeated keys across this time frame, and we want to combine them all -- we do this using reduceByKeyAndWindow).

But even when trying to do 5 minute windows, we have issues with "Could not compute split, block ... not found". This is being run on a YARN cluster and it seems like the executors are getting killed even though they should have plenty of memory.

Also, it seems like no computation actually takes place until the end of the window duration. This seems inefficient if there is a lot of data that you know is going to be needed for the computation. Is there any good way around this?

These are some of the configuration settings we are using for Spark:

    spark.executor.memory=26000M,\
    spark.executor.cores=4,\
    spark.executor.instances=5,\
    spark.driver.cores=4,\
    spark.driver.memory=24000M,\
    spark.default.parallelism=128,\
    spark.streaming.blockInterval=100ms,\
    spark.streaming.receiver.maxRate=2,\
    spark.akka.timeout=300,\
    spark.storage.memoryFraction=0.6,\
    spark.rdd.compress=true,\
    spark.executor.instances=16,\
    spark.serializer=org.apache.spark.serializer.KryoSerializer,\
    spark.kryoserializer.buffer.max=2047m,\

Is this the correct way to do this, and how can I further debug to figure out this issue?
Re: SQLContext Create Table Problem
Hey Yin, Thanks for the answer. I thought that this could be the problem, but I cannot create a HiveContext because I cannot import org.apache.spark.sql.hive.HiveContext. It does not see this package. I read that I should build Spark with -Phive, but I'm running on Amazon EMR 1.4.1, and in spark-shell I can import the hive package but cannot do the same with spark-submit. Do you have any idea why? Because if it's related to building with -Phive, how can I import it in spark-shell?

On 19 Aug 2015, at 18:59, Yin Huai yh...@databricks.com wrote:
Can you try to use HiveContext instead of SQLContext? Your query is trying to create a table and persist the metadata of the table in metastore, which is only supported by HiveContext.

On Wed, Aug 19, 2015 at 8:44 AM, Yusuf Can Gürkan yu...@useinsider.com wrote:
Hello, I'm trying to create a table with sqlContext.sql method as below:

    val sc = new SparkContext()
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    sqlContext.sql(s"""
      create table if not exists landing (
        date string,
        referrer string
      )
      partitioned by (partnerid string,dt string)
      row format delimited fields terminated by '\t' lines terminated by '\n'
      STORED AS TEXTFILE LOCATION 's3n://...'
    """)

It gives error on spark-submit:

    Exception in thread "main" java.lang.RuntimeException: [2.1] failure: ``with'' expected but identifier create found
    create external table if not exists landing (
    ^
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36)
        at org.apache.spark.sql.catalyst.DefaultParserDialect.parse(ParserDialect.scala:67)

What can be the reason??
How to overwrite partition when writing Parquet?
Hello, I have a DataFrame with a date column which I want to use as a partition. Each day I want to write the data for the same date in Parquet, and then read a dataframe for a date range. I'm using:

    myDataframe.write().partitionBy("date").mode(SaveMode.Overwrite).parquet(parquetDir);

If I use SaveMode.Append, then writing data for the same partition adds the same data there again. If I use SaveMode.Overwrite, then writing data for a single partition removes all the data for all partitions. How can I overwrite only a given partition or manually remove a partition before writing? Many thanks! Romi K.
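
The "manually remove a partition before writing" option from the question can be sketched as follows. This is only an illustration under stated assumptions: it assumes the partition lives in a Hive-style directory named column=value under the Parquet root, it works on a local filesystem through java.nio (on HDFS or S3 the Hadoop FileSystem API would be needed instead), and the class and method names (PartitionCleaner, partitionDir, dropPartition) are made up for this example.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

// Hypothetical helper: removes a single partition directory and leaves the others alone.
final class PartitionCleaner {
    /** Builds the assumed Hive-style partition directory, e.g. root/date=2015-08-19. */
    static Path partitionDir(Path root, String column, String value) {
        return root.resolve(column + "=" + value);
    }

    /** Recursively deletes one partition directory; returns false if it did not exist. */
    static boolean dropPartition(Path root, String column, String value) throws IOException {
        Path dir = partitionDir(root, column, value);
        if (!Files.isDirectory(dir)) {
            return false;
        }
        Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file); // delete data files first
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path d, IOException exc) throws IOException {
                if (exc != null) {
                    throw exc;
                }
                Files.delete(d); // then the now-empty directory
                return FileVisitResult.CONTINUE;
            }
        });
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("parquetDir");
        Files.createDirectories(partitionDir(root, "date", "2015-08-18"));
        Path today = Files.createDirectories(partitionDir(root, "date", "2015-08-19"));
        Files.createFile(today.resolve("part-00000"));
        System.out.println("dropped: " + dropPartition(root, "date", "2015-08-19"));
    }
}
```

With the stale partition removed, a subsequent write in SaveMode.Append for that date would no longer duplicate rows; whether that is safe with concurrent readers is not addressed by this sketch.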
Re: blogs/articles/videos on how to analyse spark performance
Excellent resource: http://www.oreilly.com/pub/e/3330 And more amazing is the fact that the presenter actually responds to your questions. Regards, Gourav Sengupta On Wed, Aug 19, 2015 at 4:12 PM, Todd bit1...@163.com wrote: Hi, I would ask if there are some blogs/articles/videos on how to analyse spark performance during runtime,eg, tools that can be used or something related.
Re: how do I execute a job on a single worker node in standalone mode
hmm maybe I spoke too soon. I have an apache zeppelin instance running and have configured it to use 48 cores (each node only has 16 cores), so I figured by setting it to 48, would mean that spark would grab 3 nodes. what happens instead though is that spark, reports that 48 cores are being used, but only executes everything on 1 node, it looks like it's not grabbing the extra nodes. On Wed, Aug 19, 2015 at 8:43 AM, Axel Dahl a...@whisperstream.com wrote: That worked great, thanks Andrew. On Tue, Aug 18, 2015 at 1:39 PM, Andrew Or and...@databricks.com wrote: Hi Axel, You can try setting `spark.deploy.spreadOut` to false (through your conf/spark-defaults.conf file). What this does is essentially try to schedule as many cores on one worker as possible before spilling over to other workers. Note that you *must* restart the cluster through the sbin scripts. For more information see: http://spark.apache.org/docs/latest/spark-standalone.html. Feel free to let me know whether it works, -Andrew 2015-08-18 4:49 GMT-07:00 Igor Berman igor.ber...@gmail.com: by default standalone creates 1 executor on every worker machine per application number of overall cores is configured with --total-executor-cores so in general if you'll specify --total-executor-cores=1 then there would be only 1 core on some executor and you'll get what you want on the other hand, if you application needs all cores of your cluster and only some specific job should run on single executor there are few methods to achieve this e.g. coallesce(1) or dummyRddWithOnePartitionOnly.foreachPartition On 18 August 2015 at 01:36, Axel Dahl a...@whisperstream.com wrote: I have a 4 node cluster and have been playing around with the num-executors parameters, executor-memory and executor-cores I set the following: --executor-memory=10G --num-executors=1 --executor-cores=8 But when I run the job, I see that each worker, is running one executor which has 2 cores and 2.5G memory. 
What I'd like to do instead is have Spark just allocate the job to a single worker node? Is that possible in standalone mode or do I need a job/resource scheduler like Yarn to do that? Thanks in advance, -Axel
Re: blogs/articles/videos on how to analyse spark performance
you don't need to register, search in youtube for this video... On 19 August 2015 at 18:34, Gourav Sengupta gourav.sengu...@gmail.com wrote: Excellent resource: http://www.oreilly.com/pub/e/3330 And more amazing is the fact that the presenter actually responds to your questions. Regards, Gourav Sengupta On Wed, Aug 19, 2015 at 4:12 PM, Todd bit1...@163.com wrote: Hi, I would ask if there are some blogs/articles/videos on how to analyse spark performance during runtime,eg, tools that can be used or something related.
Re: Too many files/dirs in hdfs
My question was how to do this in Hadoop? Could somebody point me to some examples?

On Tue, Aug 18, 2015 at 10:43 PM, UMESH CHAUDHARY umesh9...@gmail.com wrote:
Of course, Java or Scala can do that: 1) Create a FileWriter with an append or roll-over option 2) For each RDD create a StringBuilder after applying your filters 3) Write this StringBuilder to the file when you want to write (the duration can be defined as a condition)

On Tue, Aug 18, 2015 at 11:05 PM, Mohit Anchlia mohitanch...@gmail.com wrote:
Is there a way to store all the results in one file and keep the file roll-over separate from the Spark Streaming batch interval?

On Mon, Aug 17, 2015 at 2:39 AM, UMESH CHAUDHARY umesh9...@gmail.com wrote:
In Spark Streaming you can simply check whether your RDD contains any records or not, and if records are there you can save them using FileOutputStream:

    DStream.foreachRDD(t => { var count = t.count(); if (count > 0) { /* SAVE YOUR STUFF */ } })

This will not create unnecessary files of 0 bytes.

On Mon, Aug 17, 2015 at 2:51 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
Currently, Spark Streaming creates a new directory for every batch and stores the data in it (whether it has anything or not). There is no direct append call as of now, but you can achieve this either with FileUtil.copyMerge (http://apache-spark-user-list.1001560.n3.nabble.com/save-spark-streaming-output-to-single-file-on-hdfs-td21124.html#a21167) or with a separate program which does the clean-up for you. Thanks Best Regards

On Sat, Aug 15, 2015 at 5:20 AM, Mohit Anchlia mohitanch...@gmail.com wrote:
Spark Streaming seems to be creating 0-byte files even when there is no data. Also, I have 2 concerns here: 1) Extra unnecessary files are being created from the output 2) Hadoop doesn't work really well with too many files and I see that it is creating a directory with a timestamp every 1 second.
Is there a better way of writing a file, may be use some kind of append mechanism where one doesn't have to change the batch interval.
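
The append-and-roll-over steps suggested in this thread (FileWriter in append mode, one StringBuilder per batch, roll-over decoupled from the batch interval) can be sketched roughly as below. Everything here is an assumption for illustration: the class name RollingAppender, the output-<windowStart>.txt naming scheme, and the use of a local filesystem. It does not show the HDFS variant the question asks about, only the mechanism.

```java
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;

// Hypothetical helper: appends batches to one file per roll-over window.
final class RollingAppender {
    private final Path dir;
    private final long rollMillis;

    RollingAppender(Path dir, long rollMillis) {
        this.dir = dir;
        this.rollMillis = rollMillis;
    }

    /** File for the roll-over window that contains the given timestamp. */
    Path fileFor(long timestampMillis) {
        long windowStart = (timestampMillis / rollMillis) * rollMillis;
        return dir.resolve("output-" + windowStart + ".txt");
    }

    /** Appends one batch; empty batches create no file and return null. */
    Path append(Iterable<String> records, long timestampMillis) throws IOException {
        StringBuilder sb = new StringBuilder();
        for (String record : records) {
            sb.append(record).append('\n');
        }
        if (sb.length() == 0) {
            return null; // nothing to write, so no 0-byte file appears
        }
        Path target = fileFor(timestampMillis);
        try (FileWriter writer = new FileWriter(target.toFile(), true)) { // true = append mode
            writer.write(sb.toString());
        }
        return target;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("stream-out");
        RollingAppender appender = new RollingAppender(dir, 60_000L); // roll every minute
        appender.append(Arrays.asList("a", "b"), 1_000L);
        appender.append(Collections.<String>emptyList(), 2_000L);
        System.out.println(appender.append(Arrays.asList("c"), 59_000L));
    }
}
```

In a streaming job the append call would run inside foreachRDD on collected (and therefore small) batches; for large batches or HDFS targets a different writer would be needed.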
SQLContext load. Filtering files
Hi. I'd like to read Avro files using this library https://github.com/databricks/spark-avro I need to load several files from a folder, not all files. Is there some functionality to filter the files to load? And... Is is possible to know the name of the files loaded from a folder? My problem is that I have a folder where an external process is inserting files every X minutes and I need process these files once, and I can't move, rename or copy the source files. Thanks -- Regards Miguel Ángel
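
One way to approach the "process each file once without moving it" requirement is to list the folder, filter by name, and remember which names were already handed out. The sketch below shows only that bookkeeping, under several assumptions: the folder is on a local filesystem reachable through java.nio, the set of processed names is kept in memory (it would have to be persisted to survive restarts), and the class name NewFileTracker is invented for this example. Whether the resulting paths can be passed to the spark-avro loader individually is something to verify separately.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: returns only files that have not been seen by earlier polls.
final class NewFileTracker {
    private final Set<String> processed = new HashSet<>();

    /** Lists files matching the glob, skips already-seen names, and marks the rest as seen. */
    List<Path> pollNewFiles(Path folder, String glob) throws IOException {
        List<Path> fresh = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(folder, glob)) {
            for (Path p : stream) {
                // Set.add returns false when the name was already recorded
                if (processed.add(p.getFileName().toString())) {
                    fresh.add(p);
                }
            }
        }
        Collections.sort(fresh); // stable, name-based order
        return fresh;
    }

    public static void main(String[] args) throws IOException {
        Path folder = Files.createTempDirectory("incoming");
        Files.createFile(folder.resolve("a.avro"));
        Files.createFile(folder.resolve("notes.txt"));
        NewFileTracker tracker = new NewFileTracker();
        System.out.println(tracker.pollNewFiles(folder, "*.avro").size());
        System.out.println(tracker.pollNewFiles(folder, "*.avro").size());
    }
}
```

The returned list also answers the second question in the message, since it contains exactly the names of the files selected for loading.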
Re: broadcast variable of Kafka producer throws ConcurrentModificationException
I'm glad that I could help :)

On 19 Aug 2015 8:52 AM, Shenghua(Daniel) Wan wansheng...@gmail.com wrote:
+1 I wish I had read this blog earlier. I am using Java and have just implemented a singleton producer per executor/JVM during the day. Yes, I did see that NonSerializableException when I was debugging the code ... Thanks for sharing.

On Tue, Aug 18, 2015 at 10:59 PM, Tathagata Das t...@databricks.com wrote:
It's a cool blog post! Tweeted it! Broadcasting the configuration necessary for lazily instantiating the producer is a good idea. Nitpick: The first code example has an extra `}` ;)

On Tue, Aug 18, 2015 at 10:49 PM, Marcin Kuthan marcin.kut...@gmail.com wrote:
As long as the Kafka producer is thread-safe you don't need any pool at all. Just share a single producer on every executor. Please look at my blog post for more details. http://allegro.tech/spark-kafka-integration.html

On 19 Aug 2015 2:00 AM, Shenghua(Daniel) Wan wansheng...@gmail.com wrote:
All of you are right. I was trying to create too many producers. My idea was to create a pool (for now the pool contains only one producer) shared by all the executors. After I realized it was related to the serialization issues (though I did not find clear clues in the source code to indicate that the broadcast template type parameter must implement Serializable), I followed the Spark Cassandra connector design and created a singleton of Kafka producer pools. No exception has been noticed since. Thanks for all your comments.

On Tue, Aug 18, 2015 at 4:28 PM, Tathagata Das t...@databricks.com wrote:
Why are you even trying to broadcast a producer? A broadcast variable is some immutable piece of serializable DATA that can be used for processing on the executors. A Kafka producer is neither DATA nor immutable, and definitely not serializable. The right way to do this is to create the producer in the executors.
Please see the discussion in the programming guide http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger c...@koeninger.org wrote:
I wouldn't expect a kafka producer to be serializable at all... among other things, it has a background thread

On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan wansheng...@gmail.com wrote:
Hi, Did anyone see java.util.ConcurrentModificationException when using broadcast variables? I encountered this exception when wrapping a Kafka producer like this in the spark streaming driver. Here is what I did.

    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
    final Broadcast<KafkaDataProducer> bCastProducer = streamingContext.sparkContext().broadcast(producer);

Then within a closure called by a foreachRDD, I was trying to get the wrapped producer, i.e.

    KafkaProducer<String, String> p = bCastProducer.value();

after rebuilding and rerunning, I got the stack trace like this

    Exception in thread "main" com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
    Serialization trace:
    classes (sun.misc.Launcher$AppClassLoader)
    classloader (java.security.ProtectionDomain)
    context (java.security.AccessControlContext)
    acc (org.apache.spark.util.MutableURLClassLoader)
    contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
    ioThread (org.apache.kafka.clients.producer.KafkaProducer)
    producer (my driver)
        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        at
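
The advice in this thread (ship only the serializable configuration and create the producer lazily, once per executor JVM) can be sketched in plain Java as below. This is an illustrative sketch, not the code from the blog post or from Spark: LazyHolder, Producers and StubProducer are hypothetical names, and StubProducer merely stands in for a real Kafka producer so that the example has no external dependencies.

```java
import java.util.Properties;
import java.util.function.Function;

// Hypothetical generic holder: builds the value on first use and then reuses it.
final class LazyHolder<C, T> {
    private final Function<C, T> factory;
    private T instance; // guarded by "this"

    LazyHolder(Function<C, T> factory) {
        this.factory = factory;
    }

    /** Creates the instance from the (serializable) config on first call; later calls reuse it. */
    synchronized T getOrCreate(C config) {
        if (instance == null) {
            instance = factory.apply(config);
        }
        return instance;
    }
}

// Stand-in for a real producer; only here to keep the sketch self-contained.
final class StubProducer {
    final Properties config;

    StubProducer(Properties config) {
        this.config = config;
    }

    void send(String topic, String value) {
        System.out.println(topic + " <- " + value);
    }
}

final class Producers {
    // Static field: each JVM (and therefore each executor process) gets its own holder.
    static final LazyHolder<Properties, StubProducer> HOLDER = new LazyHolder<>(StubProducer::new);

    public static void main(String[] args) {
        Properties props = new Properties(); // plain data, safe to serialize or broadcast
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        StubProducer first = HOLDER.getOrCreate(props);
        StubProducer second = HOLDER.getOrCreate(props);
        first.send("events", "hello");
        System.out.println(first == second);
    }
}
```

The point of the design is that only the Properties object ever crosses the driver/executor boundary; the producer itself, with its background thread, is never serialized.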
Re: Spark Streaming failing on YARN Cluster
I'm getting some spark exception. Please look this log trace ( *http://pastebin.com/xL9jaRUa http://pastebin.com/xL9jaRUa* ). *Thanks*, https://in.linkedin.com/in/ramkumarcs31 On Wed, Aug 19, 2015 at 10:20 PM, Hari Shreedharan hshreedha...@cloudera.com wrote: It looks like you are having issues with the files getting distributed to the cluster. What is the exception you are getting now? On Wednesday, August 19, 2015, Ramkumar V ramkumar.c...@gmail.com wrote: Thanks a lot for your suggestion. I had modified HADOOP_CONF_DIR in spark-env.sh so that core-site.xml is under HADOOP_CONF_DIR. i can able to see the logs like that you had shown above. Now i can able to run for 3 minutes and store results between every minutes. After sometimes, there is an exception. How to fix this exception ? and Can you please explain where its going wrong ? *Log Link : http://pastebin.com/xL9jaRUa http://pastebin.com/xL9jaRUa * *Thanks*, https://in.linkedin.com/in/ramkumarcs31 On Wed, Aug 19, 2015 at 1:54 PM, Jeff Zhang zjf...@gmail.com wrote: HADOOP_CONF_DIR is the environment variable point to the hadoop conf directory. Not sure how CDH organize that, make sure core-site.xml is under HADOOP_CONF_DIR. On Wed, Aug 19, 2015 at 4:06 PM, Ramkumar V ramkumar.c...@gmail.com wrote: We are using Cloudera-5.3.1. since it is one of the earlier version of CDH, it doesnt supports the latest version of spark. So i installed spark-1.4.1 separately in my machine. I couldnt able to do spark-submit in cluster mode. How to core-site.xml under classpath ? it will be very helpful if you could explain in detail to solve this issue. *Thanks*, https://in.linkedin.com/in/ramkumarcs31 On Fri, Aug 14, 2015 at 8:25 AM, Jeff Zhang zjf...@gmail.com wrote: 1. 15/08/12 13:24:49 INFO Client: Source and destination file systems are the same. Not copying file:/home/hdfs/spark-1.4.1/assembly/target/scala-2.10/spark-assembly-1.4.1-hadoop2.5.0-cdh5.3.5.jar 2. 
15/08/12 13:24:49 INFO Client: Source and destination file systems are the same. Not copying file:/home/hdfs/spark-1.4.1/external/kafka-assembly/target/spark-streaming-kafka-assembly_2.10-1.4.1.jar 3. 15/08/12 13:24:49 INFO Client: Source and destination file systems are the same. Not copying file:/home/hdfs/spark-1.4.1/python/lib/pyspark.zip 4. 15/08/12 13:24:49 INFO Client: Source and destination file systems are the same. Not copying file:/home/hdfs/spark-1.4.1/python/lib/py4j-0.8.2.1-src.zip 5. 15/08/12 13:24:49 INFO Client: Source and destination file systems are the same. Not copying file:/home/hdfs/spark-1.4.1/examples/src/main/python/streaming/kyt.py 6. 1. diagnostics: Application application_1437639737006_3808 failed 2 times due to AM Container for appattempt_1437639737006_3808_02 exited with exitCode: -1000 due to: File file:/home/hdfs/spark-1.4.1/python/lib/pyspark.zip does not exist 2. .Failing this attempt.. Failing the application. The machine you run spark is the client machine, while the yarn AM is running on another machine. And the yarn AM complains that the files are not found as your logs shown. From the logs, its seems that these files are not copied to the HDFS as local resources. I doubt that you didn't put core-site.xml under your classpath, so that spark can not detect your remote file system and won't copy the files to hdfs as local resources. Usually in yarn-cluster mode, you should be able to see the logs like following. 
15/08/14 10:48:49 INFO yarn.Client: Preparing resources for our AM container 15/08/14 10:48:49 INFO yarn.Client: Uploading resource file:/Users/abc/github/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar - hdfs:// 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar 15/08/14 10:48:50 INFO yarn.Client: Uploading resource file:/Users/abc/github/spark/spark.py - hdfs:// 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/spark.py 15/08/14 10:48:50 INFO yarn.Client: Uploading resource file:/Users/abc/github/spark/python/lib/pyspark.zip - hdfs:// 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/pyspark.zip On Thu, Aug 13, 2015 at 2:50 PM, Ramkumar V ramkumar.c...@gmail.com wrote: Hi, I have a cluster of 1 master and 2 slaves. I'm running a spark streaming in master and I want to utilize all nodes in my cluster. i had specified some parameters like driver memory and executor memory in my code. when i give --deploy-mode cluster --master yarn-cluster in my spark-submit, it gives the following error. Log link : *http://pastebin.com/kfyVWDGR http://pastebin.com/kfyVWDGR* How to fix this issue ? Please help me if i'm doing wrong. *Thanks*, Ramkumar V -- Best Regards Jeff Zhang -- Best Regards Jeff Zhang -- Thanks, Hari
Re: spark streaming 1.3 doubts(force it to not consume anything)
Will try Scala. The only reason for not using Scala is that I have never worked with it.

On Wed, Aug 19, 2015 at 7:34 PM, Cody Koeninger c...@koeninger.org wrote:
Is there a reason not to just use scala? It's not a lot of code... and it'll be even less code in scala ;)

On Wed, Aug 19, 2015 at 4:14 AM, Shushant Arora shushantaror...@gmail.com wrote:
To correct myself - KafkaRDD[K, V, U, T, R] is a subclass of RDD[R], but Option<KafkaRDD[K, V, U, T, R]> is not a subclass of Option<RDD[R]>; in Scala C[T'] is a subclass of C[T] as per https://twitter.github.io/scala_school/type-basics.html but this is not allowed in Java. So is there any workaround to achieve this in Java for overriding DirectKafkaInputDStream?

On Wed, Aug 19, 2015 at 12:45 AM, Shushant Arora shushantaror...@gmail.com wrote:
But KafkaRDD[K, V, U, T, R] is not a subclass of RDD[R]: in Java, generic inheritance is not supported, so a derived class cannot return a differently typed generic subclass from an overridden method.

On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger c...@koeninger.org wrote:
Option is covariant and KafkaRDD is a subclass of RDD

On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora shushantaror...@gmail.com wrote:
Is it that in Scala a derived class is allowed to have any return type? And since the streaming jar was originally written in Scala, is DirectKafkaInputDStream therefore allowed to return Option[KafkaRDD[K, V, U, T, R]] from its compute method?
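
The variance point discussed above can be illustrated in plain Java. This sketch deliberately uses stand-ins: java.util.Optional plays the role of scala.Option, and Rdd, KafkaRdd, BaseStream, KafkaStream and BackoffStream are toy classes, not Spark types. It shows that Java generics are invariant (Optional<KafkaRdd> is not an Optional<Rdd>) and that a wildcard in the base signature permits a narrower return type in overrides. Whether a similar declaration resolves the actual compile error against the Scala-compiled DStream classes is not established here.

```java
import java.util.Optional;

class Rdd {}

class KafkaRdd extends Rdd {}

class BaseStream {
    // The wildcard makes the declared return type covariant in its element type.
    Optional<? extends Rdd> compute() {
        return Optional.of(new Rdd());
    }
}

class KafkaStream extends BaseStream {
    // Legal override: Optional<KafkaRdd> is a subtype of Optional<? extends Rdd>.
    // Against a base method returning Optional<Rdd> this would not compile.
    @Override
    Optional<KafkaRdd> compute() {
        return Optional.of(new KafkaRdd());
    }
}

class BackoffStream extends KafkaStream {
    boolean backOff; // toy flag standing in for the "100 % failure" check

    @Override
    Optional<KafkaRdd> compute() {
        if (backOff) {
            return Optional.empty(); // produce nothing for this batch
        }
        return super.compute();
    }
}

class VarianceDemo {
    public static void main(String[] args) {
        BackoffStream stream = new BackoffStream();
        System.out.println(stream.compute().isPresent());
        stream.backOff = true;
        System.out.println(stream.compute().isPresent());
        // Invariance in action; the second line below would be rejected by javac:
        // Optional<KafkaRdd> k = Optional.of(new KafkaRdd());
        // Optional<Rdd> r = k;
    }
}
```

In Scala the same effect comes from Option being declared covariant, which is why the override in DirectKafkaInputDStream compiles there without any wildcard.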
On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora shushantaror...@gmail.com wrote:
Looking at the source code of org.apache.spark.streaming.kafka.DirectKafkaInputDStream:

    override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
      val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
      val rdd = KafkaRDD[K, V, U, T, R](
        context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
      currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
      Some(rdd)
    }

But in DStream it is def compute(validTime: Time): Option[RDD[T]]. So what should be the return type of a custom DStream that extends DirectKafkaInputDStream? I want the behaviour to be the same as DirectKafkaInputDStream in normal scenarios and to return None in a specific scenario. And why did the same error not occur when DirectKafkaInputDStream was extended from InputDStream? Since the new return type Option[KafkaRDD[K, V, U, T, R]] is not a subclass of Option[RDD[T]], it should have failed?

On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger c...@koeninger.org wrote:
The superclass method in DStream is defined as returning an Option[RDD[T]]

On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora shushantaror...@gmail.com wrote:
Getting compilation error while overriding compute method of DirectKafkaInputDStream.
[ERROR] CustomDirectKafkaInputDstream.java:[51,83] compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream cannot override compute(org.apache.spark.streaming.Time) in org.apache.spark.streaming.dstream.DStream; attempting to use incompatible return type
[ERROR] found : scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
[ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>

class :

public class CustomDirectKafkaInputDstream extends DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder, kafka.serializer.DefaultDecoder, byte[][]> {
  @Override
  public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> compute(Time validTime) {
    int processed = processedCounter.value();
    int failed = failedProcessingsCounter.value();
    if ((processed == failed)) {
      System.out.println("backing off since its 100 % failure");
      return Option.empty();
    } else {
      System.out.println("starting the stream ");
      return super.compute(validTime);
    }
  }
}

What should be the return type of the compute method? The superclass returns Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>>, but the compiler expects scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from the derived class. Is there something wrong with the code? On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org wrote: Look at the definitions of the java-specific KafkaUtils.createDirectStream methods (the ones that take a JavaStreamingContext) On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora shushantaror...@gmail.com wrote: How do I create a ClassTag in Java? Also, the constructor of DirectKafkaInputDStream takes Function1, not Function, but KafkaUtils.createDirectStream allows Function. Below is my overridden DirectKafkaInputDStream.
public class CustomDirectKafkaInputDstream extends DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder, kafka.serializer.DefaultDecoder, byte[][]> { public CustomDirectKafkaInputDstream( StreamingContext ssc_, Map<String, String> kafkaParams,
PySpark on Mesos - Scaling
I'm running a pyspark program on a Mesos cluster and seeing behavior where:
* The first three stages run with multiple (10-15) tasks.
* The fourth stage runs with only one task.
* It is using 10 cpus, which is 5 machines in this configuration
* It is very slow
I would like it to use more resources and more are available on the cluster. I've tried setting spark.driver.cores and spark.executor.memory to no avail. Can someone suggest (a) how I go about debugging why this is happening in the first place (b) how to configure it to use more resources? If this is answered elsewhere, I was unable to find it and would appreciate a link. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-on-Mesos-Scaling-tp24347.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: how do I execute a job on a single worker node in standalone mode
Hi Axel, what spark version are you using? Also, what do your configurations look like for the following? - spark.cores.max (also --total-executor-cores) - spark.executor.cores (also --executor-cores) 2015-08-19 9:27 GMT-07:00 Axel Dahl a...@whisperstream.com: hmm maybe I spoke too soon. I have an apache zeppelin instance running and have configured it to use 48 cores (each node only has 16 cores), so I figured by setting it to 48, would mean that spark would grab 3 nodes. what happens instead though is that spark, reports that 48 cores are being used, but only executes everything on 1 node, it looks like it's not grabbing the extra nodes. On Wed, Aug 19, 2015 at 8:43 AM, Axel Dahl a...@whisperstream.com wrote: That worked great, thanks Andrew. On Tue, Aug 18, 2015 at 1:39 PM, Andrew Or and...@databricks.com wrote: Hi Axel, You can try setting `spark.deploy.spreadOut` to false (through your conf/spark-defaults.conf file). What this does is essentially try to schedule as many cores on one worker as possible before spilling over to other workers. Note that you *must* restart the cluster through the sbin scripts. For more information see: http://spark.apache.org/docs/latest/spark-standalone.html. Feel free to let me know whether it works, -Andrew 2015-08-18 4:49 GMT-07:00 Igor Berman igor.ber...@gmail.com: by default standalone creates 1 executor on every worker machine per application number of overall cores is configured with --total-executor-cores so in general if you'll specify --total-executor-cores=1 then there would be only 1 core on some executor and you'll get what you want on the other hand, if you application needs all cores of your cluster and only some specific job should run on single executor there are few methods to achieve this e.g. 
coalesce(1) or dummyRddWithOnePartitionOnly.foreachPartition On 18 August 2015 at 01:36, Axel Dahl a...@whisperstream.com wrote: I have a 4 node cluster and have been playing around with the num-executors, executor-memory and executor-cores parameters. I set the following: --executor-memory=10G --num-executors=1 --executor-cores=8 But when I run the job, I see that each worker is running one executor which has 2 cores and 2.5G memory. What I'd like to do instead is have Spark allocate the job to a single worker node. Is that possible in standalone mode, or do I need a job/resource scheduler like Yarn to do that? Thanks in advance, -Axel
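The difference Andrew describes between spreading cores out and packing them onto one worker can be sketched with a toy model. This is plain Python, not Spark's scheduler code; `allocate_cores` and the 4 workers with 16 free cores each are assumptions made for illustration (the 16-core figure is taken from Axel's later message).

```python
def allocate_cores(free_cores_per_worker, cores_wanted, spread_out):
    """Toy model of core placement; hypothetical helper, not a Spark API."""
    free = list(free_cores_per_worker)
    assigned = [0] * len(free)
    # Never try to hand out more cores than the cluster has free.
    remaining = min(cores_wanted, sum(free))
    if spread_out:
        # Round-robin: give one core at a time to each worker in turn.
        i = 0
        while remaining > 0:
            if free[i] > 0:
                free[i] -= 1
                assigned[i] += 1
                remaining -= 1
            i = (i + 1) % len(free)
    else:
        # Packing: fill one worker completely before moving to the next.
        for i in range(len(free)):
            take = min(free[i], remaining)
            assigned[i] = take
            remaining -= take
    return assigned


workers = [16, 16, 16, 16]  # assumed: 4 workers, 16 free cores each
spread = allocate_cores(workers, 8, spread_out=True)
packed = allocate_cores(workers, 8, spread_out=False)
print(spread, packed)
```

In this model, asking for 8 cores with spreading gives 2 cores on each of the 4 workers, which is the shape Axel reported, while packing puts all 8 on the first worker.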
Re: How to minimize shuffling on Spark dataframe Join?
If you create a PairRDD from the DataFrame, using dataFrame.toRDD().mapToPair(), then you can call partitionBy(someCustomPartitioner) which will partition the RDD by the key (of the pair). Then the operations on it (like joining with another RDD) will consider this partitioning. I'm not sure that DataFrames already support this. On Wed, Aug 12, 2015 at 11:16 AM Abdullah Anwar abdullah.ibn.an...@gmail.com wrote: Hi Hemant, Thank you for your replay. I think source of my dataframe is not partitioned on key, its an avro file where 'id' is a field .. but I don't know how to read a file and at the same time configure partition key. I couldn't find anything on SQLContext.read.load where you can set partition key. or in dataframe where you can set partition key. If it could partition the on the specified key .. will spark put the same partition range on same machine for two different dataframe?? What are the overall tips to join faster? Best Regards, Abdullah On Wed, Aug 12, 2015 at 11:02 AM, Hemant Bhanawat hemant9...@gmail.com wrote: Is the source of your dataframe partitioned on key? As per your mail, it looks like it is not. If that is the case, for partitioning the data, you will have to shuffle the data anyway. Another part of your question is - how to co-group data from two dataframes based on a key? I think for RDD's cogroup in PairRDDFunctions is a way. I am not sure if something similar is available for DataFrames. Hemant On Tue, Aug 11, 2015 at 2:14 PM, Abdullah Anwar abdullah.ibn.an...@gmail.com wrote: I have two dataframes like this student_rdf = (studentid, name, ...) student_result_rdf = (studentid, gpa, ...) we need to join this two dataframes. we are now doing like this, student_rdf.join(student_result_rdf, student_result_rdf[studentid] == student_rdf[studentid]) So it is simple. 
But it causes a lot of data shuffling across worker nodes. Since the join key is the same in both dataframes, if each dataframe could be partitioned by that key (studentid), there should be no shuffling at all, because rows with the same key would already reside on the same node. Is it possible to hint Spark to do this? So I am looking for a way to partition data by a column while reading a dataframe from input. And if it is possible for Spark to understand that the partition keys of two dataframes are the same, then how? -- Abdullah
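Why a shared partitioner removes the need to move rows at join time can be shown with a small simulation. This is plain Python rather than Spark; `partition_of` and `partition_by_key` are hypothetical stand-ins for a partitioner and for partitionBy, and the sample rows are made up.

```python
def partition_of(key, num_partitions):
    # Deterministic stand-in for a hash partitioner (integer keys only).
    return key % num_partitions


def partition_by_key(rows, num_partitions):
    # Place each (key, value) row in the partition chosen by its key.
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[partition_of(row[0], num_partitions)].append(row)
    return parts


students = [(1, "Ann"), (2, "Bob"), (3, "Cy"), (4, "Di")]  # (studentid, name)
results = [(1, 3.9), (2, 3.1), (3, 3.5), (4, 2.8)]          # (studentid, gpa)

num_partitions = 2
student_parts = partition_by_key(students, num_partitions)
result_parts = partition_by_key(results, num_partitions)

# Because both sides used the same partitioner, partition i of one side
# only ever needs to be joined with partition i of the other side.
joined = []
for student_part, result_part in zip(student_parts, result_parts):
    gpa_by_id = dict(result_part)
    for student_id, name in student_part:
        if student_id in gpa_by_id:
            joined.append((student_id, name, gpa_by_id[student_id]))

print(sorted(joined))
```

Note that the initial partitioning step still has to move every row once, which is Hemant's point: if the source is not already partitioned by the key, that first shuffle cannot be avoided.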
Re: Spark return key value pair
Dawid is right, if you did words.count it would be twice the number of input lines. You can use map like this:

words = lines.map(mapper2)
for i in words.take(10):
    msg = i[0] + ":" + i[1] + "\n"

--- Robin East Spark GraphX in Action Michael Malak and Robin East Manning Publications Co. http://www.manning.com/malak/ On 19 Aug 2015, at 12:19, Dawid Wysakowicz wysakowicz.da...@gmail.com wrote: I am not 100% sure but probably flatMap unwinds the tuples. Try with map instead. 2015-08-19 13:10 GMT+02:00 Jerry OELoo oylje...@gmail.com: Hi. I want to parse a file and return key-value pairs with PySpark, but the result is strange to me. test.sql is a big file and each line is a username and password with # between them. I use the mapper2 below to map the data. In my understanding, i in words.take(10) should be a tuple, but the result is that i is a username or a password, which I find hard to understand. Thanks for your help.

def mapper2(line):
    words = line.split('#')
    return (words[0].strip(), words[1].strip())

def main2(sc):
    lines = sc.textFile("hdfs://master:9000/spark/test.sql")
    words = lines.flatMap(mapper2)
    for i in words.take(10):
        msg = i + ":" + "\n"

-- Rejoice,I Desire!
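The behaviour discussed in this thread can be reproduced without a cluster. The sketch below uses plain Python lists to mimic what map and flatMap do with the tuple returned by mapper2; the two sample lines are made up, and `chain.from_iterable` is only a local stand-in for the flattening step.

```python
from itertools import chain


def mapper2(line):
    # Split "username#password" into a (username, password) tuple.
    words = line.split('#')
    return (words[0].strip(), words[1].strip())


lines = ["alice # secret1", "bob # secret2"]  # made-up sample input

# map: one output element per input line; each element is the tuple itself.
mapped = [mapper2(line) for line in lines]

# flatMap: each returned tuple is treated as a sequence and flattened,
# so every line contributes two separate string elements.
flat_mapped = list(chain.from_iterable(mapper2(line) for line in lines))

for user, password in mapped:
    print(user + ":" + password)
print(flat_mapped)
```

With flattening, the element count is twice the number of input lines, which matches Robin's remark about words.count.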
Re: Issues with S3 paths that contain colons
I had the exact same issue, and overcame it by overriding NativeS3FileSystem with my own class, where I replaced the implementation of globStatus. It's a hack but it works. Then I set the hadoop config fs.myschema.impl to my class name, and accessed the files through myschema:// instead of s3n://

@Override
public FileStatus[] globStatus(final Path pathPattern, final PathFilter filter) throws IOException {
  // List the path directly instead of expanding it as a glob pattern.
  final FileStatus[] statusList = super.listStatus(pathPattern);
  final List<FileStatus> result = Lists.newLinkedList();
  for (FileStatus fileStatus : statusList) {
    if (filter.accept(fileStatus.getPath())) {
      result.add(fileStatus);
    }
  }
  return result.toArray(new FileStatus[] {});
}

On Wed, Aug 19, 2015 at 9:14 PM Steve Loughran ste...@hortonworks.com wrote: you might want to think about filing a JIRA on issues.apache.org against HADOOP here, component being fs/s3. That doesn't mean it is fixable, only known. Every FS has its own set of forbidden characters in filenames; unix doesn't allow files named .; windows doesn't allow files called COM1, ..., so hitting some filesystem rule is sometimes a problem. Here, though, you've got the file in S3, the listing finds it, but other bits of the codepath are failing, which implies that it is something in the Hadoop libs. On 18 Aug 2015, at 08:20, Brian Stempin brian.stem...@gmail.com wrote: Hi, I'm running Spark on Amazon EMR (Spark 1.4.1, Hadoop 2.6.0). I'm seeing the exception below when encountering file names that contain colons. Any idea on how to get around this?
scala val files = sc.textFile(s3a://redactedbucketname/*) 2015-08-18 04:38:34,567 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(242224) called with curMem=669367, maxMem=285203496 2015-08-18 04:38:34,568 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - Block broadcast_3 stored as values in memory (estimated size 236.5 KB, free 271.1 MB) 2015-08-18 04:38:34,663 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(21533) called with curMem=911591, maxMem=285203496 2015-08-18 04:38:34,664 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - Block broadcast_3_piece0 stored as bytes in memory (estimated size 21.0 KB, free 271.1 MB) 2015-08-18 04:38:34,665 INFO [sparkDriver-akka.actor.default-dispatcher-19] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added broadcast_3_piece0 in memory on 10.182.184.26:60338 (size: 21.0 KB, free: 271.9 MB) 2015-08-18 04:38:34,667 INFO [main] spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 3 from textFile at console:21 files: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at textFile at console:21 scala files.count 2015-08-18 04:38:37,262 INFO [main] s3a.S3AFileSystem (S3AFileSystem.java:listStatus(533)) - List status for path: s3a://redactedbucketname/ 2015-08-18 04:38:37,262 INFO [main] s3a.S3AFileSystem (S3AFileSystem.java:getFileStatus(684)) - Getting path status for s3a://redactedbucketname/ () java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: [922-212-4438]-[119]-[1]-[2015-08-13T15:43:12.346193%5D-%5B2015-01-01T00:00:00%5D-redacted.csv at org.apache.hadoop.fs.Path.initialize(Path.java:206) at org.apache.hadoop.fs.Path.init(Path.java:172) at org.apache.hadoop.fs.Path.init(Path.java:94) at org.apache.hadoop.fs.Globber.glob(Globber.java:240) at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1700) at 
org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:229) at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:200) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:279) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781) at org.apache.spark.rdd.RDD.count(RDD.scala:1099) at $iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC.init(console:24) at $iwC$iwC$iwC$iwC$iwC$iwC$iwC.init(console:29) at $iwC$iwC$iwC$iwC$iwC$iwC.init(console:31) at $iwC$iwC$iwC$iwC$iwC.init(console:33) at $iwC$iwC$iwC$iwC.init(console:35) at $iwC$iwC$iwC.init(console:37) at
Re: Why use spark.history.fs.logDirectory instead of spark.eventLog.dir
Hi Canan, The event log dir is a per-application setting whereas the history server is an independent service that serves history UIs from many applications. If you use history server locally then the `spark.history.fs.logDirectory` will happen to point to `spark.eventLog.dir`, but the use case it provides is broader than that. -Andrew 2015-08-19 5:13 GMT-07:00 canan chen ccn...@gmail.com: Anyone know about this ? Or do I miss something here ? On Fri, Aug 7, 2015 at 4:20 PM, canan chen ccn...@gmail.com wrote: Is there any reason that historyserver use another property for the event log dir ? Thanks
Re: Difference between Sort based and Hash based shuffle
Yes, in other words, a bucket is a single file in hash-based shuffle (no consolidation), but a segment of partitioned file in sort-based shuffle. 2015-08-19 5:52 GMT-07:00 Muhammad Haseeb Javed 11besemja...@seecs.edu.pk: Thanks Andrew for a detailed response, So the reason why key value pairs with same keys are always found in a single buckets in Hash based shuffle but not in Sort is because in sort-shuffle each mapper writes a single partitioned file, and it is up to the reducer to fetch correct partitions from the the files ? On Wed, Aug 19, 2015 at 2:13 AM, Andrew Or and...@databricks.com wrote: Hi Muhammad, On a high level, in hash-based shuffle each mapper M writes R shuffle files, one for each reducer where R is the number of reduce partitions. This results in M * R shuffle files. Since it is not uncommon for M and R to be O(1000), this quickly becomes expensive. An optimization with hash-based shuffle is consolidation, where all mappers run in the same core C write one file per reducer, resulting in C * R files. This is a strict improvement, but it is still relatively expensive. Instead, in sort-based shuffle each mapper writes a single partitioned file. This allows a particular reducer to request a specific portion of each mapper's single output file. In more detail, the mapper first fills up an internal buffer in memory and continually spills the contents of the buffer to disk, then finally merges all the spilled files together to form one final output file. This places much less stress on the file system and requires much fewer I/O operations especially on the read side. 
-Andrew 2015-08-16 11:08 GMT-07:00 Muhammad Haseeb Javed 11besemja...@seecs.edu.pk: I did check it out and although I did get a general understanding of the various classes used to implement Sort and Hash shuffles, however these slides lack details as to how they are implemented and why sort generally has better performance than hash On Sun, Aug 16, 2015 at 4:31 AM, Ravi Kiran ravikiranmag...@gmail.com wrote: Have a look at this presentation. http://www.slideshare.net/colorant/spark-shuffle-introduction . Can be of help to you. On Sat, Aug 15, 2015 at 1:42 PM, Muhammad Haseeb Javed 11besemja...@seecs.edu.pk wrote: What are the major differences between how Sort based and Hash based shuffle operate and what is it that cause Sort Shuffle to perform better than Hash? Any talks that discuss both shuffles in detail, how they are implemented and the performance gains ?
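The file counts in Andrew's explanation can be worked through with concrete numbers. This is a small arithmetic sketch, assuming M = 1000 mappers, R = 1000 reducers and C = 16 cores (the values and the helper name `shuffle_file_counts` are illustrative); it counts data files only and ignores any index files.

```python
def shuffle_file_counts(num_mappers, num_reducers, num_cores):
    """Number of shuffle data files under each scheme (illustrative helper)."""
    return {
        # Hash-based: every mapper writes one file per reducer (M * R).
        "hash": num_mappers * num_reducers,
        # Hash-based with consolidation: one file per reducer per core (C * R).
        "hash_consolidated": num_cores * num_reducers,
        # Sort-based: every mapper writes a single partitioned file (M).
        "sort": num_mappers,
    }


counts = shuffle_file_counts(num_mappers=1000, num_reducers=1000, num_cores=16)
for scheme, files in counts.items():
    print(scheme, files)
```

With these inputs the hash-based scheme needs a million files, consolidation brings that down to 16,000, and sort-based shuffle needs 1,000.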
Re: issue Running Spark Job on Yarn Cluster
Please look at the Hadoop logs in more detail, for example with yarn logs -applicationId xxx, and attach more logs to this thread. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/issue-Running-Spark-Job-on-Yarn-Cluster-tp21779p24350.html
How to set the number of executors and tasks in a Spark Streaming job in Mesos
Hi, How do I set the number of executors and tasks in a Spark Streaming job in Mesos? I have the following settings but my job still shows me 11 active tasks and 11 executors. Any idea as to why this is happening?

sparkConf.set("spark.mesos.coarse", "true")
sparkConf.set("spark.cores.max", "128")
sparkConf.set("spark.default.parallelism", "100")
//sparkConf.set("spark.locality.wait", "0")
sparkConf.set("spark.executor.memory", "32g")
sparkConf.set("spark.streaming.unpersist", "true")
sparkConf.set("spark.shuffle.io.numConnectionsPerPeer", "1")
sparkConf.set("spark.rdd.compress", "true")
sparkConf.set("spark.shuffle.memoryFraction", ".6")
sparkConf.set("spark.storage.memoryFraction", ".2")
sparkConf.set("spark.shuffle.spill", "true")
sparkConf.set("spark.shuffle.spill.compress", "true")
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
sparkConf.set("spark.streaming.blockInterval", "400")

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-the-number-of-executors-and-tasks-in-a-Spark-Streaming-job-in-Mesos-tp24348.html
Re: Why use spark.history.fs.logDirectory instead of spark.eventLog.dir
Thanks Andrew, make sense. On Thu, Aug 20, 2015 at 4:40 AM, Andrew Or and...@databricks.com wrote: Hi Canan, The event log dir is a per-application setting whereas the history server is an independent service that serves history UIs from many applications. If you use history server locally then the `spark.history.fs.logDirectory` will happen to point to `spark.eventLog.dir`, but the use case it provides is broader than that. -Andrew 2015-08-19 5:13 GMT-07:00 canan chen ccn...@gmail.com: Anyone know about this ? Or do I miss something here ? On Fri, Aug 7, 2015 at 4:20 PM, canan chen ccn...@gmail.com wrote: Is there any reason that historyserver use another property for the event log dir ? Thanks
Re:Re: How to automatically relaunch a Driver program after crashes?
I think Yarn ResourceManager has the mechanism to relaunch the driver on failure. But I am uncertain. Could someone help on this? Thanks. At 2015-08-19 16:37:32, Spark Enthusiast sparkenthusi...@yahoo.in wrote: Thanks for the reply. Are Standalone or Mesos the only options? Is there a way to auto relaunch if driver runs as a Hadoop Yarn Application? On Wednesday, 19 August 2015 12:49 PM, Todd bit1...@163.com wrote: There is an option for the spark-submit (Spark standalone or Mesos with cluster deploy mode only) --supervise If given, restarts the driver on failure. At 2015-08-19 14:55:39, Spark Enthusiast sparkenthusi...@yahoo.in wrote: Folks, As I see, the Driver program is a single point of failure. Now, I have seen ways as to how to make it recover from failures on a restart (using Checkpointing) but I have not seen anything as to how to restart it automatically if it crashes. Will running the Driver as a Hadoop Yarn Application do it? Can someone educate me as to how?
Spark return key value pair
Hi. I want to parse a file and return key-value pairs with PySpark, but the result is strange to me. test.sql is a big file and each line is a username and password with # between them. I use the mapper2 below to map the data. In my understanding, i in words.take(10) should be a tuple, but the result is that i is a username or a password, which I find hard to understand. Thanks for your help.

def mapper2(line):
    words = line.split('#')
    return (words[0].strip(), words[1].strip())

def main2(sc):
    lines = sc.textFile("hdfs://master:9000/spark/test.sql")
    words = lines.flatMap(mapper2)
    for i in words.take(10):
        msg = i + ":" + "\n"

-- Rejoice,I Desire!
Spark UI returning error 500 in yarn-client mode
Running Spark on YARN in yarn-client mode, everything appears to be working just fine except I can't access Spark UI via the Application Master link in YARN UI or directly at http://driverip:4040/jobs I get error 500, and the driver log shows the error pasted below: When running the same job using spark master mode, everything is fine. Any help is appreciated Error Stacktrace as appears in driver: WARN [2015-08-12 14:05:46,856] org.spark-project.jetty.server.AbstractHttpConnection: header full: java.lang.RuntimeException: Header6144 WARN [2015-08-12 14:05:46,857] org.spark-project.jetty.server.Response: Committed before 500 null WARN [2015-08-12 14:05:46,862] org.spark-project.jetty.server.AbstractHttpConnection:
RE: Spark Sql behaves strangely with tables with a lot of partitions
Yes, you can try setting spark.sql.sources.partitionDiscovery.enabled to false. BTW, which version are you using? Hao From: Jerrick Hoang [mailto:jerrickho...@gmail.com] Sent: Thursday, August 20, 2015 12:16 PM To: Philip Weaver Cc: user Subject: Re: Spark Sql behaves strangely with tables with a lot of partitions I guess the question is why does spark have to do partition discovery with all partitions when the query only needs to look at one partition? Is there a conf flag to turn this off? On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver philip.wea...@gmail.com wrote: I've had the same problem. It turns out that Spark (specifically parquet) is very slow at partition discovery. It got better in 1.5 (not yet released), but was still unacceptably slow. Sadly, we ended up reading parquet files manually in Python (via C++) and had to abandon Spark SQL because of this problem. On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang jerrickho...@gmail.com wrote: Hi all, I did a simple experiment with Spark SQL. I created a partitioned parquet table with only one partition (date=20140701). A simple `select count(*) from table where date=20140701` would run very fast (0.1 seconds). However, as I added more partitions the query takes longer and longer. When I added about 10,000 partitions, the query took way too long. I feel like querying for a single partition should not be affected by having more partitions. Is this a known behaviour? What does spark try to do here? Thanks, Jerrick
Re: Spark Sql behaves strangely with tables with a lot of partitions
I cloned from TOT after 1.5.0 cut off. I noticed there were a couple of CLs trying to speed up spark sql with tables with a huge number of partitions, I've made sure that those CLs are included but it's still very slow On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.com wrote: Yes, you can try set the spark.sql.sources.partitionDiscovery.enabled to false. BTW, which version are you using? Hao *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com] *Sent:* Thursday, August 20, 2015 12:16 PM *To:* Philip Weaver *Cc:* user *Subject:* Re: Spark Sql behaves strangely with tables with a lot of partitions I guess the question is why does spark have to do partition discovery with all partitions when the query only needs to look at one partition? Is there a conf flag to turn this off? On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver philip.wea...@gmail.com wrote: I've had the same problem. It turns out that Spark (specifically parquet) is very slow at partition discovery. It got better in 1.5 (not yet released), but was still unacceptably slow. Sadly, we ended up reading parquet files manually in Python (via C++) and had to abandon Spark SQL because of this problem. On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang jerrickho...@gmail.com wrote: Hi all, I did a simple experiment with Spark SQL. I created a partitioned parquet table with only one partition (date=20140701). A simple `select count(*) from table where date=20140701` would run very fast (0.1 seconds). However, as I added more partitions the query takes longer and longer. When I added about 10,000 partitions, the query took way too long. I feel like querying for a single partition should not be affected by having more partitions. Is this a known behaviour? What does spark try to do here? Thanks, Jerrick
creating data warehouse with Spark and running query with Hive
Hi All, I have data in HDFS partitioned by Year/month/data/event_type, and I am creating Hive tables from it. The data is JSON, so I am using a JSON SerDe to create the Hive tables. Below is the code:

val jsonFile = hiveContext.read.json("hdfs://localhost:9000/housing/events_real/category=Impressions/date=1007465766/*")
jsonFile.toDF().printSchema()
jsonFile.write.saveAsTable("JsonFileTable")
jsonFile.toDF().printSchema()
val events = hiveContext.sql("SELECT category, uid FROM JsonFileTable")
events.map(e => "Event: " + e).collect().foreach(println)

saveAsTable fails with an error saying mkdir failed to create the directory. Does anybody have any idea?
RE: Spark Sql behaves strangely with tables with a lot of partitions
Can you make some more profiling? I am wondering if the driver is busy with scanning the HDFS / S3. Like jstack pid of driver process And also, it’s will be great if you can paste the physical plan for the simple query. From: Jerrick Hoang [mailto:jerrickho...@gmail.com] Sent: Thursday, August 20, 2015 1:46 PM To: Cheng, Hao Cc: Philip Weaver; user Subject: Re: Spark Sql behaves strangely with tables with a lot of partitions I cloned from TOT after 1.5.0 cut off. I noticed there were a couple of CLs trying to speed up spark sql with tables with a huge number of partitions, I've made sure that those CLs are included but it's still very slow On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.commailto:hao.ch...@intel.com wrote: Yes, you can try set the spark.sql.sources.partitionDiscovery.enabled to false. BTW, which version are you using? Hao From: Jerrick Hoang [mailto:jerrickho...@gmail.commailto:jerrickho...@gmail.com] Sent: Thursday, August 20, 2015 12:16 PM To: Philip Weaver Cc: user Subject: Re: Spark Sql behaves strangely with tables with a lot of partitions I guess the question is why does spark have to do partition discovery with all partitions when the query only needs to look at one partition? Is there a conf flag to turn this off? On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver philip.wea...@gmail.commailto:philip.wea...@gmail.com wrote: I've had the same problem. It turns out that Spark (specifically parquet) is very slow at partition discovery. It got better in 1.5 (not yet released), but was still unacceptably slow. Sadly, we ended up reading parquet files manually in Python (via C++) and had to abandon Spark SQL because of this problem. On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang jerrickho...@gmail.commailto:jerrickho...@gmail.com wrote: Hi all, I did a simple experiment with Spark SQL. I created a partitioned parquet table with only one partition (date=20140701). 
A simple `select count(*) from table where date=20140701` would run very fast (0.1 seconds). However, as I added more partitions the query takes longer and longer. When I added about 10,000 partitions, the query took way too long. I feel like querying for a single partition should not be affected by having more partitions. Is this a known behaviour? What does spark try to do here? Thanks, Jerrick
Re: Spark Interview Questions
Thank you all. I have updated it to a slightly better version. Regards, Sandeep Giri, +1 347 781 4573 (US) +91-953-899-8962 (IN) www.KnowBigData.com Phone: +1-253-397-1945 (Office) https://linkedin.com/company/knowbigdata http://knowbigdata.com https://facebook.com/knowbigdata https://twitter.com/IKnowBigData On Mon, Aug 17, 2015 at 7:10 PM, Sandeep Giri sand...@knowbigdata.com wrote: This statement is from the Spark's website itself. Regards, Sandeep Giri On Wed, Aug 12, 2015 at 10:42 PM, Peyman Mohajerian mohaj...@gmail.com wrote: I think this statement is inaccurate: Q7: What are Actions? A: An action brings back the data from the RDD to the local machine - Also I wouldn't say Spark is 100x faster than Hadoop and it is memory based. This is the kind of statement that will not get you the job. When it comes to shuffle it has to write to disk. It is faster in many cases, but 100x is just a marketing statement that holds in very narrow use cases. On Thu, Jul 30, 2015 at 4:55 AM, Sandeep Giri sand...@knowbigdata.com wrote: i have prepared some interview questions: http://www.knowbigdata.com/blog/interview-questions-apache-spark-part-1 http://www.knowbigdata.com/blog/interview-questions-apache-spark-part-2 please provide your feedback. On Wed, Jul 29, 2015, 23:43 Pedro Rodriguez ski.rodrig...@gmail.com wrote: You might look at the edx course on Apache Spark or ML with Spark.
There are probably some homework problems or quiz questions that might be relevant. I haven't looked at the course myself, but thats where I would go first. https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1x -- Pedro Rodriguez PhD Student in Distributed Machine Learning | CU Boulder UC Berkeley AMPLab Alumni ski.rodrig...@gmail.com | pedrorodriguez.io | 208-340-1703 Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
Strange shuffle behaviour difference between Zeppelin and Spark-shell
Dear list, I am observing a very strange difference in behaviour between a Spark 1.4.0-rc4 REPL (locally compiled with Java 7) and a Spark 1.4.0 Zeppelin interpreter (compiled with Java 6 and sourced from maven central). The workflow loads data from Hive, applies a number of transformations (including quite a lot of shuffle operations) and then presents an enriched dataset. The code (and resulting DAGs) are identical in each case. The following particularities are noted: Importing the HiveRDD and caching it yields identical results on both platforms. Applying case classes leads to a 2-2.5MB increase in dataset size per partition (excepting empty partitions). Writing shuffles shows this much more significant result:

Zeppelin:
Total Time Across All Tasks: 2.6 min
Input Size / Records: 2.4 GB / 7314771
Shuffle Write: 673.5 MB / 7314771

vs Spark-shell:
Total Time Across All Tasks: 28 min
Input Size / Records: 3.6 GB / 7314771
Shuffle Write: 9.0 GB / 7314771

This is one of the early stages, which reads from a cached partition and then feeds into a join-stage. The later stages show similar behaviour in producing excessive shuffle spills. Quite often the excessive shuffle volume will lead to massive shuffle spills which ultimately kill not only performance, but the actual executors as well. I have examined the Environment tab in the Spark UI and identified no notable difference besides FAIR (Zeppelin) vs FIFO (spark-shell) scheduling mode. I fail to see how this would impact shuffle writes in such a drastic way, since it should be on the inter-job level, while this happens at the inter-stage level. I was somewhat suspicious of compression or serialization playing a role, but the SparkConf points to those being set to the default. Also Zeppelin's interpreter adds no relevant additional default parameters.
I performed a diff between rc4 (which was later released) and 1.4.0 and, as expected, there were no differences besides a single class (remarkably, a shuffle-relevant class: /org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.class) differing in its binary representation due to being compiled with Java 7 instead of Java 6. The decompiled sources of those two are again identical. As a next step I may simply replace that file in the packaged jar, to ascertain that there is indeed no difference between the two versions, but I would consider it a major bug if a simple compiler change led to this kind of issue.

I am also open to any other ideas, in particular on how to verify that the same compression/serialization is indeed happening, and on ways to determine what exactly is written into these shuffles -- currently I only know that the tuples are bigger (or smaller) than they ought to be. The Zeppelin-obtained results do appear to be consistent at least, so the suspicion is that there is an issue with the process launched from spark-shell. I will also attempt to build a Spark job and spark-submit it using different Spark binaries to further explore the issue.

Best Regards, Rick Moritz

PS: I already tried to send this mail yesterday, but it never made it onto the list, as far as I can tell -- I apologize should anyone receive this as a second copy.
Re: How to automatically relaunch a Driver program after crashes?
There is an option for spark-submit (Spark standalone or Mesos with cluster deploy mode only):

  --supervise    If given, restarts the driver on failure.

At 2015-08-19 14:55:39, Spark Enthusiast sparkenthusi...@yahoo.in wrote:
Folks, As I see, the Driver program is a single point of failure. Now, I have seen ways as to how to make it recover from failures on a restart (using Checkpointing) but I have not seen anything as to how to restart it automatically if it crashes. Will running the Driver as a Hadoop Yarn Application do it? Can someone educate me as to how?
Re: What am I missing that's preventing javac from finding the libraries (CLASSPATH is setup...)?
Just add spark_1.4.1_yarn_shuffle.jar to the classpath, or create a new Maven project using the dependencies below:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>1.4.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>1.4.1</version>
</dependency>

On Tue, Aug 18, 2015 at 11:51 PM, Jerry jerry.c...@gmail.com wrote:
So from what I understand, those usually pull dependencies for a given project? I'm able to run the spark shell so I'd assume I have everything. What am I missing from the big picture and what directory do I run maven on? Thanks, Jerry

On Tue, Aug 18, 2015 at 11:15 AM, Ted Yu yuzhih...@gmail.com wrote:
Normally people would establish a Maven project with Spark dependencies, or use sbt. Can you go with either approach? Cheers

On Tue, Aug 18, 2015 at 10:28 AM, Jerry jerry.c...@gmail.com wrote:
Hello, So I set up Spark to run on my local machine to see if I can reproduce the issue I'm having with data frames, but I'm running into issues with the compiler. Here's what I got:

$ echo $CLASSPATH
/usr/lib/jvm/java-6-oracle/lib:/home/adminz/dev/spark/spark-1.4.1/lib/spark-assembly-1.4.1-hadoop2.6.0.jar
$ javac Test.java
Test.java:1: package org.apache.spark.sql.api.java does not exist
import org.apache.spark.sql.api.java.*;
^
Test.java:6: package org.apache.spark.sql does not exist
import org.apache.spark.sql.*;
^
Test.java:7: package org.apache.spark.sql.hive does not exist
import org.apache.spark.sql.hive.*;

Let me know what I'm doing wrong. Thanks, Jerry
SaveAsTable changes the order of rows
I have a simple key/value RDD and do

val partitioned = rdd.partitionBy(new HashPartitioner(400))
val row = partitioned.first

I get the key "G2726" from the returned row. This first row is located on partition #0 because "G2726".hashCode is 67114000 and 67114000 % 400 is 0. But the order of keys changes when I save the RDD to a table with saveAsTable. After doing this and calling sqlContext.table, the key of the first row is "G265". Does the DataFrame forget its parent's partitioner, or does the Parquet format always rearrange the order of the original data? In my case the order is not important, but some users may want to keep their keys ordered.

Kevin
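The partition arithmetic in the message above can be checked without a cluster. The following is only a sketch: it assumes, as I understand Spark's HashPartitioner to work, that a key is mapped to the non-negative remainder of its hashCode divided by the number of partitions. The class and method names are made up for illustration and are not Spark code.

```java
// Sketch of hash partitioning arithmetic; HashPartitionDemo and
// partitionFor are hypothetical names, not part of Spark.
final class HashPartitionDemo {
    // Non-negative modulo, so keys with a negative hashCode still land in 0..numPartitions-1.
    static int partitionFor(Object key, int numPartitions) {
        int raw = key.hashCode() % numPartitions;
        return raw < 0 ? raw + numPartitions : raw;
    }

    public static void main(String[] args) {
        System.out.println("G2726".hashCode());          // 67114000
        System.out.println(partitionFor("G2726", 400));  // 0
        System.out.println(partitionFor("G265", 400));   // 138
    }
}
```

Under that assumption "G2726" does belong to partition 0, while "G265" would belong to partition 138, so the first row read back from the table did not come from the original partition 0. This only confirms the numbers in the question; it says nothing about why saveAsTable yields a different order.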
Re: broadcast variable of Kafka producer throws ConcurrentModificationException
+1 I wish I had read this blog earlier. I am using Java and have just implemented a singleton producer per executor/JVM during the day. Yes, I did see that NonSerializableException when I was debugging the code ... Thanks for sharing.

On Tue, Aug 18, 2015 at 10:59 PM, Tathagata Das t...@databricks.com wrote:
It's a cool blog post! Tweeted it! Broadcasting the configuration necessary for lazily instantiating the producer is a good idea. Nitpick: The first code example has an extra `}` ;)

On Tue, Aug 18, 2015 at 10:49 PM, Marcin Kuthan marcin.kut...@gmail.com wrote:
As long as the Kafka producer is thread-safe you don't need any pool at all. Just share a single producer on every executor. Please look at my blog post for more details. http://allegro.tech/spark-kafka-integration.html

On 19 Aug 2015 2:00 AM, Shenghua(Daniel) Wan wansheng...@gmail.com wrote:
All of you are right. I was trying to create too many producers. My idea was to create a pool (for now the pool contains only one producer) shared by all the executors. After I realized it was related to serialization issues (though I did not find clear clues in the source code indicating that the broadcast type parameter must implement Serializable), I followed the Spark Cassandra connector design and created a singleton of Kafka producer pools. No exception has been noticed since. Thanks for all your comments.

On Tue, Aug 18, 2015 at 4:28 PM, Tathagata Das t...@databricks.com wrote:
Why are you even trying to broadcast a producer? A broadcast variable is some immutable piece of serializable DATA that can be used for processing on the executors. A Kafka producer is neither DATA nor immutable, and definitely not serializable. The right way to do this is to create the producer in the executors.
Please see the discussion in the programming guide http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger c...@koeninger.org wrote:
I wouldn't expect a kafka producer to be serializable at all... among other things, it has a background thread

On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan wansheng...@gmail.com wrote:
Hi, Did anyone see java.util.ConcurrentModificationException when using broadcast variables? I encountered this exception when wrapping a Kafka producer like this in the Spark Streaming driver. Here is what I did.

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
final Broadcast<KafkaDataProducer> bCastProducer = streamingContext.sparkContext().broadcast(producer);

Then within a closure called by foreachRDD, I was trying to get the wrapped producer, i.e.

KafkaProducer<String, String> p = bCastProducer.value();

After rebuilding and rerunning, I got a stack trace like this:

Exception in thread "main" com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.apache.spark.util.MutableURLClassLoader)
contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
ioThread (org.apache.kafka.clients.producer.KafkaProducer)
producer (my driver)
  at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
  at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
  at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
  at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
  at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
  at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
  ...
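The advice in this thread (create the producer on the executors, one shared instance per JVM, and ship only configuration from the driver) might look roughly like the sketch below. It is not taken from the blog post or from anyone's actual code: FakeProducer is a hypothetical stand-in for a thread-safe Kafka producer so that the example runs without Kafka on the classpath, and ProducerHolder and the bootstrap address are likewise made up.

```java
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

// One producer per JVM (that is, per executor), created lazily on first use.
// Only the serializable Properties would travel from the driver.
final class ProducerHolder {
    private static volatile FakeProducer instance;

    private ProducerHolder() {}

    static FakeProducer getOrCreate(Properties config) {
        FakeProducer local = instance;
        if (local == null) {
            synchronized (ProducerHolder.class) {
                local = instance;
                if (local == null) {
                    local = new FakeProducer(config);
                    instance = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed address
        FakeProducer a = getOrCreate(props);
        FakeProducer b = getOrCreate(props);
        System.out.println(a == b);                     // true
        System.out.println(FakeProducer.CREATED.get()); // 1
    }
}

// Hypothetical stand-in for a real, thread-safe producer; counts constructions.
final class FakeProducer {
    static final AtomicInteger CREATED = new AtomicInteger();
    final Properties config;

    FakeProducer(Properties config) {
        this.config = config;
        CREATED.incrementAndGet();
    }

    String send(String topic, String value) {
        return topic + ":" + value;
    }
}
```

In a streaming job the call to getOrCreate would sit inside the function passed to foreachPartition, so the producer object itself is never serialized. Whether a single shared instance is enough depends on the real producer being thread-safe, as noted in the thread.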
Does spark sql support column indexing
I can't find any discussion of whether Spark SQL supports column indexing. If it does, is there a guide on how to do it? Thanks.
How to automatically relaunch a Driver program after crashes?
Folks, As I see, the Driver program is a single point of failure. Now, I have seen ways as to how to make it recover from failures on a restart (using Checkpointing) but I have not seen anything as to how to restart it automatically if it crashes. Will running the Driver as a Hadoop Yarn Application do it? Can someone educate me as to how?
Re: What's the best practice for developing new features for spark ?
See this thread: http://search-hadoop.com/m/q3RTtdZv0d1btRHl/Spark+build+modulesubj=Building+Spark+Building+just+one+module+

On Aug 19, 2015, at 1:44 AM, canan chen ccn...@gmail.com wrote:
I want to work on one JIRA, but it is not easy to unit test because it involves different components, especially the UI. Building Spark is pretty slow, and I don't want to rebuild it each time to test my code change. I am wondering how other people do this. Is there any experience you can share? Thanks
Spark UI returning error 500 in yarn-client mode
Running Spark on YARN in yarn-client mode, everything appears to be working just fine except that I can't access the Spark UI via the Application Master link in the YARN UI or directly at http://driverip:4040/jobs (I get error 500, and the driver log shows the error pasted below). When running the same job using Spark master mode, everything is fine. Any help is appreciated.

Error stack trace as it appears in the driver log:

WARN [2015-08-12 14:05:46,856] org.spark-project.jetty.server.AbstractHttpConnection: header full: java.lang.RuntimeException: Header6144
WARN [2015-08-12 14:05:46,857] org.spark-project.jetty.server.Response: Committed before 500 null
WARN [2015-08-12 14:05:46,862] org.spark-project.jetty.server.AbstractHttpConnection:
Re: spark streaming 1.3 doubts(force it to not consume anything)
To correct myself: KafkaRDD[K, V, U, T, R] is a subclass of RDD[R], but Option<KafkaRDD<K, V, U, T, R>> is not a subclass of Option<RDD<R>>. In Scala, C[T'] is a subclass of C[T] when C is declared covariant, as per https://twitter.github.io/scala_school/type-basics.html, but this is not allowed in Java. So is there any workaround to achieve this in Java for overriding DirectKafkaInputDStream?

On Wed, Aug 19, 2015 at 12:45 AM, Shushant Arora shushantaror...@gmail.com wrote:
But KafkaRDD[K, V, U, T, R] is not a subclass of RDD[R], as Java does not support generic inheritance, so a derived class cannot return a differently typed generic subclass from an overridden method.

On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger c...@koeninger.org wrote:
Option is covariant and KafkaRDD is a subclass of RDD

On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora shushantaror...@gmail.com wrote:
Is it that in Scala a derived class is allowed to have any return type? And since the streaming jar was originally written in Scala, is DirectKafkaInputDStream allowed to return Option[KafkaRDD[K, V, U, T, R]] from its compute method?

On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora shushantaror...@gmail.com wrote:
Looking at the source code of org.apache.spark.streaming.kafka.DirectKafkaInputDStream:

override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
  val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
  val rdd = KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
  Some(rdd)
}

But in DStream it is def compute(validTime: Time): Option[RDD[T]]. So what should the return type be for a custom DStream that extends DirectKafkaInputDStream? I want the behaviour to be the same as DirectKafkaInputDStream in normal scenarios and to return None in a specific scenario. And why did the same error not occur when DirectKafkaInputDStream extends InputDStream?
Since the new return type Option[KafkaRDD[K, V, U, T, R]] is not a subclass of Option[RDD[T]], shouldn't it have failed?

On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger c...@koeninger.org wrote:
The superclass method in DStream is defined as returning an Option[RDD[T]]

On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora shushantaror...@gmail.com wrote:
Getting a compilation error while overriding the compute method of DirectKafkaInputDStream.

[ERROR] CustomDirectKafkaInputDstream.java:[51,83] compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream cannot override compute(org.apache.spark.streaming.Time) in org.apache.spark.streaming.dstream.DStream; attempting to use incompatible return type
[ERROR] found   : scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
[ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>

The class:

public class CustomDirectKafkaInputDstream extends DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder, kafka.serializer.DefaultDecoder, byte[][]> {
  @Override
  public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> compute(Time validTime) {
    int processed = processedCounter.value();
    int failed = failedProcessingsCounter.value();
    if (processed == failed) {
      System.out.println("backing off since its 100 % failure");
      return Option.empty();
    } else {
      System.out.println("starting the stream");
      return super.compute(validTime);
    }
  }
}

What should the return type of the compute method be? The superclass returns Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>>, but the compiler expects scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from the derived class. Is there something wrong with the code?
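The compile error above comes down to Java generics being invariant. A small self-contained sketch of that rule follows; it uses java.util.Optional as a stand-in for scala.Option, and Rdd and KafkaRdd are hypothetical placeholder classes, not the Spark types. It does not provide a workaround for overriding compute, since the signature in DStream is fixed; it only shows why javac rejects the override and what use-site variance looks like in Java.

```java
import java.util.Optional;

// Illustration of invariance in Java generics; all names here are placeholders.
final class VarianceDemo {
    static class Rdd {}
    static class KafkaRdd extends Rdd {}

    static Optional<KafkaRdd> computeKafka() {
        return Optional.of(new KafkaRdd());
    }

    // Use-site variance: a wildcard type accepts Optional<KafkaRdd>.
    static Optional<? extends Rdd> computeAny() {
        return computeKafka();
    }

    public static void main(String[] args) {
        // Optional<Rdd> bad = computeKafka(); // rejected by javac: incompatible types
        Optional<? extends Rdd> ok = computeAny();
        System.out.println(ok.get() instanceof KafkaRdd); // true
    }
}
```

Scala declares Option as covariant at the definition site, which is why the Scala override in DirectKafkaInputDStream compiles, while javac treats the same type as invariant.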
On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org wrote:
Look at the definitions of the java-specific KafkaUtils.createDirectStream methods (the ones that take a JavaStreamingContext)

On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora shushantaror...@gmail.com wrote:
How do I create a ClassTag in Java? Also, the constructor of DirectKafkaInputDStream takes a Function1, not a Function, but KafkaUtils.createDirectStream allows a Function. Below is my overridden DirectKafkaInputDStream.

public class CustomDirectKafkaInputDstream extends DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder, kafka.serializer.DefaultDecoder, byte[][]> {
  public CustomDirectKafkaInputDstream(
      StreamingContext ssc_,
      Map<String, String> kafkaParams,
      Map<TopicAndPartition, Object> fromOffsets,
      Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
      ClassTag<byte[]> evidence$1,
      ClassTag<byte[]> evidence$2,
      ClassTag<DefaultDecoder> evidence$3,
      ClassTag<DefaultDecoder> evidence$4,
      ClassTag<byte[][]> evidence$5) {
    super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1, evidence$2, evidence$3, evidence$4,
Re: Spark Streaming failing on YARN Cluster
Thanks a lot for your suggestion. I modified HADOOP_CONF_DIR in spark-env.sh so that core-site.xml is under HADOOP_CONF_DIR. I can now see logs like the ones you showed above. The job now runs for 3 minutes and stores results every minute. After some time, there is an exception. How can I fix this exception, and can you please explain where it is going wrong?

Log link: http://pastebin.com/xL9jaRUa

Thanks, https://in.linkedin.com/in/ramkumarcs31

On Wed, Aug 19, 2015 at 1:54 PM, Jeff Zhang zjf...@gmail.com wrote:
HADOOP_CONF_DIR is the environment variable that points to the Hadoop conf directory. Not sure how CDH organizes that; make sure core-site.xml is under HADOOP_CONF_DIR.

On Wed, Aug 19, 2015 at 4:06 PM, Ramkumar V ramkumar.c...@gmail.com wrote:
We are using Cloudera 5.3.1. Since it is one of the earlier versions of CDH, it doesn't support the latest version of Spark, so I installed spark-1.4.1 separately on my machine. I am not able to do spark-submit in cluster mode. How do I put core-site.xml on the classpath? It would be very helpful if you could explain in detail how to solve this issue.

Thanks, https://in.linkedin.com/in/ramkumarcs31

On Fri, Aug 14, 2015 at 8:25 AM, Jeff Zhang zjf...@gmail.com wrote:

15/08/12 13:24:49 INFO Client: Source and destination file systems are the same. Not copying file:/home/hdfs/spark-1.4.1/assembly/target/scala-2.10/spark-assembly-1.4.1-hadoop2.5.0-cdh5.3.5.jar
15/08/12 13:24:49 INFO Client: Source and destination file systems are the same. Not copying file:/home/hdfs/spark-1.4.1/external/kafka-assembly/target/spark-streaming-kafka-assembly_2.10-1.4.1.jar
15/08/12 13:24:49 INFO Client: Source and destination file systems are the same. Not copying file:/home/hdfs/spark-1.4.1/python/lib/pyspark.zip
15/08/12 13:24:49 INFO Client: Source and destination file systems are the same. Not copying file:/home/hdfs/spark-1.4.1/python/lib/py4j-0.8.2.1-src.zip
15/08/12 13:24:49 INFO Client: Source and destination file systems are the same. Not copying file:/home/hdfs/spark-1.4.1/examples/src/main/python/streaming/kyt.py

diagnostics: Application application_1437639737006_3808 failed 2 times due to AM Container for appattempt_1437639737006_3808_02 exited with exitCode: -1000 due to: File file:/home/hdfs/spark-1.4.1/python/lib/pyspark.zip does not exist
.Failing this attempt.. Failing the application.

The machine where you run Spark is the client machine, while the YARN AM runs on another machine. The YARN AM complains that the files are not found, as your logs show. From the logs, it seems that these files were not copied to HDFS as local resources. I suspect that you didn't put core-site.xml on your classpath, so Spark cannot detect your remote file system and won't copy the files to HDFS as local resources. Usually in yarn-cluster mode you should see logs like the following:

15/08/14 10:48:49 INFO yarn.Client: Preparing resources for our AM container
15/08/14 10:48:49 INFO yarn.Client: Uploading resource file:/Users/abc/github/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar -> hdfs://0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar
15/08/14 10:48:50 INFO yarn.Client: Uploading resource file:/Users/abc/github/spark/spark.py -> hdfs://0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/spark.py
15/08/14 10:48:50 INFO yarn.Client: Uploading resource file:/Users/abc/github/spark/python/lib/pyspark.zip -> hdfs://0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/pyspark.zip

On Thu, Aug 13, 2015 at 2:50 PM, Ramkumar V ramkumar.c...@gmail.com wrote:
Hi, I have a cluster of 1 master and 2 slaves. I'm running a Spark Streaming job on the master and I want to utilize all nodes in my cluster. I specified some parameters like driver memory and executor memory in my code. When I pass --deploy-mode cluster --master yarn-cluster to spark-submit, it gives the following error.

Log link: http://pastebin.com/kfyVWDGR

How do I fix this issue? Please help me if I'm doing something wrong.

Thanks, Ramkumar V
Re: Does spark sql support column indexing
The answer is simply no, but I hope someone can give deeper insight or a meaningful reference.

On 2015-08-19 15:21, Todd wrote:
I can't find any discussion of whether Spark SQL supports column indexing. If it does, is there a guide on how to do it? Thanks.
Re: Spark Streaming failing on YARN Cluster
We are using Cloudera 5.3.1. Since it is one of the earlier versions of CDH, it doesn't support the latest version of Spark, so I installed spark-1.4.1 separately on my machine. I am not able to do spark-submit in cluster mode. How do I put core-site.xml on the classpath? It would be very helpful if you could explain in detail how to solve this issue.

Thanks, https://in.linkedin.com/in/ramkumarcs31

On Fri, Aug 14, 2015 at 8:25 AM, Jeff Zhang zjf...@gmail.com wrote:

15/08/12 13:24:49 INFO Client: Source and destination file systems are the same. Not copying file:/home/hdfs/spark-1.4.1/assembly/target/scala-2.10/spark-assembly-1.4.1-hadoop2.5.0-cdh5.3.5.jar
15/08/12 13:24:49 INFO Client: Source and destination file systems are the same. Not copying file:/home/hdfs/spark-1.4.1/external/kafka-assembly/target/spark-streaming-kafka-assembly_2.10-1.4.1.jar
15/08/12 13:24:49 INFO Client: Source and destination file systems are the same. Not copying file:/home/hdfs/spark-1.4.1/python/lib/pyspark.zip
15/08/12 13:24:49 INFO Client: Source and destination file systems are the same. Not copying file:/home/hdfs/spark-1.4.1/python/lib/py4j-0.8.2.1-src.zip
15/08/12 13:24:49 INFO Client: Source and destination file systems are the same. Not copying file:/home/hdfs/spark-1.4.1/examples/src/main/python/streaming/kyt.py

diagnostics: Application application_1437639737006_3808 failed 2 times due to AM Container for appattempt_1437639737006_3808_02 exited with exitCode: -1000 due to: File file:/home/hdfs/spark-1.4.1/python/lib/pyspark.zip does not exist
.Failing this attempt.. Failing the application.

The machine where you run Spark is the client machine, while the YARN AM runs on another machine. The YARN AM complains that the files are not found, as your logs show. From the logs, it seems that these files were not copied to HDFS as local resources. I suspect that you didn't put core-site.xml on your classpath, so Spark cannot detect your remote file system and won't copy the files to HDFS as local resources. Usually in yarn-cluster mode you should see logs like the following:

15/08/14 10:48:49 INFO yarn.Client: Preparing resources for our AM container
15/08/14 10:48:49 INFO yarn.Client: Uploading resource file:/Users/abc/github/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar -> hdfs://0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar
15/08/14 10:48:50 INFO yarn.Client: Uploading resource file:/Users/abc/github/spark/spark.py -> hdfs://0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/spark.py
15/08/14 10:48:50 INFO yarn.Client: Uploading resource file:/Users/abc/github/spark/python/lib/pyspark.zip -> hdfs://0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/pyspark.zip

On Thu, Aug 13, 2015 at 2:50 PM, Ramkumar V ramkumar.c...@gmail.com wrote:
Hi, I have a cluster of 1 master and 2 slaves. I'm running a Spark Streaming job on the master and I want to utilize all nodes in my cluster. I specified some parameters like driver memory and executor memory in my code. When I pass --deploy-mode cluster --master yarn-cluster to spark-submit, it gives the following error.

Log link: http://pastebin.com/kfyVWDGR

How do I fix this issue? Please help me if I'm doing something wrong.

Thanks, Ramkumar V
What's the best practice for developing new features for spark ?
I want to work on one JIRA, but it is not easy to unit test because it involves different components, especially the UI. Building Spark is pretty slow, and I don't want to rebuild it each time to test my code change. I am wondering how other people do this. Is there any experience you can share? Thanks
Re: Programmatically create SparkContext on YARN
Hi Andrew, Thanks a lot for your response. I am aware of the '--master' flag in the spark-submit command. However, I would like to create the SparkContext inside my code. Maybe I should elaborate a little bit further: I would like to reuse e.g. the result of any Spark computation inside my code. Here is the SparkPi example:

String[] jars = new String[1];
jars[0] = System.getProperty("user.dir") + "/target/SparkPi-1.0-SNAPSHOT.jar";
SparkConf conf = new SparkConf()
    .setAppName("JavaSparkPi")
    .setMaster("spark://SPARK_HOST:7077")
    .setJars(jars);
JavaSparkContext sc = new JavaSparkContext(conf);
int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
int n = 100 * slices;
List<Integer> l = new ArrayList<Integer>(n);
for (int i = 0; i < n; i++) {
  l.add(i);
}
JavaRDD<Integer> dataSet = sc.parallelize(l, slices);
int count = dataSet.map(new Function<Integer, Integer>() {
  @Override
  public Integer call(Integer integer) {
    double x = Math.random() * 2 - 1;
    double y = Math.random() * 2 - 1;
    return (x * x + y * y < 1) ? 1 : 0;
  }
}).reduce(new Function2<Integer, Integer, Integer>() {
  @Override
  public Integer call(Integer integer, Integer integer2) {
    return integer + integer2;
  }
});
System.out.println("Pi is roughly " + 4.0 * count / n);
sc.stop();

As you can see, I can reuse the result (count) in my code directly. So my goal would be to reuse this kind of implementation in YARN mode (client/cluster mode). However, I didn't really find a solution for how to do that, since I always have to submit my Spark code via spark-submit. What if I want to run this code as part of a web application which renders the result as a web page?

-- Andreas

On Tue, Aug 18, 2015 at 10:50 PM, Andrew Or and...@databricks.com wrote:
Hi Andreas, I believe the distinction is not between standalone and YARN mode, but between client and cluster mode. In client mode, your Spark submit JVM runs your driver code. In cluster mode, one of the workers (or NodeManagers if you're using YARN) in the cluster runs your driver code.
In the latter case, it doesn't really make sense to call `setMaster` in your driver because Spark needs to know which cluster you're submitting the application to. Instead, the recommended way is to set the master through the `--master` flag in the command line, e.g.

$ bin/spark-submit --master spark://1.2.3.4:7077 --class some.user.Clazz --name "My app name" --jars lib1.jar,lib2.jar --deploy-mode cluster app.jar

Both YARN and standalone modes support client and cluster modes, and the spark-submit script is the common interface through which you can launch your application. In other words, you shouldn't have to do anything more than providing a different value to `--master` to use YARN.

-Andrew

2015-08-17 0:34 GMT-07:00 Andreas Fritzler andreas.fritz...@gmail.com:
Hi all, when running the Spark cluster in standalone mode I am able to create the Spark context from Java via the following code snippet:

SparkConf conf = new SparkConf()
    .setAppName("MySparkApp")
    .setMaster("spark://SPARK_MASTER:7077")
    .setJars(jars);
JavaSparkContext sc = new JavaSparkContext(conf);

As soon as I'm done with my processing, I can just close it via sc.stop();

Now my question: Is the same also possible when running Spark on YARN? I currently don't see how this should be possible without submitting your application as a packaged jar file. Is there a way to get this kind of interactivity from within your Scala/Java code?

Regards, Andrea
Re: How to automatically relaunch a Driver program after crashes?
Thanks for the reply. Are Standalone or Mesos the only options? Is there a way to auto relaunch if driver runs as a Hadoop Yarn Application? On Wednesday, 19 August 2015 12:49 PM, Todd bit1...@163.com wrote: There is an option for the spark-submit (Spark standalone or Mesos with cluster deploy mode only) --supervise If given, restarts the driver on failure. At 2015-08-19 14:55:39, Spark Enthusiast sparkenthusi...@yahoo.in wrote: Folks, As I see, the Driver program is a single point of failure. Now, I have seen ways as to how to make it recover from failures on a restart (using Checkpointing) but I have not seen anything as to how to restart it automatically if it crashes. Will running the Driver as a Hadoop Yarn Application do it? Can someone educate me as to how?
Re: Spark Sql behaves strangely with tables with a lot of partitions
I guess the question is why does spark have to do partition discovery with all partitions when the query only needs to look at one partition? Is there a conf flag to turn this off? On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver philip.wea...@gmail.com wrote: I've had the same problem. It turns out that Spark (specifically parquet) is very slow at partition discovery. It got better in 1.5 (not yet released), but was still unacceptably slow. Sadly, we ended up reading parquet files manually in Python (via C++) and had to abandon Spark SQL because of this problem. On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang jerrickho...@gmail.com wrote: Hi all, I did a simple experiment with Spark SQL. I created a partitioned parquet table with only one partition (date=20140701). A simple `select count(*) from table where date=20140701` would run very fast (0.1 seconds). However, as I added more partitions the query takes longer and longer. When I added about 10,000 partitions, the query took way too long. I feel like querying for a single partition should not be affected by having more partitions. Is this a known behaviour? What does spark try to do here? Thanks, Jerrick
Spark Sql behaves strangely with tables with a lot of partitions
Hi all, I did a simple experiment with Spark SQL. I created a partitioned parquet table with only one partition (date=20140701). A simple `select count(*) from table where date=20140701` would run very fast (0.1 seconds). However, as I added more partitions the query takes longer and longer. When I added about 10,000 partitions, the query took way too long. I feel like querying for a single partition should not be affected by having more partitions. Is this a known behaviour? What does spark try to do here? Thanks, Jerrick
Re: Spark Sql behaves strangely with tables with a lot of partitions
I've had the same problem. It turns out that Spark (specifically parquet) is very slow at partition discovery. It got better in 1.5 (not yet released), but was still unacceptably slow. Sadly, we ended up reading parquet files manually in Python (via C++) and had to abandon Spark SQL because of this problem. On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang jerrickho...@gmail.com wrote: Hi all, I did a simple experiment with Spark SQL. I created a partitioned parquet table with only one partition (date=20140701). A simple `select count(*) from table where date=20140701` would run very fast (0.1 seconds). However, as I added more partitions the query takes longer and longer. When I added about 10,000 partitions, the query took way too long. I feel like querying for a single partition should not be affected by having more partitions. Is this a known behaviour? What does spark try to do here? Thanks, Jerrick
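To make the distinction in this thread concrete, here is a rough sketch of what "partition discovery" versus "partition pruning" could mean for a directory layout such as table/date=20140701/. It is an assumption-laden illustration and not Spark's implementation: the class, the methods and the sample paths are all hypothetical. The point is only that selecting a single partition by filtering still walks every discovered path, which would be consistent with query time growing as partitions are added.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; not how Spark SQL is actually implemented.
final class PartitionPruningDemo {
    // Extracts the value of `column` from a path like "table/date=20140701/part-0.parquet".
    static String partitionValue(String path, String column) {
        for (String segment : path.split("/")) {
            if (segment.startsWith(column + "=")) {
                return segment.substring(column.length() + 1);
            }
        }
        return null; // the path carries no such partition column
    }

    // Keeps only the paths whose partition value matches; note that it visits every path.
    static List<String> prune(List<String> paths, String column, String wanted) {
        List<String> kept = new ArrayList<>();
        for (String path : paths) {
            if (wanted.equals(partitionValue(path, column))) {
                kept.add(path);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> paths = Arrays.asList(
            "table/date=20140701/part-0.parquet",
            "table/date=20140702/part-0.parquet",
            "table/date=20140703/part-0.parquet");
        System.out.println(prune(paths, "date", "20140701"));
        // prints [table/date=20140701/part-0.parquet]
    }
}
```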