Re: Spark can't pickle class: error cannot lookup attribute

2015-02-19 Thread Guillaume Guy
Thanks Davies and Eric. I followed Davies' instructions and it works
wonderfully.

I would add that you can also use these scripts in the pyspark shell:

pyspark --py-files support.py

where support.py is your script containing your class as Davies described.
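
For example, once the shell is started that way you can use the class from
support.py directly (a minimal sketch; the class mirrors Davies' example below
and the data is made up):

from support import test   # test lives in a module, not __main__, so it pickles fine

a = sc.parallelize([(True, False), (False, False)])
b = a.map(lambda xy: test(xy[0], xy[1]))
print(b.map(lambda t: t.total).collect())   # [1, 0]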




Best,

Guillaume Guy

* +1 919 - 972 - 8750*

On Wed, Feb 18, 2015 at 11:48 PM, Davies Liu dav...@databricks.com wrote:

 Currently, PySpark cannot pickle a class object defined in the current
 script ('__main__'). The workaround is to put the implementation
 of the class into a separate module, then use bin/spark-submit
 --py-files xxx.py to deploy it.

 in xxx.py:

 class test(object):
     def __init__(self, a, b):
         self.total = a + b

 in job.py:

 from xxx import test
 a = sc.parallelize([(True,False),(False,False)])
 a.map(lambda (x,y): test(x,y))

 run it by:

 bin/spark-submit --py-files xxx.py job.py


 On Wed, Feb 18, 2015 at 1:48 PM, Guillaume Guy
 guillaume.c@gmail.com wrote:
  Hi,
 
  This is a duplicate of the stack-overflow question here. I hope to
 generate
  more interest  on this mailing list.
 
 
  The problem:
 
  I am running into some attribute lookup problems when trying to initiate
 a
  class within my RDD.
 
  My workflow is quite standard:
 
  1- Start with an RDD
 
  2- Take each element of the RDD, initiate an object for each
 
  3- Reduce (I will write a method that will define the reduce operation
 later
  on)
 
  Here is #2:
 
  class test(object):
      def __init__(self, a, b):
          self.total = a + b
 
  a = sc.parallelize([(True,False),(False,False)])
  a.map(lambda (x,y): test(x,y))
 
  Here is the error I get:
 
  PicklingError: Can't pickle <class '__main__.test'>: attribute lookup
  __main__.test failed
 
  I'd like to know if there is any way around it. Please, answer with a
  working example to achieve the intended results (i.e. creating a RDD of
  objects of class tests).
 
  Thanks in advance!
 
  Related question:
 
  https://groups.google.com/forum/#!topic/edx-code/9xzRJFyQwn
 
 
  GG
 



Re: SchemaRDD.select

2015-02-19 Thread Michael Armbrust
The trick here is getting the Scala compiler to do the implicit conversion
from Symbol => Column.  In your second example, the compiler doesn't know
that you are going to try and use the Seq[Symbol] as a Seq[Column] and so
doesn't do the conversion.  The following are other ways to provide enough
info to make sure it happens:

// Explicitly create columns
val columnList = Seq($"field1", $"field2")

// Explicitly type the Seq
val columnList: Seq[Column] = Seq('field1, 'field2)
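
For example, to build the column list from plain strings at run time, something
along these lines should work (a sketch following the second variant above; it
assumes the Symbol => Column implicit conversion is in scope, and the exact type
(Column vs. Expression) depends on your Spark version):

val fieldNames = List("field1", "field2")
val columnList: Seq[Column] = fieldNames.map(name => Symbol(name): Column)  // ascription triggers the implicit per element
val projected = myschemaRDD.select(columnList: _*)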


On Thu, Feb 19, 2015 at 12:09 PM, Cesar Flores ces...@gmail.com wrote:

 Well:

 I think that I solved my issue in the next way:


 val variable_fieldsStr = List("field1", "field2")
 val variable_argument_list = variable_fieldsStr.map(f => Alias(Symbol(f),
 f)())

 val schm2 = myschemaRDD.select(variable_argument_list:_*)


 schm2 seems to have the required fields, but would like to hear the
 opinion of an expert about it.



 Thanks

 On Thu, Feb 19, 2015 at 12:01 PM, Cesar Flores ces...@gmail.com wrote:


 I am trying to pass a variable number of arguments to the select function
 of a SchemaRDD I created, as I want to select the fields in run time:


 val variable_argument_list = List('field1, 'field2)

 val schm1 = myschemaRDD.select('field1, 'field2) // works
 val schm2 = myschemaRDD.select(variable_argument_list:_*) // does not work


 I am interested in selecting in run time the fields
 from myschemaRDD variable. However, the usual way of passing variable
 number of arguments as a List in Scala fails.

 Is there a way of selecting a variable number of arguments in the select
 function? If not, what will be a better approach for selecting the required
 fields in run time?



 Thanks in advance for your help
 --
 Cesar Flores




 --
 Cesar Flores



Streaming Linear Regression

2015-02-19 Thread barisak
Hi 

I tried to run Streaming Linear Regression on my local machine.

val trainingData =
ssc.textFileStream("/home/barisakgu/Desktop/Spark/train").map(LabeledPoint.parse)
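
For completeness, the rest of the wiring looks roughly like this (a minimal
sketch; the master, batch interval and feature count are illustrative). Note
that textFileStream only picks up files that are created in, or atomically
moved into, the directory after the streaming context starts; files that were
already there are ignored:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}

val conf = new SparkConf().setAppName("StreamingLR").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))

val trainingData =
  ssc.textFileStream("/home/barisakgu/Desktop/Spark/train").map(LabeledPoint.parse)

val numFeatures = 3  // illustrative
val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.zeros(numFeatures))
model.trainOn(trainingData)

ssc.start()
ssc.awaitTermination()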

textFileStream is not seeing the new files. I searched on the Internet and
saw that somebody had the same issue, but no solution was found for it.

Does anyone have any thoughts on this? Has anybody managed to get
streaming linear regression running?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Linear-Regression-tp21726.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark job fails on cluster but works fine on a single machine

2015-02-19 Thread Pavel Velikhov
Yeah, I do manually delete the files, but it still fails with this error.

 On Feb 19, 2015, at 8:16 PM, Ganelin, Ilya ilya.gane...@capitalone.com 
 wrote:
 
 When writing to hdfs Spark will not overwrite existing files or directories. 
 You must either manually delete these or use Java's Hadoop FileSystem class 
 to remove them.
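 
 For example, something like this before calling saveAsTextFile (a rough, untested 
 sketch; the output path is the one from your job below):
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 val outputPath = new Path("/tmp/pavel/CassandraPipeTest.txt")
 val fs = FileSystem.get(sc.hadoopConfiguration)
 if (fs.exists(outputPath)) {
   fs.delete(outputPath, true)  // recursively remove the old output directory
 }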
 
 
 
 Sent with Good (www.good.com)
 
 
 -Original Message-
 From: Pavel Velikhov [pavel.velik...@gmail.com 
 mailto:pavel.velik...@gmail.com]
 Sent: Thursday, February 19, 2015 11:32 AM Eastern Standard Time
 To: user@spark.apache.org
 Subject: Spark job fails on cluster but works fine on a single machine
 
 I have a simple Spark job that goes out to Cassandra, runs a pipe and stores 
 results:
 
 val sc = new SparkContext(conf)
 val rdd = sc.cassandraTable("keyspace", "table")
   .map(r => r.getInt("column") + "\t" + write(get_lemmas(r.getString("tags"))))
   .pipe("python3 /tmp/scripts_and_models/scripts/run.py")
   .map(r => convertStr(r))
   .coalesce(1, true)
   .saveAsTextFile("/tmp/pavel/CassandraPipeTest.txt")
   //.saveToCassandra("keyspace", "table", SomeColumns("id", "data"))
 
 When run on a single machine, everything is fine if I save to an hdfs file or 
 save to Cassandra.
 When run in cluster neither works:
 
   - When saving to file, I get an exception: User class threw exception: 
  Output directory hdfs://hadoop01:54310/tmp/pavel/CassandraPipeTest.txt 
  already exists
  - When saving to Cassandra, only 4 rows are updated with empty data (I test 
 on a 4-machine Spark cluster)
 
 Any hints on how to debug this and where the problem could be?
 
 - I delete the hdfs file before running
 - Would really like the output to hdfs to work, so I can debug
 - Then it would be nice to save to Cassandra
 
 The information contained in this e-mail is confidential and/or proprietary 
 to Capital One and/or its affiliates. The information transmitted herewith is 
 intended only for use by the individual or entity to which it is addressed.  
 If the reader of this message is not the intended recipient, you are hereby 
 notified that any review, retransmission, dissemination, distribution, 
 copying or other use of, or taking of any action in reliance upon this 
 information is strictly prohibited. If you have received this communication 
 in error, please contact the sender and delete the material from your 
 computer.



Spark streaming program with Tranquility/Druid

2015-02-19 Thread Jaal
Hello all,

I am trying to integrate Spark with Tranquility
(https://github.com/metamx/tranquility) and when I start the Spark program,
I get the error below:

java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass
at
org.codehaus.jackson.map.introspect.JacksonAnnotationIntrospector.findDeserializationType(JacksonAnnotationIntrospector.java:524)
at
org.codehaus.jackson.map.deser.BasicDeserializerFactory.modifyTypeByAnnotation(BasicDeserializerFactory.java:732)
at
org.codehaus.jackson.map.deser.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:427)
at
org.codehaus.jackson.map.deser.StdDeserializerProvider._createDeserializer(StdDeserializerProvider.java:398)
at
org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCache2(StdDeserializerProvider.java:307)
at
org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCacheValueDeserializer(StdDeserializerProvider.java:287)

.


The Tranquility library uses Jackson 2.4.4 (as does Druid), while I think
something in Spark is using jackson-core-asl, the deprecated Jackson 1.x
line of the library.

I am using sbt assembly to create an assembly file to use via spark-submit
and I tried to force it to add the older version containing the package
referenced in the error above. I see that the assembly jar has that package
now but it is still being ignored by Spark.
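
For anyone who wants to try the same thing, pinning the old Jackson 1.x
artifacts to a single version in sbt looks roughly like this (a sketch; the
exact version to pin is an assumption and should match whatever your
Hadoop/Spark build actually expects):

// build.sbt -- force one consistent version of the org.codehaus.jackson (1.x) jars
dependencyOverrides ++= Set(
  "org.codehaus.jackson" % "jackson-core-asl"   % "1.9.13",
  "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.13"
)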

Any idea how to go about finding a solution?

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-program-with-Tranquility-Druid-tp21725.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Unknown sample in Naive Baye's

2015-02-19 Thread Xiangrui Meng
If you know there is data that doesn't belong to any existing category,
put it into the training set and make a new category for it. It
won't help much if instances from this unknown category are all
outliers. In that case, lower the thresholds and tune the parameters
to get a lower error rate. -Xiangrui
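
As a rough illustration of the thresholding idea discussed below (a sketch
only; it assumes you already have per-class posterior probabilities, e.g. via
the hack Jatin mentions, and the labels and thresholds are made up):

// classes and per-class thresholds, e.g. chosen on a validation set
val labels = Array("A", "B", "C")
val thresholds = Map("A" -> 0.35, "B" -> 0.32, "C" -> 0.33)

// posteriors: one probability per class for a document, in the same order as labels
def classify(posteriors: Array[Double]): String = {
  val (bestProb, bestIdx) = posteriors.zipWithIndex.maxBy(_._1)
  val bestLabel = labels(bestIdx)
  if (bestProb >= thresholds(bestLabel)) bestLabel else "unknown"
}

// classify(Array(0.40, 0.30, 0.30))  => "A"
// classify(Array(0.34, 0.33, 0.33))  => "unknown" (0.34 < threshold for A)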

On Thu, Feb 19, 2015 at 8:58 AM, Jatinpreet Singh jatinpr...@gmail.com wrote:
 Hi Xiangrui,

 Thanks for the answer. The problem is that in my application, I cannot stop
 users from scoring any type of sample against the trained model.

 So, even if the class of a completely unknown sample has not been trained,
 the model will put it in one of the categories with high probability. I wish to
 eliminate this with some kind of probability threshold. Is this possible in
 any way with Naive Bayes? Can changing the classification algorithm help in
 this regard?

 I appreciate any help on this.

 Thanks,
 Jatin

 On Wed, Feb 18, 2015 at 3:07 AM, Xiangrui Meng men...@gmail.com wrote:

 If there exists a sample that doesn't belong to A/B/C, it means
 that there exists another class D or Unknown besides A/B/C. You should
 have some of these samples in the training set in order to let naive
 Bayes learn the priors. -Xiangrui

 On Tue, Feb 10, 2015 at 10:44 PM, jatinpreet jatinpr...@gmail.com wrote:
  Hi,
 
  I am using MLlib's Naive Baye's classifier to classify textual data. I
  am
  accessing the posterior probabilities through a hack for each class.
 
  Once I have trained the model, I want to remove documents whose
  confidence
  of classification is low. Say for a document, if the highest class
  probability is lesser than a pre-defined threshold(separate for each
  class),
  categorize this document as 'unknown'.
 
  Say there are three classes A, B and C with thresholds 0.35, 0.32 and
  0.33
  respectively defined after training and testing. If I score a sample
  that
  belongs to neither of the three categories, I wish to classify it as
  'unknown'. But the issue is I can get a probability higher than these
  thresholds for a document that doesn't belong to the trained categories.
 
  Is there any technique which I can apply to segregate documents that
  belong
  to untrained classes with certain degree of confidence?
 
  Thanks
 
 
 
  --
  View this message in context:
  http://apache-spark-user-list.1001560.n3.nabble.com/Unknown-sample-in-Naive-Baye-s-tp21594.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 




 --
 Regards,
 Jatinpreet Singh

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: stack map functions in a loop (pyspark)

2015-02-19 Thread Davies Liu
On Thu, Feb 19, 2015 at 7:57 AM, jamborta jambo...@gmail.com wrote:
 Hi all,

 I think I have run into an issue with the lazy evaluation of variables in
 pyspark. I have the following:

 functions = [func1, func2, func3]

 for counter in range(len(functions)):
     data = data.map(lambda value: [functions[counter](value)])

You need to create a wrapper for counter:

def mapper(f):
    return lambda v: [f(v)]

for f in functions:
    data = data.map(mapper(f))
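
The underlying behaviour is plain Python closure semantics (late binding), not
anything Spark-specific; a small standalone sketch shows it (the functions are
just illustrative):

def inc(x): return x + 1
def double(x): return x * 2
def square(x): return x * x

functions = [inc, double, square]

# late binding: every lambda closes over the *variable* counter, which ends up as 2
broken = []
for counter in range(len(functions)):
    broken.append(lambda v: [functions[counter](v)])
print([f(3) for f in broken])    # [[9], [9], [9]] -- all three call functions[2]

# the wrapper captures the current function value instead
def mapper(f):
    return lambda v: [f(v)]

fixed = [mapper(f) for f in functions]
print([f(3) for f in fixed])     # [[4], [6], [9]]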

 it looks like the counter is evaluated when the RDD is computed, so all
 three mappers end up using its last value. Is there any way
 to force it to be evaluated at the time? (I am aware that I could
 persist after each step, which sounds like a bit of a waste)

 thanks,



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/stack-map-functions-in-a-loop-pyspark-tp21722.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SchemaRDD.select

2015-02-19 Thread Cesar Flores
Well:

I think that I solved my issue in the next way:


val variable_fieldsStr = List("field1", "field2")
val variable_argument_list = variable_fieldsStr.map(f => Alias(Symbol(f),
f)())

val schm2 = myschemaRDD.select(variable_argument_list:_*)


schm2 seems to have the required fields, but would like to hear the opinion
of an expert about it.



Thanks

On Thu, Feb 19, 2015 at 12:01 PM, Cesar Flores ces...@gmail.com wrote:


 I am trying to pass a variable number of arguments to the select function
 of a SchemaRDD I created, as I want to select the fields in run time:


 val variable_argument_list = List('field1, 'field2)

 val schm1 = myschemaRDD.select('field1, 'field2) // works
 val schm2 = myschemaRDD.select(variable_argument_list:_*) // does not work


 I am interested in selecting in run time the fields
 from myschemaRDD variable. However, the usual way of passing variable
 number of arguments as a List in Scala fails.

 Is there a way of selecting a variable number of arguments in the select
 function? If not, what will be a better approach for selecting the required
 fields in run time?



 Thanks in advance for your help
 --
 Cesar Flores




-- 
Cesar Flores


spark-sql problem with textfile separator

2015-02-19 Thread sparkino
Hello everybody,
I'm quite new to Spark and Scala as well and I was trying to analyze some
csv data via spark-sql 

My csv file contains data like this



Following the example at this link below
https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection

I have that code:


When I try to fetch val name_address, the second field (address) does not
contain all the text. The issue is probably the comma: it is at the same
time the split character and part of a string value in the address column of
the csv file.

How can handle this?

Thank you in advance
Sparkino



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-problem-with-textfile-separator-tp21718.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Unzipping large files and 2GB partition size.

2015-02-19 Thread Joe Wass
Thanks for your reply Sean.

Looks like it's happening in a map:

15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing tasks
from Stage 1 (MappedRDD[17] at mapToPair at
NativeMethodAccessorImpl.java:-2)

That's my initial 'parse' stage, done before repartitioning. It reduces the
data size significantly so I thought it would be sensible to do before
repartitioning, which involves moving lots of data around. That might be a
stupid idea in hindsight!

So the obvious thing to try would be repartitioning before the map, as the
first transformation. I would have done that already if I could be sure that
it would succeed or fail quickly.
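
In other words, something along these lines (just a sketch of the ordering I
mean; the bucket path, partition count and parseLine are illustrative):

val raw = sc.textFile("s3n://my-bucket/logs/*.gz")  // each gz file arrives as one unsplittable partition
val spread = raw.repartition(500)                   // spread the lines across the cluster first
val parsed = spread.map(parseLine)                  // then do the per-line parsing work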

I'm not entirely clear about the lazy execution of transformations in the DAG.
It could be that the error is manifesting during the mapToPair, but is caused
by the earlier read-from-text-file stage.

Thanks for pointers to those compression formats. I'll give them a go
(although it's not trivial to re-encode 200 GB of data on S3, so if I can
get this working reasonably with gzip I'd like to).

Any advice about whether this error can be worked round with an early
partition?

Cheers

Joe


On 19 February 2015 at 09:51, Sean Owen so...@cloudera.com wrote:

 gzip and zip are not splittable compression formats; bzip and lzo are.
 Ideally, use a splittable compression format.

 Repartitioning is not a great solution since it means a shuffle, typically.

 This is not necessarily related to how big your partitions are. The
 question is, when does this happen? what operation?

 On Thu, Feb 19, 2015 at 9:35 AM, Joe Wass jw...@crossref.org wrote:
  On the advice of some recent discussions on this list, I thought I would
 try
  and consume gz files directly. I'm reading them, doing a preliminary map,
  then repartitioning, then doing normal spark things.
 
  As I understand it, zip files aren't readable in partitions because of
 the
  format, so I thought that repartitioning would be the next best thing for
  parallelism. I have about 200 files, some about 1GB compressed and some
 over
  2GB uncompressed.
 
  I'm hitting the 2GB maximum partition size. It's been discussed on this
 list
  (topic: 2GB limit for partitions?, tickets SPARK-1476 and SPARK-1391).
  Stack trace at the end. This happened at 10 hours in (probably when it
 saw
  its first file). I can't just re-run it quickly!
 
  Does anyone have any advice? Might I solve this by re-partitioning as the
  first step after reading the file(s)? Or is it effectively impossible to
  read a gz file that expands to over 2GB? Does anyone have any experience
  with this?
 
  Thanks in advance
 
  Joe
 
  Stack trace:
 
  Exception in thread main 15/02/18 20:44:25 INFO
 scheduler.TaskSetManager:
  Lost task 5.3 in stage 1.0 (TID 283) on executor:
  java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE)
  [duplicate 6]
  org.apache.spark.SparkException: Job aborted due to stage failure: Task
 2 in
  stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage
 1.0:
  java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
  at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
  at
 org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
  at
 org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
  at
  org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
  at
  org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432)
  at
 org.apache.spark.storage.BlockManager.get(BlockManager.scala:618)
  at
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)



In a Spark Streaming application, what might be the potential causes for util.AkkaUtils: Error sending message in 1 attempts and java.util.concurrent.TimeoutException: Futures timed out and

2015-02-19 Thread Emre Sevinc
Hello,

We have a Spark Streaming application that watches an input directory; as
files are copied there, the application reads them, sends the contents
to a RESTful web service, receives a response and writes some contents to an
output directory.

When testing the application by copying a few thousand files at once to its
input directory, we have noticed that after having processed about 3800
files, it starts to log messages like the following:

15/02/19 10:22:06 INFO storage.MemoryStore: Block broadcast_17935 of size
9960 dropped from memory (free 447798720)
15/02/19 10:22:55 WARN util.AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)

and then the Spark Streaming application dies.

What might be the potential causes to check for such errors?

Below you can see last few lines before it dies:


15/02/19 10:22:03 INFO broadcast.TorrentBroadcast: Started reading
broadcast variable 12894
15/02/19 10:22:04 INFO storage.MemoryStore: ensureFreeSpace(20978) called
with curMem=107884847, maxMem=556038881
15/02/19 10:22:04 INFO storage.MemoryStore: Block broadcast_12894_piece0
stored as bytes in memory (estimated size 20.5 KB, free 427.4 MB)
15/02/19 10:22:04 INFO storage.BlockManagerMaster: Updated info of block
broadcast_12894_piece0
15/02/19 10:22:04 INFO broadcast.TorrentBroadcast: Reading broadcast
variable 12894 took 460 ms
15/02/19 10:22:04 INFO storage.MemoryStore: ensureFreeSpace(347363) called
with curMem=107905825, maxMem=556038881
15/02/19 10:22:04 INFO storage.MemoryStore: Block broadcast_12894 stored as
values in memory (estimated size 339.2 KB, free 427.0 MB)
15/02/19 10:22:04 INFO storage.MemoryStore: ensureFreeSpace(1079) called
with curMem=108253188, maxMem=556038881
15/02/19 10:22:04 INFO storage.MemoryStore: Block rdd_30466_35 stored as
bytes in memory (estimated size 1079.0 B, free 427.0 MB)
15/02/19 10:22:04 INFO storage.BlockManagerMaster: Updated info of block
rdd_30466_35
15/02/19 10:22:05 INFO storage.MemoryStore: ensureFreeSpace(5) called with
curMem=108254267, maxMem=556038881
15/02/19 10:22:05 INFO storage.MemoryStore: Block rdd_30467_35 stored as
bytes in memory (estimated size 5.0 B, free 427.0 MB)
15/02/19 10:22:05 INFO storage.BlockManagerMaster: Updated info of block
rdd_30467_35
15/02/19 10:22:05 INFO executor.Executor: Finished task 35.0 in stage 351.0
(TID 12229). 2353 bytes result sent to driver
15/02/19 10:22:06 INFO storage.BlockManager: Removing broadcast 17935
15/02/19 10:22:06 INFO storage.BlockManager: Removing block
broadcast_17935_piece0
15/02/19 10:22:06 INFO storage.MemoryStore: Block broadcast_17935_piece0 of
size 4151 dropped from memory (free 447788760)
15/02/19 10:22:06 INFO storage.BlockManagerMaster: Updated info of block
broadcast_17935_piece0
15/02/19 10:22:06 INFO storage.BlockManager: Removing block broadcast_17935
15/02/19 10:22:06 INFO storage.MemoryStore: Block broadcast_17935 of size
9960 dropped from memory (free 447798720)
15/02/19 10:22:55 WARN util.AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)
15/02/19 10:23:28 WARN util.AkkaUtils: Error sending message in 2 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)
15/02/19 10:24:01 WARN util.AkkaUtils: Error sending message in 3 attempts

Unzipping large files and 2GB partition size.

2015-02-19 Thread Joe Wass
On the advice of some recent discussions on this list, I thought I would
try and consume gz files directly. I'm reading them, doing a preliminary
map, then repartitioning, then doing normal spark things.

As I understand it, zip files aren't readable in partitions because of the
format, so I thought that repartitioning would be the next best thing for
parallelism. I have about 200 files, some about 1GB compressed and some
over 2GB uncompressed.

I'm hitting the 2GB maximum partition size. It's been discussed on this
list (topic: 2GB limit for partitions?, tickets SPARK-1476 and
SPARK-1391).  Stack trace at the end. This happened at 10 hours in
(probably when it saw its first file). I can't just re-run it quickly!

Does anyone have any advice? Might I solve this by re-partitioning as the
first step after reading the file(s)? Or is it effectively impossible to
read a gz file that expands to over 2GB? Does anyone have any experience
with this?

Thanks in advance

Joe

Stack trace:

Exception in thread main 15/02/18 20:44:25 INFO scheduler.TaskSetManager:
Lost task 5.3 in stage 1.0 (TID 283) on executor:
java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE)
[duplicate 6]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2
in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage
1.0: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
at
org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
at
org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:618)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)


Re: loads of memory still GC overhead limit exceeded

2015-02-19 Thread Sean Owen
This should result in 4 executors, not 25. They should be able to
execute 4*4 = 16 tasks simultaneously. You have them grab 4*32 = 128GB
of RAM, not 1TB.
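
(For reference, the arithmetic behind those numbers, assuming one executor per box:

  spark.executor.memory (28G) + spark.yarn.executor.memoryOverhead (4096MB) = 32GB per executor
  4 boxes x 4 cores  = 16 concurrent tasks
  4 boxes x 32GB     = 128GB requested out of the 1TB of total RAM)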

It still feels like this shouldn't be running out of memory, not by a
long shot though. But just pointing out potential differences between
what you are expecting and what you are configuring.

On Thu, Feb 19, 2015 at 9:56 AM, Antony Mayi
antonym...@yahoo.com.invalid wrote:
 Hi,

 I have 4 very powerful boxes (256GB RAM, 32 cores each). I am running spark
 1.2.0 in yarn-client mode with following layout:

 spark.executor.cores=4
 spark.executor.memory=28G
 spark.yarn.executor.memoryOverhead=4096

 I am submitting bigger ALS trainImplicit task (rank=100, iters=15) on a
 dataset with ~3 billion of ratings using 25 executors. At some point some
 executor crashes with:

 15/02/19 05:41:06 WARN util.AkkaUtils: Error sending message in 1 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
 at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at
 org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
 at
 org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)
 15/02/19 05:41:06 ERROR executor.Executor: Exception in task 131.0 in stage
 51.0 (TID 7259)
 java.lang.OutOfMemoryError: GC overhead limit exceeded
 at java.lang.reflect.Array.newInstance(Array.java:75)
 at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1671)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
 at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)

 So the GC overhead limit exceeded is pretty clear and would suggest running
 out of memory. Since I have 1TB of RAM available this must be rather due to
 some config inoptimality.

 Can anyone please point me to some directions how to tackle this?

 Thanks,
 Antony.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Tableau beta connector

2015-02-19 Thread Ashutosh Trivedi (MT2013030)
Hi,

I would suggest reading my Stack Overflow answer to this question. If you need 
more clarification, feel free to drop me a message.

http://stackoverflow.com/questions/28403664/connect-to-existing-hive-in-intellij-using-sbt-as-build


Regards,

Ashutosh


From: ganterm [via Apache Spark User List] 
ml-node+s1001560n21709...@n3.nabble.com
Sent: Thursday, February 19, 2015 12:49 AM
To: Ashutosh Trivedi (MT2013030)
Subject: Re: Tableau beta connector

Ashutosh,

Were you able to figure this out? I am having the exact same question.
I think the answer is to use Spark SQL to create/load a table in Hive (e.g. 
execute the HiveQL CREATE TABLE statement) but I am not sure. Hoping for 
something simpler than that.
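
For what it's worth, the create/load route would look roughly like this (a
sketch based on the standard Spark SQL Hive example; the table name and data
file are illustrative):

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// create a Hive table and load data into it, so it becomes visible to the Thrift server / Tableau
hiveContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hiveContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")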

Anybody?

Thanks!


If you reply to this email, your message will be added to the discussion below:
http://apache-spark-user-list.1001560.n3.nabble.com/Tableau-beta-connector-tp21512p21709.html


Re: In a Spark Streaming application, what might be the potential causes for util.AkkaUtils: Error sending message in 1 attempts and java.util.concurrent.TimeoutException: Futures timed out and

2015-02-19 Thread Tathagata Das
What version of Spark are you using?

TD

On Thu, Feb 19, 2015 at 2:45 AM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello,

 We have a Spark Streaming application that watches an input directory, and
 as files are copied there the application reads them and sends the contents
 to a RESTful web service, receives a response and write some contents to an
 output directory.

 When testing the application by copying a few thousand files at once to
 its input directory, we have realized that after having processed about
 3800 files, it creates messages as the following in the log file:

 15/02/19 10:22:06 INFO storage.MemoryStore: Block broadcast_17935 of size
 9960 dropped from memory (free 447798720)
 15/02/19 10:22:55 WARN util.AkkaUtils: Error sending message in 1 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
 at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
 at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)

 and then the Spark Streaming application dies.

 What might be the potential causes to check for such errors?

 Below you can see last few lines before it dies:


 15/02/19 10:22:03 INFO broadcast.TorrentBroadcast: Started reading
 broadcast variable 12894
 15/02/19 10:22:04 INFO storage.MemoryStore: ensureFreeSpace(20978) called
 with curMem=107884847, maxMem=556038881
 15/02/19 10:22:04 INFO storage.MemoryStore: Block broadcast_12894_piece0
 stored as bytes in memory (estimated size 20.5 KB, free 427.4 MB)
 15/02/19 10:22:04 INFO storage.BlockManagerMaster: Updated info of block
 broadcast_12894_piece0
 15/02/19 10:22:04 INFO broadcast.TorrentBroadcast: Reading broadcast
 variable 12894 took 460 ms
 15/02/19 10:22:04 INFO storage.MemoryStore: ensureFreeSpace(347363) called
 with curMem=107905825, maxMem=556038881
 15/02/19 10:22:04 INFO storage.MemoryStore: Block broadcast_12894 stored
 as values in memory (estimated size 339.2 KB, free 427.0 MB)
 15/02/19 10:22:04 INFO storage.MemoryStore: ensureFreeSpace(1079) called
 with curMem=108253188, maxMem=556038881
 15/02/19 10:22:04 INFO storage.MemoryStore: Block rdd_30466_35 stored as
 bytes in memory (estimated size 1079.0 B, free 427.0 MB)
 15/02/19 10:22:04 INFO storage.BlockManagerMaster: Updated info of block
 rdd_30466_35
 15/02/19 10:22:05 INFO storage.MemoryStore: ensureFreeSpace(5) called with
 curMem=108254267, maxMem=556038881
 15/02/19 10:22:05 INFO storage.MemoryStore: Block rdd_30467_35 stored as
 bytes in memory (estimated size 5.0 B, free 427.0 MB)
 15/02/19 10:22:05 INFO storage.BlockManagerMaster: Updated info of block
 rdd_30467_35
 15/02/19 10:22:05 INFO executor.Executor: Finished task 35.0 in stage
 351.0 (TID 12229). 2353 bytes result sent to driver
 15/02/19 10:22:06 INFO storage.BlockManager: Removing broadcast 17935
 15/02/19 10:22:06 INFO storage.BlockManager: Removing block
 broadcast_17935_piece0
 15/02/19 10:22:06 INFO storage.MemoryStore: Block broadcast_17935_piece0
 of size 4151 dropped from memory (free 447788760)
 15/02/19 10:22:06 INFO storage.BlockManagerMaster: Updated info of block
 broadcast_17935_piece0
 15/02/19 10:22:06 INFO storage.BlockManager: Removing block broadcast_17935
 15/02/19 10:22:06 INFO storage.MemoryStore: Block broadcast_17935 of size
 9960 dropped from memory (free 447798720)
 15/02/19 10:22:55 WARN util.AkkaUtils: Error sending message in 1 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
 at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
 at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)
 15/02/19 10:23:28 WARN util.AkkaUtils: Error sending message in 2 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
 at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at 

storing MatrixFactorizationModel (pyspark)

2015-02-19 Thread Antony Mayi
Hi,

When getting the model out of ALS.train it would be beneficial to store it (to
disk) so the model can be reused later for any following predictions. I am
using pyspark and I had no luck pickling it, either using the standard pickle
module or even dill.

Does anyone have a solution for this (note it is pyspark)?

Thank you,
Antony.

Regarding minimum number of partitions while reading data from Hadoop

2015-02-19 Thread twinkle sachdeva
Hi,

In our job, we need to process the data in small chunks, so as to avoid GC
and other issues. For this, we are using the old Hadoop API, as it lets us
specify parameters like minPartitions.

Does anyone know if there is a way to do the same via the new Hadoop API?
How would that differ from the older API?

I am a little bit aware of the split-size settings, but not of whether they
give any guarantee that a minimum number of partitions is actually produced.

Any pointers will be of help.

Thanks,
Twinkle


Re: Unzipping large files and 2GB partition size.

2015-02-19 Thread Sean Owen
gzip and zip are not splittable compression formats; bzip and lzo are.
Ideally, use a splittable compression format.

Repartitioning is not a great solution since it means a shuffle, typically.

This is not necessarily related to how big your partitions are. The
question is, when does this happen? what operation?

On Thu, Feb 19, 2015 at 9:35 AM, Joe Wass jw...@crossref.org wrote:
 On the advice of some recent discussions on this list, I thought I would try
 and consume gz files directly. I'm reading them, doing a preliminary map,
 then repartitioning, then doing normal spark things.

 As I understand it, zip files aren't readable in partitions because of the
 format, so I thought that repartitioning would be the next best thing for
 parallelism. I have about 200 files, some about 1GB compressed and some over
 2GB uncompressed.

 I'm hitting the 2GB maximum partition size. It's been discussed on this list
 (topic: 2GB limit for partitions?, tickets SPARK-1476 and SPARK-1391).
 Stack trace at the end. This happened at 10 hours in (probably when it saw
 its first file). I can't just re-run it quickly!

 Does anyone have any advice? Might I solve this by re-partitioning as the
 first step after reading the file(s)? Or is it effectively impossible to
 read a gz file that expands to over 2GB? Does anyone have any experience
 with this?

 Thanks in advance

 Joe

 Stack trace:

 Exception in thread main 15/02/18 20:44:25 INFO scheduler.TaskSetManager:
 Lost task 5.3 in stage 1.0 (TID 283) on executor:
 java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE)
 [duplicate 6]
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in
 stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0:
 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
 at
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
 at
 org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432)
 at org.apache.spark.storage.BlockManager.get(BlockManager.scala:618)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



loads of memory still GC overhead limit exceeded

2015-02-19 Thread Antony Mayi
Hi,
I have 4 very powerful boxes (256GB RAM, 32 cores each). I am running spark
1.2.0 in yarn-client mode with following layout:

spark.executor.cores=4
spark.executor.memory=28G
spark.yarn.executor.memoryOverhead=4096

I am submitting bigger ALS trainImplicit task (rank=100, iters=15) on a dataset
with ~3 billion of ratings using 25 executors. At some point some executor
crashes with:

15/02/19 05:41:06 WARN util.AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)
15/02/19 05:41:06 ERROR executor.Executor: Exception in task 131.0 in stage 51.0 (TID 7259)
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.lang.reflect.Array.newInstance(Array.java:75)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1671)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)

So the GC overhead limit exceeded is pretty clear and would suggest running out
of memory. Since I have 1TB of RAM available this must be rather due to some
config inoptimality.

Can anyone please point me to some directions how to tackle this?

Thanks,
Antony.

Re: loads of memory still GC overhead limit exceeded

2015-02-19 Thread Antony Mayi
based on the Spark UI I am definitely running 25 executors. why would you expect
four? I submit the task with --num-executors 25 and I get 6-7 executors running
per host (using more, smaller executors gives me better cluster utilization
when running parallel spark sessions - which is not the case for this reported
issue; for now I am using the cluster exclusively).

thx,
Antony.

 On Thursday, 19 February 2015, 11:02, Sean Owen so...@cloudera.com wrote:

 This should result in 4 executors, not 25. They should be able to
execute 4*4 = 16 tasks simultaneously. You have them grab 4*32 = 128GB
of RAM, not 1TB.

It still feels like this shouldn't be running out of memory, not by a
long shot though. But just pointing out potential differences between
what you are expecting and what you are configuring.

On Thu, Feb 19, 2015 at 9:56 AM, Antony Mayi
antonym...@yahoo.com.invalid wrote:
 Hi,

 I have 4 very powerful boxes (256GB RAM, 32 cores each). I am running spark
 1.2.0 in yarn-client mode with following layout:

 spark.executor.cores=4
 spark.executor.memory=28G
 spark.yarn.executor.memoryOverhead=4096

 I am submitting bigger ALS trainImplicit task (rank=100, iters=15) on a
 dataset with ~3 billion of ratings using 25 executors. At some point some
 executor crashes with:

 15/02/19 05:41:06 WARN util.AkkaUtils: Error sending message in 1 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at
 org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
        at
 org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)
 15/02/19 05:41:06 ERROR executor.Executor: Exception in task 131.0 in stage
 51.0 (TID 7259)
 java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.lang.reflect.Array.newInstance(Array.java:75)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1671)
        at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
        at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)

 So the GC overhead limit exceeded is pretty clear and would suggest running
 out of memory. Since I have 1TB of RAM available this must be rather due to
 some config inoptimality.

 Can anyone please point me to some directions how to tackle this?

 Thanks,
 Antony.


 
   

Re: loads of memory still GC overhead limit exceeded

2015-02-19 Thread Sean Owen
Oh OK you are saying you are requesting 25 executors and getting them,
got it. You can consider making fewer, bigger executors to pool rather
than split up your memory, but at some point it becomes
counter-productive. 32GB is a fine executor size.

So you have ~8GB available per task which seems like plenty. Something
else is at work here. Is this error from your code's stages or from ALS?

On Thu, Feb 19, 2015 at 10:07 AM, Antony Mayi antonym...@yahoo.com wrote:
 based on spark UI I am running 25 executors for sure. why would you expect
 four? I submit the task with --num-executors 25 and I get 6-7 executors
 running per host (using more of smaller executors allows me better cluster
 utilization when running parallel spark sessions (which is not the case of
 this reported issue - for now using the cluster exclusively)).

 thx,
 Antony.


 On Thursday, 19 February 2015, 11:02, Sean Owen so...@cloudera.com wrote:



 This should result in 4 executors, not 25. They should be able to
 execute 4*4 = 16 tasks simultaneously. You have them grab 4*32 = 128GB
 of RAM, not 1TB.

 It still feels like this shouldn't be running out of memory, not by a
 long shot though. But just pointing out potential differences between
 what you are expecting and what you are configuring.

 On Thu, Feb 19, 2015 at 9:56 AM, Antony Mayi
 antonym...@yahoo.com.invalid wrote:
 Hi,

 I have 4 very powerful boxes (256GB RAM, 32 cores each). I am running
 spark
 1.2.0 in yarn-client mode with following layout:

 spark.executor.cores=4
 spark.executor.memory=28G
 spark.yarn.executor.memoryOverhead=4096

 I am submitting bigger ALS trainImplicit task (rank=100, iters=15) on a
 dataset with ~3 billion of ratings using 25 executors. At some point some
 executor crashes with:

 15/02/19 05:41:06 WARN util.AkkaUtils: Error sending message in 1 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]
at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at
 scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at

 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at
 org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
at
 org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)
 15/02/19 05:41:06 ERROR executor.Executor: Exception in task 131.0 in
 stage
 51.0 (TID 7259)
 java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.reflect.Array.newInstance(Array.java:75)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1671)
at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)

 So the GC overhead limit exceeded is pretty clear and would suggest
 running
 out of memory. Since I have 1TB of RAM available this must be rather due
 to
 some config inoptimality.

 Can anyone please point me to some directions how to tackle this?

 Thanks,
 Antony.



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: loads of memory still GC overhead limit exceeded

2015-02-19 Thread Antony Mayi
it is from within the ALS.trainImplicit() call. btw. the exception varies
between this GC overhead limit exceeded and Java heap space (which I guess
is just a different outcome of the same problem).

just tried another run and here are the logs (filtered) - note I tried this run
with spark.shuffle.io.preferDirectBufs=false so this might be a slightly
different issue from my previous case (going to revert now):

=== spark stdout ===
15/02/19 10:15:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(6, 192.168.1.92, 54289) with no recent heart beats: 50221ms exceeds 45000ms
15/02/19 10:16:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(13, 192.168.1.90, 56768) with no recent heart beats: 54749ms exceeds 45000ms
15/02/19 10:16:44 ERROR cluster.YarnClientClusterScheduler: Lost executor 6 on 192.168.1.92: remote Akka client disassociated
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 57.0 in stage 18.0 (TID 5379, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 32.0 in stage 18.0 (TID 5354, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 82.0 in stage 18.0 (TID 5404, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 ERROR cluster.YarnClientSchedulerBackend: Asked to remove non-existent executor 6
15/02/19 10:16:54 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 18.0 (TID 5328, 192.168.1.90): FetchFailed(BlockManagerId(6, 192.168.1.92, 54289), shuffleId=6, mapId=227, reduceId=6, message=org.apache.spark.shuffle.FetchFailedException: Failed to connect to /192.168.1.92:54289
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)

=== yarn log ===
15/02/19 10:15:05 WARN executor.Executor: Told to re-register on heartbeat
15/02/19 10:16:02 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
15/02/19 10:16:02 WARN server.TransportChannelHandler: Exception in connection from /192.168.1.92:45633
io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Java heap space
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:280)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:149)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)

=== yarn nodemanager log ===
2015-02-19 10:16:45,146 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 20284 for container-id container_1424204221358_0012_01_16: 28.5 GB of 32 GB physical memory used; 29.1 GB of 67.2 GB virtual memory used
2015-02-19 10:16:45,163 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 20273 for container-id container_1424204221358_0012_01_20: 28.5 GB of 32 GB physical memory used; 29.2 GB of 67.2 GB virtual memory used
2015-02-19 10:16:46,621 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1424204221358_0012_01_08 is : 143
2015-02-19 10:16:46,621 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1424204221358_0012_01_08 transitioned from RUNNING to EXITED_WITH_FAILURE
2015-02-19 10:16:46,621 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Cleaning up container container_1424204221358_0012_01_08

thanks for any help,
Antony.








ps. could that be Java 8 related? 

 On Thursday, 19 February 2015, 11:25, Sean Owen so...@cloudera.com wrote:

 Oh OK you are saying you are requesting 25 executors and getting them,
got it. You can consider making fewer, bigger executors to pool rather
than split up your memory, but at some point it becomes
counter-productive. 32GB is a fine executor size.

So you have ~8GB available per task which seems like plenty. Something
else is at work here. Is this error form your code's stages or ALS?

On Thu, Feb 19, 2015 at 10:07 AM, Antony Mayi antonym...@yahoo.com wrote:
 based on spark UI I am running 25 executors for sure. why would you expect
 four? I submit the task with --num-executors 25 and I get 6-7 executors
 running per host (using 

Re: In a Spark Streaming application, what might be the potential causes for util.AkkaUtils: Error sending message in 1 attempts and java.util.concurrent.TimeoutException: Futures timed out and

2015-02-19 Thread Emre Sevinc
On Thu, Feb 19, 2015 at 12:27 PM, Tathagata Das t...@databricks.com wrote:

 What version of Spark are you using?

 TD



Spark version is 1.2.0 (running on Cloudera CDH 5.3.0)


--
Emre Sevinç


Re: Regarding minimum number of partitions while reading data from Hadoop

2015-02-19 Thread Sean Owen
I think that the newer Hadoop API does not expose this suggested min
partitions parameter like the old one did. I believe you can try
setting mapreduce.input.fileinputformat.split.{min,max}size instead on
the Hadoop Configuration to suggest a max/min split size, and
therefore bound the number of partitions you get back.
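
For example, something along these lines (a sketch; the split size value is
illustrative, and the property name is the mapreduce.* one mentioned above):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val hadoopConf = new Configuration(sc.hadoopConfiguration)
// cap each split at ~64 MB so more (smaller) partitions are produced
hadoopConf.set("mapreduce.input.fileinputformat.split.maxsize", (64 * 1024 * 1024).toString)

val rdd = sc.newAPIHadoopFile(
  "hdfs:///data/input",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  hadoopConf
).map { case (_, text) => text.toString }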

On Thu, Feb 19, 2015 at 11:07 AM, twinkle sachdeva
twinkle.sachd...@gmail.com wrote:
 Hi,

 In our job, we need to process the data in small chunks, so  as to avoid GC
 and other stuff. For this, we are using old API of hadoop as that let us
 specify parameter like minPartitions.

 Does any one knows, If  there a way to do the same via newHadoopAPI also?
 How that way will be different from older API?

 I am little bit aware of split size stuff, but not much aware regarding any
 promise that minimum number of partitions criteria gets satisfied or not.

 Any pointers will be of help.

 Thanks,
 Twinkle

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark-sql problem with textfile separator

2015-02-19 Thread Yanbo Liang
This is because each line gets split into 4 columns instead of 3.
If you use a comma to separate columns, then no column value can itself
contain a comma.
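
One common workaround, if the address values are quoted in the file, is to
split on commas that fall outside double quotes (a sketch only; it assumes
RFC-4180-style quoting and no embedded newlines). A dedicated CSV parser such
as opencsv is more robust if the data gets messier:

// e.g. 1,John,"12 Main St, Springfield"  =>  3 fields, comma inside quotes preserved
val line = "1,John,\"12 Main St, Springfield\""
val fields = line.split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)", -1)
                 .map(_.stripPrefix("\"").stripSuffix("\""))
// fields: Array("1", "John", "12 Main St, Springfield")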


2015-02-19 18:12 GMT+08:00 sparkino francescoboname...@gmail.com:

 Hello everybody,
 I'm quite new to Spark and Scala as well and I was trying to analyze some
 csv data via spark-sql

 My csv file contains data like this



 Following the example at this link below

 https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection

 I have that code:


 When I try to fetch val name_address the second field (address) does not
 contain all the text. The issue is probably the comma, it is at the same
 time the split character and a string value of the third column (address)
 of
 the csv file.

 How can handle this?

 Thank you in advance
 Sparkino



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-problem-with-textfile-separator-tp21718.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Spark on Mesos: Multiple Users with iPython Notebooks

2015-02-19 Thread John Omernik
I am running Spark on Mesos and it works quite well.  I have three
users, all who setup iPython notebooks to instantiate a spark instance
to work with on the notebooks. I love it so far.

Since I am auto-instantiating (I don't want a user to have to
think about instantiating and submitting a Spark app to do ad hoc
analysis; I want the environment set up ahead of time), this is done
whenever an iPython notebook is opened.  So far it's working pretty
well, save one issue:

Every notebook is a new driver. I.e. every time they open a notebook,
a new spark submit is called, and the driver resources are allocated,
regardless if they are used or not.  Yes, it's only the driver, but
even that I find starts slowing down my queries for the notebooks that
using spark.  (I am running in Mesos Fined Grained mode).


I have three users on my system, ideally, I would love to find a way
so that on the first notebook being opened, a driver is started for
that user, and then can be used for any notebook the user has open. So
if they open a new notebook, I can check that yes, the user has a
spark driver running, and thus, that notebook, if there is a query,
will run it through that driver. That allows me to understand the
resource allocation better, and it keeps users from running 10
notebooks and tying up a lot of resources.

The other thing I was wondering is could the driver actually be run on
the Mesos cluster? Right now, I have an edge node as an iPython
server; the drivers all exist on that server, so as I get more and
more drivers, the box's local resources get depleted with unused
drivers.  Obviously if I could reuse the drivers per user, on that
box, that is great first step, but if I could reuse drivers, and run
them on the cluster, that would be ideal.  looking through the docs I
was not clear on those options. If anyone could point me in the right
direction, I would greatly appreciate it!

John

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: ML Transformer

2015-02-19 Thread Peter Rudenko

Hi Cesar,
these methods will remain private until the new ml api stabilizes (approx. 
in Spark 1.4). My solution for the same issue was to create an 
org.apache.spark.ml package in my project and extend/implement 
everything there.
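
A minimal sketch of that workaround (illustration only; the imports and the 
transform signature may need adjusting for your exact 1.2.x build):

  package org.apache.spark.ml   // same package, so private[ml] members are accessible

  import org.apache.spark.ml.param.ParamMap
  import org.apache.spark.sql.{SchemaRDD, StructType}

  class IdentityTransformer extends Transformer {
    // pass the schema through unchanged; a real transformer would add or validate columns
    private[ml] def transformSchema(schema: StructType, paramMap: ParamMap): StructType = schema
    def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = dataset
  }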


Thanks,
Peter Rudenko


On 2015-02-18 22:17, Cesar Flores wrote:


I am working right now with the ML pipeline, which I really like. 
However, in order to make real use of it, I would like to create my own 
transformers that implement org.apache.spark.ml.Transformer. In order 
to do that, a method from PipelineStage needs to be implemented, 
but this method is private to the ml package:


private[ml] def transformSchema(schema: StructType, paramMap: ParamMap): StructType


Can users create their own transformers? If not, will this 
functionality be added in the future?


Thanks
--
Cesar Flores




Re: loads of memory still GC overhead limit exceeded

2015-02-19 Thread Antony Mayi
now with reverted spark.shuffle.io.preferDirectBufs (to true) getting again GC 
overhead limit exceeded:
=== spark stdout ===
15/02/19 12:08:08 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.93): java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

=== yarn log (same) ===
15/02/19 12:08:08 ERROR executor.Executor: Exception in task 7.0 in stage 18.0 (TID 5329)
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

=== yarn nodemanager ===
2015-02-19 12:08:13,758 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19014 for container-id container_1424204221358_0013_01_12: 29.8 GB of 32 GB physical memory used; 31.7 GB of 67.2 GB virtual memory used
2015-02-19 12:08:13,778 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19013 for container-id container_1424204221358_0013_01_08: 1.2 MB of 32 GB physical memory used; 103.6 MB of 67.2 GB virtual memory used
2015-02-19 12:08:14,455 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1424204221358_0013_01_08 is : 143
2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1424204221358_0013_01_08 transitioned from RUNNING to EXITED_WITH_FAILURE
2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Cleaning up container container_1424204221358_0013_01_08
Antony.

 

 On Thursday, 19 February 2015, 11:54, Antony Mayi 
antonym...@yahoo.com.INVALID wrote:
   
 

 it is from within the ALS.trainImplicit() call. btw. the exception varies 
between this GC overhead limit exceeded and Java heap space (which I guess 
is just different outcome of same problem).
just tried another run and here are the logs (filtered) - note I tried this run 
with spark.shuffle.io.preferDirectBufs=false so this might be slightly 
different issue from my previous case (going to revert now):
=== spark stdout ===
15/02/19 10:15:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(6, 192.168.1.92, 54289) with no recent heart beats: 50221ms exceeds 45000ms
15/02/19 10:16:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(13, 192.168.1.90, 56768) with no recent heart beats: 54749ms exceeds 45000ms
15/02/19 10:16:44 ERROR cluster.YarnClientClusterScheduler: Lost executor 6 on 192.168.1.92: remote Akka client disassociated
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 57.0 in stage 18.0 (TID 5379, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 32.0 in stage 18.0 (TID 5354, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 82.0 in stage 18.0 (TID 5404, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 ERROR cluster.YarnClientSchedulerBackend: Asked to remove non-existent executor 6
15/02/19 10:16:54 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 18.0 (TID 5328, 192.168.1.90): FetchFailed(BlockManagerId(6, 192.168.1.92, 54289), shuffleId=6, mapId=227, reduceId=6, message=org.apache.spark.shuffle.FetchFailedException: Failed to connect to /192.168.1.92:54289
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
        at

Re: spark-sql problem with textfile separator

2015-02-19 Thread Francesco Bonamente
Hi Yanbo,
unfortunately all csv files contain comma inside some columns and I can't
change the structure.

How can I work with this kind of textfile and spark-sql?

Thank you again


2015-02-19 14:38 GMT+01:00 Yanbo Liang hackingda...@gmail.com:

 This is because each line will be separated into 4 columns instead of 3.
 If you want to use a comma to separate columns, then the column values
 themselves cannot contain commas.


 2015-02-19 18:12 GMT+08:00 sparkino francescoboname...@gmail.com:

 Hello everybody,
 I'm quite new to Spark and Scala as well and I was trying to analyze some
 csv data via spark-sql

 My csv file contains data like this



 Following the example at this link below

 https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection

 I have that code:


 When I try to fetch val name_address, the second field (address) does not
 contain all the text. The issue is probably the comma: it is both the split
 character and part of the string value of the third column (address) of
 the csv file.

 How can I handle this?

 Thank you in advance
 Sparkino



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-problem-with-textfile-separator-tp21718.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: JdbcRDD, ClassCastException with scala.Function0

2015-02-19 Thread Cody Koeninger
At the beginning of the code, do a query to find the current maximum ID

Don't just put in an arbitrarily large value, or all of your rows will end
up in 1 spark partition at the beginning of the range.

The question of keys is up to you... all that you need to be able to do is
write a sql statement that takes 2 numbers to specify the bounds.  Of
course, a numeric primary key is going to be the most efficient way to do
that.
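
For illustration, a minimal sketch of that first step (illustration only; shown in
Scala with plain JDBC, reusing the FOO/ID table and Derby URL from the example below):

  import java.sql.DriverManager

  val conn = DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb")
  val rs = conn.createStatement().executeQuery("SELECT MIN(ID), MAX(ID) FROM FOO")
  rs.next()
  val (lowerBound, upperBound) = (rs.getLong(1), rs.getLong(2))
  rs.close()
  conn.close()
  // then pass lowerBound and upperBound (plus a reasonable numPartitions) to JdbcRDD.create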

On Thu, Feb 19, 2015 at 8:57 AM, Dmitry Goldenberg dgoldenberg...@gmail.com
 wrote:

 Yup, I did see that. Good point though, Cody. The mismatch was happening
 for me when I was trying to get the 'new JdbcRDD' approach going. Once I
 switched to the 'create' method things are working just fine. Was just able
 to refactor the 'get connection' logic into a 'DbConnection implements
 JdbcRDD.ConnectionFactory' and my 'map row' class is still 'MapRow
 implements org.apache.spark.api.java.function.Function<ResultSet, Row>'.

 This works fine and makes the driver program tighter. Of course, my next
 question is, how to work with the lower and upper bound parameters. As in,
 what if I don't know what the min and max ID values are and just want to
 extract all data from the table, what should the params be, if that's even
 supported. And furthermore, what if the primary key on the table is not
 numeric? or if there's no primary key altogether?

 The method works fine with lowerBound=0 and upperBound=100, for
 example. But doesn't seem to have a way to say, 'no upper bound' (-1 didn't
 work).

 On Wed, Feb 18, 2015 at 11:59 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Look at the definition of JdbcRDD.create:

   def create[T](

   sc: JavaSparkContext,

   connectionFactory: ConnectionFactory,

   sql: String,

   lowerBound: Long,

   upperBound: Long,

   numPartitions: Int,

   mapRow: JFunction[ResultSet, T]): JavaRDD[T] = {


 JFunction here is the interface org.apache.spark.api.java.function.Function,
 not scala Function0

 LIkewise, ConnectionFactory is an interface defined inside JdbcRDD, not
 scala Function0

 On Wed, Feb 18, 2015 at 4:50 PM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 That's exactly what I was doing. However, I ran into runtime issues with
 doing that. For instance, I had a

   public class DbConnection extends AbstractFunction0<Connection>
 implements Serializable

 I got a runtime error from Spark complaining that DbConnection wasn't an
 instance of scala.Function0.

 I also had a

   public class MapRow extends
 scala.runtime.AbstractFunction1<java.sql.ResultSet, Row> implements
 Serializable

 with which I seemed to have more luck.

 On Wed, Feb 18, 2015 at 5:32 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Cant you implement the

 org.apache.spark.api.java.function.Function

 interface and pass an instance of that to JdbcRDD.create ?

 On Wed, Feb 18, 2015 at 3:48 PM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 Cody, you were right, I had a copy and paste snag where I ended up
 with a vanilla SparkContext rather than a Java one.  I also had to *not*
 use my function subclasses, rather just use anonymous inner classes for 
 the
 Function stuff and that got things working. I'm fully following
 the JdbcRDD.create approach from JavaJdbcRDDSuite.java basically verbatim.

 Is there a clean way to refactor out the custom Function classes such
 as the one for getting a db connection or mapping ResultSet data to your
 own POJO's rather than doing it all inline?


 On Wed, Feb 18, 2015 at 1:52 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Is sc there a SparkContext or a JavaSparkContext?  The compilation
 error seems to indicate the former, but JdbcRDD.create expects the latter

 On Wed, Feb 18, 2015 at 12:30 PM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 I have tried that as well, I get a compile error --

 [ERROR] ...SparkProto.java:[105,39] error: no suitable method found
 for create(SparkContext,<anonymous
 ConnectionFactory>,String,int,int,int,<anonymous
 Function<ResultSet,Integer>>)

 The code is a copy and paste:

 JavaRDD<Integer> jdbcRDD = JdbcRDD.create(
   sc,
   new JdbcRDD.ConnectionFactory() {
     public Connection getConnection() throws SQLException {
       return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
     }
   },
   "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
   1, 100, 1,
   new Function<ResultSet, Integer>() {
     public Integer call(ResultSet r) throws Exception {
       return r.getInt(1);
     }
   }
 );

 The other thing I've tried was to define a static class locally for
 GetConnection and use the JdbcCreate constructor. This got around the
 compile issues but blew up at runtime with NoClassDefFoundError:
 scala/runtime/AbstractFunction0 !

 JdbcRDD<Row> jdbcRDD = new JdbcRDD<Row>(
 sc,
 (AbstractFunction0<Connection>) new DbConn(), // had to cast or a

Re: SparkSQL + Tableau Connector

2015-02-19 Thread Silvio Fiorito
Great, glad it worked out!

From: Todd Nist
Date: Thursday, February 19, 2015 at 9:19 AM
To: Silvio Fiorito
Cc: user@spark.apache.org
Subject: Re: SparkSQL + Tableau Connector

Hi Silvio,

I got this working today using your suggestion with the Initial SQL and a 
Custom Query.  See here for details:

http://stackoverflow.com/questions/28403664/connect-to-existing-hive-in-intellij-using-sbt-as-build/28608608#28608608

It is not ideal as I need to write a custom query, but does work for now.  I 
also have it working by doing a SaveAsTable on the ingested data which stores 
the reference into the metastore for access via the thrift server.

Thanks for the help.

-Todd

On Wed, Feb 11, 2015 at 8:41 PM, Silvio Fiorito 
silvio.fior...@granturing.com wrote:
Hey Todd,

I don’t have an app to test against the thrift server, are you able to define 
custom SQL without using Tableau’s schema query? I guess it’s not possible to 
just use SparkSQL temp tables, you may have to use permanent Hive tables that 
are actually in the metastore so Tableau can discover them in the schema. In 
that case you will either have to generate the Hive tables externally from 
Spark or use Spark to process the data and save them using a HiveContext.


From: Todd Nist
Date: Wednesday, February 11, 2015 at 7:53 PM
To: Andrew Lee
Cc: Arush Kharbanda, user@spark.apache.org
Subject: Re: SparkSQL + Tableau Connector

First sorry for the long post.  So back to tableau and Spark SQL, I'm still 
missing something.

TL;DR

To get the Spark SQL Temp table associated with the metastore are there 
additional steps required beyond doing the below?

Initial SQL on connection:

create temporary table test
using org.apache.spark.sql.json
options (path '/data/json/*');

cache table test;

I feel like I'm missing a step of associating the Spark SQL table with the 
metastore, do I need to actually save it in some fashion?   I'm trying to avoid 
saving to hive if possible.

Details:

I configured the hive-site.xml and placed it in the $SPARK_HOME/conf.  It looks 
like this, thanks Andrew and Arush for the assistance:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
  <property>
    <name>hive.semantic.analyzer.factory.impl</name>
    <value>org.apache.hcatalog.cli.HCatSemanticAnalyzerFactory</value>
  </property>

  <property>
    <name>hive.metastore.sasl.enabled</name>
    <value>false</value>
  </property>

  <property>
    <name>hive.server2.authentication</name>
    <value>NONE</value>
  </property>

  <property>
    <name>hive.server2.enable.doAs</name>
    <value>true</value>
  </property>

  <!--
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://localhost:9083</value>
    <description>IP address (or fully-qualified domain name) and port of the metastore host</description>
  </property>
  -->

  <property>
    <name>hive.warehouse.subdir.inherit.perms</name>
    <value>true</value>
  </property>

  <property>
    <name>hive.metastore.schema.verification</name>
    <value>false</value>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://localhost:3306/metastore_db?createDatabaseIfNotExist=true</value>
    <description>metadata is stored in a MySQL server</description>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>MySQL JDBC driver class</description>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hiveuser</value>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>hiveuser</value>
  </property>

</configuration>

When I start the server it looks fine:

$ ./sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=10001 
--hiveconf hive.server2.thrift.bind.host radtech.io --master spark://radtech.io:7077 
--driver-class-path /usr/local/spark/lib/mysql-connector-java-5.1.34-bin.jar
starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to 
/usr/local/spark-1.2.1-bin-hadoop2.4/logs/spark-tnist-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-radtech.io.out
radtech:spark tnist$ tail -f 
logs/spark-tnist-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-radtech.io.out
15/02/11 19:15:24 INFO SparkDeploySchedulerBackend: Granted executor ID 
app-20150211191524-0008/1 on hostPort 192.168.1.2:50851 with 2 cores, 512.0 MB RAM
15/02/11 19:15:24 INFO AppClient$ClientActor: Executor updated: 
app-20150211191524-0008/0 is now LOADING
15/02/11 19:15:24 INFO AppClient$ClientActor: Executor updated: 
app-20150211191524-0008/1 is now LOADING
15/02/11 19:15:24 INFO AppClient$ClientActor: Executor updated: 
app-20150211191524-0008/0 is now RUNNING
15/02/11 19:15:24 INFO AppClient$ClientActor: Executor updated: 
app-20150211191524-0008/1 is now RUNNING
15/02/11 19:15:24 INFO NettyBlockTransferService: Server created on 50938
15/02/11 

Re: storing MatrixFactorizationModel (pyspark)

2015-02-19 Thread Ilya Ganelin
Yep. The factorization model holds two RDDs of feature vectors representing the
decomposed matrix. You can save these to disk and reuse them.
On Thu, Feb 19, 2015 at 2:19 AM Antony Mayi antonym...@yahoo.com.invalid
wrote:

 Hi,

 when getting the model out of ALS.train it would be beneficial to store it
 (to disk) so the model can be reused later for any following predictions. I
 am using pyspark and I had no luck pickling it either using standard pickle
 module or even dill.

 does anyone have a solution for this (note it is pyspark)?

 thank you,
 Antony.



Re: loads of memory still GC overhead limit exceeded

2015-02-19 Thread Ilya Ganelin
Hi Antony - you are seeing a problem that I ran into. The underlying issue
is your default parallelism setting. What's happening is that within ALS
certain RDD operations end up changing the number of partitions you have of
your data. For example if you start with an RDD of 300 partitions, unless
default parallelism is set while the algorithm executes you'll eventually
get an RDD with something like 20 partitions. Consequently, your giant data
set is now stored across a much smaller number of partitions so each
partition is huge. Then, when a shuffle requires serialization you run out
of heap space trying to serialize it. The solution should be as simple as
setting the default parallelism setting.

This is referenced in a JIRA I can't find at the moment.
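A rough illustration of that fix (illustration only; 300 is a placeholder, and
rank/iterations/lambda/alpha stand for whatever you already use; shown in Scala,
but the same property can be set from pyspark or via --conf on spark-submit):

  val conf = new SparkConf().set("spark.default.parallelism", "300")
  val sc = new SparkContext(conf)

  // or keep the ratings RDD explicitly at a known partition count before training
  val model = ALS.trainImplicit(ratings.repartition(300), rank, iterations, lambda, alpha)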
On Thu, Feb 19, 2015 at 5:10 AM Antony Mayi antonym...@yahoo.com.invalid
wrote:

 now with reverted spark.shuffle.io.preferDirectBufs (to true) getting
 again GC overhead limit exceeded:

 === spark stdout ===
 15/02/19 12:08:08 WARN scheduler.TaskSetManager: Lost task 7.0 in stage
 18.0 (TID 5329, 192.168.1.93): java.lang.OutOfMemoryError: GC overhead
 limit exceeded
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
 at
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

 === yarn log (same) ===
 15/02/19 12:08:08 ERROR executor.Executor: Exception in task 7.0 in stage
 18.0 (TID 5329)
 java.lang.OutOfMemoryError: GC overhead limit exceeded
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
 at
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

 === yarn nodemanager ===
 2015-02-19 12:08:13,758 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Memory usage of ProcessTree 19014 for container-id
 container_1424204221358_0013_01_12: 29.8 GB of 32 GB physical memory
 used; 31.7 GB of 67.2 GB virtual memory used
 2015-02-19 12:08:13,778 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Memory usage of ProcessTree 19013 for container-id
 container_1424204221358_0013_01_08: 1.2 MB of 32 GB physical memory
 used; 103.6 MB of 67.2 GB virtual memory used
 2015-02-19 12:08:14,455 WARN
 org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit
 code from container container_1424204221358_0013_01_08 is : 143
 2015-02-19 12:08:14,455 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container:
 Container container_1424204221358_0013_01_08 transitioned from RUNNING
 to EXITED_WITH_FAILURE
 2015-02-19 12:08:14,455 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
 Cleaning up container container_1424204221358_0013_01_08

 Antony.




   On Thursday, 19 February 2015, 11:54, Antony Mayi
 antonym...@yahoo.com.INVALID wrote:



 it is from within the ALS.trainImplicit() call. btw. the exception varies
 between this GC overhead limit exceeded and Java heap space (which I
 guess is just different outcome of same problem).

 just tried another run and here are the logs (filtered) - note I tried
 this run with spark.shuffle.io.preferDirectBufs=false so this might be
 slightly different issue from my previous case (going to revert now):

 === spark stdout ===
 15/02/19 10:15:05 WARN storage.BlockManagerMasterActor: Removing
 BlockManager BlockManagerId(6, 192.168.1.92, 54289) with no recent heart
 beats: 50221ms exceeds 45000ms
 15/02/19 10:16:05 WARN storage.BlockManagerMasterActor: Removing
 BlockManager BlockManagerId(13, 192.168.1.90, 56768) with no recent heart
 beats: 54749ms exceeds 45000ms
 15/02/19 10:16:44 ERROR cluster.YarnClientClusterScheduler: Lost executor
 6 on 192.168.1.92: remote Akka client disassociated
 15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 57.0 in stage
 18.0 (TID 5379, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
 15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 32.0 in stage
 18.0 (TID 5354, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
 15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 82.0 in stage
 18.0 (TID 5404, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
 15/02/19 10:16:44 WARN 

Why is RDD lookup slow?

2015-02-19 Thread shahab
Hi,

I am doing lookup on cached RDDs [(Int,String)], and I noticed that the
lookup is relatively slow 30-100 ms ?? I even tried this on one machine
with single partition, but no difference!

The RDDs are not large at all, 3-30 MB.

Is this expected behaviour? should I use other data structures, like
HashMap to keep data and look up it there and use Broadcast to send a copy
to all machines?

best,
/Shahab


Re: Why is RDD lookup slow?

2015-02-19 Thread Sean Owen
RDDs are not Maps. lookup() does a linear scan -- parallel by
partition, but still linear. Yes, it is not supposed to be an O(1) lookup
data structure. It'd be much nicer to broadcast the relatively small
data set as a Map and look it up fast, locally.
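
For example, a minimal sketch of the broadcast approach (illustration only;
smallRDD and bigRDD are placeholder names for the RDD[(Int, String)] and
whatever you are looking keys up from):

  val lookup: Map[Int, String] = smallRDD.collect().toMap   // the 3-30 MB data set
  val lookupBc = sc.broadcast(lookup)
  val resolved = bigRDD.map { case (k, v) => (k, v, lookupBc.value.get(k)) }   // O(1) per key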

On Thu, Feb 19, 2015 at 3:29 PM, shahab shahab.mok...@gmail.com wrote:
 Hi,

 I am doing lookup on cached RDDs [(Int,String)], and I noticed that the
 lookup is relatively slow 30-100 ms ?? I even tried this on one machine with
 single partition, but no difference!

 The RDDs are not large at all, 3-30 MB.

 Is this expected behaviour? should I use other data structures, like HashMap
 to keep data and look up it there and use Broadcast to send a copy to all
 machines?

 best,
 /Shahab



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Why is RDD lookup slow?

2015-02-19 Thread Ilya Ganelin
Hi Shahab - if your data structures are small enough a broadcasted Map is
going to provide faster lookup. Lookup within an RDD is an O(m) operation
where m is the size of the partition. For RDDs with multiple partitions,
executors can operate on it in parallel so you get some improvement for
larger RDDs.
On Thu, Feb 19, 2015 at 7:31 AM shahab shahab.mok...@gmail.com wrote:

 Hi,

 I am doing lookup on cached RDDs [(Int,String)], and I noticed that the
 lookup is relatively slow 30-100 ms ?? I even tried this on one machine
 with single partition, but no difference!

 The RDDs are not large at all, 3-30 MB.

 Is this expected behaviour? should I use other data structures, like
 HashMap to keep data and look up it there and use Broadcast to send a copy
 to all machines?

 best,
 /Shahab





Re: spark-sql problem with textfile separator

2015-02-19 Thread Yanbo Liang
For your case, I think you can use a trick: split on "," (quote-comma-quote)
instead of on the bare comma.
You can refer to the following code snippet:

val people = sc.textFile("examples/src/main/resources/data.csv")
  .map(x => x.substring(1, x.length - 1).split("\",\""))
  .map(p => List(p(0), p(1), p(2)))
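
An alternative that avoids hand-rolled quote handling (assuming the external
spark-csv package is on the classpath, since it parses quoted fields; the path
is the same placeholder as above):

  import org.apache.spark.sql.SQLContext
  import com.databricks.spark.csv._
  val sqlContext = new SQLContext(sc)
  val people = sqlContext.csvFile("examples/src/main/resources/data.csv")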


On Feb 19, 2015, at 10:02 PM, Francesco Bonamente 
francescoboname...@gmail.com wrote:

 Hi Yanbo,
 unfortunately all csv files contain comma inside some columns and I can't 
 change the structure.
 
 How can I work with this kind of textfile and spark-sql?
  
 Thank you again
 
 
 2015-02-19 14:38 GMT+01:00 Yanbo Liang hackingda...@gmail.com:
 This is because each line will be separated into 4 columns instead of 3.
 If you want to use a comma to separate columns, then the column values
 themselves cannot contain commas.
 
 
 2015-02-19 18:12 GMT+08:00 sparkino francescoboname...@gmail.com:
 Hello everybody,
 I'm quite new to Spark and Scala as well and I was trying to analyze some
 csv data via spark-sql
 
 My csv file contains data like this
 
 
 
 Following the example at this link below
 https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection
 
 I have that code:
 
 
 When I try to fetch val name_address, the second field (address) does not
 contain all the text. The issue is probably the comma: it is both the split
 character and part of the string value of the third column (address) of
 the csv file.
 
 How can I handle this?
 
 Thank you in advance
 Sparkino
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-problem-with-textfile-separator-tp21718.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 
 



Re: bulk writing to HDFS in Spark Streaming?

2015-02-19 Thread Akhil Das
There was already a thread around this; if I understood your question
correctly, you can go through it here:
https://mail-archives.apache.org/mod_mbox/spark-user/201502.mbox/%3ccannjawtrp0nd3odz-5-_ya351rin81q-9+f2u-qn+vruqy+...@mail.gmail.com%3E

Thanks
Best Regards

On Thu, Feb 19, 2015 at 8:16 PM, Chico Qi qijic...@gmail.com wrote:

 Hi all,

 In Spark Streaming I want to use DStream.saveAsTextFiles with bulk writing,
 because the normal saveAsTextFiles cannot finish within the configured batch
 interval.
 Maybe a common writing pool, or another worker assigned to do the bulk
 writing?

 Thanks!

 B/R
 Jichao



Re: No suitable driver found error, Create table in hive from spark sql

2015-02-19 Thread Todd Nist
Hi Dhimant,

I believe it will work if you change your spark-shell invocation to pass
--driver-class-path /usr/local/spark/lib/mysql-connector-java-5.1.34-bin.jar
instead of putting the jar in --jars.

-Todd

On Wed, Feb 18, 2015 at 10:41 PM, Dhimant dhimant84.jays...@gmail.com
wrote:

 Found a solution in one of the posts found on the internet.
 I updated spark/bin/compute-classpath.sh and added the database connector jar
 to the classpath.
 CLASSPATH=$CLASSPATH:/data/mysql-connector-java-5.1.14-bin.jar



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/No-suitable-driver-found-error-Create-table-in-hive-from-spark-sql-tp21714p21715.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Tableau beta connector

2015-02-19 Thread Todd Nist
I am able to connect by doing the following using the Tableau Initial SQL
and a custom query:

   1. First ingest csv file or json and save out to the file system:

      import org.apache.spark.sql.SQLContext
      import com.databricks.spark.csv._
      val sqlContext = new SQLContext(sc)
      val demo = sqlContext.csvFile("/user/data/csv/demo.csv")
      demo.toJSON.saveAsTextFile("/user/data/json/test")

   2. Start $SPARK_HOME/sbin/start-thriftserver:

      ./sbin/start-thriftserver.sh --master spark://radtech.io:7077
      --total-executor-cores 2 --driver-class-path --hiveconf
      hive.server2.thrift.port=10001 --hiveconf
      hive.server2.thrift.bind.host radtech.io

   3. Start tableau session. Create a connection to thrift server via SparkSQL
      (Beta) connector.

   4. In Tableau add the following to the "Initial SQL":

      create temporary table test
      using org.apache.spark.sql.json
      options (path '/user/data/json/test/*');

      cache table test;

   5. Refresh connection.

Then select “New Custom SQL” and issue something like:

select * from test;

You will see your table appear.

HTH.

-Todd

On Thu, Feb 19, 2015 at 5:41 AM, ashu ashutosh.triv...@iiitb.org wrote:

  Hi,

 I would like you to read my stack overflow answer to this question. If you
 need more clarification feel free to drop a msg.


 http://stackoverflow.com/questions/28403664/connect-to-existing-hive-in-intellij-using-sbt-as-build


  Regards,

 Ashutosh
  --
 *From:* ganterm [via Apache Spark User List] ml-node+[hidden email]
 *Sent:* Thursday, February 19, 2015 12:49 AM
 *To:* Ashutosh Trivedi (MT2013030)
 *Subject:* Re: Tableau beta connector

  Ashutosh,

 Were you able to figure this out? I am having the exact same question.
 I think the answer is to use Spark SQL to create/load a table in Hive
 (e.g. execute the HiveQL CREATE TABLE statement) but I am not sure. Hoping
 for something more simple than that.

 Anybody?

 Thanks!

 --
  If you reply to this email, your message will be added to the discussion
 below:

 http://apache-spark-user-list.1001560.n3.nabble.com/Tableau-beta-connector-tp21512p21709.html
  To unsubscribe from Tableau beta connector, click here.

 --
 View this message in context: Re: Tableau beta connector
 http://apache-spark-user-list.1001560.n3.nabble.com/Tableau-beta-connector-tp21512p21719.html

 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



Re: Filtering keys after map+combine

2015-02-19 Thread Sean Owen
You have the keys before and after reduceByKey. You want to do
something based on the key within reduceByKey? it just calls
combineByKey, so you can use that method for lower-level control over
the merging.

Whether it's possible depends I suppose on what you mean to filter on.
If it's just a property of the key, can't you just filter before
reduceByKey? If it's a property of the key's value, don't you need to
wait for the reduction to finish? or are you saying that you know a
key should be filtered based on its value partway through the merge?

I suppose you can use combineByKey to create a mergeValue function
that changes an input type A into some other Option[B]; you output
None if your criteria is reached, and your combine function returns
None if either argument is None? it doesn't save 100% of the work but
it may mean you only shuffle (key,None) for some keys if the map-side
combine already worked out that the key would be filtered.

And then after, run a flatMap or something to make Option[B] into B.
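
A small sketch of that Option-based combine (illustration only; pairs,
threshold and the Int value type are placeholders):

  val combined = pairs.combineByKey[Option[Int]](
    (v: Int) => Some(v),
    // map-side merge: emit None as soon as the threshold criterion is reached
    (c: Option[Int], v: Int) => c.map(_ + v).filter(_ < threshold),
    // merge of two combiners: None is contagious, and the threshold is re-checked
    (c1: Option[Int], c2: Option[Int]) => for (a <- c1; b <- c2; if a + b < threshold) yield a + b
  )
  // afterwards, flatMap the Option away so filtered keys disappear
  val kept = combined.flatMap { case (k, opt) => opt.map(v => (k, v)) }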

On Thu, Feb 19, 2015 at 2:21 PM, Debasish Das debasish.da...@gmail.com wrote:
 Hi,

 Before I send out the keys for network shuffle, in reduceByKey after map +
 combine are done, I would like to filter the keys based on some threshold...

 Is there a way to get the key, value after map+combine stages so that I can
 run a filter on the keys ?

 Thanks.
 Deb

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: issue Running Spark Job on Yarn Cluster

2015-02-19 Thread Harshvardhan Chauhan
Is this the full stack trace ?

On Wed, Feb 18, 2015 at 2:39 AM, sachin Singh sachin.sha...@gmail.com
wrote:

 Hi,
 I want to run my spark Job in Hadoop yarn Cluster mode,
 I am using below command -
 spark-submit --master yarn-cluster --driver-memory 1g --executor-memory 1g
 --executor-cores 1 --class com.dc.analysis.jobs.AggregationJob
 sparkanalitic.jar param1 param2 param3
 I am getting the error below. Kindly suggest what's going wrong, and whether
 the command is proper or not. Thanks in advance.

 Exception in thread main org.apache.spark.SparkException: Application
 finished with failed status
 at
 org.apache.spark.deploy.yarn.ClientBase$class.run(ClientBase.scala:509)
 at org.apache.spark.deploy.yarn.Client.run(Client.scala:35)
 at org.apache.spark.deploy.yarn.Client$.main(Client.scala:139)
 at org.apache.spark.deploy.yarn.Client.main(Client.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/issue-Running-Spark-Job-on-Yarn-Cluster-tp21697.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
*Harshvardhan Chauhan*  |  Software Engineer
*GumGum* http://www.gumgum.com/  |  *Ads that stick*
310-260-9666  |  ha...@gumgum.com


Re: Filtering keys after map+combine

2015-02-19 Thread Debasish Das
Hi Sean,

This is what I intend to do:

are you saying that you know a key should be filtered based on its value
partway through the merge?

I should use combineByKey...

Thanks.
Deb


On Thu, Feb 19, 2015 at 6:31 AM, Sean Owen so...@cloudera.com wrote:

 You have the keys before and after reduceByKey. You want to do
 something based on the key within reduceByKey? it just calls
 combineByKey, so you can use that method for lower-level control over
 the merging.

 Whether it's possible depends I suppose on what you mean to filter on.
 If it's just a property of the key, can't you just filter before
 reduceByKey? If it's a property of the key's value, don't you need to
 wait for the reduction to finish? or are you saying that you know a
 key should be filtered based on its value partway through the merge?

 I suppose you can use combineByKey to create a mergeValue function
 that changes an input type A into some other Option[B]; you output
 None if your criteria is reached, and your combine function returns
 None if either argument is None? it doesn't save 100% of the work but
 it may mean you only shuffle (key,None) for some keys if the map-side
 combine already worked out that the key would be filtered.

 And then after, run a flatMap or something to make Option[B] into B.

 On Thu, Feb 19, 2015 at 2:21 PM, Debasish Das debasish.da...@gmail.com
 wrote:
  Hi,
 
  Before I send out the keys for network shuffle, in reduceByKey after map
 +
  combine are done, I would like to filter the keys based on some
 threshold...
 
  Is there a way to get the key, value after map+combine stages so that I
 can
  run a filter on the keys ?
 
  Thanks.
  Deb



Spark 1.2.1: ClassNotFoundException when running hello world example in scala 2.11

2015-02-19 Thread Luis Solano
I'm having an issue with spark 1.2.1 and scala 2.11. I detailed the
symptoms in this stackoverflow question.

http://stackoverflow.com/questions/28612837/spark-classnotfoundexception-when-running-hello-world-example-in-scala-2-11

Has anyone experienced anything similar?

Thank you!


Filter data from one RDD based on data from another RDD

2015-02-19 Thread Himanish Kushary
Hi,

I have two RDD's with csv data as below :

RDD-1

101970_5854301840,fbcf5485-e696-4100-9468-a17ec7c5bb43,19229261643
101970_5854301839,fbaf5485-e696-4100-9468-a17ec7c5bb39,9229261645
101970_5854301839,fbbf5485-e696-4100-9468-a17ec7c5bb39,9229261647
101970_17038953,546853f9-cf07-4700-b202-00f21e7c56d8,791191603
101970_5854301840,fbcf5485-e696-4100-9468-a17ec7c5bb42,19229261643
101970_5851048323,218f5485-e58c-4200-a473-348ddb858578,290542385
101970_5854301839,fbcf5485-e696-4100-9468-a17ec7c5bb41,922926164

RDD-2

101970_17038953,546853f9-cf07-4700-b202-00f21e7c56d9,7911160
101970_5851048323,218f5485-e58c-4200-a473-348ddb858578,2954238
101970_5854301839,fbaf5485-e696-4100-9468-a17ec7c5bb39,9226164
101970_5854301839,fbbf5485-e696-4100-9468-a17ec7c5bb39,92292164
101970_5854301839,fbcf5485-e696-4100-9468-a17ec7c5bb41,9226164

101970_5854301838,fbcf5485-e696-4100-9468-a17ec7c5bb40,929164
101970_5854301838,fbcf5485-e696-4100-9468-a17ec7c5bb39,26164

I need to filter RDD-2 to include only those records where the first column
value in RDD-2 matches any of the first column values in RDD-1

Currently , I am broadcasting the first column values from RDD-1 as a list
and then filtering RDD-2 based on that list.

val rdd1broadcast = sc.broadcast(rdd1.map { uu => uu.split(",")(0) }.collect().toSet)

val rdd2filtered = rdd2.filter { h => rdd1broadcast.value.contains(h.split(",")(0)) }

This will result in the records whose first column is 101970_5854301838 (the last
two records) being filtered out of RDD-2.

Is this the best way to accomplish this? I am worried that for
large data volumes, the broadcast step may become an issue. Appreciate
any other suggestion.
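
One join-based sketch (illustration only; it assumes both RDDs are plain CSV
lines as above, so nothing has to be collected to the driver):

  val keyedRdd1 = rdd1.map(line => (line.split(",")(0), ())).distinct()
  val keyedRdd2 = rdd2.map(line => (line.split(",")(0), line))
  val rdd2filtered = keyedRdd2.join(keyedRdd1).map { case (_, (line, _)) => line }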

---
Thanks
Himanish


Incorrect number of records after left outer join (I think)

2015-02-19 Thread Darin McBeath
Consider the following left outer join

potentialDailyModificationsRDD = 
reducedDailyPairRDD.leftOuterJoin(baselinePairRDD).partitionBy(new 
HashPartitioner(1024)).persist(StorageLevel.MEMORY_AND_DISK_SER());


Below are the record counts for the RDDs involved
Number of records for reducedDailyPairRDD: 2565206
Number of records for baselinePairRDD: 56102812
Number of records for potentialDailyModificationsRDD: 2570115

Below are the partitioners for the RDDs involved.
Partitioner for reducedDailyPairRDD: Some(org.apache.spark.HashPartitioner@400)
Partitioner for baselinePairRDD: Some(org.apache.spark.HashPartitioner@400)
Partitioner for potentialDailyModificationsRDD: 
Some(org.apache.spark.HashPartitioner@400)


I realize in the above statement that the .partitionBy is probably not needed 
as the underlying RDDs used in the left outer join are already hash partitioned.

My question is how the resulting RDD (potentialDailyModificationsRDD) can end 
up with more records than 
reducedDailyPairRDD.  I would think the number of records in 
potentialDailyModificationsRDD should be 2565206 instead of 2570115.  Am I 
missing something or is this possibly a bug?

I'm using Apache Spark 1.2 on a stand-alone cluster on ec2.  To get the counts 
for the records, I'm using the .count() for the RDD.

Thanks.

Darin.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming and message ordering

2015-02-19 Thread Cody Koeninger
Kafka ordering is guaranteed on a per-partition basis.

The high-level consumer api as used by the spark kafka streams prior to 1.3
will consume from multiple kafka partitions, thus not giving any ordering
guarantees.

The experimental direct stream in 1.3 uses the simple consumer api, and
there is a 1:1 correspondence between spark partitions and kafka
partitions.  So you will get deterministic ordering, but only on a
per-partition basis.

On Thu, Feb 19, 2015 at 11:31 PM, Neelesh neele...@gmail.com wrote:

 I had a chance to talk to TD today at the Strata+Hadoop Conf in San Jose.
 We talked a bit about this after his presentation about this - the short
 answer is spark streaming does not guarantee any sort of ordering (within
 batches, across batches).  One would have to use updateStateByKey to
 collect the events and sort them based on some attribute of the event.  But
 TD said message ordering is a frequently asked feature recently and is
 getting on his radar.

 I went through the source code and there does not seem to be any
 architectural/design limitation to support this.  (JobScheduler,
 JobGenerator are a good starting point to see how stuff works under the
 hood).  Overriding DStream#compute and using streaminglistener looks like a
 simple way of ensuring ordered execution of batches within a stream. But
 this would be a partial solution, since ordering within a batch needs some
 more work that I don't understand fully yet.

 Side note :  My custom receiver polls the metricsservlet once in a while
 to decide whether jobs are getting done fast enough and throttle/relax
 pushing data in to receivers based on the numbers provided by
 metricsservlet. I had to do this because out-of-the-box rate limiting right
 now is static and cannot adapt to the state of the cluster

 thnx
 -neelesh

 On Wed, Feb 18, 2015 at 4:13 PM, jay vyas jayunit100.apa...@gmail.com
 wrote:

 This is a *fantastic* question.  The idea of how we identify individual
 things in multiple  DStreams is worth looking at.

 The reason being, that you can then fine tune your streaming job, based
 on the RDD identifiers (i.e. are the timestamps from the producer
 correlating closely to the order in which RDD elements are being produced)
 ?  If *NO* then you need to (1) dial up throughput on producer sources or
 else (2) increase cluster size so that spark is capable of evenly handling
 load.

  You can't decide to do (1) or (2) unless you can track when the streaming
  elements are being converted to RDDs by Spark itself.



 On Wed, Feb 18, 2015 at 6:54 PM, Neelesh neele...@gmail.com wrote:

 There does not seem to be a definitive answer on this. Every time I
 google for message ordering,the only relevant thing that comes up is this
  -
 http://samza.apache.org/learn/documentation/0.8/comparisons/spark-streaming.html
 .

 With a kafka receiver that pulls data from a single kafka partition of a
 kafka topic, are individual messages in the microbatch in same the order as
 kafka partition? Are successive microbatches originating from a kafka
 partition executed in order?


 Thanks!





 --
 jay vyas





Re: Re: Spark streaming doesn't print output when working with standalone master

2015-02-19 Thread bit1...@163.com
Thanks Akhil, you are right.
I checked and found that I have only 1 core allocated to the program.
I am running on a virtual machine and have allocated only one processor to it (1 core 
per processor), so even though I specified --total-executor-cores 3 in the 
submit script, the application will still only be allocated one processor.

This leads me to another question:
Although I have only one core, I specified the master and executor as 
--master local[3] --executor-memory 512M --total-executor-cores 3. Since I have 
only one core, why does this work?



bit1...@163.com
 
From: Akhil Das
Date: 2015-02-20 15:13
To: bit1...@163.com
CC: user
Subject: Re: Spark streaming doesn't print output when working with standalone 
master
While running the program, go to your cluster's web UI (which runs on 8080, probably 
at hadoop.master:8080) and see how many cores are allocated to the program; it 
should be >= 2 for the stream to get processed.






Thanks
Best Regards

On Fri, Feb 20, 2015 at 9:29 AM, bit1...@163.com bit1...@163.com wrote:
Hi,
I am trying the spark streaming log analysis reference application provided by 
Databricks at 
https://github.com/databricks/reference-apps/tree/master/logs_analyzer 
When I deploy the code to the standalone cluster, there is no output at all 
with the following shell script, which means the windowDStream has 0 RDDs.
./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master 
spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 3 
--class 
spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming 
LogApp.jar

But when I change --master to --master local[3], the program starts to work 
fine. Can anyone offer some advice? Thanks!
./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master 
local[3] --executor-memory 512M --total-executor-cores 3 --class 
spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming 
LogApp.jar


object LogAnalyzerStreaming {

  val WINDOW_LENGTH = new Duration(12 * 1000)
  val SLIDE_INTERVAL = new Duration(6 * 1000)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Log Analyzer Streaming in Scala")
    val sc = new SparkContext(sparkConf)
    val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)

    val logLinesDStream = streamingContext.socketTextStream("localhost", /* port */)

    val accessLogsDStream = logLinesDStream.map(ApacheAccessLog.parseLogLine).cache()
    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)

    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.count() == 0) {
        println("No access logs received in this time interval")
      } else {
        // Calculate statistics based on the content size.
        val contentSizes = accessLogs.map(log => log.contentSize).cache()
        println("Content Size Avg: %s, Min: %s, Max: %s".format(
          contentSizes.reduce(_ + _) / contentSizes.count,
          contentSizes.min,
          contentSizes.max
        ))
      }
    })

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}






Re: How to pass parameters to a spark-jobserver Scala class?

2015-02-19 Thread Vasu C
Hi Sasi,

I am not sure about Vaadin, but with some simple googling you can find many
articles on how to pass JSON parameters over HTTP.

http://stackoverflow.com/questions/21404252/post-request-send-json-data-java-httpurlconnection

You can also try Finagle, which is a fully fault-tolerant framework by Twitter.


Regards,
   Vasu C



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-pass-parameters-to-a-spark-jobserver-Scala-class-tp21671p21727.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: using a database connection pool to write data into an RDBMS from a Spark application

2015-02-19 Thread Mohammed Guller
Hi Kelvin,

Yes. I am creating an uber jar with the Postgres driver included, but 
nevertheless tried both the --jars and --driver-class-path flags. It didn't help.

Interestingly, I can’t use BoneCP even in the driver program when I run my 
application with spark-submit. I am getting the same exception when the 
application initializes BoneCP before creating SparkContext. It looks like 
Spark is loading a different version of the Postgres JDBC driver than the one 
that I am linking.

Mohammed

From: Kelvin Chu [mailto:2dot7kel...@gmail.com]
Sent: Thursday, February 19, 2015 7:56 PM
To: Mohammed Guller
Cc: user@spark.apache.org
Subject: Re: using a database connection pool to write data into an RDBMS from 
a Spark application

Hi Mohammed,

Did you use --jars to specify your jdbc driver when you submitted your job? 
Take a look of this link: 
http://spark.apache.org/docs/1.2.0/submitting-applications.html

Hope this help!

Kelvin

On Thu, Feb 19, 2015 at 7:24 PM, Mohammed Guller 
moham...@glassbeam.com wrote:
Hi –
I am trying to use BoneCP (a database connection pooling library) to write data 
from my Spark application to an RDBMS. The database inserts are inside a 
foreachPartition code block. I am getting this exception when the code tries to 
insert data using BoneCP:

java.sql.SQLException: No suitable driver found for 
jdbc:postgresql://hostname:5432/dbname

I tried explicitly loading the Postgres driver on the worker nodes by adding 
the following line inside the foreachPartition code block:

Class.forName(org.postgresql.Driver)

It didn’t help.

Has anybody been able to get a database connection pool library to work with Spark? 
If you got it working, can you please share the steps?

Thanks,
Mohammed




Re: Failure on a Pipe operation

2015-02-19 Thread athing goingon
It appears that the file paths are different when running Spark in
local and cluster mode. When running Spark without --master, the paths to
the pipe command are relative to the local machine. When running Spark
with --master, the paths to the pipe command are relative to ./

This is what finally worked. I still don't understand why. It's not
documented and a cursory glance of the source code didn't help.
sc.addFile("./ProgramSIM");
...
JavaRDD<String> output = data.pipe(SparkFiles.get("./ProgramSIM"));




On Thu, Feb 19, 2015 at 5:41 PM, Imran Rashid iras...@cloudera.com wrote:

 The error msg is telling you the exact problem, it can't find ProgramSIM, 
 the thing you are trying to run

 Lost task 3520.3 in stage 0.0 (TID 11, compute3.research.dev): 
 java.io.IOException: Cannot run program ProgramSIM: error=2, No s\
 uch file or directory


 On Thu, Feb 19, 2015 at 5:52 PM, athing goingon athinggoin...@gmail.com
 wrote:

 Hi, I'm trying to figure out why the following job is failing on a pipe
 http://pastebin.com/raw.php?i=U5E8YiNN

 With this exception:
 http://pastebin.com/raw.php?i=07NTGyPP

 Any help is welcome. Thank you.





Re: using a database connection pool to write data into an RDBMS from a Spark application

2015-02-19 Thread Kelvin Chu
Hi Mohammed,

Did you use --jars to specify your jdbc driver when you submitted your job?
Take a look of this link:
http://spark.apache.org/docs/1.2.0/submitting-applications.html

Hope this help!

Kelvin

On Thu, Feb 19, 2015 at 7:24 PM, Mohammed Guller moham...@glassbeam.com
wrote:

  Hi –

 I am trying to use BoneCP (a database connection pooling library) to write
 data from my Spark application to an RDBMS. The database inserts are inside
 a foreachPartition code block. I am getting this exception when the code
 tries to insert data using BoneCP:



 java.sql.SQLException: No suitable driver found for
 jdbc:postgresql://hostname:5432/dbname



 I tried explicitly loading the Postgres driver on the worker nodes by
 adding the following line inside the foreachPartition code block:



 Class.forName(org.postgresql.Driver)



 It didn’t help.



 Has anybody been able to get a database connection pool library to work with
 Spark? If you got it working, can you please share the steps?



 Thanks,

 Mohammed





Spark streaming doesn't print output when working with standalone master

2015-02-19 Thread bit1...@163.com
Hi,
I am trying the spark streaming log analysis reference application provided by 
Databricks at 
https://github.com/databricks/reference-apps/tree/master/logs_analyzer 
When I deploy the code to the standalone cluster, there is no output at all 
with the following shell script, which means the windowDStream has 0 RDDs.
./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master 
spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 3 
--class 
spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming 
LogApp.jar

But when I change --master to --master local[3], the program starts to work 
fine. Can anyone offer some advice? Thanks!
./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master 
local[3] --executor-memory 512M --total-executor-cores 3 --class 
spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming 
LogApp.jar


object LogAnalyzerStreaming {

  val WINDOW_LENGTH = new Duration(12 * 1000)
  val SLIDE_INTERVAL = new Duration(6 * 1000)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Log Analyzer Streaming in Scala")
    val sc = new SparkContext(sparkConf)
    val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)

    val logLinesDStream = streamingContext.socketTextStream("localhost", /* port */)

    val accessLogsDStream = logLinesDStream.map(ApacheAccessLog.parseLogLine).cache()
    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)

    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.count() == 0) {
        println("No access logs received in this time interval")
      } else {
        // Calculate statistics based on the content size.
        val contentSizes = accessLogs.map(log => log.contentSize).cache()
        println("Content Size Avg: %s, Min: %s, Max: %s".format(
          contentSizes.reduce(_ + _) / contentSizes.count,
          contentSizes.min,
          contentSizes.max
        ))
      }
    })

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}





Re: Tableau beta connector

2015-02-19 Thread Ashutosh Trivedi (MT2013030)
Thanks Todd. great stuff :)


Regards,

Ashu


From: Todd Nist tsind...@gmail.com
Sent: Thursday, February 19, 2015 7:46 PM
To: Ashutosh Trivedi (MT2013030)
Cc: user@spark.apache.org
Subject: Re: Tableau beta connector


I am able to connect by doing the following using the Tableau Initial SQL and a 
custom query:

  1.  First ingest csv file or json and save out to file system:

import org.apache.spark.sql.SQLContext
import com.databricks.spark.csv._
val sqlContext = new SQLContext(sc)
val demo = sqlContext.csvFile("/user/data/csv/demo.csv")
demo.toJSON.saveAsTextFile("/user/data/json/test")


  2.  Start $SPARK_HOME/sbin/start-thriftserver:

./sbin/start-thriftserver.sh --master spark://radtech.io:7077 --total-executor-cores 2 
--driver-class-path --hiveconf hive.server2.thrift.port=10001 --hiveconf 
hive.server2.thrift.bind.host radtech.io


  3.  Start tableau session. Create a connection to thrift server via SparkSQL 
(Beta) connector.

  4.  In Tableau add the following to the Initial SQL

create temporary table test
using org.apache.spark.sql.json
options (path '/user/data/json/test/*');

cache table test;


  5.  Refresh connection.

Then select New Custom SQL and issue something like:

select * from test;


You will see your table appear.

HTH.

-Todd

On Thu, Feb 19, 2015 at 5:41 AM, ashu 
ashutosh.triv...@iiitb.org wrote:

Hi,

I would like you to read my stack overflow answer to this question. If you need 
more clarification feel free to drop a msg.

http://stackoverflow.com/questions/28403664/connect-to-existing-hive-in-intellij-using-sbt-as-build


Regards,

Ashutosh


From: ganterm [via Apache Spark User List] ml-node+[hidden email]
Sent: Thursday, February 19, 2015 12:49 AM
To: Ashutosh Trivedi (MT2013030)
Subject: Re: Tableau beta connector

Ashutosh,

Were you able to figure this out? I am having the exact same question.
I think the answer is to use Spark SQL to create/load a table in Hive (e.g. 
execute the HiveQL CREATE TABLE statement) but I am not sure. Hoping for 
something more simple than that.

Anybody?

Thanks!




View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Tableau-beta-connector-tp21512p21719.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Caching RDD

2015-02-19 Thread Kartheek.R
Hi,
I have an HDFS file of size 598MB. I create an RDD over this file and cache it in
RAM in a 7-node cluster with 2G RAM each. I find that each partition gets
replicated three or even four times in the cluster, even without me specifying
it in code. Total partitions are 5 for the RDD created, but cached partitions
are 17. I want to know whether Spark replicates an RDD automatically whenever there
is scope (availability of extra resources)?
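
For reference, a minimal sketch (storage level and path are illustrative) of
pinning the storage level explicitly and checking what the RDD actually
reports; replicated caching normally only comes from the *_2 storage levels:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("storage-level-sketch"))
val rdd = sc.textFile("hdfs:///path/to/598mb-file")

rdd.persist(StorageLevel.MEMORY_ONLY)      // single copy per cached partition
// rdd.persist(StorageLevel.MEMORY_ONLY_2) // explicit 2x replication
rdd.count()

println(rdd.getStorageLevel)               // shows the replication factor in use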




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Caching-RDD-tp21728.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Spark Performance on Yarn

2015-02-19 Thread lbierman
I'm a bit new to Spark, but had a question on performance. I suspect a lot of
my issue is due to tuning and parameters. I have a Hive external table on
this data, and queries against it run in minutes.

The Job:
+ 40gb of avro events on HDFS (100 million+ avro events)
+ Read in the files from HDFS and dedupe events by key (mapToPair then a
reduceByKey)
+ RDD returned and persisted (disk and memory)
+ Then passed to a job that take the RDD and mapToPair of new object data
and then reduceByKey and foreachpartion do work

The issue:
When I run this on my environment on Yarn this takes 20+ hours. Running on
yarn we see the first stage run to build the deduped RDD, but when
the next stage starts, tasks fail and data is lost. This results in stage 0
starting over and over and just dragging it out.

Errors I see in the driver logs:
ERROR cluster.YarnClientClusterScheduler: Lost executor 1 on X: remote
Akka client disassociated

15/02/20 00:27:36 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.1
(TID 1335,): FetchFailed(BlockManagerId(3, i, 33958), shuffleId=1,
mapId=162, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect
toX/X:33958

Also we see this, but I'm suspecting this is because the previous stage
fails and the next one starts:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 1

Cluster:
5 machines, each 2 core , 8gb machines 

Spark-submit command:
 spark-submit --class com.myco.SparkJob \
--master yarn \
/tmp/sparkjob.jar \

Any thoughts on where to look, how to start approaching this problem, or
what additional data points to provide?

Thanks..

Code for the job:
 JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>)
context.newAPIHadoopRDD(
    context.hadoopConfiguration(),
    AvroKeyInputFormat.class,
    AvroKey.class,
    NullWritable.class
).keys())
    .map(event -> AnalyticsEvent.newBuilder(event.datum()).build())
    .filter(key -> { return
Optional.ofNullable(key.getStepEventKey()).isPresent(); })
    .mapToPair(event -> new Tuple2<AnalyticsEvent, Integer>(event, 1))
    .reduceByKey((analyticsEvent1, analyticsEvent2) -> analyticsEvent1)
    .map(tuple -> tuple._1());

events.persist(StorageLevel.MEMORY_AND_DISK_2());
events.mapToPair(event -> {
    return new Tuple2<T, RunningAggregates>(
        keySelector.select(event),
        new RunningAggregates(
            Optional.ofNullable(event.getVisitors()).orElse(0L),
            Optional.ofNullable(event.getImpressions()).orElse(0L),
            Optional.ofNullable(event.getAmount()).orElse(0.0D),
            Optional.ofNullable(event.getAmountSumOfSquares()).orElse(0.0D)));
    })
    .reduceByKey((left, right) -> { return left.add(right); })
    .foreachPartition(doStuff)






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming and message ordering

2015-02-19 Thread Neelesh
I had a chance to talk to TD today at the Strata+Hadoop Conf in San Jose.
We talked a bit about this after his presentation - the short
answer is that Spark Streaming does not guarantee any sort of ordering (within
batches, across batches).  One would have to use updateStateByKey to
collect the events and sort them based on some attribute of the event.  But
TD said message ordering is a frequently requested feature recently and is
getting on his radar.
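
A minimal sketch of that updateStateByKey approach (the event type, field
names, and socket source are illustrative assumptions, not from any actual
job): accumulate events per key and keep them sorted by a timestamp attribute.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

case class Event(ts: Long, payload: String)

object OrderedStateSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("ordered-state-sketch"), Seconds(5))
    ssc.checkpoint("/tmp/ordered-state-checkpoint") // required by updateStateByKey

    // Assume lines arrive as "key,ts,payload"; the parsing is illustrative only.
    val eventsByKey = ssc.socketTextStream("localhost", 9999).map { line =>
      val Array(k, ts, p) = line.split(",", 3)
      (k, Event(ts.toLong, p))
    }

    // Keep all events seen so far per key, sorted by their timestamp attribute.
    val ordered = eventsByKey.updateStateByKey[Seq[Event]] {
      (newEvents: Seq[Event], state: Option[Seq[Event]]) =>
        Some((state.getOrElse(Seq.empty[Event]) ++ newEvents).sortBy(_.ts))
    }
    ordered.print()

    ssc.start()
    ssc.awaitTermination()
  }
}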

I went through the source code and there does not seem to be any
architectural/design limitation to support this.  (JobScheduler,
JobGenerator are a good starting point to see how stuff works under the
hood).  Overriding DStream#compute and using streaminglistener looks like a
simple way of ensuring ordered execution of batches within a stream. But
this would be a partial solution, since ordering within a batch needs some
more work that I don't understand fully yet.

Side note :  My custom receiver polls the metricsservlet once in a while to
decide whether jobs are getting done fast enough and throttle/relax pushing
data in to receivers based on the numbers provided by metricsservlet. I had
to do this because out-of-the-box rate limiting right now is static and
cannot adapt to the state of the cluster

thnx
-neelesh

On Wed, Feb 18, 2015 at 4:13 PM, jay vyas jayunit100.apa...@gmail.com
wrote:

 This is a *fantastic* question.  The idea of how we identify individual
 things in multiple  DStreams is worth looking at.

 The reason being, that you can then fine tune your streaming job, based on
 the RDD identifiers (i.e. are the timestamps from the producer correlating
 closely to the order in which RDD elements are being produced) ?  If *NO*
 then you need to (1) dial up throughput on producer sources or else (2)
 increase cluster size so that spark is capable of evenly handling load.

 You can't decide to do (1) or (2) unless you can track when the streaming
 elements are being converted to RDDs by Spark itself.



 On Wed, Feb 18, 2015 at 6:54 PM, Neelesh neele...@gmail.com wrote:

 There does not seem to be a definitive answer on this. Every time I
 google for message ordering,the only relevant thing that comes up is this
  -
 http://samza.apache.org/learn/documentation/0.8/comparisons/spark-streaming.html
 .

 With a kafka receiver that pulls data from a single kafka partition of a
 kafka topic, are individual messages in the microbatch in same the order as
 kafka partition? Are successive microbatches originating from a kafka
 partition executed in order?


 Thanks!





 --
 jay vyas



Re: Learning GraphX Questions

2015-02-19 Thread Takeshi Yamamuro
Hi,

Vertices are simply hash-partitioned by spark.HashPartitioner, so
you can easily calculate partition ids yourself.

Also, you can type the lines to check ids;

import org.apache.spark.graphx._

graph.vertices.mapPartitionsWithIndex { (pid, iter) =>
  val vids = Array.newBuilder[VertexId]
  for (d <- iter) vids += d._1
  Iterator((pid, vids.result))
}
.map(d => s"PID:${d._1} IDs:${d._2.toSeq.toString}")
.collect
.foreach(println)








On Thu, Feb 19, 2015 at 12:31 AM, Matthew Bucci mrbucci...@gmail.com
wrote:

 Thanks for all the responses so far! I have started to understand the
 system more, but I just had another question while I was going along. Is
 there a way to check the individual partitions of an RDD? For example, if I
 had a graph with vertices a,b,c,d and it was split into 2 partitions could
 I check which vertices belonged in partition 1 and parition 2?

 Thank You,
 Matthew Bucci

 On Fri, Feb 13, 2015 at 10:58 PM, Ankur Dave ankurd...@gmail.com wrote:

 At 2015-02-13 12:19:46 -0800, Matthew Bucci mrbucci...@gmail.com wrote:
  1) How do you actually run programs in GraphX? At the moment I've been
 doing
  everything live through the shell, but I'd obviously like to be able to
 work
  on it by writing and running scripts.

 You can create your own projects that build against Spark and GraphX
 through a Maven dependency [1], then run those applications using the
 bin/spark-submit script included with Spark [2].

 These guides assume you already know how to do this using your preferred
 build tool (SBT or Maven). In short, here's how to do it with SBT:

 1. Install SBT locally (`brew install sbt` on OS X).

 2. Inside your project directory, create a build.sbt file listing Spark
 and GraphX as dependencies, as in [3] (a minimal sketch follows the links below).

 3. Run `sbt package` in a shell.

 4. Pass the JAR in your_project_dir/target/scala-2.10/ to
 bin/spark-submit.

 [1]
 http://spark.apache.org/docs/latest/programming-guide.html#linking-with-spark
 [2] http://spark.apache.org/docs/latest/submitting-applications.html
 [3] https://gist.github.com/ankurdave/1fb7234d8affb3a2e4f4
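
 A minimal build.sbt sketch along the lines of [3] (version numbers are
 illustrative; match them to your cluster's Spark version):

 name := "graphx-example"

 version := "0.1"

 scalaVersion := "2.10.4"

 libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-core"   % "1.2.1" % "provided",
   "org.apache.spark" %% "spark-graphx" % "1.2.1" % "provided"
 )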

  2) Is there a way to check the status of the partitions of a graph? For
  example, I want to determine for starters if the number of partitions
  requested are always made, like if I ask for 8 partitions but only have
 4
  cores what happens?

 You can look at `graph.vertices` and `graph.edges`, which are both RDDs,
 so you can do for example: graph.vertices.partitions

  3) Would I be able to partition by vertex instead of edges, even if I
 had to
  write it myself? I know partitioning by edges is favored in a majority
 of
  the cases, but for the sake of research I'd like to be able to do both.

 If you pass PartitionStrategy.EdgePartition1D, this will partition edges
 by their source vertices, so all edges with the same source will be
 co-partitioned, and the communication pattern will be similar to
 vertex-partitioned (edge-cut) systems like Giraph.
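
 A minimal sketch (toy graph and local master, just for illustration) of
 applying that strategy and checking how the edges end up partitioned:

 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.graphx._

 val sc = new SparkContext(
   new SparkConf().setAppName("partition-sketch").setMaster("local[2]"))
 val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(1L, 3L, 1), Edge(2L, 3L, 1)))
 val graph = Graph.fromEdges(edges, defaultValue = 0)

 // Co-partition edges by their source vertex (Giraph-style edge-cut behaviour).
 val bySource = graph.partitionBy(PartitionStrategy.EdgePartition1D)
 println(bySource.edges.partitions.length)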

  4) Is there a better way to time processes outside of using built-in
 unix
  timing through the logs or something?

 I think the options are Unix timing, log file timestamp parsing, looking
 at the web UI, or writing timing code within your program
 (System.currentTimeMillis and System.nanoTime).

 Ankur





-- 
---
Takeshi Yamamuro


Re: Spark Streaming and message ordering

2015-02-19 Thread Neelesh
Even with the new direct streams in 1.3,  isn't it the case that the job
*scheduling* follows the partition order, rather than job *execution*? Or
is it the case that the stream listens to job completion event (using a
streamlistener) before scheduling the next batch?  To compare with storm
from a message ordering point of view, unless a tuple is fully processed by
the DAG (as defined by spout+bolts), the next tuple does not enter the DAG.


On Thu, Feb 19, 2015 at 9:47 PM, Cody Koeninger c...@koeninger.org wrote:

 Kafka ordering is guaranteed on a per-partition basis.

 The high-level consumer api as used by the spark kafka streams prior to
 1.3 will consume from multiple kafka partitions, thus not giving any
 ordering guarantees.

 The experimental direct stream in 1.3 uses the simple consumer api, and
 there is a 1:1 correspondence between spark partitions and kafka
 partitions.  So you will get deterministic ordering, but only on a
 per-partition basis.
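 
 A minimal sketch of that 1.3 direct stream API (broker and topic names are
 placeholders), where each Kafka partition becomes exactly one Spark partition:
 
 import kafka.serializer.StringDecoder
 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.kafka.KafkaUtils
 
 val ssc = new StreamingContext(
   new SparkConf().setAppName("direct-stream-sketch"), Seconds(5))
 val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
 val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
   ssc, kafkaParams, Set("mytopic"))
 
 // Ordering is deterministic within each partition, not across partitions.
 stream.map(_._2).print()
 ssc.start()
 ssc.awaitTermination()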

 On Thu, Feb 19, 2015 at 11:31 PM, Neelesh neele...@gmail.com wrote:

 I had a chance to talk to TD today at the Strata+Hadoop Conf in San Jose.
 We talked a bit about this after his presentation - the short
 answer is that Spark Streaming does not guarantee any sort of ordering (within
 batches, across batches).  One would have to use updateStateByKey to
 collect the events and sort them based on some attribute of the event.  But
 TD said message ordering is a frequently requested feature recently and is
 getting on his radar.

 I went through the source code and there does not seem to be any
 architectural/design limitation to support this.  (JobScheduler,
 JobGenerator are a good starting point to see how stuff works under the
 hood).  Overriding DStream#compute and using streaminglistener looks like a
 simple way of ensuring ordered execution of batches within a stream. But
 this would be a partial solution, since ordering within a batch needs some
 more work that I don't understand fully yet.

 Side note :  My custom receiver polls the metricsservlet once in a while
 to decide whether jobs are getting done fast enough and throttle/relax
 pushing data in to receivers based on the numbers provided by
 metricsservlet. I had to do this because out-of-the-box rate limiting right
 now is static and cannot adapt to the state of the cluster

 thnx
 -neelesh

 On Wed, Feb 18, 2015 at 4:13 PM, jay vyas jayunit100.apa...@gmail.com
 wrote:

 This is a *fantastic* question.  The idea of how we identify individual
 things in multiple  DStreams is worth looking at.

 The reason being, that you can then fine tune your streaming job, based
 on the RDD identifiers (i.e. are the timestamps from the producer
 correlating closely to the order in which RDD elements are being produced)
 ?  If *NO* then you need to (1) dial up throughput on producer sources or
 else (2) increase cluster size so that spark is capable of evenly handling
 load.

 You can't decide to do (1) or (2) unless you can track when the
 streaming elements are being converted to RDDs by Spark itself.



 On Wed, Feb 18, 2015 at 6:54 PM, Neelesh neele...@gmail.com wrote:

 There does not seem to be a definitive answer on this. Every time I
 google for message ordering,the only relevant thing that comes up is this
  -
 http://samza.apache.org/learn/documentation/0.8/comparisons/spark-streaming.html
 .

 With a kafka receiver that pulls data from a single kafka partition of
 a kafka topic, are individual messages in the microbatch in same the order
 as kafka partition? Are successive microbatches originating from a kafka
 partition executed in order?


 Thanks!





 --
 jay vyas






Re: Spark 1.2.1: ClassNotFoundException when running hello world example in scala 2.11

2015-02-19 Thread Akhil Das
Can you downgrade your scala dependency to 2.10 and give it a try?

Thanks
Best Regards

On Fri, Feb 20, 2015 at 12:40 AM, Luis Solano l...@pixable.com wrote:

 I'm having an issue with spark 1.2.1 and scala 2.11. I detailed the
 symptoms in this stackoverflow question.


 http://stackoverflow.com/questions/28612837/spark-classnotfoundexception-when-running-hello-world-example-in-scala-2-11

 Has anyone experienced anything similar?

 Thank you!



Re: How to diagnose could not compute split errors and failed jobs?

2015-02-19 Thread Akhil Das
Not quite sure, but this could be the case: one of your executors is stuck in a
GC pause while another one asks it for data, and hence the
request times out, ending in that exception. You can try increasing the akka
frame size and ack wait timeout as follows:

  .set("spark.core.connection.ack.wait.timeout", "600")
  .set("spark.akka.frameSize", "50")


Thanks
Best Regards

On Fri, Feb 20, 2015 at 6:21 AM, Tim Smith secs...@gmail.com wrote:

 My streaming app runs fine for a few hours and then starts spewing Could
 not compute split, block input-xx-xxx not found errors. After this,
 jobs start to fail and batches start to pile up.

 My question isn't so much about why this error but rather, how do I trace
 what leads to this error? I am using disk+memory for storage so shouldn't
 be a case of data loss resulting from memory overrun.

 15/02/18 22:04:49 ERROR JobScheduler: Error running job streaming job
 142429705 ms.28
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 3
 in stage 247644.0 failed 64 times, most recent failure: Lost task 3.63 in
 stage 247644.0 (TID 3705290, node-dn1-16-test.abcdefg.com):
 java.lang.Exception: Could not compute split, block input-28-1424297042500
 not found
 at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 Thanks,

 Tim




Re: Spark streaming doesn't print output when working with standalone master

2015-02-19 Thread Akhil Das
While running the program, go to your cluster's web UI (it runs on 8080,
probably at hadoop.master:8080) and see how many cores are allocated to the
program; it should be >= 2 for the stream to get processed.


[image: Inline image 1]



Thanks
Best Regards

On Fri, Feb 20, 2015 at 9:29 AM, bit1...@163.com bit1...@163.com wrote:

 Hi,
 I am trying the spark streaming log analysis reference application
 provided by Databricks at
 https://github.com/databricks/reference-apps/tree/master/logs_analyzer
 When I deploy the code to the standalone cluster, there is no output at
 all with the following shell script, which means the windowDStream has 0
 RDDs.
 ./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master
 spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 3
 --class
 spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming
 LogApp.jar

 But, when I change --master to be --master local[3], the program starts to
 work fine. Can anyone have some advice? Thanks!
 ./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master
 local[3] --executor-memory 512M --total-executor-cores 3 --class
 spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming 
 LogApp.jar


 object LogAnalyzerStreaming {

 val WINDOW_LENGTH = new Duration(12 * 1000)
 val SLIDE_INTERVAL = new Duration(6 * 1000)

 def main(args: Array[String]) {
 val sparkConf = new SparkConf().setAppName("Log Analyzer Streaming in
 Scala")
 val sc = new SparkContext(sparkConf)
 val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)

 val logLinesDStream = streamingContext.socketTextStream("localhost", )

 val accessLogsDStream =
 logLinesDStream.map(ApacheAccessLog.parseLogLine).cache()
 val windowDStream = accessLogsDStream.window(WINDOW_LENGTH,
 SLIDE_INTERVAL)

 windowDStream.foreachRDD(accessLogs => {
 if (accessLogs.count() == 0) {
 println("No access com.databricks.app.logs received in this time
 interval")
 } else {
 // Calculate statistics based on the content size.
 val contentSizes = accessLogs.map(log => log.contentSize).cache()
 println("Content Size Avg: %s, Min: %s, Max: %s".format(
 contentSizes.reduce(_ + _) / contentSizes.count,
 contentSizes.min,
 contentSizes.max
 ))

 streamingContext.start()
 streamingContext.awaitTermination()
 }
 }
 --




Re: percentil UDAF in spark 1.2.0

2015-02-19 Thread Mark Hamstra
Already fixed: https://github.com/apache/spark/pull/2802


On Thu, Feb 19, 2015 at 3:17 PM, Mohnish Kodnani mohnish.kodn...@gmail.com
wrote:

 Hi,
 I am trying to use percentile and getting the following error. I am using
 spark 1.2.0. Does UDAF percentile exist in that code line and do i have to
 do something to get this to work.

 java.util.NoSuchElementException: key not found: percentile
 at scala.collection.MapLike$class.default(MapLike.scala:228)
 at scala.collection.AbstractMap.default(Map.scala:58)
 at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
 at
 org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:53)
 at
 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$2.applyOrElse(Analyzer.scala:220)
 at
 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$2.applyOrElse(Analyzer.scala:218)
 at
 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
 at
 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)


 Thanks
 mohnish




Re: percentil UDAF in spark 1.2.0

2015-02-19 Thread Mark Hamstra
The percentile UDAF came in across a couple of PRs.   Commit
f33d55046427b8594fd19bda5fd2214eeeab1a95 reflects the most recent work, I
believe.  It will be part of the 1.3.0 release very soon:
http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-Release-Apache-Spark-1-3-0-RC1-tt10658.html

On Thu, Feb 19, 2015 at 3:39 PM, Mohnish Kodnani mohnish.kodn...@gmail.com
wrote:

 Isn't that PR about being able to pass an array to the percentile function?
 If I understand this error correctly, it's not able to find the function
 percentile itself.
 Also, if I am incorrect and that PR fixes it, is it available in a release?


 On Thu, Feb 19, 2015 at 3:27 PM, Mark Hamstra m...@clearstorydata.com
 wrote:

 Already fixed: https://github.com/apache/spark/pull/2802


 On Thu, Feb 19, 2015 at 3:17 PM, Mohnish Kodnani 
 mohnish.kodn...@gmail.com wrote:

 Hi,
 I am trying to use percentile and getting the following error. I am
 using spark 1.2.0. Does UDAF percentile exist in that code line and do i
 have to do something to get this to work.

 java.util.NoSuchElementException: key not found: percentile
 at scala.collection.MapLike$class.default(MapLike.scala:228)
 at scala.collection.AbstractMap.default(Map.scala:58)
 at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
 at
 org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:53)
 at
 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$2.applyOrElse(Analyzer.scala:220)
 at
 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$2.applyOrElse(Analyzer.scala:218)
 at
 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
 at
 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)


 Thanks
 mohnish






stack map functions in a loop (pyspark)

2015-02-19 Thread jamborta
Hi all,

I think I have run into an issue with the lazy evaluation of variables in
pyspark. I have the following:

functions = [func1, func2, func3]

for counter in range(len(functions)):
data = data.map(lambda value: [functions[counter](value)])

it looks like the counter is evaluated when the RDD is computed, so all
three mappers end up with its last value. Is there any way
to force it to be evaluated at the time? (I am aware that I could
persist after each step, which sounds a bit of a waste.)

thanks,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/stack-map-functions-in-a-loop-pyspark-tp21722.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD Partition number

2015-02-19 Thread Ted Yu
What file system are you using ?

If you use hdfs, the documentation you cited is pretty clear on how
partitions are determined.

bq. file X replicated on 4 machines

I don't think replication factor plays a role w.r.t. partitions.

On Thu, Feb 19, 2015 at 8:05 AM, Alessandro Lulli lu...@di.unipi.it wrote:

 Hi All,

 Could you please help me understanding how Spark defines the number of
 partitions of the RDDs if not specified?

 I found the following in the documentation for file loaded from HDFS:
 *The textFile method also takes an optional second argument for
 controlling the number of partitions of the file. By default, Spark creates
 one partition for each block of the file (blocks being 64MB by default in
 HDFS), but you can also ask for a higher number of partitions by passing a
 larger value. Note that you cannot have fewer partitions than blocks*

 What is the rule for file loaded from the file systems?
 For instance, i have a file X replicated on 4 machines. If i load the file
 X in a RDD how many partitions are defined and why?

 Thanks for your help on this
 Alessandro



Implicit ALS with multiple features

2015-02-19 Thread poiuytrez
Hello, 

I would like to use the spark MLlib recommendation filtering library. My
goal will be to predict what a user would like to buy based on what he
bought before. 

I read in the Spark documentation that Spark supports implicit feedback.
However there is no example for this application. Would implicit feedback
work for my business case, and how? 

Can ALS accept multiple parameters? Currently I have:
(userId,productId,nbPurchased)
I would like to add another parameter:
(userId,productId,nbPurchased,frequency)

Is it possible with ALS? 

Thank you for your reply



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Implicit-ALS-with-multiple-features-tp21723.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Filtering keys after map+combine

2015-02-19 Thread Daniel Siegmann
I'm not sure what your use case is, but perhaps you could use mapPartitions
to reduce across the individual partitions and apply your filtering. Then
you can finish with a reduceByKey.
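
A minimal sketch of that idea (the threshold, key type, and sample data are
illustrative): pre-aggregate within each partition, drop keys that fall below
the threshold locally, then finish with a reduceByKey. Note that the local
filter can drop a key whose partial sums are each below the threshold even
though its global total is not, so whether this is acceptable depends on the
use case.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object FilterAfterCombineSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("filter-after-combine-sketch").setMaster("local[2]"))
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1), ("c", 1)))
    val threshold = 2

    val locallyFiltered = pairs.mapPartitions { iter =>
      // Partial (map-side style) aggregation within this partition only.
      val localSums = scala.collection.mutable.Map.empty[String, Int].withDefaultValue(0)
      iter.foreach { case (k, v) => localSums(k) += v }
      localSums.iterator.filter { case (_, sum) => sum >= threshold }
    }

    // Final combine across partitions.
    val result = locallyFiltered.reduceByKey(_ + _)
    result.collect().foreach(println)
  }
}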

On Thu, Feb 19, 2015 at 9:21 AM, Debasish Das debasish.da...@gmail.com
wrote:

 Hi,

 Before I send out the keys for network shuffle, in reduceByKey after map +
 combine are done, I would like to filter the keys based on some threshold...

 Is there a way to get the key, value after map+combine stages so that I
 can run a filter on the keys ?

 Thanks.
 Deb




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: Spark job fails on cluster but works fine on a single machine

2015-02-19 Thread Pavel Velikhov

 On Feb 19, 2015, at 7:29 PM, Pavel Velikhov pavel.velik...@icloud.com wrote:
 
 I have a simple Spark job that goes out to Cassandra, runs a pipe and stores 
 results:
 
 val sc = new SparkContext(conf)
 val rdd = sc.cassandraTable("keyspace", "table")
   .map(r => r.getInt("column") + "\t" + write(get_lemmas(r.getString("tags"))))
   .pipe("python3 /tmp/scripts_and_models/scripts/run.py")
   .map(r => convertStr(r))
   .coalesce(1,true)
   .saveAsTextFile("/tmp/pavel/CassandraPipeTest.txt")
   //.saveToCassandra("keyspace", "table", SomeColumns("id", "data"))
 
 When run on a single machine, everything is fine if I save to an hdfs file or 
 save to Cassandra.
 When run in cluster neither works:
 
  - When saving to file, I get an exception: User class threw exception: 
 Output directory hdfs://hadoop01:54310/tmp/pavel/CassandraPipeTest.txt already exists
  - When saving to Cassandra, only 4 rows are updated with empty data (I test 
 on a 4-machine Spark cluster)
 
 Any hints on how to debug this and where the problem could be?
 
 - I delete the hdfs file before running
 - Would really like the output to hdfs to work, so I can debug
 - Then it would be nice to save to Cassandra

- Btw I *don’t* want to use .coalesce(1,true) in the future as well. 

Spark job fails on cluster but works fine on a single machine

2015-02-19 Thread Pavel Velikhov
I have a simple Spark job that goes out to Cassandra, runs a pipe and stores 
results:

val sc = new SparkContext(conf)
val rdd = sc.cassandraTable("keyspace", "table")
  .map(r => r.getInt("column") + "\t" + write(get_lemmas(r.getString("tags"))))
  .pipe("python3 /tmp/scripts_and_models/scripts/run.py")
  .map(r => convertStr(r))
  .coalesce(1,true)
  .saveAsTextFile("/tmp/pavel/CassandraPipeTest.txt")
  //.saveToCassandra("keyspace", "table", SomeColumns("id", "data"))

When run on a single machine, everything is fine if I save to an hdfs file or 
save to Cassandra.
When run in cluster neither works:

 - When saving to file, I get an exception: User class threw exception: Output 
directory hdfs://hadoop01:54310/tmp/pavel/CassandraPipeTest.txt already exists
 - When saving to Cassandra, only 4 rows are updated with empty data (I test on 
a 4-machine Spark cluster)

Any hints on how to debug this and where the problem could be?

- I delete the hdfs file before running
- Would really like the output to hdfs to work, so I can debug
- Then it would be nice to save to Cassandra

Re: Unzipping large files and 2GB partition size.

2015-02-19 Thread Joe Wass
Thanks for your detailed reply Imran. I'm writing this in Clojure (using
Flambo which uses the Java API) but I don't think that's relevant. So
here's the pseudocode (sorry I've not written Scala for a long time):

val rawData = sc.hadoopFile("/dir/to/gzfiles") // NB multiple files.
val parsedFiles = rawData.map(parseFunction)   // can return nil on failure
val filtered = parsedFiles.filter(notNil)
val partitioned = filtered.repartition(100) // guessed number
val persisted = partitioned.persist(StorageLevels.DISK_ONLY)

val resultA = stuffA(persisted)
val resultB = stuffB(persisted)
val resultC = stuffC(persisted)

So, I think I'm already doing what you suggested. I would have assumed that
partition size would be («size of expanded file» / «number of partitions»).
In this case, 100 (which I picked out of the air).

I wonder whether the «size of expanded file» is actually the size of all
concatenated input files (probably about 800 GB)? In that case should I
multiply it by the number of files? Or perhaps I'm barking up completely
the wrong tree.

Joe




On 19 February 2015 at 14:44, Imran Rashid iras...@cloudera.com wrote:

 Hi Joe,

 The issue is not that you have input partitions that are bigger than 2GB
 -- its just that they are getting cached.  You can see in the stack trace,
 the problem is when you try to read data out of the DiskStore:

 org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)

 Also, just because you see this:

 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
 tasks from Stage 1 (MappedRDD[17] at mapToPair at
 NativeMethodAccessorImpl.java:-2)

 it doesn't *necessarily* mean that this is coming from your map.  It can
 be pretty confusing how your operations on RDDs get turned into stages, it
 could be a lot more than just your map.  and actually, it might not even be
 your map at all -- some of the other operations you invoke call map
 underneath the covers.  So its hard to say what is going on here w/ out
 seeing more code.  Anyway, maybe you've already considered all this (you
 did mention the lazy execution of the DAG), but I wanted to make sure.  it
 might help to use rdd.setName() and also to look at rdd.toDebugString.

 As far as what you can do about this -- it could be as simple as moving
 your rdd.persist() to after you have compressed and repartitioned your
 data.  eg., I'm blindly guessing you have something like this:

 val rawData = sc.hadoopFile(...)
 rawData.persist(DISK)
 rawData.count()
 val compressedData = rawData.map{...}
 val repartitionedData = compressedData.repartition(N)
 ...

 change it to something like:

 val rawData = sc.hadoopFile(...)
 val compressedData = rawData.map{...}
 val repartitionedData = compressedData.repartition(N)
 repartitionedData.persist(DISK)
 repartitionedData.count()
 ...


 The point is, you avoid caching any data until you have ensured that the
 partitions are small.  You might have big partitions before that in
 rawData, but that is OK.

 Imran


 On Thu, Feb 19, 2015 at 4:43 AM, Joe Wass jw...@crossref.org wrote:

 Thanks for your reply Sean.

 Looks like it's happening in a map:

 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
 tasks from Stage 1 (MappedRDD[17] at mapToPair at
 NativeMethodAccessorImpl.java:-2)

 That's my initial 'parse' stage, done before repartitioning. It reduces
 the data size significantly so I thought it would be sensible to do before
 repartitioning, which involves moving lots of data around. That might be a
 stupid idea in hindsight!

 So the obvious thing to try would be to try repartitioning before the map
 as the first transformation. I would have done that if I could be sure that
 it would succeed or fail quickly.

 I'm not entirely clear about the lazy execution of transformations in
 DAG. It could be that the error is manifesting during the mapToPair, but
 caused by the earlier read from text file stage.

 Thanks for pointers to those compression formats. I'll give them a go
 (although it's not trivial to re-encode 200 GB of data on S3, so if I can
 get this working reasonably with gzip I'd like to).

 Any advice about whether this error can be worked round with an early
 partition?

 Cheers

 Joe


 On 19 February 2015 at 09:51, Sean Owen so...@cloudera.com wrote:

 gzip and zip are not splittable compression formats; bzip and lzo are.
 Ideally, use a splittable compression format.

 Repartitioning is not a great solution since it means a shuffle,
 typically.

 This is not necessarily related to how big your partitions are. The
 question is, when does this happen? what operation?

 On Thu, Feb 19, 2015 at 9:35 AM, Joe Wass jw...@crossref.org wrote:
  On the advice of some recent discussions on this list, I thought I
 would try
  and consume gz files directly. I'm reading them, doing a preliminary
 map,
  then repartitioning, then doing normal spark things.
 
  As I understand it, zip files aren't readable in partitions because of
 the
  format, 

RDD Partition number

2015-02-19 Thread Alessandro Lulli
Hi All,

Could you please help me understanding how Spark defines the number of
partitions of the RDDs if not specified?

I found the following in the documentation for file loaded from HDFS:
*The textFile method also takes an optional second argument for controlling
the number of partitions of the file. By default, Spark creates one
partition for each block of the file (blocks being 64MB by default in
HDFS), but you can also ask for a higher number of partitions by passing a
larger value. Note that you cannot have fewer partitions than blocks*

What is the rule for file loaded from the file systems?
For instance, i have a file X replicated on 4 machines. If i load the file
X in a RDD how many partitions are defined and why?

Thanks for your help on this
Alessandro


Re: Implicit ALS with multiple features

2015-02-19 Thread Sean Owen
It's shown at 
http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html

It's really not different to use. It's suitable when you have
count-like data rather than rating-like data. That's what you have
here.

I am not sure what you mean by wanting to add frequency too, but no, the
model is strictly (user, item, strength). However, you can merge many
signals into a single 'strength' score.
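
A minimal sketch (file path, field layout, and hyperparameters are
illustrative) of feeding purchase counts into implicit-feedback ALS, with the
count acting as the interaction strength:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

object ImplicitAlsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("implicit-als-sketch"))

    // Assume lines of "userId,productId,nbPurchased".
    val purchases = sc.textFile("hdfs:///data/purchases.csv").map { line =>
      val Array(userId, productId, nbPurchased) = line.split(",")
      Rating(userId.toInt, productId.toInt, nbPurchased.toDouble)
    }

    val model = ALS.trainImplicit(purchases, rank = 10, iterations = 10,
      lambda = 0.01, alpha = 1.0)

    // Predicted preference of user 42 for product 7 (ids are made up).
    println(model.predict(42, 7))
  }
}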

On Thu, Feb 19, 2015 at 4:40 PM, poiuytrez guilla...@databerries.com wrote:
 Hello,

 I would like to use the spark MLlib recommendation filtering library. My
 goal will be to predict what a user would like to buy based on what he
 bought before.

 I read in the Spark documentation that Spark supports implicit feedback.
 However there is no example for this application. Would implicit feedback
 work for my business case, and how?

 Can ALS accept multiple parameters? Currently I have:
 (userId,productId,nbPurchased)
 I would like to add another parameter:
 (userId,productId,nbPurchased,frequency)

 Is it possible with ALS?

 Thank you for your reply



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Implicit-ALS-with-multiple-features-tp21723.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD Partition number

2015-02-19 Thread Ilya Ganelin
By default you will have (fileSize in Mb / 64) partitions. You can also set
the number of partitions when you read in a file with sc.textFile as an
optional second parameter.
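
A minimal sketch (the path is a placeholder): ask for more partitions up front
and then check how many you actually got.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("partitions-sketch"))
val rdd = sc.textFile("hdfs:///some/path/file.txt", 8) // request at least 8 partitions
println(rdd.partitions.length)
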
On Thu, Feb 19, 2015 at 8:07 AM Alessandro Lulli lu...@di.unipi.it wrote:

 Hi All,

 Could you please help me understanding how Spark defines the number of
 partitions of the RDDs if not specified?

 I found the following in the documentation for file loaded from HDFS:
 *The textFile method also takes an optional second argument for
 controlling the number of partitions of the file. By default, Spark creates
 one partition for each block of the file (blocks being 64MB by default in
 HDFS), but you can also ask for a higher number of partitions by passing a
 larger value. Note that you cannot have fewer partitions than blocks*

 What is the rule for file loaded from the file systems?
 For instance, i have a file X replicated on 4 machines. If i load the file
 X in a RDD how many partitions are defined and why?

 Thanks for your help on this
 Alessandro



RE: Spark job fails on cluster but works fine on a single machine

2015-02-19 Thread Ganelin, Ilya
When writing to hdfs Spark will not overwrite existing files or directories. 
You must either manually delete these or use Java's Hadoop FileSystem class to 
remove them.
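
A minimal sketch (the path is just the one from the example above) of removing
an existing output directory through the Hadoop FileSystem API before calling
saveAsTextFile:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("cleanup-sketch"))
val outputPath = new Path("/tmp/pavel/CassandraPipeTest.txt")
val fs = FileSystem.get(sc.hadoopConfiguration)
if (fs.exists(outputPath)) {
  fs.delete(outputPath, true) // recursive delete
}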



Sent with Good (www.good.com)


-Original Message-
From: Pavel Velikhov [pavel.velik...@gmail.com]
Sent: Thursday, February 19, 2015 11:32 AM Eastern Standard Time
To: user@spark.apache.org
Subject: Spark job fails on cluster but works fine on a single machine

I have a simple Spark job that goes out to Cassandra, runs a pipe and stores 
results:

val sc = new SparkContext(conf)
val rdd = sc.cassandraTable("keyspace", "table")
  .map(r => r.getInt("column") + "\t" + write(get_lemmas(r.getString("tags"))))
  .pipe("python3 /tmp/scripts_and_models/scripts/run.py")
  .map(r => convertStr(r))
  .coalesce(1,true)
  .saveAsTextFile("/tmp/pavel/CassandraPipeTest.txt")
  //.saveToCassandra("keyspace", "table", SomeColumns("id", "data"))

When run on a single machine, everything is fine if I save to an hdfs file or 
save to Cassandra.
When run in cluster neither works:

 - When saving to file, I get an exception: User class threw exception: Output 
directory hdfs://hadoop01:54310/tmp/pavel/CassandraPipeTest.txt already exists
 - When saving to Cassandra, only 4 rows are updated with empty data (I test on 
a 4-machine Spark cluster)

Any hints on how to debug this and where the problem could be?

- I delete the hdfs file before running
- Would really like the output to hdfs to work, so I can debug
- Then it would be nice to save to Cassandra




Re: RDD Partition number

2015-02-19 Thread Ted Yu
bq. *blocks being 64MB by default in HDFS*


*In hadoop 2.1+, default block size has been increased.*
See https://issues.apache.org/jira/browse/HDFS-4053

Cheers

On Thu, Feb 19, 2015 at 8:32 AM, Ted Yu yuzhih...@gmail.com wrote:

 What file system are you using ?

 If you use hdfs, the documentation you cited is pretty clear on how
 partitions are determined.

 bq. file X replicated on 4 machines

 I don't think replication factor plays a role w.r.t. partitions.

 On Thu, Feb 19, 2015 at 8:05 AM, Alessandro Lulli lu...@di.unipi.it
 wrote:

 Hi All,

 Could you please help me understanding how Spark defines the number of
 partitions of the RDDs if not specified?

 I found the following in the documentation for file loaded from HDFS:
 *The textFile method also takes an optional second argument for
 controlling the number of partitions of the file. By default, Spark creates
 one partition for each block of the file (blocks being 64MB by default in
 HDFS), but you can also ask for a higher number of partitions by passing a
 larger value. Note that you cannot have fewer partitions than blocks*

 What is the rule for file loaded from the file systems?
 For instance, i have a file X replicated on 4 machines. If i load the
 file X in a RDD how many partitions are defined and why?

 Thanks for your help on this
 Alessandro





RE: RDD Partition number

2015-02-19 Thread Ganelin, Ilya
As Ted Yu points out, default block size is 128MB as of Hadoop 2.1.



Sent with Good (www.good.com)


-Original Message-
From: Ilya Ganelin [ilgan...@gmail.com]
Sent: Thursday, February 19, 2015 12:13 PM Eastern Standard Time
To: Alessandro Lulli; user@spark.apache.org
Cc: Massimiliano Bertolucci
Subject: Re: RDD Partition number

By default you will have (fileSize in Mb / 64) partitions. You can also set the 
number of partitions when you read in a file with sc.textFile as an optional 
second parameter.
On Thu, Feb 19, 2015 at 8:07 AM Alessandro Lulli 
lu...@di.unipi.it wrote:
Hi All,

Could you please help me understanding how Spark defines the number of 
partitions of the RDDs if not specified?

I found the following in the documentation for file loaded from HDFS:
The textFile method also takes an optional second argument for controlling the 
number of partitions of the file. By default, Spark creates one partition for 
each block of the file (blocks being 64MB by default in HDFS), but you can also 
ask for a higher number of partitions by passing a larger value. Note that you 
cannot have fewer partitions than blocks

What is the rule for file loaded from the file systems?
For instance, i have a file X replicated on 4 machines. If i load the file X in 
a RDD how many partitions are defined and why?

Thanks for your help on this
Alessandro




Filtering keys after map+combine

2015-02-19 Thread Debasish Das
Hi,

Before I send out the keys for network shuffle, in reduceByKey after map +
combine are done, I would like to filter the keys based on some threshold...

Is there a way to get the key, value after map+combine stages so that I can
run a filter on the keys ?

Thanks.
Deb


Re: Unzipping large files and 2GB partition size.

2015-02-19 Thread Imran Rashid
Hi Joe,

The issue is not that you have input partitions that are bigger than 2GB --
its just that they are getting cached.  You can see in the stack trace, the
problem is when you try to read data out of the DiskStore:

org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)

Also, just because you see this:

15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing tasks
from Stage 1 (MappedRDD[17] at mapToPair at NativeMethodAccessorImpl.java:
-2)

it doesn't *necessarily* mean that this is coming from your map.  It can be
pretty confusing how your operations on RDDs get turned into stages, it
could be a lot more than just your map.  and actually, it might not even be
your map at all -- some of the other operations you invoke call map
underneath the covers.  So its hard to say what is going on here w/ out
seeing more code.  Anyway, maybe you've already considered all this (you
did mention the lazy execution of the DAG), but I wanted to make sure.  it
might help to use rdd.setName() and also to look at rdd.toDebugString.

As far as what you can do about this -- it could be as simple as moving
your rdd.persist() to after you have compressed and repartitioned your
data.  eg., I'm blindly guessing you have something like this:

val rawData = sc.hadoopFile(...)
rawData.persist(DISK)
rawData.count()
val compressedData = rawData.map{...}
val repartitionedData = compressedData.repartition(N)
...

change it to something like:

val rawData = sc.hadoopFile(...)
val compressedData = rawData.map{...}
val repartitionedData = compressedData.repartition(N)
repartitionedData.persist(DISK)
repartitionedData.count()
...


The point is, you avoid caching any data until you have ensured that the
partitions are small.  You might have big partitions before that in
rawData, but that is OK.

Imran


On Thu, Feb 19, 2015 at 4:43 AM, Joe Wass jw...@crossref.org wrote:

 Thanks for your reply Sean.

 Looks like it's happening in a map:

 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
 tasks from Stage 1 (MappedRDD[17] at mapToPair at
 NativeMethodAccessorImpl.java:-2)

 That's my initial 'parse' stage, done before repartitioning. It reduces
 the data size significantly so I thought it would be sensible to do before
 repartitioning, which involves moving lots of data around. That might be a
 stupid idea in hindsight!

 So the obvious thing to try would be to try repartitioning before the map
 as the first transformation. I would have done that if I could be sure that
 it would succeed or fail quickly.

 I'm not entirely clear about the lazy execution of transformations in DAG.
 It could be that the error is manifesting during the mapToPair, but caused
 by the earlier read from text file stage.

 Thanks for pointers to those compression formats. I'll give them a go
 (although it's not trivial to re-encode 200 GB of data on S3, so if I can
 get this working reasonably with gzip I'd like to).

 Any advice about whether this error can be worked round with an early
 partition?

 Cheers

 Joe


 On 19 February 2015 at 09:51, Sean Owen so...@cloudera.com wrote:

 gzip and zip are not splittable compression formats; bzip and lzo are.
 Ideally, use a splittable compression format.

 Repartitioning is not a great solution since it means a shuffle,
 typically.

 This is not necessarily related to how big your partitions are. The
 question is, when does this happen? what operation?

 On Thu, Feb 19, 2015 at 9:35 AM, Joe Wass jw...@crossref.org wrote:
  On the advice of some recent discussions on this list, I thought I
 would try
  and consume gz files directly. I'm reading them, doing a preliminary
 map,
  then repartitioning, then doing normal spark things.
 
  As I understand it, zip files aren't readable in partitions because of
 the
  format, so I thought that repartitioning would be the next best thing
 for
  parallelism. I have about 200 files, some about 1GB compressed and some
 over
  2GB uncompressed.
 
  I'm hitting the 2GB maximum partition size. It's been discussed on this
 list
  (topic: 2GB limit for partitions?, tickets SPARK-1476 and SPARK-1391).
  Stack trace at the end. This happened at 10 hours in (probably when it
 saw
  its first file). I can't just re-run it quickly!
 
  Does anyone have any advice? Might I solve this by re-partitioning as
 the
  first step after reading the file(s)? Or is it effectively impossible to
  read a gz file that expands to over 2GB? Does anyone have any experience
  with this?
 
  Thanks in advance
 
  Joe
 
  Stack trace:
 
  Exception in thread main 15/02/18 20:44:25 INFO
 scheduler.TaskSetManager:
  Lost task 5.3 in stage 1.0 (TID 283) on executor:
  java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE)
  [duplicate 6]
  org.apache.spark.SparkException: Job aborted due to stage failure: Task
 2 in
  stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage
 1.0:
  java.lang.IllegalArgumentException: Size exceeds 

Re: SparkSQL + Tableau Connector

2015-02-19 Thread Todd Nist
Hi Silvio,

I got this working today using your suggestion with the Initial SQL and a
Custom Query.  See here for details:

http://stackoverflow.com/questions/28403664/connect-to-existing-hive-in-intellij-using-sbt-as-build/28608608#28608608

It is not ideal as I need to write a custom query, but does work for now.
I also have it working by doing a SaveAsTable on the ingested data which
stores the reference into the metastore for access via the thrift server.
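
A minimal sketch of that saveAsTable route (the path and table name are just
the ones from the earlier example), using a HiveContext so the table is
registered in the metastore and visible to the thrift server:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("save-as-table-sketch"))
val hiveContext = new HiveContext(sc)

val json = hiveContext.jsonFile("/user/data/json/test")
json.saveAsTable("test") // now queryable through the thrift server without Initial SQL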

Thanks for the help.

-Todd

On Wed, Feb 11, 2015 at 8:41 PM, Silvio Fiorito 
silvio.fior...@granturing.com wrote:

   Hey Todd,

  I don’t have an app to test against the thrift server, are you able to
 define custom SQL without using Tableau’s schema query? I guess it’s not
 possible to just use SparkSQL temp tables, you may have to use permanent
 Hive tables that are actually in the metastore so Tableau can discover them
 in the schema. In that case you will either have to generate the Hive
 tables externally from Spark or use Spark to process the data and save them
 using a HiveContext.


From: Todd Nist
 Date: Wednesday, February 11, 2015 at 7:53 PM
 To: Andrew Lee
 Cc: Arush Kharbanda, user@spark.apache.org
 Subject: Re: SparkSQL + Tableau Connector

   First sorry for the long post.  So back to tableau and Spark SQL, I'm
 still missing something.

  TL;DR

  To get the Spark SQL Temp table associated with the metastore are there
 additional steps required beyond doing the below?

 Initial SQL on connection:

  create temporary table test
 using org.apache.spark.sql.json
 options (path '/data/json/*');

 cache table test;

  I feel like I'm missing a step of associating the Spark SQL table with
 the metastore, do I need to actually save it in some fashion?   I'm trying
 to avoid saving to hive if possible.

  *Details:*

  I configured the hive-site.xml and placed it in the $SPARK_HOME/conf.
 It looks like this, thanks Andrew and Arush for the assistance:

  <?xml version="1.0"?>
  <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

  <configuration>
    <property>
      <name>hive.semantic.analyzer.factory.impl</name>
      <value>org.apache.hcatalog.cli.HCatSemanticAnalyzerFactory</value>
    </property>

    <property>
      <name>hive.metastore.sasl.enabled</name>
      <value>false</value>
    </property>

    <property>
      <name>hive.server2.authentication</name>
      <value>NONE</value>
    </property>

    <property>
      <name>hive.server2.enable.doAs</name>
      <value>true</value>
    </property>

    <!--
    <property>
      <name>hive.metastore.uris</name>
      <value>thrift://localhost:9083</value>
      <description>IP address (or fully-qualified domain name) and port of
      the metastore host</description>
    </property>
    -->

    <property>
      <name>hive.warehouse.subdir.inherit.perms</name>
      <value>true</value>
    </property>

    <property>
      <name>hive.metastore.schema.verification</name>
      <value>false</value>
    </property>

    <property>
      <name>javax.jdo.option.ConnectionURL</name>
      <value>jdbc:mysql://localhost:3306/metastore_db?createDatabaseIfNotExist=true</value>
      <description>metadata is stored in a MySQL server</description>
    </property>

    <property>
      <name>javax.jdo.option.ConnectionDriverName</name>
      <value>com.mysql.jdbc.Driver</value>
      <description>MySQL JDBC driver class</description>
    </property>

    <property>
      <name>javax.jdo.option.ConnectionUserName</name>
      <value>hiveuser</value>
    </property>

    <property>
      <name>javax.jdo.option.ConnectionPassword</name>
      <value>hiveuser</value>
    </property>

  </configuration>

 When I start the server it looks fine:

  $ ./sbin/start-thriftserver.sh --hiveconf
 hive.server2.thrift.port=10001 --hiveconf hive.server2.thrift.bind.host
 radtech.io --master spark://radtech.io:7077 --driver-class-path
 /usr/local/spark/lib/mysql-connector-java-5.1.34-bin.jar
 starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging
 to
 /usr/local/spark-1.2.1-bin-hadoop2.4/logs/spark-tnist-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-radtech.io.out
 radtech:spark tnist$ tail -f
 logs/spark-tnist-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-radtech.io.out
 15/02/11 19:15:24 INFO SparkDeploySchedulerBackend: Granted executor ID
 app-20150211191524-0008/1 on hostPort 192.168.1.2:50851 with 2 cores,
 512.0 MB RAM
 15/02/11 19:15:24 INFO AppClient$ClientActor: Executor updated:
 app-20150211191524-0008/0 is now LOADING
 15/02/11 19:15:24 INFO AppClient$ClientActor: Executor updated:
 app-20150211191524-0008/1 is now LOADING
 15/02/11 19:15:24 INFO AppClient$ClientActor: Executor updated:
 app-20150211191524-0008/0 is now RUNNING
 15/02/11 19:15:24 INFO AppClient$ClientActor: Executor updated:
 app-20150211191524-0008/1 is now RUNNING
 15/02/11 19:15:24 INFO NettyBlockTransferService: Server created on 50938
 15/02/11 19:15:24 INFO BlockManagerMaster: Trying to register BlockManager
 15/02/11 19:15:24 INFO BlockManagerMasterActor: Registering block manager
 192.168.1.2:50938 with 265.1 MB RAM, BlockManagerId(driver,
 192.168.1.2, 50938)
 

bulk writing to HDFS in Spark Streaming?

2015-02-19 Thread Chico Qi
Hi all,

In Spark Streaming I want to use DStream.saveAsTextFiles with bulk writes,
because the normal saveAsTextFiles cannot finish within the configured batch
interval.
Maybe a common pool of writers, or another worker assigned just to bulk writing?

Thanks!

B/R
Jichao


Re: JdbcRDD, ClassCastException with scala.Function0

2015-02-19 Thread Dmitry Goldenberg
Yup, I did see that. Good point though, Cody. The mismatch was happening
for me when I was trying to get the 'new JdbcRDD' approach going. Once I
switched to the 'create' method things are working just fine. Was just able
to refactor the 'get connection' logic into a 'DbConnection implements
JdbcRDD.ConnectionFactory' and my 'map row' class is still 'MapRow
implements org.apache.spark.api.java.function.Function<ResultSet, Row>'.

This works fine and makes the driver program tighter. Of course, my next
question is, how to work with the lower and upper bound parameters. As in,
what if I don't know what the min and max ID values are and just want to
extract all data from the table, what should the params be, if that's even
supported. And furthermore, what if the primary key on the table is not
numeric? or if there's no primary key altogether?

The method works fine with lowerBound=0 and upperBound=100, for
example. But doesn't seem to have a way to say, 'no upper bound' (-1 didn't
work).

On Wed, Feb 18, 2015 at 11:59 PM, Cody Koeninger c...@koeninger.org wrote:

 Look at the definition of JdbcRDD.create:

   def create[T](

   sc: JavaSparkContext,

   connectionFactory: ConnectionFactory,

   sql: String,

   lowerBound: Long,

   upperBound: Long,

   numPartitions: Int,

   mapRow: JFunction[ResultSet, T]): JavaRDD[T] = {


 JFunction here is the interface org.apache.spark.api.java.function.Function,
 not scala Function0

 Likewise, ConnectionFactory is an interface defined inside JdbcRDD, not
 scala Function0

 On Wed, Feb 18, 2015 at 4:50 PM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 That's exactly what I was doing. However, I ran into runtime issues with
 doing that. For instance, I had a

   public class DbConnection extends AbstractFunction0<Connection>
 implements Serializable

 I got a runtime error from Spark complaining that DbConnection wasn't an
 instance of scala.Function0.

 I also had a

   public class MapRow extends
 scala.runtime.AbstractFunction1<java.sql.ResultSet, Row> implements
 Serializable

 with which I seemed to have more luck.

 On Wed, Feb 18, 2015 at 5:32 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Cant you implement the

 org.apache.spark.api.java.function.Function

 interface and pass an instance of that to JdbcRDD.create ?

 On Wed, Feb 18, 2015 at 3:48 PM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 Cody, you were right, I had a copy and paste snag where I ended up with
 a vanilla SparkContext rather than a Java one.  I also had to *not* use my
 function subclasses, rather just use anonymous inner classes for the
 Function stuff and that got things working. I'm fully following
 the JdbcRDD.create approach from JavaJdbcRDDSuite.java basically verbatim.

 Is there a clean way to refactor out the custom Function classes such
 as the one for getting a db connection or mapping ResultSet data to your
 own POJO's rather than doing it all inline?


 On Wed, Feb 18, 2015 at 1:52 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Is sc there a SparkContext or a JavaSparkContext?  The compilation
 error seems to indicate the former, but JdbcRDD.create expects the latter

 On Wed, Feb 18, 2015 at 12:30 PM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 I have tried that as well, I get a compile error --

 [ERROR] ...SparkProto.java:[105,39] error: no suitable method found
 for create(SparkContext,<anonymous
 ConnectionFactory>,String,int,int,int,<anonymous
 Function<ResultSet,Integer>>)

 The code is a copy and paste:

 JavaRDD<Integer> jdbcRDD = JdbcRDD.create(
   sc,
   new JdbcRDD.ConnectionFactory() {
     public Connection getConnection() throws SQLException {
       return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
     }
   },
   "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
   1, 100, 1,
   new Function<ResultSet, Integer>() {
     public Integer call(ResultSet r) throws Exception {
       return r.getInt(1);
     }
   }
 );

 The other thing I've tried was to define a static class locally for
 GetConnection and use the JdbcCreate constructor. This got around the
 compile issues but blew up at runtime with NoClassDefFoundError:
 scala/runtime/AbstractFunction0 !

 JdbcRDD<Row> jdbcRDD = new JdbcRDD<Row>(
 sc,
 (AbstractFunction0<Connection>) new DbConn(), // had to cast or a
 compile error
 SQL_QUERY,
 0L,
 1000L,
 10,
 new MapRow(),
 ROW_CLASS_TAG);
 // DbConn is defined as public static class DbConn extends
 AbstractFunction0<Connection> implements Serializable

 On Wed, Feb 18, 2015 at 1:20 PM, Cody Koeninger c...@koeninger.org
 wrote:

 That test I linked


 https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java#L90

 is calling a static method JdbcRDD.create, not new JdbcRDD.  Is that
 what you tried doing?

 On Wed, Feb 18, 2015 at 12:00 PM, Dmitry 

Re: Unzipping large files and 2GB partition size.

2015-02-19 Thread Imran Rashid
oh, I think you are just choosing a number that is too small for your
number of partitions.  All of the data in /dir/to/gzfiles is going to be
sucked into one RDD, with the data divided into partitions.  So if you're
parsing 200 files, each about 2 GB, and then repartitioning down to 100
partitions, you would expect 4 GB per partition.  Though you're filtering
the data down some, there may also be some bloat from your parsed
objects.  Also if you're not using kryo for serialization, I'd strongly
recommend that over the default serialization, and try to register all your
classes.

I think you can get some information about how much data is in your RDDs
from the UI -- but it might depend on which version of Spark you are
running, plus I think the info isn't saved on failed stages, so you might
just need to monitor it in the UI as it's happening (I am not 100% sure
about that ...)

So I'd suggest (a) using a lot more partitions (maybe 1k, given your data
size) (b) turn on kryo if you haven't already.
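
As a rough illustration of (b), a minimal sketch of enabling Kryo and registering classes (ParsedRecord here is a hypothetical placeholder for whatever class your parse step produces):

  import org.apache.spark.{SparkConf, SparkContext}

  case class ParsedRecord(id: String, value: Double)

  val conf = new SparkConf()
    .setAppName("gz-parsing")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // registering avoids writing full class names with every serialized record
    .registerKryoClasses(Array(classOf[ParsedRecord]))
  val sc = new SparkContext(conf)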



On Thu, Feb 19, 2015 at 9:36 AM, Joe Wass jw...@crossref.org wrote:

 Thanks for your detailed reply Imran. I'm writing this in Clojure (using
 Flambo which uses the Java API) but I don't think that's relevant. So
 here's the pseudocode (sorry I've not written Scala for a long time):

 val rawData = sc.hadoopFile("/dir/to/gzfiles") // NB multiple files.
 val parsedFiles = rawData.map(parseFunction)   // can return nil on failure
 val filtered = parsedFiles.filter(notNil)
 val partitioned = filtered.repartition(100) // guessed number
 val persisted = partitioned.persist(StorageLevels.DISK_ONLY)

 val resultA = stuffA(persisted)
 val resultB = stuffB(persisted)
 val resultC = stuffC(persisted)

 So, I think I'm already doing what you suggested. I would have assumed
 that partition size would be («size of expanded file» / «number of
 partitions»). In this case, 100 (which I picked out of the air).

 I wonder whether the «size of expanded file» is actually the size of all
 concatenated input files (probably about 800 GB)? In that case should I
 multiply it by the number of files? Or perhaps I'm barking up completely
 the wrong tree.

 Joe




 On 19 February 2015 at 14:44, Imran Rashid iras...@cloudera.com wrote:

 Hi Joe,

 The issue is not that you have input partitions that are bigger than 2GB
 -- its just that they are getting cached.  You can see in the stack trace,
 the problem is when you try to read data out of the DiskStore:

 org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)

 Also, just because you see this:

 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
 tasks from Stage 1 (MappedRDD[17] at mapToPair at
 NativeMethodAccessorImpl.java:-2)

 it doesn't *necessarily* mean that this is coming from your map.  It can
 be pretty confusing how your operations on RDDs get turned into stages, it
 could be a lot more than just your map.  and actually, it might not even be
 your map at all -- some of the other operations you invoke call map
 underneath the covers.  So its hard to say what is going on here w/ out
 seeing more code.  Anyway, maybe you've already considered all this (you
 did mention the lazy execution of the DAG), but I wanted to make sure.  it
 might help to use rdd.setName() and also to look at rdd.toDebugString.

 As far as what you can do about this -- it could be as simple as moving
 your rdd.persist() to after you have compressed and repartitioned your
 data.  eg., I'm blindly guessing you have something like this:

 val rawData = sc.hadoopFile(...)
 rawData.persist(DISK)
 rawData.count()
 val compressedData = rawData.map{...}
 val repartitionedData = compressedData.repartition(N)
 ...

 change it to something like:

 val rawData = sc.hadoopFile(...)
 val compressedData = rawData.map{...}
 val repartitionedData = compressedData.repartition(N)
 repartitionedData.persist(DISK)
 repartitionedData.count()
 ...


 The point is, you avoid caching any data until you have ensured that the
 partitions are small.  You might have big partitions before that in
 rawData, but that is OK.

 Imran


 On Thu, Feb 19, 2015 at 4:43 AM, Joe Wass jw...@crossref.org wrote:

 Thanks for your reply Sean.

 Looks like it's happening in a map:

 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
 tasks from Stage 1 (MappedRDD[17] at mapToPair at
 NativeMethodAccessorImpl.java:-2)

 That's my initial 'parse' stage, done before repartitioning. It reduces
 the data size significantly so I thought it would be sensible to do before
 repartitioning, which involves moving lots of data around. That might be a
 stupid idea in hindsight!

 So the obvious thing to try would be to try repartitioning before the
 map as the first transformation. I would have done that if I could be sure
 that it would succeed or fail quickly.

 I'm not entirely clear about the lazy execution of transformations in
 DAG. It could be that the error is manifesting during the mapToPair, but
 

Re: Filtering keys after map+combine

2015-02-19 Thread Debasish Das
I thought combiner comes from reduceByKey and not mapPartitions right...Let
me dig deeper into the APIs

On Thu, Feb 19, 2015 at 8:29 AM, Daniel Siegmann daniel.siegm...@velos.io
wrote:

 I'm not sure what your use case is, but perhaps you could use
 mapPartitions to reduce across the individual partitions and apply your
 filtering. Then you can finish with a reduceByKey.
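
 A rough sketch of that idea in the spark-shell (assuming an RDD[(String, Int)] of counts, summation as the reduce function, and a minimum count of 5 as the threshold -- all placeholders):

   val pairs = sc.parallelize(Seq(("a", 3), ("a", 4), ("b", 1)))
   val partiallyReduced = pairs.mapPartitions { iter =>
     // combine within the partition first, like a map-side combine
     val combined = scala.collection.mutable.Map.empty[String, Int]
     iter.foreach { case (k, v) => combined(k) = combined.getOrElse(k, 0) + v }
     // drop keys below the threshold before anything is shuffled
     combined.iterator.filter { case (_, count) => count >= 5 }
   }
   val result = partiallyReduced.reduceByKey(_ + _)

 Note the caveat: a key filtered out in one partition could still clear the threshold globally, so this only fits if a per-partition filter is acceptable for the use case.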

 On Thu, Feb 19, 2015 at 9:21 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 Hi,

 Before I send out the keys for network shuffle, in reduceByKey after map
 + combine are done, I would like to filter the keys based on some
 threshold...

 Is there a way to get the key, value after map+combine stages so that I
 can run a filter on the keys ?

 Thanks.
 Deb




 --
 Daniel Siegmann, Software Developer
 Velos
 Accelerating Machine Learning

 54 W 40th St, New York, NY 10018
 E: daniel.siegm...@velos.io W: www.velos.io



Re: JdbcRDD, ClassCastException with scala.Function0

2015-02-19 Thread Dmitry Goldenberg
That's a good point, thanks. Is there a way to instrument continuous
realtime streaming of data out of a database? If the data keeps changing,
one way to implement extraction would be to keep track of something like
the last-modified timestamp and instrument the query to be 'where
lastmodified  ?'

That would imply running the spark program repetitively on a scheduled
basis. I wonder if it's possible to just continuously stream any updates
out instead, using Spark..

On Thu, Feb 19, 2015 at 10:23 AM, Cody Koeninger c...@koeninger.org wrote:

 At the beginning of the code, do a query to find the current maximum ID

 Don't just put in an arbitrarily large value, or all of your rows will end
 up in 1 spark partition at the beginning of the range.

 The question of keys is up to you... all that you need to be able to do is
 write a sql statement that takes 2 numbers to specify the bounds.  Of
 course, a numeric primary key is going to be the most efficient way to do
 that.
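
 A minimal sketch of that first step with plain JDBC (the connection URL, table, and column names are made up for illustration):

   import java.sql.DriverManager

   val (lowerBound, upperBound) = {
     val conn = DriverManager.getConnection("jdbc:postgresql://host:5432/db", "user", "pass")
     try {
       val rs = conn.createStatement().executeQuery("SELECT MIN(id), MAX(id) FROM foo")
       rs.next()
       (rs.getLong(1), rs.getLong(2))
     } finally {
       conn.close()
     }
   }
   // then hand lowerBound and upperBound to JdbcRDD.create along with
   // a query like "SELECT ... FROM foo WHERE ? <= id AND id <= ?"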

 On Thu, Feb 19, 2015 at 8:57 AM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 Yup, I did see that. Good point though, Cody. The mismatch was happening
 for me when I was trying to get the 'new JdbcRDD' approach going. Once I
 switched to the 'create' method things are working just fine. Was just able
 to refactor the 'get connection' logic into a 'DbConnection implements
 JdbcRDD.ConnectionFactory' and my 'map row' class is still 'MapRow
 implements org.apache.spark.api.java.function.Function<ResultSet, Row>'.

 This works fine and makes the driver program tighter. Of course, my next
 question is, how to work with the lower and upper bound parameters. As in,
 what if I don't know what the min and max ID values are and just want to
 extract all data from the table, what should the params be, if that's even
 supported. And furthermore, what if the primary key on the table is not
 numeric? or if there's no primary key altogether?

 The method works fine with lowerBound=0 and upperBound=100, for
 example. But doesn't seem to have a way to say, 'no upper bound' (-1 didn't
 work).

 On Wed, Feb 18, 2015 at 11:59 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Look at the definition of JdbcRDD.create:

   def create[T](

   sc: JavaSparkContext,

   connectionFactory: ConnectionFactory,

   sql: String,

   lowerBound: Long,

   upperBound: Long,

   numPartitions: Int,

   mapRow: JFunction[ResultSet, T]): JavaRDD[T] = {


 JFunction here is the interface org.apache.spark.api.java.function.Function,
 not scala Function0

 Likewise, ConnectionFactory is an interface defined inside JdbcRDD, not
 scala Function0

 On Wed, Feb 18, 2015 at 4:50 PM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 That's exactly what I was doing. However, I ran into runtime issues
 with doing that. For instance, I had a

   public class DbConnection extends AbstractFunction0<Connection>
 implements Serializable

 I got a runtime error from Spark complaining that DbConnection wasn't
 an instance of scala.Function0.

 I also had a

   public class MapRow extends
 scala.runtime.AbstractFunction1<java.sql.ResultSet, Row> implements
 Serializable

 with which I seemed to have more luck.

 On Wed, Feb 18, 2015 at 5:32 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Cant you implement the

 org.apache.spark.api.java.function.Function

 interface and pass an instance of that to JdbcRDD.create ?

 On Wed, Feb 18, 2015 at 3:48 PM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 Cody, you were right, I had a copy and paste snag where I ended up
 with a vanilla SparkContext rather than a Java one.  I also had to *not*
 use my function subclasses, rather just use anonymous inner classes for 
 the
 Function stuff and that got things working. I'm fully following
 the JdbcRDD.create approach from JavaJdbcRDDSuite.java basically 
 verbatim.

 Is there a clean way to refactor out the custom Function classes such
 as the one for getting a db connection or mapping ResultSet data to your
 own POJO's rather than doing it all inline?


 On Wed, Feb 18, 2015 at 1:52 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Is sc there a SparkContext or a JavaSparkContext?  The compilation
 error seems to indicate the former, but JdbcRDD.create expects the 
 latter

 On Wed, Feb 18, 2015 at 12:30 PM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 I have tried that as well, I get a compile error --

 [ERROR] ...SparkProto.java:[105,39] error: no suitable method found
 for create(SparkContext,<anonymous
 ConnectionFactory>,String,int,int,int,<anonymous
 Function<ResultSet,Integer>>)

 The code is a copy and paste:

 JavaRDD<Integer> jdbcRDD = JdbcRDD.create(
   sc,
   new JdbcRDD.ConnectionFactory() {
     public Connection getConnection() throws SQLException {
       return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
     }
   },
   SELECT DATA FROM FOO WHERE ? = ID AND ID = 

Re: Some tasks taking too much time to complete in a stage

2015-02-19 Thread Imran Rashid
almost all your data is going to one task.  You can see that the shuffle
read for task 0 is 153.3 KB, and for most other tasks its just 26B (which
is probably just some header saying there are no actual records).  You need
to ensure your data is more evenly distributed before this step.

On Thu, Feb 19, 2015 at 10:53 AM, jatinpreet jatinpr...@gmail.com wrote:

 Hi,

 I am running Spark 1.2.1 for compute intensive jobs comprising of multiple
 tasks. I have observed that most tasks complete very quickly, but there are
 always one or two tasks that take a lot of time to complete thereby
 increasing the overall stage time. What could be the reason for this?

 Following are the statistics for one such stage. As you can see, the task
 with index 0 takes 1.1 minutes whereas others completed much more quickly.

 Aggregated Metrics by Executor
 Executor ID  Address       Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Input  Output  Shuffle Read  Shuffle Write  Shuffle Spill (Memory)  Shuffle Spill (Disk)
 0            slave1:56311  46 s       13           0             13               0.0 B  0.0 B   0.0 B         0.0 B          0.0 B                   0.0 B
 1            slave2:42648  2.1 min    13           0             13               0.0 B  0.0 B   384.3 KB      0.0 B          0.0 B                   0.0 B
 2            slave3:44322  23 s       12           0             12               0.0 B  0.0 B   136.4 KB      0.0 B          0.0 B                   0.0 B
 3            slave4:37987  44 s       12           0             12               0.0 B  0.0 B   213.9 KB      0.0 B          0.0 B                   0.0 B

 Tasks
 Index  ID   Attempt  Status   Locality Level  Executor ID / Host  Launch Time          Duration  GC Time  Shuffle Read  Errors
 0      213  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  1.1 min   1 s      153.3 KB
 5      218  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  23 ms              26.0 B
 1      214  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  2 s       0.9 s    13.8 KB
 4      217  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  26 ms              26.0 B
 3      216  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  11 ms              0.0 B
 2      215  0        SUCCESS  PROCESS_LOCAL   2 / slave3          2015/02/19 11:40:05  27 ms              26.0 B
 7      220  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  11 ms              0.0 B
 10     223  0        SUCCESS  PROCESS_LOCAL   2 / slave3          2015/02/19 11:40:05  23 ms              26.0 B
 6      219  0        SUCCESS  PROCESS_LOCAL   2 / slave3          2015/02/19 11:40:05  23 ms              26.0 B
 9      222  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  23 ms              26.0 B
 8      221  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  23 ms              26.0 B
 11     224  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  10 ms              0.0 B
 14     227  0        SUCCESS  PROCESS_LOCAL   2 / slave3          2015/02/19 11:40:05  24 ms              26.0 B
 13     226  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  23 ms              26.0 B
 16     229  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  22 ms              26.0 B
 12     225  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  22 ms              26.0 B
 15     228  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  10 ms              0.0 B
 17     230  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  22 ms              26.0 B
 23     236  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  10 ms              0.0 B
 22     235  0        SUCCESS  PROCESS_LOCAL   2 / slave3          2015/02/19 11:40:05  21 ms              26.0 B
 19     232  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  10 ms              0.0 B
 21     234  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  25 ms              26.0 B
 18     231  0        SUCCESS  PROCESS_LOCAL   2 / slave3          2015/02/19 11:40:05  24 ms              26.0 B
 20     233  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  28 ms              26.0 B
 25     238  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  20 ms              26.0 B
 28     241  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  27 ms              26.0 B
 27     240  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  10 ms              0.0 B


 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Some-tasks-taking-too-much-time-to-complete-in-a-stage-tp21724.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Some tasks taking too much time to complete in a stage

2015-02-19 Thread Jatinpreet Singh
Hi Imran,

Thanks for pointing that out. My data comes from the HBase connector of
Spark. I do not govern the distribution of data myself. HBase decides to
put the data on any of the region servers. Is there a way to distribute
data evenly? And I am especially interested in running even small loads
very quickly apart from bulk loads.

Thanks,
Jatin

On Thu, Feb 19, 2015 at 10:28 PM, Imran Rashid iras...@cloudera.com wrote:

 almost all your data is going to one task.  You can see that the shuffle
 read for task 0 is 153.3 KB, and for most other tasks its just 26B (which
 is probably just some header saying there are no actual records).  You need
 to ensure your data is more evenly distributed before this step.

 On Thu, Feb 19, 2015 at 10:53 AM, jatinpreet jatinpr...@gmail.com wrote:

 Hi,

 I am running Spark 1.2.1 for compute intensive jobs comprising of multiple
 tasks. I have observed that most tasks complete very quickly, but there
 are
 always one or two tasks that take a lot of time to complete thereby
 increasing the overall stage time. What could be the reason for this?

 Following are the statistics for one such stage. As you can see, the task
 with index 0 takes 1.1 minutes whereas others completed much more quickly.

 Aggregated Metrics by Executor
 Executor ID  Address       Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Input  Output  Shuffle Read  Shuffle Write  Shuffle Spill (Memory)  Shuffle Spill (Disk)
 0            slave1:56311  46 s       13           0             13               0.0 B  0.0 B   0.0 B         0.0 B          0.0 B                   0.0 B
 1            slave2:42648  2.1 min    13           0             13               0.0 B  0.0 B   384.3 KB      0.0 B          0.0 B                   0.0 B
 2            slave3:44322  23 s       12           0             12               0.0 B  0.0 B   136.4 KB      0.0 B          0.0 B                   0.0 B
 3            slave4:37987  44 s       12           0             12               0.0 B  0.0 B   213.9 KB      0.0 B          0.0 B                   0.0 B

 Tasks
 Index  ID   Attempt  Status   Locality Level  Executor ID / Host  Launch Time          Duration  GC Time  Shuffle Read  Errors
 0      213  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  1.1 min   1 s      153.3 KB
 5      218  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  23 ms              26.0 B
 1      214  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  2 s       0.9 s    13.8 KB
 4      217  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  26 ms              26.0 B
 3      216  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  11 ms              0.0 B
 2      215  0        SUCCESS  PROCESS_LOCAL   2 / slave3          2015/02/19 11:40:05  27 ms              26.0 B
 7      220  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  11 ms              0.0 B
 10     223  0        SUCCESS  PROCESS_LOCAL   2 / slave3          2015/02/19 11:40:05  23 ms              26.0 B
 6      219  0        SUCCESS  PROCESS_LOCAL   2 / slave3          2015/02/19 11:40:05  23 ms              26.0 B
 9      222  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  23 ms              26.0 B
 8      221  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  23 ms              26.0 B
 11     224  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  10 ms              0.0 B
 14     227  0        SUCCESS  PROCESS_LOCAL   2 / slave3          2015/02/19 11:40:05  24 ms              26.0 B
 13     226  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  23 ms              26.0 B
 16     229  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  22 ms              26.0 B
 12     225  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  22 ms              26.0 B
 15     228  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  10 ms              0.0 B
 17     230  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  22 ms              26.0 B
 23     236  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  10 ms              0.0 B
 22     235  0        SUCCESS  PROCESS_LOCAL   2 / slave3          2015/02/19 11:40:05  21 ms              26.0 B
 19     232  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  10 ms              0.0 B
 21     234  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  25 ms              26.0 B
 18     231  0        SUCCESS  PROCESS_LOCAL   2 / slave3          2015/02/19 11:40:05  24 ms              26.0 B
 20     233  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  28 ms              26.0 B
 25     238  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  20 ms              26.0 B
 28     241  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  27 ms              26.0 B
 27     240  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  10 ms              0.0 B


 Thanks



 --
 View this 

Re: Why is RDD lookup slow?

2015-02-19 Thread Burak Yavuz
If your dataset is large, there is a Spark Package called IndexedRDD
optimized for lookups. Feel free to check that out.

Burak
On Feb 19, 2015 7:37 AM, Ilya Ganelin ilgan...@gmail.com wrote:

 Hi Shahab - if your data structures are small enough a broadcasted Map is
 going to provide faster lookup. Lookup within an RDD is an O(m) operation
 where m is the size of the partition. For RDDs with multiple partitions,
 executors can operate on it in parallel so you get some improvement for
 larger RDDs.
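
 A minimal spark-shell sketch of the broadcast-map approach (the sample data is made up):

   val table = sc.parallelize(Seq(1 -> "a", 2 -> "b", 3 -> "c"))   // your RDD[(Int, String)]
   val lookup = sc.broadcast(table.collectAsMap())                 // one copy shipped to each executor

   // inside any transformation, lookup.value is a local map, so each
   // lookup is O(1) instead of a scan over an RDD partition
   val resolved = sc.parallelize(Seq(2, 3, 4)).map(id => lookup.value.getOrElse(id, "missing"))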
 On Thu, Feb 19, 2015 at 7:31 AM shahab shahab.mok...@gmail.com wrote:

 Hi,

 I am doing lookup on cached RDDs [(Int,String)], and I noticed that the
 lookup is relatively slow 30-100 ms ?? I even tried this on one machine
 with single partition, but no difference!

 The RDDs are not large at all, 3-30 MB.

 Is this expected behaviour? should I use other data structures, like
 HashMap to keep data and look up it there and use Broadcast to send a copy
 to all machines?

 best,
 /Shahab





SchemaRDD.select

2015-02-19 Thread Cesar Flores
I am trying to pass a variable number of arguments to the select function
of a SchemaRDD I created, as I want to select the fields in run time:


val variable_argument_list = List('field1, 'field2)

val schm1 = myschemaRDD.select('field1,'field2) // works
val schm2 = myschemaRDD.select(variable_argument_list:_*) // do not work


I am interested in selecting in run time the fields
from myschemaRDD variable. However, the usual way of passing variable
number of arguments as a List in Scala fails.

Is there a way of selecting a variable number of arguments in the select
function? If not, what will be a better approach for selecting the required
fields in run time?



Thanks in advance for your help
-- 
Cesar Flores


Re: percentil UDAF in spark 1.2.0

2015-02-19 Thread Mohnish Kodnani
Isn't that PR about being able to pass in an array to the percentile function?
If I understand this error correctly, it's not able to find the function
percentile itself.
Also, if I am incorrect and that PR fixes it, is it available in a release?


On Thu, Feb 19, 2015 at 3:27 PM, Mark Hamstra m...@clearstorydata.com
wrote:

 Already fixed: https://github.com/apache/spark/pull/2802


 On Thu, Feb 19, 2015 at 3:17 PM, Mohnish Kodnani 
 mohnish.kodn...@gmail.com wrote:

 Hi,
 I am trying to use percentile and getting the following error. I am using
 spark 1.2.0. Does UDAF percentile exist in that code line and do i have to
 do something to get this to work.

 java.util.NoSuchElementException: key not found: percentile
 at scala.collection.MapLike$class.default(MapLike.scala:228)
 at scala.collection.AbstractMap.default(Map.scala:58)
 at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
 at
 org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:53)
 at
 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$2.applyOrElse(Analyzer.scala:220)
 at
 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$2.applyOrElse(Analyzer.scala:218)
 at
 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
 at
 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)


 Thanks
 mohnish





unsubscribe

2015-02-19 Thread chaitu reddy
-- 
cheers,

chaitu


Re: issue Running Spark Job on Yarn Cluster

2015-02-19 Thread Sachin Singh
Yes.
On 19 Feb 2015 23:40, Harshvardhan Chauhan ha...@gumgum.com wrote:

 Is this the full stack trace ?

 On Wed, Feb 18, 2015 at 2:39 AM, sachin Singh sachin.sha...@gmail.com
 wrote:

 Hi,
 I want to run my spark Job in Hadoop yarn Cluster mode,
 I am using below command -
 spark-submit --master yarn-cluster --driver-memory 1g --executor-memory 1g
 --executor-cores 1 --class com.dc.analysis.jobs.AggregationJob
 sparkanalitic.jar param1 param2 param3
 I am getting the error below. Kindly suggest what's going wrong -- is the
 command proper or not? Thanks in advance.

 Exception in thread main org.apache.spark.SparkException: Application
 finished with failed status
 at
 org.apache.spark.deploy.yarn.ClientBase$class.run(ClientBase.scala:509)
 at org.apache.spark.deploy.yarn.Client.run(Client.scala:35)
 at org.apache.spark.deploy.yarn.Client$.main(Client.scala:139)
 at org.apache.spark.deploy.yarn.Client.main(Client.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/issue-Running-Spark-Job-on-Yarn-Cluster-tp21697.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --
 *Harshvardhan Chauhan*  |  Software Engineer
 *GumGum* http://www.gumgum.com/  |  *Ads that stick*
 310-260-9666  |  ha...@gumgum.com



using a database connection pool to write data into an RDBMS from a Spark application

2015-02-19 Thread Mohammed Guller
Hi –
I am trying to use BoneCP (a database connection pooling library) to write data 
from my Spark application to an RDBMS. The database inserts are inside a 
foreachPartition code block. I am getting this exception when the code tries to 
insert data using BoneCP:

java.sql.SQLException: No suitable driver found for 
jdbc:postgresql://hostname:5432/dbname

I tried explicitly loading the Postgres driver on the worker nodes by adding 
the following line inside the foreachPartition code block:

Class.forName("org.postgresql.Driver")

It didn’t help.

Has anybody been able to get a database connection pool library to work with Spark? 
If you got it working, can you please share the steps?

Thanks,
Mohammed



Re: Incorrect number of records after left outer join (I think)

2015-02-19 Thread Imran Rashid
if you have duplicate values for a key, join creates all pairs.  E.g. if you
have 2 values for key X in RDD A and 2 values for key X in RDD B, then A.join(B)
will have 4 records for key X
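
A quick spark-shell illustration of that multiplication (toy data):

  val a = sc.parallelize(Seq(("X", 1), ("X", 2)))      // 2 records for key X
  val b = sc.parallelize(Seq(("X", "p"), ("X", "q")))  // 2 records for key X
  a.leftOuterJoin(b).count()                           // 4, not 2: every (left, right) pairing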

On Thu, Feb 19, 2015 at 3:39 PM, Darin McBeath ddmcbe...@yahoo.com.invalid
wrote:

 Consider the following left outer join

 potentialDailyModificationsRDD =
 reducedDailyPairRDD.leftOuterJoin(baselinePairRDD).partitionBy(new
 HashPartitioner(1024)).persist(StorageLevel.MEMORY_AND_DISK_SER());


 Below are the record counts for the RDDs involved
 Number of records for reducedDailyPairRDD: 2565206
 Number of records for baselinePairRDD: 56102812
 Number of records for potentialDailyModificationsRDD: 2570115

 Below are the partitioners for the RDDs involved.
 Partitioner for reducedDailyPairRDD:
 Some(org.apache.spark.HashPartitioner@400)
 Partitioner for baselinePairRDD: Some(org.apache.spark.HashPartitioner@400
 )
 Partitioner for potentialDailyModificationsRDD:
 Some(org.apache.spark.HashPartitioner@400)


 I realize in the above statement that the .partitionBy is probably not
 needed as the underlying RDDs used in the left outer join are already hash
 partitioned.

 My question is how the resulting RDD (potentialDailyModificationsRDD) can
 end up with more records than
 reducedDailyPairRDD.  I would think the number of records in
 potentialDailyModificationsRDD should be 2565206 instead of 2570115.  Am I
 missing something or is this possibly a bug?

 I'm using Apache Spark 1.2 on a stand-alone cluster on ec2.  To get the
 counts for the records, I'm using the .count() for the RDD.

 Thanks.

 Darin.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




percentil UDAF in spark 1.2.0

2015-02-19 Thread Mohnish Kodnani
Hi,
I am trying to use percentile and getting the following error. I am using
spark 1.2.0. Does UDAF percentile exist in that code line and do i have to
do something to get this to work.

java.util.NoSuchElementException: key not found: percentile
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
at
org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:53)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$2.applyOrElse(Analyzer.scala:220)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$2.applyOrElse(Analyzer.scala:218)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)


Thanks
mohnish


How to diagnose could not compute split errors and failed jobs?

2015-02-19 Thread Tim Smith
My streaming app runs fine for a few hours and then starts spewing Could
not compute split, block input-xx-xxx not found errors. After this,
jobs start to fail and batches start to pile up.

My question isn't so much about why this error but rather, how do I trace
what leads to this error? I am using disk+memory for storage so shouldn't
be a case of data loss resulting from memory overrun.

15/02/18 22:04:49 ERROR JobScheduler: Error running job streaming job
142429705 ms.28
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3
in stage 247644.0 failed 64 times, most recent failure: Lost task 3.63 in
stage 247644.0 (TID 3705290, node-dn1-16-test.abcdefg.com):
java.lang.Exception: Could not compute split, block input-28-1424297042500
not found
at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thanks,

Tim


Re: Filter data from one RDD based on data from another RDD

2015-02-19 Thread Imran Rashid
the more scalable alternative is to do a join (or a variant like cogroup,
leftOuterJoin, subtractByKey etc. found in PairRDDFunctions)

the downside is this requires a shuffle of both your RDDs
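
A sketch of the join variant for this particular case, keying both RDDs on the first CSV column (variable names are placeholders):

  val keyedRdd1 = rdd1.map(line => (line.split(",")(0), ()))   // only the key matters here
  val keyedRdd2 = rdd2.map(line => (line.split(",")(0), line))
  // the inner join keeps only rdd2 lines whose key also appears in rdd1;
  // distinct() on the keys avoids duplicating rdd2 lines when rdd1 repeats a key
  val filtered = keyedRdd2.join(keyedRdd1.distinct()).map { case (_, (line, _)) => line }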

On Thu, Feb 19, 2015 at 3:36 PM, Himanish Kushary himan...@gmail.com
wrote:

 Hi,

 I have two RDD's with csv data as below :

 RDD-1

 101970_5854301840,fbcf5485-e696-4100-9468-a17ec7c5bb43,19229261643
 101970_5854301839,fbaf5485-e696-4100-9468-a17ec7c5bb39,9229261645
 101970_5854301839,fbbf5485-e696-4100-9468-a17ec7c5bb39,9229261647
 101970_17038953,546853f9-cf07-4700-b202-00f21e7c56d8,791191603
 101970_5854301840,fbcf5485-e696-4100-9468-a17ec7c5bb42,19229261643
 101970_5851048323,218f5485-e58c-4200-a473-348ddb858578,290542385
 101970_5854301839,fbcf5485-e696-4100-9468-a17ec7c5bb41,922926164

 RDD-2

 101970_17038953,546853f9-cf07-4700-b202-00f21e7c56d9,7911160
 101970_5851048323,218f5485-e58c-4200-a473-348ddb858578,2954238
 101970_5854301839,fbaf5485-e696-4100-9468-a17ec7c5bb39,9226164
 101970_5854301839,fbbf5485-e696-4100-9468-a17ec7c5bb39,92292164
 101970_5854301839,fbcf5485-e696-4100-9468-a17ec7c5bb41,9226164

 101970_5854301838,fbcf5485-e696-4100-9468-a17ec7c5bb40,929164
 101970_5854301838,fbcf5485-e696-4100-9468-a17ec7c5bb39,26164

 I need to filter RDD-2 to include only those records where the first
 column value in RDD-2 matches any of the first column values in RDD-1

 Currently , I am broadcasting the first column values from RDD-1 as a list
 and then filtering RDD-2 based on that list.

 val rdd1broadcast = sc.broadcast(rdd1.map { uu => uu.split(",")(0) 
 }.collect().toSet)

 val rdd2filtered = rdd2.filter{ h => 
 rdd1broadcast.value.contains(h.split(",")(0)) }

 This will result in data with first column 101970_5854301838 (last two 
 records) to be filtered out from RDD-2.

 Is this the best way to accomplish this? I am worried that for large data 
 volume , the broadcast step may become an issue. Appreciate any other 
 suggestion.

 ---
 Thanks
 Himanish



Failure on a Pipe operation

2015-02-19 Thread athing goingon
Hi, I'm trying to figure out why the following job is failing on a pipe
http://pastebin.com/raw.php?i=U5E8YiNN

With this exception:
http://pastebin.com/raw.php?i=07NTGyPP

Any help is welcome. Thank you.


Re: Failure on a Pipe operation

2015-02-19 Thread Imran Rashid
The error msg is telling you the exact problem, it can't find
ProgramSIM, the thing you are trying to run

Lost task 3520.3 in stage 0.0 (TID 11, compute3.research.dev):
java.io.IOException: Cannot run program ProgramSIM: error=2, No such file or directory


On Thu, Feb 19, 2015 at 5:52 PM, athing goingon athinggoin...@gmail.com
wrote:

 Hi, I'm trying to figure out why the following job is failing on a pipe
 http://pastebin.com/raw.php?i=U5E8YiNN

 With this exception:
 http://pastebin.com/raw.php?i=07NTGyPP

 Any help is welcome. Thank you.



Re: Spark job fails on cluster but works fine on a single machine

2015-02-19 Thread Ilya Ganelin
The stupid question is whether you're deleting the file from hdfs on the
right node?
On Thu, Feb 19, 2015 at 11:31 AM Pavel Velikhov pavel.velik...@gmail.com
wrote:

 Yeah, I do manually delete the files, but it still fails with this error.

 On Feb 19, 2015, at 8:16 PM, Ganelin, Ilya ilya.gane...@capitalone.com
 wrote:

  When writing to hdfs Spark will not overwrite existing files or
 directories. You must either manually delete these or use Java's Hadoop
 FileSystem class to remove them.
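
 A minimal sketch of the FileSystem route (using the output path from your job; adjust as needed):

   import org.apache.hadoop.fs.{FileSystem, Path}

   val outputPath = new Path("/tmp/pavel/CassandraPipeTest.txt")
   val fs = FileSystem.get(sc.hadoopConfiguration)
   if (fs.exists(outputPath)) {
     fs.delete(outputPath, true)  // recursive delete of the stale output directory
   }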



 Sent with Good (www.good.com)


 -Original Message-
 *From: *Pavel Velikhov [pavel.velik...@gmail.com]
 *Sent: *Thursday, February 19, 2015 11:32 AM Eastern Standard Time
 *To: *user@spark.apache.org
 *Subject: *Spark job fails on cluster but works fine on a single machine

 I have a simple Spark job that goes out to Cassandra, runs a pipe and
 stores results:

  val sc = new SparkContext(conf)
 val rdd = sc.cassandraTable("keyspace", "table")
   .map(r => r.getInt("column") + "\t" +
 write(get_lemmas(r.getString("tags"))))
   .pipe("python3 /tmp/scripts_and_models/scripts/run.py")
   .map(r => convertStr(r) )
   .coalesce(1, true)
   .saveAsTextFile("/tmp/pavel/CassandraPipeTest.txt")
   //.saveToCassandra("keyspace", "table", SomeColumns("id", "data"))

 When run on a single machine, everything is fine if I save to an hdfs file
 or save to Cassandra.
 When run in cluster neither works:

  - When saving to file, I get an exception: User class threw exception:
 Output directory hdfs://hadoop01:54310/tmp/pavel/CassandraPipeTest.txt
 already exists
  - When saving to Cassandra, only 4 rows are updated with empty data (I
 test on a 4-machine Spark cluster)

 Any hints on how to debug this and where the problem could be?

 - I delete the hdfs file before running
 - Would really like the output to hdfs to work, so I can debug
 - Then it would be nice to save to Cassandra





