Re: Processing multiple requests in cluster

2014-09-25 Thread Akhil Das
You can try Spark on Mesos or YARN, since they have much more support for
scheduling.

Thanks
Best Regards

On Thu, Sep 25, 2014 at 4:50 AM, Subacini B subac...@gmail.com wrote:

 hi All,

 How can I run multiple requests concurrently on the same cluster?

 I have a program using a Spark streaming context which reads streaming
 data and writes it to HBase. It works fine; the problem is that when multiple
 requests are submitted to the cluster, only the first request is processed, as
 the entire cluster is used for that request. The rest of the requests are
 left waiting.

 I have set spark.cores.max to 2 or less so that it can process another
 request, but if there is only one request the cluster is not utilized properly.

 Is there any way that the Spark cluster can process streaming requests
 concurrently while effectively utilizing the cluster, something
 like sharkserver?

 Thanks
 Subacini



Re: What is a pre built package of Apache Spark

2014-09-25 Thread Akhil Das
Looks like pyspark was not able to find the python binary in your
environment. You need to install Python
https://docs.python.org/2/faq/windows.html (if it is not installed already).
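On Windows this usually means no `python` executable is visible on PATH to the shell that launches ./bin/pyspark. A minimal sketch of the fix (the install location /c/Python27 is an assumed example, not a known path on the poster's machine):

```shell
# Sketch: prepend an assumed Python install directory to PATH so that
# ./bin/pyspark can find the `python` executable. /c/Python27 (i.e.
# C:\Python27 under Git Bash/Cygwin) is illustrative only.
export PATH="/c/Python27:$PATH"
echo "first PATH entry: $(printf '%s' "$PATH" | cut -d: -f1)"
```

After adjusting PATH, re-running ./bin/pyspark should no longer report "exec: python: not found".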

Thanks
Best Regards

On Thu, Sep 25, 2014 at 9:00 AM, Denny Lee denny.g@gmail.com wrote:

 This seems similar to a related Windows issue concerning Python, where
 pyspark couldn't find python because the PYTHONSTARTUP environment
 variable wasn't set - by any chance could this be related?

 On Wed, Sep 24, 2014 at 7:51 PM, christy 760948...@qq.com wrote:

 Hi, I have installed standalone Spark on Win7 (from source, not the pre-built
 version). After the sbt assembly, I can run spark-shell successfully, but the
 Python shell does not work.

 $ ./bin/pyspark
 ./bin/pyspark: line 111: exec: python: not found

 have you solved your problem?

 Thanks,
 Christy






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/What-is-a-pre-built-package-of-Apache-Spark-tp14080p15101.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Log hdfs blocks sending

2014-09-25 Thread Alexey Romanchuk
Hello again spark users and developers!

I have a standalone Spark cluster (1.1.0) with Spark SQL running on it. My
cluster consists of 4 datanodes, and the replication factor of the files is 3.

I use the thrift server to access Spark SQL and have 1 table with 30+
partitions. When I run a query on the whole table (something simple like select
count(*) from t), Spark produces a lot of network activity, filling the entire
available 1 Gb link. It looks like Spark sends data over the network instead of
reading it locally.

Is there any way to log which blocks were accessed locally and which were not?

Thanks!


Re: quick start guide: building a standalone scala program

2014-09-25 Thread christy
I encountered the same issue when I went through the tutorial's first
standalone application. I then tried to reinstall sbt, but it doesn't help.

I then followed this thread, created a workspace under the spark directory and
executed ./sbt/sbt package; it reported that packaging was successful. But how
does this happen? How does sbt know which location to use?

And though it went smoothly, I didn't see that any jar had been created.

Please help.

Thanks,
Christy



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/quick-start-guide-building-a-standalone-scala-program-tp3116p15120.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: java.lang.OutOfMemoryError while running SVD MLLib example

2014-09-25 Thread Xiangrui Meng
A 7000x7000 matrix is not a tall-and-skinny matrix. Storing the dense
matrix requires 784MB. The driver needs more storage for collecting the
result from executors, as well as for making a copy for LAPACK's dgesvd, so
you need more memory. Do you need the full SVD? If not, try to use a small
k, e.g., 50. -Xiangrui
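For reference, a truncated SVD with a small k can be requested through MLlib's RowMatrix API; the sketch below uses toy rows as a stand-in for the real 7000x7000 data, and assumes an existing SparkContext `sc`:

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Sketch: ask only for the top-k singular values/vectors instead of the
// full decomposition, which keeps the driver-side result small.
// Assumes an existing SparkContext `sc`; the rows below are toy data.
val rows = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 3.0),
  Vectors.dense(4.0, 5.0, 6.0),
  Vectors.dense(7.0, 8.0, 9.0)
))
val mat = new RowMatrix(rows)
// For a 7000x7000 input one would pass e.g. k = 50 here.
val svd = mat.computeSVD(k = 2, computeU = false)
println(svd.s)  // only the top-k singular values reach the driver
```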

On Wed, Sep 24, 2014 at 3:01 PM, Shailesh Birari
sbir...@wynyardgroup.com wrote:

 Note, the data is random numbers (double).
 Any suggestions/pointers will be highly appreciated.

 Thanks,
   Shailesh



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-while-running-SVD-MLLib-example-tp14972p15083.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Out of memory exception in MLlib's naive baye's classification training

2014-09-25 Thread Xiangrui Meng
For the vectorizer, what's the output feature dimension, and are you
creating sparse vectors or dense vectors? The model on the driver
consists of numClasses * numFeatures doubles. However, the driver
needs more memory in order to receive the task result (of the same
size) from the executors. So you need to control the feature dimension
(this is why people use the hashing trick) and reduce the number of
partitions. -Xiangrui
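The hashing trick mentioned above can be sketched without any framework; the dimension 2^18 below is an arbitrary example, not a recommendation:

```scala
// Sketch of the hashing trick: hash terms into a fixed number of buckets so
// the feature dimension (and hence the model size) is bounded, no matter
// how large the vocabulary grows.
val numFeatures = 1 << 18  // example dimension: 262,144 buckets

def termIndex(term: String): Int =
  ((term.hashCode % numFeatures) + numFeatures) % numFeatures  // non-negative index

// Build a sparse term-frequency map for one document.
val doc = Seq("spark", "naive", "bayes", "spark")
val tf: Map[Int, Double] =
  doc.groupBy(termIndex).map { case (i, ts) => i -> ts.size.toDouble }
```

Distinct terms can collide in the same bucket, which is the accepted trade-off for a bounded model size.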


On Wed, Sep 24, 2014 at 10:59 AM, jatinpreet jatinpr...@gmail.com wrote:
 Hi,

 I was able to get the training running in local mode with default settings;
 there was a problem with document labels, which were quite large (not 20 as
 suggested earlier).

 I am currently training 175000 documents on a single node with 2GB of
 executor memory and 5GB of driver memory successfully. If I increase the
 number of documents, I get the OOM error. I wish to understand what the
 bottlenecks generally are for naive bayes: is it the executor or the
 driver memory? Also, what are the things to keep in mind while training huge
 sets of data, so that I can have a bulletproof classification system?
 Slowing down in case of low memory is fine, but not exceptions.

 As a side note, is there any classification algorithm in MLlib which can
 just append new training data to an existing model? With naive bayes, I
 need to have all the data available at once for training.

 Thanks,
 Jatin



 -
 Novice Big Data Programmer
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Out-of-memory-exception-in-MLlib-s-naive-baye-s-classification-training-tp14809p15052.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: YARN ResourceManager and Hadoop NameNode Web UI not visible in port 8088, port 50070

2014-09-25 Thread Sandy Ryza
Hi Raghuveer,

This might be a better question for the cdh-user list or the Hadoop user
list.  The Hadoop web interfaces for both the NameNode and ResourceManager
are enabled by default.  Is it possible you have a firewall blocking those
ports?

-Sandy

On Wed, Sep 24, 2014 at 9:00 PM, Raghuveer Chanda 
raghuveer.cha...@gmail.com wrote:

 Hi,

 I'm running a Spark job in a YARN cluster, but I'm not able to see the web
 interface of the YARN ResourceManager or the Hadoop NameNode Web UI on port
 8088 and port 50070, nor the Spark stages.

 Only the Spark UI on port 18080 is visible.

 I got the URLs from Cloudera, but maybe the web interface is disabled due to
 some default security option.

 How can I enable the web interface? I.e., is there any option in Cloudera, or
 is the server firewall blocking it? Please help.



 --
 Regards,
 Raghuveer Chanda
 4th year Undergraduate Student
 Computer Science and Engineering
 IIT Kharagpur



Re: quick start guide: building a standalone scala program

2014-09-25 Thread christy
I encountered exactly the same problem. How did you solve this?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/quick-start-guide-building-a-standalone-scala-program-tp3116p15125.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Issue with Spark-1.1.0 and the start-thriftserver.sh script

2014-09-25 Thread Hélène Delanoeye



Hi

We've just experienced an issue with the new Spark-1.1.0 and the start-thriftserver.sh script.


We tried to launch start-thriftserver.sh with the --master yarn option and got the following error message:

Failed to load Hive Thrift server main class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.
You need to build Spark with -Phive. 


In fact Spark was built with the -Phive option, but the real problem was this one:

Application appattempt_1411058337040_0118_01 submitted by user x to unknown queue: default

So the solution was to specify the queue, and it works:
/opt/spark/sbin/start-thriftserver.sh --master yarn --queue spark-batch

Hope this helps, as the error message is not really clear (and rather misleading).

Helene

-- 
Hélène Delanoeye Software engineer / Search team

E helene.delano...@kelkoo.comY!Messenger kelkoohelened
T (33) 4 56 09 07 57
A 6, rue des Méridiens 38130 Echirolles FRANCE


Kelkoo SAS
Société par Actions Simplifiée
Au capital de € 4.168.964,30
Siège social : 8, rue du Sentier 75002 Paris
425 093 069 RCS Paris

This message and its attachments are confidential and intended exclusively for their addressees. If you are not the intended recipient of this message, please delete it and notify the sender.





Re: Processing multiple requests in cluster

2014-09-25 Thread Mayur Rustagi
There are two problems you may be facing:
1. your application is taking all the resources;
2. inside your application, task submission is not being scheduled properly.

For 1, you can either configure your app to take fewer resources, or use a
Mesos/YARN-type scheduler to dynamically juggle resources.
For 2, you can use the fair scheduler, so that application tasks are
scheduled more fairly.

Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi
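For problem 2, the fair scheduler is enabled inside one application by setting spark.scheduler.mode=FAIR (and optionally spark.scheduler.allocation.file) in the SparkConf. A minimal pool file might look like the sketch below; the pool name and weights are illustrative, not values from this thread:

```xml
<?xml version="1.0"?>
<!-- Illustrative fairscheduler.xml: jobs submitted into the "streaming"
     pool share executors fairly instead of running strictly FIFO. -->
<allocations>
  <pool name="streaming">
    <schedulingMode>FAIR</schedulingMode>
    <weight>2</weight>
    <minShare>1</minShare>
  </pool>
</allocations>
```

Jobs are then routed to a pool with sc.setLocalProperty("spark.scheduler.pool", "streaming") before submitting work from a given thread.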


On Thu, Sep 25, 2014 at 12:32 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 You can try Spark on Mesos or YARN, since they have much more support for
 scheduling.

 Thanks
 Best Regards

 On Thu, Sep 25, 2014 at 4:50 AM, Subacini B subac...@gmail.com wrote:

 hi All,

 How can I run multiple requests concurrently on the same cluster?

 I have a program using a Spark streaming context which reads streaming
 data and writes it to HBase. It works fine; the problem is that when
 multiple requests are submitted to the cluster, only the first request is
 processed, as the entire cluster is used for that request. The rest of the
 requests are left waiting.

 I have set spark.cores.max to 2 or less so that it can process another
 request, but if there is only one request the cluster is not utilized properly.

 Is there any way that the Spark cluster can process streaming requests
 concurrently while effectively utilizing the cluster, something
 like sharkserver?

 Thanks
 Subacini





Re: Can not see any spark metrics on ganglia-web

2014-09-25 Thread tsingfu
Hi, I found the problem.
By default, gmond monitors the multicast IP 239.2.11.71, while I had set
*.sink.ganglia.host=localhost.

the correct configuration in metrics.properties:
# Enable GangliaSink for all instances
*.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
#*.sink.ganglia.host=localhost
*.sink.ganglia.host=239.2.11.71
*.sink.ganglia.port=8653
*.sink.ganglia.period=10
*.sink.ganglia.unit=seconds
*.sink.ganglia.ttl=1
*.sink.ganglia.mode=multicast




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-not-see-any-spark-metrics-on-ganglia-web-tp14981p15128.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Memory used in Spark-0.9.0-incubating

2014-09-25 Thread 王晓雨

ENV:
Spark:0.9.0-incubating
Hadoop:2.3.0

I run a Spark task on YARN, and I see this log in the NodeManager:

2014-09-25 17:43:34,141 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 549 for container-id container_1411635522254_0001_01_05: 4.5 GB of 5 GB physical memory used; 5.0 GB of 10.5 GB virtual memory used

(the same line repeats at 17:43:37, 17:43:40, 17:43:43 and 17:43:46)


My task parameters are:
--num-workers 4 --master-memory 2g --worker-memory 4g --worker-cores 4
In my opinion, --worker-memory 4g means 4 GB is the maximum memory for a container.
But why does the log say 4.5 GB of 5 GB physical memory used?
And where do I configure the 5 GB maximum memory for the container?

--

WangXiaoyu





Re: Memory used in Spark-0.9.0-incubating

2014-09-25 Thread 王晓雨

My yarn-site.xml config:
<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>16384</value>
</property>


ENV:
Spark:0.9.0-incubating
Hadoop:2.3.0

I run a Spark task on YARN, and I see this log in the NodeManager:

2014-09-25 17:43:34,141 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 549 for container-id container_1411635522254_0001_01_05: 4.5 GB of 5 GB physical memory used; 5.0 GB of 10.5 GB virtual memory used

(the same line repeats at 17:43:37, 17:43:40, 17:43:43 and 17:43:46)


My task parameters are:
--num-workers 4 --master-memory 2g --worker-memory 4g --worker-cores 4
In my opinion, --worker-memory 4g means 4 GB is the maximum memory for a
container.

But why does the log say 4.5 GB of 5 GB physical memory used?
And where do I configure the 5 GB maximum memory for the container?

--

WangXiaoyu
- 



Re: Re:

2014-09-25 Thread pouryas
I had a similar problem writing to Cassandra using the Cassandra connector.
I am not sure whether this will work for you, but I reduced the number of
cores to 1 per machine and my job was stable. More explanation of my
issue:

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Cassandra-Connector-Issue-and-performance-td15005.html



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/no-subject-tp15019p15134.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



java.io.FileNotFoundException in usercache

2014-09-25 Thread Egor Pahomov
I work with Spark on an unstable cluster with bad administration.
A couple of days ago I started getting

14/09/25 15:29:56 ERROR storage.DiskBlockObjectWriter: Uncaught
exception while reverting partial writes to file
/local/hd2/yarn/local/usercache/epahomov/appcache/application_1411219858924_15501/spark-local-20140925151931-a4c3/3a/shuffle_4_30_174

java.io.FileNotFoundException:
/local/hd2/yarn/local/usercache/epahomov/appcache/application_1411219858924_15501/spark-local-20140925151931-a4c3/3a/shuffle_4_30_174
(No such file or directory)

After this error the Spark context shut down. I'm aware that there are some
problems with the distributed cache on this cluster; some people add too much
data to it.

I totally don't understand what's going on, but am willing to understand it deeply.

1) Does Spark somehow rely on the YARN localization mechanism?
2) What is the usercache directory about?
3) Is there a quick way to work around the problem?
4) Isn't shutting down the Spark context an overreaction to this error?


-- 



Sincerely yours,
Egor Pakhomov
Developer, Yandex


SPARK 1.1.0 on yarn-cluster and external JARs

2014-09-25 Thread rzykov
We build some Spark jobs with external jars. I compile the jobs by including
the jars in one assembly, but I am looking for an approach that puts all
external jars into HDFS.

We have already put the Spark jar in an HDFS folder and set up the SPARK_JAR
variable.
What is the best way to do that for other external jars (MongoDB, algebird,
and so on)?

Thanks in advance
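One common approach (a sketch; all paths, file names and class names below are made up for illustration) is to host the jars in HDFS and reference them via --jars, which Spark on YARN localizes for the executors:

```shell
# Sketch: reference external jars directly from HDFS instead of baking them
# into the application assembly. Every path and name below is illustrative.
spark-submit \
  --master yarn-cluster \
  --class com.example.MyJob \
  --jars hdfs:///libs/mongo-java-driver.jar,hdfs:///libs/algebird-core_2.10.jar \
  hdfs:///apps/myjob.jar
```

This keeps the assembly small and lets several jobs share one copy of the dependency jars in HDFS.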





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-1-1-0-on-yarn-cluster-and-external-JARs-tp15136.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Memory used in Spark-0.9.0-incubating

2014-09-25 Thread Yi Tian
You should check the log of the ResourceManager when you submit this job to
YARN.

It records how many resources your Spark application actually asked from the
resource manager for each container.

Did you use the fair scheduler?

There is a config parameter of the fair scheduler,
"yarn.scheduler.increment-allocation-mb"; the default is 1024.

It means that if you ask for 4097 MB of memory for a container, the resource
manager will create a container which uses 5120 MB of memory.

But I can't figure out where the 5 GB comes from.

Maybe there is code somewhere which confuses 1024 and 1000?

Best Regards,

Yi Tian
tianyi.asiai...@gmail.com




On Sep 25, 2014, at 18:41, 王晓雨 wangxiao...@jd.com wrote:

 My yarn-site.xml config:
 property
 nameyarn.nodemanager.resource.memory-mb/name
 value16384/value
 /property
 
 ENV:
 Spark:0.9.0-incubating
 Hadoop:2.3.0
 
 I run spark task on Yarn. I see the log in Nodemanager:
 2014-09-25 17:43:34,141 INFO 
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
  Memory usage of ProcessTree 549 for container-id 
 container_1411635522254_0001_01_05: 4.5 GB of 5 GB physical memory used; 
 5.0 GB of 10.5 GB virtual memory used
 2014-09-25 17:43:37,171 INFO 
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
  Memory usage of ProcessTree 549 for container-id 
 container_1411635522254_0001_01_05: 4.5 GB of 5 GB physical memory used; 
 5.0 GB of 10.5 GB virtual memory used
 2014-09-25 17:43:40,210 INFO 
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
  Memory usage of ProcessTree 549 for container-id 
 container_1411635522254_0001_01_05: 4.5 GB of 5 GB physical memory used; 
 5.0 GB of 10.5 GB virtual memory used
 2014-09-25 17:43:43,239 INFO 
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
  Memory usage of ProcessTree 549 for container-id 
 container_1411635522254_0001_01_05: 4.5 GB of 5 GB physical memory used; 
 5.0 GB of 10.5 GB virtual memory used
 2014-09-25 17:43:46,269 INFO 
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
  Memory usage of ProcessTree 549 for container-id 
 container_1411635522254_0001_01_05: 4.5 GB of 5 GB physical memory used; 
 5.0 GB of 10.5 GB virtual memory used
 
 My task parameter is :
 --num-workers 4 --master-memory 2g --worker-memory 4g --worker-cores 4
 In my opinion, --worker-memory 4g means 4 GB is the maximum memory for a container.
 But why does the log say 4.5 GB of 5 GB physical memory used?
 And where do I configure the 5 GB maximum memory for the container?
 
 -- 
 
 WangXiaoyu
 -
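One plausible explanation for the 5 GB (an assumption, not confirmed in this thread): the Spark-on-YARN client of this era added a fixed memory overhead (384 MB) on top of the requested --worker-memory, and YARN then rounded the request up to its allocation increment, so the arithmetic would be:

```scala
// Sketch of how a 4g worker-memory request could become a 5 GB container.
// The 384 MB overhead and 1024 MB increment are assumed defaults; verify
// against your Spark and YARN versions.
val requestedMb = 4 * 1024                  // --worker-memory 4g
val overheadMb  = 384                       // assumed Spark-on-YARN memory overhead
val incrementMb = 1024                      // yarn.scheduler.increment-allocation-mb
val askedMb     = requestedMb + overheadMb  // 4480 MB requested from YARN
val containerMb = ((askedMb + incrementMb - 1) / incrementMb) * incrementMb
println(containerMb)                        // 5120 MB, i.e. the 5 GB in the log
```

Under these assumptions the 4.5 GB usage is simply the 4 GB heap plus JVM overhead, and the 5 GB limit is the rounded-up YARN container size, not a separate config value.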



Update gcc version, still snappy error

2014-09-25 Thread buring
I updated the Spark version from 1.0.2 to 1.1.0 and experienced a snappy
version issue with the new Spark-1.1.0. After updating the glibc version,
another issue occurred. I abstract the log as follows:

14/09/25 11:29:18 WARN [org.apache.hadoop.util.NativeCodeLoader---main]:
Unable to load native-hadoop library for your platform... using builtin-java
classes where applicable
14/09/25 11:29:19 WARN [org.apache.hadoop.hdfs.DomainSocketFactory---main]:
The short-circuit local reads feature is disabled because libhadoop cannot
be loaded.

WARN [org.apache.spark.scheduler.TaskSetManager---Result resolver thread-0]:
Lost task 0.0 in stage 1.0 (TID 1, spark-dev134):
org.xerial.snappy.SnappyError: [FAILED_TO_LOAD_NATIVE_LIBRARY] null
org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:236)
org.xerial.snappy.Snappy.clinit(Snappy.java:48)
   
org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:351)

WARN [org.apache.spark.scheduler.TaskSetManager---Result resolver thread-3]:
Lost task 4.0 in stage 1.0 (TID 4, spark-dev134):
java.lang.NoClassDefFoundError: Could not initialize class
org.xerial.snappy.Snappy
   
org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:351)
   
org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:159)
org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)

14/09/25 11:29:24 ERROR
[org.apache.spark.network.ConnectionManager---handle-read-write-executor-3]:
Corresponding SendingConnection to ConnectionManagerId(spark-dev135,38649)
not found

14/09/25 11:29:24 INFO [org.apache.spark.scheduler.DAGScheduler---main]:
Failed to run count at SessionSVD2.scala:23
Exception in thread main org.apache.spark.SparkException: Job aborted due
to stage failure: Task 2 in stage 1.0 failed 4 times, most recent failure:
Lost task 2.3 in stage 1.0 (TID 9, spark-dev135): ExecutorLostFailure
(executor lost)
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

1) I tried to print JAVA_LIBRARY_PATH; the native-hadoop library is not in
the path. I set
System.setProperty("JAVA_LIBRARY_PATH", "hadoop_home/lib/native/"), which only
takes effect in System.getenv() but not in
System.getProperty("JAVA_LIBRARY_PATH"). And hadoop_home/lib/native/
contains the libhadoop and libsnappy.so files, which I want to include in the path.

2) I found that in /tmp there are many snappy-uuid files; each time I submit a
job it creates a snappy-uuid file. Before I updated the glibc version, my
colleague updated the snappy version; I think this is the reason why it can
find the snappy file but not libhadoop.

Are there any ideas?
Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Update-gcc-version-Still-snappy-error-tp15137.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Pregel messages serialized in local machine?

2014-09-25 Thread Cheuk Lam
This is a question on using the Pregel function in GraphX.  Does a message
get serialized and then de-serialized in the scenario where both the source
and the destination vertices are on the same compute node/machine?
Thank you!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Pregel-messages-serialized-in-local-machine-tp15140.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Systematic error when re-starting Spark stream unless I delete all checkpoints

2014-09-25 Thread Svend
I am experiencing Spark streaming restart issues similar to what is discussed
in the two threads below (in which I failed to find a solution). Could anybody
let me know if anything is wrong in the way I start/stop, or if this could be
a Spark bug?

http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-td14847.html
http://apache-spark-user-list.1001560.n3.nabble.com/KafkaReciever-Error-when-starting-ssc-Actor-name-not-unique-tc3978.html

My stream reads a Kafka topic, does some processing involving an
updateStateByKey and saves the result to HDFS.

The context is (re)-created at startup as follows: 



And the start-up and shutdown of the stream is handled as follows: 




When starting the stream for the first time (with spark-submit), the
processing happens successfully, folders are created in the target HDFS
folder, and streaming stats are visible at http://sparkhost:4040/streaming.

After letting the stream run for several minutes and then stopping it
(Ctrl-C on the command line), the following info is visible in the
checkpoint folder:



(checkpoint clean-up seems to happen, since the stream ran for much more than
5 times 10 seconds)

When re-starting the stream, the startup fails with the error below;
http://sparkhost:4040/streaming shows no statistics, no new HDFS folder is
added in the target folder, and no new checkpoints are created:






Now, if I delete all older checkpoints and keep only the most recent one:



I end up with this (Kafka?) actor non-unique-name error.



If I delete the checkpoint folder, the stream starts successfully (but I lose
my ongoing stream state, obviously).

We're running Spark 1.1.0 on Mesos 0.20. Our Spark assembly is packaged with
CDH 5.1.0 and Hive:



Any comment or suggestion would be greatly appreciated.
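The inline code from the original email was lost in this archive. For context, the standard checkpoint-recovery pattern that this kind of setup normally follows is sketched below; names and paths are illustrative, not Svend's actual code:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Generic recovery pattern: the whole DStream graph must be built inside
// the factory passed to getOrCreate, so that a restart rebuilds it from the
// checkpoint instead of registering the input stream's actor a second time.
val checkpointDir = "hdfs:///checkpoints/my-stream"  // illustrative path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("my-stream")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // ... create the Kafka input stream, updateStateByKey, HDFS output here ...
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
```

Building any part of the graph outside the factory is a common cause of "actor name not unique" errors on restart.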

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Systematic-error-when-re-starting-Spark-stream-unless-I-delete-all-checkpoints-tp15142.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Hive max key length is 767 bytes

2014-09-25 Thread Denny Lee
Sorry for missing your original email - thanks for the catch, eh?!

On Thu, Sep 25, 2014 at 7:14 AM, arthur.hk.c...@gmail.com 
arthur.hk.c...@gmail.com wrote:

 Hi,

 Fixed the issue by downgrading Hive from 13.1 to 12.0; it works well now.

 Regards


 On 31 Aug, 2014, at 7:28 am, arthur.hk.c...@gmail.com 
 arthur.hk.c...@gmail.com wrote:

 Hi,

 Already done but still get the same error:

 (I use HIVE 0.13.1 Spark 1.0.2, Hadoop 2.4.1)

 Steps:
 Step 1) mysql:

 alter database hive character set latin1;

 Step 2) HIVE:

 hive> create table test_datatype2 (testbigint bigint);
 OK
 Time taken: 0.708 seconds

 hive> drop table test_datatype2;
 OK
 Time taken: 23.272 seconds

 Step 3) scala> val hiveContext = new
 org.apache.spark.sql.hive.HiveContext(sc)

 14/08/29 19:33:52 INFO Configuration.deprecation:
 mapred.reduce.tasks.speculative.execution is deprecated. Instead, use
 mapreduce.reduce.speculative
 hiveContext: org.apache.spark.sql.hive.HiveContext =
 org.apache.spark.sql.hive.HiveContext@395c7b94

 scala> hiveContext.hql("create table test_datatype3 (testbigint bigint)")

 res0: org.apache.spark.sql.SchemaRDD =
 SchemaRDD[0] at RDD at SchemaRDD.scala:104
 == Query Plan ==
 Native command: executed by Hive

 scala> hiveContext.hql("drop table test_datatype3")

 14/08/29 19:34:14 ERROR DataNucleus.Datastore: An exception was thrown
 while adding/validating class(es) : Specified key was too long; max key
 length is 767 bytes
 com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key
 was too long; max key length is 767 bytes
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)

 14/08/29 19:34:17 WARN DataNucleus.Query: Query for candidates of
 org.apache.hadoop.hive.metastore.model.MPartition and subclasses resulted
 in no possible candidates
 Error(s) were found while auto-creating/validating the datastore for
 classes. The errors are printed in the log, and are attached to this
 exception.
 org.datanucleus.exceptions.NucleusDataStoreException: Error(s) were found
 while auto-creating/validating the datastore for classes. The errors are
 printed in the log, and are attached to this exception.
 at
 org.datanucleus.store.rdbms.RDBMSStoreManager$ClassAdder.verifyErrors(RDBMSStoreManager.java:3609)


 Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException:
 Specified key was too long; max key length is 767 bytes
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)



 Should I use HIVE 0.12.0 instead of HIVE 0.13.1?

 Regards
 Arthur

 On 31 Aug, 2014, at 6:01 am, Denny Lee denny.g@gmail.com wrote:

 Oh, you may be running into an issue with your MySQL setup actually, try
 running

 alter database metastore_db character set latin1

 so that way Hive (and the Spark HiveContext) can execute properly against
 the metastore.


 On August 29, 2014 at 04:39:01, arthur.hk.c...@gmail.com (
 arthur.hk.c...@gmail.com) wrote:

 Hi,


 Tried the same thing in HIVE directly without issue:

 HIVE:

 hive> create table test_datatype2 (testbigint bigint);
 OK
 Time taken: 0.708 seconds

 hive> drop table test_datatype2;
 OK
 Time taken: 23.272 seconds




 Then tried again in SPARK:
 scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
 14/08/29 19:33:52 INFO Configuration.deprecation:
 mapred.reduce.tasks.speculative.execution is deprecated. Instead, use
 mapreduce.reduce.speculative
 hiveContext: org.apache.spark.sql.hive.HiveContext =
 org.apache.spark.sql.hive.HiveContext@395c7b94

 scala> hiveContext.hql("create table test_datatype3 (testbigint bigint)")
 res0: org.apache.spark.sql.SchemaRDD =
 SchemaRDD[0] at RDD at SchemaRDD.scala:104
 == Query Plan ==
 Native command: executed by Hive

 scala> hiveContext.hql("drop table test_datatype3")

 14/08/29 19:34:14 ERROR DataNucleus.Datastore: An exception was thrown
 while adding/validating class(es) : Specified key was too long; max key
 length is 767 bytes
 com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key
 was too long; max key length is 767 bytes
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)

 14/08/29 19:34:17 WARN DataNucleus.Query: Query for candidates of
 org.apache.hadoop.hive.metastore.model.MPartition and subclasses resulted
 in no possible candidates
 Error(s) were found while auto-creating/validating the datastore for
 classes. The errors are printed in the log, and are attached to this
 exception.
 org.datanucleus.exceptions.NucleusDataStoreException: Error(s) were found
 while auto-creating/validating the datastore for classes. The errors are
 printed in the log, and are attached to this exception.
 at
 org.datanucleus.store.rdbms.RDBMSStoreManager$ClassAdder.verifyErrors(RDBMSStoreManager.java:3609)


 Caused by: 

Re: SPARK 1.1.0 on yarn-cluster and external JARs

2014-09-25 Thread Egor Pahomov
SparkContext.addJar()?

Why didn't you like the fat-jar approach?

2014-09-25 16:25 GMT+04:00 rzykov rzy...@gmail.com:

 We build some SPARK jobs with external jars. I compile jobs by including
 them
 in one assembly.
 But look for an approach to put all external jars into HDFS.

 We have already put  spark jar in a HDFS folder and set up the variable
 SPARK_JAR.
 What is the best way to do that for other external jars (MongoDB, algebird
 and so on)?

 Thanks in advance





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-1-1-0-on-yarn-cluster-and-external-JARs-tp15136.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Sincerely yours,
Egor Pakhomov
Scala Developer, Yandex


how to run spark job on yarn with jni lib?

2014-09-25 Thread taqilabon
Hi all,

I tried to run my Spark job on YARN.
In my application, I need to call third-party JNI libraries in a Spark job.
However, I can't find a way to make the Spark job load my native libraries.
Is there anyone who knows how to solve this problem?
Thanks.

Ziv Huang



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-run-spark-job-on-yarn-with-jni-lib-tp15146.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: how to run spark job on yarn with jni lib?

2014-09-25 Thread Marcelo Vanzin
Hmmm, you might be suffering from SPARK-1719.

Not sure what the proper workaround is, but it sounds like your native
libs are not in any of the standard lib directories; one workaround
might be to copy them there, or add their location to /etc/ld.so.conf
(I'm assuming Linux).
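
For reference, a hedged sketch of both workarounds on Linux; the paths and library name below are placeholders, not taken from the thread:

```shell
# (a) Copy the native library into a directory the dynamic linker already searches:
sudo cp libthirdparty.so /usr/lib/
sudo ldconfig

# (b) Or register the library's current directory with the linker:
echo "/opt/myapp/native" | sudo tee /etc/ld.so.conf.d/myapp.conf
sudo ldconfig
```

Either way, run `ldconfig` afterwards so the linker cache picks up the change on every node that runs executors.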

On Thu, Sep 25, 2014 at 8:34 AM, taqilabon g945...@gmail.com wrote:
 Hi all,

 I tried to run my Spark job on YARN.
 In my application, I need to call third-party JNI libraries in a Spark job.
 However, I can't find a way to make the Spark job load my native libraries.
 Is there anyone who knows how to solve this problem?
 Thanks.

 Ziv Huang



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/how-to-run-spark-job-on-yarn-with-jni-lib-tp15146.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL use of alias in where clause

2014-09-25 Thread Du Li
Thanks, Yanbo and Nicholas. Now it makes more sense — query optimization is the 
answer. /Du

From: Nicholas Chammas nicholas.cham...@gmail.com
Date: Thursday, September 25, 2014 at 6:43 AM
To: Yanbo Liang yanboha...@gmail.com
Cc: Du Li l...@yahoo-inc.com.invalid, d...@spark.apache.org,
user@spark.apache.org
Subject: Re: Spark SQL use of alias in where clause

That is correct. Aliases in the SELECT clause can only be referenced in the 
ORDER BY and HAVING clauses. Otherwise, you'll have to just repeat the 
statement, like concat() in this case.

A more elegant alternative, which is probably not available in Spark SQL yet, 
is to use Common Table Expressions 
(http://technet.microsoft.com/en-us/library/ms190766(v=sql.105).aspx).
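
As a toy illustration of the CTE pattern (stock SQLite via Python's stdlib, not Spark SQL — the table and data here are made up to mirror the query in this thread):

```python
import sqlite3

# Build a tiny in-memory "src" table like the one in the thread.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE src (key TEXT, value TEXT)")
con.executemany("INSERT INTO src VALUES (?, ?)", [("1", "1a"), ("2", "2b")])

# The CTE names the computed column once; the outer WHERE can then
# reference the alias "combined" directly instead of repeating concat().
rows = con.execute(
    "WITH t AS (SELECT key, value, key || value AS combined FROM src) "
    "SELECT key, value, combined FROM t WHERE combined LIKE '11%'"
).fetchall()
print(rows)  # -> [('1', '1a', '11a')]
```

The same shape — wrap the projection in a named subquery, filter outside — also works as a plain derived table in engines without CTE support.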

On Wed, Sep 24, 2014 at 11:32 PM, Yanbo Liang yanboha...@gmail.com wrote:
Maybe that's just the way SQL works.
The SELECT part is evaluated after the WHERE filter is applied, so you cannot 
use an alias declared in the SELECT part in the WHERE clause.
Hive and Oracle behave the same as Spark SQL.

2014-09-25 8:58 GMT+08:00 Du Li l...@yahoo-inc.com.invalid:
Hi,

The following query does not work in Shark nor in the new Spark SQLContext or 
HiveContext.
SELECT key, value, concat(key, value) as combined from src where combined like 
'11%';

The following tweak of syntax works fine although a bit ugly.
SELECT key, value, concat(key, value) as combined from src where 
concat(key,value) like '11%' order by combined;

Are you going to support alias in where clause soon?

Thanks,
Du




Re: Multiple Kafka Receivers and Union

2014-09-25 Thread Matt Narrell
I suppose I have other problems as I can’t get the Scala example to work 
either.  Puzzling, as I have literally coded like the examples (that are 
purported to work), but no luck.

mn

On Sep 24, 2014, at 11:27 AM, Tim Smith secs...@gmail.com wrote:

 Maybe differences between JavaPairDStream and JavaPairReceiverInputDStream?
 
 On Wed, Sep 24, 2014 at 7:46 AM, Matt Narrell matt.narr...@gmail.com wrote:
 The part that works is the commented out, single receiver stream below the 
 loop.  It seems that when I call KafkaUtils.createStream more than once, I 
 don’t receive any messages.
 
 I’ll dig through the logs, but at first glance yesterday I didn’t see 
 anything suspect.  I’ll have to look closer.
 
 mn
 
 On Sep 23, 2014, at 6:14 PM, Tim Smith secs...@gmail.com wrote:
 
 Maybe post the before-code as in what was the code before you did the
 loop (that worked)? I had similar situations where reviewing code
 before (worked) and after (does not work) helped. Also, what helped is
 the Scala REPL because I can see what are the object types being
 returned by each statement.
 
 Other than code, in the driver logs, you should see events that say
 Registered receiver for stream 0 from
 akka.tcp://sp...@node5.acme.net:53135
 
 Now, if you goto node5 and look at Spark or YarnContainer logs
 (depending on who's doing RM), you should be able to see if the
 receiver has any errors when trying to talk to kafka.
 
 
 
 On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell matt.narr...@gmail.com 
 wrote:
 To my eyes, these are functionally equivalent.  I’ll try a Scala approach, 
 but this may cause waves for me upstream (e.g., non-Java)
 
 Thanks for looking at this.  If anyone else can see a glaring issue in the 
 Java approach that would be appreciated.
 
 Thanks,
 Matt
 
 On Sep 23, 2014, at 4:13 PM, Tim Smith secs...@gmail.com wrote:
 
 Sorry, I am almost Java illiterate but here's my Scala code to do the
 equivalent (that I have tested to work):
 
 val kInStreams = (1 to 10).map{ _ =>
 KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp",
 Map("myTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER) } // Create 10
 receivers across the cluster, one for each partition potentially, but
 active receivers are only as many as the kafka partitions you have
 
 val kInMsg =
 ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
 
 
 
 
 On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell matt.narr...@gmail.com 
 wrote:
 So, this is scrubbed some for confidentiality, but the meat of it is as 
 follows.  Note, that if I substitute the commented section for the loop, 
 I receive messages from the topic.
 
 SparkConf sparkConf = new SparkConf();
 sparkConf.set("spark.streaming.unpersist", "true");
 sparkConf.set("spark.logConf", "true");
 
 Map<String, String> kafkaProps = new HashMap();
 kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
 kafkaProps.put("group.id", groupId);
 
 JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, 
 Seconds.apply(1));
 jsc.checkpoint("hdfs://some_location");
 
 List<JavaPairDStream<String, ProtobufModel>> streamList = new 
 ArrayList(5);
 
 for (int i = 0; i < 5; i++) {
  streamList.add(KafkaUtils.createStream(jsc,
                 String.class, ProtobufModel.class,
                 StringDecoder.class, ProtobufModelDecoder.class,
                 kafkaProps,
                 Collections.singletonMap(topic, 1),
                 StorageLevel.MEMORY_ONLY_SER()));
 }
 
 final JavaPairDStream<String, ProtobufModel> stream = 
 jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
 
 //  final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
 //      KafkaUtils.createStream(jsc,
 //                              String.class, ProtobufModel.class,
 //                              StringDecoder.class, ProtobufModelDecoder.class,
 //                              kafkaProps,
 //                              Collections.singletonMap(topic, 5),
 //                              StorageLevel.MEMORY_ONLY_SER());
 
 final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
  new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
      @Override
      public Tuple2<String, Integer> call(Tuple2<String, ProtobufModel> tuple) throws Exception {
          return new Tuple2(tuple._2().getDeviceId(), 1);
      }
  });
 
 … and further Spark functions ...
 
 On Sep 23, 2014, at 2:55 PM, Tim Smith secs...@gmail.com wrote:
 
 Posting your code would be really helpful in figuring out gotchas.
 
 On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell matt.narr...@gmail.com 
 wrote:
 Hey,
 
 Spark 1.1.0
 Kafka 0.8.1.1
 Hadoop (YARN/HDFS) 2.5.1
 
 I have a five partition Kafka topic.  I can create a single Kafka 
 receiver
 via KafkaUtils.createStream 

VertexRDD partition imbalance

2014-09-25 Thread Larry Xiao

Hi all

VertexRDD is partitioned with HashPartitioner, and it exhibits some 
imbalance of tasks.

For example, Connected Components with partition strategy Edge2D:


   Aggregated Metrics by Executor

Executor ID | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Input    | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
1           | 10 s      | 10          | 0            | 10              | 234.6 MB | 0.0 B        | 43.2 MB       | 0.0 B                  | 0.0 B
2           | 3 s       | 3           | 0            | 3               | 70.4 MB  | 0.0 B        | 13.0 MB       | 0.0 B                  | 0.0 B
3           | 6 s       | 6           | 0            | 6               | 140.7 MB | 0.0 B        | 25.9 MB       | 0.0 B                  | 0.0 B
4           | 9 s       | 8           | 0            | 8               | 187.9 MB | 0.0 B        | 34.6 MB       | 0.0 B                  | 0.0 B
5           | 10 s      | 9           | 0            | 9               | 211.4 MB | 0.0 B        | 38.9 MB       | 0.0 B                  | 0.0 B

For a stage on mapPartitions at VertexRDD.scala:347
343
344   /** Generates an RDD of vertex attributes suitable for shipping to 
the edge partitions. */

345   private[graphx] def shipVertexAttributes(
346   shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, 
VertexAttributeBlock[VD])] = {
347 
partitionsRDD.mapPartitions(_.flatMap(_.shipVertexAttributes(shipSrc, 
shipDst)))

348   }
349

This is executed for every iteration in Pregel, so the imbalance is bad 
for performance.


However, when running PageRank with Edge2D, the tasks are even across 
executors (all finish 6 tasks).

Our configuration is 6 nodes, 36 partitions.

My question is:

   What decides the number of tasks for different executors? And how to
   make it balance?
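
As a toy illustration of how hash partitioning can go uneven (plain Python as a stand-in for GraphX's HashPartitioner, which uses the key's hashCode; the vertex IDs below are contrived):

```python
# HashPartitioner places a key in partition hash(key) % numPartitions.
# If the key distribution correlates with the partition count -- here,
# hypothetical vertex IDs that are all multiples of 36 -- everything lands
# in a few partitions, and the executors owning them get most of the tasks.
num_partitions = 36
vertex_ids = [36 * i for i in range(100)]  # contrived skew

counts = [0] * num_partitions
for v in vertex_ids:
    counts[hash(v) % num_partitions] += 1

print(counts[0], sum(counts[1:]))  # -> 100 0
```

With such skew, repartitioning or choosing IDs/partition counts that don't share factors spreads the keys out again.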

Thanks!
Larry



Working on LZOP Files

2014-09-25 Thread Harsha HN
Hi,

Is anybody processing LZOP files with Spark?

We have a huge volume of LZOP files in HDFS to process through Spark. In the
MapReduce framework, it automatically detects the file format and sends the
decompressed records to the mappers.
Is there any such support in Spark?
As of now I am manually downloading and decompressing the files before processing.
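
For reference, a hedged sketch that may avoid the manual step — it assumes the hadoop-lzo package (which provides com.hadoop.mapreduce.LzoTextInputFormat) is on the classpath and the LZO codec is configured on the cluster; the path is a placeholder:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import com.hadoop.mapreduce.LzoTextInputFormat

// The input format splits and decompresses .lzo files, so records
// arrive in the RDD already decompressed.
val lines = sc.newAPIHadoopFile(
  "hdfs:///data/logs/*.lzo",      // placeholder path
  classOf[LzoTextInputFormat],
  classOf[LongWritable],
  classOf[Text]
).map(_._2.toString)
```

Indexed LZO files (via the hadoop-lzo indexer) additionally make the inputs splittable, which matters for parallelism on large files.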

Thanks,
Harsha


RE: MLUtils.loadLibSVMFile error

2014-09-25 Thread Sameer Tilak
Hi Liquan,
Thanks. I was running this in spark-shell. I was able to resolve this issue by 
creating an app and then submitting it via spark-submit in yarn-client mode. I 
have seen this happening before as well -- submitting via spark-shell has 
memory issues.  The same code then works fine when submitted as an app in 
spark-submit yarn-client mode. I am not sure whether this is due to difference 
between spark-shell and spark-submit or yarn vs non-yarn mode.

Date: Wed, 24 Sep 2014 22:13:35 -0700
Subject: Re: MLUtils.loadLibSVMFile error
From: liquan...@gmail.com
To: ssti...@live.com
CC: so...@cloudera.com; user@spark.apache.org

Hi Sameer,
I think there are two things that you can do:
1) What is your current driver-memory or executor-memory? You can try to 
increase driver-memory or executor-memory to see if that solves your problem.
2) How many features are in your data? Too many features may create a large 
number of temp objects, which may also cause GC to happen.
Hope this helps!
Liquan
On Wed, Sep 24, 2014 at 9:50 PM, Sameer Tilak ssti...@live.com wrote:



Hi All,
I was able to solve this formatting issue. However, I have another question. 
When I do the following,

val examples: RDD[LabeledPoint] =
MLUtils.loadLibSVMFile(sc, "structured/results/data.txt")

I get a java.lang.OutOfMemoryError: GC overhead limit exceeded error. Is it 
possible to specify the number of partitions explicitly?
I want to add that this dataset is sparse and is fairly small -- ~250 MB.
Error log:
14/09/24 21:41:02 ERROR Executor: Exception in task ID 0
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.regex.Pattern.compile(Pattern.java:1655)
at java.util.regex.Pattern.<init>(Pattern.java:1337)
at java.util.regex.Pattern.compile(Pattern.java:1022)
at java.lang.String.split(String.java:2313)
at java.lang.String.split(String.java:2355)
at scala.collection.immutable.StringLike$class.split(StringLike.scala:201)
at scala.collection.immutable.StringOps.split(StringOps.scala:31)
at org.apache.spark.mllib.util.MLUtils$$anonfun$4$$anonfun$5.apply(MLUtils.scala:80)
at org.apache.spark.mllib.util.MLUtils$$anonfun$4$$anonfun$5.apply(MLUtils.scala:79)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.mllib.util.MLUtils$$anonfun$4.apply(MLUtils.scala:79)
at org.apache.spark.mllib.util.MLUtils$$anonfun$4.apply(MLUtils.scala:76)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
14/09/24 21:41:02 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in 
thread Thread[Executor task launch worker-0,5,main]
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.regex.Pattern.compile(Pattern.java:1655)
at java.util.regex.Pattern.<init>(Pattern.java:1337)
at java.util.regex.Pattern.compile(Pattern.java:1022)
at java.lang.String.split(String.java:2313)
at java.lang.String.split(String.java:2355)
at scala.collection.immutable.StringLike$class.split(StringLike.scala:201)
at scala.collection.immutable.StringOps.split(StringOps.scala:31)
at org.apache.spark.mllib.util.MLUtils$$anonfun$4$$anonfun$5.apply(MLUtils.scala:80)
at org.apache.spark.mllib.util.MLUtils$$anonfun$4$$anonfun$5.apply(MLUtils.scala:79)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at

Optimal Partition Strategy

2014-09-25 Thread Muttineni, Vinay
Hello,
A bit of a background.
I have a dataset with about 200 million records and around 10 columns. The size 
of this dataset is around 1.5Tb and is split into around 600 files.
When I read this dataset, using sparkContext, by default it creates around 3000 
partitions if I do not specify the number of partitions in the textFile() 
command.
Now I see that even though my spark application has around 400 executors 
assigned to it, the data is spread out only to about 200 of them. I am using 
.cache() method to hold my data in-memory.
Each of these 200 executors, each with a total available memory of 6Gb, are now 
having multiple blocks and are thus using up their entire memory by caching the 
data.

Even though I have about 400 machines, only about 200 of them are actually 
being used.
Now, my question is:

How do I partition my data so all 400 of the executors have some chunks of the 
data, thus better parallelizing my work?
So, instead of only about 200 machines having about 6Gb of data each, I would 
like to have 400 machines with about 3Gb data each.




Any idea on how I can set about achieving the above?
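
Two hedged options against the Spark 1.x RDD API — 800 below is just an example count (2x the 400 executors mentioned above), and the path is a placeholder:

```scala
// 1) Ask for more input splits when reading; the second argument to
//    textFile is a minimum partition count.
val data = sc.textFile("hdfs:///path/to/dataset", 800)

// 2) Or explicitly reshuffle an already-loaded RDD across more partitions
//    before caching, so the cached blocks spread over more executors.
val spread = data.repartition(800).cache()
```

Option 2 costs a full shuffle up front, but can pay for itself if the cached data is reused many times.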
Thanks,
Vinay


Re: Pregel messages serialized in local machine?

2014-09-25 Thread Ankur Dave
At 2014-09-25 06:52:46 -0700, Cheuk Lam chl...@hotmail.com wrote:
 This is a question on using the Pregel function in GraphX.  Does a message
 get serialized and then de-serialized in the scenario where both the source
 and the destination vertices are in the same compute node/machine?

Yes, message passing currently uses partitionBy, which shuffles all messages, 
including ones to the same machine, by serializing them and writing them to 
disk.

Ankur

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Question About Submit Application

2014-09-25 Thread Marcelo Vanzin
Then I think it's time for you to look at the Spark Master logs...

On Thu, Sep 25, 2014 at 7:51 AM, danilopds danilob...@gmail.com wrote:
 Hi Marcelo,

 Yes, I can ping spark-01 and I also include the IP and host in my file
 /etc/hosts.
 My VM can ping the local machine too.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Question-About-Submit-Application-tp15072p15145.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark Streaming + Actors

2014-09-25 Thread Madabhattula Rajesh Kumar
Hi Team,

Can I use actors in Spark Streaming based on event type? Could you please
review the test program below and let me know if there is anything I need to
change with respect to best practices?

import akka.actor.Actor
import akka.actor.{ActorRef, Props}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import akka.actor.ActorSystem

case class one(r: org.apache.spark.rdd.RDD[String])
case class two(s: org.apache.spark.rdd.RDD[String])

class Events extends Actor
{
  def receive = {
    // Based on event type - invoke respective methods asynchronously
    case one(r) => println("ONE COUNT " + r.count) // invoke respective functions
    case two(s) => println("TWO COUNT " + s.count) // invoke respective functions
  }
}

object Test {

def main(args: Array[String]) {
val system = ActorSystem(System)
val event: ActorRef = system.actorOf(Props[Events], events)
val sparkConf = new SparkConf().setAppName("AlertsLinesCount")
  .setMaster("local")
val ssc = new StreamingContext(sparkConf, Seconds(30))
val lines =
  ssc.textFileStream("hdfs://localhost:9000/user/rajesh/EventsDirectory/")
lines.foreachRDD(x => {
  event ! one(x)
  event ! two(x)
})
ssc.start
ssc.awaitTermination
}
}

Regards,
Rajesh


Re: Yarn number of containers

2014-09-25 Thread Marcelo Vanzin
On Thu, Sep 25, 2014 at 8:55 AM, jamborta jambo...@gmail.com wrote:
 I am running spark with the default settings in yarn client mode. For some
 reason yarn always allocates three containers to the application (wondering
 where it is set?), and only uses two of them.

The default number of executors in Yarn mode is 2; so you have 2
executors + the application master, so 3 containers.

 Also, the CPUs on the cluster never go over 50%. I turned off the fair
 scheduler and set a high spark.cores.max. Are there some additional settings I
 am missing?

You probably need to request more cores (--executor-cores). Don't
remember if that is respected in Yarn, but should be.
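
A hedged example of the relevant spark-submit flags on YARN (Spark 1.1); the numbers and jar name are placeholders:

```shell
spark-submit \
  --master yarn-client \
  --num-executors 6 \
  --executor-cores 4 \
  --executor-memory 4g \
  my-app.jar
```

With --num-executors N you should see N + 1 containers (the extra one is the application master).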

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: MLUtils.loadLibSVMFile error

2014-09-25 Thread Liquan Pei
Hi Sameer,

When starting spark-shell, by default the JVM for spark-shell only has
512M of memory. For a quick hack, you can use
SPARK_MEM=4g bin/spark-shell to set the JVM memory to 4g. For more
information, you can refer to
http://spark.apache.org/docs/latest/cluster-overview.html
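
On the other question in the thread — specifying partitions explicitly — a hedged sketch: MLlib's loader has a four-argument overload in Spark 1.x taking a minimum partition count (passing -1 for numFeatures asks the loader to infer it):

```scala
import org.apache.spark.mllib.util.MLUtils

// Smaller partitions mean smaller per-task working sets, which can help
// with GC-overhead errors on a sparse ~250 MB file like the one above.
val examples = MLUtils.loadLibSVMFile(sc, "structured/results/data.txt", -1, 100)
```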

Thanks,
Liquan

On Thu, Sep 25, 2014 at 9:46 AM, Sameer Tilak ssti...@live.com wrote:

 Hi Liquan,

 Thanks. I was running this in spark-shell. I was able to resolve this
 issue by creating an app and then submitting it via spark-submit in
 yarn-client mode. I have seen this happening before as well -- submitting
 via spark-shell has memory issues.  The same code then works fine when
 submitted as an app in spark-submit yarn-client mode. I am not sure whether
 this is due to difference between spark-shell and spark-submit or yarn vs
 non-yarn mode.


 --
 Date: Wed, 24 Sep 2014 22:13:35 -0700
 Subject: Re: MLUtils.loadLibSVMFile error
 From: liquan...@gmail.com
 To: ssti...@live.com
 CC: so...@cloudera.com; user@spark.apache.org


 Hi Sameer,

 I think there are two things that you can do:
 1) What is your current driver-memory or executor-memory? You can try to
 increase driver-memory or executor-memory to see if that solves your
 problem.
 2) How many features are in your data? Too many features may create a large
 number of temp objects, which may also cause GC to happen.

 Hope this helps!
 Liquan

 On Wed, Sep 24, 2014 at 9:50 PM, Sameer Tilak ssti...@live.com wrote:

 Hi All,
 I was able to solve this formatting issue. However, I have another
 question. When I do the following,

 val examples: RDD[LabeledPoint] =
 MLUtils.loadLibSVMFile(sc,structured/results/data.txt)

 I get java.lang.OutOfMemoryError: GC overhead limit exceeded error. Is it
 possible to specify the number of partitions explicitly?

 I want to add that this dataset is sparse and is fairly small -- ~250 MB.

 Error log:

 14/09/24 21:41:02 ERROR Executor: Exception in task ID 0
 java.lang.OutOfMemoryError: GC overhead limit exceeded
 at java.util.regex.Pattern.compile(Pattern.java:1655)
 at java.util.regex.Pattern.<init>(Pattern.java:1337)
 at java.util.regex.Pattern.compile(Pattern.java:1022)
 at java.lang.String.split(String.java:2313)
 at java.lang.String.split(String.java:2355)
 at scala.collection.immutable.StringLike$class.split(StringLike.scala:201)
 at scala.collection.immutable.StringOps.split(StringOps.scala:31)
 at
 org.apache.spark.mllib.util.MLUtils$$anonfun$4$$anonfun$5.apply(MLUtils.scala:80)
 at
 org.apache.spark.mllib.util.MLUtils$$anonfun$4$$anonfun$5.apply(MLUtils.scala:79)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at org.apache.spark.mllib.util.MLUtils$$anonfun$4.apply(MLUtils.scala:79)
 at org.apache.spark.mllib.util.MLUtils$$anonfun$4.apply(MLUtils.scala:76)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 at org.apache.spark.scheduler.Task.run(Task.scala:51)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 14/09/24 21:41:02 ERROR ExecutorUncaughtExceptionHandler: Uncaught
 exception in thread Thread[Executor task launch worker-0,5,main]
 java.lang.OutOfMemoryError: GC overhead limit exceeded
 at java.util.regex.Pattern.compile(Pattern.java:1655)
 at java.util.regex.Pattern.<init>(Pattern.java:1337)
 at java.util.regex.Pattern.compile(Pattern.java:1022)
 at java.lang.String.split(String.java:2313)
 at java.lang.String.split(String.java:2355)
 at scala.collection.immutable.StringLike$class.split(StringLike.scala:201)
 at scala.collection.immutable.StringOps.split(StringOps.scala:31)
 at
 org.apache.spark.mllib.util.MLUtils$$anonfun$4$$anonfun$5.apply(MLUtils.scala:80)
 at
 

Re: SPARK 1.1.0 on yarn-cluster and external JARs

2014-09-25 Thread Marcelo Vanzin
You can pass the HDFS location of those extra jars in the spark-submit
--jars argument. Spark will take care of using Yarn's distributed
cache to make them available to the executors. Note that you may need
to provide the full hdfs URL (not just the path, since that will be
interpreted as a local path, IIRC).

Also, you may run into SPARK-3560, which has been fixed since but is
not in 1.1.0.
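
A hedged example of that invocation — the HDFS paths, jar names, and class are placeholders; --jars takes a comma-separated list, and on YARN the hdfs:// URLs are served through the distributed cache:

```shell
spark-submit \
  --master yarn-cluster \
  --class com.example.MyJob \
  --jars hdfs:///libs/mongo-java-driver.jar,hdfs:///libs/algebird-core_2.10.jar \
  hdfs:///apps/my-job-assembly.jar
```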

On Thu, Sep 25, 2014 at 5:25 AM, rzykov rzy...@gmail.com wrote:
 We build some SPARK jobs with external jars. I compile jobs by including them
 in one assembly.
 But look for an approach to put all external jars into HDFS.

 We have already put  spark jar in a HDFS folder and set up the variable
 SPARK_JAR.
 What is the best way to do that for other external jars (MongoDB, algebird
 and so on)?

 Thanks in advance





 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-1-1-0-on-yarn-cluster-and-external-JARs-tp15136.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: java.lang.NegativeArraySizeException in pyspark

2014-09-25 Thread Brad Miller
Hi Davies,

Thanks for your help.

I ultimately re-wrote the code to use broadcast variables, and then
received an error when trying to broadcast self.all_models that the size
did not fit in an int (recall that broadcasts use 32 bit ints to store
size), suggesting that it was in fact over 2G.  I don't know why the
previous tests (described above) where duplicated portions of
self.all_models worked (it could have been an error in either my debugging
or notes), but splitting the self.all_models into a separate broadcast
variable for each element worked.  I avoided broadcast variables for a
while since there was no way to unpersist them in pyspark, but now that
there is, you're completely right that using broadcast is the correct way to
code this.

best,
-Brad
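
A hedged PySpark sketch of the per-element approach described above; "sc", "all_models", and "sample_rdd" are stand-ins for the objects in this thread:

```python
# Broadcast each model separately so no single broadcast payload
# approaches the 2 GB serialized-size limit mentioned above.
broadcasts = [sc.broadcast(model) for model in all_models]

def predict_all(sample):
    # Workers read each broadcast's .value locally; nothing large
    # rides inside the task closure itself.
    return [b.value for b in broadcasts]  # placeholder use of the models

results = sample_rdd.map(predict_all)

# Per the note above, PySpark broadcasts can now be released explicitly:
# for b in broadcasts:
#     b.unpersist()
```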

On Tue, Sep 23, 2014 at 12:16 PM, Davies Liu dav...@databricks.com wrote:

 Or maybe there is a bug related to the base64 in py4j, could you
 dumps the serialized bytes of closure to verify this?

 You could add a line in spark/python/pyspark/rdd.py:

 ser = CloudPickleSerializer()
 pickled_command = ser.dumps(command)
 +  print len(pickled_command), repr(pickled_command)


 On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller
 bmill...@eecs.berkeley.edu wrote:
  Hi Davies,
 
  That's interesting to know.  Here's more details about my code.  The
 object
  (self) contains pointers to the spark_context (which seems to generate
  errors during serialization) so I strip off the extra state using the
 outer
  lambda function and just pass the value self.all_models into the map.
  all_models is a list of length 9 where each element contains 3 numbers
 (ints
  or floats, can't remember) and then one LinearSVC object.  The classifier
  was trained over ~2.5M features, so the object isn't small, but probably
  shouldn't be 150M either.  Additionally, the call ran OK when I use
 either
  2x the first 5 objects or 2x the last 5 objects (another reason why it
 seems
  unlikely the bug was size related).
 
  def _predict_all_models(all_models, sample):
  scores = []
  for _, (_, _, classifier) in all_models:
  score = classifier.decision_function(sample[VALUE][RECORD])
  scores.append(float(score))
  return (sample[VALUE][LABEL], scores)
 
  # fails
  #return (lambda am: testing_feature_rdd.map(lambda x:
  _predict_all_models(am, x))) (self.all_models)
  # works
  #return (lambda am: testing_feature_rdd.map(lambda x:
  _predict_all_models(am, x))) (self.all_models[:5] + self.all_models[:5])
  #return (lambda am: testing_feature_rdd.map(lambda x:
  _predict_all_models(am, x))) (self.all_models[4:] + self.all_models[4:])
 
  I've since written a work-around into my code, but if I get a chance I'll
  switch to broadcast variables and see whether that works.
 
  later,
  -brad
 
  On Mon, Sep 22, 2014 at 11:12 AM, Davies Liu dav...@databricks.com
 wrote:
 
  The traceback said that the serialized closure cannot be parsed (base64)
  correctly by py4j.
 
  The string in Java cannot be longer than 2G, so the serialized closure
  cannot longer than 1.5G (there are overhead in base64), is it possible
  that your data used in the map function is so big? If it's, you should
  use broadcast for it.
 
  In master of Spark, we will use broadcast automatically if the closure
  is too big. (but use broadcast explicitly is always better).
 
  On Sat, Sep 20, 2014 at 12:42 PM, Brad Miller
  bmill...@eecs.berkeley.edu wrote:
   Hi All,
  
   I'm experiencing a java.lang.NegativeArraySizeException in a pyspark
   script
   I have.  I've pasted the full traceback at the end of this email.
  
   I have isolated the line of code in my script which causes the
   exception
   to occur. Although the exception seems to occur deterministically, it
 is
   very unclear why the different variants of the line would cause the
   exception to occur. Unfortunately, I am only able to reproduce the bug
   in
   the context of a large data processing job, and the line of code which
   must
   change to reproduce the bug has little meaning out of context.  The
 bug
   occurs when I call map on an RDD with a function that references
 some
   state outside of the RDD (which is presumably bundled up and
 distributed
   with the function).  The output of the function is a tuple where the
   first
   element is an int and the second element is a list of floats (same
   positive
   length every time, as verified by an 'assert' statement).
  
   Given that:
   -It's unclear why changes in the line would cause an exception
   -The exception comes from within pyspark code
   -The exception has to do with negative array sizes (and I couldn't
 have
   created a negative sized array anywhere in my python code)
   I suspect this is a bug in pyspark.
  
   Has anybody else observed or reported this bug?
  
   best,
   -Brad
  
   Traceback (most recent call last):
 File /home/bmiller1/pipeline/driver.py, line 214, in module
   main()
 File 

Re:

2014-09-25 Thread Ted Yu
I followed linked JIRAs to HDFS-7005 which is in hadoop 2.6.0

Any chance of deploying 2.6.0-SNAPSHOT to see if the problem goes away?

On Wed, Sep 24, 2014 at 10:54 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Looks like it's a HDFS issue, pretty new.

 https://issues.apache.org/jira/browse/HDFS-6999

 Jianshi

 On Thu, Sep 25, 2014 at 12:10 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi Ted,

 See my previous reply to Debasish, all region servers are idle. I don't
 think it's caused by hotspotting.

 Besides, only 6 out of 3000 tasks were stuck, and their inputs are about
 only 80MB each.

 Jianshi

 On Wed, Sep 24, 2014 at 11:58 PM, Ted Yu yuzhih...@gmail.com wrote:

 I was thinking along the same line.

 Jianshi:
 See
 http://hbase.apache.org/book.html#d0e6369

 On Wed, Sep 24, 2014 at 8:56 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 HBase regionservers need to be balanced... you might have some skewness
 in row keys and one regionserver is under pressure... try finding that key
 and replicate it using a random salt

 On Wed, Sep 24, 2014 at 8:51 AM, Jianshi Huang jianshi.hu...@gmail.com
  wrote:

 Hi Ted,

 It converts RDD[Edge] to HBase rowkey and columns and insert them to
 HBase (in batch).

 BTW, I found batched Put actually faster than generating HFiles...


 Jianshi

 On Wed, Sep 24, 2014 at 11:49 PM, Ted Yu yuzhih...@gmail.com wrote:

 bq. at com.paypal.risk.rds.dragon.storage.hbase.HbaseRDDBatch$$anonfun$batchInsertEdges$3.apply(HbaseRDDBatch.scala:179)

 Can you reveal what HbaseRDDBatch.scala does ?

 Cheers

 On Wed, Sep 24, 2014 at 8:46 AM, Jianshi Huang 
 jianshi.hu...@gmail.com wrote:

 One of my big spark program always get stuck at 99% where a few
 tasks never finishes.

 I debugged it by printing out thread stacktraces, and found there're
 workers stuck at parquet.hadoop.ParquetFileReader.readNextRowGroup.

 Anyone had similar problem? I'm using Spark 1.1.0 built for HDP2.1.
 The parquet files are generated by pig using latest parquet-pig-bundle
 v1.6.0rc1.

 From Spark 1.1.0's pom.xml, Spark is using parquet v1.4.3, will this
 be problematic?

 One of the weird behaviors is that another program reads and sorts data
 from the same parquet files and works fine. The only difference
 seems to be that the buggy program uses foreachPartition and the working
 program uses map.

 Here's the full stacktrace:

 Executor task launch worker-3
java.lang.Thread.State: RUNNABLE
 at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
 at
 sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:257)
 at
 sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
 at
 sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
 at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
 at
 org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:335)
 at
 org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
 at
 org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.readChannelFully(PacketReceiver.java:258)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:209)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:171)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
 at
 org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:173)
 at
 org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:138)
 at
 org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:683)
 at
 org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:739)
 at
 org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:796)
 at
 org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:837)
 at
 java.io.DataInputStream.readFully(DataInputStream.java:195)
 at
 java.io.DataInputStream.readFully(DataInputStream.java:169)
 at
 parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:599)
 at
 parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:360)
 at
 parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:100)
 at
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
 at
 parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
 at
 org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:139)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at
 scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

Re: Multiple Kafka Receivers and Union

2014-09-25 Thread Matt Narrell
Tim,

I think I understand this now.  I had a five node Spark cluster and a five 
partition topic, and I created five receivers.  I found this:  
http://stackoverflow.com/questions/25785581/custom-receiver-stalls-worker-in-spark-streaming
indicating that if I use all my workers as receivers, there are none left to 
do the processing.  If I drop the number of partitions/receivers while 
still having multiple unioned receivers, I see messages.

mn
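The arithmetic behind this is worth spelling out: in the receiver model, each receiver pins one executor core for the lifetime of the job, so only the remaining cores can run the batch processing. A minimal sketch of that relationship (the helper name is ours, not Spark's):

```python
def processing_cores(total_cores, num_receivers):
    # Each Spark Streaming receiver occupies one executor core full-time;
    # only the cores left over can execute the batch-processing tasks.
    return max(total_cores - num_receivers, 0)

# Five cores with five receivers leaves nothing to process the stream,
# which is why no messages ever appeared.
assert processing_cores(5, 5) == 0
# Dropping to three receivers frees two cores for processing.
assert processing_cores(5, 3) == 2
```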

On Sep 25, 2014, at 10:18 AM, Matt Narrell matt.narr...@gmail.com wrote:

 I suppose I have other problems as I can’t get the Scala example to work 
 either.  Puzzling, as I have literally coded like the examples (that are 
 purported to work), but no luck.
 
 mn
 
 On Sep 24, 2014, at 11:27 AM, Tim Smith secs...@gmail.com wrote:
 
 Maybe differences between JavaPairDStream and JavaPairReceiverInputDStream?
 
 On Wed, Sep 24, 2014 at 7:46 AM, Matt Narrell matt.narr...@gmail.com wrote:
 The part that works is the commented out, single receiver stream below the 
 loop.  It seems that when I call KafkaUtils.createStream more than once, I 
 don’t receive any messages.
 
 I’ll dig through the logs, but at first glance yesterday I didn’t see 
 anything suspect.  I’ll have to look closer.
 
 mn
 
 On Sep 23, 2014, at 6:14 PM, Tim Smith secs...@gmail.com wrote:
 
 Maybe post the before-code as in what was the code before you did the
 loop (that worked)? I had similar situations where reviewing code
 before (worked) and after (does not work) helped. Also, what helped is
 the Scala REPL because I can see what are the object types being
 returned by each statement.
 
 Other than code, in the driver logs, you should see events that say
 Registered receiver for stream 0 from
 akka.tcp://sp...@node5.acme.net:53135
 
 Now, if you goto node5 and look at Spark or YarnContainer logs
 (depending on who's doing RM), you should be able to see if the
 receiver has any errors when trying to talk to kafka.
 
 
 
 On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell matt.narr...@gmail.com 
 wrote:
 To my eyes, these are functionally equivalent.  I’ll try a Scala 
 approach, but this may cause waves for me upstream (e.g., non-Java)
 
 Thanks for looking at this.  If anyone else can see a glaring issue in 
 the Java approach that would be appreciated.
 
 Thanks,
 Matt
 
 On Sep 23, 2014, at 4:13 PM, Tim Smith secs...@gmail.com wrote:
 
 Sorry, I am almost Java illiterate but here's my Scala code to do the
 equivalent (that I have tested to work):
 
  val kInStreams = (1 to 10).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp", Map("myTopic"
  -> 1), StorageLevel.MEMORY_AND_DISK_SER) } // Create 10 receivers
  across the cluster, one for each partition potentially, but active
  receivers are only as many as the kafka partitions you have
 
 val kInMsg = 
 ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
 
 
 
 
 On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell matt.narr...@gmail.com 
 wrote:
 So, this is scrubbed some for confidentiality, but the meat of it is as 
 follows.  Note, that if I substitute the commented section for the 
 loop, I receive messages from the topic.
 
 SparkConf sparkConf = new SparkConf();
 sparkConf.set("spark.streaming.unpersist", "true");
 sparkConf.set("spark.logConf", "true");
 
 Map<String, String> kafkaProps = new HashMap<>();
 kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
 kafkaProps.put("group.id", groupId);
 
 JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(1));
 jsc.checkpoint("hdfs://some_location");
 
 List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5);
 
 for (int i = 0; i < 5; i++) {
     streamList.add(KafkaUtils.createStream(jsc,
                                            String.class, ProtobufModel.class,
                                            StringDecoder.class, ProtobufModelDecoder.class,
                                            kafkaProps,
                                            Collections.singletonMap(topic, 1),
                                            StorageLevel.MEMORY_ONLY_SER()));
 }
 
 final JavaPairDStream<String, ProtobufModel> stream =
         jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
 
 //  final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
 //      KafkaUtils.createStream(jsc,
 //                              String.class, ProtobufModel.class,
 //                              StringDecoder.class, ProtobufModelDecoder.class,
 //                              kafkaProps,
 //                              Collections.singletonMap(topic, 5),
 //                              StorageLevel.MEMORY_ONLY_SER());
 
 final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
     new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
         @Override
         public Tuple2<String, Integer> call(Tuple2<String, ProtobufModel> tuple) 

Add Meetup

2014-09-25 Thread Brian Husted
Please add the Apache Spark Maryland meetup to the Spark website.

http://www.meetup.com/Apache-Spark-Maryland

Thanks!
Brian

*Brian Husted*

*Tetra Concepts, LLC*
tetraconcepts.com

*301.518.6994 (c)*
*866.618.1343 (f)*


Re: RDD of Iterable[String]

2014-09-25 Thread Liquan Pei
Hi Deep,

I believe that you are referring to map for Iterable[String].

Suppose you have
iter: Iterable[String]
you can do

newIter = iter.map(item => item + "a")

which will create a new Iterable[String], appending an
"a" to each string in iter.

Does this answer your question?

Liquan

On Thu, Sep 25, 2014 at 12:43 AM, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 what should come in the map??
 Thanks Liquan for answering me...
 I really need some help... I am stuck on something.


 On Thu, Sep 25, 2014 at 12:57 PM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:

 what should come in the map??

 On Wed, Sep 24, 2014 at 10:52 PM, Liquan Pei liquan...@gmail.com wrote:

 Hi Deep,

 The Iterable trait in scala has methods like map and reduce that you can
 use to iterate elements of Iterable[String]. You can also create an
 Iterator from the Iterable. For example, suppose you have

 val rdd: RDD[Iterable[String]]

 you can do

 rdd.map { x => // x has type Iterable[String]
x.map(...) // Process elements in iterable[String]
val iter:Iterator[String] = x.iterator
while(iter.hasNext) {
 iter.next()
}
 }

 Hope this helps!

 Liquan

 On Wed, Sep 24, 2014 at 8:21 AM, Deep Pradhan pradhandeep1...@gmail.com
  wrote:

 Can we iterate over RDD of Iterable[String]? How do we do that?
 Because the entire Iterable[String] seems to be a single element in the
 RDD.

 Thank You




 --
 Liquan Pei
 Department of Physics
 University of Massachusetts Amherst






-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: Multiple Kafka Receivers and Union

2014-09-25 Thread Matt Narrell
Additionally,

If I dial up/down the number of executor cores, this does what I want.  Thanks 
for the extra eyes!

mn

On Sep 25, 2014, at 12:34 PM, Matt Narrell matt.narr...@gmail.com wrote:

 Tim,
 
 I think I understand this now.  I had a five node Spark cluster and a five 
 partition topic, and I created five receivers.  I found this:  
 http://stackoverflow.com/questions/25785581/custom-receiver-stalls-worker-in-spark-streaming
  Indicating that if I use all my workers as receivers, there are none left to 
 do the processing.  If I drop the number of partitions/receivers down while 
 still having multiple unioned receivers, I see messages.
 
 mn
 
 On Sep 25, 2014, at 10:18 AM, Matt Narrell matt.narr...@gmail.com wrote:
 
 I suppose I have other problems as I can’t get the Scala example to work 
 either.  Puzzling, as I have literally coded like the examples (that are 
 purported to work), but no luck.
 
 mn
 
 On Sep 24, 2014, at 11:27 AM, Tim Smith secs...@gmail.com wrote:
 
 Maybe differences between JavaPairDStream and JavaPairReceiverInputDStream?
 
 On Wed, Sep 24, 2014 at 7:46 AM, Matt Narrell matt.narr...@gmail.com 
 wrote:
 The part that works is the commented out, single receiver stream below the 
 loop.  It seems that when I call KafkaUtils.createStream more than once, I 
 don’t receive any messages.
 
 I’ll dig through the logs, but at first glance yesterday I didn’t see 
 anything suspect.  I’ll have to look closer.
 
 mn
 
 On Sep 23, 2014, at 6:14 PM, Tim Smith secs...@gmail.com wrote:
 
 Maybe post the before-code as in what was the code before you did the
 loop (that worked)? I had similar situations where reviewing code
 before (worked) and after (does not work) helped. Also, what helped is
 the Scala REPL because I can see what are the object types being
 returned by each statement.
 
 Other than code, in the driver logs, you should see events that say
 Registered receiver for stream 0 from
 akka.tcp://sp...@node5.acme.net:53135
 
 Now, if you goto node5 and look at Spark or YarnContainer logs
 (depending on who's doing RM), you should be able to see if the
 receiver has any errors when trying to talk to kafka.
 
 
 
 On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell matt.narr...@gmail.com 
 wrote:
 To my eyes, these are functionally equivalent.  I’ll try a Scala 
 approach, but this may cause waves for me upstream (e.g., non-Java)
 
 Thanks for looking at this.  If anyone else can see a glaring issue in 
 the Java approach that would be appreciated.
 
 Thanks,
 Matt
 
 On Sep 23, 2014, at 4:13 PM, Tim Smith secs...@gmail.com wrote:
 
 Sorry, I am almost Java illiterate but here's my Scala code to do the
 equivalent (that I have tested to work):
 
  val kInStreams = (1 to 10).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp", Map("myTopic"
  -> 1), StorageLevel.MEMORY_AND_DISK_SER) } // Create 10 receivers
  across the cluster, one for each partition potentially, but active
  receivers are only as many as the kafka partitions you have
 
 val kInMsg = 
 ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
 
 
 
 
 On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell matt.narr...@gmail.com 
 wrote:
 So, this is scrubbed some for confidentiality, but the meat of it is 
 as follows.  Note, that if I substitute the commented section for the 
 loop, I receive messages from the topic.
 
  SparkConf sparkConf = new SparkConf();
  sparkConf.set("spark.streaming.unpersist", "true");
  sparkConf.set("spark.logConf", "true");
  
  Map<String, String> kafkaProps = new HashMap<>();
  kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
  kafkaProps.put("group.id", groupId);
  
  JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(1));
  jsc.checkpoint("hdfs://some_location");
  
  List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5);
  
  for (int i = 0; i < 5; i++) {
      streamList.add(KafkaUtils.createStream(jsc,
                                             String.class, ProtobufModel.class,
                                             StringDecoder.class, ProtobufModelDecoder.class,
                                             kafkaProps,
                                             Collections.singletonMap(topic, 1),
                                             StorageLevel.MEMORY_ONLY_SER()));
  }
  
  final JavaPairDStream<String, ProtobufModel> stream =
          jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
  
  //  final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
  //      KafkaUtils.createStream(jsc,
  //                              String.class, ProtobufModel.class,
  //                              StringDecoder.class, ProtobufModelDecoder.class,
  //                              kafkaProps,
  //                              Collections.singletonMap(topic, 5),
  //                              StorageLevel.MEMORY_ONLY_SER());
  
  final 

Re: K-means faster on Mahout then on Spark

2014-09-25 Thread bhusted
What is the size of your vector? Mine is set to 20. I am seeing slow results
as well with iterations=5 and 200,000,000 elements.







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/K-means-faster-on-Mahout-then-on-Spark-tp3195p15168.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




SPARK UI - Details post job processing

2014-09-25 Thread Harsha HN
Hi,

Details laid out in the Spark UI for the job in progress are really interesting
and very useful.
But these vanish once the job is done.
Is there a way to get job details post-processing?

Looking for Spark UI data, not standard input,output and error info.

Thanks,
Harsha


Ungroup data

2014-09-25 Thread Luis Guerra
Hi everyone,

I need some advice about how to do the following: given an RDD of vectors
(each vector being Vector(Int, Int, Int, Int)), I need to group the data,
then apply a function to every group, comparing each consecutive
item within a group and retaining a variable (that has to be added to the
end of each vector) if a condition from the comparison is true.

I show an example next:

(1, 2, 5, 2)
(1, 3, 4, 4)
(1, 3, 7, 3)
(1, 3, 4, 8)

Data are grouped by the first two fields; then for each group I have to
compare each consecutive fourth field. The first value is used as the initial
value, and then, if the next value is greater than the previously retained
one, it becomes the next retained value appended to the vector. So, the
output should be:

(1, 2 , 5, 2, 2)
(1, 3 ,4, 4, 4)
(1, 3 , 7, 3, 4)
(1, 3, 4, 8, 8)

My attempt is a groupBy and then a map with a for loop inside; then I have
to build a vector of vectors adding the new field. However, I am not
able to get the right output, since I cannot add a new field to the vector.
I do not know either what the right output from the map should be to keep
the same shape as the original data once it has been grouped. Besides,
my hunch is that the for loop is not the best option to iterate through
the elements of each group. And finally, maybe this can be done with
other operations like reduceByKey or so.

Any clue is very appreciated... Thanks in advance!
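One way to sketch the per-group logic in plain Python (the names below are illustrative; inside Spark the same per-group function could be applied via a groupBy followed by a flatMap): carry the running maximum of the fourth field and append it to each vector.

```python
from itertools import groupby

# The example data from the message above; groupby assumes rows are
# already ordered by the (first, second) grouping key, as here.
rows = [(1, 2, 5, 2), (1, 3, 4, 4), (1, 3, 7, 3), (1, 3, 4, 8)]

def append_running_max(group):
    # Walk consecutive items, keep the largest 4th field seen so far,
    # and append that retained value as a new 5th field.
    retained = None
    out = []
    for a, b, c, d in group:
        retained = d if retained is None or d > retained else retained
        out.append((a, b, c, d, retained))
    return out

result = []
for _, group in groupby(rows, key=lambda r: (r[0], r[1])):
    result.extend(append_running_max(group))

# Matches the expected output given in the message:
assert result == [(1, 2, 5, 2, 2), (1, 3, 4, 4, 4),
                  (1, 3, 7, 3, 4), (1, 3, 4, 8, 8)]
```

This avoids mutating vectors in place: each group produces fresh, longer tuples, which sidesteps the "cannot add a new field to the vector" problem.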


Spark streaming - submit new job version

2014-09-25 Thread demian
Hi. We are testing Spark streaming. It looks awesome!

We are trying to figure out how to submit a new version of a long-lived job.
We have a job that streams metrics from a bunch of servers, applying
transformations like .reduceByWindow, and then stores the results in HDFS.

If we submit this new version, there will be two jobs fighting for the same
stream and the resulting metrics would be incomplete in each window. If we stop
the original job and then submit the new one, metrics will be lost.

Any thoughts?

Could two jobs share and aggregate streams?

Thanks,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-submit-new-job-version-tp15173.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Spark Streaming: No parallelism in writing to database (MySQL)

2014-09-25 Thread maddenpj
I posted yesterday about a related issue but resolved it shortly after. I'm
using Spark Streaming to summarize event data from Kafka and save it to a
MySQL table. Currently the bottleneck is in writing to MySQL and I'm puzzled
as to how to speed it up. I've tried repartitioning with several different
values but it looks like only one worker is actually doing the writing to
MySQL. Obviously this is not ideal because I need the parallelism to insert
this data in a timely manner.

Here's the code https://gist.github.com/maddenpj/5032c76aeb330371a6e6
https://gist.github.com/maddenpj/5032c76aeb330371a6e6  

I'm running this on a cluster of 6 spark nodes (2 cores, 7.5 GB memory) and
tried repartition sizes of 6, 12 and 48. How do I ensure that there is
parallelism in writing to the database? 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-No-parallelism-in-writing-to-database-MySQL-tp15174.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Kryo UnsupportedOperationException

2014-09-25 Thread Sandy Ryza
We're running into an error (below) when trying to read spilled shuffle
data back in.

Has anybody encountered this before / is anybody familiar with what causes
these Kryo UnsupportedOperationExceptions?

any guidance appreciated,
Sandy

---
com.esotericsoftware.kryo.KryoException
(com.esotericsoftware.kryo.KryoException:
java.lang.UnsupportedOperationException Serialization trace: omitted
variable name (omitted class name) omitted variable name (omitted
class name))

com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)

com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)

com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)

com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)

com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)

com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)

com.twitter.chill.TraversableSerializer.read(Traversable.scala:44)
...


Re: Yarn number of containers

2014-09-25 Thread Tamas Jambor
Thank you.

Where is the number of containers set?

On Thu, Sep 25, 2014 at 7:17 PM, Marcelo Vanzin van...@cloudera.com wrote:
 On Thu, Sep 25, 2014 at 8:55 AM, jamborta jambo...@gmail.com wrote:
 I am running spark with the default settings in yarn client mode. For some
 reason yarn always allocates three containers to the application (wondering
 where it is set?), and only uses two of them.

 The default number of executors in Yarn mode is 2; so you have 2
 executors + the application master, so 3 containers.

 Also the cpus on the cluster never go over 50%, I turned off the fair
 scheduler and set high spark.cores.max. Is there some additional settings I
 am missing?

 You probably need to request more cores (--executor-cores). Don't
 remember if that is respected in Yarn, but should be.

 --
 Marcelo




Re: Yarn number of containers

2014-09-25 Thread Marcelo Vanzin
From spark-submit --help:

 YARN-only:
  --executor-cores NUMNumber of cores per executor (Default: 1).
  --queue QUEUE_NAME  The YARN queue to submit to (Default: default).
  --num-executors NUM Number of executors to launch (Default: 2).
  --archives ARCHIVES Comma separated list of archives to be
extracted into the
  working directory of each executor.
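Putting the two defaults together: YARN runs one container per executor plus one for the application master, so the default --num-executors 2 yields the three containers observed. A trivial sketch (the helper name is ours):

```python
def yarn_containers(num_executors):
    # YARN runs each Spark executor in its own container, plus one
    # extra container for the Spark application master.
    return num_executors + 1

# The default of 2 executors explains the 3 containers seen above.
assert yarn_containers(2) == 3
```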

On Thu, Sep 25, 2014 at 2:20 PM, Tamas Jambor jambo...@gmail.com wrote:
 Thank you.

 Where is the number of containers set?

 On Thu, Sep 25, 2014 at 7:17 PM, Marcelo Vanzin van...@cloudera.com wrote:
 On Thu, Sep 25, 2014 at 8:55 AM, jamborta jambo...@gmail.com wrote:
 I am running spark with the default settings in yarn client mode. For some
 reason yarn always allocates three containers to the application (wondering
 where it is set?), and only uses two of them.

 The default number of executors in Yarn mode is 2; so you have 2
 executors + the application master, so 3 containers.

 Also the cpus on the cluster never go over 50%, I turned off the fair
 scheduler and set high spark.cores.max. Is there some additional settings I
 am missing?

 You probably need to request more cores (--executor-cores). Don't
 remember if that is respected in Yarn, but should be.

 --
 Marcelo



-- 
Marcelo




Re: java.lang.NegativeArraySizeException in pyspark

2014-09-25 Thread Davies Liu
On Thu, Sep 25, 2014 at 11:25 AM, Brad Miller
bmill...@eecs.berkeley.edu wrote:
 Hi Davies,

 Thanks for your help.

 I ultimately re-wrote the code to use broadcast variables, and then received
 an error when trying to broadcast self.all_models that the size did not fit
 in an int (recall that broadcasts use 32 bit ints to store size),

What is the error? Could you file a JIRA for it?

 suggesting that it was in fact over 2G.  I don't know why the previous tests (described
 above) where duplicated portions of self.all_models worked (it could have
 been an error in either my debugging or notes), but splitting the
 self.all_models into a separate broadcast variable for each element worked.
 I avoided broadcast variables for a while since there was no way to
 unpersist them in pyspark, but now that there is you're completely right
 that using broadcast is the correct way to code this.

In 1.1, you could use broadcast.unpersist() to release it, also the performance
of Python Broadcast was much improved in 1.1.
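For context on the 1.5G figure mentioned earlier in this thread: base64 emits four output characters for every three input bytes, so a 2G-character Java String can carry only about 1.5G of raw serialized closure. A quick stdlib illustration (not Spark code):

```python
import base64

raw = b"x" * 300          # 300 raw bytes, a multiple of 3 so no padding
encoded = base64.b64encode(raw)

# base64 produces 4 output characters per 3 input bytes,
# i.e. the encoded closure grows by a factor of 4/3.
assert len(encoded) == 400
# Hence a 2G string ceiling caps the raw payload near 2G * 3/4 = 1.5G.
```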


 best,
 -Brad

 On Tue, Sep 23, 2014 at 12:16 PM, Davies Liu dav...@databricks.com wrote:

 Or maybe there is a bug related to the base64 in py4j, could you
 dumps the serialized bytes of closure to verify this?

 You could add a line in spark/python/pyspark/rdd.py:

 ser = CloudPickleSerializer()
 pickled_command = ser.dumps(command)
 +  print len(pickled_command), repr(pickled_command)


 On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller
 bmill...@eecs.berkeley.edu wrote:
  Hi Davies,
 
  That's interesting to know.  Here's more details about my code.  The
  object
  (self) contains pointers to the spark_context (which seems to generate
  errors during serialization) so I strip off the extra state using the
  outer
  lambda function and just pass the value self.all_models into the map.
  all_models is a list of length 9 where each element contains 3 numbers
  (ints
  or floats, can't remember) and then one LinearSVC object.  The
  classifier
  was trained over ~2.5M features, so the object isn't small, but probably
  shouldn't be 150M either.  Additionally, the call ran OK when I use
  either
  2x the first 5 objects or 2x the last 5 objects (another reason why it
  seems
  unlikely the bug was size related).
 
  def _predict_all_models(all_models, sample):
  scores = []
  for _, (_, _, classifier) in all_models:
  score = classifier.decision_function(sample[VALUE][RECORD])
  scores.append(float(score))
  return (sample[VALUE][LABEL], scores)
 
  # fails
  #return (lambda am: testing_feature_rdd.map(lambda x:
  _predict_all_models(am, x))) (self.all_models)
  # works
  #return (lambda am: testing_feature_rdd.map(lambda x:
  _predict_all_models(am, x))) (self.all_models[:5] + self.all_models[:5])
  #return (lambda am: testing_feature_rdd.map(lambda x:
  _predict_all_models(am, x))) (self.all_models[4:] + self.all_models[4:])
 
  I've since written a work-around into my code, but if I get a chance
  I'll
  switch to broadcast variables and see whether that works.
 
  later,
  -brad
 
  On Mon, Sep 22, 2014 at 11:12 AM, Davies Liu dav...@databricks.com
  wrote:
 
  The traceback said that the serialized closure cannot be parsed
  (base64)
  correctly by py4j.
 
  The string in Java cannot be longer than 2G, so the serialized closure
  cannot longer than 1.5G (there are overhead in base64), is it possible
  that your data used in the map function is so big? If it's, you should
  use broadcast for it.
 
  In master of Spark, we will use broadcast automatically if the closure
  is too big. (but use broadcast explicitly is always better).
 
  On Sat, Sep 20, 2014 at 12:42 PM, Brad Miller
  bmill...@eecs.berkeley.edu wrote:
   Hi All,
  
   I'm experiencing a java.lang.NegativeArraySizeException in a pyspark
   script
   I have.  I've pasted the full traceback at the end of this email.
  
   I have isolated the line of code in my script which causes the
   exception
   to occur. Although the exception seems to occur deterministically, it
   is
   very unclear why the different variants of the line would cause the
   exception to occur. Unfortunately, I am only able to reproduce the
   bug
   in
   the context of a large data processing job, and the line of code
   which
   must
   change to reproduce the bug has little meaning out of context.  The
   bug
   occurs when I call map on an RDD with a function that references
   some
   state outside of the RDD (which is presumably bundled up and
   distributed
   with the function).  The output of the function is a tuple where the
   first
   element is an int and the second element is a list of floats (same
   positive
   length every time, as verified by an 'assert' statement).
  
   Given that:
   -It's unclear why changes in the line would cause an exception
   -The exception comes from within pyspark code
   -The exception has to do with negative array sizes (and I couldn't
   have
   created a negative 

Re: java.lang.OutOfMemoryError while running SVD MLLib example

2014-09-25 Thread Shailesh Birari
Hi Xiangrui,

After setting SVD to a smaller value (200), it's working.

Thanks,
  Shailesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-while-running-SVD-MLLib-example-tp14972p15179.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming: No parallelism in writing to database (MySQL)

2014-09-25 Thread maddenpj
Update for posterity, so once again I solved the problem shortly after
posting to the mailing list. So updateStateByKey uses the default
partitioner, which in my case seemed like it was set to one.

Changing my call from .updateStateByKey[Long](updateFn) to
.updateStateByKey[Long](updateFn, numPartitions) resolved it for me.
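
The fix makes sense if the default partitioner was collapsing everything
into one partition; a stdlib sketch of the idea (the modulo bucketing is a
simplification, not Spark's actual HashPartitioner):

```python
from collections import Counter

def bucket(key, num_partitions):
    # Simplified stand-in for hash partitioning: partition = hash % n.
    return hash(key) % num_partitions

keys = ["user-%d" % i for i in range(1000)]

# With one partition, every key lands in partition 0, so a single
# task ends up doing all the database writes.
one = Counter(bucket(k, 1) for k in keys)
print(one)  # Counter({0: 1000})

# With numPartitions = 8 the keys spread out, so 8 tasks can
# write to MySQL concurrently.
eight = set(bucket(k, 8) for k in keys)
print(len(eight))
```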



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-No-parallelism-in-writing-to-database-MySQL-tp15174p15182.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kryo UnsupportedOperationException

2014-09-25 Thread Ian O'Connell
I would guess the field serializer is having issues reconstructing the
class again; it's pretty much best effort.

Is this an intermediate type?

On Thu, Sep 25, 2014 at 2:12 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 We're running into an error (below) when trying to read spilled shuffle
 data back in.

 Has anybody encountered this before / is anybody familiar with what causes
 these Kryo UnsupportedOperationExceptions?

 any guidance appreciated,
 Sandy

 ---
 com.esotericsoftware.kryo.KryoException
 (com.esotericsoftware.kryo.KryoException:
 java.lang.UnsupportedOperationException Serialization trace: omitted
 variable name (omitted class name) omitted variable name (omitted
 class name))


 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)


 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)

 com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)


 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)


 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)

 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)

 com.twitter.chill.TraversableSerializer.read(Traversable.scala:44)
 ...




Re: Spark Streaming: No parallelism in writing to database (MySQL)

2014-09-25 Thread Buntu Dev
Thanks for the update. I'm interested in writing the results to MySQL as
well; can you shed some light or share a code sample on how you set up the
driver/connection pool/etc.?

On Thu, Sep 25, 2014 at 4:00 PM, maddenpj madde...@gmail.com wrote:

 Update for posterity, so once again I solved the problem shortly after
 posting to the mailing list. So updateStateByKey uses the default
 partitioner, which in my case seemed like it was set to one.

 Changing my call from .updateStateByKey[Long](updateFn) to
 .updateStateByKey[Long](updateFn, numPartitions) resolved it for me.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-No-parallelism-in-writing-to-database-MySQL-tp15174p15182.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Shuffle files

2014-09-25 Thread SK
Hi,

I am using Spark 1.1.0 on a cluster. My job takes as input 30 files in a
directory (I am using sc.textFile("dir/*")) to read in the files.  I am
getting the following warning:

WARN TaskSetManager: Lost task 99.0 in stage 1.0 (TID 99,
mesos12-dev.sccps.net): java.io.FileNotFoundException:
/tmp/spark-local-20140925215712-0319/12/shuffle_0_99_93138 (Too many open
files)

basically I think a lot of shuffle files are being created. 

1) The tasks eventually fail and the job just hangs (after taking very long,
more than an hour).  If I read these 30 files in a for loop, the same job
completes in a few minutes. However, I need to specify the files names,
which is not convenient. I am assuming that sc.textFile("dir/*") creates a
large RDD for all the 30 files. Is there a way to make the operation on this
large RDD efficient so as to avoid creating too many shuffle files?


2) Also, I am finding that all the shuffle files for my other completed jobs
are not being automatically deleted even after days. I thought that
sc.stop() clears the intermediate files.  Is there some way to
programmatically delete these temp shuffle files upon job completion?


thanks





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark-ec2 ERROR: Line magic function `%matplotlib` not found

2014-09-25 Thread Andy Davidson
Hi

I am running into trouble using iPython notebook on my cluster. Use the
following command to set the cluster up

$ ./spark-ec2 --key-pair=$KEY_PAIR --identity-file=$KEY_FILE
--region=$REGION --slaves=$NUM_SLAVES launch $CLUSTER_NAME


On master I launch python as follows
$ IPYTHON_OPTS="notebook --pylab inline --no-browser --port=7000"
$SPARK_HOME/bin/pyspark


It looks like the problem is that the cluster is using an old version of
python and iPython. Any idea how I can easily upgrade? The following version works
on my mac

Thanks

Andy

{'commit_hash': '681fd77',
 'commit_source': 'installation',
 'default_encoding': 'UTF-8',
 'ipython_path': '/Library/Python/2.7/site-packages/IPython',
 'ipython_version': '2.1.0',
 'os_name': 'posix',
 'platform': 'Darwin-13.3.0-x86_64-i386-64bit',
 'sys_executable': '/usr/bin/python',
 'sys_platform': 'darwin',
 'sys_version': '2.7.5 (default, Mar  9 2014, 22:15:05) \n[GCC 4.2.1
Compatible Apple LLVM 5.0 (clang-500.0.68)]'}







Is it possible to use Parquet with Dremel encoding

2014-09-25 Thread matthes
Hi again!

At the moment I try to use parquet and I want to keep the data into the
memory in an efficient way to make requests against the data as fast as
possible.
I read about parquet it is able to encode nested columns. Parquet uses the
Dremel encoding with definition and repetition levels. 
Is it at the moment possible to use this in Spark as well, or is it actually
not implemented? If so, I'm not sure how to do it. I saw some examples
that try to put arrays or case classes inside other case classes, but I
don't think that is the right way. The other thing that I saw in this
context was SchemaRDDs. 

Input:

Col1|   Col2|   Col3|   Col4
Int |   long|   long|   int
-
14  |   1234|   1422|   3
14  |   3212|   1542|   2
14  |   8910|   1422|   8
15  |   1234|   1542|   9
15  |   8897|   1422|   13

Want this Parquet-format:
Col3|   Col1|   Col4|   Col2
long|   int |   int |   long

1422|   14  |   3   |   1234
“   |   “   |   8   |   8910
“   |   15  |   13  |   8897
1542|   14  |   2   |   3212
“   |   15  |   9   |   1234

It would be awesome if somebody could give me a good hint how can I do that
or maybe a better way.

Best,
Matthes




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-use-Parquet-with-Dremel-encoding-tp15186.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: K-means faster on Mahout then on Spark

2014-09-25 Thread Xiangrui Meng
Please also check the load balance of the RDD on YARN. How many
partitions are you using? Does it match the number of CPU cores?
-Xiangrui

On Thu, Sep 25, 2014 at 12:28 PM, bhusted brian.hus...@gmail.com wrote:
 What is the size of your vector? Mine is set to 20. I am seeing slow results
 as well with iteration=5 and 200,000,000 elements.







 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/K-means-faster-on-Mahout-then-on-Spark-tp3195p15168.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Job cancelled because SparkContext was shut down

2014-09-25 Thread jamborta
hi all,

I am getting this strange error about half way through the job (running
spark 1.1 on yarn client mode):

14/09/26 00:54:06 INFO ConnectionManager: key already cancelled ?
sun.nio.ch.SelectionKeyImpl@4d0155fb
java.nio.channels.CancelledKeyException
at
org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
at
org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
14/09/26 00:54:06 INFO YarnClientSchedulerBackend: Executor 1 disconnected,
so removing it

then a few minutes later the whole process dies:

14/09/26 01:00:12 ERROR YarnClientSchedulerBackend: Yarn application already
ended: FINISHED
14/09/26 01:00:13 INFO SparkUI: Stopped Spark web UI at
http://backend-dev:4040
14/09/26 01:00:13 INFO YarnClientSchedulerBackend: Shutting down all
executors
14/09/26 01:00:13 INFO YarnClientSchedulerBackend: Asking each executor to
shut down
[E 140926 01:00:13 base:56] Request failed
14/09/26 01:00:13 INFO YarnClientSchedulerBackend: Stopped
[E 140926 01:00:13 base:57] {'error_msg': <type 'exceptions.Exception'>,
org.apache.spark.SparkException: Job cancelled because SparkContext was shut
down, <traceback object at 0x4483cb0>}

any idea what's going on here?

thanks,





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Job-cancelled-because-SparkContext-was-shut-down-tp15189.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Yarn number of containers

2014-09-25 Thread jamborta
thanks.


On Thu, Sep 25, 2014 at 10:25 PM, Marcelo Vanzin [via Apache Spark
User List] ml-node+s1001560n15177...@n3.nabble.com wrote:
 From spark-submit --help:

  YARN-only:
   --executor-cores NUMNumber of cores per executor (Default: 1).
   --queue QUEUE_NAME  The YARN queue to submit to (Default:
 default).
   --num-executors NUM Number of executors to launch (Default: 2).
   --archives ARCHIVES Comma separated list of archives to be
 extracted into the
   working directory of each executor.

 On Thu, Sep 25, 2014 at 2:20 PM, Tamas Jambor [hidden email] wrote:

 Thank you.

 Where is the number of containers set?

 On Thu, Sep 25, 2014 at 7:17 PM, Marcelo Vanzin [hidden email] wrote:
 On Thu, Sep 25, 2014 at 8:55 AM, jamborta [hidden email] wrote:
 I am running spark with the default settings in yarn client mode. For
 some
 reason yarn always allocates three containers to the application
 (wondering
 where it is set?), and only uses two of them.

 The default number of executors in Yarn mode is 2; so you have 2
 executors + the application master, so 3 containers.

 Also the cpus on the cluster never go over 50%, I turned off the fair
 scheduler and set high spark.cores.max. Is there some additional
 settings I
 am missing?

 You probably need to request more cores (--executor-cores). Don't
 remember if that is respected in Yarn, but should be.

 --
 Marcelo



 --
 Marcelo

 -
 To unsubscribe, e-mail: [hidden email]
 For additional commands, e-mail: [hidden email]



 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Yarn-number-of-containers-tp15148p15191.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Multiple Kafka Receivers and Union

2014-09-25 Thread Tim Smith
Good to know it worked out and thanks for the update. I didn't realize
you need to provision for receiver workers + processing workers. One
would think a worker would process multiple stages of an app/job and
receiving is just one stage of the job.



On Thu, Sep 25, 2014 at 12:05 PM, Matt Narrell matt.narr...@gmail.com wrote:
 Additionally,

 If I dial up/down the number of executor cores, this does what I want.
 Thanks for the extra eyes!

 mn

 On Sep 25, 2014, at 12:34 PM, Matt Narrell matt.narr...@gmail.com wrote:

 Tim,

 I think I understand this now.  I had a five node Spark cluster and a five
 partition topic, and I created five receivers.  I found this:
 http://stackoverflow.com/questions/25785581/custom-receiver-stalls-worker-in-spark-streaming
 Indicating that if I use all my workers as receivers, there are none left to
 do the processing.  If I drop the number of partitions/receivers down while
 still having multiple unioned receivers, I see messages.
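
 The constraint Matt hit reduces to simple core accounting; a sketch
 assuming one task slot per worker core and one slot pinned per receiver
 (illustrative numbers, not a Spark API):

```python
def processing_cores(total_cores, num_receivers):
    # Each active receiver pins one core/task slot for the whole job,
    # leaving only the remainder for actually processing batches.
    return max(total_cores - num_receivers, 0)

# Five single-core workers and five receivers: no cores left to
# process batches, so the job looks like it receives nothing.
print(processing_cores(5, 5))  # 0

# Dropping to three receivers frees two cores for processing.
print(processing_cores(5, 3))  # 2
```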

 mn

 On Sep 25, 2014, at 10:18 AM, Matt Narrell matt.narr...@gmail.com wrote:

 I suppose I have other problems as I can’t get the Scala example to work
 either.  Puzzling, as I have literally coded like the examples (that are
 purported to work), but no luck.

 mn

 On Sep 24, 2014, at 11:27 AM, Tim Smith secs...@gmail.com wrote:

 Maybe differences between JavaPairDStream and JavaPairReceiverInputDStream?

 On Wed, Sep 24, 2014 at 7:46 AM, Matt Narrell matt.narr...@gmail.com
 wrote:

 The part that works is the commented out, single receiver stream below the
 loop.  It seems that when I call KafkaUtils.createStream more than once, I
 don’t receive any messages.

 I’ll dig through the logs, but at first glance yesterday I didn’t see
 anything suspect.  I’ll have to look closer.

 mn

 On Sep 23, 2014, at 6:14 PM, Tim Smith secs...@gmail.com wrote:

 Maybe post the before-code as in what was the code before you did the
 loop (that worked)? I had similar situations where reviewing code
 before (worked) and after (does not work) helped. Also, what helped is
 the Scala REPL because I can see the object types being
 returned by each statement.

 Other than code, in the driver logs, you should see events that say
 "Registered receiver for stream 0 from
 akka.tcp://sp...@node5.acme.net:53135"

 Now, if you goto node5 and look at Spark or YarnContainer logs
 (depending on who's doing RM), you should be able to see if the
 receiver has any errors when trying to talk to kafka.



 On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell matt.narr...@gmail.com
 wrote:

 To my eyes, these are functionally equivalent.  I’ll try a Scala approach,
 but this may cause waves for me upstream (e.g., non-Java)

 Thanks for looking at this.  If anyone else can see a glaring issue in the
 Java approach that would be appreciated.

 Thanks,
 Matt

 On Sep 23, 2014, at 4:13 PM, Tim Smith secs...@gmail.com wrote:

 Sorry, I am almost Java illiterate but here's my Scala code to do the
 equivalent (that I have tested to work):

  val kInStreams = (1 to 10).map { _ =>
    KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp",
      Map("myTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
  } // Create 10 receivers across the cluster, one for each partition;
  // active receivers are only as many as the kafka partitions you have

  val kInMsg =
    ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)




 On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell matt.narr...@gmail.com
 wrote:

 So, this is scrubbed some for confidentiality, but the meat of it is as
 follows.  Note, that if I substitute the commented section for the loop, I
 receive messages from the topic.

  SparkConf sparkConf = new SparkConf();
  sparkConf.set("spark.streaming.unpersist", "true");
  sparkConf.set("spark.logConf", "true");

  Map<String, String> kafkaProps = new HashMap<>();
  kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
  kafkaProps.put("group.id", groupId);

  JavaStreamingContext jsc = new JavaStreamingContext(sparkConf,
  Seconds.apply(1));
  jsc.checkpoint("hdfs://some_location");

  List<JavaPairDStream<String, ProtobufModel>> streamList = new
  ArrayList<>(5);

  for (int i = 0; i < 5; i++) {
      streamList.add(KafkaUtils.createStream(jsc,
              String.class, ProtobufModel.class,
              StringDecoder.class, ProtobufModelDecoder.class,
              kafkaProps,
              Collections.singletonMap(topic, 1),
              StorageLevel.MEMORY_ONLY_SER()));
  }

  final JavaPairDStream<String, ProtobufModel> stream =
          jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));

  //  final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
  //      KafkaUtils.createStream(jsc,
  //              String.class, ProtobufModel.class,
  //              StringDecoder.class,
 

Re: Memory used in Spark-0.9.0-incubating

2014-09-25 Thread 王晓雨

Thanks Yi Tian!

Yes, I use fair scheduler.
In resource manager log. I see the container's start shell:
/home/export/Data/hadoop/tmp/nm-local-dir/usercache/hpc/appcache/application_1411693809133_0002/container_1411693809133_0002_01_02/launch_container.sh
In the end:
exec /bin/bash -c "$JAVA_HOME/bin/java -server
-XX:OnOutOfMemoryError='kill %p' -Xms4096m -Xmx4096m -verbose:gc
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Djava.io.tmpdir=$PWD/tmp
org.apache.spark.executor.CoarseGrainedExecutorBackend
akka.tcp://spark@node99:6177/user/CoarseGrainedScheduler 2 node99 4 1>
/home/export/Logs/yarn/application_1411693809133_0002/container_1411693809133_0002_01_02/stdout
2>
/home/export/Logs/yarn/application_1411693809133_0002/container_1411693809133_0002_01_02/stderr"


the container's maximum memory is 4096m

And I see the source of ContainerImpl.java. The 5 GB the monitor prints comes from:
long pmemBytes = container.getResource().getMemory() * 1024 * 1024L;

The log prints "5 GB" only when container.getResource().getMemory() = 5120,
so the container was actually allocated 5120 MB.
I don't know where the extra 1024 MB comes from!!!
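
One plausible reconstruction of the extra 1024 MB, assuming Spark-on-YARN
adds a fixed executor memory overhead (384 MB in this era) before the fair
scheduler's 1024 MB increment rounding that Yi Tian describes — both
figures are assumptions to verify against your build and configs:

```python
import math

def container_mb(requested_mb, overhead_mb=384, increment_mb=1024):
    # YARN grants the requested memory plus the executor overhead,
    # rounded up to the scheduler's allocation increment.
    return math.ceil((requested_mb + overhead_mb) / increment_mb) * increment_mb

# --worker-memory 4g: 4096 + 384 = 4480 MB, rounded up to 5120 MB,
# which the NodeManager then reports as the 5 GB container limit.
print(container_mb(4096))  # 5120
```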


On 2014-09-25 21:43, Yi Tian wrote:
You should check the log of resource manager when you submit this job 
to yarn.


It will be recorded how many resources your spark application actually 
asked from resource manager for each container.


Did you use fair scheduler?

there is a config parameter of fair scheduler 
“yarn.scheduler.increment-allocation-mb”, default is 1024


it means that if you ask for 4097 MB of memory for a container, the resource 
manager will create a container which uses 5120 MB of memory.


But I can’t figure out where the 5 GB comes from.

Maybe some code mistakes 1024 for 1000?

Best Regards,

Yi Tian
tianyi.asiai...@gmail.com




On Sep 25, 2014, at 18:41, 王晓雨 wangxiao...@jd.com wrote:



My yarn-site.xml config:
property
nameyarn.nodemanager.resource.memory-mb/name
value16384/value
/property


ENV:
Spark:0.9.0-incubating
Hadoop:2.3.0

I run spark task on Yarn. I see the log in Nodemanager:
2014-09-25 17:43:34,141 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: 
Memory usage of ProcessTree 549 for container-id 
container_1411635522254_0001_01_05: 4.5 GB of 5 GB physical 
memory used; 5.0 GB of 10.5 GB virtual memory used
2014-09-25 17:43:37,171 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: 
Memory usage of ProcessTree 549 for container-id 
container_1411635522254_0001_01_05: 4.5 GB of 5 GB physical 
memory used; 5.0 GB of 10.5 GB virtual memory used
2014-09-25 17:43:40,210 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: 
Memory usage of ProcessTree 549 for container-id 
container_1411635522254_0001_01_05: 4.5 GB of 5 GB physical 
memory used; 5.0 GB of 10.5 GB virtual memory used
2014-09-25 17:43:43,239 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: 
Memory usage of ProcessTree 549 for container-id 
container_1411635522254_0001_01_05: 4.5 GB of 5 GB physical 
memory used; 5.0 GB of 10.5 GB virtual memory used
2014-09-25 17:43:46,269 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: 
Memory usage of ProcessTree 549 for container-id 
container_1411635522254_0001_01_05: 4.5 GB of 5 GB physical 
memory used; 5.0 GB of 10.5 GB virtual memory used


My task parameters are:
--num-workers 4 --master-memory 2g --worker-memory 4g --worker-cores 4
In my opinion, --worker-memory 4g should be the maximum memory for a 
container.

But why does the log show 4.5 GB of 5 GB physical memory used?
And where is the 5 GB maximum container memory configured?

--

WangXiaoyu
- 





flume spark streaming receiver host random

2014-09-25 Thread centerqi hu
Hi all
My code is as follows:

/usr/local/webserver/sparkhive/bin/spark-submit
--class org.apache.spark.examples.streaming.FlumeEventCount
--master yarn
--deploy-mode cluster
--queue  online
--num-executors 5
--driver-memory 6g
--executor-memory 20g
--executor-cores 5 target/scala-2.10/simple-project_2.10-1.0.jar
10.1.15.115 6

However, the receiver does not run on 10.1.15.115, but on a randomly
chosen slave host.

How to solve this problem?

Thanks


-- 
cente...@gmail.com|齐忠

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: how to run spark job on yarn with jni lib?

2014-09-25 Thread taqilabon
You're right, I'm suffering from SPARK-1719.
I've tried to add their location to /etc/ld.so.conf and I've submitted my
job as a yarn-client,
but the problem is the same: my native libraries are not loaded.
Does this method work in your case?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-run-spark-job-on-yarn-with-jni-lib-tp15146p15195.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Shuffle files

2014-09-25 Thread Andrew Ash
Hi SK,

For the problem with lots of shuffle files and the "too many open files"
exception, there are a couple of options:

1. The linux kernel has a limit on the number of open files at once.  This
is set with ulimit -n, and can be set permanently in /etc/sysctl.conf or
/etc/sysctl.d/.  Try increasing this to a large value, at the bare minimum
the square of your partition count.
2. Try using shuffle consolidation -- spark.shuffle.consolidateFiles=true.
This option writes fewer files to disk so it shouldn't hit limits nearly as much
3. Try using the sort-based shuffle by setting spark.shuffle.manager=SORT.
You should likely hold off on this until
https://issues.apache.org/jira/browse/SPARK-3032 is fixed, hopefully in
1.1.1

Hope that helps!
Andrew
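
For a sense of scale on why ulimit matters here: plain hash shuffle writes
one file per (map task, reduce task) pair, while consolidation (option 2)
reuses a file group per core. A rough sketch of the counts (the formulas
are common approximations and the numbers are illustrative):

```python
def hash_shuffle_files(map_tasks, reduce_tasks, cores=1, consolidate=False):
    # Plain hash shuffle: one file per (map task, reduce task) pair.
    # With consolidation, concurrently running maps on each core share
    # a file group, so the count scales with cores instead of maps.
    writers = cores if consolidate else map_tasks
    return writers * reduce_tasks

# 3000 partitions shuffled into 3000 partitions:
print(hash_shuffle_files(3000, 3000))                              # 9000000
print(hash_shuffle_files(3000, 3000, cores=16, consolidate=True))  # 48000
```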

On Thu, Sep 25, 2014 at 4:20 PM, SK skrishna...@gmail.com wrote:

 Hi,

 I am using Spark 1.1.0 on a cluster. My job takes as input 30 files in a
 directory (I am using  sc.textfile(dir/*) ) to read in the files.  I am
 getting the following warning:

 WARN TaskSetManager: Lost task 99.0 in stage 1.0 (TID 99,
 mesos12-dev.sccps.net): java.io.FileNotFoundException:
 /tmp/spark-local-20140925215712-0319/12/shuffle_0_99_93138 (Too many open
 files)

 basically I think a lot of shuffle files are being created.

 1) The tasks eventually fail and the job just hangs (after taking very
 long,
 more than an hour).  If I read these 30 files in a for loop, the same job
 completes in a few minutes. However, I need to specify the files names,
 which is not convenient. I am assuming that sc.textfile(dir/*) creates a
 large RDD for all the 30 files. Is there a way to make the operation on
 this
 large RDD efficient so as to avoid creating too many shuffle files?


 2) Also, I am finding that all the shuffle files for my other completed
 jobs
 are not being automatically deleted even after days. I thought that
 sc.stop() clears the intermediate files.  Is there some way to
 programmatically delete these temp shuffle files upon job completion?


 thanks





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: SPARK UI - Details post job processiong

2014-09-25 Thread Andrew Ash
Matt you should be able to set an HDFS path so you'll get logs written to a
unified place instead of to local disk on a random box on the cluster.

On Thu, Sep 25, 2014 at 1:38 PM, Matt Narrell matt.narr...@gmail.com
wrote:

 How does this work with a cluster manager like YARN?

 mn

 On Sep 25, 2014, at 2:23 PM, Andrew Or and...@databricks.com wrote:

 Hi Harsha,

 You can turn on `spark.eventLog.enabled` as documented here:
 http://spark.apache.org/docs/latest/monitoring.html. Then, if you are
 running standalone mode, you can access the finished SparkUI through the
 Master UI. Otherwise, you can start a HistoryServer to display finished UIs.

 -Andrew

 2014-09-25 12:55 GMT-07:00 Harsha HN 99harsha.h@gmail.com:

 Hi,

 Details laid out in Spark UI for the job in progress is really
 interesting and very useful.
 But this gets vanished once the job is done.
 Is there a way to get job details post processing?

 Looking for Spark UI data, not standard input,output and error info.

 Thanks,
 Harsha






Re: Optimal Partition Strategy

2014-09-25 Thread Andrew Ash
Hi Vinay,

What I'm guessing is happening is that Spark is taking the locality of
files into account and you don't have node-local data on all your
machines.  This might be the case if you're reading out of HDFS and your
600 files are somehow skewed to only be on about 200 of your 400 machines.
A possible cause could be that your cluster doesn't have both Spark and an
HDFS data node on every server.  It could also happen if you're loading data
into HDFS from just those 200 machines -- the HDFS block placement strategy
is to store one block locally and then further replicas of that block
elsewhere on the cluster.

If you want to force the data to be evenly distributed across your
executors you can run .repartition() right after the textFile() call.  This
will shuffle data across the network in a hash partitioned way before
continuing with the rest of your computation.
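
The skew and the effect of .repartition() can be simulated with the stdlib;
the modulo placement below is purely illustrative, not HDFS's or Spark's
real placement policy:

```python
from collections import Counter

blocks = list(range(600))   # 600 input files/blocks
hosts = 400                 # 400 executors

# Skewed placement: blocks only ever land on the first 200 hosts.
skewed = Counter(b % 200 for b in blocks)
print(len(skewed), max(skewed.values()))  # 200 hosts, 3 blocks each

# After a hash-style repartition, every host gets a share
# (600 / 400 = 1.5 blocks on average).
even = Counter(b % hosts for b in blocks)
print(len(even), max(even.values()))      # 400 hosts, at most 2 blocks
```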

Hope that helps!
Andrew

On Thu, Sep 25, 2014 at 10:37 AM, Muttineni, Vinay vmuttin...@ebay.com
wrote:

  Hello,

 A bit of a background.

 I have a dataset with about 200 million records and around 10 columns. The
 size of this dataset is around 1.5Tb and is split into around 600 files.

 When I read this dataset, using sparkContext, by default it creates around
 3000 partitions if I do not specify the number of partitions in the
 textFile() command.

 Now I see that even though my spark application has around 400 executors
 assigned to it, the data is spread out only to about 200 of them. I am
 using .cache() method to hold my data in-memory.

 Each of these 200 executors, each with a total available memory of 6Gb,
 are now having multiple blocks and are thus using up their entire memory by
 caching the data.



 Even though I have about 400 machines, only about 200 of them are actually
 being used.

 Now, my question is:

 How do I partition my data so all 400 of the executors have some chunks of
 the data, thus better parallelizing my work?
 So, instead of only about 200 machines having about 6Gb of data each, I
 would like to have 400 machines with about 3Gb data each.





 Any idea on how I can set about achieving the above?

 Thanks,

 Vinay



Re: Working on LZOP Files

2014-09-25 Thread Andrew Ash
Hi Harsha,

I use LZOP files extensively on my Spark cluster -- see my writeup for how
to do this on this mailing list post:
http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCAOoZ679ehwvT1g8=qHd2n11Z4EXOBJkP+q=Aj0qE_=shhyl...@mail.gmail.com%3E

Maybe we should better document how to use LZO with Spark because it can be
tricky to get the lzo jars, native libraries, and hadoopFile() calls all
set up correctly.

Andrew

On Thu, Sep 25, 2014 at 9:44 AM, Harsha HN 99harsha.h@gmail.com wrote:

 Hi,

 Anybody using LZOP files to process in Spark?

 We have a huge volume of LZOP files in HDFS to process through Spark. In
 MapReduce framework, it automatically detects the file format and sends the
 decompressed version to Mappers.
 Any such support in Spark?
 As of now I am manually downloading, decompressing it before processing.

 Thanks,
 Harsha



Re: quick start guide: building a standalone scala program

2014-09-25 Thread Andrew Ash
Hi Christy,

I'm more of a Gradle fan but I know SBT fits better into the Scala
ecosystem as a build tool.  If you'd like to give Gradle a shot try this
skeleton Gradle+Spark repo from my coworker Punya.

https://github.com/punya/spark-gradle-test-example

Good luck!
Andrew

On Thu, Sep 25, 2014 at 1:00 AM, christy 760948...@qq.com wrote:

 I encountered exactly the same problem. How did you solve this?

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/quick-start-guide-building-a-standalone-scala-program-tp3116p15125.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Log hdfs blocks sending

2014-09-25 Thread Andrew Ash
Hi Alexey,

You should see in the logs a locality measure like NODE_LOCAL,
PROCESS_LOCAL, ANY, etc.  If your Spark workers each have an HDFS data node
on them and you're reading out of HDFS, then you should be seeing almost
all NODE_LOCAL accesses.  One cause I've seen for mismatches is if Spark
uses short hostnames and Hadoop uses FQDNs -- in that case Spark doesn't
think the data is local and does remote reads which really kills
performance.

Hope that helps!
Andrew

On Thu, Sep 25, 2014 at 12:09 AM, Alexey Romanchuk 
alexey.romanc...@gmail.com wrote:

 Hello again spark users and developers!

 I have standalone spark cluster (1.1.0) and spark sql running on it. My
 cluster consists of 4 datanodes and replication factor of files is 3.

 I use thrift server to access spark sql and have 1 table with 30+
 partitions. When I run query on whole table (something simple like select
 count(*) from t) spark produces a lot of network activity filling all
 available 1gb link. Looks like spark sent data by network instead of local
 reading.

 Is it any way to log which blocks were accessed locally and which are not?

 Thanks!



Re: Spark Streaming: No parallelism in writing to database (MySQL)

2014-09-25 Thread maddenpj
Yup, it's all in the gist:
https://gist.github.com/maddenpj/5032c76aeb330371a6e6

Lines 6-9 deal with setting up the driver specifically. This sets the driver
up once per partition, so the connection pool is reused for every record in
the partition rather than being created per record.


--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-No-parallelism-in-writing-to-database-MySQL-tp15174p15202.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark-ec2 ERROR: Line magic function `%matplotlib` not found

2014-09-25 Thread Davies Liu
Maybe you have Python 2.7 on the master but Python 2.6 on the cluster.
You should either upgrade Python to 2.7 on the cluster, or use Python 2.6
on the master by setting PYSPARK_PYTHON=python2.6.
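As a quick sanity check, the interpreter PySpark will launch for workers can
be read from the environment on each machine; PySpark falls back to whatever
`python` is on the PATH when the variable is unset. A minimal probe script:

```python
import os
import sys

# PySpark starts worker processes with the interpreter named in
# PYSPARK_PYTHON; when the variable is unset it falls back to "python".
interpreter = os.environ.get("PYSPARK_PYTHON", "python")
print("workers would use:", interpreter)
print("this process runs:", "%d.%d" % sys.version_info[:2])
```

Running this on the master and on each worker makes a 2.6 vs. 2.7 mismatch
visible immediately.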

On Thu, Sep 25, 2014 at 5:11 PM, Andy Davidson
a...@santacruzintegration.com wrote:
 Hi

 I am running into trouble using iPython notebook on my cluster. Use the
 following command to set the cluster up

 $ ./spark-ec2 --key-pair=$KEY_PAIR --identity-file=$KEY_FILE
 --region=$REGION --slaves=$NUM_SLAVES launch $CLUSTER_NAME


 On master I launch python as follows

 $ IPYTHON_OPTS="notebook --pylab inline --no-browser --port=7000" $SPARK_HOME/bin/pyspark


 It looks like the problem is that the cluster is using an old version of
 Python and IPython. Any idea how I can easily upgrade? The following
 version works on my mac.

 Thanks

 Andy

 {'commit_hash': '681fd77',
  'commit_source': 'installation',
  'default_encoding': 'UTF-8',
  'ipython_path': '/Library/Python/2.7/site-packages/IPython',
  'ipython_version': '2.1.0',
  'os_name': 'posix',
  'platform': 'Darwin-13.3.0-x86_64-i386-64bit',
  'sys_executable': '/usr/bin/python',
  'sys_platform': 'darwin',
  'sys_version': '2.7.5 (default, Mar  9 2014, 22:15:05) \n[GCC 4.2.1
 Compatible Apple LLVM 5.0 (clang-500.0.68)]'}








Parallel spark jobs on standalone cluster

2014-09-25 Thread Sarath Chandra
Hi All,

I have a Java program which submits a Spark job to a standalone Spark
cluster (2 nodes; 10 cores (6+4); 12GB (8+4)). Another Java program calls it
through an ExecutorService, invoking it multiple times with different sets
of arguments and parameters. I have set Spark memory usage to 3GB (I also
tried 2GB and 1GB).

When I run this with 4 parallel jobs, the first job finishes successfully,
the second and third jobs start executing in parallel but never complete,
and the fourth job waits in the queue. I've tried using the Fair Scheduler
but didn't notice any change in the behavior. Also, in the Spark job
submission program, I'm calling SparkContext.stop at the end of execution.
Sometimes all jobs fail with the status Exited.
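One common cause of this behavior is that the first application grabs every
core, so later submissions starve. As a starting point rather than a tuned
configuration, the standalone scheduler can be left headroom by capping
cores and memory per application in spark-defaults.conf (or via SparkConf);
the numbers below are hypothetical for a 10-core / 12GB cluster:

```properties
# Illustrative caps so several applications can run side by side;
# adjust to the actual cluster and workload.
spark.cores.max        4
spark.executor.memory  2g
```

With caps like these, four concurrent submissions contend for cores rather
than queueing behind one application that holds them all.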

Please let me know what is going wrong and how I can overcome this issue.

~Sarath


Spark Streaming: foreachRDD network output

2014-09-25 Thread Jesper Lundgren
Hello all,

I have some questions regarding the foreachRDD output function in Spark
Streaming.

The programming guide (
http://spark.apache.org/docs/1.1.0/streaming-programming-guide.html)
describes how to output data using network connection on the worker nodes.

Are there any working examples of how to do this properly? (Most of the
guide describes what not to do, rather than what to do.)

Any suggestions on the best way to write tests for such code, to make sure
that connection objects are used properly, etc.?

How should I handle network or other problems on a worker node? Can I throw
an exception to force Spark to try again with that data on another node? As
an example: a program that writes data to an SQL database using foreachRDD.
One worker node might have connection issues to the database, so another
node has to finish the output operation.
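A minimal, Spark-free sketch of the retry idea: the output function opens
its connection inside the per-partition closure and simply lets exceptions
propagate. In real Spark, a task that throws is rescheduled (up to
spark.task.maxFailures, possibly on another node); here a hypothetical
run_with_retry driver plays that role, so the whole thing runs without a
cluster.

```python
class FlakyConnection:
    """Hypothetical DB connection that fails on its first use."""
    attempts = 0

    def write(self, batch):
        FlakyConnection.attempts += 1
        if FlakyConnection.attempts == 1:
            raise IOError("simulated network problem on this worker")
        return list(batch)

def output_partition(records):
    conn = FlakyConnection()      # opened on the worker, per partition
    return conn.write(records)    # any exception fails the task

def run_with_retry(func, records, max_failures=4):
    # Stand-in for Spark's task resubmission loop.
    for attempt in range(max_failures):
        try:
            return func(records)
        except IOError:
            continue              # Spark would resubmit the task here
    raise RuntimeError("task failed %d times" % max_failures)

print(run_with_retry(output_partition, [1, 2, 3]))  # [1, 2, 3]
```

The design choice this illustrates: never catch and swallow output errors
inside the closure, because a propagated exception is exactly what lets the
framework retry the work elsewhere.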

Thanks!

-- Jesper Lundgren


Re: YARN ResourceManager and Hadoop NameNode Web UI not visible in port 8088, port 50070

2014-09-25 Thread Raghuveer Chanda
The problem is solved. The web interfaces were not opening on the local
network when connecting to the server through a proxy; they open only on
the servers without a proxy.

On Thu, Sep 25, 2014 at 1:12 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Hi Raghuveer,

 This might be a better question for the cdh-user list or the Hadoop user
 list.  The Hadoop web interfaces for both the NameNode and ResourceManager
 are enabled by default.  Is it possible you have a firewall blocking those
 ports?

 -Sandy

 On Wed, Sep 24, 2014 at 9:00 PM, Raghuveer Chanda 
 raghuveer.cha...@gmail.com wrote:

 Hi,

 I'm running a Spark job on a YARN cluster, but I'm not able to see the web
 interface of the YARN ResourceManager (port 8088), the Hadoop NameNode web
 UI (port 50070), or the Spark stages.

 Only the spark UI in port 18080 is visible.

 I got the URLs from Cloudera, but maybe due to some default security
 option the web interface is disabled.

 How can I enable the web interface? Is there an option in Cloudera for
 this, or is the server firewall blocking it? Please help.



 --
 Regards,
 Raghuveer Chanda
 4th year Undergraduate Student
 Computer Science and Engineering
 IIT Kharagpur







Re:

2014-09-25 Thread Jianshi Huang
I built a patched DFSClient jar and now testing (takes 3 hours...)

I'd like to know whether I can patch Spark builds. How about just replacing
DFSClient.class in the spark-assembly jar?

Jianshi

On Fri, Sep 26, 2014 at 2:29 AM, Ted Yu yuzhih...@gmail.com wrote:

 I followed the linked JIRAs to HDFS-7005, which is in Hadoop 2.6.0.

 Any chance of deploying 2.6.0-SNAPSHOT to see if the problem goes away ?

 On Wed, Sep 24, 2014 at 10:54 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Looks like it's a HDFS issue, pretty new.

 https://issues.apache.org/jira/browse/HDFS-6999

 Jianshi

 On Thu, Sep 25, 2014 at 12:10 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi Ted,

 See my previous reply to Debasish, all region servers are idle. I don't
 think it's caused by hotspotting.

 Besides, only 6 out of 3000 tasks were stuck, and their inputs are only
 about 80MB each.

 Jianshi

 On Wed, Sep 24, 2014 at 11:58 PM, Ted Yu yuzhih...@gmail.com wrote:

 I was thinking along the same line.

 Jianshi:
 See
 http://hbase.apache.org/book.html#d0e6369

 On Wed, Sep 24, 2014 at 8:56 AM, Debasish Das debasish.da...@gmail.com
  wrote:

 The HBase regionservers need to be balanced. You might have some skewness
 in the row keys, and one regionserver is under pressure; try finding that
 key and spreading it using a random salt.
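The salting idea can be sketched in a few lines: prefix each row key with a
small, deterministic hash bucket so writes for a hot key range spread across
regionservers. The bucket count and key format here are illustrative, not
taken from the thread.

```python
import hashlib

NUM_BUCKETS = 8  # illustrative; often chosen to match the region count

def salted_key(row_key):
    # Deterministic salt so readers can reconstruct every prefix for scans.
    digest = hashlib.md5(row_key.encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % NUM_BUCKETS
    return "%d-%s" % (bucket, row_key)

keys = ["user42", "user42", "user99"]
print([salted_key(k) for k in keys])
```

Because the salt is derived from the key itself, the same key always lands
in the same bucket, and a reader can scan all NUM_BUCKETS prefixes to
recover the full key range.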

 On Wed, Sep 24, 2014 at 8:51 AM, Jianshi Huang 
 jianshi.hu...@gmail.com wrote:

 Hi Ted,

 It converts RDD[Edge] to HBase rowkey and columns and insert them to
 HBase (in batch).

 BTW, I found batched Put actually faster than generating HFiles...


 Jianshi

 On Wed, Sep 24, 2014 at 11:49 PM, Ted Yu yuzhih...@gmail.com wrote:

 bq. at com.paypal.risk.rds.dragon.
 storage.hbase.HbaseRDDBatch$$anonfun$batchInsertEdges$3.
 apply(HbaseRDDBatch.scala:179)

 Can you reveal what HbaseRDDBatch.scala does ?

 Cheers

 On Wed, Sep 24, 2014 at 8:46 AM, Jianshi Huang 
 jianshi.hu...@gmail.com wrote:

 One of my big Spark programs always gets stuck at 99%, where a few
 tasks never finish.

 I debugged it by printing out thread stack traces, and found that there
 are workers stuck at parquet.hadoop.ParquetFileReader.readNextRowGroup.

 Has anyone had a similar problem? I'm using Spark 1.1.0 built for HDP2.1.
 The parquet files are generated by Pig using the latest parquet-pig-bundle
 v1.6.0rc1.

 From Spark 1.1.0's pom.xml, Spark is using parquet v1.4.3; will this be
 problematic?

 One of the weird behaviors is that another program reads and sorts data
 from the same parquet files and works fine. The only difference seems to
 be that the buggy program uses foreachPartition while the working program
 uses map.

 Here's the full stacktrace:

 Executor task launch worker-3
java.lang.Thread.State: RUNNABLE
 at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
 at
 sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:257)
 at
 sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
 at
 sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
 at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
 at
 org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:335)
 at
 org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
 at
 org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.readChannelFully(PacketReceiver.java:258)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:209)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:171)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
 at
 org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:173)
 at
 org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:138)
 at
 org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:683)
 at
 org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:739)
 at
 org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:796)
 at
 org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:837)
 at
 java.io.DataInputStream.readFully(DataInputStream.java:195)
 at
 java.io.DataInputStream.readFully(DataInputStream.java:169)
 at
 parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:599)
 at
 parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:360)
 at
 parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:100)
 at
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
 at