Re: How to use spark-submit

2014-05-12 Thread Sonal Goyal
Hi Stephen,

I am using maven shade plugin for creating my uber jar. I have marked spark
dependencies as provided.

Best Regards,
Sonal
Nube Technologies http://www.nubetech.co

http://in.linkedin.com/in/sonalgoyal




On Mon, May 12, 2014 at 1:04 AM, Stephen Boesch java...@gmail.com wrote:

 HI Sonal,
 Yes I am working towards that same idea.  How did you go about
 creating the non-spark-jar dependencies ?  The way I am doing it is a
 separate straw-man project that does not include spark but has the external
 third party jars included. Then running sbt compile:managedClasspath and
 reverse engineering the lib jars from it.  That is obviously not ideal.

 The maven run will be useful for other projects built by maven: i will
 keep in my notes.

 As for sbt run-example, it requires additional libraries to be added for my
 external dependencies.  I tried several items including ADD_JARS,
 --driver-class-path, and combinations of extraClassPath. I have put that
 ad-hoc approach aside in favor of finding a systematic one.




 2014-05-08 5:26 GMT-07:00 Sonal Goyal sonalgoy...@gmail.com:

 I am creating a jar with only my dependencies and run spark-submit through
 my project mvn build. I have configured the mvn exec goal to the location
 of the script. Here is how I have set it up for my app. The mainClass is my
 driver program, and I am able to send my custom args too. Hope this helps.

 <plugin>
   <groupId>org.codehaus.mojo</groupId>
   <artifactId>exec-maven-plugin</artifactId>
   <executions>
     <execution>
       <goals>
         <goal>exec</goal>
       </goals>
     </execution>
   </executions>
   <configuration>
     <executable>/home/sgoyal/spark/bin/spark-submit</executable>
     <arguments>
       <argument>${jars}</argument>
       <argument>--class</argument>
       <argument>${mainClass}</argument>
       <argument>--arg</argument>
       <argument>${spark.master}</argument>
       <argument>--arg</argument>
       <argument>${my app arg 1}</argument>
       <argument>--arg</argument>
       <argument>${my arg 2}</argument>
     </arguments>
   </configuration>
 </plugin>


 Best Regards,
 Sonal
 Nube Technologies http://www.nubetech.co

  http://in.linkedin.com/in/sonalgoyal




 On Wed, May 7, 2014 at 6:57 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Doesn't the run-example script work for you? Also, are you on the latest
 commit of branch-1.0 ?

 TD


 On Mon, May 5, 2014 at 7:51 PM, Soumya Simanta soumya.sima...@gmail.com
  wrote:



 Yes, I'm struggling with a similar problem where my classes are not found
 on the worker nodes. I'm using 1.0.0_SNAPSHOT.  I would really appreciate it
 if someone could provide some documentation on the usage of spark-submit.

 Thanks

  On May 5, 2014, at 10:24 PM, Stephen Boesch java...@gmail.com
 wrote:
 
 
  I have a spark streaming application that uses the external streaming
 modules (e.g. kafka, mqtt, ..) as well.  It is not clear how to properly
 invoke the spark-submit script: what --driver-class-path and/or
 -Dspark.executor.extraClassPath parameters are required?
 
   For reference, the following error is proving difficult to resolve:
 
  java.lang.ClassNotFoundException:
 org.apache.spark.streaming.examples.StreamingExamples
 







Re: File present but file not found exception

2014-05-12 Thread Sai Prasanna
I found that if a file is present in all the nodes in the given path in
localFS, then reading is possible.

But is there a way to read if the file is present only in certain nodes ??
[There should be a way !!]

NEED: I want to do some filter ops on an HDFS file, create a local file from
the result, and create an RDD out of it to operate on.

Is there any way out ??

Thanks in advance !
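
For what it's worth, the filter-then-local-file round trip can usually stay inside Spark
entirely. A minimal sketch, assuming a SparkContext `sc`, that the filter can be expressed
as a Spark transformation, and that the intermediate result is kept in HDFS rather than
local disk (paths and the predicate are placeholders):

    // Keep the intermediate result in HDFS so every node can read it.
    val source   = sc.textFile("hdfs:///data/input.txt")
    val filtered = source.filter(line => line.contains("ERROR"))   // placeholder predicate
    filtered.saveAsTextFile("hdfs:///data/filtered")
    val next = sc.textFile("hdfs:///data/filtered")                // operate on it as an RDD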




On Fri, May 9, 2014 at 12:18 AM, Sai Prasanna ansaiprasa...@gmail.comwrote:

 Hi Everyone,

 I think all are pretty busy, the response time in this group has slightly
 increased.

 But anyways, this is a pretty silly problem, but could not get over.

 I have a file in my localFS, but when I try to create an RDD out of it,
 the tasks fail and a file-not-found exception shows up in the log files.

 var file = sc.textFile("file:///home/sparkcluster/spark/input.txt");
 file.top(1);

 input.txt exists in the above folder but Spark still couldn't find it. Do some
 parameters need to be set?

 Any help is really appreciated. Thanks !!



Re: java.lang.NoSuchMethodError on Java API

2014-05-12 Thread Alessandro De Carli
Sure, I uploaded the code on pastebin: http://pastebin.com/90Hynrjh

On Mon, May 12, 2014 at 12:27 AM, Madhu ma...@madhu.com wrote:
 No, you don't need to do anything special to get it to run in Eclipse.
 Just add the assembly jar to the build path, create a main method, add your
 code, and click the green run button.

 Can you post your code here?
 I can try it in my environment.



 -
 Madhu
 https://www.linkedin.com/in/msiddalingaiah
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-NoSuchMethodError-on-Java-API-tp5545p5567.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



-- 
Alessandro De Carli
Sonnmattstr. 121
CH-5242 Birr

Email: decarli@gmail.com
Twitter: @a_d_c_
Tel: +41 76 305 75 00
Web: http://www.papers.ch


Re: How to use spark-submit

2014-05-12 Thread Stephen Boesch
@Sonal - makes sense.  Is the maven shade plugin runnable within sbt ? If
so would you care to share those build.sbt (or .scala) lines?  If not, are
you aware of a similar plugin for sbt?
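
For reference, the usual sbt counterpart to the shade plugin is sbt-assembly, with Spark
marked as "provided" so it stays out of the fat jar. A minimal sketch (the plugin and Spark
versions below are illustrative assumptions, not taken from anyone's actual build):

    // project/plugins.sbt
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

    // build.sbt -- keep Spark out of the assembled jar
    libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.1" % "provided"

With the plugin's assemblySettings added to the build, `sbt assembly` then produces the
uber jar to hand to spark-submit.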




2014-05-11 23:53 GMT-07:00 Sonal Goyal sonalgoy...@gmail.com:

 Hi Stephen,

 I am using maven shade plugin for creating my uber jar. I have marked
 spark dependencies as provided.

 Best Regards,
 Sonal
 Nube Technologies http://www.nubetech.co

 http://in.linkedin.com/in/sonalgoyal




 On Mon, May 12, 2014 at 1:04 AM, Stephen Boesch java...@gmail.com wrote:

 HI Sonal,
 Yes I am working towards that same idea.  How did you go about
 creating the non-spark-jar dependencies ?  The way I am doing it is a
 separate straw-man project that does not include spark but has the external
 third party jars included. Then running sbt compile:managedClasspath and
 reverse engineering the lib jars from it.  That is obviously not ideal.

 The maven run will be useful for other projects built by maven: i will
 keep in my notes.

 As for sbt run-example, it requires additional libraries to be added for my
 external dependencies.  I tried several items including ADD_JARS,
 --driver-class-path, and combinations of extraClassPath. I have put that
 ad-hoc approach aside in favor of finding a systematic one.




 2014-05-08 5:26 GMT-07:00 Sonal Goyal sonalgoy...@gmail.com:

 I am creating a jar with only my dependencies and run spark-submit
 through my project mvn build. I have configured the mvn exec goal to the
 location of the script. Here is how I have set it up for my app. The
 mainClass is my driver program, and I am able to send my custom args too.
 Hope this helps.

 <plugin>
   <groupId>org.codehaus.mojo</groupId>
   <artifactId>exec-maven-plugin</artifactId>
   <executions>
     <execution>
       <goals>
         <goal>exec</goal>
       </goals>
     </execution>
   </executions>
   <configuration>
     <executable>/home/sgoyal/spark/bin/spark-submit</executable>
     <arguments>
       <argument>${jars}</argument>
       <argument>--class</argument>
       <argument>${mainClass}</argument>
       <argument>--arg</argument>
       <argument>${spark.master}</argument>
       <argument>--arg</argument>
       <argument>${my app arg 1}</argument>
       <argument>--arg</argument>
       <argument>${my arg 2}</argument>
     </arguments>
   </configuration>
 </plugin>


 Best Regards,
 Sonal
 Nube Technologies http://www.nubetech.co

  http://in.linkedin.com/in/sonalgoyal




 On Wed, May 7, 2014 at 6:57 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Doesn't the run-example script work for you? Also, are you on the latest
 commit of branch-1.0 ?

 TD


 On Mon, May 5, 2014 at 7:51 PM, Soumya Simanta 
 soumya.sima...@gmail.com wrote:



 Yes, I'm struggling with a similar problem where my classes are not
 found on the worker nodes. I'm using 1.0.0_SNAPSHOT.  I would really
 appreciate it if someone could provide some documentation on the usage of
 spark-submit.

 Thanks

  On May 5, 2014, at 10:24 PM, Stephen Boesch java...@gmail.com
 wrote:
 
 
  I have a spark streaming application that uses the external
 streaming modules (e.g. kafka, mqtt, ..) as well.  It is not clear how to
 properly invoke the spark-submit script: what --driver-class-path
 and/or -Dspark.executor.extraClassPath parameters are required?
 
   For reference, the following error is proving difficult to resolve:
 
  java.lang.ClassNotFoundException:
 org.apache.spark.streaming.examples.StreamingExamples
 








Re: How to read a multipart s3 file?

2014-05-12 Thread Nicholas Chammas
On Wed, May 7, 2014 at 4:00 AM, Han JU ju.han.fe...@gmail.com wrote:

But in my experience, when reading directly from s3n, Spark creates only 1
 input partition per file, regardless of the file size. This may lead to
 some performance problems if you have big files.

 You can (and perhaps should) always repartition() the RDD explicitly to
increase your level of parallelism to match the number of cores in your
cluster. It’s pretty quick, and will speed up all subsequent operations.
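
A minimal sketch of that suggestion, assuming a SparkContext `sc` (e.g. in spark-shell);
the path and the target partition count are placeholders:

    val raw   = sc.textFile("s3n://my-bucket/some/big-file")   // often 1 partition per file
    val parts = raw.repartition(64)                            // roughly match total cores
    parts.count()                                              // subsequent ops now run in parallel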


Re: Spark to utilize HDFS's mmap caching

2014-05-12 Thread Matei Zaharia
Yes, Spark goes through the standard HDFS client and will automatically benefit 
from this.

Matei

On May 8, 2014, at 4:43 AM, Chanwit Kaewkasi chan...@gmail.com wrote:

 Hi all,
 
 Can Spark (0.9.x) utilize the caching feature in HDFS 2.3 via
 sc.textFile() and other HDFS-related APIs?
 
 http://hadoop.apache.org/docs/r2.3.0/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html
 
 Best regards,
 
 -chanwit
 
 --
 Chanwit Kaewkasi
 linkedin.com/in/chanwit



Re: build shark(hadoop CDH5) on hadoop2.0.0 CDH4

2014-05-12 Thread Sean Owen
There was never a Hadoop 2.0.0. There was a Hadoop 2.0.0-alpha as
far as Maven artifacts are concerned. The latest in that series is
2.0.6-alpha.
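
In plain sbt terms, the dependency has to name a version string that actually exists in
some repository. Shark's own build wires the Hadoop version in differently, so treat the
lines below purely as a sketch of a resolvable CDH4 coordinate (the exact version is an
assumption -- check the Cloudera repository for the one you run):

    resolvers += "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
    libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.0.0-cdh4.2.0"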

On Mon, May 12, 2014 at 4:29 AM, Sophia sln-1...@163.com wrote:
 I have built shark the sbt way, but this sbt exception turns up:
 [error] sbt.ResolveException: unresolved dependency:
 org.apache.hadoop#hadoop-client;2.0.0: not found.
 What can I do to build it correctly?




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/build-shark-hadoop-CDH5-on-hadoop2-0-0-CDH4-tp5574.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Client cannot authenticate via:[TOKEN]

2014-05-12 Thread innowireless TaeYun Kim
I'm trying to run spark-shell on Hadoop yarn.
Specifically, the environment is as follows:

- Client
  - OS: Windows 7
  - Spark version: 1.0.0-SNAPSHOT (git cloned 2014.5.8)
- Server
  - Platform: hortonworks sandbox 2.1

I modified the spark code to apply
https://issues.apache.org/jira/browse/YARN-1824 so that the cross-platform
issues are removed. (that is, $() to $$(), File.pathSeparator to
ApplicationConstants.CLASS_PATH_SEPARATOR)

Now when I run spark-shell on client(Windows 7), server log is produced as
follows:
('owner' is the user name of the Windows 7 machine.)

Log Type: stderr
Log Length: 1356
log4j:WARN No appenders could be found for logger
(org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.
14/05/12 01:13:54 INFO YarnSparkHadoopUtil: Using Spark's default log4j
profile: org/apache/spark/log4j-defaults.properties
14/05/12 01:13:54 INFO SecurityManager: Changing view acls to: yarn,owner
14/05/12 01:13:54 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(yarn, owner)
14/05/12 01:13:55 INFO Slf4jLogger: Slf4jLogger started
14/05/12 01:13:56 INFO Remoting: Starting remoting
14/05/12 01:13:56 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkyar...@sandbox.hortonworks.com:47074]
14/05/12 01:13:56 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkyar...@sandbox.hortonworks.com:47074]
14/05/12 01:13:56 INFO RMProxy: Connecting to ResourceManager at
/0.0.0.0:8030
14/05/12 01:13:56 INFO ExecutorLauncher: ApplicationAttemptId:
appattempt_1399856448891_0018_01
14/05/12 01:13:56 INFO ExecutorLauncher: Registering the ApplicationMaster
14/05/12 01:13:56 WARN Client: Exception encountered while connecting to the
server : org.apache.hadoop.security.AccessControlException: Client cannot
authenticate via:[TOKEN]

How can I handle this error?
Thanks in advance.



Spark on Yarn - A small issue !

2014-05-12 Thread Sai Prasanna
Hi All,

I wanted to launch Spark on Yarn, interactive - yarn client mode.

With default settings of yarn-site.xml and spark-env.sh, i followed the
given link
http://spark.apache.org/docs/0.8.1/running-on-yarn.html

I get the correct pi value when I run without launching the shell.

When I launch the shell with the following command,

SPARK_JAR=./assembly/target/scala-2.9.3/spark-assembly-0.8.1-incubating-hadoop2.3.0.jar
\
SPARK_YARN_APP_JAR=examples/target/scala-2.9.3/spark-examples-assembly-0.8.1-incubating.jar
\
MASTER=yarn-client ./spark-shell

And try to create RDDs and do some action on them, I get nothing. After
some time the tasks fail.

LogFile of spark:

519095 14/05/12 13:30:40 INFO YarnClientClusterScheduler:
YarnClientClusterScheduler.postStartHook done

519096 14/05/12 13:30:40 INFO BlockManagerMasterActor$BlockManagerInfo:
Registering block manager s1:38355 with 324.4 MB RAM

519097 14/05/12 13:31:38 INFO MemoryStore: ensureFreeSpace(202584) called
with curMem=0, maxMem=340147568

519098 14/05/12 13:31:38 INFO MemoryStore: Block broadcast_0 stored as
values to memory (estimated size 197.8 KB, free 324.2 MB)

519099 14/05/12 13:31:49 INFO FileInputFormat: Total input paths to process
: 1

519100 14/05/12 13:31:49 INFO NetworkTopology: Adding a new node:
/default-rack/192.168.1.100:50010

519101 14/05/12 13:31:49 INFO SparkContext: Starting job: top at
console:15

519102 14/05/12 13:31:49 INFO DAGScheduler: Got job 0 (top at console:15)
with 4 output partitions (allowLocal=false)

519103 14/05/12 13:31:49 INFO DAGScheduler: Final stage: Stage 0 (top at
console:15)

519104 14/05/12 13:31:49 INFO DAGScheduler: Parents of final stage: List()

519105 14/05/12 13:31:49 INFO DAGScheduler: Missing parents: List()

519106 14/05/12 13:31:49 INFO DAGScheduler: Submitting Stage 0
(MapPartitionsRDD[2] at top at console:15), which has no missing par
  ents

519107 14/05/12 13:31:49 INFO DAGScheduler: Submitting 4 missing tasks from
Stage 0 (MapPartitionsRDD[2] at top at console:15)

519108 14/05/12 13:31:49 INFO YarnClientClusterScheduler: Adding task set
0.0 with 4 tasks

519109 14/05/12 13:31:49 INFO RackResolver: Resolved s1 to /default-rack

519110 14/05/12 13:31:49 INFO ClusterTaskSetManager: Starting task 0.0:3
as TID 0 on executor 1: s1 (PROCESS_LOCAL)

519111 14/05/12 13:31:49 INFO ClusterTaskSetManager: Serialized task 0.0:3
as 1811 bytes in 4 ms

519112 14/05/12 13:31:49 INFO ClusterTaskSetManager: Starting task 0.0:0
as TID 1 on executor 1: s1 (NODE_LOCAL)

519113 14/05/12 13:31:49 INFO ClusterTaskSetManager: Serialized task 0.0:0
as 1811 bytes in 1 ms

519114 14/05/12 13:32:18 INFO YarnClientSchedulerBackend: Executor 1
disconnected, so removing it

519115 14/05/12 13:32:18 ERROR YarnClientClusterScheduler: Lost executor 1
on s1: remote Akka client shutdown

519116 14/05/12 13:32:18 INFO ClusterTaskSetManager: Re-queueing tasks for
1 from TaskSet 0.0

519117 14/05/12 13:32:18 WARN ClusterTaskSetManager: Lost TID 1 (task
0.0:0)

519118 14/05/12 13:32:18 WARN ClusterTaskSetManager: Lost TID 0 (task
0.0:3)

519119 14/05/12 13:32:18 INFO DAGScheduler: Executor lost: 1 (epoch 0)

519120 14/05/12 13:32:18 INFO BlockManagerMasterActor: Trying to remove
executor 1 from BlockManagerMaster.

519121 14/05/12 13:32:18 INFO BlockManagerMaster: Removed 1 successfully
in removeExecutor


 Do I need to set any other env-variable specifically for Spark on YARN?
What could be the issue?

Can anyone please help me in this regard.

Thanks in Advance !!


Re: Is there any problem on the spark mailing list?

2014-05-12 Thread Sean Owen
Note the mails are coming out of order in some cases. I am getting current
messages but a sprinkling of old replies too.
On May 12, 2014 12:16 PM, ankurdave ankurd...@gmail.com wrote:

 I haven't been getting mail either. This was the last message I received:

 http://apache-spark-user-list.1001560.n3.nabble.com/master-attempted-to-re-register-the-worker-and-then-took-all-workers-as-unregistered-tp553p5491.html



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-any-problem-on-the-spark-mailing-list-tp5509p5515.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



spark-env.sh do not take effect.

2014-05-12 Thread lihu
Hi,
I set up a small cluster with 3 machines; every machine has 64GB RAM and 11
cores, and I use Spark 0.9.

   I have set spark-env.sh as follows:

   SPARK_MASTER_IP=192.168.35.2
   SPARK_MASTER_PORT=7077
   SPARK_MASTER_WEBUI_PORT=12306
   SPARK_WORKER_CORES=3
   SPARK_WORKER_MEMORY=20g
   SPARK_JAVA_OPTS+="-Dspark.executor.memory=5g"

   but I see the following log on the master:

   Spark Command: java -cp
   :/usr/local/spark-0.9.0/conf:/usr/local/spark-0.9.0/assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar
   -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m
   -Xmx512m org.apache.spark.deploy.master.Master --ip 192.168.35.2 --port
   7077 --webui-port 12306

   log4j:WARN No appenders could be found for logger
   (akka.event.slf4j.Slf4jLogger).
   log4j:WARN Please initialize the log4j system properly.
   log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
   14/05/07 08:30:31 INFO Master: Using Spark's default log4j profile:
   org/apache/spark/log4j-defaults.properties
   14/05/07 08:30:31 INFO Master: Starting Spark master at spark://192.168.35.2:7077
   14/05/07 08:30:31 INFO MasterWebUI: Started Master web UI at http://pug-master:12306
   14/05/07 08:30:31 INFO Master: I have been elected leader! New state: ALIVE
   14/05/07 08:30:34 INFO Master: Registering worker 192.168.35.2:52972 with 11 cores, 61.9 GB RAM
   14/05/07 08:30:34 INFO Master: Registering worker 192.168.35.2:43225 with 11 cores, 61.9 GB RAM


   and the following log on my worker:

   Spark Command: java -cp
   :/usr/local/spark-0.9.0/conf:/usr/local/spark-0.9.0/assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar
   -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m
   -Xmx512m org.apache.spark.deploy.worker.Worker spark://192.168.35.2:7077

   log4j:WARN No appenders could be found for logger
   (akka.event.slf4j.Slf4jLogger).
   log4j:WARN Please initialize the log4j system properly.
   log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
   14/05/07 08:30:34 INFO Worker: Using Spark's default log4j profile:
   org/apache/spark/log4j-defaults.properties
   14/05/07 08:30:34 INFO Worker: Starting Spark worker pug1:43225 with 11
   cores, 61.9 GB RAM
   14/05/07 08:30:34 INFO Worker: Spark home: /usr/local/spark-0.9.0
   14/05/07 08:30:34 INFO WorkerWebUI: Started Worker web UI at http://pug1:8081
   14/05/07 08:30:34 INFO Worker: Connecting to master spark://192.168.35.2:7077...
   14/05/07 08:30:34 INFO Worker: Successfully registered with master spark://192.168.35.2:7077



   I have checked that I did not misspell any configuration, and I used
rsync to sync the spark-env.sh file from the master to the workers, but
it seems that spark-env.sh does not take effect. I do not know what I have
missed.


missing method in my slf4j after excluding Spark ZK log4j

2014-05-12 Thread Adrian Mocanu
Hey guys,
I've asked before, in Spark 0.9 - I now use 0.9.1, about removing log4j 
dependency and was told that it was gone. However I still find it part of 
zookeeper imports. This is fine since I exclude it myself in the sbt file, but 
another issue arises.
I wonder if anyone else has run into this.

Spark uses log4j v1.2.17 and slf4j-log4j12:1.7.2
I use slf4j 1.7.5, logback 1.0.13, and log4joverslf4j v 1.7.5

I think my slf4j 1.7.5 doesn't agree with what zookeeper expects in its log4j v 
1.2.17 because I get missing method error:
java.lang.NoSuchMethodError: 
org.apache.log4j.Logger.setLevel(Lorg/apache/log4j/Level;)V
at 
org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58)
at 
org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58)
at scala.Option.map(Option.scala:145)
at 
org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:58)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:126)
at org.apache.spark.SparkContext.init(SparkContext.scala:139)
at 
org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:500)
at 
org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:76)
...

Is there a way to find out what versions of slf4j I need to make it work with 
log4j 1.2.17?

-Adrian



Re: Variables outside of mapPartitions scope

2014-05-12 Thread pedro
Right now I am not using any class variables (references to this). All my
variables are created within the scope of the method I am running.

I did more debugging and found this strange behavior.
variables here
for loop
mapPartitions call
use variables here
end mapPartitions
endfor

This will result in a serialization bug, but the following won't:

variables here
for loop
create new references to variables here
mapPartitions call
use new reference variables here
end mapPartitions
endfor
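
A minimal Scala sketch of the second pattern, assuming a SparkContext `sc` (all names are
illustrative). The usual explanation is that copying the outer variables into fresh local
vals right before the closure means the closure captures only those values, not the
enclosing scope:

    val lookupTable = Map("a" -> 1, "b" -> 2)
    val factor = 10
    val data = sc.parallelize(Seq("a", "b", "c"))

    // new references created right before the closure
    val localLookup = lookupTable
    val localFactor = factor
    val result = data.mapPartitions { iter =>
      iter.map(x => localLookup.getOrElse(x, 0) * localFactor)
    }
    result.collect()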



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Variables-outside-of-mapPartitions-scope-tp5517p5528.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: logging in pyspark

2014-05-12 Thread Nicholas Chammas
Ah, yes, that is correct. You need a serializable object one way or the
other.

An alternate suggestion would be to use a combination of RDD.sample()
(http://spark.apache.org/docs/latest/api/pyspark/pyspark.rdd.RDD-class.html#sample)
and collect() to take a look at some small amount of data and just log it
from the driver. That's pretty awkward as well, but will spare you having
to make some kind of serializable logger function.
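
Since the same issue comes up in Scala too (see below), here is a minimal sketch of the
sample-and-log-from-the-driver idea in Scala, assuming a SparkContext `sc` and an SLF4J
logger on the driver; the logger name, sample fraction, and seed are placeholders:

    import org.slf4j.LoggerFactory

    val log = LoggerFactory.getLogger("driver-side")
    val rdd = sc.parallelize(1 to 1000000)
    // pull a small sample back to the driver and log it there --
    // nothing needs to be serialized out to the executors
    val peek = rdd.sample(false, 0.0001, 42).collect()
    peek.foreach(x => log.info(s"sampled element: $x"))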


On Wed, May 7, 2014 at 9:32 AM, Diana Carroll dcarr...@cloudera.com wrote:

 foreach vs. map isn't the issue.  Both require serializing the called
 function, so the pickle error would still apply, yes?

 And at the moment, I'm just testing.  Definitely wouldn't want to log
 something for each element, but may want to detect something and log for
 SOME elements.

 So my question is: how are other people doing logging from distributed
 tasks, given the serialization issues?

 The same issue actually exists in Scala, too.  I could work around it by
 creating a small serializable object that provides a logger, but it seems
 kind of kludgy to me, so I'm wondering if other people are logging from
 tasks, and if so, how?

 Diana


 On Tue, May 6, 2014 at 6:24 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 I think you're looking for RDD.foreach()
 (http://spark.apache.org/docs/latest/api/pyspark/pyspark.rdd.RDD-class.html#foreach).

 According to the programming guide
 (http://spark.apache.org/docs/latest/scala-programming-guide.html):

 Run a function func on each element of the dataset. This is usually done
 for side effects such as updating an accumulator variable (see below) or
 interacting with external storage systems.


 Do you really want to log something for each element of your RDD?

 Nick


 On Tue, May 6, 2014 at 3:31 PM, Diana Carroll dcarr...@cloudera.comwrote:

 What should I do if I want to log something as part of a task?

 This is what I tried.  To set up a logger, I followed the advice here:
 http://py4j.sourceforge.net/faq.html#how-to-turn-logging-on-off

 logger = logging.getLogger("py4j")
 logger.setLevel(logging.INFO)
 logger.addHandler(logging.StreamHandler())

 This works fine when I call it from my driver (ie pyspark):
 logger.info("this works fine")

 But I want to try logging within a distributed task so I did this:

 def logTestMap(a):
     logger.info("test")
     return a

 myrdd.map(logTestMap).count()

 and got:
 PicklingError: Can't pickle 'lock' object

 So it's trying to serialize my function and can't because of a lock
 object used in logger, presumably for thread-safeness.  But then...how
 would I do it?  Or is this just a really bad idea?

 Thanks
 Diana






Forcing spark to send exactly one element to each worker node

2014-05-12 Thread NevinLi158
Hi all,

I'm currently trying to use pipe to run C++ code on each worker node, and I
have an RDD of essentially command line arguments that i'm passing to each
node. I want to send exactly one element to each node, but when I run my
code, Spark ends up sending multiple elements to a node: is there any way to
force Spark to send only one? I've tried coalescing and repartitioning the
RDD to be equal to the number of elements in the RDD, but that hasn't
worked.

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Forcing-spark-to-send-exactly-one-element-to-each-worker-node-tp5605.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Forcing spark to send exactly one element to each worker node

2014-05-12 Thread NevinLi158
Fixed the problem as soon as I sent this out, sigh. Apparently you can do
this by changing the number of slices to cut the dataset into: I thought
that was identical to the amount of partitions, but apparently not.
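
A sketch of that fix, assuming a SparkContext `sc`; the executable path and argument
strings are placeholders. Setting numSlices to the number of elements gives one element
per partition, and hence one element per pipe invocation:

    val cmdLines = Seq("--input a.dat", "--input b.dat", "--input c.dat")
    val oneEach  = sc.parallelize(cmdLines, cmdLines.size)  // numSlices = element count
    val output   = oneEach.pipe("/path/to/my_cpp_app").collect()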



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Forcing-spark-to-send-exactly-one-element-to-each-worker-node-tp5605p5607.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Turn BLAS on MacOSX

2014-05-12 Thread Xiangrui Meng
Those are warning messages instead of errors. You need to add
netlib-java:all to use native BLAS/LAPACK. But it won't work if you
include netlib-java:all in an assembly jar. It has to be a separate
jar when you submit your job. For SGD, we only use level-1 BLAS, so I
don't think native code is called. -Xiangrui
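
In sbt form, pulling in netlib-java's wrappers might look like the line below (the version
is an assumption -- check what your MLlib build expects), and as noted it should be
submitted as a separate jar rather than folded into the assembly:

    // the netlib-java "all" artifact is a POM-type dependency
    libraryDependencies += "com.github.fommil.netlib" % "all" % "1.1.2" pomOnly()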

On Sun, May 11, 2014 at 9:32 AM, DB Tsai dbt...@stanford.edu wrote:
 Hi Debasish,

 In https://github.com/apache/spark/blob/master/docs/mllib-guide.md
 Dependencies section, the document talks about the native blas dependencies
 issue.

 For netlib which breeze uses internally, if the native library isn't found,
 the jblas implementation will be used.

 Here is more detail about how to install native library in different
 platform.
 https://github.com/fommil/netlib-java/blob/master/README.md#machine-optimised-system-libraries


 Sincerely,

 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai


 On Wed, May 7, 2014 at 10:52 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 Hi,

 How do I load native BLAS libraries on Mac ?

 I am getting the following errors while running LR and SVM with SGD:

 14/05/07 10:48:13 WARN BLAS: Failed to load implementation from:
 com.github.fommil.netlib.NativeSystemBLAS

 14/05/07 10:48:13 WARN BLAS: Failed to load implementation from:
 com.github.fommil.netlib.NativeRefBLAS

 On CentOS it was fine... but on Mac I am getting these warnings.

 Also, when it fails to load native BLAS, does it use Java code for BLAS
 operations?

 May be after addition of breeze, we should add these details on a page as
 well so that users are aware of it before they report any performance
 results..

 Thanks.

 Deb




Re: Is their a way to Create SparkContext object?

2014-05-12 Thread Matei Zaharia
You can just pass it around as a parameter.
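
A minimal sketch of that, with illustrative names (local master, trivial helper):

    import org.apache.spark.SparkContext

    object Helpers {
      // any class or function that needs to build RDDs just takes sc as an argument
      def toRdd(sc: SparkContext, xs: Seq[Int]) = sc.makeRDD(xs)
    }

    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "pass-sc-around")
        val rdd = Helpers.toRdd(sc, List(1, 2, 3))
        println(rdd.count())
        sc.stop()
      }
    }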

On May 12, 2014, at 12:37 PM, yh18190 yh18...@gmail.com wrote:

 Hi,
 
 Could anyone suggest how we can create a SparkContext object in other
 classes or functions where we need to convert a Scala collection to an RDD
 using the sc object, like sc.makeRDD(list), instead of using the main class's
 SparkContext object?
 Is there a way to pass the sc object as a parameter to functions in other
 classes?
 Please let me know
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Is-their-a-way-to-Create-SparkContext-object-tp5612.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Is their a way to Create SparkContext object?

2014-05-12 Thread yh18190
Hi,

Could anyone suggest how we can create a SparkContext object in other
classes or functions where we need to convert a Scala collection to an RDD
using the sc object, like sc.makeRDD(list), instead of using the main class's
SparkContext object?
Is there a way to pass the sc object as a parameter to functions in other
classes?
Please let me know



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-their-a-way-to-Create-SparkContext-object-tp5612.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


java.lang.StackOverflowError when calling count()

2014-05-12 Thread Guanhua Yan
Dear Sparkers:

I am using PySpark version 0.9.0 to implement an iterative
algorithm. I got the errors shown at the end of this email. It seems that
they are due to a Java StackOverflowError. The same error has been
reproduced on a Mac desktop and a Linux workstation, both running the same
version of Spark.

The same line of code works correctly for quite a few iterations. At the
line of the error, rdd__new.count() could be 0. (In some previous rounds, this
was also 0 without any problem.)

Any thoughts on this?

Thank you very much,
- Guanhua



CODE: print "round", round, rdd__new.count()

  File 
/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd
.py, line 542, in count
14/05/12 16:20:28 INFO TaskSetManager: Loss was due to
java.lang.StackOverflowError [duplicate 1]
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
14/05/12 16:20:28 ERROR TaskSetManager: Task 8419.0:0 failed 1 times;
aborting job
  File 
/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd
.py, line 533, in sum
14/05/12 16:20:28 INFO TaskSchedulerImpl: Ignoring update with state FAILED
from TID 1774 because its task set is gone
return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
  File 
/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd
.py, line 499, in reduce
vals = self.mapPartitions(func).collect()
  File 
/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd
.py, line 463, in collect
bytesInJava = self._jrdd.collect().iterator()
  File 
/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.
8.1-src.zip/py4j/java_gateway.py, line 537, in __call__
  File 
/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.
8.1-src.zip/py4j/protocol.py, line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o4317.collect.
: org.apache.spark.SparkException: Job aborted: Task 8419.0:1 failed 1 times
(most recent failure: Exception failure: java.lang.StackOverflowError)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$
DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$
DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:5
9)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGSchedu
ler$$abortStage(DAGScheduler.scala:1026)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGSc
heduler.scala:619)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGSc
heduler.scala:619)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$re
ceive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDis
patcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1
339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java
:107)

==
The stack overflow error is shown as follows:
==

14/05/12 16:20:28 ERROR Executor: Exception in task ID 1774
java.lang.StackOverflowError
at java.util.zip.Inflater.inflate(Inflater.java:259)
at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:116)
at 
java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
at 
java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2
323)
at 
java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.jav
a:2818)
at java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1452)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1511)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at 

Re: Average of each RDD in Stream

2014-05-12 Thread Sean Owen
You mean you normally get an RDD, right?
A DStream is a sequence of RDDs.
It kind of depends on what you are trying to accomplish here?
sum/count for each RDD in the stream?

On Wed, May 7, 2014 at 6:43 PM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:
 Hi,

  I use the following code for calculating the average. The problem is that the
  reduce operation returns a DStream here and not a tuple as it normally does
  without Streaming. So how can we get the sum and the count from the DStream?
  Can we cast it to a tuple?

 val numbers = ssc.textFileStream(args(1))
  val sumandcount = numbers.map(n => (n.toDouble, 1)).reduce{ (a, b) =>
  (a._1 + b._1, a._2 + b._2) }
 sumandcount.print()

 Regards,
 Laeeq



Average of each RDD in Stream

2014-05-12 Thread Laeeq Ahmed
Hi,

I use the following code for calculating the average. The problem is that the
reduce operation returns a DStream here and not a tuple as it normally does
without Streaming. So how can we get the sum and the count from the DStream?
Can we cast it to a tuple?


val numbers = ssc.textFileStream(args(1))
    val sumandcount = numbers.map(n => (n.toDouble, 1)).reduce{ (a, b) => (a._1
+ b._1, a._2 + b._2) }
    sumandcount.print()


Regards,
Laeeq


Re: Forcing spark to send exactly one element to each worker node

2014-05-12 Thread NevinLi158
A few more data points: my current theory is now that spark's piping
mechanism is considerably slower than just running the C++ app directly on
the node.

I ran the C++ application directly on a node in the cluster, and timed the
execution of various parts of the program, and got ~10 seconds to run the
entire thing, with it taking ~6 seconds to run a particular function, 2
seconds to run another function.

I then use Spark's piping mechanism, and got ~180 seconds to run the entire
thing, 120 seconds to run the 6 second function, and 24 seconds to run the 2
second function. I was under the impression that pipe() would just run the
C++ application on the remote node: is the application supposed to run
slower if you use pipe() to execute it?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Forcing-spark-to-send-exactly-one-element-to-each-worker-node-tp5605p5620.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: java.lang.NoSuchMethodError on Java API

2014-05-12 Thread Madhu
I was able to compile your code in Eclipse.
I ran it using the data in your comments, but I also see the
NoSuchMethodError you mentioned.

It seems to run fine until the call to calculateZVector(...)
It appears that org.apache.commons.math3.util.Pair is not Serializable, so
that's one potential problem.
I created a Serializable version of Pair, but that wasn't enough.

Commenting this code:

zVectorRaw.reduce(new Function2<Tuple2<Integer, Double>, Tuple2<Integer, Double>,
        Tuple2<Integer, Double>>() {
    @Override
    public Tuple2<Integer, Double> call(Tuple2<Integer, Double> integerDoubleTuple2,
            Tuple2<Integer, Double> integerDoubleTuple22) throws Exception {
        return null;
    }
});

Avoids the NoSuchMethodError, so that might be part of your problem.

Then I get a NotSerializableException, so my guess is there's a reference to
something else that's not serializable that's referenced in that method.
There's a lot of stuff going on in that method, so it's not easy for me to
follow.

I would break it down to more manageable pieces and build it up one step at
a time.

Sorry I couldn't find the problem.



-
Madhu
https://www.linkedin.com/in/msiddalingaiah
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-NoSuchMethodError-on-Java-API-tp5545p5623.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: missing method in my slf4j after excluding Spark ZK log4j

2014-05-12 Thread Sean Owen
It sounds like you are doing everything right.

NoSuchMethodError suggests it's finding log4j, just not the right
version. That method is definitely in 1.2; it might have been removed
in 2.x? (http://logging.apache.org/log4j/2.x/manual/migration.html)

So I wonder if something is sneaking in log4j 2.x in your app? that's
a first guess.

I'd say consult mvn dependency:tree, but you're on sbt and I don't
know the equivalent.

On Mon, May 12, 2014 at 3:51 PM, Adrian Mocanu
amoc...@verticalscope.com wrote:
 Hey guys,

 I've asked before, in Spark 0.9 - I now use 0.9.1, about removing log4j
 dependency and was told that it was gone. However I still find it part of
 zookeeper imports. This is fine since I exclude it myself in the sbt file,
 but another issue arises.

 I wonder if anyone else has run into this.



 Spark uses log4j v1.2.17 and slf4j-log4j12:1.7.2

 I use slf4j 1.7.5, logback 1.0.13, and log4joverslf4j v 1.7.5



 I think my slf4j 1.7.5 doesn't agree with what zookeeper expects in its
 log4j v 1.2.17 because I get missing method error:

 java.lang.NoSuchMethodError:
 org.apache.log4j.Logger.setLevel(Lorg/apache/log4j/Level;)V

 at
 org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58)

 at
 org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58)

 at scala.Option.map(Option.scala:145)

 at
 org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:58)

 at org.apache.spark.SparkEnv$.create(SparkEnv.scala:126)

 at
 org.apache.spark.SparkContext.init(SparkContext.scala:139)

 at
 org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:500)

 at
 org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:76)

 ...



 Is there a way to find out what versions of slf4j I need to make it work
 with log4j 1.2.17?



 -Adrian




Unexpected results when caching data

2014-05-12 Thread paul
I have been experimenting with a data set with and without persisting the RDD
and have come across some unexpected results.  The files we are reading are
Avro files, so we are using the following to define the RDD; what we end up
with is an RDD[CleansedLogFormat]:

 val f = new NewHadoopRDD(sc,
  classOf[AvroKeyInputFormat[CleansedLogFormat]],
  classOf[AvroKey[CleansedLogFormat]],
  classOf[NullWritable],
  job.getConfiguration).map(_._1.datum())

f.count()
= 110268763

f.persist(StorageLevel.MEMORY_AND_DISK).count()
= 110268763

So far so good.  Both the persisted and non-persisted RDDs return the same
results for the count.  Where things get weird is when I try and do some
reduce by key or other grouping operations.  Something like:

f.map(record => (record.getProviderId.toString,
record)).join(bandwidthKv).map { pair =>
val hour = new
DateTime(pair._2._1.getTimeStamp).toString("yyyyMMddHH")
(hour, Set(pair._2._1.getGuid))
  }.reduceByKey(_ ++ _).collect().foreach { a => println(a._1 + ": " +
a._2.size)}


We then get different results from the non-persisted vs. the persisted
version.

Non-persisted:
2014050917: 7
2014050918: 42

Persisted:
2014050917: 7
2014050918: 12

Any idea what could account for the differences?  BTW I am using Spark
0.9.1.

Thanks,

Paul 





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unexpected-results-when-caching-data-tp5619.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark to utilize HDFS's mmap caching

2014-05-12 Thread Marcelo Vanzin
Is that true? I believe that API Chanwit is talking about requires
explicitly asking for files to be cached in HDFS.

Spark automatically benefits from the kernel's page cache (i.e. if
some block is in the kernel's page cache, it will be read more
quickly). But the explicit HDFS cache is a different thing; Spark
applications that want to use it would have to explicitly call the
respective HDFS APIs.

On Sun, May 11, 2014 at 11:04 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 Yes, Spark goes through the standard HDFS client and will automatically 
 benefit from this.

 Matei

 On May 8, 2014, at 4:43 AM, Chanwit Kaewkasi chan...@gmail.com wrote:

 Hi all,

 Can Spark (0.9.x) utilize the caching feature in HDFS 2.3 via
 sc.textFile() and other HDFS-related APIs?

 http://hadoop.apache.org/docs/r2.3.0/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html

 Best regards,

 -chanwit

 --
 Chanwit Kaewkasi
 linkedin.com/in/chanwit




-- 
Marcelo


Re: missing method in my slf4j after excluding Spark ZK log4j

2014-05-12 Thread Tathagata Das
This gives dependency tree in SBT (spark uses this).
https://github.com/jrudolph/sbt-dependency-graph

TD
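
For reference, wiring that plugin in might look like this (the version is an assumption):

    // project/plugins.sbt
    addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.4")
    // older versions also need the plugin's graphSettings added to build.sbt;
    // then e.g. `sbt dependency-tree` shows which log4j version is being pulled in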


On Mon, May 12, 2014 at 4:55 PM, Sean Owen so...@cloudera.com wrote:

 It sounds like you are doing everything right.

 NoSuchMethodError suggests it's finding log4j, just not the right
 version. That method is definitely in 1.2; it might have been removed
 in 2.x? (http://logging.apache.org/log4j/2.x/manual/migration.html)

 So I wonder if something is sneaking in log4j 2.x in your app? that's
 a first guess.

 I'd say consult mvn dependency:tree, but you're on sbt and I don't
 know the equivalent.

 On Mon, May 12, 2014 at 3:51 PM, Adrian Mocanu
 amoc...@verticalscope.com wrote:
  Hey guys,
 
  I've asked before, in Spark 0.9 - I now use 0.9.1, about removing log4j
  dependency and was told that it was gone. However I still find it part of
  zookeeper imports. This is fine since I exclude it myself in the sbt
 file,
  but another issue arises.
 
  I wonder if anyone else has run into this.
 
 
 
  Spark uses log4j v1.2.17 and slf4j-log4j12:1.7.2
 
  I use slf4j 1.7.5, logback 1.0.13, and log4joverslf4j v 1.7.5
 
 
 
  I think my slf4j 1.7.5 doesn't agree with what zookeeper expects in its
  log4j v 1.2.17 because I get missing method error:
 
  java.lang.NoSuchMethodError:
  org.apache.log4j.Logger.setLevel(Lorg/apache/log4j/Level;)V
 
  at
 
 org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58)
 
  at
 
 org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58)
 
  at scala.Option.map(Option.scala:145)
 
  at
  org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:58)
 
  at org.apache.spark.SparkEnv$.create(SparkEnv.scala:126)
 
  at
  org.apache.spark.SparkContext.init(SparkContext.scala:139)
 
  at
 
 org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:500)
 
  at
 
 org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:76)
 
  ...
 
 
 
  Is there a way to find out what versions of slf4j I need to make it work
  with log4j 1.2.17?
 
 
 
  -Adrian
 
 



Re: streaming on hdfs can detected all new file, but the sum of all the rdd.count() not equals which had detected

2014-05-12 Thread Tathagata Das
A very crucial thing to remember when using file stream is that the files
must be written to the monitored directory atomically. That is, once the
file system shows the file in its listing, the file should not be appended to /
updated after that. That often causes this kind of issue, as Spark
Streaming may pick up the file (soon after it is visible in the listing) and may
try to process it even before all of the data has been written.

So the best way to feed data into Spark Streaming is to write the file to a
temp dir, and then move / rename it into the monitored directory.
That makes it atomic. This is mentioned in the API docs of fileStream
(http://spark.apache.org/docs/0.9.1/api/streaming/index.html#org.apache.spark.streaming.StreamingContext).

TD
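
A sketch of the move/rename idea using the Hadoop FileSystem API (paths are placeholders;
the key point is that a rename within one HDFS filesystem is atomic, while writing in
place is not):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val fs  = FileSystem.get(new Configuration())
    val tmp = new Path("/data/_staging/part-0001.png")
    val dst = new Path("/data/monitored/part-0001.png")
    // ... finish writing the file to `tmp` first ...
    fs.rename(tmp, dst)   // only now does the file stream see a complete file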



On Sun, May 11, 2014 at 7:30 PM, zqf12345 zqf12...@gmail.comwrote:

 when I put 200 png files to HDFS, I found Spark Streaming could detect 200
 files, but the sum of rdd.count() is less than 200, always between 130 and
 170. I don't know why... Is this a bug?
 PS: When I put 200 files in HDFS before the streaming run, it gets the correct
 count and the right result.

 Here is the code:

 def main(args: Array[String]) {
   val conf = new SparkConf().setMaster(SparkURL)
     .setAppName("QimageStreaming-broadcast")
     .setSparkHome(System.getenv("SPARK_HOME"))
     .setJars(SparkContext.jarOfClass(this.getClass()))
   conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   conf.set("spark.kryo.registrator", "qing.hdu.Image.MyRegistrator")
   conf.set("spark.kryoserializer.buffer.mb", "10");
   val ssc = new StreamingContext(conf, Seconds(2))
   val inputFormatClass = classOf[QimageInputFormat[Text, Qimage]]
   val outputFormatClass = classOf[QimageOutputFormat[Text, Qimage]]
   val input_path = HdfsURL + "/Qimage/input"
   val output_path = HdfsURL + "/Qimage/output/"
   val bg_path = HdfsURL + "/Qimage/bg/"
   val bg = ssc.sparkContext.newAPIHadoopFile[Text, Qimage,
     QimageInputFormat[Text, Qimage]](bg_path)
   val bbg = bg.map(data => (data._1.toString(), data._2))
   val broadcastbg = ssc.sparkContext.broadcast(bbg)
   val file = ssc.fileStream[Text, Qimage, QimageInputFormat[Text,
     Qimage]](input_path)
   val qingbg = broadcastbg.value.collectAsMap
   val foreachFunc = (rdd: RDD[(Text, Qimage)], time: Time) => {
     val rddnum = rdd.count
     System.out.println("\n\n" + "rddnum is " + rddnum + "\n\n")
     if (rddnum > 0)
     {
       System.out.println("here is foreachFunc")
       val a = rdd.keys
       val b = a.first
       val cbg = qingbg.get(getbgID(b)).getOrElse(new Qimage)
       rdd.map(data => (data._1, (new QimageProc(data._1, data._2)).koutu(cbg)))
         .saveAsNewAPIHadoopFile(output_path, classOf[Text], classOf[Qimage],
           outputFormatClass)
     }
   }
   file.foreachRDD(foreachFunc)
   ssc.start()
   ssc.awaitTermination()
 }



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/streaming-on-hdfs-can-detected-all-new-file-but-the-sum-of-all-the-rdd-count-not-equals-which-had-ded-tp5572.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: java.lang.ClassNotFoundException

2014-05-12 Thread Archit Thakur
Hi Joe,

Your messages are going into spam folder for me.

Thx, Archit_Thakur.


On Fri, May 2, 2014 at 9:22 AM, Joe L selme...@yahoo.com wrote:

 Hi, you should include the jar file of your project, for example:
 conf.set("yourjarfilepath.jar")

 Joe
   On Friday, May 2, 2014 7:39 AM, proofmoore [via Apache Spark User List]
 [hidden email] wrote:
   Hello. I followed the "A Standalone App in Java" part of the tutorial
 https://spark.apache.org/docs/0.8.1/quick-start.html

 The Spark standalone cluster looks like it's running without a problem:
 http://i.stack.imgur.com/7bFv8.png

 I have built a fat jar for running this JavaApp on the cluster. Before
 maven package:

 find .

 ./pom.xml
 ./src
 ./src/main
 ./src/main/java
 ./src/main/java/SimpleApp.java


 content of SimpleApp.java is :

  import org.apache.spark.api.java.*;
  import org.apache.spark.api.java.function.Function;
  import org.apache.spark.SparkConf;
  import org.apache.spark.SparkContext;


  public class SimpleApp {
      public static void main(String[] args) {

          SparkConf conf = new SparkConf()
              .setMaster("spark://10.35.23.13:7077")
              .setAppName("My app")
              .set("spark.executor.memory", "1g");

          JavaSparkContext sc = new JavaSparkContext(conf);
          String logFile = "/home/ubuntu/spark-0.9.1/test_data";
          JavaRDD<String> logData = sc.textFile(logFile).cache();

          long numAs = logData.filter(new Function<String, Boolean>() {
              public Boolean call(String s) { return s.contains("a"); }
          }).count();

          System.out.println("Lines with a: " + numAs);
      }
  }

 This program only works when the master is set as setMaster("local").
 Otherwise I get this error : http://i.stack.imgur.com/doRSn.png

 Thanks,
 Ibrahim





 --
 View this message in context: Re: java.lang.ClassNotFoundException
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: How to read a multipart s3 file?

2014-05-12 Thread Aaron Davidson
One way to ensure Spark writes more partitions is by using
RDD#repartition() to make each partition smaller. One Spark partition
always corresponds to one file in the underlying store, and it's usually a
good idea to have each partition size range somewhere between 64 MB to 256
MB. Too few partitions leads to other problems, such as too little
concurrency -- Spark can only run as many tasks as there are partitions, so
if you don't have enough partitions, your cluster will be underutilized.


On Tue, May 6, 2014 at 7:07 PM, kamatsuoka ken...@gmail.com wrote:

 Yes, I'm using s3:// for both. I was using s3n:// but I got frustrated by
 how
 slow it is at writing files. In particular the phases where it moves the
 temporary files to their permanent location takes as long as writing the
 file itself.  I can't believe anyone uses this.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-a-multipart-s3-file-tp5463p5470.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Spark LIBLINEAR

2014-05-12 Thread DB Tsai
It seems that the code isn't managed on GitHub. It can be downloaded from
http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/distributed-liblinear/spark/spark-liblinear-1.94.zip

It would be easier to track the changes on GitHub.



Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Mon, May 12, 2014 at 7:53 AM, Xiangrui Meng men...@gmail.com wrote:

 Hi Chieh-Yen,

 Great to see the Spark implementation of LIBLINEAR! We will definitely
 consider adding a wrapper in MLlib to support it. Is the source code
 on github?

 Deb, Spark LIBLINEAR uses BSD license, which is compatible with Apache.

 Best,
 Xiangrui

 On Sun, May 11, 2014 at 10:29 AM, Debasish Das debasish.da...@gmail.com
 wrote:
  Hello Prof. Lin,
 
  Awesome news ! I am curious if you have any benchmarks comparing C++ MPI
  with Scala Spark liblinear implementations...
 
  Is Spark Liblinear apache licensed or there are any specific
 restrictions on
  using it ?
 
  Except using native blas libraries (which each user has to manage by
 pulling
  in their best proprietary BLAS package), all Spark code is Apache
 licensed.
 
  Thanks.
  Deb
 
 
  On Sun, May 11, 2014 at 3:01 AM, DB Tsai dbt...@stanford.edu wrote:
 
  Dear Prof. Lin,
 
  Interesting! We had an implementation of L-BFGS in Spark and already
  merged in the upstream now.
 
  We read your paper comparing TRON and OWL-QN for logistic regression
 with
  L1 (http://www.csie.ntu.edu.tw/~cjlin/papers/l1.pdf), but it seems
 that it's
  not in the distributed setup.
 
  Will be very interesting to know the L2 logistic regression benchmark
  result in Spark with your TRON optimizer and the L-BFGS optimizer
 against
  different datasets (sparse, dense, and wide, etc).
 
  I'll try your TRON out soon.
 
 
  Sincerely,
 
  DB Tsai
  ---
  My Blog: https://www.dbtsai.com
  LinkedIn: https://www.linkedin.com/in/dbtsai
 
 
  On Sun, May 11, 2014 at 1:49 AM, Chieh-Yen r01944...@csie.ntu.edu.tw
  wrote:
 
  Dear all,
 
  Recently we released a distributed extension of LIBLINEAR at
 
  http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/distributed-liblinear/
 
  Currently, TRON for logistic regression and L2-loss SVM is supported.
  We provided both MPI and Spark implementations.
  This is very preliminary so your comments are very welcome.
 
  Thanks,
  Chieh-Yen
 
 
 



Re: Bug when zip with longs and too many partitions?

2014-05-12 Thread Michael Malak


I've discovered that it was noticed a year ago that RDD zip() does not work 
when the number of partitions does not evenly divide the total number of 
elements in the RDD:

https://groups.google.com/forum/#!msg/spark-users/demrmjHFnoc/Ek3ijiXHr2MJ

I will enter a JIRA ticket just as soon as the ASF Jira system will let me 
reset my password.



On Sunday, May 11, 2014 4:40 AM, Michael Malak michaelma...@yahoo.com wrote:

Is this a bug?

scala> sc.parallelize(1 to 2,4).zip(sc.parallelize(11 to 12,4)).collect
res0: Array[(Int, Int)] = Array((1,11), (2,12))

scala> sc.parallelize(1L to 2L,4).zip(sc.parallelize(11 to 12,4)).collect
res1: Array[(Long, Int)] = Array((2,11))


Re: build shark(hadoop CDH5) on hadoop2.0.0 CDH4

2014-05-12 Thread Sophia
Hi,
Why do I always encounter remoting errors:
akka.remote.RemoteTransportException and
java.util.concurrent.TimeoutException?
Best Regards,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/build-shark-hadoop-CDH5-on-hadoop2-0-0-CDH4-tp5574p5629.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: pySpark memory usage

2014-05-12 Thread Jim Blomo
Thanks, Aaron, this looks like a good solution!  Will be trying it out shortly.

I noticed that the S3 exception seem to occur more frequently when the
box is swapping.  Why is the box swapping?  combineByKey seems to make
the assumption that it can fit an entire partition in memory when
doing the combineLocally step.  I'm going to try to break this apart
but will need some sort of heuristic options include looking at memory
usage via the resource module and trying to keep below
'spark.executor.memory', or using batchSize to limit the number of
entries in the dictionary.  Let me know if you have any opinions.

On Sun, May 4, 2014 at 8:02 PM, Aaron Davidson ilike...@gmail.com wrote:
 I'd just like to update this thread by pointing to the PR based on our
 initial design: https://github.com/apache/spark/pull/640

 This solution is a little more general and avoids catching IOException
 altogether. Long live exception propagation!


 On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell pwend...@gmail.com wrote:

 Hey Jim,

 This IOException thing is a general issue that we need to fix and your
 observation is spot-in. There is actually a JIRA for it here I created a few
 days ago:
 https://issues.apache.org/jira/browse/SPARK-1579

 Aaron is assigned on that one but not actively working on it, so we'd
 welcome a PR from you on this if you are interested.

 The first thought we had was to set a volatile flag when the reader sees
 an exception (indicating there was a failure in the task) and avoid
 swallowing the IOException in the writer if this happens. But I think there
 is a race here where the writer sees the error first before the reader knows
 what is going on.

 Anyways maybe if you have a simpler solution you could sketch it out in
 the JIRA and we could talk over there. The current proposal in the JIRA is
 somewhat complicated...

 - Patrick






 On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo jim.bl...@gmail.com wrote:

 FYI, it looks like this "stdin writer to Python finished early" error was
 caused by a break in the connection to S3, from which the data was being
 pulled.  A recent commit to PythonRDD noted that the current exception
 catching can potentially mask an exception for the data source, and that is
 indeed what I see happening.  The underlying libraries (jets3t and
 httpclient) do have retry capabilities, but I don't see a great way of
 setting them through Spark code.  Instead I added the patch below which
 kills the worker on the exception.  This allows me to completely load the
 data source after a few worker retries.

 Unfortunately, java.net.SocketException is the same error that is
 sometimes expected from the client when using methods like take().  One
 approach around this conflation is to create a new locally scoped exception
 class, eg. WriterException, catch java.net.SocketException during output
 writing, then re-throw the new exception.  The worker thread could then
 distinguish between the reasons java.net.SocketException might be thrown.
 Perhaps there is a more elegant way to do this in Scala, though?
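 
 A rough sketch of that distinction (WriterException and the guardedWrite
 helper are hypothetical names, not Spark APIs):
 
     import java.net.SocketException
 
     // Hypothetical wrapper so socket failures seen while *writing* to the Python
     // worker can be told apart from the SocketException a reader-side take() may cause.
     case class WriterException(cause: Throwable) extends RuntimeException(cause)
 
     object WriterGuard {
       // Run a write action and re-throw socket failures under the locally scoped type.
       def guardedWrite(write: () => Unit): Unit =
         try write()
         catch { case e: SocketException => throw WriterException(e) }
     }
 
     // The monitoring thread could then match on the wrapper:
     //   case WriterException(cause) => // genuine data-source failure: record it, kill the worker
     //   case e: SocketException     => // expected, e.g. after take(); just log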

 Let me know if I should open a ticket or discuss this on the developers
 list instead.  Best,

 Jim

 diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 index 0d71fdb..f31158c 100644
 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 @@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
          readerException = e
          Try(worker.shutdownOutput()) // kill Python worker process
 
 +      case e: java.net.SocketException =>
 +        // This can happen if a connection to the datasource, eg S3, resets
 +        // or is otherwise broken
 +        readerException = e
 +        Try(worker.shutdownOutput()) // kill Python worker process
 +
        case e: IOException =>
          // This can happen for legitimate reasons if the Python code stops returning data
          // before we are done passing elements through, e.g., for take(). Just log a message to


 On Wed, Apr 9, 2014 at 7:04 PM, Jim Blomo jim.bl...@gmail.com wrote:

 This dataset is uncompressed text at ~54GB. stats() returns (count:
 56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min:
 343)

 On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:
  Okay, thanks. Do you have any info on how large your records and data
  file are? I'd like to reproduce and fix this.
 
  Matei
 
  On Apr 9, 2014, at 3:52 PM, Jim Blomo jim.bl...@gmail.com wrote:
 
  Hi Matei, thanks for working with me to find these issues.
 
  To summarize, the issues I've seen are:
  0.9.0:
  - https://issues.apache.org/jira/browse/SPARK-1323
 
  SNAPSHOT 2014-03-18:
  - When persist() is used and batchSize=1, java.lang.OutOfMemoryError:
  Java heap space.  To me this indicates a memory 

Re: Average of each RDD in Stream

2014-05-12 Thread Tathagata Das
Use DStream.foreachRDD to run an operation on the final RDD of every batch.

val sumandcount = numbers.map(n => (n.toDouble, 1)).reduce { (a, b) => (a._1 + b._1, a._2 + b._2) }
sumandcount.foreachRDD { rdd => val first: (Double, Int) = rdd.take(1)(0); ... }

DStream.reduce creates a DStream whose RDDs have just one tuple each. The
rdd.take(1) above gets that one tuple.
Note, however, that there is a corner case in this approach. If a particular
batch has no data, then the rdd will have zero elements (no data, nothing to
reduce). So you have to take that into account (maybe do a rdd.collect(),
check the size, and then get the first / only element).
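
A minimal sketch of that guard, reusing the sumandcount DStream from the
snippet above:

    sumandcount.foreachRDD { rdd =>
      // reduce() over an empty batch yields an empty RDD, so check before indexing
      val collected = rdd.collect()
      if (collected.nonEmpty) {
        val (sum, count) = collected(0)
        println("average for this batch: " + sum / count)
      }
    }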

TD



On Wed, May 7, 2014 at 7:59 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:

 Hi,

 I use the following code for calculating the average. The problem is that the
 reduce operation returns a DStream here and not a tuple as it normally does
 without Streaming. So how can we get the sum and the count from the
 DStream? Can we cast it to a tuple?

 val numbers = ssc.textFileStream(args(1))
 val sumandcount = numbers.map(n => (n.toDouble, 1)).reduce { (a, b) =>
   (a._1 + b._1, a._2 + b._2) }
 sumandcount.print()

 Regards,
 Laeeq




Re: Proper way to stop Spark stream processing

2014-05-12 Thread Tathagata Das
Since you are using the latest Spark code and not Spark 0.9.1 (guessed from
the log messages), you can actually do a graceful shutdown of a streaming
context. This ensures that the receivers are properly stopped and all
received data is processed, and only then does the system terminate (stop()
stays blocked until then). See the other variations of StreamingContext.stop().
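
Applied to the snippet below, that would look roughly like this (a sketch,
assuming the two-flag stop(stopSparkContext, stopGracefully) variant on the
current master; ssc and N are as in the original message):

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.future

    ssc.start()
    future {
      Thread.sleep(Seconds(N).milliseconds)
      // first flag: also stop the SparkContext; second flag: stop gracefully,
      // i.e. let the receivers shut down and all received data be processed first
      ssc.stop(true, true)
    }
    ssc.awaitTermination()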

TD

On Mon, May 12, 2014 at 2:49 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hello,

 I am trying to implement something like "process a stream for N
 seconds, then return a result" with Spark Streaming (built from git
 head). My approach (which is probably not very elegant) is:

 val ssc = new StreamingContext(...)
 ssc.start()
 future {
    Thread.sleep(Seconds(N).milliseconds)
   ssc.stop(true)
 }
 ssc.awaitTermination()

 and in fact, this stops the stream processing. However, I get the
 following error messages:

 14/05/12 18:41:49 ERROR scheduler.ReceiverTracker: Deregistered
 receiver for stream 0: Stopped by driver
 14/05/12 18:41:49 ERROR scheduler.ReceiverTracker: Deregistered
 receiver for stream 0: Restarting receiver with delay 2000ms: Retrying
 connecting to localhost:
 14/05/12 18:41:50 ERROR network.ConnectionManager: Corresponding
 SendingConnectionManagerId not found
 14/05/12 18:41:50 ERROR network.ConnectionManager: Corresponding
 SendingConnectionManagerId not found

 (where localhost: is the source I am reading the stream from).
 This doesn't actually seem like the proper way to do it. Can anyone
 point me to how to implement stop after N seconds without these
 error messages?

 Thanks
 Tobias



Re: Local Dev Env with Mesos + Spark Streaming on Docker: Can't submit jobs.

2014-05-12 Thread Tim St Clair
Jacob & Gerard - 

You might find the link below useful: 

http://rrati.github.io/blog/2014/05/07/apache-hadoop-plus-docker-plus-fedora-running-images/
 

For non-reverse-dns apps, NAT is your friend. 

Cheers, 
Tim 

- Original Message -

 From: Jacob Eisinger jeis...@us.ibm.com
 To: user@spark.apache.org
 Sent: Tuesday, May 6, 2014 8:30:23 AM
 Subject: Re: Local Dev Env with Mesos + Spark Streaming on Docker: Can't
 submit jobs.

 Howdy,

 You might find the discussion Andrew and I have been having about Docker and
 network security [1] applicable.

 Also, I posted an answer [2] to your stackoverflow question.

 [1]
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-shell-driver-interacting-with-Workers-in-YARN-mode-firewall-blocking-communication-tp5237p5441.html
 [2]
 http://stackoverflow.com/questions/23410505/how-to-run-hdfs-cluster-without-dns/23495100#23495100

 Jacob D. Eisinger
 IBM Emerging Technologies
 jeis...@us.ibm.com - (512) 286-6075


 From: Gerard Maas gerard.m...@gmail.com
 To: user@spark.apache.org
 Date: 05/05/2014 04:18 PM
 Subject: Re: Local Dev Env with Mesos + Spark Streaming on Docker: Can't
 submit jobs.

 Hi Benjamin,

 Yes, we initially used a modified version of the AmpLabs docker scripts [1].
 The amplab docker images are a good starting point.
 One of the biggest hurdles has been HDFS, which requires reverse DNS, and I
 didn't want to go the dnsmasq route, in order to keep the containers relatively
 simple to use without the need for external scripts. I ended up running a 1-node
 setup (namenode + datanode). I'm still looking for a better solution for HDFS [2].

 Our use case for Docker is to easily create local dev environments, both for
 development and for automated functional testing (using Cucumber). My aim is
 to significantly reduce the time of the develop-deploy-test cycle.
 That also means we run the minimum number of instances required for a
 functionally working setup, e.g. 1 ZooKeeper, 1 Kafka broker, ...

 For the actual cluster deployment we have Chef-based devops toolchain that
 put things in place on public cloud providers.
 Personally, I think Docker rocks and would like to replace those complex
 cookbooks with Dockerfiles once the technology is mature enough.

 -greetz, Gerard.

 [1] https://github.com/amplab/docker-scripts
 [2]
 http://stackoverflow.com/questions/23410505/how-to-run-hdfs-cluster-without-dns

 On Mon, May 5, 2014 at 11:00 PM, Benjamin  bboui...@gmail.com  wrote: Hi,

 Before considering running on Mesos, did you try to submit the application to
 Spark deployed on Docker containers without Mesos?

 I'm currently investigating this idea to quickly deploy a complete set of
 clusters with Docker, so I'm interested in your findings on sharing the
 settings of Kafka and ZooKeeper across nodes. How many brokers and ZooKeeper
 nodes do you use?

 Regards,

 On Mon, May 5, 2014 at 10:11 PM, Gerard Maas  gerard.m...@gmail.com  wrote:
 Hi all,

 I'm currently working on creating a set of docker images to facilitate local
 development with Spark/streaming on Mesos (+zk, hdfs, kafka)

 After solving the initial hurdles to get things working together in docker
 containers, now everything seems to start-up correctly and the mesos UI
 shows slaves as they are started.

 I'm trying to submit a job from IntelliJ and the job submissions seem to get
 lost in Mesos translation. The logs are not helping me figure out what's
 wrong, so I'm posting them here in the hope that they ring a bell and
 somebody can give me a hint on what's wrong/missing in my setup.

  DRIVER (IntelliJ running a Job.scala main) 
 14/05/05 21:52:31 INFO MetadataCleaner: Ran metadata cleaner for
 SHUFFLE_BLOCK_MANAGER
 14/05/05 21:52:31 INFO BlockManager: Dropping broadcast blocks older than
 1399319251962
 14/05/05 21:52:31 INFO BlockManager: Dropping non broadcast blocks older than
 1399319251962
 14/05/05 21:52:31 INFO MetadataCleaner: Ran metadata cleaner for
 BROADCAST_VARS
 14/05/05 21:52:31 INFO MetadataCleaner: Ran metadata cleaner for
 BLOCK_MANAGER
 14/05/05 21:52:32 INFO MetadataCleaner: Ran metadata cleaner for
 HTTP_BROADCAST
 14/05/05 21:52:32 INFO MetadataCleaner: Ran metadata cleaner for
 MAP_OUTPUT_TRACKER
 14/05/05 21:52:32 INFO MetadataCleaner: Ran metadata cleaner for
 SPARK_CONTEXT

  MESOS MASTER 
 I0505 19:52:39.718080 388 master.cpp:690] Registering framework
 201405051517-67113388-5050-383-6995 at scheduler(1)@ 127.0.1.1:58115
 I0505 19:52:39.718261 388 master.cpp:493] Framework
 201405051517-67113388-5050-383-6995 disconnected
 I0505 19:52:39.718277 389 hierarchical_allocator_process.hpp:332] Added
 framework 201405051517-67113388-5050-383-6995
 I0505 19:52:39.718312 388 master.cpp:520] Giving framework
 201405051517-67113388-5050-383-6995 0ns to failover
 I0505 19:52:39.718431 389 hierarchical_allocator_process.hpp:408] 

Re: pySpark memory usage

2014-05-12 Thread Matei Zaharia
Hey Jim, unfortunately external spilling is not implemented in Python right 
now. While it would be possible to update combineByKey to do smarter stuff 
here, one simple workaround you can try is to launch more map tasks (or more 
reduce tasks). To set the minimum number of map tasks, you can pass it as a 
second argument to textFile and such (e.g. sc.textFile("s3n://foo.txt", 1000)).

Matei

On May 12, 2014, at 5:47 PM, Jim Blomo jim.bl...@gmail.com wrote:

 Thanks, Aaron, this looks like a good solution!  Will be trying it out 
 shortly.
 
 I noticed that the S3 exceptions seem to occur more frequently when the
 box is swapping.  Why is the box swapping?  combineByKey seems to make
 the assumption that it can fit an entire partition in memory when
 doing the combineLocally step.  I'm going to try to break this apart,
 but I will need some sort of heuristic. Options include looking at memory
 usage via the resource module and trying to stay below
 'spark.executor.memory', or using batchSize to limit the number of
 entries in the dictionary.  Let me know if you have any opinions.
 

Re: missing method in my slf4j after excluding Spark ZK log4j

2014-05-12 Thread Paul Brown
Hi, Adrian --

If my memory serves, you need 1.7.7 of the various slf4j modules to avoid
that issue.
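
For reference, a minimal build.sbt sketch of that combination (coordinates and
versions are illustrative, not a verified configuration): bump the slf4j
artifacts to 1.7.7, keep the slf4j-log4j12/log4j binding that comes in via
Spark and ZooKeeper off the classpath, and let log4j-over-slf4j bridge the
log4j 1.2 calls to logback:

    libraryDependencies ++= Seq(
      // exclude the log4j binding pulled in transitively
      ("org.apache.spark" %% "spark-streaming" % "0.9.1")
        .exclude("org.slf4j", "slf4j-log4j12")
        .exclude("log4j", "log4j"),
      "org.slf4j"      % "slf4j-api"        % "1.7.7",
      "org.slf4j"      % "log4j-over-slf4j" % "1.7.7",  // 1.7.7 avoids the Logger.setLevel issue
      "ch.qos.logback" % "logback-classic"  % "1.0.13"
    )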

Best.
-- Paul

—
p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/


On Mon, May 12, 2014 at 7:51 AM, Adrian Mocanu amoc...@verticalscope.com wrote:

  Hey guys,

 I've asked before (in Spark 0.9; I now use 0.9.1) about removing the log4j
 dependency and was told that it was gone. However, I still find it pulled in
 by the ZooKeeper dependency. This is fine since I exclude it myself in the sbt
 file, but another issue arises.

 I wonder if anyone else has run into this.



 Spark uses log4j v1.2.17 and slf4j-log4j12:1.7.2

 I use slf4j 1.7.5, logback 1.0.13, and log4j-over-slf4j v1.7.5



 I think my slf4j 1.7.5 doesn't agree with what ZooKeeper expects from its
 log4j v1.2.17, because I get a missing method error:

 java.lang.NoSuchMethodError:
 org.apache.log4j.Logger.setLevel(Lorg/apache/log4j/Level;)V

 at
 org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58)

 at
 org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58)

 at scala.Option.map(Option.scala:145)

 at
 org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:58)

 at org.apache.spark.SparkEnv$.create(SparkEnv.scala:126)

 at
 org.apache.spark.SparkContext.<init>(SparkContext.scala:139)

 at
 org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:500)

 at
 org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:76)

 ...



 Is there a way to find out what versions of slf4j I need to make it work
 with log4j 1.2.17?



 -Adrian