I've experienced the same bug, which I had to workaround manually.  I
posted the details here:
http://stackoverflow.com/questions/23687081/spark-workers-unable-to-find-jar-on-ec2-cluster

On 5/15/14, DB Tsai <dbt...@stanford.edu> wrote:
> Hi guys,
>
> I think it maybe a bug in Spark. I wrote some code to demonstrate the bug.
>
> Example 1) This is how Spark adds jars. Basically, add jars to
> cutomURLClassLoader.
>
> https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling1.java
>
> It doesn't work for two reasons. a) We don't pass the
> customURLClassLoader to task, so it's only available in the
> Executor.scala.  b) Even we do so, we need to get the class by
> loader.loadClass("Class Name").newInstance(), and get the Method by
> getDeclaredMethod to run it.
>
>
> Example 2) It works by getting the class using loadClass API, and then
> get and run the Method by getDeclaredMethod. Since we don't know which
> classes users will use, it's not a solution.
>
> https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling2.java
>
>
> Example 3) Add jars to systemClassLoader and have them accessible in
> JVM. Users can use the classes directly.
>
> https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling3.java
>
> I'm now porting example 3) to Spark, and will let you know if it works.
>
> Thanks.
>
> Sincerely,
>
> DB Tsai
> -------------------------------------------------------
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
> On Thu, May 15, 2014 at 12:03 PM, DB Tsai <dbt...@stanford.edu> wrote:
>> Hi Xiangrui,
>>
>> We're still using Spark 0.9 branch, and our job is submitted by
>>
>> ./bin/spark-class org.apache.spark.deploy.yarn.Client \
>>   --jar <YOUR_APP_JAR_FILE> \
>>   --class <APP_MAIN_CLASS> \
>>   --args <APP_MAIN_ARGUMENTS> \
>>   --num-workers <NUMBER_OF_WORKER_MACHINES> \
>>   --master-class <ApplicationMaster_CLASS>
>>   --master-memory <MEMORY_FOR_MASTER> \
>>   --worker-memory <MEMORY_PER_WORKER> \
>>   --addJars <any_local_files_used_in_SparkContext.addJar>
>>
>>
>> Based on my understanding of the code in yarn-standalone mode, the jar
>> distributing from local machine to application master is through
>> distributed
>> cache (using hadoop yarn-client api). From application master to
>> executors,
>> it's through http server. I maybe wrong, but if you look at the code in
>> SparkContext addJar method, you can see the jar is added to http server
>> in
>> yarn-standalone mode.
>>
>>             if (SparkHadoopUtil.get.isYarnMode() && master ==
>> "yarn-standalone") {
>>               // In order for this to work in yarn standalone mode the
>> user
>> must specify the
>>               // --addjars option to the client to upload the file into
>> the
>> distributed cache
>>               // of the AM to make it show up in the current working
>> directory.
>>               val fileName = new Path(uri.getPath).getName()
>>               try {
>>                 env.httpFileServer.addJar(new File(fileName))
>>               } catch {
>>
>> Those jars will be fetched in Executor from http server and added to
>> classloader of "Executor" class, see
>>
>>   private def updateDependencies(newFiles: HashMap[String, Long],
>> newJars:
>> HashMap[String, Long]) {
>>     synchronized {
>>       // Fetch missing dependencies
>>       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name,
>> -1L) < timestamp) {
>>         logInfo("Fetching " + name + " with timestamp " + timestamp)
>>         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory),
>> conf)
>>         currentFiles(name) = timestamp
>>       }
>>       for ((name, timestamp) <- newJars if currentJars.getOrElse(name,
>> -1L)
>> < timestamp) {
>>         logInfo("Fetching " + name + " with timestamp " + timestamp)
>>         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory),
>> conf)
>>         currentJars(name) = timestamp
>>         // Add it to our class loader
>>         val localName = name.split("/").last
>>         val url = new File(SparkFiles.getRootDirectory,
>> localName).toURI.toURL
>>
>>         if (!urlClassLoader.getURLs.contains(url)) {
>>           urlClassLoader.addURL(url)
>>         }
>>       }
>>
>>
>> The problem seems to be that jars are added to the classloader of
>> "Executor"
>> classes, and they are not accessible in Task.scala.
>>
>> I verified this by trying to load our custom classes in Executor.scala,
>> and
>> it works. But if I tried to load those classes in Task.scala, I'll get
>> classNotFound exception.
>>
>> Thanks.
>>
>>
>>
>>
>>
>> Sincerely,
>>
>> DB Tsai
>> -------------------------------------------------------
>> My Blog: https://www.dbtsai.com
>> LinkedIn: https://www.linkedin.com/in/dbtsai
>>
>>
>> On Wed, May 14, 2014 at 6:04 PM, Xiangrui Meng <men...@gmail.com> wrote:
>>>
>>> In SparkContext#addJar, for yarn-standalone mode, the workers should
>>> get the jars from local distributed cache instead of fetching them
>>> from the http server. Could you send the command you used to submit
>>> the job? -Xiangrui
>>>
>>> On Wed, May 14, 2014 at 1:26 AM, DB Tsai <dbt...@stanford.edu> wrote:
>>> > Hi Xiangrui,
>>> >
>>> > I actually used `yarn-standalone`, sorry for misleading. I did
>>> > debugging
>>> > in
>>> > the last couple days, and everything up to updateDependency in
>>> > executor.scala works. I also checked the file size and md5sum in the
>>> > executors, and they are the same as the one in driver. Gonna do more
>>> > testing
>>> > tomorrow.
>>> >
>>> > Thanks.
>>> >
>>> >
>>> > Sincerely,
>>> >
>>> > DB Tsai
>>> > -------------------------------------------------------
>>> > My Blog: https://www.dbtsai.com
>>> > LinkedIn: https://www.linkedin.com/in/dbtsai
>>> >
>>> >
>>> > On Tue, May 13, 2014 at 11:41 PM, Xiangrui Meng <men...@gmail.com>
>>> > wrote:
>>> >>
>>> >> I don't know whether this would fix the problem. In v0.9, you need
>>> >> `yarn-standalone` instead of `yarn-cluster`.
>>> >>
>>> >> See
>>> >>
>>> >> https://github.com/apache/spark/commit/328c73d037c17440c2a91a6c88b4258fbefa0c08
>>> >>
>>> >> On Tue, May 13, 2014 at 11:36 PM, Xiangrui Meng <men...@gmail.com>
>>> >> wrote:
>>> >> > Does v0.9 support yarn-cluster mode? I checked SparkContext.scala
>>> >> > in
>>> >> > v0.9.1 and didn't see special handling of `yarn-cluster`. -Xiangrui
>>> >> >
>>> >> > On Mon, May 12, 2014 at 11:14 AM, DB Tsai <dbt...@stanford.edu>
>>> >> > wrote:
>>> >> >> We're deploying Spark in yarn-cluster mode (Spark 0.9), and we add
>>> >> >> jar
>>> >> >> dependencies in command line with "--addJars" option. However,
>>> >> >> those
>>> >> >> external jars are only available in the driver (application
>>> >> >> running
>>> >> >> in
>>> >> >> hadoop), and not available in the executors (workers).
>>> >> >>
>>> >> >> After doing some research, we realize that we've to push those
>>> >> >> jars
>>> >> >> to
>>> >> >> executors in driver via sc.AddJar(fileName). Although in the
>>> >> >> driver's
>>> >> >> log
>>> >> >> (see the following), the jar is successfully added in the http
>>> >> >> server
>>> >> >> in the
>>> >> >> driver, and I confirm that it's downloadable from any machine in
>>> >> >> the
>>> >> >> network, I still get `java.lang.NoClassDefFoundError` in the
>>> >> >> executors.
>>> >> >>
>>> >> >> 14/05/09 14:51:41 INFO spark.SparkContext: Added JAR
>>> >> >> analyticshadoop-eba5cdce1.jar at
>>> >> >> http://10.0.0.56:42522/jars/analyticshadoop-eba5cdce1.jar with
>>> >> >> timestamp
>>> >> >> 1399672301568
>>> >> >>
>>> >> >> Then I check the log in the executors, and I don't find anything
>>> >> >> `Fetching
>>> >> >> <file> with timestamp <timestamp>`, which implies something is
>>> >> >> wrong;
>>> >> >> the
>>> >> >> executors are not downloading the external jars.
>>> >> >>
>>> >> >> Any suggestion what we can look at?
>>> >> >>
>>> >> >> After digging into how spark distributes external jars, I wonder
>>> >> >> the
>>> >> >> scalability of this approach. What if there are thousands of nodes
>>> >> >> downloading the jar from single http server in the driver? Why
>>> >> >> don't
>>> >> >> we
>>> >> >> push
>>> >> >> the jars into HDFS distributed cache by default instead of
>>> >> >> distributing
>>> >> >> them
>>> >> >> via http server?
>>> >> >>
>>> >> >> Thanks.
>>> >> >>
>>> >> >> Sincerely,
>>> >> >>
>>> >> >> DB Tsai
>>> >> >> -------------------------------------------------------
>>> >> >> My Blog: https://www.dbtsai.com
>>> >> >> LinkedIn: https://www.linkedin.com/in/dbtsai
>>> >
>>> >
>>
>>
>

Reply via email to