Re: Change for submitting to yarn in 1.3.1

2015-05-11 Thread Mridul Muralidharan
That works when it is launched from the same process - which is
unfortunately not our case :-)

- Mridul

On Sun, May 10, 2015 at 9:05 PM, Manku Timma manku.tim...@gmail.com wrote:
 sc.applicationId gives the yarn appid.

 On 11 May 2015 at 08:13, Mridul Muralidharan mri...@gmail.com wrote:

 We had a similar requirement and, as a stopgap, I currently use a
 suboptimal, implementation-specific workaround: parsing it out of the
 stdout/stderr (based on the log config).
 A better means of getting to this is indeed required!
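
 For illustration, a minimal sketch of that log scraping (an assumption-laden
 sketch, not production code: it assumes the log config routes the launcher's
 output so the standard application_<clusterTimestamp>_<sequence> id shows up
 on stdout/stderr):

   import scala.sys.process._

   // Hypothetical helper: runs the submit command and grabs the first
   // YARN application id seen in its combined stdout/stderr.
   val appIdPattern = """application_\d+_\d+""".r

   def submitAndCaptureAppId(cmd: Seq[String]): Option[String] = {
     @volatile var appId: Option[String] = None
     val logger = ProcessLogger { line =>
       if (appId.isEmpty) appId = appIdPattern.findFirstIn(line)
     }
     cmd ! logger // blocks until the launcher process exits
     appId
   }

 What exactly gets logged depends on the log4j config, hence "based on log
 config" above.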

 Regards,
 Mridul

 On Sun, May 10, 2015 at 7:33 PM, Ron's Yahoo!
 zlgonza...@yahoo.com.invalid wrote:
  Hi,
 I used to submit my Spark YARN applications using the
   org.apache.spark.deploy.yarn.Client API so that I could get the application id
   after submitting. The following is the code that I have, but after
   upgrading to 1.3.1, the yarn Client class was made private. Is
   there a particular reason why this Client class was made private?
I know that there’s a new SparkSubmit object that can be used, but
  it’s not clear to me how I can use it to get the application id after
  submitting to the cluster.
Thoughts?
 
  Thanks,
  Ron
 
   class SparkLauncherServiceImpl extends SparkLauncherService {

     override def runApp(conf: Configuration, appName: String, queue: String): ApplicationId = {
       val ws = SparkLauncherServiceImpl.getWorkspace()
       val params = Array("--class", //
         "com.xyz.sparkdb.service.impl.AssemblyServiceImpl", //
         "--name", appName, //
         "--queue", queue, //
         "--driver-memory", "1024m", //
         "--addJars", getListOfDependencyJars(s"$ws/ledp/le-sparkdb/target/dependency"), //
         "--jar", s"file:$ws/ledp/le-sparkdb/target/le-sparkdb-1.0.3-SNAPSHOT.jar")
       System.setProperty("SPARK_YARN_MODE", "true")
       System.setProperty("spark.driver.extraJavaOptions",
         "-XX:PermSize=128m -XX:MaxPermSize=128m -Dsun.io.serialization.extendedDebugInfo=true")
       val sparkConf = new SparkConf()
       val args = new ClientArguments(params, sparkConf)
       new Client(args, conf, sparkConf).runApp()
     }
 
     private def getListOfDependencyJars(baseDir: String): String = {
       val files = new File(baseDir).listFiles().filter(!_.getName().startsWith("spark-assembly"))
       val prependedFiles = files.map(x => "file:" + x.getAbsolutePath())
       val result = (prependedFiles.tail.foldLeft(new StringBuilder(prependedFiles.head)) {
         (acc, e) => acc.append(", ").append(e)
       }).toString()
       result
     }
  }
 

 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org



-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: PySpark DataFrame: Preserving nesting when selecting a nested field

2015-05-11 Thread Reynold Xin
In 1.4, you can use the struct function to create a struct, e.g. you can
explicitly select out the version column, and then create a new struct
named settings.


The current semantics of select closely follow relational databases' SQL,
which is well understood and defined. I wouldn't add any magic to select
for nested data, because it is not very well researched & defined and we
might get into conflicting scenarios.
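
For instance (a sketch against the 1.4 API, shown here in Scala; PySpark
exposes the same struct function in pyspark.sql.functions), using the
settings/os/version schema from the example below:

  import org.apache.spark.sql.functions.struct

  // Rebuild the struct with only the fields to keep, then alias it back
  // to "settings"; nullability flags elided in the expected output.
  val trimmed = df.select(struct(df("settings.os").as("os")).as("settings"))
  trimmed.printSchema()
  // root
  //  |-- settings: struct
  //  |    |-- os: string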



On Sat, May 9, 2015 at 1:10 PM, Nicholas Chammas nicholas.cham...@gmail.com
 wrote:

 Take a look:

  >>> df = sqlContext.jsonRDD(sc.parallelize(['{"settings": {"os": "OS X", "version": "10.10"}}']))
  >>> df.printSchema()
  root
   |-- settings: struct (nullable = true)
   |    |-- os: string (nullable = true)
   |    |-- version: string (nullable = true)
  >>> # Now I want to drop the version column by
  >>> # selecting everything else.
  >>> # I want to preserve the schema otherwise.
  >>> # That means `os` should stay nested under
  >>> # `settings`.
  >>> df.select('settings.os').printSchema()
  root
   |-- os: string (nullable = true)
  >>> df.select('settings', 'settings.os').printSchema()
  root
   |-- settings: struct (nullable = true)
   |    |-- os: string (nullable = true)
   |    |-- version: string (nullable = true)
   |-- os: string (nullable = true)
  >>> df.select(df['settings.os'].alias('settings.os')).printSchema()
  root
   |-- settings.os: string (nullable = true)

 In all cases, selecting a nested field loses the original nesting of that
 field.

 What I want is to select settings.os and get back a DataFrame with the
 following schema:

 root
  |-- settings: struct (nullable = true)
  |    |-- os: string (nullable = true)

 In other words, I want to preserve the fact that os is nested under
 settings.
 I’m doing this as a work-around for the fact that PySpark does not
 currently support dropping columns.

 Until direct support for such a feature lands as part of SPARK-7509
 https://issues.apache.org/jira/browse/SPARK-7509, selecting all columns
 but the ones you want to drop seems way better than directly manipulating
 the schema (which is the hackier and way more complex alternative for
 rolling your own “drop” logic).
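
 (For top-level columns that work-around is mechanical; a quick sketch in
 Scala, assuming a flat schema, and PySpark has the same columns/select
 methods:

   // Keep every top-level column except the one to drop.
   val keep = df.columns.filter(_ != "version").map(c => df(c))
   val dropped = df.select(keep: _*)

 The point of this thread is that no such one-liner works once the column
 lives inside a struct.)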

 And you want that process to preserve the schema as much as possible, which
 I assume is how a native “drop column” method would work.

 Is it possible though? Or do we have to do direct schema manipulation?

 Nick
 ​



Re: large volume spark job spends most of the time in AppendOnlyMap.changeValue

2015-05-11 Thread Michal Haris
This is the stack trace of the worker thread:

org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60)
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
org.apache.spark.scheduler.Task.run(Task.scala:64)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

On 8 May 2015 at 22:12, Josh Rosen rosenvi...@gmail.com wrote:

 Do you have any more specific profiling data that you can share?  I'm
 curious to know where AppendOnlyMap.changeValue is being called from.

 On Fri, May 8, 2015 at 1:26 PM, Michal Haris michal.ha...@visualdna.com
 wrote:

 +dev
 On 6 May 2015 10:45, Michal Haris michal.ha...@visualdna.com wrote:

  Just wanted to check if somebody has seen similar behaviour or knows
 what
  we might be doing wrong. We have a relatively complex spark application
  which processes half a terabyte of data at various stages. We have
 profiled
  it in several ways and everything seems to point to one place where 90%
 of
  the time is spent:  AppendOnlyMap.changeValue. The job scales and is
  relatively faster than its map-reduce alternative but it still feels
 slower
  than it should be. I am suspecting too much spill but I haven't seen any
  improvement by increasing the number of partitions to 10k. Any idea would be
  appreciated.
 
  --
  Michal Haris
  Technical Architect
  direct line: +44 (0) 207 749 0229
  www.visualdna.com | t: +44 (0) 207 734 7033,
 





-- 
Michal Haris
Technical Architect
direct line: +44 (0) 207 749 0229
www.visualdna.com | t: +44 (0) 207 734 7033,


Re: YARN mode startup takes too long (10+ secs)

2015-05-11 Thread Zoltán Zvara
Isn't this issue something that should be improved? Based on the following
discussion, there are two places where YARN's heartbeat interval is
respected on job start-up, but do we really need to respect it on start-up?

On Fri, May 8, 2015 at 12:14 PM Taeyun Kim taeyun@innowireless.com
wrote:

 I think so.

 In fact, the flow is: allocator.allocateResources() -> sleep ->
 allocator.allocateResources() -> sleep …

 But I guess that on the first allocateResources() the allocation is not
 fulfilled. So sleep occurs.



 *From:* Zoltán Zvara [mailto:zoltan.zv...@gmail.com]
 *Sent:* Friday, May 08, 2015 4:25 PM


 *To:* Taeyun Kim; u...@spark.apache.org
 *Subject:* Re: YARN mode startup takes too long (10+ secs)



 So does this sleep occur before allocating resources for the first few
 executors to start the job?



 On Fri, May 8, 2015 at 6:23 AM Taeyun Kim taeyun@innowireless.com
 wrote:

 I think I’ve found the (maybe partial, but major) reason.



 It’s between the following lines (it’s newly captured, but essentially
 the same place that Zoltán Zvara picked):



 15/05/08 11:36:32 INFO BlockManagerMaster: Registered BlockManager

 15/05/08 11:36:38 INFO YarnClientSchedulerBackend: Registered executor:
 Actor[akka.tcp://sparkExecutor@cluster04:55237/user/Executor#-149550753]
 with ID 1



 When I read the logs on the cluster side, the following lines were found (the
 exact time differs from the line above, but that's just the clock difference
 between machines):



 15/05/08 11:36:23 INFO yarn.ApplicationMaster: Started progress reporter
 thread - sleep time : 5000

 15/05/08 11:36:28 INFO impl.AMRMClientImpl: Received new token for :
 cluster04:45454



 It seemed that Spark deliberately sleeps 5 secs.

 I’ve read the Spark source code, and in
 org.apache.spark.deploy.yarn.ApplicationMaster.scala, launchReporterThread()
 had the code for that.

 It loops calling allocator.allocateResources() and Thread.sleep().

 For sleep, it reads the configuration variable
 spark.yarn.scheduler.heartbeat.interval-ms (the default value is 5000,
 which is 5 secs).

 According to the comment, “we want to be reasonably responsive without
 causing too many requests to RM”.

 So, unless YARN immediately fulfills the allocation request, it seems that
 5 secs will be wasted.



 When I modified the configuration variable to 1000, it only waited for 1
 sec.



 Here is the log lines after the change:



 15/05/08 11:47:21 INFO yarn.ApplicationMaster: Started progress reporter
 thread - sleep time : 1000

 15/05/08 11:47:22 INFO impl.AMRMClientImpl: Received new token for :
 cluster04:45454



 4 secs saved.

 So, when one does not want to wait 5 secs, one can change the
 spark.yarn.scheduler.heartbeat.interval-ms.

 I hope that the additional overhead it incurs would be negligible.
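
 For reference, a minimal sketch of setting it programmatically (it can
 equally be passed to spark-submit via --conf; the app name is hypothetical):

   import org.apache.spark.{SparkConf, SparkContext}

   // Value is in milliseconds; the default is 5000 as described above.
   val conf = new SparkConf()
     .setAppName("my-app")
     .set("spark.yarn.scheduler.heartbeat.interval-ms", "1000")
   val sc = new SparkContext(conf)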





 *From:* Zoltán Zvara [mailto:zoltan.zv...@gmail.com]
 *Sent:* Thursday, May 07, 2015 10:05 PM
 *To:* Taeyun Kim; u...@spark.apache.org
 *Subject:* Re: YARN mode startup takes too long (10+ secs)



 Without considering everything, just a few hints:

 You are running on YARN. From 09:18:34 to 09:18:37 your application is in
 state ACCEPTED. There is a noticeable overhead introduced due to
 communicating with YARN's ResourceManager and NodeManager, and given that the
 YARN scheduler needs time to make a decision. I guess somewhere
 from 09:18:38 to 09:18:43 your application JAR gets copied to another
 container requested by the Spark ApplicationMaster deployed on YARN's
 container 0. Deploying an executor needs further resource negotiations with
 the ResourceManager usually. Also, as I said, your JAR and Executor's code
 requires copying to the container's local directory - execution is blocked
 until that is complete.



 On Thu, May 7, 2015 at 3:09 AM Taeyun Kim taeyun@innowireless.com
 wrote:

 Hi,



 I’m running a spark application with YARN-client or YARN-cluster mode.

 But it seems to take too long to start up.

 It takes 10+ seconds to initialize the spark context.

 Is this normal? Or can it be optimized?



 The environment is as follows:

 - Hadoop: Hortonworks HDP 2.2 (Hadoop 2.6)

 - Spark: 1.3.1

 - Client: Windows 7, but similar result on CentOS 6.6



 The following is the startup part of the application log. (Some private
 information was edited)

 ‘Main: Initializing context’ at the first line and ‘MainProcessor:
 Deleting previous output files’ at the last line are the logs by the
 application. Others in between are from Spark itself. Application logic is
 executed after this log is displayed.



 ---



 15/05/07 09:18:31 INFO Main: Initializing context

 15/05/07 09:18:31 INFO SparkContext: Running Spark version 1.3.1

 15/05/07 09:18:31 INFO SecurityManager: Changing view acls to: myuser,myapp

 15/05/07 09:18:31 INFO SecurityManager: Changing modify acls to:
 myuser,myapp

 15/05/07 09:18:31 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(myuser,
 myapp); users with modify 

Re: Intellij Spark Source Compilation

2015-05-11 Thread rtimp
Hi,

Thanks Iulian. Yeah, I was kind of anticipating I could just ignore old-deps
ultimately. However, even after doing a clean and a build-all, I still get the
following:

Description / Location / Resource / Path / Type
not found: type EventBatch  line 72 SparkAvroCallbackHandler.scala
/spark-streaming-flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink
Scala Problem
not found: type EventBatch  line 87 SparkAvroCallbackHandler.scala
/spark-streaming-flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink
Scala Problem
not found: type EventBatch  line 25 SparkSinkUtils.scala
/spark-streaming-flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink
Scala Problem
not found: type EventBatch  line 48 TransactionProcessor.scala
/spark-streaming-flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink
Scala Problem
not found: type EventBatch  line 48 TransactionProcessor.scala
/spark-streaming-flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink
Scala Problem
not found: type EventBatch  line 80 TransactionProcessor.scala
/spark-streaming-flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink
Scala Problem
not found: type EventBatch  line 146TransactionProcessor.scala
/spark-streaming-flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink
Scala Problem
not found: type SparkFlumeProtocol  line 46 SparkAvroCallbackHandler.scala
/spark-streaming-flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink
Scala Problem
not found: type SparkFlumeProtocol  line 86 SparkSink.scala
/spark-streaming-flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink
Scala Problem
not found: type SparkSinkEvent  line 115TransactionProcessor.scala
/spark-streaming-flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink
Scala Problem
not found: value SparkFlumeProtocol line 185SparkSinkSuite.scala
/spark-streaming-flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink
Scala Problem
not found: value SparkFlumeProtocol line 194SparkSinkSuite.scala
/spark-streaming-flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink
Scala Problem
Project not built due to errors in dependent project(s)
spark-streaming-flume-sink  Unknown spark-streaming-flume   Scala 
Problem
value ack is not a member of Anyline 52 SparkSinkSuite.scala
/spark-streaming-flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink
Scala Problem
value getEventBatch is not a member of Any  line 51 SparkSinkSuite.scala
/spark-streaming-flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink
Scala Problem
value getEventBatch is not a member of Any  line 71 SparkSinkSuite.scala
/spark-streaming-flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink
Scala Problem
value getEventBatch is not a member of Any  line 91 SparkSinkSuite.scala
/spark-streaming-flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink
Scala Problem
value nack is not a member of Any   line 73 SparkSinkSuite.scala
/spark-streaming-flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink
Scala Problem


I'd mention that the EventBatch and SparkFlumeProtocol errors were very similar to
what initially occurred for me in IntelliJ, until I clicked the Generate
Sources and Update Folders For All Projects button in the Maven Projects
tool window that the wiki suggests using.



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Intellij-Spark-Source-Compilation-tp12168p12195.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Intellij Spark Source Compilation

2015-05-11 Thread Iulian Dragoș
Oh, I see. So then try to run one build on the command line first (or try sbt
avro:generate, though I’m not sure it’s enough). I just noticed that I have
an additional source folder target/scala-2.10/src_managed/main/compiled_avro
for spark-streaming-flume-sink. I guess I built the project once and that’s
why I never saw these errors.

Once we get it to work, let me know so I can update the wiki guide.

thanks,
iulian
​

On Mon, May 11, 2015 at 4:20 PM, rtimp dolethebobdol...@gmail.com wrote:

 Hi,

 Thanks Iulian. Yeah, I was kind of anticipating I could just ignore
 old-deps
 ultimately. However, even after doing a clean and a build-all, I still get
 the following:

 Description / Location / Resource / Path / Type
 not found: type EventBatch  line 72 SparkAvroCallbackHandler.scala

 /spark-streaming-flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink
 Scala Problem
 not found: type EventBatch  line 87 SparkAvroCallbackHandler.scala

 /spark-streaming-flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink
 Scala Problem
 not found: type EventBatch  line 25 SparkSinkUtils.scala

 /spark-streaming-flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink
 Scala Problem
 not found: type EventBatch  line 48 TransactionProcessor.scala

 /spark-streaming-flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink
 Scala Problem
 not found: type EventBatch  line 48 TransactionProcessor.scala

 /spark-streaming-flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink
 Scala Problem
 not found: type EventBatch  line 80 TransactionProcessor.scala

 /spark-streaming-flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink
 Scala Problem
 not found: type EventBatch  line 146TransactionProcessor.scala

 /spark-streaming-flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink
 Scala Problem
 not found: type SparkFlumeProtocol  line 46
 SparkAvroCallbackHandler.scala

 /spark-streaming-flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink
 Scala Problem
 not found: type SparkFlumeProtocol  line 86 SparkSink.scala

 /spark-streaming-flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink
 Scala Problem
 not found: type SparkSinkEvent  line 115TransactionProcessor.scala

 /spark-streaming-flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink
 Scala Problem
 not found: value SparkFlumeProtocol line 185
 SparkSinkSuite.scala

 /spark-streaming-flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink
 Scala Problem
 not found: value SparkFlumeProtocol line 194
 SparkSinkSuite.scala

 /spark-streaming-flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink
 Scala Problem
 Project not built due to errors in dependent project(s)
 spark-streaming-flume-sink  Unknown spark-streaming-flume
  Scala Problem
 value ack is not a member of Anyline 52 SparkSinkSuite.scala

 /spark-streaming-flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink
 Scala Problem
 value getEventBatch is not a member of Any  line 51
 SparkSinkSuite.scala

 /spark-streaming-flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink
 Scala Problem
 value getEventBatch is not a member of Any  line 71
 SparkSinkSuite.scala

 /spark-streaming-flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink
 Scala Problem
 value getEventBatch is not a member of Any  line 91
 SparkSinkSuite.scala

 /spark-streaming-flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink
 Scala Problem
 value nack is not a member of Any   line 73 SparkSinkSuite.scala

 /spark-streaming-flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink
 Scala Problem


  I'd mention that the EventBatch and SparkFlumeProtocol errors were very
  similar to what initially occurred for me in IntelliJ, until I clicked the
  Generate Sources and Update Folders For All Projects button in the Maven
  Projects tool window that the wiki suggests using.



 --
 View this message in context:
 http://apache-spark-developers-list.1001551.n3.nabble.com/Intellij-Spark-Source-Compilation-tp12168p12195.html
 Sent from the Apache Spark Developers List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org




-- 

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com


RE: DataFrame distinct vs RDD distinct

2015-05-11 Thread Ulanov, Alexander
The following worked for me as a workaround for distinct:

val pf = sqlContext.parquetFile("hdfs://file")
val distinctValuesOfColumn4 =
  pf.rdd.aggregate[scala.collection.mutable.HashSet[String]](
    new scala.collection.mutable.HashSet[String]())(
    (s, v) => s += v.getString(4),
    (s1, s2) => s1 ++= s2)

Best regards, Alexander

-Original Message-
From: Ulanov, Alexander 
Sent: Monday, May 11, 2015 11:59 AM
To: Olivier Girardot; Michael Armbrust
Cc: Reynold Xin; dev@spark.apache.org
Subject: RE: DataFrame distinct vs RDD distinct

Hi,

Could you suggest an alternative way of implementing distinct, e.g. via fold or
aggregate? Both SQL distinct and RDD distinct fail on my dataset due to
overflow of the Spark shuffle disk. I have 7 nodes with 300GB dedicated to Spark
shuffle each. My dataset is 2B rows, and the field on which I'm performing
distinct has 23 distinct values.

Best regards, Alexander 

-Original Message-
From: Olivier Girardot [mailto:o.girar...@lateral-thoughts.com]
Sent: Friday, May 08, 2015 12:50 PM
To: Michael Armbrust; Olivier Girardot
Cc: Reynold Xin; dev@spark.apache.org
Subject: Re: DataFrame distinct vs RDD distinct

I'll try to reproduce what has been reported to me first :) and I'll let you 
know. Thanks !

Le jeu. 7 mai 2015 à 21:16, Michael Armbrust mich...@databricks.com a écrit :

 I'd happily merge a PR that changes the distinct implementation to be
 more like Spark core, assuming it includes benchmarks that show better
 performance for both the "fits in memory" case and the "too big for
 memory" case.

 On Thu, May 7, 2015 at 2:23 AM, Olivier Girardot  
 o.girar...@lateral-thoughts.com wrote:

 Ok, but for the moment, this seems to be killing performance on some
 computations...
 I'll try to give you precise figures on this between rdd and dataframe.

 Olivier.

 Le jeu. 7 mai 2015 à 10:08, Reynold Xin r...@databricks.com a écrit :

  In 1.5, we will most likely just rewrite distinct in SQL to either 
  use
 the
  Aggregate operator which will benefit from all the Tungsten
 optimizations,
  or have a Tungsten version of distinct for SQL/DataFrame.
 
  On Thu, May 7, 2015 at 1:32 AM, Olivier Girardot  
  o.girar...@lateral-thoughts.com wrote:
 
  Hi everyone,
  there seem to be different implementations of the distinct feature in
  DataFrames and RDDs, and a performance issue with the DataFrame distinct
  API.
 
  In RDD.scala :
 
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
    withScope {
      map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
    }

  And in DataFrame:
 
 
  case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
    override def output: Seq[Attribute] = child.output
    override def requiredChildDistribution: Seq[Distribution] =
      if (partial) UnspecifiedDistribution :: Nil
      else ClusteredDistribution(child.output) :: Nil

    override def execute(): RDD[Row] = {
      child.execute().mapPartitions { iter =>
        val hashSet = new scala.collection.mutable.HashSet[Row]()
        var currentRow: Row = null
        while (iter.hasNext) {
          currentRow = iter.next()
          if (!hashSet.contains(currentRow)) {
            hashSet.add(currentRow.copy())
          }
        }
        hashSet.iterator
      }
    }
  }
 
 
 
 
 
 
   I can try to reproduce the performance issue more clearly, but do you
   have any insights into why we can't have the same distinct strategy
   between DataFrame and RDD?
 
  Regards,
 
  Olivier.
 
 




-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



[SPARK-7400] PortableDataStream UDT

2015-05-11 Thread Eron Wright
Hello,
I'm working on SPARK-7400, DataFrame support for PortableDataStream, i.e.
the data type associated with the RDD from sc.binaryFiles(...).
Assuming a patch is available soon, what is the likelihood of inclusion in 
Spark 1.4?
Thanks

RE: DataFrame distinct vs RDD distinct

2015-05-11 Thread Ulanov, Alexander
Hi,

Could you suggest an alternative way of implementing distinct, e.g. via fold or
aggregate? Both SQL distinct and RDD distinct fail on my dataset due to
overflow of the Spark shuffle disk. I have 7 nodes with 300GB dedicated to Spark
shuffle each. My dataset is 2B rows, and the field on which I'm performing
distinct has 23 distinct values.

Best regards, Alexander 

-Original Message-
From: Olivier Girardot [mailto:o.girar...@lateral-thoughts.com] 
Sent: Friday, May 08, 2015 12:50 PM
To: Michael Armbrust; Olivier Girardot
Cc: Reynold Xin; dev@spark.apache.org
Subject: Re: DataFrame distinct vs RDD distinct

I'll try to reproduce what has been reported to me first :) and I'll let you 
know. Thanks !

Le jeu. 7 mai 2015 à 21:16, Michael Armbrust mich...@databricks.com a écrit :

 I'd happily merge a PR that changes the distinct implementation to be
 more like Spark core, assuming it includes benchmarks that show better
 performance for both the "fits in memory" case and the "too big for
 memory" case.

 On Thu, May 7, 2015 at 2:23 AM, Olivier Girardot  
 o.girar...@lateral-thoughts.com wrote:

 Ok, but for the moment, this seems to be killing performance on some
 computations...
 I'll try to give you precise figures on this between rdd and dataframe.

 Olivier.

 Le jeu. 7 mai 2015 à 10:08, Reynold Xin r...@databricks.com a écrit :

  In 1.5, we will most likely just rewrite distinct in SQL to either 
  use
 the
  Aggregate operator which will benefit from all the Tungsten
 optimizations,
  or have a Tungsten version of distinct for SQL/DataFrame.
 
  On Thu, May 7, 2015 at 1:32 AM, Olivier Girardot  
  o.girar...@lateral-thoughts.com wrote:
 
  Hi everyone,
  there seem to be different implementations of the distinct feature in
  DataFrames and RDDs, and a performance issue with the DataFrame distinct
  API.
 
  In RDD.scala :
 
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
    withScope {
      map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
    }

  And in DataFrame:
 
 
  case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
    override def output: Seq[Attribute] = child.output
    override def requiredChildDistribution: Seq[Distribution] =
      if (partial) UnspecifiedDistribution :: Nil
      else ClusteredDistribution(child.output) :: Nil

    override def execute(): RDD[Row] = {
      child.execute().mapPartitions { iter =>
        val hashSet = new scala.collection.mutable.HashSet[Row]()
        var currentRow: Row = null
        while (iter.hasNext) {
          currentRow = iter.next()
          if (!hashSet.contains(currentRow)) {
            hashSet.add(currentRow.copy())
          }
        }
        hashSet.iterator
      }
    }
  }
 
 
 
 
 
 
   I can try to reproduce the performance issue more clearly, but do you
   have any insights into why we can't have the same distinct strategy
   between DataFrame and RDD?
 
  Regards,
 
  Olivier.
 
 





RE: Easy way to convert Row back to case class

2015-05-11 Thread Ulanov, Alexander
Thank you for the suggestions!

From: Reynold Xin [mailto:r...@databricks.com]
Sent: Friday, May 08, 2015 11:10 AM
To: Will Benton
Cc: Ulanov, Alexander; dev@spark.apache.org
Subject: Re: Easy way to convert Row back to case class

In 1.4, you can do

row.getInt(colName)

In 1.5, some variant of this will come to allow you to turn a DataFrame into a 
typed RDD, where the case class's field names match the column names. 
https://github.com/apache/spark/pull/5713



On Fri, May 8, 2015 at 11:01 AM, Will Benton 
wi...@redhat.com wrote:
This might not be the easiest way, but it's pretty easy:  you can use 
Row(field_1, ..., field_n) as a pattern in a case match.  So if you have a data 
frame with foo as an int column and bar as a String column and you want to
construct instances of a case class that wraps these up, you can do something 
like this:

// assuming Record is declared as case class Record(foo: Int, bar: String)
// and df is a data frame

df.map {
  case Row(foo: Int, bar: String) => Record(foo, bar)
}
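
If you want the extraction to be more defensive, a variant sketch (assuming
Spark 1.4, where name-based field access on Row is available):

// getAs fails with a clear error when a column is missing, rather than
// silently falling through the Row(...) pattern.
val records = df.map { row =>
  Record(row.getAs[Int]("foo"), row.getAs[String]("bar"))
}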



best,
wb


- Original Message -
 From: Alexander Ulanov 
 alexander.ula...@hp.com
 To: dev@spark.apache.org
 Sent: Friday, May 8, 2015 11:50:53 AM
 Subject: Easy way to convert Row back to case class

 Hi,

 I created a dataset RDD[MyCaseClass], converted it to DataFrame and saved to
 Parquet file, following
 https://spark.apache.org/docs/latest/sql-programming-guide.html#interoperating-with-rdds

 When I load this dataset with sqlContext.parquetFile, I get DataFrame with
 column names as in initial case class. I want to convert this DataFrame to
 RDD to perform RDD operations. However, when I convert it I get RDD[Row] and
 all information about row names gets lost. Could you suggest an easy way to
 convert DataFrame to RDD[MyCaseClass]?

 Best regards, Alexander

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Recent Spark test failures

2015-05-11 Thread Andrew Or
Hi Ted,

Yes, those two options can be useful, but in general I think the standard
to set is that tests should never fail. It's actually the worst if tests
fail sometimes but not others, because we can't reproduce them
deterministically. Using -M and -A actually tolerates flaky tests to a
certain extent, and I would prefer to instead increase the determinism in
these tests.

-Andrew

2015-05-08 17:56 GMT-07:00 Ted Yu yuzhih...@gmail.com:

 Andrew:
 Do you think the -M and -A options described here can be used in test runs
 ?
 http://scalatest.org/user_guide/using_the_runner

 Cheers

 On Wed, May 6, 2015 at 5:41 PM, Andrew Or and...@databricks.com wrote:

 Dear all,

 I'm sure you have all noticed that the Spark tests have been fairly
 unstable recently. I wanted to share a tool that I use to track which
 tests
 have been failing most often in order to prioritize fixing these flaky
 tests.

 Here is an output of the tool. This spreadsheet reports the top 10 failed
 tests this week (ending yesterday 5/5):

 https://docs.google.com/spreadsheets/d/1Iv_UDaTFGTMad1sOQ_s4ddWr6KD3PuFIHmTSzL7LSb4

 It is produced by a small project:
 https://github.com/andrewor14/spark-test-failures

 I have been filing JIRAs on flaky tests based on this tool. Hopefully we
 can collectively stabilize the build a little more as we near the release
 for Spark 1.4.

 -Andrew





[PySpark DataFrame] When a Row is not a Row

2015-05-11 Thread Nicholas Chammas
This is really strange.

>>> # Spark 1.3.1
>>> print type(results)
<class 'pyspark.sql.dataframe.DataFrame'>

>>> a = results.take(1)[0]

>>> print type(a)
<class 'pyspark.sql.types.Row'>

>>> print pyspark.sql.types.Row
<class 'pyspark.sql.types.Row'>

>>> print type(a) == pyspark.sql.types.Row
False
>>> print isinstance(a, pyspark.sql.types.Row)
False

If I set a as follows, then the type checks pass fine.

a = pyspark.sql.types.Row('name')('Nick')

Is this a bug? What can I do to narrow down the source?

results is a massive DataFrame of spark-perf results.

Nick
​


Re: large volume spark job spends most of the time in AppendOnlyMap.changeValue

2015-05-11 Thread Reynold Xin
Looks like it is spending a lot of time doing hash probing. It could be one
or more of the following:

1. hash probing itself is inherently expensive compared with rest of your
workload

2. murmur3 doesn't work well with this key distribution

3. quadratic probing (triangular sequence) with a power-of-2 hash table
works really badly for this workload.
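
To make (3) concrete, here is a sketch of triangular-number probing on a
power-of-2 table (an illustration of the technique, not Spark's actual code):

// k-th probe position for a given hash: pos, pos+1, pos+1+2, pos+1+2+3, ...
// all taken modulo the capacity via the bit mask.
def probePosition(hash: Int, capacity: Int, attempt: Int): Int = {
  require((capacity & (capacity - 1)) == 0, "capacity must be a power of 2")
  val mask = capacity - 1
  (hash + attempt * (attempt + 1) / 2) & mask
}

On a power-of-2 table this sequence eventually visits every slot, but keys
that collide early share long stretches of the same probe chain.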

One way to test this is to instrument the changeValue function to store the
total number of probes, and then log it. We added this probing
capability to the new BytesToBytesMap we built. We should consider
just having it reported as a built-in metric to facilitate
debugging.

https://github.com/apache/spark/blob/b83091ae4589feea78b056827bc3b7659d271e41/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java#L214






On Mon, May 11, 2015 at 4:21 AM, Michal Haris michal.ha...@visualdna.com
wrote:

 This is the stack trace of the worker thread:


 org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)

 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)

 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
 org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60)

 org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
 org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 org.apache.spark.scheduler.Task.run(Task.scala:64)
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)

 On 8 May 2015 at 22:12, Josh Rosen rosenvi...@gmail.com wrote:

 Do you have any more specific profiling data that you can share?  I'm
 curious to know where AppendOnlyMap.changeValue is being called from.

 On Fri, May 8, 2015 at 1:26 PM, Michal Haris michal.ha...@visualdna.com
 wrote:

 +dev
 On 6 May 2015 10:45, Michal Haris michal.ha...@visualdna.com wrote:

  Just wanted to check if somebody has seen similar behaviour or knows
 what
  we might be doing wrong. We have a relatively complex spark application
  which processes half a terabyte of data at various stages. We have
 profiled
  it in several ways and everything seems to point to one place where
 90% of
  the time is spent:  AppendOnlyMap.changeValue. The job scales and is
  relatively faster than its map-reduce alternative but it still feels
 slower
  than it should be. I am suspecting too much spill but I haven't seen
 any
  improvement by increasing the number of partitions to 10k. Any idea would
 be
  appreciated.
 
  --
  Michal Haris
  Technical Architect
  direct line: +44 (0) 207 749 0229
  www.visualdna.com | t: +44 (0) 207 734 7033,
 





 --
 Michal Haris
 Technical Architect
 direct line: +44 (0) 207 749 0229
 www.visualdna.com | t: +44 (0) 207 734 7033,



Re: Recent Spark test failures

2015-05-11 Thread Ted Yu
Makes sense.

Having high determinism in these tests would make the Jenkins build stable.

On Mon, May 11, 2015 at 1:08 PM, Andrew Or and...@databricks.com wrote:

 Hi Ted,

 Yes, those two options can be useful, but in general I think the standard
 to set is that tests should never fail. It's actually the worst if tests
 fail sometimes but not others, because we can't reproduce them
 deterministically. Using -M and -A actually tolerates flaky tests to a
 certain extent, and I would prefer to instead increase the determinism in
 these tests.

 -Andrew

 2015-05-08 17:56 GMT-07:00 Ted Yu yuzhih...@gmail.com:

 Andrew:
 Do you think the -M and -A options described here can be used in test
 runs ?
 http://scalatest.org/user_guide/using_the_runner

 Cheers

 On Wed, May 6, 2015 at 5:41 PM, Andrew Or and...@databricks.com wrote:

 Dear all,

 I'm sure you have all noticed that the Spark tests have been fairly
 unstable recently. I wanted to share a tool that I use to track which
 tests
 have been failing most often in order to prioritize fixing these flaky
 tests.

 Here is an output of the tool. This spreadsheet reports the top 10 failed
 tests this week (ending yesterday 5/5):

 https://docs.google.com/spreadsheets/d/1Iv_UDaTFGTMad1sOQ_s4ddWr6KD3PuFIHmTSzL7LSb4

 It is produced by a small project:
 https://github.com/andrewor14/spark-test-failures

 I have been filing JIRAs on flaky tests based on this tool. Hopefully we
 can collectively stabilize the build a little more as we near the release
 for Spark 1.4.

 -Andrew






Re: Recent Spark test failures

2015-05-11 Thread Steve Loughran

 On 7 May 2015, at 01:41, Andrew Or and...@databricks.com wrote:
 
 Dear all,
 
 I'm sure you have all noticed that the Spark tests have been fairly
 unstable recently. I wanted to share a tool that I use to track which tests
 have been failing most often in order to prioritize fixing these flaky
 tests.
 
 Here is an output of the tool. This spreadsheet reports the top 10 failed
 tests this week (ending yesterday 5/5):
 https://docs.google.com/spreadsheets/d/1Iv_UDaTFGTMad1sOQ_s4ddWr6KD3PuFIHmTSzL7LSb4
 
 It is produced by a small project:
 https://github.com/andrewor14/spark-test-failures


that's really nice.

I like the publishing to Google Spreadsheets & the eating-your-own-dogfood
analysis.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: [PySpark DataFrame] When a Row is not a Row

2015-05-11 Thread Ted Yu
In Row#equals():

      while (i < len) {
        if (apply(i) != that.apply(i)) {

'!=' should be !apply(i).equals(that.apply(i)) ?

Cheers

On Mon, May 11, 2015 at 1:49 PM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 This is really strange.

  >>> # Spark 1.3.1
  >>> print type(results)
  <class 'pyspark.sql.dataframe.DataFrame'>

  >>> a = results.take(1)[0]

  >>> print type(a)
  <class 'pyspark.sql.types.Row'>

  >>> print pyspark.sql.types.Row
  <class 'pyspark.sql.types.Row'>

  >>> print type(a) == pyspark.sql.types.Row
  False
  >>> print isinstance(a, pyspark.sql.types.Row)
  False

 If I set a as follows, then the type checks pass fine.

 a = pyspark.sql.types.Row('name')('Nick')

 Is this a bug? What can I do to narrow down the source?

 results is a massive DataFrame of spark-perf results.

 Nick
 ​



Re: [SPARK-7400] PortableDataStream UDT

2015-05-11 Thread Reynold Xin
Sorry, it's hard to give a definitive answer due to the lack of details (I'm
not sure what exactly is entailed in having this PortableDataStream), but the
answer is probably no if we need to change some existing code and expose a
whole new data type to users.


On Mon, May 11, 2015 at 9:02 AM, Eron Wright ewri...@live.com wrote:

 Hello,
 I'm working on SPARK-7400 for DataFrame support for PortableDataStream,
 i.e. the data type associated with the RDD from sc.binaryFiles(...).
 Assuming a patch is available soon, what is the likelihood of inclusion in
 Spark 1.4?
 Thanks


Re: YARN mode startup takes too long (10+ secs)

2015-05-11 Thread Sandy Ryza
Wow, I hadn't noticed this, but 5 seconds is really long.  It's true that
it's configurable, but I think we need to provide a decent out-of-the-box
experience.  For comparison, the MapReduce equivalent is 1 second.

I filed https://issues.apache.org/jira/browse/SPARK-7533 for this.

-Sandy

On Mon, May 11, 2015 at 9:03 AM, Mridul Muralidharan mri...@gmail.com
wrote:

 For tiny/small clusters (particularly single-tenant), you can set it to a
 lower value.
 But for anything reasonably large or multi-tenant, the request storm
 can be bad if a large enough number of applications start aggressively
 polling the RM.
 That is why the interval is made configurable.

 - Mridul


 On Mon, May 11, 2015 at 6:54 AM, Zoltán Zvara zoltan.zv...@gmail.com
 wrote:
  Isn't this issue something that should be improved? Based on the
 following
  discussion, there are two places where YARN's heartbeat interval is
  respected on job start-up, but do we really need to respect it on
 start-up?
 
  On Fri, May 8, 2015 at 12:14 PM Taeyun Kim taeyun@innowireless.com
  wrote:
 
  I think so.
 
  In fact, the flow is: allocator.allocateResources() -> sleep ->
  allocator.allocateResources() -> sleep …
 
  But I guess that on the first allocateResources() the allocation is not
  fulfilled. So sleep occurs.
 
 
 
  *From:* Zoltán Zvara [mailto:zoltan.zv...@gmail.com]
  *Sent:* Friday, May 08, 2015 4:25 PM
 
 
  *To:* Taeyun Kim; u...@spark.apache.org
  *Subject:* Re: YARN mode startup takes too long (10+ secs)
 
 
 
  So does this sleep occur before allocating resources for the first few
  executors to start the job?
 
 
 
  On Fri, May 8, 2015 at 6:23 AM Taeyun Kim taeyun@innowireless.com
  wrote:
 
  I think I’ve found the (maybe partial, but major) reason.
 
 
 
  It’s between the following lines (it’s newly captured, but essentially
  the same place that Zoltán Zvara picked):
 
 
 
  15/05/08 11:36:32 INFO BlockManagerMaster: Registered BlockManager
 
  15/05/08 11:36:38 INFO YarnClientSchedulerBackend: Registered executor:
  Actor[akka.tcp://sparkExecutor@cluster04
 :55237/user/Executor#-149550753]
  with ID 1
 
 
 
  When I read the logs on the cluster side, the following lines were found
 (the
  exact time differs from the line above, but that's just the clock
  difference between machines):
 
 
 
  15/05/08 11:36:23 INFO yarn.ApplicationMaster: Started progress reporter
  thread - sleep time : 5000
 
  15/05/08 11:36:28 INFO impl.AMRMClientImpl: Received new token for :
  cluster04:45454
 
 
 
  It seemed that Spark deliberately sleeps 5 secs.
 
  I’ve read the Spark source code, and in
  org.apache.spark.deploy.yarn.ApplicationMaster.scala,
 launchReporterThread()
  had the code for that.
 
  It loops calling allocator.allocateResources() and Thread.sleep().
 
  For sleep, it reads the configuration variable
  spark.yarn.scheduler.heartbeat.interval-ms (the default value is 5000,
  which is 5 secs).
 
  According to the comment, “we want to be reasonably responsive without
  causing too many requests to RM”.
 
  So, unless YARN immediately fulfills the allocation request, it seems
 that
  5 secs will be wasted.
 
 
 
  When I modified the configuration variable to 1000, it only waited for 1
  sec.
 
 
 
  Here is the log lines after the change:
 
 
 
  15/05/08 11:47:21 INFO yarn.ApplicationMaster: Started progress reporter
  thread - sleep time : 1000
 
  15/05/08 11:47:22 INFO impl.AMRMClientImpl: Received new token for :
  cluster04:45454
 
 
 
  4 secs saved.
 
  So, when one does not want to wait 5 secs, one can change the
  spark.yarn.scheduler.heartbeat.interval-ms.
 
  I hope that the additional overhead it incurs would be negligible.
 
 
 
 
 
  *From:* Zoltán Zvara [mailto:zoltan.zv...@gmail.com]
  *Sent:* Thursday, May 07, 2015 10:05 PM
  *To:* Taeyun Kim; u...@spark.apache.org
  *Subject:* Re: YARN mode startup takes too long (10+ secs)
 
 
 
  Without considering everything, just a few hints:
 
  You are running on YARN. From 09:18:34 to 09:18:37 your application is
 in
  state ACCEPTED. There is a noticeable overhead introduced due to
  communicating with YARN's ResourceManager and NodeManager, and given that
 the
  YARN scheduler needs time to make a decision. I guess somewhere
  from 09:18:38 to 09:18:43 your application JAR gets copied to another
  container requested by the Spark ApplicationMaster deployed on YARN's
  container 0. Deploying an executor needs further resource negotiations
 with
  the ResourceManager usually. Also, as I said, your JAR and Executor's
 code
  requires copying to the container's local directory - execution is blocked
  until that is complete.
 
 
 
  On Thu, May 7, 2015 at 3:09 AM Taeyun Kim taeyun@innowireless.com
  wrote:
 
  Hi,
 
 
 
  I’m running a spark application with YARN-client or YARN-cluster mode.
 
  But it seems to take too long to start up.
 
  It takes 10+ seconds to initialize the spark context.
 
  Is this normal? Or can it be optimized?
 
 
 
  The environment is as follows:
 
  - Hadoop: 

Re: YARN mode startup takes too long (10+ secs)

2015-05-11 Thread Mridul Muralidharan
For tiny/small clusters (particularly single-tenant), you can set it to a
lower value.
But for anything reasonably large or multi-tenant, the request storm
can be bad if a large enough number of applications start aggressively
polling the RM.
That is why the interval is made configurable.

- Mridul


On Mon, May 11, 2015 at 6:54 AM, Zoltán Zvara zoltan.zv...@gmail.com wrote:
 Isn't this issue something that should be improved? Based on the following
 discussion, there are two places where YARN's heartbeat interval is
 respected on job start-up, but do we really need to respect it on start-up?

 On Fri, May 8, 2015 at 12:14 PM Taeyun Kim taeyun@innowireless.com
 wrote:

 I think so.

 In fact, the flow is: allocator.allocateResources() -> sleep ->
 allocator.allocateResources() -> sleep …

 But I guess that on the first allocateResources() the allocation is not
 fulfilled. So sleep occurs.



 *From:* Zoltán Zvara [mailto:zoltan.zv...@gmail.com]
 *Sent:* Friday, May 08, 2015 4:25 PM


 *To:* Taeyun Kim; u...@spark.apache.org
 *Subject:* Re: YARN mode startup takes too long (10+ secs)



 So does this sleep occur before allocating resources for the first few
 executors to start the job?



 On Fri, May 8, 2015 at 6:23 AM Taeyun Kim taeyun@innowireless.com
 wrote:

 I think I’ve found the (maybe partial, but major) reason.



 It’s between the following lines (it’s newly captured, but essentially
 the same place that Zoltán Zvara picked):



 15/05/08 11:36:32 INFO BlockManagerMaster: Registered BlockManager

 15/05/08 11:36:38 INFO YarnClientSchedulerBackend: Registered executor:
 Actor[akka.tcp://sparkExecutor@cluster04:55237/user/Executor#-149550753]
 with ID 1



 When I read the logs on the cluster side, the following lines were found (the
 exact time differs from the line above, but that's just the clock difference
 between machines):



 15/05/08 11:36:23 INFO yarn.ApplicationMaster: Started progress reporter
 thread - sleep time : 5000

 15/05/08 11:36:28 INFO impl.AMRMClientImpl: Received new token for :
 cluster04:45454



 It seemed that Spark deliberately sleeps 5 secs.

 I’ve read the Spark source code, and in
 org.apache.spark.deploy.yarn.ApplicationMaster.scala, launchReporterThread()
 had the code for that.

 It loops calling allocator.allocateResources() and Thread.sleep().

 For sleep, it reads the configuration variable
 spark.yarn.scheduler.heartbeat.interval-ms (the default value is 5000,
 which is 5 secs).

 According to the comment, “we want to be reasonably responsive without
 causing too many requests to RM”.

 So, unless YARN immediately fulfills the allocation request, it seems that
 5 secs will be wasted.



 When I modified the configuration variable to 1000, it only waited for 1
 sec.



 Here is the log lines after the change:



 15/05/08 11:47:21 INFO yarn.ApplicationMaster: Started progress reporter
 thread - sleep time : 1000

 15/05/08 11:47:22 INFO impl.AMRMClientImpl: Received new token for :
 cluster04:45454



 4 secs saved.

 So, when one does not want to wait 5 secs, one can change the
 spark.yarn.scheduler.heartbeat.interval-ms.

 I hope that the additional overhead it incurs would be negligible.





 *From:* Zoltán Zvara [mailto:zoltan.zv...@gmail.com]
 *Sent:* Thursday, May 07, 2015 10:05 PM
 *To:* Taeyun Kim; u...@spark.apache.org
 *Subject:* Re: YARN mode startup takes too long (10+ secs)



 Without considering everything, just a few hints:

 You are running on YARN. From 09:18:34 to 09:18:37 your application is in
 state ACCEPTED. There is a noticeable overhead introduced due to
  communicating with YARN's ResourceManager and NodeManager, and given that the
 YARN scheduler needs time to make a decision. I guess somewhere
 from 09:18:38 to 09:18:43 your application JAR gets copied to another
 container requested by the Spark ApplicationMaster deployed on YARN's
 container 0. Deploying an executor needs further resource negotiations with
 the ResourceManager usually. Also, as I said, your JAR and Executor's code
  requires copying to the container's local directory - execution is blocked
 until that is complete.



 On Thu, May 7, 2015 at 3:09 AM Taeyun Kim taeyun@innowireless.com
 wrote:

 Hi,



 I’m running a spark application with YARN-client or YARN-cluster mode.

  But it seems to take too long to start up.

 It takes 10+ seconds to initialize the spark context.

 Is this normal? Or can it be optimized?



 The environment is as follows:

 - Hadoop: Hortonworks HDP 2.2 (Hadoop 2.6)

 - Spark: 1.3.1

 - Client: Windows 7, but similar result on CentOS 6.6



 The following is the startup part of the application log. (Some private
 information was edited)

 ‘Main: Initializing context’ at the first line and ‘MainProcessor:
 Deleting previous output files’ at the last line are the logs by the
 application. Others in between are from Spark itself. Application logic is
 executed after this log is displayed.



 ---



 15/05/07 09:18:31 INFO Main: Initializing context

 15/05/07 09:18:31 

Re: Intellij Spark Source Compilation

2015-05-11 Thread rtimp
Hi Iulian,

I was able to successfully compile in Eclipse after, on the command line,
running sbt avro:generate followed by sbt clean compile (and then a full
clean compile in Eclipse). Thanks for your help!





--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Intellij-Spark-Source-Compilation-tp12168p12201.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org