Spark streaming over a rest API

2015-05-18 Thread juandasgandaras
Hello, 

I would like to use Spark Streaming over a REST API to get information over
time and with different parameters in the REST query.

I was thinking of using Apache Kafka, but I don't have any experience with it
and would like some advice on this.

Thanks.

Best regards, 

Juan
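
For what it's worth, a rough sketch (in Scala) of the polling side using a custom
receiver — the endpoint URL, query parameter and polling interval are made up, and
Kafka would typically sit between the API and Spark if you need buffering/replay:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import scala.io.Source

// Illustrative only: poll a (made-up) REST endpoint periodically and push
// each response body into the stream as one record.
class RestPollingReceiver(url: String, intervalMs: Long)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK) {

  override def onStart(): Unit = {
    new Thread("rest-polling-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          store(Source.fromURL(url).mkString)   // one record per poll
          Thread.sleep(intervalMs)
        }
      }
    }.start()
  }

  override def onStop(): Unit = {}   // the polling thread exits via isStopped()
}

// usage: ssc.receiverStream(new RestPollingReceiver("http://host/api?param=value", 5000))

The query parameters could also be rebuilt inside the loop if they need to change
between polls.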






Re: Spark Streaming and reducing latency

2015-05-18 Thread Akhil Das
we = Sigmoid

back-pressuring mechanism = Stopping the receiver from receiving more
messages when it's about to exhaust the worker memory. Here's a similar
https://issues.apache.org/jira/browse/SPARK-7398 kind of proposal, if you
haven't seen it already.
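
For readers of the archive, a purely illustrative Scala sketch (not the Sigmoid
implementation) of what such a feedback loop inside a custom receiver might look
like — the memory threshold is arbitrary and the fetch() function is a placeholder
for the real source:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Illustration only: back off while JVM free memory is below a made-up
// threshold instead of storing more blocks.
class BackPressuredReceiver(fetch: () => String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK) {

  private val minFreeBytes = 256L * 1024 * 1024   // arbitrary threshold

  override def onStart(): Unit = {
    new Thread("back-pressured-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          if (Runtime.getRuntime.freeMemory() < minFreeBytes)
            Thread.sleep(500)        // pause consumption until memory recovers
          else
            store(fetch())           // pull one record from the source
        }
      }
    }.start()
  }

  override def onStop(): Unit = {}
}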



Thanks
Best Regards

On Mon, May 18, 2015 at 6:53 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 Who are “we” and what is the mysterious “back-pressuring mechanism”, and is
 it part of the Spark distribution (are you talking about an implementation of
 the custom feedback loop mentioned in my previous emails below)? I ask
 because I can assure you that, at least as of Spark Streaming 1.2.0, and as
 Evo says, Spark Streaming DOES crash in an “unceremonious way” when the free
 RAM available for in-memory cached RDDs gets exhausted



 *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com]
 *Sent:* Monday, May 18, 2015 2:03 PM
 *To:* Evo Eftimov
 *Cc:* Dmitry Goldenberg; user@spark.apache.org

 *Subject:* Re: Spark Streaming and reducing latency



 We fix the rate at which the receivers should consume at any given point in
 time. We also have a back-pressuring mechanism attached to the receivers, so
 it won't simply crash in an unceremonious way like Evo said. Mesos has
 some sort of auto-scaling (I read it somewhere); maybe you can look into
 that also.


 Thanks

 Best Regards



 On Mon, May 18, 2015 at 5:20 PM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 And if you want to genuinely “reduce the latency” (still within the
 boundaries of the micro-batch) THEN you need to design and finely tune the
 Parallel Programming / Execution Model of your application. The
 objective/metric here is:



 a)  Consume all data within your selected micro-batch window WITHOUT
 any artificial message rate limits

 b)  The above will result in a certain size of Dstream RDD per
 micro-batch.

 c)   The objective now is to Process that RDD WITHIN the time of the
 micro-batch (and also account for temporary message rate spike etc which
 may further increase the size of the RDD) – this will avoid any clogging up
 of the app and will process your messages at the lowest latency possible in
 a micro-batch architecture

 d)  You achieve the objective stated in c by designing, varying and
 experimenting with various aspects of the Spark Streaming Parallel
 Programming and Execution Model – e.g. number of receivers, number of
 threads per receiver, number of executors, number of cores, RAM allocated
 to executors, number of RDD partitions which correspond to the number of
 parallel threads operating on the RDD etc etc



 Re the “unceremonious removal of DStream RDDs” from RAM by Spark Streaming
 when the available RAM is exhausted due to high message rate and which
 crashes your (hence clogged up) application the name of the condition is:



 Loss was due to java.lang.Exception

 java.lang.Exception: *Could not compute split, block*
 *input-4-1410542878200 not found*



 *From:* Evo Eftimov [mailto:evo.efti...@isecc.com]
 *Sent:* Monday, May 18, 2015 12:13 PM
 *To:* 'Dmitry Goldenberg'; 'Akhil Das'
 *Cc:* 'user@spark.apache.org'
 *Subject:* RE: Spark Streaming and reducing latency



 You can use

 spark.streaming.receiver.maxRate (default: not set)

 Maximum rate (number of records per second) at which each receiver will
 receive data. Effectively, each stream will consume at most this number of
 records per second. Setting this configuration to 0 or a negative number
 will put no limit on the rate. See the deployment guide
 https://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications
 in the Spark Streaming programming guide for more details.
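
 For reference, a minimal sketch of setting this property programmatically;
 the application name and the 10,000 records/sec cap are just example values:

 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}

 val conf = new SparkConf()
   .setAppName("RateLimitedApp")
   .set("spark.streaming.receiver.maxRate", "10000")   // example cap per receiver
 val ssc = new StreamingContext(conf, Seconds(1))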





 Another way is to implement a feedback loop in your receivers that monitors
 the performance metrics of your application/job and, based on that, adjusts
 the receiving rate automatically – BUT all of this has nothing to do with
 “reducing the latency” – it simply prevents your application/job from
 clogging up. The nastier effect of clogging up is when Spark Streaming starts
 removing in-memory RDDs from RAM before they are processed by the job – that
 works fine in Spark Batch (i.e. removing RDDs from RAM based on LRU), but in
 Spark Streaming, when done in this “unceremonious way”, it simply crashes
 the application



 *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com
 dgoldenberg...@gmail.com]
 *Sent:* Monday, May 18, 2015 11:46 AM
 *To:* Akhil Das
 *Cc:* user@spark.apache.org
 *Subject:* Re: Spark Streaming and reducing latency



 Thanks, Akhil. So what do folks typically do to increase/contract the
 capacity? Do you plug in some cluster auto-scaling solution to make this
 elastic?



 Does Spark have any hooks for instrumenting auto-scaling?

 In other words, how do you avoid overwhelming the receivers in a scenario
 when your system's input can be unpredictable, based on users' activity?


 On May 17, 2015, at 11:04 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 With receiver based streaming, you 

Re: Forbidded : Error Code: 403

2015-05-18 Thread Mohammad Tariq
Tried almost all the options, but it did not work. So, I ended up creating
a new IAM user, and the keys of this user are working fine. I am not getting
the Forbidden (403) exception now, but my program seems to be running
indefinitely. It's not throwing any exception, but continues to run
with the following trace:

.
.
.
.
15/05/18 17:35:44 INFO HttpServer: Starting HTTP Server
15/05/18 17:35:44 INFO Server: jetty-8.y.z-SNAPSHOT
15/05/18 17:35:44 INFO AbstractConnector: Started
SocketConnector@0.0.0.0:60316
15/05/18 17:35:44 INFO Utils: Successfully started service 'HTTP file
server' on port 60316.
15/05/18 17:35:44 INFO SparkEnv: Registering OutputCommitCoordinator
15/05/18 17:35:44 INFO Server: jetty-8.y.z-SNAPSHOT
15/05/18 17:35:44 INFO AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040
15/05/18 17:35:44 INFO Utils: Successfully started service 'SparkUI' on
port 4040.
15/05/18 17:35:44 INFO SparkUI: Started SparkUI at http://172.28.210.74:4040
15/05/18 17:35:44 INFO Executor: Starting executor ID driver on host
localhost
15/05/18 17:35:44 INFO AkkaUtils: Connecting to HeartbeatReceiver:
akka.tcp://sparkDriver@172.28.210.74:60315/user/HeartbeatReceiver
15/05/18 17:35:44 INFO NettyBlockTransferService: Server created on 60317
15/05/18 17:35:44 INFO BlockManagerMaster: Trying to register BlockManager
15/05/18 17:35:44 INFO BlockManagerMasterActor: Registering block manager
localhost:60317 with 66.9 MB RAM, BlockManagerId(driver, localhost, 60317)
15/05/18 17:35:44 INFO BlockManagerMaster: Registered BlockManager
15/05/18 17:35:45 WARN AmazonHttpClient: Detected a possible problem with
the current JVM version (1.6.0_65).  If you experience XML parsing problems
using the SDK, try upgrading to a more recent JVM update.
15/05/18 17:35:47 INFO S3AFileSystem: Getting path status for
s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:47 INFO S3AFileSystem: Getting path status for
s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:47 INFO S3AFileSystem: Getting path status for
s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:48 INFO S3AFileSystem: Opening
's3a://bucket-name/avro_data/episodes.avro' for reading
15/05/18 17:35:48 INFO S3AFileSystem: Getting path status for
s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:48 INFO S3AFileSystem: Actually opening file
avro_data/episodes.avro at pos 0
15/05/18 17:35:48 INFO S3AFileSystem: Reopening avro_data/episodes.avro to
seek to new offset -4
15/05/18 17:35:48 INFO S3AFileSystem: Actually opening file
avro_data/episodes.avro at pos 0
15/05/18 17:35:50 INFO MemoryStore: ensureFreeSpace(230868) called with
curMem=0, maxMem=70177259
15/05/18 17:35:50 INFO MemoryStore: Block broadcast_0 stored as values in
memory (estimated size 225.5 KB, free 66.7 MB)
15/05/18 17:35:50 INFO MemoryStore: ensureFreeSpace(31491) called with
curMem=230868, maxMem=70177259
15/05/18 17:35:50 INFO MemoryStore: Block broadcast_0_piece0 stored as
bytes in memory (estimated size 30.8 KB, free 66.7 MB)
15/05/18 17:35:50 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory
on localhost:60317 (size: 30.8 KB, free: 66.9 MB)
15/05/18 17:35:50 INFO BlockManagerMaster: Updated info of block
broadcast_0_piece0
15/05/18 17:35:50 INFO SparkContext: Created broadcast 0 from hadoopFile at
AvroRelation.scala:82
15/05/18 17:35:50 INFO S3AFileSystem: Getting path status for
s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:50 INFO FileInputFormat: Total input paths to process : 1
15/05/18 17:35:50 INFO SparkContext: Starting job: runJob at
SparkPlan.scala:122
15/05/18 17:35:50 INFO DAGScheduler: Got job 0 (runJob at
SparkPlan.scala:122) with 1 output partitions (allowLocal=false)
15/05/18 17:35:50 INFO DAGScheduler: Final stage: Stage 0(runJob at
SparkPlan.scala:122)
15/05/18 17:35:50 INFO DAGScheduler: Parents of final stage: List()
15/05/18 17:35:50 INFO DAGScheduler: Missing parents: List()
15/05/18 17:35:50 INFO DAGScheduler: Submitting Stage 0
(MapPartitionsRDD[2] at map at SparkPlan.scala:97), which has no missing
parents
15/05/18 17:35:50 INFO MemoryStore: ensureFreeSpace(3448) called with
curMem=262359, maxMem=70177259
15/05/18 17:35:50 INFO MemoryStore: Block broadcast_1 stored as values in
memory (estimated size 3.4 KB, free 66.7 MB)
15/05/18 17:35:50 INFO MemoryStore: ensureFreeSpace(2386) called with
curMem=265807, maxMem=70177259
15/05/18 17:35:50 INFO MemoryStore: Block broadcast_1_piece0 stored as
bytes in memory (estimated size 2.3 KB, free 66.7 MB)
15/05/18 17:35:50 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory
on localhost:60317 (size: 2.3 KB, free: 66.9 MB)
15/05/18 17:35:50 INFO BlockManagerMaster: Updated info of block
broadcast_1_piece0
15/05/18 17:35:50 INFO SparkContext: Created broadcast 1 from broadcast at
DAGScheduler.scala:839
15/05/18 17:35:50 INFO DAGScheduler: Submitting 1 missing tasks 

Re: Processing multiple columns in parallel

2015-05-18 Thread ayan guha
My first thought would be to create 10 RDDs and run your word count on each
of them. I think the Spark scheduler is going to resolve the dependencies in
parallel and launch 10 jobs.
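
As a concrete sketch of that idea: the ten actions can be submitted from
separate threads (e.g. Scala Futures) against the single SparkContext (sc, as
in your snippet) so they can run concurrently; the output paths are made up,
and reduceByKey is used for the per-column word count:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

val data = sc.textFile("hdfs://namenode/data.txt").cache()   // read once, shared by all columns
val columnJobs = (0 to 9).map { i =>
  Future {
    data.map(_.split("\t", -1)(i))
        .map((_, 1))
        .reduceByKey(_ + _)
        .saveAsTextFile(s"column_$i")   // made-up output path
  }
}
columnJobs.foreach(Await.result(_, Duration.Inf))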

Best
Ayan
On 18 May 2015 23:41, Laeeq Ahmed laeeqsp...@yahoo.com.invalid wrote:

 Hi,

 Consider I have a tab-delimited text file with 10 columns. Each column is
 a set of text. I would like to do a word count for each column. In Scala,
 I would do the following RDD transformation and action:





 val data = sc.textFile("hdfs://namenode/data.txt")
 for (i <- 0 to 9) {
   data.map(_.split("\t", -1)(i)).map((_, 1)).reduceByKey(_ + _).saveAsTextFile(i.toString)
 }
 Within the for loop each job is parallel, but the columns are processed
 sequentially from 0 to 9.

 Is there any way I can process multiple columns in parallel in
 Spark? I saw a posting about using Akka, but Spark itself is already using
 Akka. Any pointers would be appreciated.


 Regards,
 Laeeq



pass configuration parameters to PySpark job

2015-05-18 Thread Oleg Ruchovets
Hi ,
   I am looking for a way to pass configuration parameters to a Spark job.
In general I have a quite simple PySpark job.

  def process_model(k, vc):
      # ... do something ...
      pass


  sc = SparkContext(appName="TAD")
  lines = sc.textFile(input_job_files)
  result = lines.map(doSplit).groupByKey().map(lambda (k, vc): process_model(k, vc))

Question:
In case I need to pass additional metadata, parameters, etc. to the
process_model function...

   I tried to do something like
   param = 'param1'
   result = lines.map(doSplit).groupByKey().map(lambda (param, k, vc):
       process_model(param1, k, vc))

but the job stops working, and it does not look like an elegant solution.
Is there a way to access the SparkContext from my custom functions?
I found that there are methods setLocalProperty/getLocalProperty, but I
didn't find an example of how to use them for my requirements (from my function).

It would be great to have a short example of how to pass parameters.

Thanks
Oleg.


parsedData option

2015-05-18 Thread Ricardo Goncalves da Silva
Hi Team,

My dataset has the following format:
CELLPHONE,KL_1,KL_2,KL_3,KL_4,KL_5
1120100114,-5.3244062521117e-003,-4.10825709805041e-003,-1.7816995027779e-002,-4.21462029980323e-003,-1.6200555039e-002

i.e., an identifier in the first column and the data separated by commas. To load
this data I'm using:

val data = sc.textFile("/data/disk1/cluster/fraude5.csv")

But now the problem: what do I need to do to choose KL_4 for clustering in the
following line?

val parsedData = data.map(s =>
  Vectors.dense(s.split(',').map(_.toDouble))).cache()

Thanks

Ricardo.
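
For later readers, a sketch of one way to keep only KL_4 (the fifth
comma-separated field, index 4), reusing the data RDD loaded above; the
header-row filter is only needed if the file actually contains the header
line shown in the sample:

import org.apache.spark.mllib.linalg.Vectors

val parsedData = data
  .filter(!_.startsWith("CELLPHONE"))                    // skip the header row, if present
  .map(s => Vectors.dense(s.split(',')(4).toDouble))     // field index 4 = KL_4
  .cache()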





The information contained in this transmission is privileged and confidential 
information intended only for the use of the individual or entity named above. 
If the reader of this message is not the intended recipient, you are hereby 
notified that any dissemination, distribution or copying of this communication 
is strictly prohibited. If you have received this transmission in error, do not 
read it. Please immediately reply to the sender that you have received this 
communication in error and then delete it.



RE: Spark Streaming and reducing latency

2015-05-18 Thread Evo Eftimov
Who are “we” and what is the mysterious “back-pressuring mechanism”, and is it
part of the Spark distribution (are you talking about an implementation of the
custom feedback loop mentioned in my previous emails below)? I ask because I can
assure you that, at least as of Spark Streaming 1.2.0, and as Evo says, Spark
Streaming DOES crash in an “unceremonious way” when the free RAM available
for in-memory cached RDDs gets exhausted

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Monday, May 18, 2015 2:03 PM
To: Evo Eftimov
Cc: Dmitry Goldenberg; user@spark.apache.org
Subject: Re: Spark Streaming and reducing latency

 

We fix the rate at which the receivers should consume at any given point of
time. We also have a back-pressuring mechanism attached to the receivers, so it
won't simply crash in an unceremonious way like Evo said. Mesos has some
sort of auto-scaling (I read it somewhere); maybe you can look into that also.




Thanks

Best Regards

 

On Mon, May 18, 2015 at 5:20 PM, Evo Eftimov evo.efti...@isecc.com wrote:

And if you want to genuinely “reduce the latency” (still within the boundaries 
of the micro-batch) THEN you need to design and finely tune the Parallel 
Programming / Execution Model of your application. The objective/metric here is:

 

a)  Consume all data within your selected micro-batch window WITHOUT any 
artificial message rate limits

b)  The above will result in a certain size of Dstream RDD per micro-batch. 

c)   The objective now is to Process that RDD WITHIN the time of the 
micro-batch (and also account for temporary message rate spike etc which may 
further increase the size of the RDD) – this will avoid any clogging up of the 
app and will process your messages at the lowest latency possible in a 
micro-batch architecture 

d)  You achieve the objective stated in c by designing, varying and 
experimenting with various aspects of the Spark Streaming Parallel Programming 
and Execution Model – e.g. number of receivers, number of threads per receiver, 
number of executors, number of cores, RAM allocated to executors, number of RDD 
partitions which correspond to the number of parallel threads operating on the 
RDD etc etc  
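
To make point d) above concrete, a minimal sketch of two of those knobs — several
receivers unioned into one DStream, then an explicit repartition — where the host,
port, receiver count and partition count are made-up example values:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("ParallelIngest")
val ssc = new StreamingContext(conf, Seconds(2))

// 4 receivers (each occupies one executor core) feeding the same logical stream
val streams = (1 to 4).map(_ => ssc.socketTextStream("source-host", 9999))
// Union them and spread each batch's RDD over 32 partitions for processing
val unified = ssc.union(streams).repartition(32)
unified.count().print()

ssc.start()
ssc.awaitTermination()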

 

Re the “unceremonious removal of DStream RDDs” from RAM by Spark Streaming when 
the available RAM is exhausted due to high message rate and which crashes your 
(hence clogged up) application the name of the condition is:

 

Loss was due to java.lang.Exception   

java.lang.Exception: Could not compute split, block
input-4-1410542878200 not found

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Monday, May 18, 2015 12:13 PM
To: 'Dmitry Goldenberg'; 'Akhil Das'
Cc: 'user@spark.apache.org'
Subject: RE: Spark Streaming and reducing latency

 

You can use

spark.streaming.receiver.maxRate (default: not set)

Maximum rate (number of records per second) at which each receiver will receive
data. Effectively, each stream will consume at most this number of records per
second. Setting this configuration to 0 or a negative number will put no limit
on the rate. See the deployment guide
https://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications
in the Spark Streaming programming guide for more details.

 

 

Another way is to implement a feedback loop in your receivers that monitors the
performance metrics of your application/job and, based on that, adjusts the
receiving rate automatically – BUT all of this has nothing to do with
“reducing the latency” – it simply prevents your application/job from clogging
up. The nastier effect of clogging up is when Spark Streaming starts removing
in-memory RDDs from RAM before they are processed by the job – that works fine in
Spark Batch (i.e. removing RDDs from RAM based on LRU), but in Spark Streaming,
when done in this “unceremonious way”, it simply crashes the application

 

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Monday, May 18, 2015 11:46 AM
To: Akhil Das
Cc: user@spark.apache.org
Subject: Re: Spark Streaming and reducing latency

 

Thanks, Akhil. So what do folks typically do to increase/contract the capacity? 
Do you plug in some cluster auto-scaling solution to make this elastic?

 

Does Spark have any hooks for instrumenting auto-scaling?

In other words, how do you avoid overwhelming the receivers in a scenario when
your system's input can be unpredictable, based on users' activity?


On May 17, 2015, at 11:04 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

With receiver-based streaming, you can actually specify
spark.streaming.blockInterval, which is the interval at which the receiver will
fetch data from the source. The default value is 200ms, and hence if your batch
duration is 1 second, it will produce 5 blocks of data. And yes, with
Spark Streaming, when your processing time goes beyond your batch duration and
you have higher data consumption, then you will overwhelm the receiver's
memory and hence 

Working with slides. How do I know how many times a RDD has been processed?

2015-05-18 Thread Guillermo Ortiz
Hi,

I have two streams, RDD1 and RDD2, and want to cogroup them.
The data doesn't arrive at the same time and can sometimes come with some
delay.
When I have all the data I want to insert it into MongoDB.

For example, imagine that I get:
RDD1 -- T 0
RDD2 -- T 0.5
I do a cogroup between them, but I can't store to Mongo yet because more
data could come in the next window/slide.
RDD2' -- T 1.5
Another RDD2' comes; I only want to save to Mongo once, so I should only
save when I have all the data. What I do know is the maximum time I should
wait.

Ideally, I would like to save to MongoDB in the last slide for each RDD,
when I know that it is not possible to get more RDD2 data to join with RDD1.
Is that possible? How?

Maybe there is another way to solve this problem. Any ideas?


Re: Spark and Flink

2015-05-18 Thread Robert Metzger
Hi,
I would really recommend putting your Flink and Spark dependencies into
different Maven modules.
Having them both in the same module will be very hard, if not impossible.
Both projects depend on similar projects with slightly different versions.

I would suggest a maven module structure like this:
yourproject-parent (a pom module)
-- yourproject-common
-- yourproject-flink
-- yourproject-spark



On Mon, May 18, 2015 at 10:00 AM, Pa Rö paul.roewer1...@googlemail.com
wrote:

 Hi,
 if I add your dependency I get over 100 errors; now I have changed the version
 number:
 <dependencies>
   <dependency>
     <groupId>com.fasterxml.jackson.module</groupId>
     <artifactId>jackson-module-scala_2.10</artifactId>
     <version>2.4.4</version>
     <exclusions>
       <exclusion>
         <groupId>com.google.guava</groupId>
         <artifactId>guava</artifactId>
       </exclusion>
     </exclusions>
   </dependency>
 </dependencies>

 Now the pom is fine, but I get the same error when running Spark:
 WARN component.AbstractLifeCycle: FAILED
 org.eclipse.jetty.servlet.DefaultServlet-608411067:
 java.lang.NoSuchMethodError:
 org.eclipse.jetty.server.ResourceCache.init(Lorg/eclipse/jetty/http/MimeTypes;)V

 java.lang.NoSuchMethodError:
 org.eclipse.jetty.server.ResourceCache.init(Lorg/eclipse/jetty/http/MimeTypes;)V
 at
 org.eclipse.jetty.servlet.NIOResourceCache.init(NIOResourceCache.java:41)
 at
 org.eclipse.jetty.servlet.DefaultServlet.init(DefaultServlet.java:223)
 at javax.servlet.GenericServlet.init(GenericServlet.java:244)
 at
 org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:442)
 at
 org.eclipse.jetty.servlet.ServletHolder.doStart(ServletHolder.java:270)
 at
 org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
 at
 org.eclipse.jetty.servlet.ServletHandler.initialize(ServletHandler.java:721)
 at
 org.eclipse.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:279)
 at
 org.eclipse.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:717)
 at
 org.eclipse.jetty.servlet.ServletContextHandler.doStart(ServletContextHandler.java:155)
 at
 org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
 at
 org.eclipse.jetty.server.handler.HandlerCollection.doStart(HandlerCollection.java:229)
 at
 org.eclipse.jetty.server.handler.ContextHandlerCollection.doStart(ContextHandlerCollection.java:172)
 at
 org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
 at
 org.eclipse.jetty.server.handler.HandlerWrapper.doStart(HandlerWrapper.java:95)
 at org.eclipse.jetty.server.Server.doStart(Server.java:282)
 at
 org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
 at
 org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:199)
 at
 org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:209)
 at
 org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:209)
 at
 org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1450)
 at
 org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:209)
 at org.apache.spark.ui.WebUI.bind(WebUI.scala:102)
 at org.apache.spark.SparkContext.init(SparkContext.scala:224)
 at
 org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:53)
 at mgm.tp.bigdata.tempGeoKmeans.Spark.SparkMain.main(SparkMain.java:37)
 ...

 What am I doing wrong?

 Best regards,
 Paul

 2015-05-13 15:43 GMT+02:00 Ted Yu yuzhih...@gmail.com:

 You can use an exclusion to remove the undesired Jetty version.
 Here is the syntax:
   <dependency>
     <groupId>com.fasterxml.jackson.module</groupId>
     <artifactId>jackson-module-scala_2.10</artifactId>
     <version>${fasterxml.jackson.version}</version>
     <exclusions>
       <exclusion>
         <groupId>com.google.guava</groupId>
         <artifactId>guava</artifactId>
       </exclusion>
     </exclusions>
   </dependency>

 On Wed, May 13, 2015 at 6:41 AM, Paul Röwer 
 paul.roewer1...@googlemail.com wrote:

 Okay. And how do I get it clean in my Maven project?


 Am 13. Mai 2015 15:15:34 MESZ, schrieb Ted Yu yuzhih...@gmail.com:

 You can run the following command:
 mvn dependency:tree

 And see what jetty versions are brought in.

 Cheers



 On May 13, 2015, at 6:07 AM, Pa Rö paul.roewer1...@googlemail.com
 wrote:

 Hi,

 I use Spark and Flink in the same Maven project;

 now I get an exception when working with Spark, while Flink works well.

 The problem is transitive dependencies.

 Maybe somebody knows a solution, or versions which work together.

 Best regards,
 Paul

 PS: a Cloudera Maven repo for Flink would be desirable

 my pom:

 project 

Re: Spark Streaming and reducing latency

2015-05-18 Thread Akhil Das
We fix the rate at which the receivers should consume at any given point of
time. We also have a back-pressuring mechanism attached to the receivers, so
it won't simply crash in an unceremonious way like Evo said. Mesos has
some sort of auto-scaling (I read it somewhere); maybe you can look into
that also.

Thanks
Best Regards

On Mon, May 18, 2015 at 5:20 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 And if you want to genuinely “reduce the latency” (still within the
 boundaries of the micro-batch) THEN you need to design and finely tune the
 Parallel Programming / Execution Model of your application. The
 objective/metric here is:



 a)  Consume all data within your selected micro-batch window WITHOUT
 any artificial message rate limits

 b)  The above will result in a certain size of Dstream RDD per
 micro-batch.

 c)   The objective now is to Process that RDD WITHIN the time of the
 micro-batch (and also account for temporary message rate spike etc which
 may further increase the size of the RDD) – this will avoid any clogging up
 of the app and will process your messages at the lowest latency possible in
 a micro-batch architecture

 d)  You achieve the objective stated in c by designing, varying and
 experimenting with various aspects of the Spark Streaming Parallel
 Programming and Execution Model – e.g. number of receivers, number of
 threads per receiver, number of executors, number of cores, RAM allocated
 to executors, number of RDD partitions which correspond to the number of
 parallel threads operating on the RDD etc etc



 Re the “unceremonious removal of DStream RDDs” from RAM by Spark Streaming
 when the available RAM is exhausted due to high message rate and which
 crashes your (hence clogged up) application the name of the condition is:



 Loss was due to java.lang.Exception

 java.lang.Exception: *Could not compute split, block*
 *input-4-1410542878200 not found*



 *From:* Evo Eftimov [mailto:evo.efti...@isecc.com]
 *Sent:* Monday, May 18, 2015 12:13 PM
 *To:* 'Dmitry Goldenberg'; 'Akhil Das'
 *Cc:* 'user@spark.apache.org'
 *Subject:* RE: Spark Streaming and reducing latency



 You can use

 spark.streaming.receiver.maxRate (default: not set)

 Maximum rate (number of records per second) at which each receiver will
 receive data. Effectively, each stream will consume at most this number of
 records per second. Setting this configuration to 0 or a negative number
 will put no limit on the rate. See the deployment guide
 https://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications
 in the Spark Streaming programming guide for more details.





 Another way is to implement a feedback loop in your receivers that monitors
 the performance metrics of your application/job and, based on that, adjusts
 the receiving rate automatically – BUT all of this has nothing to do with
 “reducing the latency” – it simply prevents your application/job from
 clogging up. The nastier effect of clogging up is when Spark Streaming starts
 removing in-memory RDDs from RAM before they are processed by the job –
 that works fine in Spark Batch (i.e. removing RDDs from RAM based on LRU), but
 in Spark Streaming, when done in this “unceremonious way”, it simply crashes
 the application



 *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com
 dgoldenberg...@gmail.com]
 *Sent:* Monday, May 18, 2015 11:46 AM
 *To:* Akhil Das
 *Cc:* user@spark.apache.org
 *Subject:* Re: Spark Streaming and reducing latency



 Thanks, Akhil. So what do folks typically do to increase/contract the
 capacity? Do you plug in some cluster auto-scaling solution to make this
 elastic?



 Does Spark have any hooks for instrumenting auto-scaling?

 In other words, how do you avoid overwhelming the receivers in a scenario
 when your system's input can be unpredictable, based on users' activity?


 On May 17, 2015, at 11:04 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 With receiver-based streaming, you can actually
 specify spark.streaming.blockInterval, which is the interval at which the
 receiver will fetch data from the source. The default value is 200ms, and hence
 if your batch duration is 1 second, it will produce 5 blocks of data. And
 yes, with Spark Streaming, when your processing time goes beyond your batch
 duration and you have higher data consumption, then you will
 overwhelm the receiver's memory and hence it will throw up block not found
 exceptions.
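
 (In other words: blocks per batch = batch interval / spark.streaming.blockInterval
 = 1000 ms / 200 ms = 5, and each block becomes one partition, i.e. one task, when
 the batch is processed.)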


 Thanks

 Best Regards



 On Sun, May 17, 2015 at 7:21 PM, dgoldenberg dgoldenberg...@gmail.com
 wrote:

 I keep hearing the argument that the way Discretized Streams work with
 Spark
 Streaming is a lot more of a batch processing algorithm than true
 streaming.
 For streaming, one would expect a new item, e.g. in a Kafka topic, to be
 available to the streaming consumer immediately.

 With the discretized streams, streaming is done with batch intervals i.e.
 the consumer has to wait the interval to be able to get at the new items.
 If
 

RE: Spark Streaming and reducing latency

2015-05-18 Thread Evo Eftimov
Ooow – that is essentially the custom feedback loop mentioned in my previous
emails, in generic architecture terms, and what you have done is only one of the
possible implementations, moreover one based on ZooKeeper – there are other
possible designs not using things like ZooKeeper at all, and hence achieving much
lower latency and responsiveness

 

Can I also give you some friendly advice – there is a long way FROM
“we = Sigmoid and our custom Sigmoid solution” TO your earlier statement that
Spark Streaming does “NOT” crash UNCEREMONIOUSLY – please maintain responsible
and objective communication and facts

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Monday, May 18, 2015 2:28 PM
To: Evo Eftimov
Cc: Dmitry Goldenberg; user@spark.apache.org
Subject: Re: Spark Streaming and reducing latency

 

we = Sigmoid

 

back-pressuring mechanism = Stopping the receiver from receiving more messages
when it's about to exhaust the worker memory. Here's a similar
https://issues.apache.org/jira/browse/SPARK-7398 kind of proposal, if you
haven't seen it already.

 

 




Thanks

Best Regards

 

On Mon, May 18, 2015 at 6:53 PM, Evo Eftimov evo.efti...@isecc.com wrote:

Who are “we” and what is the mysterious “back-pressuring mechanism”, and is it
part of the Spark distribution (are you talking about an implementation of the
custom feedback loop mentioned in my previous emails below)? I ask because I can
assure you that, at least as of Spark Streaming 1.2.0, and as Evo says, Spark
Streaming DOES crash in an “unceremonious way” when the free RAM available
for in-memory cached RDDs gets exhausted

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Monday, May 18, 2015 2:03 PM
To: Evo Eftimov
Cc: Dmitry Goldenberg; user@spark.apache.org


Subject: Re: Spark Streaming and reducing latency

 

We fix the rate at which the receivers should consume at any given point of
time. We also have a back-pressuring mechanism attached to the receivers, so it
won't simply crash in an unceremonious way like Evo said. Mesos has some
sort of auto-scaling (I read it somewhere); maybe you can look into that also.




Thanks

Best Regards

 

On Mon, May 18, 2015 at 5:20 PM, Evo Eftimov evo.efti...@isecc.com wrote:

And if you want to genuinely “reduce the latency” (still within the boundaries 
of the micro-batch) THEN you need to design and finely tune the Parallel 
Programming / Execution Model of your application. The objective/metric here is:

 

a)  Consume all data within your selected micro-batch window WITHOUT any 
artificial message rate limits

b)  The above will result in a certain size of Dstream RDD per micro-batch. 

c)   The objective now is to Process that RDD WITHIN the time of the 
micro-batch (and also account for temporary message rate spike etc which may 
further increase the size of the RDD) – this will avoid any clogging up of the 
app and will process your messages at the lowest latency possible in a 
micro-batch architecture 

d)  You achieve the objective stated in c by designing, varying and 
experimenting with various aspects of the Spark Streaming Parallel Programming 
and Execution Model – e.g. number of receivers, number of threads per receiver, 
number of executors, number of cores, RAM allocated to executors, number of RDD 
partitions which correspond to the number of parallel threads operating on the 
RDD etc etc  

 

Re the “unceremonious removal of DStream RDDs” from RAM by Spark Streaming when 
the available RAM is exhausted due to high message rate and which crashes your 
(hence clogged up) application the name of the condition is:

 

Loss was due to java.lang.Exception   

java.lang.Exception: Could not compute split, block
input-4-1410542878200 not found

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Monday, May 18, 2015 12:13 PM
To: 'Dmitry Goldenberg'; 'Akhil Das'
Cc: 'user@spark.apache.org'
Subject: RE: Spark Streaming and reducing latency

 

You can use

spark.streaming.receiver.maxRate (default: not set)

Maximum rate (number of records per second) at which each receiver will receive
data. Effectively, each stream will consume at most this number of records per
second. Setting this configuration to 0 or a negative number will put no limit
on the rate. See the deployment guide
https://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications
in the Spark Streaming programming guide for more details.

 

 

Another way is to implement a feedback loop in your receivers that monitors the
performance metrics of your application/job and, based on that, adjusts the
receiving rate automatically – BUT all of this has nothing to do with
“reducing the latency” – it simply prevents your application/job from clogging
up. The nastier effect of clogging up is when Spark Streaming starts removing
in-memory RDDs from RAM before they are processed by the job – that works fine in
Spark Batch (i.e. removing RDDs from RAM based on LRU) 

Re: How to run multiple jobs in one sparkcontext from separate threads in pyspark?

2015-05-18 Thread MEETHU MATHEW
Hi Akhil, the Python wrapper for Spark Job Server did not help me. I actually
need a PySpark code sample which shows how I can call a function from 2
threads and execute it simultaneously. Thanks & Regards,
Meethu M 


 On Thursday, 14 May 2015 12:38 PM, Akhil Das ak...@sigmoidanalytics.com 
wrote:
   

 Did you happen to have a look at the Spark Job Server? Someone wrote a
Python wrapper around it; give it a try.
Thanks, Best Regards
On Thu, May 14, 2015 at 11:10 AM, MEETHU MATHEW meethu2...@yahoo.co.in wrote:

Hi all,
Quote: "Inside a given Spark application (SparkContext instance), multiple
parallel jobs can run simultaneously if they were submitted from separate
threads."
How to run multiple jobs in one SparkContext using separate threads in pyspark?
I found some examples in scala and java, but couldn't find python code. Can 
anyone help me with a pyspark example? 
Thanks  Regards,
Meethu M



  

Re: How to run multiple jobs in one sparkcontext from separate threads in pyspark?

2015-05-18 Thread ayan guha
Hi

So to be clear, do you want to run one operation in multiple threads within
a function, or do you want to run multiple jobs using multiple threads? I am
wondering why the Python threading module can't be used. Or have you already
given it a try?
On 18 May 2015 16:39, MEETHU MATHEW meethu2...@yahoo.co.in wrote:

 Hi Akhil,

 The python wrapper for Spark Job Server did not help me. I actually need
 the pyspark code sample  which shows how  I can call a function from 2
 threads and execute it simultaneously.

 Thanks  Regards,
 Meethu M



   On Thursday, 14 May 2015 12:38 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:


 Did you happen to have a look at the spark job server?
 https://github.com/ooyala/spark-jobserver Someone wrote a python wrapper
 https://github.com/wangqiang8511/spark_job_manager around it; give it a
 try.

 Thanks
 Best Regards

 On Thu, May 14, 2015 at 11:10 AM, MEETHU MATHEW meethu2...@yahoo.co.in
 wrote:

 Hi all,

  Quote
  Inside a given Spark application (SparkContext instance), multiple
 parallel jobs can run simultaneously if they were submitted from separate
 threads. 

 How to run multiple jobs in one SparkContext using separate threads in
 pyspark? I found some examples in Scala and Java, but couldn't find Python
 code. Can anyone help me with a *pyspark example*?

 Thanks  Regards,
 Meethu M







How to debug spark in IntelliJ Idea

2015-05-18 Thread Yi.Zhang
Hi all,

Currently I have written some code to access the Spark master, which is deployed
in standalone mode. I want to set a breakpoint in the Spark master, which
runs in a different process. I am wondering whether I need to attach to the
process in IntelliJ, so that when AppClient sends the message to the remote
actor (the Spark master), the breakpoint would be hit.

I don't know how to debug this in IntelliJ IDEA. I need help. Thanks.

Regards,
Yi






py-files (and others?) not properly set up in cluster-mode Spark Yarn job?

2015-05-18 Thread Shay Rojansky
I'm having issues with submitting a Spark Yarn job in cluster mode when the
cluster filesystem is file:///. It seems that additional resources
(--py-files) are simply being skipped and not being added into the
PYTHONPATH. The same issue may also exist for --jars, --files, etc.

We use a simple NFS mount on all our nodes instead of HDFS. The problem is
that when I submit a job that has files (via --py-files), these don't get
copied across to the application's staging directory, nor do they get added
to the PYTHONPATH. On startup, I can clearly see the message Source and
destination file systems are the same. Not copying, which is a result of
the check here:
https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L221

The compareFs function simply looks whether the scheme, host and port are
the same, and if so (my case), simply skips the copy. While that in itself
isn't a problem, the PYTHONPATH isn't updated either.


Re: number of executors

2015-05-18 Thread edward cui
Oh BTW, it's Spark 1.3.1 on Hadoop 2.4, AMI 3.6.

Sorry for leaving out this information.

Appreciate for any help!

Ed

2015-05-18 12:53 GMT-04:00 edward cui edwardcu...@gmail.com:

 I actually have the same problem, but I am not sure whether it is a spark
 problem or a Yarn problem.

 I set up a five-node cluster on AWS EMR and start the YARN daemons on the master
 (the NodeManager will not be started by default on the master; I don't
 want to waste any resources since I have to pay), and submit the Spark task
 in yarn-cluster mode. The command is:
 ./spark/bin/spark-submit --master yarn-cluster --num-executors 5
 --executor-cores 4 --properties-file spark-application.conf myapp.py

 But the YARN ResourceManager only created 4 containers on 4 nodes, and
 one node was completely idle.

 More details about the setup:
 EMR node:
 m3.xlarge: 16g ram 4 cores 40g ssd (HDFS on EBS?)

 Yarn-site.xml:
 yarn.scheduler.maximum-allocation-mb=11520
 yarn.nodemanager.resource.memory-mb=11520

 Spark-conf:

 spark.executor.memory 10g

 spark.storage.memoryFraction  0.2

 spark.python.worker.memory    1500m
 spark.akka.frameSize          200
 spark.shuffle.memoryFraction  0.1

 spark.driver.memory 10g


 Hadoop behavior observed:
 It creates 4 containers on four nodes, including the EMR master, but one EMR slave
 is idle (memory consumption around 2g and 0% CPU occupation).
 Spark uses one container for the driver on an EMR slave node (makes sense since I
 requested that much memory),
 and uses the other three nodes for computing the tasks.


 If yarn can't use all the nodes and I have to pay for the node, it's just a 
 big waste : p


 Any thoughts on this?


 Great thanks,

 Ed



 2015-05-18 12:07 GMT-04:00 Sandy Ryza sandy.r...@cloudera.com:

 *All

 On Mon, May 18, 2015 at 9:07 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Xiaohe,

 The all Spark options must go before the jar or they won't take effect.

 -Sandy
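
 (For example — not from the original mail — the command quoted further down would
 become, with the same flags simply moved in front of the application jar:
 spark-submit --master yarn --num-executors 5 --executor-cores 4
 --class scala.SimpleApp target/scala-2.10/simple-project_2.10-1.0.jar)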

 On Sun, May 17, 2015 at 8:59 AM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Sorry, both of them are actually assigned tasks.

 Aggregated Metrics by Executor
 Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksInput
 Size / RecordsShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle
 Spill (Disk)1host1:61841.7 min505640.0 MB / 12318400382.3 MB / 
 121007701630.4
 MB295.4 MB2host2:620721.7 min505640.0 MB / 12014510386.0 MB / 
 109269121646.6
 MB304.8 MB

 On Sun, May 17, 2015 at 11:50 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 bash-4.1$ ps aux | grep SparkSubmit
 xilan 1704 13.2  1.2 5275520 380244 pts/0  Sl+  08:39   0:13
 /scratch/xilan/jdk1.8.0_45/bin/java -cp
 /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop
 -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5 --executor-cores 4
 xilan 1949  0.0  0.0 103292   800 pts/1S+   08:40   0:00 grep
 --color SparkSubmit


 When look at the sparkui, I see the following:
 Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed
 TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1
 MB / 28089782host2:49970 ms00063.4 MB / 1810945

 So executor 2 is not even assigned a task? Maybe I have some problems
 in my settings, but I don't know which settings I might have set
 wrong or not set at all.


 Thanks,
 Xiaohe

 On Sun, May 17, 2015 at 11:16 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 Did you try --executor-cores param? While you submit the job, do a ps
 aux | grep spark-submit and see the exact command parameters.

 Thanks
 Best Regards

 On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Hi,

 I have a 5-node YARN cluster, and I used spark-submit to submit a
 simple app.

  spark-submit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5

 I have set the number of executors to 5, but in the Spark UI I could see
 only two executors, and it ran very slowly. What did I miss?

 Thanks,
 Xiaohe










Re: Reading Real Time Data only from Kafka

2015-05-18 Thread Akhil Das
I have played a bit with the directStream Kafka API. Good work, Cody. These
are my findings; also, can you clarify a few things for me (see below)?

- When auto.offset.reset is "smallest" and you have 60GB of messages in
Kafka, it takes forever as it reads the whole 60GB at once; "largest" will
only read the latest messages.
- To avoid this, you can actually limit the rate with
spark.streaming.kafka.maxRatePerPartition, which is pretty stable (it always
reads the same amount of data).
- Number of partitions per batch = number of Kafka partitions.

- In the case of driver failures, offset reset being set to "smallest"
will replay all the messages, and "largest" will only read those messages
which are pushed after the streaming job has started. What happens to those
messages which arrive in between?

*Few things which are unclear:*

- If we have a Kafka topic with 9 partitions and a Spark cluster with 3
slaves, how does it decide which slave should read from which partition?
And what happens if a single slave fails while reading the data?

- By default it doesn't push the offsets of the messages which are read
anywhere, so how does it replay the messages in case of failures?

Thanks
Best Regards

On Wed, May 13, 2015 at 8:32 PM, Cody Koeninger c...@koeninger.org wrote:

 You linked to a google mail tab, not a public archive, so I don't know
 exactly which conversation you're referring to.

 As far as I know, streaming only runs a single job at a time in the order
 they were defined, unless you turn on an experimental option for more
 parallelism (TD or someone more knowledgeable can chime in on this).  If
 you're talking about the possibility of the next job starting before the
 prior one has fully finished, because your processing is lagging behind...
 I'm not 100% sure this is possible because I've never observed it.

 The thing is, it's a moot point, because if you're saving offsets yourself
 transactionally, you already need to be verifying that offsets are correct
 (increasing without gaps) in order to handle restarts correctly.

 If you're super concerned about how batches get generated, the direct api
 gives you access to KafkaUtils.createRDD... just schedule your own rdds in
 the order you want.  Again, flexible.




 On Wed, May 13, 2015 at 9:36 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Thanks Cody for your email. I think my concern was not about getting the
 ordering of messages within a partition, which as you said is possible if
 one knows how Spark works. The issue is how Spark schedules jobs on every
 batch, which is not in the same order they were generated. So if that is not
 guaranteed, it does not matter if you manage order within your partition, and
 depending on per-partition ordering to commit offsets may lead to offsets
 being committed in the wrong order.

 In this thread you have discussed this as well and some workaround  :


 https://mail.google.com/mail/u/1/?tab=wm#search/rdd+order+guarantees/14b9f1eaf0b8bd15

 So again, one needs to understand every detail of a consumer to decide
 whether it solves their use case.

 Regards,
 Dibyendu

 On Wed, May 13, 2015 at 7:35 PM, Cody Koeninger c...@koeninger.org
 wrote:

 As far as I can tell, Dibyendu's cons boil down to:

 1. Spark checkpoints can't be recovered if you upgrade code
 2. Some Spark transformations involve a shuffle, which can repartition
 data

 It's not accurate to imply that either one of those things are
 inherently cons of the direct stream api.

 Regarding checkpoints, nothing about the direct stream requires you to
 use checkpoints.  You can save offsets in a checkpoint, your own database,
 or not save offsets at all (as James wants).  One might even say that the
 direct stream api is . . . flexible . . . in that regard.

 Regarding partitions, the direct stream api gives you the same ordering
 guarantee as Kafka, namely that within a given partition messages will be
 in increasing offset order.   Clearly if you do a transformation that
 repartitions the stream, that no longer holds.  Thing is, that doesn't
 matter if you're saving offsets and results for each rdd in the driver.
 The offset ranges for the original rdd don't change as a result of the
 transformation you executed, they're immutable.

 Sure, you can get into trouble if you're trying to save offsets /
 results per partition on the executors, after a shuffle of some kind. You
 can avoid this pretty easily by just using normal scala code to do your
 transformation on the iterator inside a foreachPartition.  Again, this
 isn't a con of the direct stream api, this is just a need to understand
 how Spark works.
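
 As a concrete sketch of that pattern (Spark 1.3 Scala API; the broker address
 and topic name are made up, and ssc is an existing StreamingContext):

 import kafka.serializer.StringDecoder
 import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

 val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")   // made-up broker
 val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
   ssc, kafkaParams, Set("mytopic"))                               // made-up topic

 stream.foreachRDD { rdd =>
   // offset ranges come from the original, un-shuffled RDD, on the driver
   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   rdd.foreachPartition { iter =>
     // plain Scala iteration; no shuffle has happened, so these records still
     // belong to exactly one Kafka partition, in increasing offset order
     iter.foreach(record => ())   // write each record to your store here
   }
   // then save offsetRanges together with the results, transactionally
 }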



 On Tue, May 12, 2015 at 10:30 PM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 The low-level consumer which Akhil mentioned has been running at
 Pearson for the last 4-5 months without any downtime. I think this one is the
 reliable receiver-based Kafka consumer for Spark as of today... if you
 want to put it that way.

 Prior to Spark 1.3 other 

org.apache.spark.shuffle.FetchFailedException :: Migration from Spark 1.2 to 1.3

2015-05-18 Thread zia_kayani
Hi, I'm getting this exception after shifting my code from Spark 1.2 to Spark
1.3

15/05/18 18:22:39 WARN TaskSetManager: Lost task 0.0 in stage 1.6 (TID 84,
cloud8-server): FetchFailed(BlockManagerId(1, cloud4-server, 7337),
shuffleId=0, mapId=9, reduceId=1, message=
org.apache.spark.shuffle.FetchFailedException: java.lang.RuntimeException:
Failed to open file:
/tmp/spark-fff63849-a318-4e48-bdea-2f563076ad5d/spark-40ba3a41-0f4d-446e-b806-e788e210d394/spark-a3d61f7a-22e9-4b3b-9346-ff3b70d0e43d/blockmgr-0e3b2b5d-f677-4e91-b98b-ed913adbd15f/39/shuffle_0_9_0.index
at
org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getSortBasedShuffleBlockData(ExternalShuffleBlockManager.java:202)
at
org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getBlockData(ExternalShuffleBlockManager.java:112)
at
org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:74)
at
org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
at
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException:
/tmp/spark-fff63849-a318-4e48-bdea-2f563076ad5d/spark-40ba3a41-0f4d-446e-b806-e788e210d394/spark-a3d61f7a-22e9-4b3b-9346-ff3b70d0e43d/blockmgr-0e3b2b5d-f677-4e91-b98b-ed913adbd15f/39/shuffle_0_9_0.index
(Permission denied)
at java.io.FileInputStream.open(Native Method)
at java.io.FileInputStream.init(FileInputStream.java:146)
at
org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getSortBasedShuffleBlockData(ExternalShuffleBlockManager.java:191)
... 23 more






Spark groupByKey, does it always create at least 1 partition per key?

2015-05-18 Thread tomboyle
I am currently using Spark Streaming. During my batch processing I must
groupByKey. Afterwards I call foreachRDD & foreachPartition & write to an
external datastore.

My only concern with this is whether it's future-proof. I know groupByKey by
default uses the HashPartitioner. I have printed out the internals of the
partitions, loaded large text files into memory, and run groupByKey just
to make sure.

I have two questions.
#1 First, will my implementation ever break in the future? Will partitions &
groupByKey work differently?
#2 Is it possible for a (key, values) pair to exist on more than one partition
after using groupByKey?

Notes: I'm aware groupByKey is not very efficient. However, I am not working
with large amounts of data and can process batches very quickly. Below I could
have used aggregateByKey because I printed the sum; however, my real
implementation is much different, and I do need each value for each key, so I
cannot reduce the data.

1 Million line test log file
Partition HashCode: 965943941 Key:lol Size:2346
Partition HashCode: 1605678983 Key:ee Size:4692
Partition HashCode: 1605678983 Key:aa Size:32844
Partition HashCode: 1605678983 Key:gg Size:4692
Partition HashCode: 1605678983 Key:dd Size:11730
Partition HashCode: 1605678983 Key:hh Size:4692
Partition HashCode: 1605678983 Key:kk Size:2346
Partition HashCode: 1605678983 Key:tt Size:4692
Partition HashCode: 1605678983 Key:ff Size:2346
Partition HashCode: 1605678983 Key:bb Size:18768
Partition HashCode: 1605678983 Key:cc Size:14076
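
For what it's worth, a Scala sketch of the same kind of check using
mapPartitionsWithIndex (the input path is made up); with the default
HashPartitioner all values for a given key should land in a single partition,
which is what this prints out:

val pairs = sc.textFile("test.log").flatMap(_.split(" ")).map((_, 1))
pairs.groupByKey()
  .mapPartitionsWithIndex { (idx, iter) =>
    iter.map { case (k, vs) => s"partition=$idx key=$k size=${vs.size}" }
  }
  .collect()
  .foreach(println)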







Re: py-files (and others?) not properly set up in cluster-mode Spark Yarn job?

2015-05-18 Thread Marcelo Vanzin
Hi Shay,

Yeah, that seems to be a bug; it doesn't seem to be related to the default
FS nor compareFs either - I can reproduce this with HDFS when copying files
from the local fs too. In yarn-client mode things seem to work.

Could you file a bug to track this? If you don't have a jira account I can
do that for you.


On Mon, May 18, 2015 at 9:38 AM, Shay Rojansky r...@roji.org wrote:

 I'm having issues with submitting a Spark Yarn job in cluster mode when
 the cluster filesystem is file:///. It seems that additional resources
 (--py-files) are simply being skipped and not being added into the
 PYTHONPATH. The same issue may also exist for --jars, --files, etc.

 We use a simple NFS mount on all our nodes instead of HDFS. The problem is
 that when I submit a job that has files (via --py-files), these don't get
 copied across to the application's staging directory, nor do they get added
 to the PYTHONPATH. On startup, I can clearly see the message Source and
 destination file systems are the same. Not copying, which is a result of
 the check here:
 https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L221

 The compareFs function simply looks whether the scheme, host and port are
 the same, and if so (my case), simply skips the copy. While that in itself
 isn't a problem, the PYTHONPATH isn't updated either.




-- 
Marcelo


RE: Spark Streaming and reducing latency

2015-05-18 Thread Akhil Das
Thanks for the heads up mate.
On 18 May 2015 19:08, Evo Eftimov evo.efti...@isecc.com wrote:

 Ooow – that is essentially the custom feedback loop mentioned in my
 previous emails in generic Architecture Terms and what you have done is
 only one of the possible implementations moreover based on Zookeeper –
 there are other possible designs not using things like zookeeper at all and
 hence achieving much lower latency and responsiveness



 Can I also give you a friendly advice – there is a long way FROM
 “we=Sigmoid and our custom sigmoid solution”, TO your earlier statement
 that Spark Streaming does “NOT” crash UNCEREMONIOUSLY – please maintain
 responsible and objective communication and facts



 *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com]
 *Sent:* Monday, May 18, 2015 2:28 PM
 *To:* Evo Eftimov
 *Cc:* Dmitry Goldenberg; user@spark.apache.org
 *Subject:* Re: Spark Streaming and reducing latency



 we = Sigmoid



 back-pressuring mechanism = Stoping the receiver from receiving more
 messages when its about to exhaust the worker memory. Here's a similar
 https://issues.apache.org/jira/browse/SPARK-7398 kind of proposal if
 you haven't seen already.






 Thanks

 Best Regards



 On Mon, May 18, 2015 at 6:53 PM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 Who are “we” and what is the mysterious “back-pressuring mechanism” and is
 it part of the Spark Distribution (are you talking about implementation of
 the custom feedback loop mentioned in my previous emails below)- asking
 these because I can assure you that at least as of Spark Streaming 1.2.0,
 as Evo says Spark Streaming DOES crash in “unceremonious way” when the free
 RAM available for In Memory Cashed RDDs gets exhausted



 *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com]
 *Sent:* Monday, May 18, 2015 2:03 PM
 *To:* Evo Eftimov
 *Cc:* Dmitry Goldenberg; user@spark.apache.org


 *Subject:* Re: Spark Streaming and reducing latency



 We fix the receivers rate at which it should consume at any given point of
 time. Also we have a back-pressuring mechanism attached to the receivers so
 it won't simply crashes in the unceremonious way like Evo said. Mesos has
 some sort of auto-scaling (read it somewhere), may be you can look into
 that also.


 Thanks

 Best Regards



 On Mon, May 18, 2015 at 5:20 PM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 And if you want to genuinely “reduce the latency” (still within the
 boundaries of the micro-batch) THEN you need to design and finely tune the
 Parallel Programming / Execution Model of your application. The
 objective/metric here is:



 a)  Consume all data within your selected micro-batch window WITHOUT
 any artificial message rate limits

 b)  The above will result in a certain size of Dstream RDD per
 micro-batch.

 c)   The objective now is to Process that RDD WITHIN the time of the
 micro-batch (and also account for temporary message rate spike etc which
 may further increase the size of the RDD) – this will avoid any clogging up
 of the app and will process your messages at the lowest latency possible in
 a micro-batch architecture

 d)  You achieve the objective stated in c by designing, varying and
 experimenting with various aspects of the Spark Streaming Parallel
 Programming and Execution Model – e.g. number of receivers, number of
 threads per receiver, number of executors, number of cores, RAM allocated
 to executors, number of RDD partitions which correspond to the number of
 parallel threads operating on the RDD etc etc



 Re the “unceremonious removal of DStream RDDs” from RAM by Spark Streaming
 when the available RAM is exhausted due to high message rate and which
 crashes your (hence clogged up) application the name of the condition is:



 Loss was due to java.lang.Exception

 java.lang.Exception: *Could not compute split, block*
 *input-4-1410542878200 not found*



 *From:* Evo Eftimov [mailto:evo.efti...@isecc.com]
 *Sent:* Monday, May 18, 2015 12:13 PM
 *To:* 'Dmitry Goldenberg'; 'Akhil Das'
 *Cc:* 'user@spark.apache.org'
 *Subject:* RE: Spark Streaming and reducing latency



 You can use



 spark.streaming.receiver.maxRate (default: not set)

 Maximum rate (number of records per second) at which each receiver will
 receive data. Effectively, each stream will consume at most this number of
 records per second. Setting this configuration to 0 or a negative number
 will put no limit on the rate. See the deployment guide
 https://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications
 in the Spark Streaming programming guide for more details.
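
 For instance, a sketch of setting it programmatically before the StreamingContext is
 created (the 10,000 records/sec figure is purely illustrative):

 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}

 // Illustrative only: cap each receiver at 10,000 records/sec; 0 or a negative
 // value removes the limit.
 val conf = new SparkConf()
   .setAppName("rate-limited-stream")
   .set("spark.streaming.receiver.maxRate", "10000")
 val ssc = new StreamingContext(conf, Seconds(2))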





 Another way is to implement a feedback loop in your receivers monitoring
 the performance metrics of your application/job and based on that adjusting
 automatically the receiving rate – BUT all these have nothing to do  with
 “reducing the latency” – they simply prevent your application/job from
 clogging up – the nastier effect of which is when Spark Streaming starts
 removing In 

RE: Processing multiple columns in parallel

2015-05-18 Thread Needham, Guy
How about making the range in the for loop parallelised? The driver will then 
kick off the word counts independently.
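
A minimal sketch of that idea using Scala parallel collections in spark-shell (the path and
the column count of 10 are taken from the question and otherwise illustrative):

// Each iteration submits an independent job; iterating the range with .par lets
// the driver submit them concurrently, so the scheduler can run them in parallel.
val data = sc.textFile("hdfs://namenode/data.txt").cache()

(0 until 10).par.foreach { i =>
  data.map(_.split("\t", -1)(i))
      .map((_, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile(s"column-$i")
}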

Regards,
Guy Needham | Data Discovery
Virgin Media   | Technology and Transformation | Data
Bartley Wood Business Park, Hook, Hampshire RG27 9UP
D 01256 75 3362
I welcome VSRE emails. Learn more at http://vsre.info/
From: ayan guha [mailto:guha.a...@gmail.com]
Sent: 18 May 2015 15:46
To: Laeeq Ahmed
Cc: user@spark.apache.org
Subject: Re: Processing multiple columns in parallel


My first thought would be creating 10 RDDs and running your word count on each of
them. I think the Spark scheduler is going to resolve the dependencies in parallel and
launch 10 jobs.

Best
Ayan
On 18 May 2015 23:41, Laeeq Ahmed laeeqsp...@yahoo.com.invalid wrote:
Hi,

Consider I have a tab-delimited text file with 10 columns. Each column is a
set of text. I would like to do a word count for each column. In Scala, I would
do the following RDD transformation and action:

val data = sc.textFile("hdfs://namenode/data.txt")
for (i <- 0 until 10) {
   // reduceByKey gives the per-word counts for column i
   data.map(_.split("\t", -1)(i)).map((_, 1)).reduceByKey(_ + _).saveAsTextFile(i.toString)
}

Within the for loop, it's a parallel process, but each column is sequentially 
processed from 0 to 9.

Is there any way I can process multiple columns in parallel in Spark? I
saw a posting about using Akka, but RDDs themselves already use Akka. Any
pointers would be appreciated.


Regards,
Laeeq




Re: number of executors

2015-05-18 Thread Sandy Ryza
*All

On Mon, May 18, 2015 at 9:07 AM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Hi Xiaohe,

 The all Spark options must go before the jar or they won't take effect.

 -Sandy

 On Sun, May 17, 2015 at 8:59 AM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Sorry, them both are assigned task actually.

 Aggregated Metrics by Executor
 Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksInput
 Size / RecordsShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle
 Spill (Disk)1host1:61841.7 min505640.0 MB / 12318400382.3 MB / 121007701630.4
 MB295.4 MB2host2:620721.7 min505640.0 MB / 12014510386.0 MB / 109269121646.6
 MB304.8 MB

 On Sun, May 17, 2015 at 11:50 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 bash-4.1$ ps aux | grep SparkSubmit
 xilan 1704 13.2  1.2 5275520 380244 pts/0  Sl+  08:39   0:13
 /scratch/xilan/jdk1.8.0_45/bin/java -cp
 /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop
 -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5 --executor-cores 4
 xilan 1949  0.0  0.0 103292   800 pts/1S+   08:40   0:00 grep
 --color SparkSubmit


 When look at the sparkui, I see the following:
 Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed
 TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1
 MB / 28089782host2:49970 ms00063.4 MB / 1810945

 So executor 2 is not even assigned a task ? Maybe I have some problems
 in my setting, but I don't know what could be the possible settings I set
 wrong or have not set.


 Thanks,
 Xiaohe

 On Sun, May 17, 2015 at 11:16 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Did you try --executor-cores param? While you submit the job, do a ps
 aux | grep spark-submit and see the exact command parameters.

 Thanks
 Best Regards

 On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Hi,

 I have a 5 nodes yarn cluster, I used spark-submit to submit a simple
 app.

  spark-submit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5

 I have set the number of executor to 5, but from sparkui I could see
 only two executors and it ran very slow. What did I miss ?

 Thanks,
 Xiaohe








Re: number of executors

2015-05-18 Thread Sandy Ryza
Hi Xiaohe,

The all Spark options must go before the jar or they won't take effect.

-Sandy

On Sun, May 17, 2015 at 8:59 AM, xiaohe lan zombiexco...@gmail.com wrote:

 Sorry, them both are assigned task actually.

 Aggregated Metrics by Executor
 Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksInput
 Size / RecordsShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle
 Spill (Disk)1host1:61841.7 min505640.0 MB / 12318400382.3 MB / 121007701630.4
 MB295.4 MB2host2:620721.7 min505640.0 MB / 12014510386.0 MB / 109269121646.6
 MB304.8 MB

 On Sun, May 17, 2015 at 11:50 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 bash-4.1$ ps aux | grep SparkSubmit
 xilan 1704 13.2  1.2 5275520 380244 pts/0  Sl+  08:39   0:13
 /scratch/xilan/jdk1.8.0_45/bin/java -cp
 /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop
 -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5 --executor-cores 4
 xilan 1949  0.0  0.0 103292   800 pts/1S+   08:40   0:00 grep
 --color SparkSubmit


 When look at the sparkui, I see the following:
 Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed
 TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1
 MB / 28089782host2:49970 ms00063.4 MB / 1810945

 So executor 2 is not even assigned a task ? Maybe I have some problems in
 my setting, but I don't know what could be the possible settings I set
 wrong or have not set.


 Thanks,
 Xiaohe

 On Sun, May 17, 2015 at 11:16 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Did you try --executor-cores param? While you submit the job, do a ps
 aux | grep spark-submit and see the exact command parameters.

 Thanks
 Best Regards

 On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Hi,

 I have a 5 nodes yarn cluster, I used spark-submit to submit a simple
 app.

  spark-submit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5

 I have set the number of executor to 5, but from sparkui I could see
 only two executors and it ran very slow. What did I miss ?

 Thanks,
 Xiaohe







Re: number of executors

2015-05-18 Thread edward cui
I actually have the same problem, but I am not sure whether it is a spark
problem or a Yarn problem.

I set up a five nodes cluster on aws emr, start yarn daemon on the master
(The node manager will not be started on default on the master, I don't
want to waste any resource since I have to pay). And submit the spark task
through yarn-cluster mode. The command is:
./spark/bin/spark-submit --master yarn-cluster --num-executors 5
--executor-cores 4 --properties-file spark-application.conf myapp.py

But the yarn resource manager only created 4 containers on 4 nodes, and one
node was completely on idle.

More details about the setup:
EMR node:
m3.xlarge: 16g ram 4 cores 40g ssd (HDFS on EBS?)

Yarn-site.xml:
yarn.scheduler.maximum-allocation-mb=11520
yarn.nodemanager.resource.memory-mb=11520

Spark-conf:

spark.executor.memory         10g
spark.storage.memoryFraction  0.2
spark.python.worker.memory    1500m
spark.akka.frameSize          200
spark.shuffle.memoryFraction  0.1
spark.driver.memory           10g


Hadoop behavior observed:
YARN creates 4 containers on four nodes, including the EMR master, but leaves one
EMR slave idle (memory consumption around 2g and 0% CPU occupation).
Spark uses one container for the driver on an EMR slave node (makes sense since I
required that much memory).
It uses the other three nodes for computing the tasks.


If yarn can't use all the nodes and I have to pay for the node, it's
just a big waste : p


Any thoughts on this?


Great thanks,

Ed



2015-05-18 12:07 GMT-04:00 Sandy Ryza sandy.r...@cloudera.com:

 *All

 On Mon, May 18, 2015 at 9:07 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Xiaohe,

 The all Spark options must go before the jar or they won't take effect.

 -Sandy

 On Sun, May 17, 2015 at 8:59 AM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Sorry, them both are assigned task actually.

 Aggregated Metrics by Executor
 Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksInput
 Size / RecordsShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle
 Spill (Disk)1host1:61841.7 min505640.0 MB / 12318400382.3 MB / 
 121007701630.4
 MB295.4 MB2host2:620721.7 min505640.0 MB / 12014510386.0 MB / 109269121646.6
 MB304.8 MB

 On Sun, May 17, 2015 at 11:50 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 bash-4.1$ ps aux | grep SparkSubmit
 xilan 1704 13.2  1.2 5275520 380244 pts/0  Sl+  08:39   0:13
 /scratch/xilan/jdk1.8.0_45/bin/java -cp
 /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop
 -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5 --executor-cores 4
 xilan 1949  0.0  0.0 103292   800 pts/1S+   08:40   0:00 grep
 --color SparkSubmit


 When look at the sparkui, I see the following:
 Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed
 TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1
 MB / 28089782host2:49970 ms00063.4 MB / 1810945

 So executor 2 is not even assigned a task ? Maybe I have some problems
 in my setting, but I don't know what could be the possible settings I set
 wrong or have not set.


 Thanks,
 Xiaohe

 On Sun, May 17, 2015 at 11:16 PM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 Did you try --executor-cores param? While you submit the job, do a ps
 aux | grep spark-submit and see the exact command parameters.

 Thanks
 Best Regards

 On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Hi,

 I have a 5 nodes yarn cluster, I used spark-submit to submit a simple
 app.

  spark-submit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5

 I have set the number of executor to 5, but from sparkui I could see
 only two executors and it ran very slow. What did I miss ?

 Thanks,
 Xiaohe









Re: Spark streaming over a rest API

2015-05-18 Thread Akhil Das
Why not use Spark Streaming to do the computation, dump the result
somewhere in a DB perhaps, and take it from there?
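
A rough sketch of that shape (createConnection and save are hypothetical stand-ins for
whatever DB client you use; resultStream is assumed to be the DStream produced by your
computation):

// Sketch only: compute per batch, write the results out, and let the REST layer
// query the datastore instead of querying Spark directly.
resultStream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    val conn = createConnection()      // hypothetical: open one connection per partition
    records.foreach(r => conn.save(r)) // hypothetical: persist each record
    conn.close()
  }
}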

Thanks
Best Regards

On Mon, May 18, 2015 at 7:51 PM, juandasgandaras juandasganda...@gmail.com
wrote:

 Hello,

 I would like to use spark streaming over a REST api to get information
 along
 the time and with diferent parameters in the REST query.

 I was thinking to use apache kafka but I don´t have any experience with
 this
 and I would like to have some advice about this.

 Thanks.

 Best regards,

 Juan



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-over-a-rest-API-tp22936.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




RE: Spark Streaming and reducing latency

2015-05-18 Thread Evo Eftimov
My pleasure young man, I will even go beyond the so-called heads up and send
you a solution design for a Feedback Loop preventing Spark Streaming app clogging
and resource depletion and featuring machine-learning-based self-tuning AND
which is not Zookeeper based and hence offers lower latency

PS: ultimately though, remember that none of this stuff is part of Spark
Streaming as of yet


Sent from Samsung Mobile

-------- Original message --------
From: Akhil Das ak...@sigmoidanalytics.com
Date: 2015/05/18 16:56 (GMT+00:00)
To: Evo Eftimov evo.efti...@isecc.com
Cc: user@spark.apache.org
Subject: RE: Spark Streaming and reducing latency

Thanks for the heads up mate.

On 18 May 2015 19:08, Evo Eftimov evo.efti...@isecc.com wrote:
Ooow – that is essentially the custom feedback loop mentioned in my previous 
emails in generic Architecture Terms and what you have done is only one of the 
possible implementations moreover based on Zookeeper – there are other possible 
designs not using things like zookeeper at all and hence achieving much lower 
latency and responsiveness

 

Can I also give you a friendly advice – there is a long way FROM 
“we=Sigmoid and our custom sigmoid solution”, TO your earlier statement that 
Spark Streaming does “NOT” crash UNCEREMONIOUSLY – please maintain responsible 
and objective communication and facts

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Monday, May 18, 2015 2:28 PM
To: Evo Eftimov
Cc: Dmitry Goldenberg; user@spark.apache.org
Subject: Re: Spark Streaming and reducing latency

 

we = Sigmoid

 

back-pressuring mechanism = Stoping the receiver from receiving more messages 
when its about to exhaust the worker memory. Here's a similar kind of proposal 
if you haven't seen already.

 

 



Thanks

Best Regards

 

On Mon, May 18, 2015 at 6:53 PM, Evo Eftimov evo.efti...@isecc.com wrote:

Who are “we” and what is the mysterious “back-pressuring mechanism” and is it 
part of the Spark Distribution (are you talking about implementation of the 
custom feedback loop mentioned in my previous emails below)- asking these 
because I can assure you that at least as of Spark Streaming 1.2.0, as Evo says 
Spark Streaming DOES crash in “unceremonious way” when the free RAM available 
for In Memory Cashed RDDs gets exhausted

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Monday, May 18, 2015 2:03 PM
To: Evo Eftimov
Cc: Dmitry Goldenberg; user@spark.apache.org


Subject: Re: Spark Streaming and reducing latency

 

We fix the receivers rate at which it should consume at any given point of 
time. Also we have a back-pressuring mechanism attached to the receivers so it 
won't simply crashes in the unceremonious way like Evo said. Mesos has some 
sort of auto-scaling (read it somewhere), may be you can look into that also.



Thanks

Best Regards

 

On Mon, May 18, 2015 at 5:20 PM, Evo Eftimov evo.efti...@isecc.com wrote:

And if you want to genuinely “reduce the latency” (still within the boundaries 
of the micro-batch) THEN you need to design and finely tune the Parallel 
Programming / Execution Model of your application. The objective/metric here is:

 

a)  Consume all data within your selected micro-batch window WITHOUT any 
artificial message rate limits

b)  The above will result in a certain size of Dstream RDD per micro-batch.

c)   The objective now is to Process that RDD WITHIN the time of the 
micro-batch (and also account for temporary message rate spike etc which may 
further increase the size of the RDD) – this will avoid any clogging up of the 
app and will process your messages at the lowest latency possible in a 
micro-batch architecture

d)  You achieve the objective stated in c by designing, varying and 
experimenting with various aspects of the Spark Streaming Parallel Programming 
and Execution Model – e.g. number of receivers, number of threads per receiver, 
number of executors, number of cores, RAM allocated to executors, number of RDD 
partitions which correspond to the number of parallel threads operating on the 
RDD etc etc  

 

Re the “unceremonious removal of DStream RDDs” from RAM by Spark Streaming when 
the available RAM is exhausted due to high message rate and which crashes your 
(hence clogged up) application the name of the condition is:

 

Loss was due to java.lang.Exception  

java.lang.Exception: Could not compute split, block
input-4-1410542878200 not found

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Monday, May 18, 2015 12:13 PM
To: 'Dmitry Goldenberg'; 'Akhil Das'
Cc: 'user@spark.apache.org'
Subject: RE: Spark Streaming and reducing latency

 

You can use

 

spark.streaming.receiver.maxRate

not set

Maximum rate (number of records per second) at which each receiver will receive 
data. Effectively, each stream will consume at most this number of records per 
second. Setting this configuration to 0 or a negative number will 

Re: [SparkSQL 1.4.0] groupBy columns are always nullable?

2015-05-18 Thread Olivier Girardot
PR is opened : https://github.com/apache/spark/pull/6237

On Fri, May 15, 2015 at 17:55, Olivier Girardot ssab...@gmail.com wrote:

 yes, please do and send me the link.
 @rxin I have trouble building master, but the code is done...


 On Fri, May 15, 2015 at 01:27, Haopu Wang hw...@qilinsoft.com wrote:

  Thank you, should I open a JIRA for this issue?


  --

 *From:* Olivier Girardot [mailto:ssab...@gmail.com]
 *Sent:* Tuesday, May 12, 2015 5:12 AM
 *To:* Reynold Xin
 *Cc:* Haopu Wang; user
 *Subject:* Re: [SparkSQL 1.4.0] groupBy columns are always nullable?



 I'll look into it - not sure yet what I can get out of exprs :p



 On Mon, May 11, 2015 at 22:35, Reynold Xin r...@databricks.com wrote:

 Thanks for catching this. I didn't read carefully enough.



 It'd make sense to have the udaf result be non-nullable, if the exprs are
 indeed non-nullable.



 On Mon, May 11, 2015 at 1:32 PM, Olivier Girardot ssab...@gmail.com
 wrote:

 Hi Haopu,
 actually here `key` is nullable because this is your input's schema :

 scala> result.printSchema

 root
 |-- key: string (nullable = true)
 |-- SUM(value): long (nullable = true)

 scala> df.printSchema
 root
 |-- key: string (nullable = true)
 |-- value: long (nullable = false)



 I tried it with a schema where the key is not flagged as nullable, and
 the schema is actually respected. What you can argue however is that
 SUM(value) should also be not nullable since value is not nullable.
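
 For reference, a small sketch of declaring that schema explicitly instead of relying on
 the case-class inference (spark-shell style, reusing the key/value fields from the
 example above):

 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types._

 // key is declared non-nullable up front, and createDataFrame keeps that flag.
 val schema = StructType(Seq(
   StructField("key",   StringType, nullable = false),
   StructField("value", LongType,   nullable = false)))

 val rows = sc.parallelize(Seq(Row("k1", 2L), Row("k1", 1L)))
 val df2  = sqlContext.createDataFrame(rows, schema)
 df2.printSchema()   // key: string (nullable = false)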



 @rxin do you think it would be reasonable to flag the Sum aggregation
 function as nullable (or not) depending on the input expression's schema ?



 Regards,



 Olivier.

 On Mon, May 11, 2015 at 22:07, Reynold Xin r...@databricks.com wrote:

 Not by design. Would you be interested in submitting a pull request?



 On Mon, May 11, 2015 at 1:48 AM, Haopu Wang hw...@qilinsoft.com wrote:

 I am trying to get the result schema of aggregate functions using the DataFrame
 API.

 However, I find the result fields of groupBy columns are always nullable
 even when the source field is not nullable.

 I want to know if this is by design, thank you! Below is the simple code
 to show the issue.

 ==

   import sqlContext.implicits._
   import org.apache.spark.sql.functions._
   case class Test(key: String, value: Long)
   val df = sc.makeRDD(Seq(Test("k1", 2), Test("k1", 1))).toDF

   val result = df.groupBy("key").agg($"key", sum("value"))

   // From the output, you can see the key column is nullable, why??
   result.printSchema
 //root
 // |-- key: string (nullable = true)
 // |-- SUM(value): long (nullable = true)


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org








Re: Error communicating with MapOutputTracker

2015-05-18 Thread Imran Rashid
On Fri, May 15, 2015 at 5:09 PM, Thomas Gerber thomas.ger...@radius.com
wrote:

 Now, we noticed that we get java heap OOM exceptions on the output tracker
 when we have too many tasks. I wonder:
 1. where does the map output tracker live? The driver? The master (when
 those are not the same)?
 2. how can we increase the heap for it? Especially when using spark-submit?


It does not live on the master -- that is only in a standalone cluster, and
it does very little work.  (Though there are *Master and *Worker variants
of the class, its really running on the driver and the executors.)  If you
are getting OOMs in the MapOutputTrackerMaster (which lives on the driver),
then you can increase the memory for the driver via the normal args for
controlling driver memory, with --driver-memory 10G or whatever.

Just to be clear, if you hit an OOM from somewhere in the MapOutputTracker
code, it just means that code is what pushed things over the top.  Of
course you could have 99% of your memory used by something else, perhaps
your own data structures, which perhaps could be trimmed down.  You could
get a heap dump on the driver to see where the memory is really getting
used.

Do you mind sharing the details of how you hit these OOMs?  How much
memory, how many partitions on each side of the shuffle?  Sort based
shuffle I assume?

thanks,
Imran


Re: applications are still in progress?

2015-05-18 Thread Imran Rashid
Most likely, you never call sc.stop().

Note that in 1.4, this will happen for you automatically in a shutdown
hook, taken care of by https://issues.apache.org/jira/browse/SPARK-3090
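
A minimal sketch of what that looks like in an application's main method (purely illustrative):

// Stopping the context explicitly lets Spark finalize the event log, so the
// history/UI no longer reports the application as "in progress".
try {
  // ... the application's jobs ...
} finally {
  sc.stop()
}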

On Wed, May 13, 2015 at 8:04 AM, Yifan LI iamyifa...@gmail.com wrote:

 Hi,

 I have some applications that finished (but actually failed before); in the
 WebUI they show
 "Application myApp is still in progress."

 and, in the eventlog folder, there are several log files like this:

 app-20150512***.inprogress

 So, I am wondering what the “inprogress” means…

 Thanks! :)


 Best,
 Yifan LI








Re: How to run multiple jobs in one sparkcontext from separate threads in pyspark?

2015-05-18 Thread Davies Liu
SparkContext can be used in multiple threads (Spark streaming works
with multiple threads), for example:

import threading
import time

def show(x):
    time.sleep(1)
    print x

def job():
    # Each action submitted from this thread becomes its own Spark job.
    sc.parallelize(range(100)).foreach(show)

threading.Thread(target=job).start()


On Mon, May 18, 2015 at 12:34 AM, ayan guha guha.a...@gmail.com wrote:
 Hi

 So to be clear, do you want to run one operation in multiple threads within
 a function or you want run multiple jobs using multiple threads? I am
 wondering why the Python threading module can't be used? Or have you already given
 it a try?

 On 18 May 2015 16:39, MEETHU MATHEW meethu2...@yahoo.co.in wrote:

 Hi Akhil,

 The python wrapper for Spark Job Server did not help me. I actually need
 the pyspark code sample  which shows how  I can call a function from 2
 threads and execute it simultaneously.

 Thanks  Regards,
 Meethu M



 On Thursday, 14 May 2015 12:38 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:


 Did you happen to have a look at the Spark Job Server? Someone wrote a
 python wrapper around it, give it a try.

 Thanks
 Best Regards

 On Thu, May 14, 2015 at 11:10 AM, MEETHU MATHEW meethu2...@yahoo.co.in
 wrote:

 Hi all,

  Quote
  Inside a given Spark application (SparkContext instance), multiple
 parallel jobs can run simultaneously if they were submitted from separate
 threads. 

 How to run multiple jobs in one SPARKCONTEXT using separate threads in
 pyspark? I found some examples in scala and java, but couldn't find python
 code. Can anyone help me with a pyspark example?

 Thanks  Regards,
 Meethu M






-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: parallelism on binary file

2015-05-18 Thread Imran Rashid
You can use sc.hadoopFile (or any of the variants) to do what you want.
They even let you reuse your existing HadoopInputFormats.  You should be
able to mimic your old use with MR just fine.  sc.textFile is just a
convenience method which sits on top.
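
A hedged sketch of that, assuming you already have (or write) a Hadoop InputFormat for the
proprietary format; MyBinaryInputFormat, BusinessObject and the path are placeholders, not
real classes:

import org.apache.hadoop.io.{BytesWritable, LongWritable}

// Each InputFormat split becomes one partition, mirroring the old MR parallelism.
val raw = sc.hadoopFile[LongWritable, BytesWritable, MyBinaryInputFormat](
  "hdfs:///data/proprietary", minPartitions = 64)

// Build the business objects in parallel, one record at a time.
val businessObjects = raw.map { case (_, bytes) => BusinessObject.parse(bytes) }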

imran

On Fri, May 8, 2015 at 12:03 PM, tog guillaume.all...@gmail.com wrote:

 Hi

 I have an application that currently runs using MR. It currently starts by
 extracting information from a proprietary binary file that is copied to
 HDFS. The application starts by creating business objects from information
 extracted from the binary files. Later those objects are used for further
 processing using again MR jobs.

 I am planning to move towards Spark and I clearly see that I could use
 JavaRDD<BusinessObject> for parallel processing. However, it is not yet
 obvious what the process could be to generate this RDD from my binary file
 in parallel.

 Today I use parallelism based on the splits assigned to each of the map
 tasks of a job. Can I mimic such a thing using Spark? All the examples I
 have seen so far use text files, for which I guess the partitions are
 based on a given number of contiguous lines.

 Any help or pointer would be appreciated

 Cheers
 Guillaume



 --
 PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net



Re: Implementing custom metrics under MLPipeline's BinaryClassificationEvaluator

2015-05-18 Thread Joseph Bradley
Hi Justin,

It sound like you're on the right track.  The best way to write a custom
Evaluator will probably be to modify an existing Evaluator as you
described.  It's best if you don't remove the other code, which handles
parameter set/get and schema validation.
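
As a stopgap while the custom Evaluator is being written, the same numbers can be computed
outside the pipeline with MLlib's BinaryClassificationMetrics; a sketch, assuming
scoreAndLabels is an RDD[(Double, Double)] of (predicted score, true label) built from your
model's output:

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

// PrecisionByThreshold without touching the ML pipeline Evaluator API.
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
metrics.precisionByThreshold().collect().foreach { case (threshold, precision) =>
  println(s"threshold=$threshold precision=$precision")
}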

Joseph

On Sun, May 17, 2015 at 10:35 PM, Justin Yip yipjus...@prediction.io
wrote:

 Hello,

  I would like to use other metrics in BinaryClassificationEvaluator; I am
  thinking about simple ones (e.g. PrecisionByThreshold). From the API site,
  I can't tell much about how to implement it.

  From the code, it seems like I will have to override this function, using
  most of the existing code for checking the column schema, then replace the line
  which computes the actual score
 https://github.com/apache/spark/blob/1b8625f4258d6d1a049d0ba60e39e9757f5a568b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala#L72
 .

 Is my understanding correct? Or there are more convenient way of
 implementing a metric in order to be used by ML pipeline?

 Thanks.

 Justin

 --
 View this message in context: Implementing custom metrics under
 MLPipeline's BinaryClassificationEvaluator
 http://apache-spark-user-list.1001560.n3.nabble.com/Implementing-custom-metrics-under-MLPipeline-s-BinaryClassificationEvaluator-tp22930.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



TwitterUtils on Windows

2015-05-18 Thread Justin Pihony
I am trying to print a basic twitter stream and receiving the following
error:


15/05/18 22:03:14 INFO Executor: Fetching
http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar with
timestamp 1432000973058
15/05/18 22:03:14 INFO Utils: Fetching
http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar to
C:\Users\Justin\AppData\Local\Temp\spark-4a37d3
e9-34a2-40d4-b09b-6399931f527d\userFiles-65ee748e-4721-4e16-9fe6-65933651fec1\fetchFileTemp8970201232303518432.tmp
15/05/18 22:03:14 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.ja
va:715)
at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:873)
at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:443)
at
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:374)
at
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:366)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:366)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:184)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:744)


Code is:

spark-shell --jars
\Spark\lib\spark-streaming-twitter_2.10-1.3.1.jar,\Spark\lib\twitter4j-async-3.0.3.jar,\Spark\lib\twitter4j-core-3.0.3.jar,\Spark\lib\twitter4j-media-support-3.0.3.jar,\Spark\lib\twitter4j-stream-3.0.3.jar

import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming._

System.setProperty("twitter4j.oauth.consumerKey", "*")
System.setProperty("twitter4j.oauth.consumerSecret", "*")
System.setProperty("twitter4j.oauth.accessToken", "*")
System.setProperty("twitter4j.oauth.accessTokenSecret", "*")

val ssc = new StreamingContext(sc, Seconds(10))
val stream = TwitterUtils.createStream(ssc, None)
stream.print
ssc.start


This seems to be happening at FileUtil.chmod(targetFile.getAbsolutePath,
"a+x") but I'm not sure why...



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/TwitterUtils-on-Windows-tp22939.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: number of executors

2015-05-18 Thread xiaohe lan
Yeah, I read that page before, but it does not mention the options should
come before the application jar. Actually, if I put the --class option
before the application jar, I will get a ClassNotFoundException.

Anyway, thanks again Sandy.

On Tue, May 19, 2015 at 11:06 AM, Sandy Ryza sandy.r...@cloudera.com
wrote:

 Awesome!

 It's documented here:
 https://spark.apache.org/docs/latest/submitting-applications.html

 -Sandy

 On Mon, May 18, 2015 at 8:03 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Hi Sandy,

 Thanks for your information. Yes, spark-submit --master yarn
 --num-executors 5 --executor-cores 4
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp is
 working awesomely. Is there any documentations pointing to this ?

 Thanks,
 Xiaohe

 On Tue, May 19, 2015 at 12:07 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Xiaohe,

 The all Spark options must go before the jar or they won't take effect.

 -Sandy

 On Sun, May 17, 2015 at 8:59 AM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Sorry, them both are assigned task actually.

 Aggregated Metrics by Executor
 Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksInput
 Size / RecordsShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle
 Spill (Disk)1host1:61841.7 min505640.0 MB / 12318400382.3 MB / 
 121007701630.4
 MB295.4 MB2host2:620721.7 min505640.0 MB / 12014510386.0 MB / 
 109269121646.6
 MB304.8 MB

 On Sun, May 17, 2015 at 11:50 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 bash-4.1$ ps aux | grep SparkSubmit
 xilan 1704 13.2  1.2 5275520 380244 pts/0  Sl+  08:39   0:13
 /scratch/xilan/jdk1.8.0_45/bin/java -cp
 /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop
 -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5 --executor-cores 4
 xilan 1949  0.0  0.0 103292   800 pts/1S+   08:40   0:00 grep
 --color SparkSubmit


 When look at the sparkui, I see the following:
 Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed
 TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1
 MB / 28089782host2:49970 ms00063.4 MB / 1810945

 So executor 2 is not even assigned a task ? Maybe I have some problems
 in my setting, but I don't know what could be the possible settings I set
 wrong or have not set.


 Thanks,
 Xiaohe

 On Sun, May 17, 2015 at 11:16 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 Did you try --executor-cores param? While you submit the job, do a ps
 aux | grep spark-submit and see the exact command parameters.

 Thanks
 Best Regards

 On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Hi,

 I have a 5 nodes yarn cluster, I used spark-submit to submit a
 simple app.

  spark-submit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5

 I have set the number of executor to 5, but from sparkui I could see
 only two executors and it ran very slow. What did I miss ?

 Thanks,
 Xiaohe










Re: TwitterUtils on Windows

2015-05-18 Thread Justin Pihony
I think I found the answer -
http://apache-spark-user-list.1001560.n3.nabble.com/Error-while-running-example-scala-application-using-spark-submit-td10056.html

Do I have no way of running this in Windows locally?


On Mon, May 18, 2015 at 10:44 PM, Justin Pihony justin.pih...@gmail.com
wrote:

 I'm not 100% sure that is causing a problem, though. The stream still
 starts, but is giving blank output. I checked the environment variables in
 the ui and it is running local[*], so there should be no bottleneck there.

 On Mon, May 18, 2015 at 10:08 PM, Justin Pihony justin.pih...@gmail.com
 wrote:

 I am trying to print a basic twitter stream and receiving the following
 error:


 15/05/18 22:03:14 INFO Executor: Fetching
 http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar with
 timestamp 1432000973058
 15/05/18 22:03:14 INFO Utils: Fetching
 http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar to
 C:\Users\Justin\AppData\Local\Temp\spark-4a37d3

 e9-34a2-40d4-b09b-6399931f527d\userFiles-65ee748e-4721-4e16-9fe6-65933651fec1\fetchFileTemp8970201232303518432.tmp
 15/05/18 22:03:14 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
 0)
 java.lang.NullPointerException
 at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
 at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)
 at org.apache.hadoop.util.Shell.run(Shell.java:455)
 at
 org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.ja
 va:715)
 at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:873)
 at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853)
 at org.apache.spark.util.Utils$.fetchFile(Utils.scala:443)
 at

 org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:374)
 at

 org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:366)
 at

 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at

 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at

 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
 at

 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at
 org.apache.spark.executor.Executor.org
 $apache$spark$executor$Executor$$updateDependencies(Executor.scala:366)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:184)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:744)


 Code is:

 spark-shell --jars

 \Spark\lib\spark-streaming-twitter_2.10-1.3.1.jar,\Spark\lib\twitter4j-async-3.0.3.jar,\Spark\lib\twitter4j-core-3.0.3.jar,\Spark\lib\twitter4j-media-support-3.0.3.jar,\Spark\lib\twitter4j-stream-3.0.3.jar

 import org.apache.spark.streaming.twitter._
 import org.apache.spark.streaming._

 System.setProperty(twitter4j.oauth.consumerKey,*)
 System.setProperty(twitter4j.oauth.consumerSecret,*)
 System.setProperty(twitter4j.oauth.accessToken,*)
 System.setProperty(twitter4j.oauth.accessTokenSecret,*)

 val ssc = new StreamingContext(sc, Seconds(10))
 val stream = TwitterUtils.createStream(ssc, None)
 stream.print
 ssc.start


 This seems to be happening at FileUtil.chmod(targetFile.getAbsolutePath,
 a+x) but Im not sure why...



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/TwitterUtils-on-Windows-tp22939.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





RE: Spark sql error while writing Parquet file- Trying to write more fields than contained in row

2015-05-18 Thread Chandra Mohan, Ananda Vel Murugan
Hi,

Thanks for the response, but I could not see a fillna function in the DataFrame class.



Is it available in some specific version of Spark SQL? This is what I have in 
my pom.xml:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.10</artifactId>
  <version>1.3.1</version>
</dependency>

Regards,
Anand.C

From: ayan guha [mailto:guha.a...@gmail.com]
Sent: Monday, May 18, 2015 5:19 PM
To: Chandra Mohan, Ananda Vel Murugan; user
Subject: Re: Spark sql error while writing Parquet file- Trying to write more 
fields than contained in row

Hi

Give a try with the dataFrame.fillna function to fill up the missing columns

Best
Ayan

On Mon, May 18, 2015 at 8:29 PM, Chandra Mohan, Ananda Vel Murugan 
ananda.muru...@honeywell.com wrote:
Hi,

I am using spark-sql to read a CSV file and write it as parquet file. I am 
building the schema using the following code.

String schemaString = "a b c";
List<StructField> fields = new ArrayList<StructField>();
MetadataBuilder mb = new MetadataBuilder();
mb.putBoolean("nullable", true);
Metadata m = mb.build();
for (String fieldName : schemaString.split(" ")) {
    fields.add(new StructField(fieldName, DataTypes.DoubleType, true, m));
}
StructType schema = DataTypes.createStructType(fields);

Some of the rows in my input CSV do not contain three columns. After building 
my JavaRDD<Row>, I create a data frame as shown below using the RDD and schema.

DataFrame darDataFrame = sqlContext.createDataFrame(rowRDD, schema);

Finally I try to save it as Parquet file

darDataFrame.saveAsParquetFile("/home/anand/output.parquet")

I get this error when saving it as Parquet file

java.lang.IndexOutOfBoundsException: Trying to write more fields than contained 
in row (3 > 2)

I understand the reason behind this error. Some of the rows in my Row RDD do not 
contain three elements, as some rows in my input CSV do not contain three 
columns. But while building the schema, I am specifying every field as 
nullable, so I believe it should not throw this error. Can anyone help me fix 
this error? Thank you.
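
Since the IndexOutOfBoundsException comes from Rows that are shorter than the schema, one 
way around it is to pad every parsed line to the schema width before building the Row. A 
hedged sketch in Scala (the comma delimiter, 3-field width and file name are assumptions 
based on the description above; schema is the StructType built earlier):

import org.apache.spark.sql.Row

val rowRDD = sc.textFile("input.csv").map { line =>
  val cols   = line.split(",", -1)
  val padded = cols.padTo(3, "")        // make every row exactly 3 fields wide
  // empty slots become null, which the nullable schema fields accept
  Row(padded.map(v => if (v.isEmpty) null else v.toDouble): _*)
}
val df = sqlContext.createDataFrame(rowRDD, schema)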

Regards,
Anand.C





--
Best Regards,
Ayan Guha


Re: TwitterUtils on Windows

2015-05-18 Thread Justin Pihony
I'm not 100% sure that is causing a problem, though. The stream still
starts, but is giving blank output. I checked the environment variables in
the ui and it is running local[*], so there should be no bottleneck there.

On Mon, May 18, 2015 at 10:08 PM, Justin Pihony justin.pih...@gmail.com
wrote:

 I am trying to print a basic twitter stream and receiving the following
 error:


 15/05/18 22:03:14 INFO Executor: Fetching
 http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar with
 timestamp 1432000973058
 15/05/18 22:03:14 INFO Utils: Fetching
 http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar to
 C:\Users\Justin\AppData\Local\Temp\spark-4a37d3

 e9-34a2-40d4-b09b-6399931f527d\userFiles-65ee748e-4721-4e16-9fe6-65933651fec1\fetchFileTemp8970201232303518432.tmp
 15/05/18 22:03:14 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
 0)
 java.lang.NullPointerException
 at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
 at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)
 at org.apache.hadoop.util.Shell.run(Shell.java:455)
 at
 org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.ja
 va:715)
 at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:873)
 at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853)
 at org.apache.spark.util.Utils$.fetchFile(Utils.scala:443)
 at

 org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:374)
 at

 org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:366)
 at

 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
 at

 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at
 org.apache.spark.executor.Executor.org
 $apache$spark$executor$Executor$$updateDependencies(Executor.scala:366)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:184)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:744)


 Code is:

 spark-shell --jars

 \Spark\lib\spark-streaming-twitter_2.10-1.3.1.jar,\Spark\lib\twitter4j-async-3.0.3.jar,\Spark\lib\twitter4j-core-3.0.3.jar,\Spark\lib\twitter4j-media-support-3.0.3.jar,\Spark\lib\twitter4j-stream-3.0.3.jar

 import org.apache.spark.streaming.twitter._
 import org.apache.spark.streaming._

 System.setProperty(twitter4j.oauth.consumerKey,*)
 System.setProperty(twitter4j.oauth.consumerSecret,*)
 System.setProperty(twitter4j.oauth.accessToken,*)
 System.setProperty(twitter4j.oauth.accessTokenSecret,*)

 val ssc = new StreamingContext(sc, Seconds(10))
 val stream = TwitterUtils.createStream(ssc, None)
 stream.print
 ssc.start


 This seems to be happening at FileUtil.chmod(targetFile.getAbsolutePath,
 a+x) but Im not sure why...



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/TwitterUtils-on-Windows-tp22939.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: number of executors

2015-05-18 Thread xiaohe lan
Hi Sandy,

Thanks for your information. Yes, spark-submit --master yarn
--num-executors 5 --executor-cores 4
target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp is
working awesomely. Is there any documentations pointing to this ?

Thanks,
Xiaohe

On Tue, May 19, 2015 at 12:07 AM, Sandy Ryza sandy.r...@cloudera.com
wrote:

 Hi Xiaohe,

 The all Spark options must go before the jar or they won't take effect.

 -Sandy

 On Sun, May 17, 2015 at 8:59 AM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Sorry, them both are assigned task actually.

 Aggregated Metrics by Executor
 Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksInput
 Size / RecordsShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle
 Spill (Disk)1host1:61841.7 min505640.0 MB / 12318400382.3 MB / 121007701630.4
 MB295.4 MB2host2:620721.7 min505640.0 MB / 12014510386.0 MB / 109269121646.6
 MB304.8 MB

 On Sun, May 17, 2015 at 11:50 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 bash-4.1$ ps aux | grep SparkSubmit
 xilan 1704 13.2  1.2 5275520 380244 pts/0  Sl+  08:39   0:13
 /scratch/xilan/jdk1.8.0_45/bin/java -cp
 /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop
 -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5 --executor-cores 4
 xilan 1949  0.0  0.0 103292   800 pts/1S+   08:40   0:00 grep
 --color SparkSubmit


 When look at the sparkui, I see the following:
 Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed
 TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1
 MB / 28089782host2:49970 ms00063.4 MB / 1810945

 So executor 2 is not even assigned a task ? Maybe I have some problems
 in my setting, but I don't know what could be the possible settings I set
 wrong or have not set.


 Thanks,
 Xiaohe

 On Sun, May 17, 2015 at 11:16 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Did you try --executor-cores param? While you submit the job, do a ps
 aux | grep spark-submit and see the exact command parameters.

 Thanks
 Best Regards

 On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Hi,

 I have a 5 nodes yarn cluster, I used spark-submit to submit a simple
 app.

  spark-submit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5

 I have set the number of executor to 5, but from sparkui I could see
 only two executors and it ran very slow. What did I miss ?

 Thanks,
 Xiaohe








Re: number of executors

2015-05-18 Thread Sandy Ryza
Awesome!

It's documented here:
https://spark.apache.org/docs/latest/submitting-applications.html

-Sandy

On Mon, May 18, 2015 at 8:03 PM, xiaohe lan zombiexco...@gmail.com wrote:

 Hi Sandy,

 Thanks for your information. Yes, spark-submit --master yarn
 --num-executors 5 --executor-cores 4
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp is
 working awesomely. Is there any documentations pointing to this ?

 Thanks,
 Xiaohe

 On Tue, May 19, 2015 at 12:07 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Xiaohe,

 The all Spark options must go before the jar or they won't take effect.

 -Sandy

 On Sun, May 17, 2015 at 8:59 AM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Sorry, them both are assigned task actually.

 Aggregated Metrics by Executor
 Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksInput
 Size / RecordsShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle
 Spill (Disk)1host1:61841.7 min505640.0 MB / 12318400382.3 MB / 
 121007701630.4
 MB295.4 MB2host2:620721.7 min505640.0 MB / 12014510386.0 MB / 109269121646.6
 MB304.8 MB

 On Sun, May 17, 2015 at 11:50 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 bash-4.1$ ps aux | grep SparkSubmit
 xilan 1704 13.2  1.2 5275520 380244 pts/0  Sl+  08:39   0:13
 /scratch/xilan/jdk1.8.0_45/bin/java -cp
 /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop
 -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5 --executor-cores 4
 xilan 1949  0.0  0.0 103292   800 pts/1S+   08:40   0:00 grep
 --color SparkSubmit


 When look at the sparkui, I see the following:
 Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed
 TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1
 MB / 28089782host2:49970 ms00063.4 MB / 1810945

 So executor 2 is not even assigned a task ? Maybe I have some problems
 in my setting, but I don't know what could be the possible settings I set
 wrong or have not set.


 Thanks,
 Xiaohe

 On Sun, May 17, 2015 at 11:16 PM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 Did you try --executor-cores param? While you submit the job, do a ps
 aux | grep spark-submit and see the exact command parameters.

 Thanks
 Best Regards

 On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Hi,

 I have a 5 nodes yarn cluster, I used spark-submit to submit a simple
 app.

  spark-submit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5

 I have set the number of executor to 5, but from sparkui I could see
 only two executors and it ran very slow. What did I miss ?

 Thanks,
 Xiaohe









Spark Streaming graceful shutdown in Spark 1.4

2015-05-18 Thread Dibyendu Bhattacharya
Hi,

Just figured out that if I want to perform a graceful shutdown of Spark
Streaming 1.4 (from master), Runtime.getRuntime().addShutdownHook no
longer works. In Spark 1.4 there is Utils.addShutdownHook defined for
Spark Core, which gets called anyway, so the graceful shutdown of
Spark Streaming fails with errors like "SparkContext already closed".

To solve this , I need to explicitly add Utils.addShutdownHook in my driver
with higher priority ( say 150 ) than Spark's shutdown priority of 50 , and
there I specified streamingcontext stop method with (false , true)
parameter.

Just curious to know if this is how we need to handle the shutdown hook going
forward?

Can't we make the streaming shutdown default to a graceful shutdown?

Also, the Java API for adding a shutdown hook in Utils looks very dirty, with
methods like this:



Utils.addShutdownHook(150, new Function0<BoxedUnit>() {
 @Override
public BoxedUnit apply() {
return null;
}

@Override
public byte apply$mcB$sp() {
return 0;
}

@Override
public char apply$mcC$sp() {
return 0;
}

@Override
public double apply$mcD$sp() {
return 0;
}

@Override
public float apply$mcF$sp() {
return 0;
}

@Override
public int apply$mcI$sp() {
// TODO Auto-generated method stub
return 0;
}

@Override
public long apply$mcJ$sp() {
return 0;
}

@Override
public short apply$mcS$sp() {
return 0;
}

@Override
public void apply$mcV$sp() {
 *jsc.stop(false, true);*
 }

@Override
public boolean apply$mcZ$sp() {
// TODO Auto-generated method stub
return false;
}
});


Re: spark log field clarification

2015-05-18 Thread Imran Rashid
depends what you mean by output data.  Do you mean:

* the data that is sent back to the driver? that is result size
* the shuffle output?  that is in Shuffle Write Metrics
* the data written to a hadoop output format?  that is in Output Metrics

On Thu, May 14, 2015 at 2:22 PM, yanwei echo@gmail.com wrote:

 I am trying to extract the *output data size* information for *each task*.
 What *field(s)* should I look for, given the json-format log?

 Also, what does Result Size stand for?

 Thanks a lot in advance!
 -Yanwei



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-log-field-clarification-tp22892.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: LogisticRegressionWithLBFGS with large feature set

2015-05-18 Thread Imran Rashid
I'm not super familiar with this part of the code, but from taking a quick
look:

a) the code creates a MultivariateOnlineSummarizer, which stores 7 doubles
per feature (mean, max, min, etc. etc.)
b) The limit is on the result size from *all* tasks, not from one task.
You start with 3072 tasks
c) tree aggregate should first merge things down to about 8 partitions
before bringing results back to the driver, which is how you end up with 54
tasks at your failure.

this means you should have about 30 MB per task per measure * 54 tasks * 7
measures, which comes to about 11GB, or in the ballpark of what you found.

In principle, you could get this working by adding more levels to the
treeAggregate (the depth parameter), but looks like that isn't exposed.
You could also try coalescing your data down to a smaller set of partitions
first, but that comes with other downsides.
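As a rough illustration of the coalesce idea (a sketch only, not the original
pipeline; trainingData and the 256-partition target are placeholders):

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Fewer partitions means fewer partial summaries/gradients sent back to the
// driver during the treeAggregate steps, at the cost of less parallelism.
def trainWithFewerPartitions(trainingData: RDD[LabeledPoint]) = {
  val coalesced = trainingData.coalesce(256).cache()
  new LogisticRegressionWithLBFGS().run(coalesced)
}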

Perhaps an MLLib expert could chime in on an alternate approach.  My
feeling (from a very quick look) is that there is room for some
optimization in the internals

Imran

On Thu, May 14, 2015 at 5:44 PM, Pala M Muthaia mchett...@rocketfuelinc.com
 wrote:

 Hi,

 I am trying to validate our modeling data pipeline by running
 LogisticRegressionWithLBFGS on a dataset with ~3.7 million features,
 basically to compute AUC. This is on Spark 1.3.0.

 I am using 128 executors with 4 GB each + driver with 8 GB. The number of
 data partitions is 3072

 The execution fails with the following messages:

 *Total size of serialized results of 54 tasks (10.4 GB) is bigger than
 spark.driver.maxResultSize (3.0 GB)*

 The associated stage in the job is treeAggregate at
 StandardScaler.scala:52
 http://lsv-10.rfiserve.net:18080/history/application_1426202183036_633264/stages/stage?id=3attempt=0
  :
 The call stack looks as below:

 org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:996)
 org.apache.spark.mllib.feature.StandardScaler.fit(StandardScaler.scala:52)
 org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:233)
 org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:190)


 I am trying both to understand why such a large amount of data needs to be
 passed back to the driver, and to figure out a way around this. I also want
 to understand how much memory is required, as a function of dataset size,
 feature set size, and number of iterations performed, for future
 experiments.

 From looking at the MLlib code, the largest data structure seems to be a
 dense vector of the same size as the feature set. I am not familiar with the
 algorithm or its implementation, but I would guess 3.7 million features would
 lead to a constant multiple of ~3.7 million * 8 bytes ~ 30 MB. So how does the
 result size become so large?

 I looked into the treeAggregate and it looks like hierarchical
 aggregation. If the data being sent to the driver is basically the
 aggregated coefficients (i.e. dense vectors) for the final aggregation,
 can't the dense vectors from executors be pulled in one at a time and
 merged in memory, rather than pulling all of them in together? (This is
 a totally uneducated guess, so I may be completely off here).

 Is there a way to get this running?

 Thanks,
 pala



Re: pass configuration parameters to PySpark job

2015-05-18 Thread Davies Liu
In PySpark, it serializes the functions/closures together with used
global values.

For example,

global_param = 111

def my_map(x):
    return x + global_param

rdd.map(my_map)

- Davies

On Mon, May 18, 2015 at 7:26 AM, Oleg Ruchovets oruchov...@gmail.com wrote:
 Hi ,
I am looking a way to pass configuration parameters to spark job.
 In general I have quite simple PySpark job.

   def process_model(k, vc):

do something



  sc = SparkContext(appName="TAD")
 lines = sc.textFile(input_job_files)
 result = lines.map(doSplit).groupByKey().map(lambda (k,vc):
 process_model(k,vc))

 Question:
 In case I need to pass to process_model function additional metadata ,
 parameters , etc ...

I tried to do something like
param = 'param1'
   result = lines.map(doSplit).groupByKey().map(lambda (param,k,vc):
 process_model(param1,k,vc)) ,

 but the job stops working, and it also does not look like an elegant solution.
 Is there a way to have access to the SparkContext from my custom functions?
 I found that there are methods setLocalProperty/getLocalProperty, but I
 didn't find an example of how to use them for my requirements (from my function).

 It would be great to have short example how to pass parameters.

 Thanks
 Oleg.



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: com.esotericsoftware.kryo.KryoException: java.io.IOException: Stream is corrupted

2015-05-18 Thread Imran Rashid
Looks like this exception is after many more failures have occurred.  It is
already on attempt 6 for stage 7 -- I'd try to find out why attempt 0
failed.

This particular exception is probably a result of corruption that can
happen when stages are retried, that I'm working on addressing in
https://issues.apache.org/jira/browse/SPARK-7308.  But your real problem is
figuring out why the stage failed in the first place.


On Wed, May 13, 2015 at 6:01 AM, Yifan LI iamyifa...@gmail.com wrote:

 Hi,

 I was running our GraphX application (which worked fine on Spark 1.2.0), but it
 failed on Spark 1.3.1 with the exception below.

 Anyone has idea on this issue? I guess it was caused by using LZ4 codec?

 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 54 in stage 7.6 failed 128 times, most recent
 failure: Lost task 54.127 in stage 7.6 (TID 5311,
 small15-tap1.common.lip6.fr): com.esotericsoftware.kryo.KryoException:
 java.io.IOException: Stream is corrupted
 at com.esotericsoftware.kryo.io.Input.fill(Input.java:142)
 at com.esotericsoftware.kryo.io.Input.require(Input.java:155)
 at com.esotericsoftware.kryo.io.Input.readInt(Input.java:337)
 at
 com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
 at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
 at
 org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
 at
 org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
 at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
 at
 org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at
 org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 org.apache.spark.graphx.impl.ShippableVertexPartition$.apply(ShippableVertexPartition.scala:60)
 at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:300)
 at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:297)
 at
 org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
 at
 org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.io.IOException: Stream is corrupted
 at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:152)
 at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:116)
 at com.esotericsoftware.kryo.io.Input.fill(Input.java:140)
 ... 35 more

 Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
 at
 

Re: Spark on Yarn : Map outputs lifetime ?

2015-05-18 Thread Imran Rashid
Neither of those two.  Instead, the shuffle data is cleaned up when the
stages it came from get GC'ed by the JVM -- that is, when you are no longer
holding any references to anything which points to the old stages, and
there is an appropriate GC event.

The data is not cleaned up right after the stage completes, because it
might get used again later (e.g., if the stage is retried).

On Tue, May 12, 2015 at 6:50 PM, Ashwin Shankar ashwinshanka...@gmail.com
wrote:

 Hi,
 In Spark on YARN, when running spark_shuffle as an auxiliary service on the
 node manager, do the map spills of a stage get cleaned up once the next
 stage completes, OR are they preserved till the app completes (i.e., until
 all the stages complete)?

 --
 Thanks,
 Ashwin






Re: Restricting the number of iterations in Mllib Kmeans

2015-05-18 Thread Joseph Bradley
Hi Suman,

For maxIterations, are you using the DenseKMeans.scala example code?  (I'm
guessing yes since you mention the command line.)  If so, then you should
be able to specify maxIterations via an extra parameter like
--numIterations 50 (note the example uses numIterations in the current
master instead of maxIterations, which is sort of a bug in the example).
If that does not cap the max iterations, then please report it as a bug.

To specify the initial centroids, you will need to modify the DenseKMeans
example code.  Please see the KMeans API docs for details.
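
For reference, a minimal sketch of capping the iterations via the MLlib API
directly (not the DenseKMeans example; the k value and data are placeholders,
and the commented-out setInitialModel line is only an assumption, since its
availability depends on the Spark version):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

def clusterWithCap(data: RDD[Vector]) = {
  new KMeans()
    .setK(10)                // placeholder number of clusters
    .setMaxIterations(50)    // stop after at most 50 iterations, even if not converged
    // .setInitialModel(...) // supply initial centroids, if your Spark version exposes it
    .run(data)
}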

Good luck,
Joseph

On Mon, May 18, 2015 at 3:22 AM, MEETHU MATHEW meethu2...@yahoo.co.in
wrote:

 Hi,
  I think you can't supply an initial set of centroids to kmeans.

 Thanks  Regards,
 Meethu M



   On Friday, 15 May 2015 12:37 AM, Suman Somasundar 
 suman.somasun...@oracle.com wrote:


  Hi,

  I want to run a definite number of iterations in Kmeans.  There is a
  command line argument to set maxIterations, but even if I set it to a
  number, Kmeans runs until the centroids converge.
  Is there a specific way to specify it on the command line?

  Also, I wanted to know if we can supply the initial set of centroids to
  the program instead of it choosing the centroids at random?

 Thanks,
 Suman.





Re: Broadcast variables can be rebroadcast?

2015-05-18 Thread Imran Rashid
Rather than updating the broadcast variable, can't you simply create a
new one?  When the old one can be gc'ed in your program, it will also get
gc'ed from spark's cache (and all executors).

I think this will make your code *slightly* more complicated, as you need
to add in another layer of indirection for which broadcast variable to use,
but not too bad.  Eg., from

var myBroadcast = sc.broadcast( ...)
(0 to 20).foreach{ iteration =>
  //  ... some rdd operations that involve myBroadcast ...
  myBroadcast.update(...) // wrong! don't update a broadcast variable
}

instead do something like:

def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit = {
 ...
}

var myBroadcast = sc.broadcast(...)
(0 to 20).foreach { iteration =>
  oneIteration(myRDD, myBroadcast)
  myBroadcast = sc.broadcast(...) // reassign (no 'var' here) to create a NEW broadcast,
                                  // with whatever you need to update it
}

On Sat, May 16, 2015 at 2:01 AM, N B nb.nos...@gmail.com wrote:

 Thanks Ayan. Can we rebroadcast after updating in the driver?

 Thanks
 NB.


 On Fri, May 15, 2015 at 6:40 PM, ayan guha guha.a...@gmail.com wrote:

 Hi

  A broadcast variable is shipped, the first time it is accessed in a
  transformation, to the executors used by that transformation. It will NOT be
  updated subsequently, even if the value has changed. However, a new value
  will be shipped to any new executor that comes into play after the value has
  changed. For this reason, changing the value of a broadcast variable is not a
  good idea, as it can create inconsistency within the cluster. From the documentation:

  In addition, the object v should not be modified after it is broadcast
 in order to ensure that all nodes get the same value of the broadcast
 variable


 On Sat, May 16, 2015 at 10:39 AM, N B nb.nos...@gmail.com wrote:

 Thanks Ilya. Does one have to call broadcast again once the underlying
 data is updated in order to get the changes visible on all nodes?

 Thanks
 NB


 On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin ilgan...@gmail.com
 wrote:

 The broadcast variable is like a pointer. If the underlying data
 changes then the changes will be visible throughout the cluster.
 On Fri, May 15, 2015 at 5:18 PM NB nb.nos...@gmail.com wrote:

 Hello,

 Once a broadcast variable is created using sparkContext.broadcast(),
 can it
 ever be updated again? The use case is for something like the
 underlying
 lookup data changing over time.

 Thanks
 NB




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variables-can-be-rebroadcast-tp22908.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 --
 Best Regards,
 Ayan Guha





Partition number of Spark Streaming Kafka receiver-based approach

2015-05-18 Thread Bill Jay
Hi all,

I am reading the docs of the receiver-based Kafka consumer. The last
parameter of KafkaUtils.createStream is the per-topic number of Kafka
partitions to consume. My question is: does the number of partitions per
topic in this parameter need to match the number of partitions in Kafka?

For example, I have two topics, topic1 with 3 partitions and topic2 with 4
partitions.

If I specify 2 for topic1 and 3 for topic2 and feed them to the
createStream function, will there be data loss? Or will it just be an
inefficiency?

Thanks!

Bill


Re: Partition number of Spark Streaming Kafka receiver-based approach

2015-05-18 Thread Saisai Shao
HI Bill,

You don't need to match the number of threads to the number of partitions in
a specific topic. For example, if you have 3 partitions in topic1 but you
only set 2 threads, ideally 1 thread will receive 2 partitions and the other
thread the remaining partition; it depends on the scheduling of Kafka
itself, but basically the data will not be lost.

However, you should not set a thread number larger than the partition
number, since each partition can only be consumed by one consumer, so the
extra threads will be wasted.
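
To make the parameter concrete, a small sketch of the receiver-based call being
discussed (the ZooKeeper address and group id are placeholders; the map values
are consumer-thread counts per topic, not partition counts):

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

def createKafkaStream(ssc: StreamingContext) = {
  // 2 threads for topic1 (3 partitions) and 3 threads for topic2 (4 partitions):
  // fewer threads than partitions is safe; extra threads would just sit idle.
  val topicThreadCount = Map("topic1" -> 2, "topic2" -> 3)
  KafkaUtils.createStream(ssc, "zkhost:2181", "my-consumer-group", topicThreadCount)
}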


2015-05-19 7:46 GMT+08:00 Bill Jay bill.jaypeter...@gmail.com:

 Hi all,

 I am reading the docs of receiver-based Kafka consumer. The last
 parameters of KafkaUtils.createStream is per topic number of Kafka
 partitions to consume. My question is, does the number of partitions for
 topic in this parameter need to match the number of partitions in Kafka.

 For example, I have two topics, topic1 with 3 partitions and topic2 with 4
 partitions.

 If i specify 2 for topic1 and 3 for topic2 and feed them to the
 createStream function, will there be data loss? Or it will just be an
 inefficiency.

 Thanks!

 Bill



Re: FetchFailedException and MetadataFetchFailedException

2015-05-18 Thread Imran Rashid
Hi,

can you take a look at the logs and see what the first error you are
getting is?  It's possible that the file doesn't exist when that error is
produced, but it shows up later -- I've seen similar things happen, but
only after there have already been some errors.  But if you see that in
the very first error, then I'm not sure what the cause is.  It would be
helpful for you to send the logs.

Imran

On Fri, May 15, 2015 at 10:07 AM, rok rokros...@gmail.com wrote:

 I am trying to sort a collection of key,value pairs (between several
 hundred
 million to a few billion) and have recently been getting lots of
 FetchFailedException errors that seem to originate when one of the
 executors doesn't seem to find a temporary shuffle file on disk. E.g.:

 org.apache.spark.shuffle.FetchFailedException:

 /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index
 (No such file or directory)

 This file actually exists:

  ls -l
 
 /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index

 -rw-r--r-- 1 hadoop hadoop 11936 May 15 16:52

 /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index

 This error repeats on several executors and is followed by a number of

 org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
 location for shuffle 0

 This results on most tasks being lost and executors dying.

 There is plenty of space on all of the appropriate filesystems, so none of
 the executors are running out of disk space. Any idea what might be causing
 this? I am running this via YARN on approximately 100 nodes with 2 cores
 per
 node. Any thoughts on what might be causing these errors? Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/FetchFailedException-and-MetadataFetchFailedException-tp22901.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: MLLib SVMWithSGD is failing for large dataset

2015-05-18 Thread Xiangrui Meng
Reducing the number of instances won't help in this case. We use the
driver to collect partial gradients. Even with tree aggregation, it
still puts heavy workload on the driver with 20M features. Please try
to reduce the number of partitions before training. We are working on
a more scalable implementation of logistic regression now, which
should be able to solve this problem efficiently. -Xiangrui
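
A hedged sketch of the partition-reduction suggestion (the names and the
128-partition target are placeholders, not taken from the thread):

import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

def trainOnFewerPartitions(trainingData: RDD[LabeledPoint]) = {
  // Fewer partitions -> fewer partial gradients aggregated on the driver per step.
  val coalesced = trainingData.coalesce(128).cache()
  SVMWithSGD.train(coalesced, 100) // 100 iterations is a placeholder
}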

On Tue, Apr 28, 2015 at 3:43 PM, sarathkrishn...@gmail.com
sarathkrishn...@gmail.com wrote:
 Hi,

 I'm just calling the standard SVMWithSGD implementation of Spark's MLLib.
 I'm not using any method like collect.

 Thanks,
 Sarath

 On Tue, Apr 28, 2015 at 4:35 PM, ai he heai0...@gmail.com wrote:

 Hi Sarath,

  It might be questionable to set num-executors as 64 if you only have 8
  nodes. Do you use any action like collect which would overwhelm the
  driver, since you have a large dataset?

 Thanks

 On Tue, Apr 28, 2015 at 10:50 AM, sarath sarathkrishn...@gmail.com
 wrote:
 
  I am trying to train a large dataset consisting of 8 million data points
  and
  20 million features using SVMWithSGD. But it is failing after running
  for
  some time. I tried increasing num-partitions, driver-memory,
  executor-memory, driver-max-resultSize. Also I tried by reducing the
  size of
  dataset from 8 million to 25K (keeping number of features same 20 M).
  But
  after using the entire 64GB driver memory for 20 to 30 min it failed.
 
  I'm using a cluster of 8 nodes (each with 8 cores and 64G RAM).
  executor-memory - 60G
  driver-memory - 60G
  num-executors - 64
  And other default settings
 
  This is the error log :
 
  15/04/20 11:51:09 WARN NativeCodeLoader: Unable to load native-hadoop
  library for your platform... using builtin-java classes where applicable
  15/04/20 11:51:29 WARN BLAS: Failed to load implementation from:
  com.github.fommil.netlib.NativeSystemBLAS
  15/04/20 11:51:29 WARN BLAS: Failed to load implementation from:
  com.github.fommil.netlib.NativeRefBLAS
  15/04/20 11:56:11 WARN TransportChannelHandler: Exception in connection
  from
  xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029
  java.io.IOException: Connection reset by peer
  at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
  at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
  ...
  15/04/20 11:56:11 ERROR TransportResponseHandler: Still have 7 requests
  outstanding when connection from xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029
  is
  closed
  15/04/20 11:56:11 ERROR OneForOneBlockFetcher: Failed while starting
  block
  fetches
  java.io.IOException: Connection reset by peer
  at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
  at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
  ...
  15/04/20 11:56:11 ERROR OneForOneBlockFetcher: Failed while starting
  block
  fetches
  java.io.IOException: Connection reset by peer
  at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
  at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
  ...
  15/04/20 11:56:12 ERROR RetryingBlockFetcher: Exception while beginning
  fetch of 1 outstanding blocks
  java.io.IOException: Failed to connect to
  xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029
  at
 
  org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
  at
 
  org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
  at
 
  org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
  at
 
  org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
  at
 
  org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
  at
 
  org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
  at
 
  org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149)
  at
 
  org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:290)
  at
 
  org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
  at
  scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at
 
  org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
  at
 
  org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
  at
  org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:91)
  at
 
  org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
  at
  org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
  at
  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
   

Re: StandardScaler failing with OOM errors in PySpark

2015-05-18 Thread Xiangrui Meng
AFAIK, there are two places where you can specify the driver memory.
One is via spark-submit --driver-memory and the other is via
spark.driver.memory in spark-defaults.conf. Please try these
approaches and see whether they work or not. You can find detailed
instructions at http://spark.apache.org/docs/latest/configuration.html
and http://spark.apache.org/docs/latest/submitting-applications.html.
-Xiangrui
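
As a concrete illustration of the two options above (an example only; the 8g
figure, class name and jar are placeholders, not taken from this thread):

  spark-submit --master yarn --driver-memory 8g --class my.Main my-app.jar

or, equivalently, in conf/spark-defaults.conf:

  spark.driver.memory  8g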

On Tue, Apr 28, 2015 at 4:06 AM, Rok Roskar rokros...@gmail.com wrote:
 That's exactly what I'm saying -- I specify the memory options using spark
 options, but this is not reflected in how the JVM is created. No matter
 which memory settings I specify, the JVM for the driver is always made with
 512Mb of memory. So I'm not sure if this is a feature or a bug?

 rok

 On Mon, Apr 27, 2015 at 6:54 PM, Xiangrui Meng men...@gmail.com wrote:

 You might need to specify driver memory in spark-submit instead of
 passing JVM options. spark-submit is designed to handle different
 deployments correctly. -Xiangrui

 On Thu, Apr 23, 2015 at 4:58 AM, Rok Roskar rokros...@gmail.com wrote:
  ok yes, I think I have narrowed it down to being a problem with driver
  memory settings. It looks like the application master/driver is not
  being
  launched with the settings specified:
 
  For the driver process on the main node I see -XX:MaxPermSize=128m
  -Xms512m
  -Xmx512m as options used to start the JVM, even though I specified
 
  'spark.yarn.am.memory', '5g'
  'spark.yarn.am.memoryOverhead', '2000'
 
  The info shows that these options were read:
 
  15/04/23 13:47:47 INFO yarn.Client: Will allocate AM container, with
  7120 MB
  memory including 2000 MB overhead
 
  Is there some reason why these options are being ignored and instead
  starting the driver with just 512Mb of heap?
 
  On Thu, Apr 23, 2015 at 8:06 AM, Rok Roskar rokros...@gmail.com wrote:
 
  the feature dimension is 800k.
 
  yes, I believe the driver memory is likely the problem since it doesn't
  crash until the very last part of the tree aggregation.
 
  I'm running it via pyspark through YARN -- I have to run in client mode
  so
  I can't set spark.driver.memory -- I've tried setting the
  spark.yarn.am.memory and overhead parameters but it doesn't seem to
  have an
  effect.
 
  Thanks,
 
  Rok
 
  On Apr 23, 2015, at 7:47 AM, Xiangrui Meng men...@gmail.com wrote:
 
   What is the feature dimension? Did you set the driver memory?
   -Xiangrui
  
   On Tue, Apr 21, 2015 at 6:59 AM, rok rokros...@gmail.com wrote:
   I'm trying to use the StandardScaler in pyspark on a relatively
   small
   (a few
   hundred Mb) dataset of sparse vectors with 800k features. The fit
   method of
   StandardScaler crashes with Java heap space or Direct buffer memory
   errors.
   There should be plenty of memory around -- 10 executors with 2 cores
   each
   and 8 Gb per core. I'm giving the executors 9g of memory and have
   also
   tried
   lots of overhead (3g), thinking it might be the array creation in
   the
   aggregators that's causing issues.
  
   The bizarre thing is that this isn't always reproducible --
   sometimes
   it
   actually works without problems. Should I be setting up executors
   differently?
  
   Thanks,
  
   Rok
  
  
  
  
   --
   View this message in context:
  
   http://apache-spark-user-list.1001560.n3.nabble.com/StandardScaler-failing-with-OOM-errors-in-PySpark-tp22593.html
   Sent from the Apache Spark User List mailing list archive at
   Nabble.com.
  
  
   -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
  
 
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: bug: numClasses is not a valid argument of LogisticRegressionWithSGD

2015-05-18 Thread Xiangrui Meng
LogisticRegressionWithSGD doesn't support multi-class. Please use
LogisticRegressionWithLBFGS instead. -Xiangrui

On Mon, Apr 27, 2015 at 12:37 PM, Pagliari, Roberto
rpagli...@appcomsci.com wrote:
 With the Python APIs, the available arguments I got (using inspect module)
 are the following:

 ['cls', 'data', 'iterations', 'step', 'miniBatchFraction', 'initialWeights',
 'regParam', 'regType', 'intercept']

 numClasses is not available. Can someone comment on this?

 Thanks,






-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark sql error while writing Parquet file- Trying to write more fields than contained in row

2015-05-18 Thread Chandra Mohan, Ananda Vel Murugan
Hi,

I am using spark-sql to read a CSV file and write it as a parquet file. I am 
building the schema using the following code.

String schemaString = "a b c";
List<StructField> fields = new ArrayList<StructField>();
MetadataBuilder mb = new MetadataBuilder();
mb.putBoolean("nullable", true);
Metadata m = mb.build();
for (String fieldName : schemaString.split(" ")) {
    fields.add(new StructField(fieldName, DataTypes.DoubleType, true, m));
}
StructType schema = DataTypes.createStructType(fields);

Some of the rows in my input csv do not contain three columns. After building 
my JavaRDD<Row>, I create the data frame as shown below using the RDD and schema.

DataFrame darDataFrame = sqlContext.createDataFrame(rowRDD, schema);

Finally I try to save it as a Parquet file

darDataFrame.saveAsParquetFile("/home/anand/output.parquet")

I get this error when saving it as a Parquet file

java.lang.IndexOutOfBoundsException: Trying to write more fields than contained 
in row (3 > 2)

I understand the reason behind this error. Some of my rows in the Row RDD do not 
contain three elements, as some rows in my input csv do not contain three 
columns. But while building the schema, I am specifying every field as 
nullable. So I believe it should not throw this error. Can anyone help me fix 
this error. Thank you.

Regards,
Anand.C
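
One possible way around this (a hedged sketch in Scala rather than the Java used
above; rawFields, numColumns and the null/Double handling are placeholders): pad
every row out to the schema's width with nulls before building the DataFrame,
since the Parquet writer expects each Row to carry exactly as many fields as the
schema.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// Turn each line's columns into a Row with exactly numColumns entries,
// filling missing trailing columns with null (the schema marks them nullable).
def padToSchema(rawFields: RDD[Array[String]], numColumns: Int): RDD[Row] = {
  rawFields.map { cols =>
    val values: Array[Any] = cols.map(v => if (v == null || v.isEmpty) null else v.toDouble)
    val padded = values ++ Array.fill[Any](math.max(0, numColumns - values.length))(null)
    Row.fromSeq(padded.take(numColumns))
  }
}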




Re: Spark Streaming and reducing latency

2015-05-18 Thread Dmitry Goldenberg
Thanks, Akhil. So what do folks typically do to increase/contract the capacity? 
Do you plug in some cluster auto-scaling solution to make this elastic?

Does Spark have any hooks for instrumenting auto-scaling?

In other words, how do you avoid overwhelming the receivers in a scenario when 
your system's input can be unpredictable, based on users' activity?

 On May 17, 2015, at 11:04 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
 
  With receiver based streaming, you can actually specify 
  spark.streaming.blockInterval, which is the interval at which the receiver 
  will fetch data from the source. The default value is 200ms, and hence if your 
  batch duration is 1 second, it will produce 5 blocks of data. And yes, with 
  Spark Streaming, when your processing time goes beyond your batch duration and 
  you have a higher data consumption, then you will overwhelm the 
  receiver's memory and it will throw up block-not-found exceptions. 
 
 Thanks
 Best Regards
 
 On Sun, May 17, 2015 at 7:21 PM, dgoldenberg dgoldenberg...@gmail.com 
 wrote:
 I keep hearing the argument that the way Discretized Streams work with Spark
 Streaming is a lot more of a batch processing algorithm than true streaming.
 For streaming, one would expect a new item, e.g. in a Kafka topic, to be
 available to the streaming consumer immediately.
 
 With the discretized streams, streaming is done with batch intervals i.e.
 the consumer has to wait the interval to be able to get at the new items. If
 one wants to reduce latency it seems the only way to do this would be by
 reducing the batch interval window. However, that may lead to a great deal
 of churn, with many requests going into Kafka out of the consumers,
 potentially with no results whatsoever as there's nothing new in the topic
 at the moment.
 
 Is there a counter-argument to this reasoning? What are some of the general
 approaches to reduce latency  folks might recommend? Or, perhaps there are
 ways of dealing with this at the streaming API level?
 
 If latency is of great concern, is it better to look into streaming from
 something like Flume where data is pushed to consumers rather than pulled by
 them? Are there techniques, in that case, to ensure the consumers don't get
 overwhelmed with new data?
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-reducing-latency-tp22922.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


Re: Spark's Guava pieces cause exceptions in non-trivial deployments

2015-05-18 Thread Steve Loughran

On 16 May 2015, at 04:39, Anton Brazhnyk anton.brazh...@genesys.com wrote:

For me it wouldn’t help I guess, because those newer classes would still be 
loaded by a different classloader.
What did work for me with 1.3.1 – removing those classes from Spark’s jar 
completely, so they get loaded from the external Guava (the version I prefer) and 
by the classloader I expect.


Note that Hadoop <= 2.6.0 won't work with Guava >= 0.17; see: HADOOP-11032

FWIW Guava is a version nightmare across the hadoop stack; almost as bad as 
protobuf.jar. With Hadoop 2.7+, Hadoop will run on later versions, it'll just 
continue to ship an older one to avoid breaking apps that expect it.


NullPointerException when accessing broadcast variable in DStream

2015-05-18 Thread hotienvu
Hi I'm trying to use broadcast variables in my Spark streaming program.

 val conf = new SparkConf().setMaster(SPARK_MASTER).setAppName(APPLICATION_NAME)
 val ssc = new StreamingContext(conf, Seconds(1))

 val LIMIT = ssc.sparkContext.broadcast(5L)
 println(LIMIT.value) // this prints 5
 val lines = ssc.socketTextStream("localhost", )
 val words = lines.flatMap(_.split(" ")) filter (_.size  LIMIT.value)
 words.print()
 ssc.start()
 ssc.awaitTermination()

It throws java.lang.NullPointerException at the line (_.size  LIMIT.value)
so I'm guessing LIMIT is not accessible within the stream

I'm running spark 1.3.1 in standalone mode with 2 nodes cluster. I tried
with spark-shell and it works fine. Please help!

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-when-accessing-broadcast-variable-in-DStream-tp22934.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Restricting the number of iterations in Mllib Kmeans

2015-05-18 Thread MEETHU MATHEW
Hi, I think you can't supply an initial set of centroids to kmeans.

Thanks & Regards,
Meethu M


 On Friday, 15 May 2015 12:37 AM, Suman Somasundar 
suman.somasun...@oracle.com wrote:
   

Hi,

I want to run a definite number of iterations in Kmeans. There is a command 
line argument to set maxIterations, but even if I set it to a number, Kmeans 
runs until the centroids converge. Is there a specific way to specify it on the 
command line?
Also, I wanted to know if we can supply the initial set of centroids to the 
program instead of it choosing the centroids at random?  Thanks,
Suman.

  

RE: Spark Streaming and reducing latency

2015-05-18 Thread Evo Eftimov
You can use

 


spark.streaming.receiver.maxRate (default: not set)

Maximum rate (number of records per second) at which each receiver will receive 
data. Effectively, each stream will consume at most this number of records per 
second. Setting this configuration to 0 or a negative number will put no limit 
on the rate. See the deployment guide 
https://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications
in the Spark Streaming programming guide for more details.
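
For illustration, a minimal sketch of setting this property programmatically
(the application name and the 1000 records/sec cap are placeholders; the same
property can also be set in spark-defaults.conf or with --conf on spark-submit):

import org.apache.spark.SparkConf

// Cap each receiver at roughly 1000 records per second (placeholder value).
val conf = new SparkConf()
  .setAppName("rate-limited-streaming-app")
  .set("spark.streaming.receiver.maxRate", "1000")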

 

 

Another way is to implement a feedback loop in your receivers, monitoring the 
performance metrics of your application/job and, based on that, automatically 
adjusting the receiving rate – BUT all these have nothing to do with 
“reducing the latency” – they simply prevent your application/job from clogging 
up – the nastier effect of which is when Spark Streaming starts removing In 
Memory RDDs from RAM before they are processed by the job – that works fine in 
Spark Batch (i.e. removing RDDs from RAM based on LRU), but in Spark Streaming, 
when done in this “unceremonious way”, it simply crashes the application.

 

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Monday, May 18, 2015 11:46 AM
To: Akhil Das
Cc: user@spark.apache.org
Subject: Re: Spark Streaming and reducing latency

 

Thanks, Akhil. So what do folks typically do to increase/contract the capacity? 
Do you plug in some cluster auto-scaling solution to make this elastic?

 

Does Spark have any hooks for instrumenting auto-scaling?

In other words, how do you avoid overwheling the receivers in a scenario when 
your system's input can be unpredictable, based on users' activity?


On May 17, 2015, at 11:04 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

With receiver based streaming, you can actually specify 
spark.streaming.blockInterval which is the interval at which the receiver will 
fetch data from the source. Default value is 200ms and hence if your batch 
duration is 1 second, it will produce 5 blocks of data. And yes, with 
sparkstreaming when your processing time goes beyond your batch duration and 
you are having a higher data consumption then you will overwhelm the receiver's 
memory and hence will throw up block not found exceptions. 




Thanks

Best Regards

 

On Sun, May 17, 2015 at 7:21 PM, dgoldenberg dgoldenberg...@gmail.com wrote:

I keep hearing the argument that the way Discretized Streams work with Spark
Streaming is a lot more of a batch processing algorithm than true streaming.
For streaming, one would expect a new item, e.g. in a Kafka topic, to be
available to the streaming consumer immediately.

With the discretized streams, streaming is done with batch intervals i.e.
the consumer has to wait the interval to be able to get at the new items. If
one wants to reduce latency it seems the only way to do this would be by
reducing the batch interval window. However, that may lead to a great deal
of churn, with many requests going into Kafka out of the consumers,
potentially with no results whatsoever as there's nothing new in the topic
at the moment.

Is there a counter-argument to this reasoning? What are some of the general
approaches to reduce latency  folks might recommend? Or, perhaps there are
ways of dealing with this at the streaming API level?

If latency is of great concern, is it better to look into streaming from
something like Flume where data is pushed to consumers rather than pulled by
them? Are there techniques, in that case, to ensure the consumers don't get
overwhelmed with new data?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-reducing-latency-tp22922.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 



Re: Spark sql error while writing Parquet file- Trying to write more fields than contained in row

2015-05-18 Thread ayan guha
Hi

Give a try with the dataFrame.fillna function to fill up the missing columns

Best
Ayan

On Mon, May 18, 2015 at 8:29 PM, Chandra Mohan, Ananda Vel Murugan 
ananda.muru...@honeywell.com wrote:

  Hi,



 I am using spark-sql to read a CSV file and write it as parquet file. I am
 building the schema using the following code.



  String schemaString = "a b c";

    List<StructField> fields = new ArrayList<StructField>();

    MetadataBuilder mb = new MetadataBuilder();

    mb.putBoolean("nullable", true);

    Metadata m = mb.build();

    for (String fieldName : schemaString.split(" ")) {

      fields.add(new StructField(fieldName, DataTypes.DoubleType, true, m));

    }

    StructType schema = DataTypes.createStructType(fields);



 Some of the rows in my input csv does not contain three columns. After
 building my JavaRDDRow, I create data frame as shown below using the
 RDD and schema.



 DataFrame darDataFrame = sqlContext.createDataFrame(rowRDD, schema);



 Finally I try to save it as Parquet file



  darDataFrame.saveAsParquetFile("/home/anand/output.parquet")



 I get this error when saving it as Parquet file



 java.lang.IndexOutOfBoundsException: Trying to write more fields than
  contained in row (3 > 2)



 I understand the reason behind this error. Some of my rows in Row RDD does
 not contain three elements as some rows in my input csv does not contain
 three columns. But while building the schema, I am specifying every field
 as nullable. So I believe, it should not throw this error. Can anyone help
 me fix this error. Thank you.



 Regards,

 Anand.C








-- 
Best Regards,
Ayan Guha


RE: Spark Streaming and reducing latency

2015-05-18 Thread Evo Eftimov
And if you want to genuinely “reduce the latency” (still within the boundaries 
of the micro-batch) THEN you need to design and finely tune the Parallel 
Programming / Execution Model of your application. The objective/metric here is:

 

a)  Consume all data within your selected micro-batch window WITHOUT any 
artificial message rate limits

b)  The above will result in a certain size of Dstream RDD per micro-batch. 

c)   The objective now is to Process that RDD WITHIN the time of the 
micro-batch (and also account for temporary message rate spike etc which may 
further increase the size of the RDD) – this will avoid any clogging up of the 
app and will process your messages at the lowest latency possible in a 
micro-batch architecture 

d)  You achieve the objective stated in c by designing, varying and 
experimenting with various aspects of the Spark Streaming Parallel Programming 
and Execution Model – e.g. number of receivers, number of threads per receiver, 
number of executors, number of cores, RAM allocated to executors, number of RDD 
partitions which correspond to the number of parallel threads operating on the 
RDD etc etc  

 

Re the “unceremonious removal of DStream RDDs” from RAM by Spark Streaming when 
the available RAM is exhausted due to high message rate and which crashes your 
(hence clogged up) application the name of the condition is:

 

Loss was due to java.lang.Exception   

java.lang.Exception: Could not compute split, block
input-4-1410542878200 not found

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Monday, May 18, 2015 12:13 PM
To: 'Dmitry Goldenberg'; 'Akhil Das'
Cc: 'user@spark.apache.org'
Subject: RE: Spark Streaming and reducing latency

 

You can use

 


spark.streaming.receiver.maxRate (default: not set)

Maximum rate (number of records per second) at which each receiver will receive 
data. Effectively, each stream will consume at most this number of records per 
second. Setting this configuration to 0 or a negative number will put no limit 
on the rate. See the deployment guide 
https://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications
in the Spark Streaming programming guide for more details.

 

 

Another way is to implement a feedback loop in your receivers monitoring the 
performance metrics of your application/job and based on that adjusting 
automatically the receiving rate – BUT all these have nothing to do  with 
“reducing the latency” – they simply prevent your application/job from clogging 
up – the nastier effect of which is when Spark Streaming starts removing In 
Memory RDDs from RAM before they are processed by the job – that works fine in 
Spark Batch (ie removing RDDs from RAM based on LRU) but in Spark Streaming 
when done in this “unceremonious way” it simply Crashes the application

 

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Monday, May 18, 2015 11:46 AM
To: Akhil Das
Cc: user@spark.apache.org
Subject: Re: Spark Streaming and reducing latency

 

Thanks, Akhil. So what do folks typically do to increase/contract the capacity? 
Do you plug in some cluster auto-scaling solution to make this elastic?

 

Does Spark have any hooks for instrumenting auto-scaling?

In other words, how do you avoid overwheling the receivers in a scenario when 
your system's input can be unpredictable, based on users' activity?


On May 17, 2015, at 11:04 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

With receiver based streaming, you can actually specify 
spark.streaming.blockInterval which is the interval at which the receiver will 
fetch data from the source. Default value is 200ms and hence if your batch 
duration is 1 second, it will produce 5 blocks of data. And yes, with 
sparkstreaming when your processing time goes beyond your batch duration and 
you are having a higher data consumption then you will overwhelm the receiver's 
memory and hence will throw up block not found exceptions. 




Thanks

Best Regards

 

On Sun, May 17, 2015 at 7:21 PM, dgoldenberg dgoldenberg...@gmail.com wrote:

I keep hearing the argument that the way Discretized Streams work with Spark
Streaming is a lot more of a batch processing algorithm than true streaming.
For streaming, one would expect a new item, e.g. in a Kafka topic, to be
available to the streaming consumer immediately.

With the discretized streams, streaming is done with batch intervals i.e.
the consumer has to wait the interval to be able to get at the new items. If
one wants to reduce latency it seems the only way to do this would be by
reducing the batch interval window. However, that may lead to a great deal
of churn, with many requests going into Kafka out of the consumers,
potentially with no results whatsoever as there's nothing new in the topic
at the moment.

Is there a counter-argument to this reasoning? What are some of the general
approaches to reduce latency  folks might recommend? Or, perhaps there are
ways of dealing with