Re: Change for submitting to yarn in 1.3.1
That works when it is launched from the same process - which is unfortunately not our case :-)

- Mridul

On Sun, May 10, 2015 at 9:05 PM, Manku Timma manku.tim...@gmail.com wrote:

sc.applicationId gives the yarn appid.

On 11 May 2015 at 08:13, Mridul Muralidharan mri...@gmail.com wrote:

We had a similar requirement, and as a stopgap, I currently use a suboptimal, implementation-specific workaround - parsing it out of the stdout/stderr (based on log config). A better means to get to this is indeed required!

Regards,
Mridul

On Sun, May 10, 2015 at 7:33 PM, Ron's Yahoo! zlgonza...@yahoo.com.invalid wrote:

Hi,

I used to submit my Spark YARN applications using the org.apache.spark.deploy.yarn.Client API so I could get the application id after submitting. The following is the code that I have, but after upgrading to 1.3.1, the yarn Client class was made private. Is there a particular reason why this Client class was made private? I know that there's a new SparkSubmit object that can be used, but it's not clear to me how I can use it to get the application id after submitting to the cluster.

Thoughts?

Thanks,
Ron

class SparkLauncherServiceImpl extends SparkLauncherService {

  override def runApp(conf: Configuration, appName: String, queue: String): ApplicationId = {
    val ws = SparkLauncherServiceImpl.getWorkspace()
    val params = Array("--class", "com.xyz.sparkdb.service.impl.AssemblyServiceImpl",
      "--name", appName,
      "--queue", queue,
      "--driver-memory", "1024m",
      "--addJars", getListOfDependencyJars(s"$ws/ledp/le-sparkdb/target/dependency"),
      "--jar", s"file:$ws/ledp/le-sparkdb/target/le-sparkdb-1.0.3-SNAPSHOT.jar")
    System.setProperty("SPARK_YARN_MODE", "true")
    System.setProperty("spark.driver.extraJavaOptions",
      "-XX:PermSize=128m -XX:MaxPermSize=128m -Dsun.io.serialization.extendedDebugInfo=true")
    val sparkConf = new SparkConf()
    val args = new ClientArguments(params, sparkConf)
    new Client(args, conf, sparkConf).runApp()
  }

  private def getListOfDependencyJars(baseDir: String): String = {
    val files = new File(baseDir).listFiles().filter(!_.getName().startsWith("spark-assembly"))
    val prependedFiles = files.map(x => "file:" + x.getAbsolutePath())
    prependedFiles.tail.foldLeft(new StringBuilder(prependedFiles.head)) {
      (acc, e) => acc.append(",").append(e)
    }.toString()
  }
}
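A minimal sketch of the stdout/stderr-parsing stopgap Mridul describes: launch spark-submit as a child process and scan its logs for the YARN application id. The regex and the assumption that the id shows up in the client-side logs are mine, not anything confirmed on the list; adjust to your log4j configuration:

import scala.sys.process.{Process, ProcessLogger}

object AppIdFromLogs {
  // YARN application ids have the form application_<clusterTimestamp>_<sequence>.
  private val AppIdPattern = """application_\d+_\d+""".r

  // Runs spark-submit with the given arguments and returns the first
  // application id seen in its combined stdout/stderr, if any.
  def submitAndGetAppId(submitArgs: Seq[String]): Option[String] = {
    val output = new StringBuilder
    val logger = ProcessLogger(
      line => output.append(line).append('\n'),  // stdout
      line => output.append(line).append('\n'))  // stderr (yarn client logs here)
    Process("spark-submit" +: submitArgs).!(logger)
    AppIdPattern.findFirstIn(output.toString)
  }
}

Note that spark-submit may not return until the application finishes, so for long-running jobs one would match each line as it arrives in the ProcessLogger callbacks rather than after exit.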
Re: PySpark DataFrame: Preserving nesting when selecting a nested field
In 1.4, you can use the struct function to create a struct, e.g. you can explicitly select out the version column, and then create a new struct named settings. The current semantics of select closely follow a relational database's SQL, which is well understood and defined. I wouldn't add any magic to select for nested data, because it is not very well researched or defined, and we might get into conflicting scenarios.

On Sat, May 9, 2015 at 1:10 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote:

Take a look:

df = sqlContext.jsonRDD(sc.parallelize(['{"settings": {"os": "OS X", "version": "10.10"}}']))
df.printSchema()
root
 |-- settings: struct (nullable = true)
 |    |-- os: string (nullable = true)
 |    |-- version: string (nullable = true)

# Now I want to drop the version column by
# selecting everything else.
# I want to preserve the schema otherwise.
# That means `os` should stay nested under
# `settings`.

df.select('settings.os').printSchema()
root
 |-- os: string (nullable = true)

df.select('settings', 'settings.os').printSchema()
root
 |-- settings: struct (nullable = true)
 |    |-- os: string (nullable = true)
 |    |-- version: string (nullable = true)
 |-- os: string (nullable = true)

df.select(df['settings.os'].alias('settings.os')).printSchema()
root
 |-- settings.os: string (nullable = true)

In all cases, selecting a nested field loses the original nesting of that field. What I want is to select settings.os and get back a DataFrame with the following schema:

root
 |-- settings: struct (nullable = true)
 |    |-- os: string (nullable = true)

In other words, I want to preserve the fact that os is nested under settings.

I'm doing this as a work-around for the fact that PySpark does not currently support dropping columns. Until direct support for such a feature lands as part of SPARK-7509 (https://issues.apache.org/jira/browse/SPARK-7509), selecting all columns except the ones you want to drop seems much better than directly manipulating the schema, which is the hackier and far more complex alternative for rolling your own "drop" logic. And you want that process to preserve the schema as much as possible, which I assume is how a native "drop column" method would work.

Is it possible, though? Or do we have to do direct schema manipulation?

Nick
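For reference, a sketch of Reynold's struct suggestion against the 1.4 Scala API (an analogous struct function lands in pyspark.sql.functions in 1.4); the exact nullable flags in the result are my guess:

import org.apache.spark.sql.functions.struct

val df = sqlContext.jsonRDD(sc.parallelize(
  """{"settings": {"os": "OS X", "version": "10.10"}}""" :: Nil))

// Select only the fields to keep and wrap them in a new struct that is
// again named "settings", so the nesting survives the projection.
val pruned = df.select(struct(df("settings.os")).as("settings"))
pruned.printSchema()
// root
//  |-- settings: struct (nullable = false)
//  |    |-- os: string (nullable = true)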
Re: large volume spark job spends most of the time in AppendOnlyMap.changeValue
This is the stack trace of the worker thread:

org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60)
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
org.apache.spark.scheduler.Task.run(Task.scala:64)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

On 8 May 2015 at 22:12, Josh Rosen rosenvi...@gmail.com wrote:

Do you have any more specific profiling data that you can share? I'm curious to know where AppendOnlyMap.changeValue is being called from.

On Fri, May 8, 2015 at 1:26 PM, Michal Haris michal.ha...@visualdna.com wrote:

+dev

On 6 May 2015 10:45, Michal Haris michal.ha...@visualdna.com wrote:

Just wanted to check if somebody has seen similar behaviour or knows what we might be doing wrong. We have a relatively complex spark application which processes half a terabyte of data at various stages. We have profiled it in several ways and everything seems to point to one place where 90% of the time is spent: AppendOnlyMap.changeValue. The job scales and is relatively faster than its map-reduce alternative, but it still feels slower than it should be. I am suspecting too much spill, but I haven't seen any improvement by increasing the number of partitions to 10k. Any idea would be appreciated.

--
Michal Haris
Technical Architect
direct line: +44 (0) 207 749 0229
www.visualdna.com | t: +44 (0) 207 734 7033
Re: YARN mode startup takes too long (10+ secs)
Isn't this issue something that should be improved? Based on the following discussion, there are two places where YARN's heartbeat interval is respected on job start-up, but do we really need to respect it on start-up?

On Fri, May 8, 2015 at 12:14 PM Taeyun Kim taeyun@innowireless.com wrote:

I think so. In fact, the flow is: allocator.allocateResources() - sleep - allocator.allocateResources() - sleep ... But I guess that on the first allocateResources() the allocation is not fulfilled, so the sleep occurs.

From: Zoltán Zvara [mailto:zoltan.zv...@gmail.com]
Sent: Friday, May 08, 2015 4:25 PM
To: Taeyun Kim; u...@spark.apache.org
Subject: Re: YARN mode startup takes too long (10+ secs)

So does this sleep occur before allocating resources for the first few executors to start the job?

On Fri, May 8, 2015 at 6:23 AM Taeyun Kim taeyun@innowireless.com wrote:

I think I've found the (maybe partial, but major) reason. It's between the following lines (newly captured, but essentially the same place that Zoltán Zvara picked):

15/05/08 11:36:32 INFO BlockManagerMaster: Registered BlockManager
15/05/08 11:36:38 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@cluster04:55237/user/Executor#-149550753] with ID 1

When I read the logs on the cluster side, the following lines were found (the exact time differs from the lines above, but that is the clock difference between machines):

15/05/08 11:36:23 INFO yarn.ApplicationMaster: Started progress reporter thread - sleep time : 5000
15/05/08 11:36:28 INFO impl.AMRMClientImpl: Received new token for : cluster04:45454

It seemed that Spark deliberately sleeps 5 secs. I've read the Spark source code, and in org.apache.spark.deploy.yarn.ApplicationMaster.scala, launchReporterThread() had the code for that. It loops calling allocator.allocateResources() and Thread.sleep(). For the sleep, it reads the configuration variable spark.yarn.scheduler.heartbeat.interval-ms (the default value is 5000, which is 5 secs). According to the comment, "we want to be reasonably responsive without causing too many requests to RM". So, unless YARN immediately fulfills the allocation request, it seems that 5 secs will be wasted.

When I modified the configuration variable to 1000, it only waited for 1 sec. Here are the log lines after the change:

15/05/08 11:47:21 INFO yarn.ApplicationMaster: Started progress reporter thread - sleep time : 1000
15/05/08 11:47:22 INFO impl.AMRMClientImpl: Received new token for : cluster04:45454

4 secs saved. So, when one does not want to wait 5 secs, one can change spark.yarn.scheduler.heartbeat.interval-ms. I hope that the additional overhead it incurs is negligible.

From: Zoltán Zvara [mailto:zoltan.zv...@gmail.com]
Sent: Thursday, May 07, 2015 10:05 PM
To: Taeyun Kim; u...@spark.apache.org
Subject: Re: YARN mode startup takes too long (10+ secs)

Without considering everything, just a few hints: You are running on YARN. From 09:18:34 to 09:18:37 your application is in state ACCEPTED. There is a noticeable overhead introduced by communicating with YARN's ResourceManager and NodeManager, and by the time the YARN scheduler needs to make a decision. I guess somewhere from 09:18:38 to 09:18:43 your application JAR gets copied to another container requested by the Spark ApplicationMaster deployed on YARN's container 0. Deploying an executor usually needs further resource negotiations with the ResourceManager. Also, as I said, your JAR and the Executor's code require copying to the container's local directory - execution is blocked until that is complete.

On Thu, May 7, 2015 at 3:09 AM Taeyun Kim taeyun@innowireless.com wrote:

Hi,

I'm running a spark application with YARN-client or YARN-cluster mode. But it seems to take too long to start up. It takes 10+ seconds to initialize the spark context. Is this normal? Or can it be optimized?

The environment is as follows:
- Hadoop: Hortonworks HDP 2.2 (Hadoop 2.6)
- Spark: 1.3.1
- Client: Windows 7, but similar result on CentOS 6.6

The following is the startup part of the application log. (Some private information was edited.) 'Main: Initializing context' at the first line and 'MainProcessor: Deleting previous output files' at the last line are the logs by the application. Others in between are from Spark itself. Application logic is executed after this log is displayed.

---

15/05/07 09:18:31 INFO Main: Initializing context
15/05/07 09:18:31 INFO SparkContext: Running Spark version 1.3.1
15/05/07 09:18:31 INFO SecurityManager: Changing view acls to: myuser,myapp
15/05/07 09:18:31 INFO SecurityManager: Changing modify acls to: myuser,myapp
15/05/07 09:18:31 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(myuser, myapp); users with modify
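As a concrete illustration of Taeyun's finding, the interval can be overridden per application; a sketch in Scala, with the 1000 ms value mirroring the experiment above (see Mridul's caveat later in this digest about aggressive RM polling on large multi-tenant clusters):

import org.apache.spark.{SparkConf, SparkContext}

// Poll the YARN ResourceManager every 1 s instead of the 5000 ms default,
// which can shave a few seconds off startup on lightly loaded clusters.
val conf = new SparkConf()
  .setAppName("fast-yarn-startup")
  .set("spark.yarn.scheduler.heartbeat.interval-ms", "1000")
val sc = new SparkContext(conf)

The same setting can be passed on the command line with spark-submit --conf spark.yarn.scheduler.heartbeat.interval-ms=1000.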
Re: Intellij Spark Source Compilation
Hi,

Thanks Iulian. Yeah, I was kind of anticipating I could just ignore old-deps ultimately. However, even after doing a clean and build all, I still get the following Scala Problems (all files are under spark-streaming-flume-sink; SparkSinkSuite.scala is in the test sources, the rest in src/main/scala/org/apache/spark/streaming/flume/sink):

not found: type EventBatch (SparkAvroCallbackHandler.scala, line 72)
not found: type EventBatch (SparkAvroCallbackHandler.scala, line 87)
not found: type EventBatch (SparkSinkUtils.scala, line 25)
not found: type EventBatch (TransactionProcessor.scala, line 48)
not found: type EventBatch (TransactionProcessor.scala, line 48)
not found: type EventBatch (TransactionProcessor.scala, line 80)
not found: type EventBatch (TransactionProcessor.scala, line 146)
not found: type SparkFlumeProtocol (SparkAvroCallbackHandler.scala, line 46)
not found: type SparkFlumeProtocol (SparkSink.scala, line 86)
not found: type SparkSinkEvent (TransactionProcessor.scala, line 115)
not found: value SparkFlumeProtocol (SparkSinkSuite.scala, line 185)
not found: value SparkFlumeProtocol (SparkSinkSuite.scala, line 194)
value ack is not a member of Any (SparkSinkSuite.scala, line 52)
value getEventBatch is not a member of Any (SparkSinkSuite.scala, line 51)
value getEventBatch is not a member of Any (SparkSinkSuite.scala, line 71)
value getEventBatch is not a member of Any (SparkSinkSuite.scala, line 91)
value nack is not a member of Any (SparkSinkSuite.scala, line 73)
Project not built due to errors in dependent project(s) spark-streaming-flume-sink (spark-streaming-flume)

I'd mention that the EventBatch and SparkFlumeProtocol errors were very similar to what initially occurred for me in IntelliJ, until I clicked the "Generate Sources and Update Folders For All Projects" button in the Maven Projects tool window that the wiki suggests doing.
Re: Intellij Spark Source Compilation
Oh, I see. So then try to run one build on the command line first (or try sbt avro:generate, though I'm not sure it's enough). I just noticed that I have an additional source folder target/scala-2.10/src_managed/main/compiled_avro for spark-streaming-flume-sink. I guess I built the project once and that's why I never saw these errors.

Once we get it to work, let me know so I can update the wiki guide.

thanks,
iulian

On Mon, May 11, 2015 at 4:20 PM, rtimp dolethebobdol...@gmail.com wrote:

Hi,

Thanks Iulian. Yeah, I was kind of anticipating I could just ignore old-deps ultimately. However, even after doing a clean and build all, I still get the following:

[same "not found: type EventBatch" / "not found: type SparkFlumeProtocol" error list as in the previous message]

I'd mention that the EventBatch and SparkFlumeProtocol errors were very similar to what initially occurred for me in IntelliJ, until I clicked the "Generate Sources and Update Folders For All Projects" button in the Maven Projects tool window that the wiki suggests doing.

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com
RE: DataFrame distinct vs RDD distinct
The following worked for me as a workaround for distinct:

val pf = sqlContext.parquetFile("hdfs://file")
val distinctValuesOfColumn4 = pf.rdd.aggregate[scala.collection.mutable.HashSet[String]](
  new scala.collection.mutable.HashSet[String]())(
    (s, v) => s += v.getString(4),
    (s1, s2) => s1 ++= s2)

Best regards, Alexander

-----Original Message-----
From: Ulanov, Alexander
Sent: Monday, May 11, 2015 11:59 AM
To: Olivier Girardot; Michael Armbrust
Cc: Reynold Xin; dev@spark.apache.org
Subject: RE: DataFrame distinct vs RDD distinct

Hi,

Could you suggest an alternative way of implementing distinct, e.g. via fold or aggregate? Both SQL distinct and RDD distinct fail on my dataset due to overflow of the Spark shuffle disk. I have 7 nodes with 300GB dedicated to Spark shuffle each. My dataset is 2B rows; the field on which I'm performing distinct has 23 distinct values.

Best regards, Alexander

-----Original Message-----
From: Olivier Girardot [mailto:o.girar...@lateral-thoughts.com]
Sent: Friday, May 08, 2015 12:50 PM
To: Michael Armbrust; Olivier Girardot
Cc: Reynold Xin; dev@spark.apache.org
Subject: Re: DataFrame distinct vs RDD distinct

I'll try to reproduce what has been reported to me first :) and I'll let you know. Thanks!

Le jeu. 7 mai 2015 à 21:16, Michael Armbrust mich...@databricks.com a écrit :

I'd happily merge a PR that changes the distinct implementation to be more like Spark core's, assuming it includes benchmarks that show better performance for both the fits-in-memory case and the too-big-for-memory case.

On Thu, May 7, 2015 at 2:23 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote:

Ok, but for the moment, this seems to be killing performance on some computations... I'll try to give you precise figures on this between rdd and dataframe.

Olivier.

Le jeu. 7 mai 2015 à 10:08, Reynold Xin r...@databricks.com a écrit :

In 1.5, we will most likely just rewrite distinct in SQL to either use the Aggregate operator, which will benefit from all the Tungsten optimizations, or have a Tungsten version of distinct for SQL/DataFrame.

On Thu, May 7, 2015 at 1:32 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote:

Hi everyone, there seem to be different implementations of the distinct feature in DataFrames and RDDs, and some performance issue with the DataFrame distinct API.

In RDD.scala:

def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}

And in DataFrame:

case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output

  override def requiredChildDistribution: Seq[Distribution] =
    if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil

  override def execute(): RDD[Row] = {
    child.execute().mapPartitions { iter =>
      val hashSet = new scala.collection.mutable.HashSet[Row]()
      var currentRow: Row = null
      while (iter.hasNext) {
        currentRow = iter.next()
        if (!hashSet.contains(currentRow)) {
          hashSet.add(currentRow.copy())
        }
      }
      hashSet.iterator
    }
  }
}

I can try to reproduce the performance issue more clearly, but do you have any insights into why we can't have the same distinct strategy between DataFrame and RDD?

Regards, Olivier.
[SPARK-7400] PortableDataStream UDT
Hello, I'm working on SPARK-7400 for DataFrame support for PortableDataStream, i.e. the data type associated with the RDD from sc.binaryFiles(...). Assuming a patch is available soon, what is the likelihood of inclusion in Spark 1.4? Thanks
RE: DataFrame distinct vs RDD distinct
Hi,

Could you suggest an alternative way of implementing distinct, e.g. via fold or aggregate? Both SQL distinct and RDD distinct fail on my dataset due to overflow of the Spark shuffle disk. I have 7 nodes with 300GB dedicated to Spark shuffle each. My dataset is 2B rows; the field on which I'm performing distinct has 23 distinct values.

Best regards, Alexander

-----Original Message-----
From: Olivier Girardot [mailto:o.girar...@lateral-thoughts.com]
Sent: Friday, May 08, 2015 12:50 PM
To: Michael Armbrust; Olivier Girardot
Cc: Reynold Xin; dev@spark.apache.org
Subject: Re: DataFrame distinct vs RDD distinct

[quoted thread as in the previous message]
RE: Easy way to convert Row back to case class
Thank you for the suggestions!

From: Reynold Xin [mailto:r...@databricks.com]
Sent: Friday, May 08, 2015 11:10 AM
To: Will Benton
Cc: Ulanov, Alexander; dev@spark.apache.org
Subject: Re: Easy way to convert Row back to case class

In 1.4, you can do

row.getInt(colName)

In 1.5, some variant of this will come to allow you to turn a DataFrame into a typed RDD, where the case class's field names match the column names. https://github.com/apache/spark/pull/5713

On Fri, May 8, 2015 at 11:01 AM, Will Benton wi...@redhat.com wrote:

This might not be the easiest way, but it's pretty easy: you can use Row(field_1, ..., field_n) as a pattern in a case match. So if you have a data frame with foo as an int column and bar as a String column, and you want to construct instances of a case class that wraps these up, you can do something like this:

// assuming Record is declared as case class Record(foo: Int, bar: String)
// and df is a data frame
df.map { case Row(foo: Int, bar: String) => Record(foo, bar) }

best,
wb

----- Original Message -----
From: Alexander Ulanov alexander.ula...@hp.com
To: dev@spark.apache.org
Sent: Friday, May 8, 2015 11:50:53 AM
Subject: Easy way to convert Row back to case class

Hi,

I created a dataset RDD[MyCaseClass], converted it to a DataFrame and saved it to a Parquet file, following https://spark.apache.org/docs/latest/sql-programming-guide.html#interoperating-with-rdds

When I load this dataset with sqlContext.parquetFile, I get a DataFrame with column names as in the initial case class. I want to convert this DataFrame to an RDD to perform RDD operations. However, when I convert it I get RDD[Row], and all information about the field names gets lost. Could you suggest an easy way to convert a DataFrame to RDD[MyCaseClass]?

Best regards, Alexander
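Putting Will's suggestion together with the programming-guide round trip, a minimal end-to-end sketch for a spark-shell session (the case class, path, and data are illustrative):

import org.apache.spark.sql.Row

case class Record(foo: Int, bar: String)

// Write an RDD of case classes out as Parquet...
val rdd = sc.parallelize(Seq(Record(1, "a"), Record(2, "b")))
sqlContext.createDataFrame(rdd).saveAsParquetFile("/tmp/records.parquet")

// ...then read it back and recover the case class with a Row pattern match.
val df = sqlContext.parquetFile("/tmp/records.parquet")
val records = df.map { case Row(foo: Int, bar: String) => Record(foo, bar) }
records.collect().foreach(println)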
Re: Recent Spark test failures
Hi Ted,

Yes, those two options can be useful, but in general I think the standard to set is that tests should never fail. It's actually the worst if tests fail sometimes but not others, because we can't reproduce them deterministically. Using -M and -A actually tolerates flaky tests to a certain extent, and I would prefer to instead increase the determinism in these tests.

-Andrew

2015-05-08 17:56 GMT-07:00 Ted Yu yuzhih...@gmail.com:

Andrew:
Do you think the -M and -A options described here can be used in test runs? http://scalatest.org/user_guide/using_the_runner

Cheers

On Wed, May 6, 2015 at 5:41 PM, Andrew Or and...@databricks.com wrote:

Dear all,

I'm sure you have all noticed that the Spark tests have been fairly unstable recently. I wanted to share a tool that I use to track which tests have been failing most often in order to prioritize fixing these flaky tests.

Here is an output of the tool. This spreadsheet reports the top 10 failed tests this week (ending yesterday 5/5): https://docs.google.com/spreadsheets/d/1Iv_UDaTFGTMad1sOQ_s4ddWr6KD3PuFIHmTSzL7LSb4

It is produced by a small project: https://github.com/andrewor14/spark-test-failures

I have been filing JIRAs on flaky tests based on this tool. Hopefully we can collectively stabilize the build a little more as we near the release for Spark 1.4.

-Andrew
[PySpark DataFrame] When a Row is not a Row
This is really strange.

# Spark 1.3.1
print type(results)
<class 'pyspark.sql.dataframe.DataFrame'>

a = results.take(1)[0]

print type(a)
<class 'pyspark.sql.types.Row'>

print pyspark.sql.types.Row
<class 'pyspark.sql.types.Row'>

print type(a) == pyspark.sql.types.Row
False

print isinstance(a, pyspark.sql.types.Row)
False

If I set a as follows, then the type checks pass fine:

a = pyspark.sql.types.Row('name')('Nick')

Is this a bug? What can I do to narrow down the source? results is a massive DataFrame of spark-perf results.

Nick
Re: large volume spark job spends most of the time in AppendOnlyMap.changeValue
Looks like it is spending a lot of time doing hash probing. It could be any of the following:

1. hash probing itself is inherently expensive compared with the rest of your workload
2. murmur3 doesn't work well with this key distribution
3. quadratic probing (triangular sequence) with a power-of-2 hash table works really badly for this workload

One way to test this is to instrument the changeValue function to store the total number of probes, and then log it. We added this probing capability to the new BytesToBytesMap we built; we should consider having it reported as a built-in metric to facilitate debugging:

https://github.com/apache/spark/blob/b83091ae4589feea78b056827bc3b7659d271e41/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java#L214

On Mon, May 11, 2015 at 4:21 AM, Michal Haris michal.ha...@visualdna.com wrote:

This is the stack trace of the worker thread:

[stack trace as in the earlier message]

On 8 May 2015 at 22:12, Josh Rosen rosenvi...@gmail.com wrote:

Do you have any more specific profiling data that you can share? I'm curious to know where AppendOnlyMap.changeValue is being called from.

On Fri, May 8, 2015 at 1:26 PM, Michal Haris michal.ha...@visualdna.com wrote:

+dev

On 6 May 2015 10:45, Michal Haris michal.ha...@visualdna.com wrote:

Just wanted to check if somebody has seen similar behaviour or knows what we might be doing wrong. We have a relatively complex spark application which processes half a terabyte of data at various stages. We have profiled it in several ways and everything seems to point to one place where 90% of the time is spent: AppendOnlyMap.changeValue. The job scales and is relatively faster than its map-reduce alternative, but it still feels slower than it should be. I am suspecting too much spill, but I haven't seen any improvement by increasing the number of partitions to 10k. Any idea would be appreciated.

--
Michal Haris
Technical Architect
direct line: +44 (0) 207 749 0229
www.visualdna.com | t: +44 (0) 207 734 7033
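For anyone who wants to try the instrumentation Reynold suggests without patching Spark itself, a standalone sketch that mimics AppendOnlyMap's probing scheme (power-of-2 capacity, triangular-number steps) and counts probes; the class is hypothetical, not Spark's actual code, and it uses the raw hashCode where Spark applies a murmur3 rehash:

// Hypothetical probe counter mimicking AppendOnlyMap's open addressing:
// power-of-2 capacity with quadratic (triangular sequence) probing.
class ProbeCountingMap[K](capacity: Int) {
  require(capacity > 0 && (capacity & (capacity - 1)) == 0,
    "capacity must be a power of 2")
  private val mask = capacity - 1
  private val keys = new Array[Any](capacity)
  var totalProbes = 0L
  var totalInserts = 0L

  // Simplified changeValue: tracks only key placement, no values and no
  // growth, so keep capacity well above the number of distinct keys.
  def changeValue(key: K): Unit = {
    var pos = key.hashCode & mask // Spark rehashes with murmur3 here
    var delta = 1
    totalInserts += 1
    while (true) {
      totalProbes += 1
      val cur = keys(pos)
      if (cur == null) { keys(pos) = key; return }
      else if (cur == key) return
      pos = (pos + delta) & mask // offsets 1, 3, 6, 10, ... (triangular)
      delta += 1
    }
  }

  def avgProbes: Double = totalProbes.toDouble / totalInserts
}

Feeding a representative sample of aggregation keys through this and logging avgProbes should help separate causes (2) and (3) from (1): an average far above one or two probes points at the hashing or probing scheme rather than at inherent cost.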
Re: Recent Spark test failures
Makes sense. Having high determinism in these tests would make the Jenkins build stable.

On Mon, May 11, 2015 at 1:08 PM, Andrew Or and...@databricks.com wrote:

[quoted thread as in the previous message]
Re: Recent Spark test failures
On 7 May 2015, at 01:41, Andrew Or and...@databricks.com wrote:

Dear all,

I'm sure you have all noticed that the Spark tests have been fairly unstable recently. I wanted to share a tool that I use to track which tests have been failing most often in order to prioritize fixing these flaky tests.

Here is an output of the tool. This spreadsheet reports the top 10 failed tests this week (ending yesterday 5/5): https://docs.google.com/spreadsheets/d/1Iv_UDaTFGTMad1sOQ_s4ddWr6KD3PuFIHmTSzL7LSb4

It is produced by a small project: https://github.com/andrewor14/spark-test-failures

That's really nice. I like the publishing to Google spreadsheets and the eating-your-own-dogfood analysis.
Re: [PySpark DataFrame] When a Row is not a Row
In Row#equals():

    while (i < len) {
      if (apply(i) != that.apply(i)) {

'!=' should be !apply(i).equals(that.apply(i)) ?

Cheers

On Mon, May 11, 2015 at 1:49 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote:

[message quoted in full above]
Re: [SPARK-7400] PortableDataStream UDT
Sorry it's hard to give a definitive answer due to the lack of details (I'm not sure what exactly is entailed to have this PortableDataStream), but the answer is probably no if we need to change some existing code and expose a whole new data type to users. On Mon, May 11, 2015 at 9:02 AM, Eron Wright ewri...@live.com wrote: Hello, I'm working on SPARK-7400 for DataFrame support for PortableDataStream, i.e. the data type associated with the RDD from sc.binaryFiles(...). Assuming a patch is available soon, what is the likelihood of inclusion in Spark 1.4? Thanks
Re: YARN mode startup takes too long (10+ secs)
Wow, I hadn't noticed this, but 5 seconds is really long. It's true that it's configurable, but I think we need to provide a decent out-of-the-box experience. For comparison, the MapReduce equivalent is 1 second.

I filed https://issues.apache.org/jira/browse/SPARK-7533 for this.

-Sandy

On Mon, May 11, 2015 at 9:03 AM, Mridul Muralidharan mri...@gmail.com wrote:

For tiny/small clusters (particularly single-tenant), you can set it to a lower value. But for anything reasonably large or multi-tenant, the request storm can be bad if a large enough number of applications start aggressively polling the RM. That is why the interval is configurable.

- Mridul

On Mon, May 11, 2015 at 6:54 AM, Zoltán Zvara zoltan.zv...@gmail.com wrote:

[earlier thread quoted in full in the first "YARN mode startup takes too long" message above]
Re: YARN mode startup takes too long (10+ secs)
For tiny/small clusters (particularly single-tenant), you can set it to a lower value. But for anything reasonably large or multi-tenant, the request storm can be bad if a large enough number of applications start aggressively polling the RM. That is why the interval is configurable.

- Mridul

On Mon, May 11, 2015 at 6:54 AM, Zoltán Zvara zoltan.zv...@gmail.com wrote:

[earlier thread quoted in full in the first "YARN mode startup takes too long" message above]
Re: Intellij Spark Source Compilation
Hi Iulian,

I was able to successfully compile in Eclipse after, on the command line, running sbt avro:generate followed by sbt clean compile (and then a full clean compile in Eclipse). Thanks for your help!