Re: MLUtil.kfold generates overlapped training and validation set?

2014-10-10 Thread Xiangrui Meng
1. No.

2. The seed per partition is fixed. So it should generate
non-overlapping subsets.

3. There was a bug in 1.0, which was fixed in 1.0.1 and 1.1.
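
As a quick sanity check of (2) and (3), a minimal sketch like the following (data, fold count and seed are illustrative; it assumes distinct input elements and Spark 1.0.1+) should report zero overlap for every fold:

import org.apache.spark.mllib.util.MLUtils

// Sketch only: verify that each fold's training and validation sets are disjoint.
val data = sc.parallelize(1 to 1000, 4)
val folds = MLUtils.kFold(data, numFolds = 3, seed = 42)
folds.foreach { case (training, validation) =>
  // With distinct elements, any overlap would show up as a non-empty intersection.
  println("overlap = " + training.intersection(validation).count())  // expected: 0
}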

Best,
Xiangrui

On Thu, Oct 9, 2014 at 11:05 AM, Nan Zhu zhunanmcg...@gmail.com wrote:
 Hi, all

 When we use MLUtils.kfold to generate training and validation set for cross
 validation

 we found that there is overlapped part in two sets….

 from the code, it does sampling for twice for the same dataset

  @Experimental
   def kFold[T: ClassTag](rdd: RDD[T], numFolds: Int, seed: Int):
 Array[(RDD[T], RDD[T])] = {
 val numFoldsF = numFolds.toFloat
 (1 to numFolds).map { fold =>
   val sampler = new BernoulliSampler[T]((fold - 1) / numFoldsF, fold /
 numFoldsF,
 complement = false)
   val validation = new PartitionwiseSampledRDD(rdd, sampler, true, seed)
   val training = new PartitionwiseSampledRDD(rdd,
 sampler.cloneComplement(), true, seed)
   (training, validation)
 }.toArray
   }

 the sampler is complement, there is still possibility to generate overlapped
 training and validation set

 because the sampling method looks like :

 override def sample(items: Iterator[T]): Iterator[T] = {
 items.filter { item =>
   val x = rng.nextDouble()
   (x >= lb && x < ub) ^ complement
 }
   }

 I’m not a machine learning guy, so I guess I must fall into one of the
 following three situations

 1. does it mean actually we allow overlapped training and validation set ?
 (counter intuitive to me)

 2. I had some misunderstanding on the code?

 3. it’s a bug?

 Anyone can explain it to me?

 Best,

 --
 Nan Zhu


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Help with using combineByKey

2014-10-10 Thread Davies Liu
Maybe this version is easier to use:

plist.mapValues((v) => (if (v > 0) 1 else 0, 1)).reduceByKey((x, y) =>
(x._1 + y._1, x._2 + y._2))

It has similar behavior to combineByKey(), and will be faster than the
groupByKey() version.
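
For reference, a self-contained sketch of both approaches on the sample data from this thread (the app name and local master are illustrative; in the shell you would reuse the existing sc):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

val sc = new SparkContext(new SparkConf().setAppName("count-nonzero").setMaster("local[2]"))
val plist = sc.parallelize(List(("LAX", 6), ("LAX", 0), ("LAX", 7),
                                ("SFO", 0), ("SFO", 0), ("SFO", 9)))

// reduceByKey: map each value to (isNonZero, 1), then sum the pairs element-wise.
val viaReduce = plist
  .mapValues(v => (if (v > 0) 1 else 0, 1))
  .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

// combineByKey: same result, spelled out as createCombiner / mergeValue / mergeCombiners.
val viaCombine = plist.combineByKey(
  (v: Int) => (if (v > 0) 1 else 0, 1),
  (acc: (Int, Int), v: Int) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))

viaReduce.collect().foreach(println)   // expected: (LAX,(2,3)), (SFO,(1,3))
viaCombine.collect().foreach(println)  // same result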

On Thu, Oct 9, 2014 at 9:28 PM, HARIPRIYA AYYALASOMAYAJULA
aharipriy...@gmail.com wrote:
 Sean,

 Thank you. It works. But I am still confused about the function. Can you
 kindly throw some light on it?
 I was going through the example mentioned in
 https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

 Is there any better source through which I can learn more about these
 functions? It would be helpful if I can get a chance to look at more
 examples.
 Also, I assume using combineByKey lets us solve it in parallel, rather than
 using the simple functions provided by Scala that Yana mentioned. Am I correct?

 On Thu, Oct 9, 2014 at 12:30 PM, Sean Owen so...@cloudera.com wrote:

 Oh duh, sorry. The initialization should of course be (v) => (if (v >
 0) 1 else 0, 1)
 This gives the answer you are looking for. I don't see what Part2 is
 supposed to do differently.

 On Thu, Oct 9, 2014 at 6:14 PM, HARIPRIYA AYYALASOMAYAJULA
 aharipriy...@gmail.com wrote:
  Hello Sean,
 
  Thank you, but changing from v to 1 doesn't help me either.
 
  I am trying to count the number of non-zero values using the first
  accumulator.
  val newlist = List (("LAX",6), ("LAX",0), ("LAX",7), ("SFO",0),
  ("SFO",0),
  ("SFO",9))
 
  val plist = sc.parallelize(newlist)
 
   val part1 = plist.combineByKey(
 (v) => (1, 1),
 (acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 +
  1),
 (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 +
  acc2._2)
 )
 
 val Part2 = part1.map{ case (key, value) => (key,
  (value._1,value._2)) }
 
  This should give me the result
  (LAX,(2,3))
  (SFO,(1,3))
 
 
 
  On Thu, Oct 9, 2014 at 11:48 AM, Sean Owen so...@cloudera.com wrote:
 
  You have a typo in your code at var acc:, and the map from opPart1
  to opPart2 looks like a no-op, but those aren't the problem I think.
  It sounds like you intend the first element of each pair to be a count
  of nonzero values, but you initialize the first element of the pair to
  v, not 1, in v => (v,1). Try v => (1,1)
 
 
  On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA
  aharipriy...@gmail.com wrote:
  
   I am a beginner to Spark and finding it difficult to implement a very
   simple
   reduce operation. I read that it is ideal to use combineByKey for
   complex
   reduce operations.
  
   My input:
  
   val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7),
   ("SFO",0),
   ("SFO",1),
   ("SFO",9),("PHX",65),("PHX",88),("KX",7),("KX",6),("KX",1),
   ("KX",9),


   ("HOU",56),("HOU",5),("HOU",59),("HOU",0),("MA",563),("MA",545),("MA",5),("MA",0),("MA",0)))
  
  
val opPart1 = input.combineByKey(
  (v) => (v, 1),
  (var acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1,
   acc._2 +
   1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1,
   acc1._2 +
   acc2._2)
  )
  
  val opPart2 = opPart1.map{ case (key, value) => (key,
   (value._1,value._2)) }
  
opPart2.collectAsMap().map(println(_))
  
   If the value is greater than 0, the first accumulator should be
   incremented
   by 1, else it remains the same. The second accumulator is a simple
   counter
   for each value. I am getting an incorrect output (garbage values) for the
   first accumulator. Please help.
  
The equivalent reduce operation in Hadoop MapReduce is :
  
   public static class PercentageCalcReducer extends
   Reducer<Text,IntWritable,Text,FloatWritable>
  
   {
  
   private FloatWritable pdelay = new FloatWritable();
  
  
   public void reduce(Text key, Iterable<IntWritable> values, Context
   context) throws IOException,InterruptedException
  
   {
  
   int acc2=0;
  
   float frac_delay, percentage_delay;
  
   int acc1=0;
  
   for(IntWritable val : values)
  
   {
  
   if(val.get() > 0)
  
   {
  
   acc1++;
  
   }
  
   acc2++;
  
   }
  
  
  
   frac_delay = (float)acc1/acc2;
  
   percentage_delay = frac_delay * 100 ;
  
   pdelay.set(percentage_delay);
  
   context.write(key,pdelay);
  
   }
  
   }
  
  
   Please help. Thank you for your time.
  
   --
  
   Regards,
  
   Haripriya Ayyalasomayajula
   contact : 650-796-7112
 
 
 
 
  --
  Regards,
  Haripriya Ayyalasomayajula
  contact : 650-796-7112




 --
 Regards,
 Haripriya Ayyalasomayajula
 contact : 650-796-7112

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to save ReceiverInputDStream to Hadoop using saveAsNewAPIHadoopFile

2014-10-10 Thread Akhil Das
You can convert this ReceiverInputDStream
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.ReceiverInputDStream
into PairRDDFunctions
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions
and call the saveAsNewAPIHadoopFile.
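
A minimal sketch of that, assuming the DStream still carries the (String, String) pairs (i.e. without the .map(_._2)) and that plain text output is acceptable; the output path, key/value classes and output format are illustrative, and the Parquet case would substitute its own:

import org.apache.spark.SparkContext._
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

// Save each batch of the pair DStream through PairRDDFunctions.
stream.foreachRDD { (rdd, time) =>
  rdd.saveAsNewAPIHadoopFile(
    "/user/output/batch-" + time.milliseconds,   // illustrative path
    classOf[Text], classOf[Text],
    classOf[TextOutputFormat[Text, Text]])
}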

Thanks
Best Regards

On Fri, Oct 10, 2014 at 11:28 AM, Buntu Dev buntu...@gmail.com wrote:

 Basically I'm attempting to convert a JSON stream to Parquet and I get
 this error without the .values or .map(_._2) :

  value saveAsNewAPIHadoopFile is not a member of
 org.apache.spark.streaming.dstream.ReceiverInputDStream[(String, String)]




 On Thu, Oct 9, 2014 at 10:15 PM, Sean Owen so...@cloudera.com wrote:

 Your RDD does not contain pairs, since you .map(_._2) (BTW that can
 just be .values). Hadoop files means SequenceFiles and those
 store key-value pairs. That's why the method only appears for
 RDD[(K,V)].

 On Fri, Oct 10, 2014 at 3:50 AM, Buntu Dev buntu...@gmail.com wrote:
  Thanks Sean, but I'm importing
 org.apache.spark.streaming.StreamingContext._
 
  Here are the spark imports:
 
  import org.apache.spark.streaming._
 
  import org.apache.spark.streaming.StreamingContext._
 
  import org.apache.spark.streaming.kafka._
 
  import org.apache.spark.SparkConf
 
  
 
  val stream = KafkaUtils.createStream(ssc, zkQuorum, group,
  topicpMap).map(_._2) stream.saveAsNewAPIHadoopFile
 (destination,
  classOf[Void], classOf[Group], classOf[ExampleOutputFormat], conf)
 
  
 
  Anything else I might be missing?





How to patch sparkSQL on EC2?

2014-10-10 Thread Christos Kozanitis
Hi

I have written a few extensions for sparkSQL (for version 1.1.0) and I am 
trying to deploy my new jar files (one for catalyst and one for sql/core) on 
ec2.

My approach was to create a new spark/lib/spark-assembly-1.1.0-hadoop1.0.4.jar 
that merged the contents of the old one with the contents of my new jar files 
and I propagated the changes to workers. 

However when I tried the code snippet below I received the error message that I 
paste at the end of this email. I was wondering, do you guys have any 
suggestions on how to fix this?

thanks
Christos

the code is:

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)

the error message is:

error: bad symbolic reference. A signature in package.class refers to term 
scalalogging
in package com.typesafe which is not available.
It may be completely missing from the current classpath, or the version on
the classpath might be incompatible with the version used when compiling 
package.class.
console:14: error: bad symbolic reference. A signature in package.class 
refers to term slf4j
in value com.typesafe.scalalogging which is not available.
It may be completely missing from the current classpath, or the version on
the classpath might be incompatible with the version used when compiling 
package.class.
   val sqlContext = new SQLContext(sc)



Re: IOException and appcache FileNotFoundException in Spark 1.02

2014-10-10 Thread Akhil Das
You could be hitting this issue
https://issues.apache.org/jira/browse/SPARK-3633 (or similar). You can
try the following workarounds:

sc.set("spark.core.connection.ack.wait.timeout","600")
sc.set("spark.akka.frameSize","50")
Also reduce the number of partitions; you could be hitting the kernel's
ulimit. I faced this issue and it was gone when I dropped the partitions
from 1600 to 200.
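
On a SparkConf the same settings might look like this (sketch only; the input path and partition count are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .set("spark.core.connection.ack.wait.timeout", "600")
  .set("spark.akka.frameSize", "50")
val sc = new SparkContext(conf)

// Fewer partitions when reading, e.g. 200 instead of 1600:
val data = sc.textFile("hdfs:///path/to/input", 200)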

Thanks
Best Regards

On Fri, Oct 10, 2014 at 5:58 AM, Ilya Ganelin ilgan...@gmail.com wrote:

 Hi all – I could use some help figuring out a couple of exceptions I’ve
 been getting regularly.

 I have been running on a fairly large dataset (150 gigs). With smaller
 datasets I don't have any issues.

 My sequence of operations is as follows – unless otherwise specified, I am
 not caching:

 Map a 30 million row x 70 col string table to approx 30 mil x  5 string
 (For read as textFile I am using 1500 partitions)

 From that, map to ((a,b), score) and reduceByKey, numPartitions = 180

 Then, extract distinct values for A and distinct values for B. (I cache
 the output of distinct), , numPartitions = 180

 Zip with index for A and for B (to remap strings to int)

 Join remapped ids with original table

 This is then fed into MLLIBs ALS algorithm.

 I am running with:

 Spark version 1.02 with CDH5.1

 numExecutors = 8, numCores = 14

 Memory = 12g

 MemoryFration = 0.7

 KryoSerialization

 My issue is that the code runs fine for a while but then will
 non-deterministically crash with either file IOExceptions or the following
 obscure error:

 14/10/08 13:29:59 INFO TaskSetManager: Loss was due to
 java.io.IOException: Filesystem closed [duplicate 10]

 14/10/08 13:30:08 WARN TaskSetManager: Loss was due to
 java.io.FileNotFoundException

 java.io.FileNotFoundException:
 /opt/cloudera/hadoop/1/yarn/nm/usercache/zjb238/appcache/application_1412717093951_0024/spark-local-20141008131827-c082/1c/shuffle_3_117_354
 (No such file or directory)

 Looking through the logs, I see the IOException in other places but it
 appears to be non-catastrophic. The FileNotFoundException, however, is. I
 have found the following stack overflow that at least seems to address the
 IOException:


 http://stackoverflow.com/questions/24038908/spark-fails-on-big-shuffle-jobs-with-java-io-ioexception-filesystem-closed

 But I have not found anything useful at all with regards to the app cache
 error.

 Any help would be much appreciated.



Re: Spark in cluster [ remote.EndpointWriter: AssociationError]

2014-10-10 Thread Akhil Das
Can you paste your spark-env.sh file? Looks like you have it misconfigured.

Thanks
Best Regards

On Fri, Oct 10, 2014 at 1:43 AM, Morbious knowledgefromgro...@gmail.com
wrote:

 Hi,

 Recently I've configured spark in cluster with zookeper.
 I have 2 masters ( active/standby) and 6 workers.
 I've begun my installation with samples from example directory.
 Everything worked fine when I only used memory .
 When I used word count example I got messages like the ones below:

 14/10/09 19:37:19 ERROR remote.EndpointWriter: AssociationError
 [akka.tcp://sparkwor...@spark-slave1.domain.org:7078] - [akka.tcp://
 sparkexecu...@spark-slave1.domain.org:53757]: Error [Association failed
 with [akka.tcp://sparkexecu...@spark-slave1.domain.org:53757]] [
 akka.remote.EndpointAssociationException: Association failed with
 [akka.tcp://sparkexecu...@spark-slave1.domain.org:53757]
 Caused by:
 akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
 Connection refused: spark-slave1.domain.org/10.0.6.228:53757
 ]
 14/10/09 19:37:19 ERROR remote.EndpointWriter: AssociationError
 [akka.tcp://sparkwor...@spark-slave1.domain.org:7078] - [akka.tcp://
 sparkexecu...@spark-slave1.domain.org:53757]: Error [Association failed
 with [akka.tcp://sparkexecu...@spark-slave1.domain.org:53757]] [
 akka.remote.EndpointAssociationException: Association failed with
 [akka.tcp://sparkexecu...@spark-slave1.domain.org:53757]
 Caused by:
 akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
 Connection refused: spark-slave1.domain.org/10.0.6.228:53757
 ]

 I'm a little confused because I can't find any solution to my problem.
 I use Cloudera hadoop with spark.

 Best regards,

 Morbious



Delayed hotspot optimizations in Spark

2014-10-10 Thread Alexey Romanchuk
Hello spark users and developers!

I am using hdfs + spark sql + hive schema + parquet as the storage format. I
have a lot of parquet files - one file fits one hdfs block for one day. The
strange thing is that the first spark sql query is very slow.

To reproduce the situation I use only one core: the first query takes 97sec
and all subsequent queries only 13sec. Sure, I query for different data, but
it has the same structure and size. The situation can be reproduced after
restarting the thrift server.

Here is information about the parquet file reads from a worker node:

Slow one:
Oct 10, 2014 2:26:53 PM INFO: parquet.hadoop.InternalParquetRecordReader:
Assembled and processed 1560251 records from 30 columns in 11686 ms:
133.51454 rec/ms, 4005.4363 cell/ms

Fast one:
Oct 10, 2014 2:31:30 PM INFO: parquet.hadoop.InternalParquetRecordReader:
Assembled and processed 1568899 records from 1 columns in 1373 ms:
1142.6796 rec/ms, 1142.6796 cell/ms

As you can see, the second read is about 10x faster than the first. Most of the
query time is spent working with the parquet file.

This problem is really annoying, because most of my spark jobs contain
just 1 sql query plus data processing, and to speed up my jobs I put a special
warmup query in front of every job.

My assumption is that hotspot optimizations kick in during the first
read. Do you have any idea how to confirm/solve this performance problem?

Thanks for advice!

p.s. I see a huge number of hotspot compilations shown with -XX:+PrintCompilation
but cannot figure out which are important and which are not.


Re: Delayed hotspot optimizations in Spark

2014-10-10 Thread Sean Owen
You could try setting -Xcomp for executors to force JIT compilation
upfront. I don't know if it's a good idea overall but might show
whether the upfront compilation really helps. I doubt it.

However, isn't this almost surely due to caching somewhere, in Spark SQL
or HDFS? I really doubt hotspot makes a difference compared to these
much larger factors.
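
One way to try that is through spark.executor.extraJavaOptions, e.g. (sketch only, and again the overall benefit is doubtful):

import org.apache.spark.SparkConf

// Force upfront JIT compilation on the executor JVMs.
val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions", "-Xcomp")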

On Fri, Oct 10, 2014 at 8:49 AM, Alexey Romanchuk
alexey.romanc...@gmail.com wrote:
 Hello spark users and developers!

 I am using hdfs + spark sql + hive schema + parquet as storage format. I
 have lot of parquet files - one files fits one hdfs block for one day. The
 strange thing is very slow first query for spark sql.

 To reproduce situation I use only one core and I have 97sec for first time
 and only 13sec for all next queries. Sure I query for different data, but it
 has same structure and size. The situation can be reproduced after restart
 thrift server.

 Here it information about parquet files reading from worker node:

 Slow one:
 Oct 10, 2014 2:26:53 PM INFO: parquet.hadoop.InternalParquetRecordReader:
 Assembled and processed 1560251 records from 30 columns in 11686 ms:
 133.51454 rec/ms, 4005.4363 cell/ms

 Fast one:
 Oct 10, 2014 2:31:30 PM INFO: parquet.hadoop.InternalParquetRecordReader:
 Assembled and processed 1568899 records from 1 columns in 1373 ms: 1142.6796
 rec/ms, 1142.6796 cell/ms

 As you can see second reading is 10x times faster then first. Most of the
 query time spent to work with parquet file.

 This problem is really annoying, because most of my spark task contains just
 1 sql query and data processing and to speedup my jobs I put special warmup
 query in from of any job.

 My assumption is that it is hotspot optimizations that used due first
 reading. Do you have any idea how to confirm/solve this performance problem?

 Thanks for advice!

 p.s. I have billion hotspot optimization showed with -XX:+PrintCompilation
 but can not figure out what are important and what are not.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Delayed hotspot optimizations in Spark

2014-10-10 Thread Alexey Romanchuk
Hey Sean and spark users!

Thanks for the reply. I tried -Xcomp just now and the startup time was a few
minutes (as expected), but the first query was as slow as before:
Oct 10, 2014 3:03:41 PM INFO: parquet.hadoop.InternalParquetRecordReader:
Assembled and processed 1568899 records from 30 columns in 12897 ms:
121.64837 rec/ms, 3649.451 cell/ms

and next

Oct 10, 2014 3:05:03 PM INFO: parquet.hadoop.InternalParquetRecordReader:
Assembled and processed 1568899 records from 1 columns in 1757 ms:
892.94196 rec/ms, 892.94196 cell/ms

I don't think it is caching or anything else, because CPU load is 100% on the
worker and jstack shows that the worker is reading from the parquet file.

Any ideas?

Thanks!

On Fri, Oct 10, 2014 at 2:55 PM, Sean Owen so...@cloudera.com wrote:

 You could try setting -Xcomp for executors to force JIT compilation
 upfront. I don't know if it's a good idea overall but might show
 whether the upfront compilation really helps. I doubt it.

 However is this almost surely due to caching somewhere, in Spark SQL
 or HDFS? I really doubt hotspot makes a difference compared to these
 much larger factors.

 On Fri, Oct 10, 2014 at 8:49 AM, Alexey Romanchuk
 alexey.romanc...@gmail.com wrote:
  Hello spark users and developers!
 
  I am using hdfs + spark sql + hive schema + parquet as storage format. I
  have lot of parquet files - one files fits one hdfs block for one day.
 The
  strange thing is very slow first query for spark sql.
 
  To reproduce situation I use only one core and I have 97sec for first
 time
  and only 13sec for all next queries. Sure I query for different data,
 but it
  has same structure and size. The situation can be reproduced after
 restart
  thrift server.
 
  Here it information about parquet files reading from worker node:
 
  Slow one:
  Oct 10, 2014 2:26:53 PM INFO: parquet.hadoop.InternalParquetRecordReader:
  Assembled and processed 1560251 records from 30 columns in 11686 ms:
  133.51454 rec/ms, 4005.4363 cell/ms
 
  Fast one:
  Oct 10, 2014 2:31:30 PM INFO: parquet.hadoop.InternalParquetRecordReader:
  Assembled and processed 1568899 records from 1 columns in 1373 ms:
 1142.6796
  rec/ms, 1142.6796 cell/ms
 
  As you can see second reading is 10x times faster then first. Most of the
  query time spent to work with parquet file.
 
  This problem is really annoying, because most of my spark task contains
 just
  1 sql query and data processing and to speedup my jobs I put special
 warmup
  query in from of any job.
 
  My assumption is that it is hotspot optimizations that used due first
  reading. Do you have any idea how to confirm/solve this performance
 problem?
 
  Thanks for advice!
 
  p.s. I have billion hotspot optimization showed with
 -XX:+PrintCompilation
  but can not figure out what are important and what are not.



Re: Spark SQL - Exception only when using cacheTable

2014-10-10 Thread visakh
Can you try checking whether the table is actually being cached? You can use the
isCached method. More details are here -
http://spark.apache.org/docs/1.0.2/api/java/org/apache/spark/sql/SQLContext.html
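
In the Scala API that check is roughly (sketch only; the table name is the one from this thread):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.cacheTable("transactions")
println(sqlContext.isCached("transactions"))  // true if the table is marked as cached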



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Exception-only-when-using-cacheTable-tp16031p16123.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Can I run examples on cluster?

2014-10-10 Thread Theodore Si

Hi all,

I want to use two nodes for test, one as master, the other worker.
Can I submit the example application included in Spark source code 
tarball on master to let it run on the worker?

What should I do?

BR,
Theo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Delayed hotspot optimizations in Spark

2014-10-10 Thread Guillaume Pitel

Hi

Could it be due to GC ? I read it may happen if your program starts with 
a small heap. What are your -Xms and -Xmx values ?


Print GC stats with -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
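
A hedged sketch of passing the GC-logging flags to the executors (heap size itself is governed by spark.executor.memory rather than by -Xms/-Xmx in these options; the value is illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "4g")
  .set("spark.executor.extraJavaOptions",
       "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")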

Guillaume

Hello spark users and developers!

I am using hdfs + spark sql + hive schema + parquet as storage format. 
I have lot of parquet files - one files fits one hdfs block for one 
day. The strange thing is very slow first query for spark sql.


To reproduce situation I use only one core and I have 97sec for first 
time and only 13sec for all next queries. Sure I query for different 
data, but it has same structure and size. The situation can be 
reproduced after restart thrift server.


Here it information about parquet files reading from worker node:

Slow one:
Oct 10, 2014 2:26:53 PM INFO: 
parquet.hadoop.InternalParquetRecordReader: Assembled and processed 
1560251 records from 30 columns in 11686 ms: 133.51454 rec/ms, 
4005.4363 cell/ms


Fast one:
Oct 10, 2014 2:31:30 PM INFO: 
parquet.hadoop.InternalParquetRecordReader: Assembled and processed 
1568899 records from 1 columns in 1373 ms: 1142.6796 rec/ms, 1142.6796 
cell/ms


As you can see second reading is 10x times faster then first. Most of 
the query time spent to work with parquet file.


This problem is really annoying, because most of my spark task 
contains just 1 sql query and data processing and to speedup my jobs I 
put special warmup query in from of any job.


My assumption is that it is hotspot optimizations that used due first 
reading. Do you have any idea how to confirm/solve this performance 
problem?


Thanks for advice!

p.s. I have billion hotspot optimization showed 
with -XX:+PrintCompilation but can not figure out what are important 
and what are not.



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Windowed Operations

2014-10-10 Thread julyfire
hi Diego,

I have the same problem.

// reduce by key in the first window
val w1 = one.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
w1.count().print()

//reduce by key in the second window based on the results of the first
window
val w2 = w1.reduceByKeyAndWindow(_ + _, Seconds(120), Seconds(30))
w2.print()

w1 works, but w2 never prints any information.

Do you have any update for this issue?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Windowed-Operations-tp15133p16128.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark on Mesos Issue - Do I need to install Spark on Mesos slaves

2014-10-10 Thread Bijoy Deb
Hi,

I am trying to submit a Spark job on Mesos using spark-submit from my
Mesos-Master machine.
My SPARK_HOME = /vol1/spark/spark-1.0.2-bin-hadoop2

I have uploaded the spark-1.0.2-bin-hadoop2.tgz to hdfs so that the mesos
slaves can download it to invoke the Mesos Spark backend executor.


But on submitting the job, I can see the below error in 'stderr' logs on
the Mesos slave machine:




 sh: /vol1/spark/spark-1.0.2-bin-hadoop2/sbin/spark-executor: No such file
 or directory
 Based on the documentation, I understand that if I keep the spark-mesos binary
 file in hdfs, I don't need to install Spark separately on the slave nodes. So
 the SPARK_HOME path /vol1/spark/spark-1.0.2-bin-hadoop2/ does not exist
 on any of my slave machines, hence the error.

 Now, my question is:
 Shouldn't the mesos-slave be looking for the spark-executor command in the
 temporary directory where it is supposed to extract the
 spark-1.0.2-bin-hadoop2.tgz from hdfs, instead of in the SPARK_HOME
 directory? What am I doing wrong here?
Any help would be really appreciated.


Thanks,
Bijoy


Re: Can I run examples on cluster?

2014-10-10 Thread Akhil Das
This is how the spark cluster looks:


So your driver program (the example application) can be run on the master (or
anywhere which has access to the master / cluster manager) and the workers
will execute it.

Thanks
Best Regards

On Fri, Oct 10, 2014 at 2:47 PM, Theodore Si sjyz...@gmail.com wrote:

 Hi all,

 I want to use two nodes for test, one as master, the other worker.
 Can I submit the example application included in Spark source code tarball
 on master to let it run on the worker?
 What should I do?

 BR,
 Theo

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark SQL parser bug?

2014-10-10 Thread Cheng Lian

Hi Mohammed,

Would you mind sharing the DDL of the table |x| and the complete 
stacktrace of the exception you got? A full Spark shell session history 
would be more than helpful. PR #2084 was merged into master in August, 
and the timestamp type is supported in 1.1.


I tried the following snippets in Spark shell (v1.1), and didn’t observe 
this issue:


scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> import sc._
import sc._

scala> val sqlContext = new SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = 
org.apache.spark.sql.SQLContext@6c3441c5

scala> import sqlContext._
import sqlContext._

scala> case class Record(a: Int, ts: java.sql.Timestamp)
defined class Record

scala> makeRDD(Seq.empty[Record], 1).registerTempTable("x")

scala> sql("SELECT a FROM x WHERE ts >= '2012-01-01T00:00:00' AND ts <= 
'2012-03-31T23:59:59'")
res1: org.apache.spark.sql.SchemaRDD =
SchemaRDD[3] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
Project [a#0]
 ExistingRdd [a#0,ts#1], MapPartitionsRDD[5] at mapPartitions at 
basicOperators.scala:208

scala> res1.collect()
...
res2: Array[org.apache.spark.sql.Row] = Array()

Cheng

On 10/9/14 10:26 AM, Mohammed Guller wrote:


Hi –

When I run the following Spark SQL query in Spark-shell ( version 1.1.0) :

val rdd = sqlContext.sql("SELECT a FROM x WHERE ts >= '2012-01-01T00:00:00' AND ts <= '2012-03-31T23:59:59'")


it gives the following error:

rdd: org.apache.spark.sql.SchemaRDD =

SchemaRDD[294] at RDD at SchemaRDD.scala:103

== Query Plan ==

== Physical Plan ==

java.util.NoSuchElementException: head of empty list

The ts column in the where clause has timestamp data and is of type 
timestamp. If I replace the string '2012-01-01T00:00:00' in the where 
clause with its epoch value, then the query works fine.


It looks like I have run into an issue described in this pull request: 
https://github.com/apache/spark/pull/2084


Is that PR not merged in Spark version 1.1.0? Or am I missing something?

Thanks,

Mohammed


​


Re: Spark SQL - Exception only when using cacheTable

2014-10-10 Thread Cheng Lian
Hi Poiuytrez, what version of Spark are you using? Exception details 
like stacktrace are really needed to investigate this issue. You can 
find them in the executor logs, or just browse the application 
stderr/stdout link from Spark Web UI.


On 10/9/14 9:37 PM, poiuytrez wrote:

Hello,

I have a weird issue, this request works fine:
sqlContext.sql("SELECT customer_id FROM transactions WHERE purchaseamount = 200").count()

However, when I cache the table before making the request:
sqlContext.cacheTable("transactions")
sqlContext.sql("SELECT customer_id FROM transactions WHERE purchaseamount = 200").count()

I am getting an exception on of the task:
: org.apache.spark.SparkException: Job aborted due to stage failure: Task
120 in stage 104.0 failed 4 times, most recent failure: Lost task 120.3 in
stage 104.0 (TID 20537, spark-w-0.c.internal): java.lang.ClassCastException:

(I have no details after the ':')

Any ideas of what could be wrong?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Exception-only-when-using-cacheTable-tp16031.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Unable to share Sql between HiveContext and JDBC Thrift Server

2014-10-10 Thread Cheng Lian
Which version are you using? Also |.saveAsTable()| saves the table to the 
Hive metastore, so you need to make sure your Spark application points 
to the same Hive metastore instance as the JDBC Thrift server. For 
example, putting |hive-site.xml| under |$SPARK_HOME/conf| and running 
|spark-shell| and |start-thriftserver.sh| under the same |$SPARK_HOME| 
should work. Just verified this against Spark 1.1.
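
A minimal sketch of the application side under that setup (the sample data is illustrative; the key point is that the HiveContext and the Thrift server share the same metastore via hive-site.xml):

import org.apache.spark.sql.hive.HiveContext

case class SomeClass(key: String, value: String)

val hiveContext = new HiveContext(sc)
import hiveContext._

// saveAsTable writes into the shared Hive metastore, so the Thrift server can see it.
val rdd = sc.parallelize(Seq(SomeClass("1", "a"), SomeClass("2", "b")))
createSchemaRDD(rdd).saveAsTable("test_table")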


On 10/10/14 9:32 AM, Steve Arnold wrote:

I am writing a Spark job to persist data using HiveContext so that it 
can be accessed via the JDBC Thrift server. Although my code doesn't 
throw an error, I am unable to see my persisted data when I query from 
the Thrift server.


I tried three different ways to get this to work:

1)
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val schemaRdd: SchemaRDD = sqlContext.createSchemaRDD(rdd)
schemaRdd.saveAsTable("test_table")

rdd - RDD of a case class
sc - Spark Context
Case class used in all my examples:

case class SomeClass(key:String, value:String) extends Serializable

2) I then created a table called test_table after logging in to the 
thrift server and added two dummy records in it.


  val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
   import sqlContext._
 val usermeta = hql("SELECT key, value from test_table")

val rdd = usermeta.map(t => SomeClass("3", "i"))
val schemaRdd = createSchemaRDD(rdd)
schemaRdd.insertInto("test_table")

3) Tried the documented link on the Spark Sql programming page

 val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
 sqlContext.sql("CREATE TABLE IF NOT EXISTS test_table (key String, value String)")


The Spark job does other computations which it is able to complete and 
returns the correct results; just the Sql part doesn't work. What am I 
doing wrong? I thought that the HiveContext could be accessed from 
running command line queries in the Thrift Server.


Cheers,
Steve


​


Re: Spark in cluster [ remote.EndpointWriter: AssociationError]

2014-10-10 Thread Morbious
Sorry, but your solution doesn't work.
I can see port 7077 open and listening on my master, and connected workers,
but I don't understand why it's trying to connect to
itself ...

= Master is running on the specific host
 netstat -at | grep 7077
You will get something similar to:
tcp0  0 akhldz.master.io:7077 *:* LISTEN  

If that is the case, then from your worker machine do a 
host akhldz.master.io ( replace akhldz.master.io with your master host. If
something goes wrong, then add a host entry in your /etc/hosts file)   
telnet akhldz.master.io 7077 ( If this is not connecting, then your worker
wont connect either. )

= Adding Host entry in /etc/hosts

Open /etc/hosts from your worker machine and add the following entry
(example)

192.168.100.20   akhldz.master.io

PS :In the above case Pillis was having two ip addresses having same host
name
eg:
192.168.100.40  s1.machine.org
192.168.100.41  s1.machine.org




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-in-cluster-remote-EndpointWriter-AssociationError-tp16063p16134.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: IOException and appcache FileNotFoundException in Spark 1.02

2014-10-10 Thread Ilya Ganelin
Thank you - I will try this. If I drop the partition count am I not more
likely to hit memory issues? Especially if the dataset is rather large?
On Oct 10, 2014 3:19 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

 You could be hitting this issue
 https://issues.apache.org/jira/browse/SPARK-3633 (or similar). You can
 try the following workarounds:

 sc.set(spark.core.connection.ack.wait.timeout,600)
 sc.set(spark.akka.frameSize,50)
 Also reduce the number of partitions, you could be hitting the kernel's
 ulimit. I faced this issue and it was gone when i dropped the partitions
 from 1600 to 200.

 Thanks
 Best Regards

 On Fri, Oct 10, 2014 at 5:58 AM, Ilya Ganelin ilgan...@gmail.com wrote:

 Hi all – I could use some help figuring out a couple of exceptions I’ve
 been getting regularly.

 I have been running on a fairly large dataset (150 gigs). With smaller
 datasets I don't have any issues.

 My sequence of operations is as follows – unless otherwise specified, I
 am not caching:

 Map a 30 million row x 70 col string table to approx 30 mil x  5 string
 (For read as textFile I am using 1500 partitions)

 From that, map to ((a,b), score) and reduceByKey, numPartitions = 180

 Then, extract distinct values for A and distinct values for B. (I cache
 the output of distinct), , numPartitions = 180

 Zip with index for A and for B (to remap strings to int)

 Join remapped ids with original table

 This is then fed into MLLIBs ALS algorithm.

 I am running with:

 Spark version 1.02 with CDH5.1

 numExecutors = 8, numCores = 14

 Memory = 12g

 MemoryFration = 0.7

 KryoSerialization

 My issue is that the code runs fine for a while but then will
 non-deterministically crash with either file IOExceptions or the following
 obscure error:

 14/10/08 13:29:59 INFO TaskSetManager: Loss was due to
 java.io.IOException: Filesystem closed [duplicate 10]

 14/10/08 13:30:08 WARN TaskSetManager: Loss was due to
 java.io.FileNotFoundException

 java.io.FileNotFoundException:
 /opt/cloudera/hadoop/1/yarn/nm/usercache/zjb238/appcache/application_1412717093951_0024/spark-local-20141008131827-c082/1c/shuffle_3_117_354
 (No such file or directory)

 Looking through the logs, I see the IOException in other places but it
 appears to be non-catastrophic. The FileNotFoundException, however, is. I
 have found the following stack overflow that at least seems to address the
 IOException:


 http://stackoverflow.com/questions/24038908/spark-fails-on-big-shuffle-jobs-with-java-io-ioexception-filesystem-closed

 But I have not found anything useful at all with regards to the app cache
 error.

 Any help would be much appreciated.





Re: Spark SQL - Exception only when using cacheTable

2014-10-10 Thread poiuytrez
I am using the python api. Unfortunately, I cannot find the isCached method
equivalent in the documentation:
https://spark.apache.org/docs/1.1.0/api/python/index.html in the SQLContext
section.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Exception-only-when-using-cacheTable-tp16031p16137.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL - Exception only when using cacheTable

2014-10-10 Thread poiuytrez
Hi Cheng, 

I am using Spark 1.1.0. 
This is the stack trace:
14/10/10 12:17:40 WARN TaskSetManager: Lost task 120.0 in stage 7.0 (TID
2235, spark-w-0.c.db.internal): java.lang.ClassCastException: java.lang.Long
cannot be cast to java.lang.Integer
scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
   
org.apache.spark.sql.catalyst.expressions.GenericRow.getInt(Row.scala:146)
org.apache.spark.sql.columnar.INT$.getField(ColumnType.scala:105)
org.apache.spark.sql.columnar.INT$.getField(ColumnType.scala:92)
   
org.apache.spark.sql.columnar.BasicColumnBuilder.appendFrom(ColumnBuilder.scala:72)
   
org.apache.spark.sql.columnar.NativeColumnBuilder.org$apache$spark$sql$columnar$NullableColumnBuilder$$super$appendFrom(ColumnBuilder.scala:88)
   
org.apache.spark.sql.columnar.NullableColumnBuilder$class.appendFrom(NullableColumnBuilder.scala:57)
   
org.apache.spark.sql.columnar.NativeColumnBuilder.org$apache$spark$sql$columnar$compression$CompressibleColumnBuilder$$super$appendFrom(ColumnBuilder.scala:88)
   
org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.appendFrom(CompressibleColumnBuilder.scala:76)
   
org.apache.spark.sql.columnar.NativeColumnBuilder.appendFrom(ColumnBuilder.scala:88)
   
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryColumnarTableScan.scala:65)
   
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryColumnarTableScan.scala:50)
   
org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:236)
   
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
   
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
   
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
   
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
   
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
   
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
   
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
   
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:54)
   
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
   
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)


This was also printed on the driver:

14/10/10 12:17:43 ERROR TaskSetManager: Task 120 in stage 7.0 failed 4
times; aborting job
14/10/10 12:17:43 INFO TaskSchedulerImpl: Cancelling stage 7
14/10/10 12:17:43 INFO TaskSchedulerImpl: Stage 7 was cancelled
14/10/10 12:17:43 INFO DAGScheduler: Failed to run collect at
SparkPlan.scala:85
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File /home/hadoop/spark-install/python/pyspark/sql.py, line 1606, in
count
return self._jschema_rdd.count()
  File
/home/hadoop/spark-install/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
line 538, in __call__
  File
/home/hadoop/spark-install/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py,
line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o100.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task
120 in stage 7.0 failed 4 times, most recent failure: Lost task 120.3 in
stage 7.0 (TID 2248, spark-w-0.c.db.internal): java.lang.ClassCastException: 

Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at

Disabling log4j in Spark-Shell on ec2 stopped working on Wednesday (Oct 8)

2014-10-10 Thread Darin McBeath
For weeks, I've been using the following trick to successfully disable log4j in 
the spark-shell when running a cluster on ec2 that was started by the Spark 
provided ec2 scripts.

cp ./conf/log4j.properties.template ./conf/log4j.properties


I then change log4j.rootCategory=INFO to log4j.rootCategory=WARN.

This all stopped working on Wednesday when I could no longer successfully start 
a cluster on ec2 (using the Spark provided ec2 scripts). I noticed the 
resolution to this problem was that a script referenced by the ec2 scripts had been 
changed (and that this referenced script has since been reverted). I mention 
this because I don't know whether it is a symptom of my problem, and it's 
interesting that the problems started happening at the same time.

When I now start up the cluster on ec2 and subsequently start the spark-shell I 
can no longer disable the log4j messages using the above trick.  I'm using 
Apache Spark 1.1.0.

What's interesting is that I can start the cluster locally on my laptop (using 
Spark 1.1.0) and the above trick for disabling log4j in the spark-shell works.  
So, the issue appears to be related to ec2 and potentially something referenced 
by the Spark provided ec2 startup script.  But, that is purely a guess on my 
part.

I'm wondering if anyone else has noticed this issue and if so has a workaround.

Thanks.

Darin.


Re: Can I run examples on cluster?

2014-10-10 Thread Theodore Si
But I cannot do this via using

./bin/run-example SparkPi 10

right?


On Fri, Oct 10, 2014 at 6:04 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 This is how the spark-cluster looks like


 So your driver program (example application) can be ran on the master (or
 anywhere which has access to the master - clustermanager) and the workers
 will execute it.

 Thanks
 Best Regards

 On Fri, Oct 10, 2014 at 2:47 PM, Theodore Si sjyz...@gmail.com wrote:

 Hi all,

 I want to use two nodes for test, one as master, the other worker.
 Can I submit the example application included in Spark source code
 tarball on master to let it run on the worker?
 What should I do?

 BR,
 Theo

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Can I run examples on cluster?

2014-10-10 Thread Theodore Si
Should I pack the example into a jar file and submit it on master?

On Fri, Oct 10, 2014 at 9:32 PM, Theodore Si sjyz...@gmail.com wrote:

 But I cannot do this via using

 ./bin/run-example SparkPi 10

 right?


 On Fri, Oct 10, 2014 at 6:04 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 This is how the spark-cluster looks like


 So your driver program (example application) can be ran on the master (or
 anywhere which has access to the master - clustermanager) and the workers
 will execute it.

 Thanks
 Best Regards

 On Fri, Oct 10, 2014 at 2:47 PM, Theodore Si sjyz...@gmail.com wrote:

 Hi all,

 I want to use two nodes for test, one as master, the other worker.
 Can I submit the example application included in Spark source code
 tarball on master to let it run on the worker?
 What should I do?

 BR,
 Theo

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Can I run examples on cluster?

2014-10-10 Thread Akhil Das
Yes, you can run it with --master=spark://your-spark-uri:7077, I believe.

Thanks
Best Regards

On Fri, Oct 10, 2014 at 7:03 PM, Theodore Si sjyz...@gmail.com wrote:

 Should I pack the example into a jar file and submit it on master?

 On Fri, Oct 10, 2014 at 9:32 PM, Theodore Si sjyz...@gmail.com wrote:

 But I cannot do this via using

 ./bin/run-example SparkPi 10

 right?


 On Fri, Oct 10, 2014 at 6:04 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 This is how the spark-cluster looks like


 So your driver program (example application) can be ran on the master
 (or anywhere which has access to the master - clustermanager) and the
 workers will execute it.

 Thanks
 Best Regards

 On Fri, Oct 10, 2014 at 2:47 PM, Theodore Si sjyz...@gmail.com wrote:

 Hi all,

 I want to use two nodes for test, one as master, the other worker.
 Can I submit the example application included in Spark source code
 tarball on master to let it run on the worker?
 What should I do?

 BR,
 Theo

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: Can I run examples on cluster?

2014-10-10 Thread HARIPRIYA AYYALASOMAYAJULA
spark-submit --class "Classname" --master yarn-cluster
jarfile (with complete path)

This should work.

On Fri, Oct 10, 2014 at 8:36 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Yes, you can run it with --master=spark://your-spark-uri:7077 i believe.

 Thanks
 Best Regards

 On Fri, Oct 10, 2014 at 7:03 PM, Theodore Si sjyz...@gmail.com wrote:

 Should I pack the example into a jar file and submit it on master?

 On Fri, Oct 10, 2014 at 9:32 PM, Theodore Si sjyz...@gmail.com wrote:

 But I cannot do this via using

 ./bin/run-example SparkPi 10

 right?


 On Fri, Oct 10, 2014 at 6:04 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 This is how the spark-cluster looks like


 So your driver program (example application) can be ran on the master
 (or anywhere which has access to the master - clustermanager) and the
 workers will execute it.

 Thanks
 Best Regards

 On Fri, Oct 10, 2014 at 2:47 PM, Theodore Si sjyz...@gmail.com wrote:

 Hi all,

 I want to use two nodes for test, one as master, the other worker.
 Can I submit the example application included in Spark source code
 tarball on master to let it run on the worker?
 What should I do?

 BR,
 Theo

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org








-- 
Regards,
Haripriya Ayyalasomayajula


How does the Spark Accumulator work under the covers?

2014-10-10 Thread Areg Baghdasaryan (BLOOMBERG/ 731 LEX -)
Hello,
I was wondering what the Spark accumulator does under the covers. I’ve 
implemented my own associative addInPlace function for the accumulator; where 
is this function being run? Let’s say you call something like myRdd.map(x => 
sum += x): is “sum” being accumulated locally in any way, for each element or 
partition or node? Is “sum” a broadcast variable? Or does it only exist on the 
driver node? How does the driver node get access to “sum”?
Thanks,
Areg

Re: MLUtil.kfold generates overlapped training and validation set?

2014-10-10 Thread Nan Zhu
Thanks, Xiangrui,   

I found the reason for the overlapped training and test sets

….

Another counter-intuitive issue related to 
https://github.com/apache/spark/pull/2508

Best,  

--  
Nan Zhu


On Friday, October 10, 2014 at 2:19 AM, Xiangrui Meng wrote:

 1. No.
  
 2. The seed per partition is fixed. So it should generate
 non-overlapping subsets.
  
 3. There was a bug in 1.0, which was fixed in 1.0.1 and 1.1.
  
 Best,
 Xiangrui
  
 On Thu, Oct 9, 2014 at 11:05 AM, Nan Zhu zhunanmcg...@gmail.com 
 (mailto:zhunanmcg...@gmail.com) wrote:
  Hi, all
   
  When we use MLUtils.kfold to generate training and validation set for cross
  validation
   
  we found that there is overlapped part in two sets….
   
  from the code, it does sampling for twice for the same dataset
   
  @Experimental
  def kFold[T: ClassTag](rdd: RDD[T], numFolds: Int, seed: Int):
  Array[(RDD[T], RDD[T])] = {
  val numFoldsF = numFolds.toFloat
  (1 to numFolds).map { fold =>
  val sampler = new BernoulliSampler[T]((fold - 1) / numFoldsF, fold /
  numFoldsF,
  complement = false)
  val validation = new PartitionwiseSampledRDD(rdd, sampler, true, seed)
  val training = new PartitionwiseSampledRDD(rdd,
  sampler.cloneComplement(), true, seed)
  (training, validation)
  }.toArray
  }
   
  the sampler is complement, there is still possibility to generate overlapped
  training and validation set
   
  because the sampling method looks like :
   
  override def sample(items: Iterator[T]): Iterator[T] = {
  items.filter { item =>
  val x = rng.nextDouble()
  (x >= lb && x < ub) ^ complement
  }
  }
   
  I’m not a machine learning guy, so I guess I must fall into one of the
  following three situations
   
  1. does it mean actually we allow overlapped training and validation set ?
  (counter intuitive to me)
   
  2. I had some misunderstanding on the code?
   
  3. it’s a bug?
   
  Anyone can explain it to me?
   
  Best,
   
  --
  Nan Zhu
   
  
  
  




Re: How does the Spark Accumulator work under the covers?

2014-10-10 Thread HARIPRIYA AYYALASOMAYAJULA
If you use parallelize, the data is distributed across the available nodes,
and the sum is computed individually within each partition and later merged.
The driver manages the entire process. Is my understanding correct?
Can someone please correct me if I am wrong?
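
As a concrete illustration of that flow, a small hedged sketch (the data and partition count are illustrative):

import org.apache.spark.SparkContext._

// Each task adds to a local copy inside the action; the per-partition results
// are merged with addInPlace, and only the driver can read the final value.
val sum = sc.accumulator(0)
sc.parallelize(1 to 100, 4).foreach(x => sum += x)
println(sum.value)  // 5050, readable only on the driver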

On Fri, Oct 10, 2014 at 9:37 AM, Areg Baghdasaryan (BLOOMBERG/ 731 LEX -) 
abaghdasa...@bloomberg.net wrote:

 Hello,
 I was wondering on what does the Spark accumulator do under the covers.
 I’ve implemented my own associative addInPlace function for the
 accumulator, where is this function being run? Let’s say you call something
 like myRdd.map(x => sum += x) is “sum” being accumulated locally in any
 way, for each element or partition or node? Is “sum” a broadcast variable?
 Or does it only exist on the driver node? How does the driver node get
 access to the “sum”?
 Thanks,
 Areg




-- 
Regards,
Haripriya Ayyalasomayajula


RE: Spark on Mesos Issue - Do I need to install Spark on Mesos slaves

2014-10-10 Thread Yangcheng Huang
spark-defaults.conf
spark.executor.uri
hdfs://:9000/user//spark-1.1.0-bin-hadoop2.4.tgz



From: Bijoy Deb [mailto:bijoy.comput...@gmail.com]
Sent: den 10 oktober 2014 11:59
To: user@spark.apache.org
Subject: Spark on Mesos Issue - Do I need to install Spark on Mesos slaves

Hi,
I am trying to submit a Spark job on Mesos using spark-submit from my 
Mesos-Master machine.
My SPARK_HOME = /vol1/spark/spark-1.0.2-bin-hadoop2

I have uploaded the spark-1.0.2-bin-hadoop2.tgz to hdfs so that the mesos 
slaves can download it to invoke the Mesos Spark backend executor.


But on submitting the job, I can see the below error in 'stderr' logs on the 
Mesos slave machine:


sh: /vol1/spark/spark-1.0.2-bin-hadoop2/sbin/spark-executor: No such file or 
directory
Based on documentation,I understand that if I keep the spark-mesos binary file 
in hdfs,I dont need to install Spark separately on the slave nodes.So, the 
SPARK_HOME or /vol1/spark/spark-1.0.2-bin-hadoop2/ path is non-existent on any 
of my slave machines and hence the error.
Now, my questions is:
Shouldn't the mesos-slave be looking for the spark-executor command in the 
temporary directory where it is supposed to extract the 
spark-1.0.2-bin-hadoop2.tgz from hdfs,instead of the SPARK_HOME directory?What 
am I doing wrong here?
Any help would be really appreciated.

Thanks,
Bijoy


Breaking the previous large-scale sort record with Spark

2014-10-10 Thread Matei Zaharia
Hi folks,

I interrupt your regularly scheduled user / dev list to bring you some pretty 
cool news for the project, which is that we've been able to use Spark to break 
MapReduce's 100 TB and 1 PB sort records, sorting data 3x faster on 10x fewer 
nodes. There's a detailed writeup at 
http://databricks.com/blog/2014/10/10/spark-breaks-previous-large-scale-sort-record.html.
 Summary: while Hadoop MapReduce held last year's 100 TB world record by 
sorting 100 TB in 72 minutes on 2100 nodes, we sorted it in 23 minutes on 206 
nodes; and we also scaled up to sort 1 PB in 234 minutes.

I want to thank Reynold Xin for leading this effort over the past few weeks, 
along with Parviz Deyhim, Xiangrui Meng, Aaron Davidson and Ali Ghodsi. In 
addition, we'd really like to thank Amazon's EC2 team for providing the 
machines to make this possible. Finally, this result would of course not be 
possible without the many many other contributions, testing and feature 
requests from throughout the community.

For an engine to scale from these multi-hour petabyte batch jobs down to 
100-millisecond streaming and interactive queries is quite uncommon, and it's 
thanks to all of you folks that we are able to make this happen.

Matei
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Breaking the previous large-scale sort record with Spark

2014-10-10 Thread Debasish Das
Awesome news Matei !

Congratulations to the databricks team and all the community members...

On Fri, Oct 10, 2014 at 7:54 AM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 Hi folks,

 I interrupt your regularly scheduled user / dev list to bring you some
 pretty cool news for the project, which is that we've been able to use
 Spark to break MapReduce's 100 TB and 1 PB sort records, sorting data 3x
 faster on 10x fewer nodes. There's a detailed writeup at
 http://databricks.com/blog/2014/10/10/spark-breaks-previous-large-scale-sort-record.html.
 Summary: while Hadoop MapReduce held last year's 100 TB world record by
 sorting 100 TB in 72 minutes on 2100 nodes, we sorted it in 23 minutes on
 206 nodes; and we also scaled up to sort 1 PB in 234 minutes.

 I want to thank Reynold Xin for leading this effort over the past few
 weeks, along with Parviz Deyhim, Xiangrui Meng, Aaron Davidson and Ali
 Ghodsi. In addition, we'd really like to thank Amazon's EC2 team for
 providing the machines to make this possible. Finally, this result would of
 course not be possible without the many many other contributions, testing
 and feature requests from throughout the community.

 For an engine to scale from these multi-hour petabyte batch jobs down to
 100-millisecond streaming and interactive queries is quite uncommon, and
 it's thanks to all of you folks that we are able to make this happen.

 Matei
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Breaking the previous large-scale sort record with Spark

2014-10-10 Thread Mridul Muralidharan
Brilliant stuff ! Congrats all :-)
This is indeed really heartening news !

Regards,
Mridul


On Fri, Oct 10, 2014 at 8:24 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 Hi folks,

 I interrupt your regularly scheduled user / dev list to bring you some pretty 
 cool news for the project, which is that we've been able to use Spark to 
 break MapReduce's 100 TB and 1 PB sort records, sorting data 3x faster on 10x 
 fewer nodes. There's a detailed writeup at 
 http://databricks.com/blog/2014/10/10/spark-breaks-previous-large-scale-sort-record.html.
  Summary: while Hadoop MapReduce held last year's 100 TB world record by 
 sorting 100 TB in 72 minutes on 2100 nodes, we sorted it in 23 minutes on 206 
 nodes; and we also scaled up to sort 1 PB in 234 minutes.

 I want to thank Reynold Xin for leading this effort over the past few weeks, 
 along with Parviz Deyhim, Xiangrui Meng, Aaron Davidson and Ali Ghodsi. In 
 addition, we'd really like to thank Amazon's EC2 team for providing the 
 machines to make this possible. Finally, this result would of course not be 
 possible without the many many other contributions, testing and feature 
 requests from throughout the community.

 For an engine to scale from these multi-hour petabyte batch jobs down to 
 100-millisecond streaming and interactive queries is quite uncommon, and it's 
 thanks to all of you folks that we are able to make this happen.

 Matei
 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



SparkSQL LEFT JOIN problem

2014-10-10 Thread invkrh
Hi,

I am exploring SparkSQL 1.1.0, I have a problem on LEFT JOIN.

Here is the request:

select * from customer left join profile on customer.account_id =
profile.account_id

The two tables' schema are shown as following:

// Table: customer
root
 |-- account_id: string (nullable = false)
 |-- birthday: string (nullable = true)
 |-- preferstore: string (nullable = true)
 |-- registstore: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- city_name_en: string (nullable = true)
 |-- register_date: string (nullable = true)
 |-- zip: string (nullable = true)

// Table: profile
root
 |-- account_id: string (nullable = false)
 |-- card_type: string (nullable = true)
 |-- card_upgrade_time_black: string (nullable = true)
 |-- card_upgrade_time_gold: string (nullable = true)

However, I have always an exception:

Exception in thread "main"
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved
attributes: *, tree:
Project [*]
 Join LeftOuter, Some(('customer.account_id = 'profile.account_id))
  Subquery customer
   SparkLogicalPlan (ExistingRdd
[account_id#0,birthday#1,preferstore#2,registstore#3,gender#4,city_name_en#5,register_date#6,zip#7],
MappedRDD[5] at map at SQLFetcher.scala:43)
  Subquery profile
   SparkLogicalPlan (ExistingRdd
[account_id#8,card_type#9,card_upgrade_time_black#10,card_upgrade_time_gold#11],
MappedRDD[12] at map at SQLFetcher.scala:43)

I was not sure where the problem was, so I created two simple tables to
isolate the problem.

// table 1
a   b   c
4   8   9
1   3   4
3   4   5

// table 2
a   b   c
1   2   3
4   5   6

This time, it works.

So the problem might be in the data. I have just sampled some lines of the input
tables to create new ones.
This also works.

I am confused: the problem seems to be in the data, but the error message is not
enough to locate it (unless I am missing something).

Some lines of the sampled tables.

// Table: customer

[50660,1975-06-05 00:00:00.000,13,12,male,ningboshi,2006-12-14
00:00:00.000,]
[50666,1984-02-23 00:00:00.000,72,5,Female,beijingshi,2006-12-14
00:00:00.000,100086]
[50680,1976-11-25 00:00:00.000,59,5,Female,beijingshi,2006-12-14
00:00:00.000,100022]
[85,1971-03-27 00:00:00.000,2,2,Female,shanghaishi,2005-09-20
00:00:00.000,200336]


// Table: profile

[1144681,3,2010-02-18 00:00:00.000,2013-02-28 00:00:00.000]
[50666,2,2010-10-31 00:00:00.000,]
[3930657,1,,]
[1056365,2,2009-12-29 00:00:00.000,]

Any help ? =)

Hao




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-LEFT-JOIN-problem-tp16152.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Does Ipython notebook work with spark? trivial example does not work. Re: bug with IPython notebook?

2014-10-10 Thread jay vyas
PySpark definitely works for me in an IPython notebook.  A good way to debug is
to do setMaster("local") on your Python SparkContext and see if that works.  Then,
from there, modify it to point to the real spark server.

Also, I added a hack where I did a sys.path.insert of the pyspark path in my
Python notebook to get it working properly.

You can try out these instructions if you want, which I recently put
together based on some other stuff online plus a few minor modifications.

http://jayunit100.blogspot.com/2014/07/ipython-on-spark.html


On Thu, Oct 9, 2014 at 2:50 PM, Andy Davidson a...@santacruzintegration.com
 wrote:

 I wonder if I am starting the iPython notebook incorrectly. The example in my
 original email does not work. It looks like stdout is not configured
 correctly: if I submit it as a .py file, it works correctly.

 Any idea what the problem is?


 Thanks

 Andy


 From: Andrew Davidson a...@santacruzintegration.com
 Date: Tuesday, October 7, 2014 at 4:23 PM
 To: user@spark.apache.org user@spark.apache.org
 Subject: bug with IPython notebook?

 Hi

 I think I found a bug in the iPython notebook integration. I am not sure
 how to report it

 I am running spark-1.1.0-bin-hadoop2.4 on an AWS ec2 cluster. I start the
 cluster using the launch script provided by spark

 I start iPython notebook on my cluster master as follows and use an ssh
 tunnel to open the notebook in a browser running on my local computer

 ec2-user@ip-172-31-20-107 ~]$ IPYTHON_OPTS="notebook --pylab inline --no-browser --port=7000" /root/spark/bin/pyspark

 Bellow is the code my notebook executes


 Bug list:

 1. Why do I need to create a SparkContext? If I run pyspark
 interactively, the context is created automatically for me.
 2. The print statement causes the output to be displayed in the
 terminal where I started pyspark, not in the notebook's output.

 Any comments or suggestions would be greatly appreciated

 Thanks

 Andy


 import sys
 from operator import add

 from pyspark import SparkContext

 # only stand alone jobs should create a SparkContext
 sc = SparkContext(appName="pyStreamingSparkRDDPipe")

 data = [1, 2, 3, 4, 5]
 rdd = sc.parallelize(data)

 def echo(data):
     print "python recieved: %s" % (data) # output winds up in the shell
 console in my cluster (ie. The machine I launched pyspark from)

 rdd.foreach(echo)
 print "we are done"





-- 
jay vyas


Re: Executor and BlockManager memory size

2014-10-10 Thread Boromir Widas
Hey Larry,

I have been trying to figure this out for standalone clusters as well.
http://apache-spark-user-list.1001560.n3.nabble.com/What-is-a-Block-Manager-td12833.html
has an answer as to what block manager is for.

From the documentation, what I understood was that if you assign X GB to each
executor, spark.storage.memoryFraction (default 0.6) * X is assigned to the
BlockManager and the rest is for the JVM itself(?).
However, as you see, 26.8G is assigned to the BM, and assuming a 0.6
memoryFraction, this means the executor sees ~44.7G of memory; I am not
sure what happens to the difference (5.3G).
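
A guess at the missing ~5.3G (an assumption on my part, not something stated in
the docs page above): in Spark 1.x the block manager size is also multiplied by
spark.storage.safetyFraction (default 0.9), and it is computed from the JVM's
Runtime.maxMemory, which is a bit smaller than the configured heap. A minimal
sketch of that arithmetic in Scala:

  // Rough sketch only: how ~26.8 GB can fall out of a 50 GB executor.
  val executorMem    = 50.0   // GB granted to the executor
  val memoryFraction = 0.6    // spark.storage.memoryFraction (default)
  val safetyFraction = 0.9    // spark.storage.safetyFraction (assumed default)
  val blockManagerMem = executorMem * memoryFraction * safetyFraction
  println(f"BlockManager store: ~$blockManagerMem%.1f GB")  // ~27.0 GB, close to the reported 26.8 GB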


On Thu, Oct 9, 2014 at 9:40 PM, Larry Xiao xia...@sjtu.edu.cn wrote:

 Hi all,

 I'm confused about Executor and BlockManager, why they have different
 memory.

 14/10/10 08:50:02 INFO AppClient$ClientActor: Executor added:
 app-20141010085001-/2 on worker-20141010004933-brick6-35657
 (brick6:35657) with 6 cores
 14/10/10 08:50:02 INFO SparkDeploySchedulerBackend: Granted executor ID
 app-20141010085001-/2 on hostPort brick6:35657 with 6 cores, 50.0 GB RAM

 14/10/10 08:50:07 INFO BlockManagerMasterActor: Registering block manager
 brick6:53296 with 26.8 GB RAM

 and on the WebUI,

 Executor IDAddressRDD Blocks  Memory UsedDisk UsedActive
 TasksFailed Tasks  Complete TasksTotal TasksTask TimeInput
   Shuffle ReadShuffle Write
 0brick3:3760700.0 B / 26.8 GB0.0 B60  060
 ms0.0 B0.0 B0.0 B
 1brick1:5949300.0 B / 26.8 GB0.0 B60  060
 ms0.0 B0.0 B0.0 B
 2brick6:5329600.0 B / 26.8 GB0.0 B60  060
 ms0.0 B0.0 B0.0 B
 3brick5:3854300.0 B / 26.8 GB0.0 B60  060
 ms0.0 B0.0 B0.0 B
 4brick2:4493700.0 B / 26.8 GB0.0 B60  060
 ms0.0 B0.0 B0.0 B
 5brick4:4679800.0 B / 26.8 GB0.0 B60  060
 ms0.0 B0.0 B0.0 B
 driverbrick0:5769200.0 B / 274.6 MB0.0 B  000
   00 ms0.0 B0.0 B0.0 B

 As I understand it, a worker consist of a daemon and an executor, and
 executor takes charge both execution and storage.
 So does it mean that 26.8 GB is saved for storage and the rest is for
 execution?

 Another question is that, throughout execution, it seems that the
 blockmanager is always almost free.

 14/10/05 14:33:44 INFO BlockManagerInfo: Added broadcast_21_piece0 in
 memory on brick2:57501 (size: 1669.0 B, free: 21.2 GB)

 I don't know what I'm missing here.

 Best regards,
 Larry

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Breaking the previous large-scale sort record with Spark

2014-10-10 Thread Nan Zhu
Great! Congratulations! 

-- 
Nan Zhu


On Friday, October 10, 2014 at 11:19 AM, Mridul Muralidharan wrote:

 Brilliant stuff ! Congrats all :-)
 This is indeed really heartening news !
 
 Regards,
 Mridul
 
 
 On Fri, Oct 10, 2014 at 8:24 PM, Matei Zaharia matei.zaha...@gmail.com 
 (mailto:matei.zaha...@gmail.com) wrote:
  Hi folks,
  
  I interrupt your regularly scheduled user / dev list to bring you some 
  pretty cool news for the project, which is that we've been able to use 
  Spark to break MapReduce's 100 TB and 1 PB sort records, sorting data 3x 
  faster on 10x fewer nodes. There's a detailed writeup at 
  http://databricks.com/blog/2014/10/10/spark-breaks-previous-large-scale-sort-record.html.
   Summary: while Hadoop MapReduce held last year's 100 TB world record by 
  sorting 100 TB in 72 minutes on 2100 nodes, we sorted it in 23 minutes on 
  206 nodes; and we also scaled up to sort 1 PB in 234 minutes.
  
  I want to thank Reynold Xin for leading this effort over the past few 
  weeks, along with Parviz Deyhim, Xiangrui Meng, Aaron Davidson and Ali 
  Ghodsi. In addition, we'd really like to thank Amazon's EC2 team for 
  providing the machines to make this possible. Finally, this result would of 
  course not be possible without the many many other contributions, testing 
  and feature requests from throughout the community.
  
  For an engine to scale from these multi-hour petabyte batch jobs down to 
  100-millisecond streaming and interactive queries is quite uncommon, and 
  it's thanks to all of you folks that we are able to make this happen.
  
  Matei
  -
  To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org 
  (mailto:dev-unsubscr...@spark.apache.org)
  For additional commands, e-mail: dev-h...@spark.apache.org 
  (mailto:dev-h...@spark.apache.org)
  
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
 (mailto:user-unsubscr...@spark.apache.org)
 For additional commands, e-mail: user-h...@spark.apache.org 
 (mailto:user-h...@spark.apache.org)
 
 




Re: java.io.IOException Error in task deserialization

2014-10-10 Thread Sung Hwan Chung
I haven't seen this at all since switching to HttpBroadcast. It seems
TorrentBroadcast might have some issues?

On Thu, Oct 9, 2014 at 4:28 PM, Sung Hwan Chung coded...@cs.stanford.edu
wrote:

 I don't think that I saw any other error message. This is all I saw.

 I'm currently experimenting to see if this can be alleviated by using
 HttpBroadcastFactory instead of TorrentBroadcast. So far, with
 HttpBroadcast, I haven't seen this recurring as of yet. I'll keep you
 posted.

 On Thu, Oct 9, 2014 at 4:21 PM, Davies Liu dav...@databricks.com wrote:

 This exception should be caused by another one, could you paste all of
 them here?

 Also, that will be great if you can provide a script to reproduce this
 problem.

 Thanks!

 On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja aahuj...@gmail.com wrote:
  Has anyone else seen this erorr in task deserialization?  The task is
  processing a small amount of data and doesn't seem to have much data
 hanging
  to the closure?  I've only seen this with Spark 1.1
 
  Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times,
 most
  recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com):
  java.io.IOException: unexpected exception type
 
 
 java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
 
  java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
 
  java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
 
 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 
  java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 
  java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 
  java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 
 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 
  java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 
 
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
 
 
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
 
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
 
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  java.lang.Thread.run(Thread.java:744)

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Breaking the previous large-scale sort record with Spark

2014-10-10 Thread arthur.hk.c...@gmail.com
Wonderful !!

On 11 Oct, 2014, at 12:00 am, Nan Zhu zhunanmcg...@gmail.com wrote:

 Great! Congratulations!
 
 -- 
 Nan Zhu
 On Friday, October 10, 2014 at 11:19 AM, Mridul Muralidharan wrote:
 
 Brilliant stuff ! Congrats all :-)
 This is indeed really heartening news !
 
 Regards,
 Mridul
 
 
 On Fri, Oct 10, 2014 at 8:24 PM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 Hi folks,
 
 I interrupt your regularly scheduled user / dev list to bring you some 
 pretty cool news for the project, which is that we've been able to use 
 Spark to break MapReduce's 100 TB and 1 PB sort records, sorting data 3x 
 faster on 10x fewer nodes. There's a detailed writeup at 
 http://databricks.com/blog/2014/10/10/spark-breaks-previous-large-scale-sort-record.html.
  Summary: while Hadoop MapReduce held last year's 100 TB world record by 
 sorting 100 TB in 72 minutes on 2100 nodes, we sorted it in 23 minutes on 
 206 nodes; and we also scaled up to sort 1 PB in 234 minutes.
 
 I want to thank Reynold Xin for leading this effort over the past few 
 weeks, along with Parviz Deyhim, Xiangrui Meng, Aaron Davidson and Ali 
 Ghodsi. In addition, we'd really like to thank Amazon's EC2 team for 
 providing the machines to make this possible. Finally, this result would of 
 course not be possible without the many many other contributions, testing 
 and feature requests from throughout the community.
 
 For an engine to scale from these multi-hour petabyte batch jobs down to 
 100-millisecond streaming and interactive queries is quite uncommon, and 
 it's thanks to all of you folks that we are able to make this happen.
 
 Matei
 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 



Rdd repartitioning

2014-10-10 Thread rapelly kartheek
Hi,
I was facing GC overhead errors while executing an application with 570MB
data(with rdd replication).

In order to fix the heap errors, I repartitioned the rdd to 10 partitions:

val logData = sc.textFile("hdfs:/text_data/text data.txt").persist(StorageLevel.MEMORY_ONLY_2)
val parts = logData.coalesce(10, true)
println(parts.partitions.length)

But the problem is, the WebUI still shows the number of partitions as 5 while the
print statement outputs 10. I even tried repartition(), but I face the same
problem.

Also, does webUI show the storage details of each partition twice when I
replicate the rdd? Because, I see that webUI displays each partition only
once while it says 2 x replicated.

Can someone help me out in this!!!
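
One possible explanation, as a sketch below (an assumption, since the exact UI
page isn't named): the Storage tab only lists persisted RDDs, so it keeps
showing the 5 cached partitions of logData, while the coalesced RDD is never
persisted at all. Persisting the repartitioned RDD instead should make its 10
partitions show up after an action:

  import org.apache.spark.storage.StorageLevel

  // Sketch only; the path and names are taken from the question above.
  val logData = sc.textFile("hdfs:/text_data/text data.txt")
  val parts = logData.coalesce(10, true).persist(StorageLevel.MEMORY_ONLY_2)
  parts.count()                     // force evaluation so the Storage tab is populated
  println(parts.partitions.length)  // 10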

-Karthik


Spark job (not Spark streaming) doesn't delete un-needed checkpoints.

2014-10-10 Thread Sung Hwan Chung
Un-needed checkpoints are not getting automatically deleted in my
application.

I.e. the lineage looks something like this, and checkpoints simply
accumulate in a temporary directory (every intermediate RDD, however, does get
zipped with a globally permanent RDD):

PermanentRDD: Global, zips with all the intermediate ones

Intermediate RDDs: A---B---C---D---E---F---G
                       |       |       |
                  checkpoint checkpoint checkpoint

Older intermediate RDDs never get used.


Re: Breaking the previous large-scale sort record with Spark

2014-10-10 Thread Steve Nunez
Great stuff. Wonderful to see such progress in so short a time.

How about some links to code and instructions so that these benchmarks can
be reproduced?

Regards,
- Steve

From:  Debasish Das debasish.da...@gmail.com
Date:  Friday, October 10, 2014 at 8:17
To:  Matei Zaharia matei.zaha...@gmail.com
Cc:  user user@spark.apache.org, dev d...@spark.apache.org
Subject:  Re: Breaking the previous large-scale sort record with Spark

 Awesome news Matei !
 
 Congratulations to the databricks team and all the community members...
 
 On Fri, Oct 10, 2014 at 7:54 AM, Matei Zaharia matei.zaha...@gmail.com
 wrote:
 Hi folks,
 
 I interrupt your regularly scheduled user / dev list to bring you some pretty
 cool news for the project, which is that we've been able to use Spark to
 break MapReduce's 100 TB and 1 PB sort records, sorting data 3x faster on 10x
 fewer nodes. There's a detailed writeup at
 http://databricks.com/blog/2014/10/10/spark-breaks-previous-large-scale-sort-
 record.html. Summary: while Hadoop MapReduce held last year's 100 TB world
 record by sorting 100 TB in 72 minutes on 2100 nodes, we sorted it in 23
 minutes on 206 nodes; and we also scaled up to sort 1 PB in 234 minutes.
 
 I want to thank Reynold Xin for leading this effort over the past few weeks,
 along with Parviz Deyhim, Xiangrui Meng, Aaron Davidson and Ali Ghodsi. In
 addition, we'd really like to thank Amazon's EC2 team for providing the
 machines to make this possible. Finally, this result would of course not be
 possible without the many many other contributions, testing and feature
 requests from throughout the community.
 
 For an engine to scale from these multi-hour petabyte batch jobs down to
 100-millisecond streaming and interactive queries is quite uncommon, and it's
 thanks to all of you folks that we are able to make this happen.
 
 Matei
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 





Re: How does the Spark Accumulator work under the covers?

2014-10-10 Thread Jayant Shekhar
Hi Areg,

Check out
http://spark.apache.org/docs/latest/programming-guide.html#accumulators

val sum = sc.accumulator(0)   // accumulator created from an initial value
in the driver

The accumulator variable is created in the driver. Tasks running on the
cluster can then add to it. However, they cannot read its value. Only the
driver program can read the accumulator’s value, using its value method.

sum.value  // in the driver

 myRdd.map(x => sum += x)
 where is this function being run
This is being run by the tasks on the workers.

The driver accumulates the data from the various workers and merges them
to get the final result, as Haripriya mentioned.
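
A minimal end-to-end sketch of that flow (standard accumulator API only, using
foreach rather than map so the update actually runs as an action):

  val sum = sc.accumulator(0)            // created on the driver
  val myRdd = sc.parallelize(1 to 100)

  // Runs inside tasks on the workers; each task adds to its local copy,
  // and the driver merges the per-task results when the action completes.
  myRdd.foreach(x => sum += x)

  println(sum.value)                     // only the driver can read it: 5050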

Thanks,
Jayant


On Fri, Oct 10, 2014 at 7:46 AM, HARIPRIYA AYYALASOMAYAJULA 
aharipriy...@gmail.com wrote:

 If you use parallelize, the data is distributed across multiple nodes
 available and sum is computed individually within each partition and later
 merged. The driver manages the entire process. Is my understanding correct?
 Can someone please correct me if I am wrong?

 On Fri, Oct 10, 2014 at 9:37 AM, Areg Baghdasaryan (BLOOMBERG/ 731 LEX -)
 abaghdasa...@bloomberg.net wrote:

 Hello,
 I was wondering on what does the Spark accumulator do under the covers.
 I’ve implemented my own associative addInPlace function for the
 accumulator, where is this function being run? Let’s say you call something
 like myRdd.map(x => sum += x) is “sum” being accumulated locally in any
 way, for each element or partition or node? Is “sum” a broadcast variable?
 Or does it only exist on the driver node? How does the driver node get
 access to the “sum”?
 Thanks,
 Areg




 --
 Regards,
 Haripriya Ayyalasomayajula



Application failure in yarn-cluster mode

2014-10-10 Thread Christophe Préaud
Hi,

After updating from spark-1.0.0 to spark-1.1.0, my spark applications failed 
most of the time (but not always) in yarn-cluster mode (but not in yarn-client 
mode).

Here is my configuration:

 *   spark-1.1.0
 *   hadoop-2.2.0

And the hadoop.tmp.dir definition in the hadoop core-site.xml file (each 
directory is on its own partition):
<property>
  <name>hadoop.tmp.dir</name>
  <value>file:/d1/yarn/local,file:/d2/yarn/local,file:/d3/yarn/local,file:/d4/yarn/local,file:/d5/yarn/local,file:/d6/yarn/local,file:/d7/yarn/local</value>
</property>

After investigating, it turns out that the problem occurs when the executor fetches
a jar file: the jar is downloaded into a temporary file, always in /d1/yarn/local
(see the hadoop.tmp.dir definition above), and then moved into one of the temporary
directories defined in hadoop.tmp.dir:

 *   if it is the same as the one holding the temporary file (i.e. /d1/yarn/local), then the
application continues normally
 *   if it is another one (i.e. /d2/yarn/local, /d3/yarn/local,...), it fails
with the following error:

14/10/10 14:33:51 ERROR executor.Executor: Exception in task 0.0 in stage 1.0 
(TID 0)
java.io.FileNotFoundException: ./logReader-1.0.10.jar (Permission denied)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.init(FileOutputStream.java:221)
at com.google.common.io.Files$FileByteSink.openStream(Files.java:223)
at com.google.common.io.Files$FileByteSink.openStream(Files.java:211)
at com.google.common.io.ByteSource.copyTo(ByteSource.java:203)
at com.google.common.io.Files.copy(Files.java:436)
at com.google.common.io.Files.move(Files.java:651)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:440)
at 
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:325)
at 
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:323)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at 
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:323)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

I have no idea why the move fails when the source and target files are not on 
the same partition, for the moment I have worked around the problem with the 
attached patch (i.e. I ensure that the temp file and the moved file are always 
on the same partition).

Any thought about this problem?

Thanks!
Christophe.


--- core/src/main/scala/org/apache/spark/util/Utils.scala.orig	2014-09-03 08:00:33.0 +0200
+++ core/src/main/scala/org/apache/spark/util/Utils.scala	2014-10-10 17:51:59.0 +0200
@@ -349,8 +349,7 @@
    */
   def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager) {
     val filename = url.split("/").last
-    val tempDir = getLocalDir(conf)
-    val tempFile =  File.createTempFile("fetchFileTemp", null, new File(tempDir))
+    val tempFile =  File.createTempFile("fetchFileTemp", null, new File(targetDir.getAbsolutePath))
     val targetFile = new File(targetDir, filename)
     val uri = new URI(url)
     val fileOverwrite = conf.getBoolean("spark.files.overwrite", false)


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: SparkSQL LEFT JOIN problem

2014-10-10 Thread Liquan Pei
Hi

Can you try
select birthday from customer left join profile on customer.account_id =
profile.account_id
to see if the problem remains on your entire data?
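
For reference, a minimal way to run that narrowed-down query (a sketch; it
assumes the two tables were registered with registerTempTable as in the
original post, and projects explicit columns instead of * to see whether the
unresolved-attribute error is tied to the star expansion or to the data):

  val joined = sqlContext.sql(
    "SELECT customer.account_id, customer.birthday, profile.card_type " +
    "FROM customer LEFT JOIN profile " +
    "ON customer.account_id = profile.account_id")
  joined.take(10).foreach(println)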

Thanks,
Liquan

On Fri, Oct 10, 2014 at 8:20 AM, invkrh inv...@gmail.com wrote:

 Hi,

 I am exploring SparkSQL 1.1.0, I have a problem on LEFT JOIN.

 Here is the request:

 select * from customer left join profile on customer.account_id =
 profile.account_id

 The two tables' schema are shown as following:

 // Table: customer
 root
  |-- account_id: string (nullable = false)
  |-- birthday: string (nullable = true)
  |-- preferstore: string (nullable = true)
  |-- registstore: string (nullable = true)
  |-- gender: string (nullable = true)
  |-- city_name_en: string (nullable = true)
  |-- register_date: string (nullable = true)
  |-- zip: string (nullable = true)

 // Table: profile
 root
  |-- account_id: string (nullable = false)
  |-- card_type: string (nullable = true)
  |-- card_upgrade_time_black: string (nullable = true)
  |-- card_upgrade_time_gold: string (nullable = true)

 However, I have always an exception:

 Exception in thread main
 org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved
 attributes: *, tree:
 Project [*]
  Join LeftOuter, Some(('customer.account_id = 'profile.account_id))
   Subquery customer
SparkLogicalPlan (ExistingRdd

 [account_id#0,birthday#1,preferstore#2,registstore#3,gender#4,city_name_en#5,register_date#6,zip#7],
 MappedRDD[5] at map at SQLFetcher.scala:43)
   Subquery profile
SparkLogicalPlan (ExistingRdd

 [account_id#8,card_type#9,card_upgrade_time_black#10,card_upgrade_time_gold#11],
 MappedRDD[12] at map at SQLFetcher.scala:43)

 I was not sure where the problem is. So I create two simple tables to
 isolate the problem.

 // table 1
 a   b   c
 4   8   9
 1   3   4
 3   4   5

 // table 2
 a   b   c
 1   2   3
 4   5   6

 This time, it works.

 So the problem might be in data. I have just sampled some lines of input
 tables to create new ones.
 This also works.

 I am so confused. The problem is in the data, but the error messages are
 not
 enough to find it (if I am not missing anything.)

 Some lines of the sampled tables.

 // Table: customer

 [50660,1975-06-05 00:00:00.000,13,12,male,ningboshi,2006-12-14
 00:00:00.000,]
 [50666,1984-02-23 00:00:00.000,72,5,Female,beijingshi,2006-12-14
 00:00:00.000,100086]
 [50680,1976-11-25 00:00:00.000,59,5,Female,beijingshi,2006-12-14
 00:00:00.000,100022]
 [85,1971-03-27 00:00:00.000,2,2,Female,shanghaishi,2005-09-20
 00:00:00.000,200336]


 // Table: profile

 [1144681,3,2010-02-18 00:00:00.000,2013-02-28 00:00:00.000]
 [50666,2,2010-10-31 00:00:00.000,]
 [3930657,1,,]
 [1056365,2,2009-12-29 00:00:00.000,]

 Any help ? =)

 Hao




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-LEFT-JOIN-problem-tp16152.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


RE: Spark on Mesos Issue - Do I need to install Spark on Mesos slaves

2014-10-10 Thread Malte
I have actually had the same problem. spark.executor.uri on HDFS did not work
so I had to put it in a local folder



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-Mesos-Issue-Do-I-need-to-install-Spark-on-Mesos-slaves-tp16129p16165.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Help with using combineByKey

2014-10-10 Thread HARIPRIYA AYYALASOMAYAJULA
Thank you guys!

It was very helpful and now I understand it better.
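
For anyone finding this thread later, a compact sketch of the two approaches
discussed below, on the same sample data (standard RDD API only):

  val plist = sc.parallelize(List(("LAX", 6), ("LAX", 0), ("LAX", 7),
                                  ("SFO", 0), ("SFO", 0), ("SFO", 9)))

  // combineByKey: build a (non-zero count, total count) pair per key
  val viaCombine = plist.combineByKey(
    (v: Int) => (if (v > 0) 1 else 0, 1),
    (acc: (Int, Int), v: Int) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
    (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))

  // The simpler mapValues + reduceByKey version from Davies' reply
  val viaReduce = plist
    .mapValues(v => (if (v > 0) 1 else 0, 1))
    .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

  viaCombine.collect().foreach(println)  // (LAX,(2,3)), (SFO,(1,3)) -- order may vary
  viaReduce.collect().foreach(println)   // same result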




On Fri, Oct 10, 2014 at 1:38 AM, Davies Liu dav...@databricks.com wrote:

 Maybe this version is easier to use:

 plist.mapValues((v) => (if (v > 0) 1 else 0, 1)).reduceByKey((x, y) =>
 (x._1 + y._1, x._2 + y._2))

 It has similar behavior to combineByKey(), and will be faster than the
 groupByKey() version.

 On Thu, Oct 9, 2014 at 9:28 PM, HARIPRIYA AYYALASOMAYAJULA
 aharipriy...@gmail.com wrote:
  Sean,
 
  Thank you. It works. But I am still confused about the function. Can you
  kindly throw some light on it?
  I was going through the example mentioned in
 
 https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
 
  Is there any better source through which I can learn more about these
  functions? It would be helpful if I can get a chance to look at more
  examples.
  Also, I assume using combineByKey helps us solve it parallel than using
  simple functions provided by scala as mentioned by Yana. Am I correct?
 
  On Thu, Oct 9, 2014 at 12:30 PM, Sean Owen so...@cloudera.com wrote:
 
  Oh duh, sorry. The initialization should of course be (v) = (if (v 
  0) 1 else 0, 1)
  This gives the answer you are looking for. I don't see what Part2 is
  supposed to do differently.
 
  On Thu, Oct 9, 2014 at 6:14 PM, HARIPRIYA AYYALASOMAYAJULA
  aharipriy...@gmail.com wrote:
   Hello Sean,
  
   Thank you, but changing from v to 1 doesn't help me either.
  
   I am trying to count the number of non-zero values using the first
   accumulator.
    val newlist = List (("LAX",6), ("LAX",0), ("LAX",7), ("SFO",0),
    ("SFO",0),
    ("SFO",9))
   
    val plist = sc.parallelize(newlist)
   
 val part1 = plist.combineByKey(
   (v) => (1, 1),
   (acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2
  +
    1),
   (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1,
  acc1._2 +
    acc2._2)
   )
  
  val Part2 = part1.map{ case (key, value) = (key,
   (value._1,value._2)) }
  
   This should give me the result
   (LAX,(2,3))
   (SFO,(1,3))
  
  
  
   On Thu, Oct 9, 2014 at 11:48 AM, Sean Owen so...@cloudera.com
 wrote:
  
   You have a typo in your code at var acc:, and the map from opPart1
   to opPart2 looks like a no-op, but those aren't the problem I think.
   It sounds like you intend the first element of each pair to be a
 count
   of nonzero values, but you initialize the first element of the pair
 to
    v, not 1, in v => (v,1). Try v => (1,1)
  
  
   On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA
   aharipriy...@gmail.com wrote:
   
I am a beginner to Spark and finding it difficult to implement a
 very
simple
reduce operation. I read that is ideal to use combineByKey for
complex
reduce operations.
   
My input:
   
val input = sc.parallelize(List((LAX,6), (LAX,8), (LAX,7),
(SFO,0),
(SFO,1),
(SFO,9),(PHX,65),(PHX,88),(KX,7),(KX,6),(KX,1),
(KX,9),
   
   
   
 (HOU,56),(HOU,5),(HOU,59),(HOU,0),(MA,563),(MA,545),(MA,5),(MA,0),(MA,0)))
   
   
 val opPart1 = input.combineByKey(
   (v) = (v, 1),
   (var acc: (Int, Int), v) = ( if(v  0) acc._1 + 1 else acc._1,
acc._2 +
1),
   (acc1: (Int, Int), acc2: (Int, Int)) = (acc1._1 + acc2._1,
acc1._2 +
acc2._2)
   )
   
   val opPart2 = opPart1.map{ case (key, value) = (key,
(value._1,value._2)) }
   
 opPart2.collectAsMap().map(println(_))
   
If the value is greater than 0, the first accumulator should be
incremented
by 1, else it remains the same. The second accumulator is a simple
counter
for each value.  I am getting an incorrect output (garbage values
)for
the
first accumulator. Please help.
   
 The equivalent reduce operation in Hadoop MapReduce is :
   
public static class PercentageCalcReducer extends
ReducerText,IntWritable,Text,FloatWritable
   
{
   
private FloatWritable pdelay = new FloatWritable();
   
   
public void reduce(Text key, IterableIntWritable values,Context
context)throws IOException,InterruptedException
   
{
   
int acc2=0;
   
float frac_delay, percentage_delay;
   
int acc1=0;
   
for(IntWritable val : values)
   
{
   
if(val.get()  0)
   
{
   
acc1++;
   
}
   
acc2++;
   
}
   
   
   
frac_delay = (float)acc1/acc2;
   
percentage_delay = frac_delay * 100 ;
   
pdelay.set(percentage_delay);
   
context.write(key,pdelay);
   
}
   
}
   
   
Please help. Thank you for your time.
   
--
   
Regards,
   
Haripriya Ayyalasomayajula
contact : 650-796-7112
  
  
  
  
   --
   Regards,
   Haripriya Ayyalasomayajula
   contact : 650-796-7112
 
 
 
 
  --
  Regards,
  Haripriya Ayyalasomayajula
  contact : 650-796-7112




-- 
Regards,
Haripriya Ayyalasomayajula
contact : 650-796-7112


Re: java.io.IOException Error in task deserialization

2014-10-10 Thread Davies Liu
Maybe; TorrentBroadcast is more complicated than HttpBroadcast. Could
you tell us how to reproduce this issue? That would help us a lot to improve
TorrentBroadcast.

Thanks!

On Fri, Oct 10, 2014 at 8:46 AM, Sung Hwan Chung
coded...@cs.stanford.edu wrote:
 I haven't seen this at all since switching to HttpBroadcast. It seems
 TorrentBroadcast might have some issues?

 On Thu, Oct 9, 2014 at 4:28 PM, Sung Hwan Chung coded...@cs.stanford.edu
 wrote:

 I don't think that I saw any other error message. This is all I saw.

 I'm currently experimenting to see if this can be alleviated by using
 HttpBroadcastFactory instead of TorrentBroadcast. So far, with
 HttpBroadcast, I haven't seen this recurring as of yet. I'll keep you
 posted.

 On Thu, Oct 9, 2014 at 4:21 PM, Davies Liu dav...@databricks.com wrote:

 This exception should be caused by another one, could you paste all of
 them here?

 Also, that will be great if you can provide a script to reproduce this
 problem.

 Thanks!

 On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja aahuj...@gmail.com wrote:
  Has anyone else seen this erorr in task deserialization?  The task is
  processing a small amount of data and doesn't seem to have much data
  hanging
  to the closure?  I've only seen this with Spark 1.1
 
  Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times,
  most
  recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com):
  java.io.IOException: unexpected exception type
 
 
  java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
 
  java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
 
  java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
 
 
  java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 
  java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 
 
  java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 
  java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 
 
  java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 
  java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 
  java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 
 
  org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
 
 
  org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
 
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
 
 
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 
 
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  java.lang.Thread.run(Thread.java:744)

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark SQL parser bug?

2014-10-10 Thread Mohammed Guller
Hi Chen,
Thanks for looking into this.

It looks like the bug may be in the Spark Cassandra connector code. Table x is 
a table in Cassandra.

However, while trying to troubleshoot this issue, I noticed another issue. This
time I did not use Cassandra; instead I created a table on the fly. I am not
seeing the same issue, but the results do not look right. Here is my complete
Spark-shell session:

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.1.0
  /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
Type in expressions to have them evaluated.
Type :help for more information.
14/10/10 11:05:11 WARN Utils: Your hostname, ubuntu resolves to a loopback 
address: 127.0.1.1; using 192.168.59.135 instead (on interface eth0)
14/10/10 11:05:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another 
address
14/10/10 11:05:14 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
Spark context available as sc.

scala import org.apache.spark.sql._
import org.apache.spark.sql._

scala val sqlContext = new SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = 
org.apache.spark.sql.SQLContext@2be5c74d

scala import sqlContext.createSchemaRDD
import sqlContext.createSchemaRDD

scala case class X(a: Int, ts: java.sql.Timestamp)
defined class X

scala val rdd = sc.parallelize(1 to 5).map{ n => X(n, new
java.sql.Timestamp(1325548800000L + n*86400000))}
rdd: org.apache.spark.rdd.RDD[X] = MappedRDD[1] at map at <console>:20

scala rdd.collect
res0: Array[X] = Array(X(1,2012-01-03 16:00:00.0), X(2,2012-01-04 16:00:00.0), 
X(3,2012-01-05 16:00:00.0), X(4,2012-01-06 16:00:00.0), X(5,2012-01-07 
16:00:00.0))

scala rdd.registerTempTable("x")

scala val sRdd = sqlContext.sql("select a from x where ts >= '2012-01-01T00:00:00';")
sRdd: org.apache.spark.sql.SchemaRDD =
SchemaRDD[4] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
Project [a#0]
ExistingRdd [a#0,ts#1], MapPartitionsRDD[6] at mapPartitions at 
basicOperators.scala:208

scala sRdd.collect
res2: Array[org.apache.spark.sql.Row] = Array()



Mohammed

From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: Friday, October 10, 2014 4:37 AM
To: Mohammed Guller; user@spark.apache.org
Subject: Re: Spark SQL parser bug?


Hi Mohammed,

Would you mind to share the DDL of the table x and the complete stacktrace of 
the exception you got? A full Spark shell session history would be more than 
helpful. PR #2084 had been merged in master in Aug, and timestamp type is 
supported in 1.1.

I tried the following snippets in Spark shell (v1.1), and didn’t observe this 
issue:

scala import org.apache.spark.sql._

import org.apache.spark.sql._



scala import sc._

import sc._



scala val sqlContext = new SQLContext(sc)

sqlContext: org.apache.spark.sql.SQLContext = 
org.apache.spark.sql.SQLContext@6c3441c5



scala import sqlContext._

import sqlContext._



scala case class Record(a: Int, ts: java.sql.Timestamp)

defined class Record



scala makeRDD(Seq.empty[Record], 1).registerTempTable("x")



scala sql("SELECT a FROM x WHERE ts >= '2012-01-01T00:00:00' AND ts <= 
'2012-03-31T23:59:59'")

res1: org.apache.spark.sql.SchemaRDD =

SchemaRDD[3] at RDD at SchemaRDD.scala:103

== Query Plan ==

== Physical Plan ==

Project [a#0]

 ExistingRdd [a#0,ts#1], MapPartitionsRDD[5] at mapPartitions at 
basicOperators.scala:208



scala res1.collect()

...

res2: Array[org.apache.spark.sql.Row] = Array()

Cheng

On 10/9/14 10:26 AM, Mohammed Guller wrote:
Hi –

When I run the following Spark SQL query in Spark-shell ( version 1.1.0) :

val rdd = sqlContext.sql("SELECT a FROM x WHERE ts >= '2012-01-01T00:00:00' AND 
ts <= '2012-03-31T23:59:59'")

it gives the following error:
rdd: org.apache.spark.sql.SchemaRDD =
SchemaRDD[294] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
java.util.NoSuchElementException: head of empty list

The ts column in the where clause has timestamp data and is of type timestamp. 
If I replace the string '2012-01-01T00:00:00' in the where clause with its 
epoch value, then the query works fine.

It looks I have run into an issue described in this pull request: 
https://github.com/apache/spark/pull/2084

Is that PR not merged in Spark version 1.1.0? Or am I missing something?

Thanks,
Mohammed


​


Re: Akka disassociation on Java SE Embedded

2014-10-10 Thread bhusted
How do you increase the spark block manager timeout?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Akka-disassociation-on-Java-SE-Embedded-tp6266p16176.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Akka disassociation on Java SE Embedded

2014-10-10 Thread Nan Zhu
https://github.com/CodingCat/spark/commit/c5cee24689ac4ad1187244e6a16537452e99e771
 

-- 
Nan Zhu


On Friday, October 10, 2014 at 4:31 PM, bhusted wrote:

 How do you increase the spark block manager timeout?
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Akka-disassociation-on-Java-SE-Embedded-tp6266p16176.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com 
 (http://Nabble.com).
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
 (mailto:user-unsubscr...@spark.apache.org)
 For additional commands, e-mail: user-h...@spark.apache.org 
 (mailto:user-h...@spark.apache.org)
 
 




Issue with java spark broadcast

2014-10-10 Thread Jacob Maloney
I'm trying to broadcast an accumulator I generated earlier in my app. However I 
get a nullpointer exception whenever I reference the value.

// The start of my accumulator generation
LookupKeyToIntMap keyToIntMapper = new LookupKeyToIntMap();

keyToIntMapper.setNumPartitions(intermediatePair.splits().size());
keyToIntMapper.setMapAccumulator(keyToIntMap);
JavaRDD<Tuple2<Integer,Iterable<Long>>> intermediateIntsTuple =
        intermediatePair.mapPartitionsWithIndex(keyToIntMapper, false);

JavaPairRDD<Integer,Iterable<Long>> intermediatePairInts =
        JavaPairRDD.fromJavaRDD(intermediateIntsTuple);

JavaPairRDD<Integer,Tuple2<Integer,Integer>> sims =
        intermediatePairInts.mapValues(new SelfSim());

// I force the RDD to evaluate so to avoid laziness issues
Map<Integer,Tuple2<Integer,Integer>> simsMap = sims.collectAsMap();

// Broadcast the map
// If I include a print statement here on the accumulator I can print the map out succesfully
broadcastVar = ctx.broadcast(keyToIntMap.value());

// Here I try to access the broadcasted map
JavaPairRDD<Integer,Long> indidIntKeyPair =
        indidKeyPairFiltered.mapToPair(new PairFunction<Tuple2<String,Long>, Integer, Long>() {
            @Override
            public Tuple2<Integer,Long> call(Tuple2<String,Long> keyVal) throws Exception {
                Integer outInt = broadcastVar.value().inverse().get(keyVal._1);
                return new Tuple2<Integer,Long>(outInt, keyVal._2);
            }
        });

This works just fine when I run it locally, but when I move it to a cluster
environment it throws NullPointerExceptions. My question is: why can't I access
this map, and what do I have to do to make it accessible?
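
For comparison, a minimal broadcast round trip in Scala (a sketch, not the
original app; the point it illustrates is that the Broadcast handle is a local
val captured by the closure rather than a field that is only initialized on the
driver, which is one common cause of NullPointerExceptions like this on a
cluster):

  // Assumes sc is an existing SparkContext; the map stands in for keyToIntMap.value().
  val keyToInt: Map[String, Int] = Map("a" -> 1, "b" -> 2)
  val broadcastVar = sc.broadcast(keyToInt)       // local val, shipped with the closure

  val pairs = sc.parallelize(Seq(("a", 10L), ("b", 20L)))
  val remapped = pairs.map { case (k, v) =>
    (broadcastVar.value.getOrElse(k, -1), v)      // executors read the broadcast copy
  }
  remapped.collect().foreach(println)             // (1,10), (2,20)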

Thanks,

Jacob


Re: getting tweets for a specified handle

2014-10-10 Thread SK
Thanks. I made the change and ran the code, but I don't get any tweets for my
handle, although I do see the tweets when I search for it on twitter. Does
Spark allow us to get tweets from the past (say, the last 100 tweets, or
tweets that appeared in the last 10 minutes)?

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/getting-tweets-for-a-specified-handle-tp16085p16180.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark Streaming KafkaUtils Issue

2014-10-10 Thread Abraham Jacob
Hi Folks,

I am seeing some strange behavior when using the Spark Kafka connector in
Spark streaming.

I have a Kafka topic which has 8 partitions. I have a kafka producer that
pumps some messages into this topic.

On the consumer side I have a spark streaming application that has 8
executors on 8 worker nodes and 8 ReceiverInputDStreams with the same kafka
group id, connected to the 8 partitions I have for the topic. Also, the kafka
consumer property auto.offset.reset is set to smallest.


Now here is the sequence of steps -

(1) I Start the the spark streaming app.
(2) Start the producer.

At this point I see the messages that are being pumped from the producer in
Spark Streaming.  Then I -

(1) Stopped the producer
(2) Waited for all the messages to be consumed.
(3) Stopped the spark streaming app.

Now when I restart the spark streaming app (note - the producer is still
down and no messages are being pumped into the topic) - I observe the
following -

(1) Spark Streaming starts reading from each partition right from the
beginning.


This is not what I was expecting. I was expecting the consumers started by
spark streaming to start from where they left off.

Is my assumption incorrect that the consumers (the kafka/spark
connector) should start reading from the topic where they last left off...?

Has anyone else seen this behavior? Is there a way to make it such that it
starts from where it left off?

Regards,
- Abraham


Re: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Sean McNamara
Would you mind sharing the code leading to your createStream?  Are you also 
setting group.id?

Thanks,

Sean


On Oct 10, 2014, at 4:31 PM, Abraham Jacob abe.jac...@gmail.com wrote:

 Hi Folks,
 
 I am seeing some strange behavior when using the Spark Kafka connector in 
 Spark streaming. 
 
 I have a Kafka topic which has 8 partitions. I have a kafka producer that 
 pumps some messages into this topic.
 
 On the consumer side I have a spark streaming application that that has 8 
 executors on 8 worker nodes and 8 ReceiverInputDStream with the same kafka 
 group id connected to the 8 partitions I have for the topic. Also the kafka 
 consumer property auto.offset.reset is set to smallest.
 
 
 Now here is the sequence of steps - 
 
 (1) I Start the the spark streaming app.
 (2) Start the producer.
 
 As this point I see the messages that are being pumped from the producer in 
 Spark Streaming.  Then I - 
 
 (1) Stopped the producer
 (2) Wait for all the message to be consumed.
 (2) Stopped the spark streaming app.
 
 Now when I restart the spark streaming app (note - the producer is still down 
 and no messages are being pumped into the topic) - I observe the following - 
 
 (1) Spark Streaming starts reading from each partition right from the 
 beginning.
 
 
 This is not what I was expecting. I was expecting the consumers started by 
 spark streaming to start from where it left off
 
 Is my assumption not correct that the consumers (the kafka/spark connector) 
 to start reading from the topic where it last left off...?
 
 Has anyone else seen this behavior? Is there a way to make it such that it 
 starts from where it left off?
 
 Regards,
 - Abraham


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Abraham Jacob
Sure... I do set the group.id for all the consumers to be the same. Here is
the code ---

SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id", consumerGrp);
kafkaConf.put("auto.offset.reset", "smallest");
kafkaConf.put("zookeeper.conection.timeout.ms", "1000");
kafkaConf.put("rebalance.max.retries", "4");
kafkaConf.put("rebalance.backoff.ms", "3000");
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);
List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
for(int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
            DefaultDecoder.class, PayloadDeSerializer.class,
            kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], String>() {

        private static final long serialVersionUID = -1936810126415608167L;

        public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
            return tuple2;
        }
      }
    )
  );
}


JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
    unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    unifiedStream = kafkaStreams.get(0);
}
unifiedStream.print();
jssc.start();
jssc.awaitTermination();


-abe


On Fri, Oct 10, 2014 at 3:37 PM, Sean McNamara sean.mcnam...@webtrends.com
wrote:

 Would you mind sharing the code leading to your createStream?  Are you
 also setting group.id?

 Thanks,

 Sean


 On Oct 10, 2014, at 4:31 PM, Abraham Jacob abe.jac...@gmail.com wrote:

  Hi Folks,
 
  I am seeing some strange behavior when using the Spark Kafka connector
 in Spark streaming.
 
  I have a Kafka topic which has 8 partitions. I have a kafka producer
 that pumps some messages into this topic.
 
  On the consumer side I have a spark streaming application that that has
 8 executors on 8 worker nodes and 8 ReceiverInputDStream with the same
 kafka group id connected to the 8 partitions I have for the topic. Also the
 kafka consumer property auto.offset.reset is set to smallest.
 
 
  Now here is the sequence of steps -
 
  (1) I Start the the spark streaming app.
  (2) Start the producer.
 
  As this point I see the messages that are being pumped from the producer
 in Spark Streaming.  Then I -
 
  (1) Stopped the producer
  (2) Wait for all the message to be consumed.
  (2) Stopped the spark streaming app.
 
  Now when I restart the spark streaming app (note - the producer is still
 down and no messages are being pumped into the topic) - I observe the
 following -
 
  (1) Spark Streaming starts reading from each partition right from the
 beginning.
 
 
  This is not what I was expecting. I was expecting the consumers started
 by spark streaming to start from where it left off
 
  Is my assumption not correct that the consumers (the kafka/spark
 connector) to start reading from the topic where it last left off...?
 
  Has anyone else seen this behavior? Is there a way to make it such that
 it starts from where it left off?
 
  Regards,
  - Abraham




-- 
~


Re: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Sean McNamara
How long do you let the consumers run for?  Is it less than 60 seconds by 
chance?  auto.commit.interval.ms defaults to 60000 ms (60 seconds).  If so that 
may explain why you are seeing that behavior.
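
If that turns out to be the cause, one option is to shorten the commit interval
through the consumer properties. A sketch in Scala (the property names are
standard Kafka 0.8 high-level consumer config, not anything Spark-specific, and
ssc / the ZooKeeper address are assumed):

  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.kafka.KafkaUtils
  import kafka.serializer.StringDecoder

  val kafkaParams = Map(
    "zookeeper.connect"       -> "zk1:2181",
    "group.id"                -> "consumerGrp",
    "auto.offset.reset"       -> "smallest",  // only used when no committed offset exists
    "auto.commit.enable"      -> "true",
    "auto.commit.interval.ms" -> "1000")      // commit offsets every second instead of every 60 s

  val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Map("topic" -> 1), StorageLevel.MEMORY_ONLY_SER)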

Cheers,

Sean


On Oct 10, 2014, at 4:47 PM, Abraham Jacob abe.jac...@gmail.com wrote:

Sure... I do set the group.id for all the consumers to be the 
same. Here is the code ---

SparkConf sparkConf = new 
SparkConf().setMaster(yarn-cluster).setAppName(Streaming WordCount);
sparkConf.set(spark.shuffle.manager, SORT);
sparkConf.set(spark.streaming.unpersist, true);
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new 
Duration(1000));
MapString, String kafkaConf = new HashMapString, String();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id", consumerGrp);
kafkaConf.put("auto.offset.reset", "smallest");
kafkaConf.put("zookeeper.conection.timeout.ms", "1000");
kafkaConf.put("rebalance.max.retries", "4");
kafkaConf.put("rebalance.backoff.ms", "3000");
MapString, Integer topicMap = new HashMapString, Integer();
topicMap.put(topic, 1);
ListJavaPairDStreambyte[], String kafkaStreams = new 
ArrayListJavaPairDStreambyte[], String();
for(int i = 0; i  numPartitions; i++) {
kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
DefaultDecoder.class, PayloadDeSerializer.class,
kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(new 
PairFunctionTuple2byte[],String, byte[], String() {

private static final long serialVersionUID = -1936810126415608167L;

public Tuple2byte[], String call(Tuple2byte[], String tuple2) throws 
Exception {
return tuple2;
}
}
)
);
}


JavaPairDStreambyte[], String unifiedStream;
if (kafkaStreams.size()  1) {
unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, 
kafkaStreams.size()));
} else {
unifiedStream = kafkaStreams.get(0);
}
unifiedStream.print();
jssc.start();
jssc.awaitTermination();


-abe


On Fri, Oct 10, 2014 at 3:37 PM, Sean McNamara sean.mcnam...@webtrends.com wrote:
Would you mind sharing the code leading to your createStream?  Are you also 
setting group.id?

Thanks,

Sean


On Oct 10, 2014, at 4:31 PM, Abraham Jacob 
abe.jac...@gmail.com wrote:

 Hi Folks,

 I am seeing some strange behavior when using the Spark Kafka connector in 
 Spark streaming.

 I have a Kafka topic which has 8 partitions. I have a kafka producer that 
 pumps some messages into this topic.

 On the consumer side I have a spark streaming application that has 8 
 executors on 8 worker nodes and 8 ReceiverInputDStreams with the same kafka 
 group id connected to the 8 partitions I have for the topic. Also the kafka 
 consumer property auto.offset.reset is set to smallest.


 Now here is the sequence of steps -

 (1) I start the spark streaming app.
 (2) Start the producer.

 At this point I see the messages that are being pumped from the producer in 
 Spark Streaming.  Then I -

 (1) Stopped the producer
 (2) Waited for all the messages to be consumed.
 (3) Stopped the spark streaming app.

 Now when I restart the spark streaming app (note - the producer is still down 
 and no messages are being pumped into the topic) - I observe the following -

 (1) Spark Streaming starts reading from each partition right from the 
 beginning.


 This is not what I was expecting. I was expecting the consumers started by 
 spark streaming to resume from where they left off.

 Is my assumption incorrect that the consumers (the kafka/spark connector) 
 should start reading from the topic where they last left off...?

 Has anyone else seen this behavior? Is there a way to make it such that it 
 starts from where it left off?

 Regards,
 - Abraham




--
~



Running Example in local mode with input files

2014-10-10 Thread maxpar
Hi folks, 

I have just upgraded to Spark 1.1.0, and tried some examples like: 
./run-example SparkPageRank pagerank_data.txt 5 

It turns out that Spark keeps trying to connect to my name node and read the
file from HDFS rather than the local FS: 
Client: Retrying connect to server: Node1/192.168.0.101:9000. Already
tried 0 time(s) 

Even if I use file:// in my data file path, the issue still occurs. This
does not happen in spark-shell. 
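
For reference, this is roughly how I am pointing at the local file (a sketch with a placeholder path, using an existing SparkContext sc):

// Hypothetical sketch: fully qualified local-filesystem URI instead of relying
// on the configured fs.defaultFS (the path below is a placeholder).
val lines = sc.textFile("file:///path/to/pagerank_data.txt")
println(lines.count())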

Is there anything I am missing in configurations or the way I specified the
path? 

Thanks, 

Max



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-Example-in-local-mode-with-input-files-tp16186.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



how to find the sources for spark-project

2014-10-10 Thread sadhan
We have our own customization on top of the Parquet SerDe that we've been using
for Hive. In order to make it work with Spark SQL, we need to be able to
rebuild Spark with this. It'll be much easier to rebuild Spark with this
patch once I can find the sources for org.spark-project.hive. Not sure where
to find it? This seems like the place where we need to put our patch.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-find-the-sources-for-spark-project-tp16187.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: where are my python lambda functions run in yarn-client mode?

2014-10-10 Thread Evan
Thank you!  I was looking for a config variable to that end, but I was 
looking in Spark 1.0.2 documentation, since that was the version I had 
the problem with.  Is this behavior documented in 1.0.2's documentation?


Evan


On 10/09/2014 04:12 PM, Davies Liu wrote:

When you call rdd.take() or rdd.first(), it may [1] execute the job
locally (in the driver);
otherwise, all the jobs are executed in the cluster.

There is a config called `spark.localExecution.enabled` (since 1.1+) to
change this.
It's not enabled by default, so all the functions will be executed in the cluster.
If you set this to `true`, then you get the same behavior as 1.0.

[1] If it did not get enough items from the first partitions, it will
try multiple partitions
at a time, so they will be executed in the cluster.
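
A minimal sketch of flipping that switch on 1.1+ (the property name comes from above; the app name is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: re-enable 1.0-style local execution of small take()/first() jobs.
val conf = new SparkConf()
  .setAppName("local-execution-demo")            // placeholder name
  .set("spark.localExecution.enabled", "true")   // default is false in 1.1+
val sc = new SparkContext(conf)

// With the flag off (the default), this lambda runs on an executor in the
// cluster; with it on, a small take() like this may run inside the driver JVM.
println(sc.parallelize(1 to 10).map(_ * 2).take(1).head)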

On Thu, Oct 9, 2014 at 12:14 PM, esamanas evan.sama...@gmail.com wrote:

Hi,

I am using pyspark and I'm trying to support both Spark 1.0.2 and 1.1.0 with
my app, which will run in yarn-client mode.  However, it appears when I use
'map' to run a python lambda function over an RDD, they appear to be run on
different machines, and this is causing problems.

In both cases, I am using a Hadoop cluster that runs linux on all of its
nodes.  I am submitting my jobs with a machine running Mac OS X 10.9.  As a
reproducer, here is my script:

import platform
print sc.parallelize([1]).map(lambda x: platform.system()).take(1)[0]

The answer in Spark 1.1.0:
'Linux'

The answer in Spark 1.0.2:
'Darwin'

In other experiments I changed the size of the list that gets parallelized,
thinking maybe 1.0.2 just runs jobs on the driver node if they're small
enough.  I got the same answer (with only 1 million numbers).

This is a troubling difference.  I would expect all functions run on an RDD
to be executed on my worker nodes in the Hadoop cluster, but this is clearly
not the case for 1.0.2.  Why does this difference exist?  How can I
accurately detect which jobs will run where?

Thank you,

Evan




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/where-are-my-python-lambda-functions-run-in-yarn-client-mode-tp16059.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



small bug in pyspark

2014-10-10 Thread Andy Davidson
Hi

I am running spark on an ec2 cluster. I need to update python to 2.7. I have
been following the directions on
http://nbviewer.ipython.org/gist/JoshRosen/6856670
https://issues.apache.org/jira/browse/SPARK-922



I noticed that when I start a shell using pyspark, I correctly got
python2.7; however, when I tried to start a notebook I got python2.6.





change

exec ipython $IPYTHON_OPTS

to

exec ipython2 $IPYTHON_OPTS



One clean way to resolve this would be to add another environment variable
like PYSPARK_PYTHON



Andy





P.S. Matplotlib does not upgrade because of dependency problems. I'll let
you know once I get this resolved.








RE: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Shao, Saisai
Hi Abraham,

You are correct, the “auto.offset.reset“ behavior in KafkaReceiver is different 
from original Kafka’s semantics. If you set this configuration, KafkaReceiver will 
clean the related ZooKeeper metadata immediately, but for Kafka this configuration is just a hint 
which takes effect only when the offset is out of range. So you will always 
read data from the beginning if you set it to “smallest”, and you will always 
get data from the end immediately if you set it to “largest”.

There’s a JIRA and PR to follow this, but still not merged to the master, you 
can check to see it (https://issues.apache.org/jira/browse/SPARK-2492).

Thanks
Jerry

From: Abraham Jacob [mailto:abe.jac...@gmail.com]
Sent: Saturday, October 11, 2014 6:57 AM
To: Sean McNamara
Cc: user@spark.apache.org
Subject: Re: Spark Streaming KafkaUtils Issue

Probably this is the issue -

http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/


·Spark’s usage of the Kafka consumer parameter auto.offset.reset 
(http://kafka.apache.org/documentation.html#consumerconfigs) 
is different from Kafka’s semantics. In Kafka, the behavior of setting 
auto.offset.reset to “smallest” is that the consumer will automatically reset 
the offset to the smallest offset when a) there is no existing offset stored in 
ZooKeeper or b) there is an existing offset but it is out of range. Spark 
however will always remove existing offsets and then start all the way from 
zero again. This means whenever you restart your application with 
auto.offset.reset = smallest, your application will completely re-process all 
available Kafka data. Doh! See this discussion 
(http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-and-the-spark-shell-tp3347p3387.html) and 
that discussion (http://markmail.org/message/257a5l3oqyftsjxj).

Hmm interesting... Wondering what happens if I set it as largest...?


On Fri, Oct 10, 2014 at 3:47 PM, Abraham Jacob 
abe.jac...@gmail.com wrote:
Sure... I do set the group.id for all the consumers to be the 
same. Here is the code ---

SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id", consumerGrp);
kafkaConf.put("auto.offset.reset", "smallest");
kafkaConf.put("zookeeper.conection.timeout.ms", "1000");
kafkaConf.put("rebalance.max.retries", "4");
kafkaConf.put("rebalance.backoff.ms", "3000");
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);
List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
        DefaultDecoder.class, PayloadDeSerializer.class,
        kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], String>() {

            private static final long serialVersionUID = -1936810126415608167L;

            public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
                return tuple2;
            }
        }
    ));
}

JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
    unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    unifiedStream = kafkaStreams.get(0);
}
unifiedStream.print();
jssc.start();
jssc.awaitTermination();


-abe


On Fri, Oct 10, 2014 at 3:37 PM, Sean McNamara 
sean.mcnam...@webtrends.com wrote:
Would you mind sharing the code leading to your createStream?  Are you also 
setting group.id?

Thanks,

Sean


On Oct 10, 2014, at 4:31 PM, Abraham Jacob 
abe.jac...@gmail.com wrote:

 Hi Folks,

 I am seeing some strange behavior when using the Spark Kafka connector in 
 Spark streaming.

 I have a Kafka topic which has 8 partitions. I have a kafka producer that 
 pumps some messages into this topic.

 On the consumer side I have a spark streaming application that has 8 
 executors on 8 worker nodes and 8 ReceiverInputDStreams with the same kafka 
 group id connected to the 8 partitions I have for the topic. Also the kafka 
 consumer property auto.offset.reset is set to smallest.


 Now here is the sequence of steps -

 (1) I start the spark streaming app.
 (2) Start the producer.

 At this point I see the messages that are being pumped from the producer in 
 Spark Streaming.  Then I -

 (1) Stopped the producer
 (2) Waited for all the messages to be consumed.
 (3) Stopped the spark streaming app.

 Now when I restart the spark streaming app (note - the 

Re: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Abraham Jacob
Thanks Jerry, So, from what I can understand from the code, if I leave out
auto.offset.reset, it should theoretically read from the last commit
point... Correct?

-abe

On Fri, Oct 10, 2014 at 5:40 PM, Shao, Saisai saisai.s...@intel.com wrote:

  Hi Abraham,



 You are correct, the “auto.offset.reset“ behavior in KafkaReceiver is
 different from original Kafka’s semantics, if you set this configure,
 KafkaReceiver will clean the related immediately, but for Kafka this
 configuration is just a hint which will be effective only when offset is
 out-of-range. So you will always read data from the beginning as you set to
 “smallest”, otherwise if you set to “largest”, you will always get data
 from the end immediately.



 There’s a JIRA and PR to follow this, but still not merged to the master,
 you can check to see it (https://issues.apache.org/jira/browse/SPARK-2492
 ).



 Thanks

 Jerry



 *From:* Abraham Jacob [mailto:abe.jac...@gmail.com]
 *Sent:* Saturday, October 11, 2014 6:57 AM
 *To:* Sean McNamara
 *Cc:* user@spark.apache.org
 *Subject:* Re: Spark Streaming KafkaUtils Issue



 Probably this is the issue -




 http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/





 ·Spark’s usage of the Kafka consumer parameter auto.offset.reset
 http://kafka.apache.org/documentation.html#consumerconfigs is different
 from Kafka’s semantics. In Kafka, the behavior of setting
 auto.offset.reset to “smallest” is that the consumer will automatically
 reset the offset to the smallest offset when a) there is no existing offset
 stored in ZooKeeper or b) there is an existing offset but it is out of
 range. Spark however will *always* remove existing offsets and then start
 all the way from zero again. This means whenever you restart your
 application with auto.offset.reset = smallest, your application will
 completely re-process all available Kafka data. Doh! See this discussion
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-and-the-spark-shell-tp3347p3387.html
 and that discussion http://markmail.org/message/257a5l3oqyftsjxj.



 Hmm interesting... Wondering what happens if I set it as largest...?





 On Fri, Oct 10, 2014 at 3:47 PM, Abraham Jacob abe.jac...@gmail.com
 wrote:

  Sure... I do set the group.id for all the consumers to be the same. Here
 is the code ---



 SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
 sparkConf.set("spark.shuffle.manager", "SORT");
 sparkConf.set("spark.streaming.unpersist", "true");
 JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
 Map<String, String> kafkaConf = new HashMap<String, String>();
 kafkaConf.put("zookeeper.connect", zookeeper);
 kafkaConf.put("group.id", consumerGrp);
 kafkaConf.put("auto.offset.reset", "smallest");
 kafkaConf.put("zookeeper.conection.timeout.ms", "1000");
 kafkaConf.put("rebalance.max.retries", "4");
 kafkaConf.put("rebalance.backoff.ms", "3000");
 Map<String, Integer> topicMap = new HashMap<String, Integer>();
 topicMap.put(topic, 1);
 List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
 for (int i = 0; i < numPartitions; i++) {
     kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
         DefaultDecoder.class, PayloadDeSerializer.class,
         kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], String>() {

             private static final long serialVersionUID = -1936810126415608167L;

             public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
                 return tuple2;
             }
         }
     ));
 }

 JavaPairDStream<byte[], String> unifiedStream;
 if (kafkaStreams.size() > 1) {
     unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
 } else {
     unifiedStream = kafkaStreams.get(0);
 }
 unifiedStream.print();
 jssc.start();
 jssc.awaitTermination();





 -abe





 On Fri, Oct 10, 2014 at 3:37 PM, Sean McNamara 
 sean.mcnam...@webtrends.com wrote:

 Would you mind sharing the code leading to your createStream?  Are you
 also setting group.id?

 Thanks,

 Sean



 On Oct 10, 2014, at 4:31 PM, Abraham Jacob abe.jac...@gmail.com wrote:

  Hi Folks,
 
  I am seeing some strange behavior when using the Spark Kafka connector
 in Spark streaming.
 
  I have a Kafka topic which has 8 partitions. I have a kafka producer
 that pumps some messages into this topic.
 
  On the consumer side I have a spark streaming application that has
 8 executors on 8 worker nodes and 8 ReceiverInputDStreams with the same
 kafka group id connected to the 8 partitions I have for the topic. Also the
 kafka consumer property auto.offset.reset is set to smallest.
 
 
  Now here is the sequence of steps -
 
  (1) I start the spark streaming app.
  (2) Start the producer.
 
  At this point I see the messages that are being pumped from the 

Maryland Meetup

2014-10-10 Thread Brian Husted
Please add the Maryland Spark meetup to the Spark website

http://www.meetup.com/Apache-Spark-Maryland/

Thanks
Brian Husted


Re: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Sean McNamara
This jira and comment sums up the issue:
https://issues.apache.org/jira/browse/SPARK-2492?focusedCommentId=14069708page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14069708

Basically the offset param was renamed and had slightly different semantics 
between kafka 0.7 and 0.8.  Also it was useful because earlier versions of the 
spark streaming receiver could be overwhelmed after a streaming job had been down 
for a period of time.

I think this PR quite nicely addresses the issue:
https://github.com/apache/spark/pull/1420


Best,

Sean


On Oct 10, 2014, at 6:48 PM, Abraham Jacob abe.jac...@gmail.com wrote:

Thanks Jerry, So, from what I can understand from the code, if I leave out 
auto.offset.reset, it should theoretically read from the last commit point... 
Correct?

-abe

On Fri, Oct 10, 2014 at 5:40 PM, Shao, Saisai 
saisai.s...@intel.com wrote:
Hi Abraham,

You are correct, the “auto.offset.reset“ behavior in KafkaReceiver is different 
from original Kafka’s semantics, if you set this configure, KafkaReceiver will 
clean the related immediately, but for Kafka this configuration is just a hint 
which will be effective only when offset is out-of-range. So you will always 
read data from the beginning as you set to “smallest”, otherwise if you set to 
“largest”, you will always get data from the end immediately.

There’s a JIRA and PR to follow this, but still not merged to the master, you 
can check to see it (https://issues.apache.org/jira/browse/SPARK-2492).

Thanks
Jerry

From: Abraham Jacob [mailto:abe.jac...@gmail.com]
Sent: Saturday, October 11, 2014 6:57 AM
To: Sean McNamara
Cc: user@spark.apache.org
Subject: Re: Spark Streaming KafkaUtils Issue

Probably this is the issue -

http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/


•Spark’s usage of the Kafka consumer parameter auto.offset.reset 
(http://kafka.apache.org/documentation.html#consumerconfigs) 
is different from Kafka’s semantics. In Kafka, the behavior of setting 
auto.offset.reset to “smallest” is that the consumer will automatically reset 
the offset to the smallest offset when a) there is no existing offset stored in 
ZooKeeper or b) there is an existing offset but it is out of range. Spark 
however will always remove existing offsets and then start all the way from 
zero again. This means whenever you restart your application with 
auto.offset.reset = smallest, your application will completely re-process all 
available Kafka data. Doh! See this discussion 
(http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-and-the-spark-shell-tp3347p3387.html) and 
that discussion (http://markmail.org/message/257a5l3oqyftsjxj).

Hmm interesting... Wondering what happens if I set it as largest...?


On Fri, Oct 10, 2014 at 3:47 PM, Abraham Jacob 
abe.jac...@gmail.com wrote:
Sure... I do set the group.id for all the consumers to be the 
same. Here is the code ---

SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id", consumerGrp);
kafkaConf.put("auto.offset.reset", "smallest");
kafkaConf.put("zookeeper.conection.timeout.ms", "1000");
kafkaConf.put("rebalance.max.retries", "4");
kafkaConf.put("rebalance.backoff.ms", "3000");
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);
List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
        DefaultDecoder.class, PayloadDeSerializer.class,
        kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], String>() {

            private static final long serialVersionUID = -1936810126415608167L;

            public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
                return tuple2;
            }
        }
    ));
}

JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
    unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    unifiedStream = kafkaStreams.get(0);
}
unifiedStream.print();
jssc.start();
jssc.awaitTermination();


-abe


On Fri, Oct 10, 2014 at 3:37 PM, Sean McNamara 
sean.mcnam...@webtrends.com wrote:
Would you mind sharing the code leading to your createStream?  Are you also 
setting group.id?

Thanks,

Sean


On Oct 10, 2014, 

RE: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Shao, Saisai
Hi abe,

You can see the details in KafkaInputDStream.scala, here is the snippet

// When auto.offset.reset is defined, it is our responsibility to try and whack the
// consumer group zk node.
if (kafkaParams.contains("auto.offset.reset")) {
  tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
}

KafkaReceiver will check your kafkaParams; if “auto.offset.reset” is set, it 
will clean the ZK metadata immediately, so you will always read data from the beginning 
(if set to “smallest”) or from the end (if set to “largest”), because the ZK 
metadata is deleted beforehand.

If you do not set this parameter, this code path will not be triggered, so data 
will be read from the last commit point. And if the last commit point is not yet 
available, Kafka will move the offset to the end of the partition (Kafka sets 
“auto.offset.reset” to “largest” by default).

If you want to keep the same semantics as Kafka, you need to remove the above 
code path manually and recompile Spark.
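
In other words, to resume from the last committed offsets with the current receiver it should be enough to leave that key out of your params. A minimal sketch (the host and group values are placeholders):

// Sketch: no "auto.offset.reset" entry, so the ZK-cleanup branch above is never
// taken and the offsets already committed in ZooKeeper are kept.
val kafkaParams = Map(
  "zookeeper.connect" -> "zkhost:2181",   // placeholder
  "group.id"          -> "consumerGrp"    // placeholder
  // intentionally no "auto.offset.reset" here
)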

Thanks
Jerry

From: Abraham Jacob [mailto:abe.jac...@gmail.com]
Sent: Saturday, October 11, 2014 8:49 AM
To: Shao, Saisai
Cc: user@spark.apache.org; Sean McNamara
Subject: Re: Spark Streaming KafkaUtils Issue

Thanks Jerry, So, from what I can understand from the code, if I leave out 
auto.offset.reset, it should theoretically read from the last commit point... 
Correct?

-abe

On Fri, Oct 10, 2014 at 5:40 PM, Shao, Saisai 
saisai.s...@intel.com wrote:
Hi Abraham,

You are correct, the “auto.offset.reset“ behavior in KafkaReceiver is different 
from original Kafka’s semantics, if you set this configure, KafkaReceiver will 
clean the related immediately, but for Kafka this configuration is just a hint 
which will be effective only when offset is out-of-range. So you will always 
read data from the beginning as you set to “smallest”, otherwise if you set to 
“largest”, you will always get data from the end immediately.

There’s a JIRA and PR to follow this, but still not merged to the master, you 
can check to see it (https://issues.apache.org/jira/browse/SPARK-2492).

Thanks
Jerry

From: Abraham Jacob [mailto:abe.jac...@gmail.com]
Sent: Saturday, October 11, 2014 6:57 AM
To: Sean McNamara
Cc: user@spark.apache.org
Subject: Re: Spark Streaming KafkaUtils Issue

Probably this is the issue -

http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/


•Spark’s usage of the Kafka consumer parameter auto.offset.reset 
(http://kafka.apache.org/documentation.html#consumerconfigs) 
is different from Kafka’s semantics. In Kafka, the behavior of setting 
auto.offset.reset to “smallest” is that the consumer will automatically reset 
the offset to the smallest offset when a) there is no existing offset stored in 
ZooKeeper or b) there is an existing offset but it is out of range. Spark 
however will always remove existing offsets and then start all the way from 
zero again. This means whenever you restart your application with 
auto.offset.reset = smallest, your application will completely re-process all 
available Kafka data. Doh! See this discussion 
(http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-and-the-spark-shell-tp3347p3387.html) and 
that discussion (http://markmail.org/message/257a5l3oqyftsjxj).

Hmm interesting... Wondering what happens if I set it as largest...?


On Fri, Oct 10, 2014 at 3:47 PM, Abraham Jacob 
abe.jac...@gmail.com wrote:
Sure... I do set the group.id for all the consumers to be the 
same. Here is the code ---

SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id", consumerGrp);
kafkaConf.put("auto.offset.reset", "smallest");
kafkaConf.put("zookeeper.conection.timeout.ms", "1000");
kafkaConf.put("rebalance.max.retries", "4");
kafkaConf.put("rebalance.backoff.ms", "3000");
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);
List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
        DefaultDecoder.class, PayloadDeSerializer.class,
        kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], String>() {

            private static final long serialVersionUID = -1936810126415608167L;

            public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
                return tuple2;
            }
        }
    ));
}

Re: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Abraham Jacob
Ah I see... much clearer now...

Because auto.offset.reset will trigger KafkaReceiver to delete the ZK
metadata; when control passes over to the Kafka consumer API, it will see
that there is no offset available for the partition. This will then trigger
the smallest or largest logic to execute in Kafka, depending on what we
set for auto.offset.reset...

Thanks for explaining this clearly! Appreciate your effort.



On Fri, Oct 10, 2014 at 6:08 PM, Shao, Saisai saisai.s...@intel.com wrote:

  Hi abe,



 You can see the details in KafkaInputDStream.scala, here is the snippet



  // When auto.offset.reset is defined, it is our responsibility to try and whack the
  // consumer group zk node.
  if (kafkaParams.contains("auto.offset.reset")) {
    tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
  }



 KafkaReceiver will check your kafkaParams, if “auto.offset.reset” is set,
 it will clean ZK metadata immediately, so you will always read data from
 beginning (set to “smallest”) and end (set to “largest”) immediately,
 because the ZK metadata is deleted beforehand.



 If you do not set this parameter, this code path will not be triggered, so
 data will be read from the last commit point. And if last commit point is
 not yet available, Kafka will move the offset to the end of partition
 (Kafka is set “auto.commit.offset” to “largest” by default).



 If you want to keep the same semantics as Kafka, you need to remove the
 above code path manually and recompile the Spark.



 Thanks

 Jerry



 *From:* Abraham Jacob [mailto:abe.jac...@gmail.com]
 *Sent:* Saturday, October 11, 2014 8:49 AM
 *To:* Shao, Saisai
 *Cc:* user@spark.apache.org; Sean McNamara

 *Subject:* Re: Spark Streaming KafkaUtils Issue



 Thanks Jerry, So, from what I can understand from the code, if I leave out
 auto.offset.reset, it should theoretically read from the last commit
 point... Correct?



 -abe



 On Fri, Oct 10, 2014 at 5:40 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  Hi Abraham,



 You are correct, the “auto.offset.reset“ behavior in KafkaReceiver is
 different from original Kafka’s semantics, if you set this configure,
 KafkaReceiver will clean the related immediately, but for Kafka this
 configuration is just a hint which will be effective only when offset is
 out-of-range. So you will always read data from the beginning as you set to
 “smallest”, otherwise if you set to “largest”, you will always get data
 from the end immediately.



 There’s a JIRA and PR to follow this, but still not merged to the master,
 you can check to see it (https://issues.apache.org/jira/browse/SPARK-2492
 ).



 Thanks

 Jerry



 *From:* Abraham Jacob [mailto:abe.jac...@gmail.com]
 *Sent:* Saturday, October 11, 2014 6:57 AM
 *To:* Sean McNamara
 *Cc:* user@spark.apache.org
 *Subject:* Re: Spark Streaming KafkaUtils Issue



 Probably this is the issue -




 http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/





 ·Spark’s usage of the Kafka consumer parameter auto.offset.reset
 http://kafka.apache.org/documentation.html#consumerconfigs is different
 from Kafka’s semantics. In Kafka, the behavior of setting
 auto.offset.reset to “smallest” is that the consumer will automatically
 reset the offset to the smallest offset when a) there is no existing offset
 stored in ZooKeeper or b) there is an existing offset but it is out of
 range. Spark however will *always* remove existing offsets and then start
 all the way from zero again. This means whenever you restart your
 application with auto.offset.reset = smallest, your application will
 completely re-process all available Kafka data. Doh! See this discussion
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-and-the-spark-shell-tp3347p3387.html
 and that discussion http://markmail.org/message/257a5l3oqyftsjxj.



 Hmm interesting... Wondering what happens if I set it as largest...?





 On Fri, Oct 10, 2014 at 3:47 PM, Abraham Jacob abe.jac...@gmail.com
 wrote:

  Sure... I do set the group.id for all the consumers to be the same. Here
 is the code ---



 SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
 sparkConf.set("spark.shuffle.manager", "SORT");
 sparkConf.set("spark.streaming.unpersist", "true");
 JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
 Map<String, String> kafkaConf = new HashMap<String, String>();
 kafkaConf.put("zookeeper.connect", zookeeper);
 kafkaConf.put("group.id", consumerGrp);
 kafkaConf.put("auto.offset.reset", "smallest");
 kafkaConf.put("zookeeper.conection.timeout.ms", "1000");
 kafkaConf.put("rebalance.max.retries", "4");
 kafkaConf.put("rebalance.backoff.ms", "3000");
 Map<String, Integer> topicMap = new HashMap<String, Integer>();
 topicMap.put(topic, 1);
 List<JavaPairDStream<byte[], String>> kafkaStreams = new
 ArrayList<JavaPairDStream<byte[], String>>();
 for (int i = 0; i < 

What if I port Spark from TCP/IP to RDMA?

2014-10-10 Thread Theodore Si

Hi,

Let's say that I managed to port Spark from TCP/IP to RDMA.
What tool or benchmark can I use to test the performance improvement?

BR,
Theo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Window comparison matching using the sliding window functionality: feasibility

2014-10-10 Thread nitinkak001
Thanks @category_theory, the post was of great help!!

I had to learn a few things before I could understand it completely.
However, I am facing the issue of partitioning the data (using partitionBy)
without providing a hardcoded value for the number of partitions. The
partitions need to be driven by the data (the segmentation key I am using) in my
case.

So my question is, say

the number of partitions generated by my segmentation key = 1000
the number given to the partitioner = 2000

In this case, would there be 2000 partitions created (which would break the
partition boundary of the segmentation key)? If so, then the sliding window will
roll over multiple partitions and the computation would generate wrong results.
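
For reference, this is roughly what I am trying, sketched with hypothetical names: a partitioner whose partition count is driven by the distinct segmentation keys rather than a hard-coded number.

import org.apache.spark.Partitioner

// Hedged sketch (hypothetical class and variable names): one partition per
// distinct segmentation key; throws if it sees a key it was not built with.
class SegmentPartitioner(keys: Array[String]) extends Partitioner {
  private val index = keys.zipWithIndex.toMap
  override def numPartitions: Int = index.size
  override def getPartition(key: Any): Int = index(key.asInstanceOf[String])
}

// keyedRdd: RDD[(String, Row)] keyed by the segmentation key (assumption)
// val segmentKeys = keyedRdd.keys.distinct().collect()
// val partitioned = keyedRdd.partitionBy(new SegmentPartitioner(segmentKeys))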

Thanks again for the response!!

On Tue, Sep 30, 2014 at 11:51 AM, category_theory [via Apache Spark User
List] ml-node+s1001560n1540...@n3.nabble.com wrote:

 Not sure if this is what you are after but its based on a moving average
 within spark...  I was building an ARIMA model on top of spark and this
 helped me out a lot:

 http://stackoverflow.com/questions/23402303/apache-spark-moving-average
 ᐧ




 *JIMMY MCERLAIN*

 DATA SCIENTIST (NERD)

 *. . . . . . . . . . . . . . . . . .*


 *IF WE CAN’T DOUBLE YOUR SALES,*



 *ONE OF US IS IN THE WRONG BUSINESS.*

 *E*: [hidden email]

 *M*: 510.303.7751

 On Tue, Sep 30, 2014 at 8:19 AM, nitinkak001 [hidden email] wrote:

 Any ideas guys?

 Trying to find some information online. Not much luck so far.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Window-comparison-matching-using-the-sliding-window-functionality-feasibility-tp15352p15404.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
  To unsubscribe, e-mail: [hidden email]
  For additional commands, e-mail: [hidden email]









--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Window-comparison-matching-using-the-sliding-window-functionality-feasibility-tp15352p16201.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: add Boulder-Denver Spark meetup to list on website

2014-10-10 Thread Matei Zaharia
Added you, thanks! (You may have to shift-refresh the page to see it updated).

Matei

On Oct 10, 2014, at 1:52 PM, Michael Oczkowski michael.oczkow...@seeq.com 
wrote:

 Please add the Boulder-Denver Spark meetup group to the list on the website.
 http://www.meetup.com/Boulder-Denver-Spark-Meetup/
  
  
 Michael Oczkowski, Ph.D.
  Big Data Architect & Chief Data Scientist 
 Seeq Corporation
 206-801-9339 x704
 Transforming industrial process data into actionable intelligence
 Connect: LinkedIn | Facebook | Twitter



Re: IOException and appcache FileNotFoundException in Spark 1.02

2014-10-10 Thread Ilya Ganelin
Hi Akhil - I tried your suggestions and also varied my partition sizes.
Reducing the number of partitions led to memory errors (presumably - I saw
IOExceptions much sooner).

With the settings you provided, the program ran for longer but ultimately
crashed in the same way. I would like to understand what is going on
internally leading to this.

Could this be related to garbage collection?
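
(For reference, a minimal sketch of how I am applying the two settings from the reply below, assuming they go on the SparkConf before the context is created:)

import org.apache.spark.SparkConf

// Sketch: the two workaround properties suggested below, set on the SparkConf.
val conf = new SparkConf()
  .set("spark.core.connection.ack.wait.timeout", "600")
  .set("spark.akka.frameSize", "50")
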
On Oct 10, 2014 3:19 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

 You could be hitting this issue
 https://issues.apache.org/jira/browse/SPARK-3633 (or similar). You can
 try the following workarounds:

 sc.set("spark.core.connection.ack.wait.timeout", "600")
 sc.set("spark.akka.frameSize", "50")
 Also reduce the number of partitions, you could be hitting the kernel's
 ulimit. I faced this issue and it was gone when i dropped the partitions
 from 1600 to 200.

 Thanks
 Best Regards

 On Fri, Oct 10, 2014 at 5:58 AM, Ilya Ganelin ilgan...@gmail.com wrote:

 Hi all – I could use some help figuring out a couple of exceptions I’ve
 been getting regularly.

 I have been running on a fairly large dataset (150 gigs). With smaller
 datasets I don't have any issues.

 My sequence of operations is as follows – unless otherwise specified, I
 am not caching:

 Map a 30 million row x 70 col string table to approx 30 mil x  5 string
 (For read as textFile I am using 1500 partitions)

 From that, map to ((a,b), score) and reduceByKey, numPartitions = 180

 Then, extract distinct values for A and distinct values for B. (I cache
 the output of distinct), , numPartitions = 180

 Zip with index for A and for B (to remap strings to int)

 Join remapped ids with original table

 This is then fed into MLLIBs ALS algorithm.

 I am running with:

 Spark version 1.02 with CDH5.1

 numExecutors = 8, numCores = 14

 Memory = 12g

 MemoryFration = 0.7

 KryoSerialization

 My issue is that the code runs fine for a while but then will
 non-deterministically crash with either file IOExceptions or the following
 obscure error:

 14/10/08 13:29:59 INFO TaskSetManager: Loss was due to
 java.io.IOException: Filesystem closed [duplicate 10]

 14/10/08 13:30:08 WARN TaskSetManager: Loss was due to
 java.io.FileNotFoundException

 java.io.FileNotFoundException:
 /opt/cloudera/hadoop/1/yarn/nm/usercache/zjb238/appcache/application_1412717093951_0024/spark-local-20141008131827-c082/1c/shuffle_3_117_354
 (No such file or directory)

 Looking through the logs, I see the IOException in other places but it
 appears to be non-catastrophic. The FileNotFoundException, however, is. I
 have found the following stack overflow that at least seems to address the
 IOException:


 http://stackoverflow.com/questions/24038908/spark-fails-on-big-shuffle-jobs-with-java-io-ioexception-filesystem-closed

 But I have not found anything useful at all with regards to the app cache
 error.

 Any help would be much appreciated.





Re: Breaking the previous large-scale sort record with Spark

2014-10-10 Thread Ilya Ganelin
Hi Matei - I read your post with great interest. Could you possibly comment
in more depth on some of the issues you guys saw when scaling up spark and
how you resolved them? I am interested specifically in spark-related
problems. I'm working on scaling up spark to very large datasets and have
been running into a variety of issues. Thanks in advance!
On Oct 10, 2014 10:54 AM, Matei Zaharia matei.zaha...@gmail.com wrote:

 Hi folks,

 I interrupt your regularly scheduled user / dev list to bring you some
 pretty cool news for the project, which is that we've been able to use
 Spark to break MapReduce's 100 TB and 1 PB sort records, sorting data 3x
 faster on 10x fewer nodes. There's a detailed writeup at
 http://databricks.com/blog/2014/10/10/spark-breaks-previous-large-scale-sort-record.html.
 Summary: while Hadoop MapReduce held last year's 100 TB world record by
 sorting 100 TB in 72 minutes on 2100 nodes, we sorted it in 23 minutes on
 206 nodes; and we also scaled up to sort 1 PB in 234 minutes.

 I want to thank Reynold Xin for leading this effort over the past few
 weeks, along with Parviz Deyhim, Xiangrui Meng, Aaron Davidson and Ali
 Ghodsi. In addition, we'd really like to thank Amazon's EC2 team for
 providing the machines to make this possible. Finally, this result would of
 course not be possible without the many many other contributions, testing
 and feature requests from throughout the community.

 For an engine to scale from these multi-hour petabyte batch jobs down to
 100-millisecond streaming and interactive queries is quite uncommon, and
 it's thanks to all of you folks that we are able to make this happen.

 Matei
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Debug Spark in Cluster Mode

2014-10-10 Thread Ilya Ganelin
I would also be interested in knowing more about this. I have used the
cloudera manager and the spark resource interface (clientnode:4040) but
would love to know if there are other tools out there - either for post
processing or better observation during execution.
On Oct 9, 2014 4:50 PM, Rohit Pujari rpuj...@hortonworks.com wrote:

 Hello Folks:

 What're some best practices to debug Spark in cluster mode?


 Thanks,
 Rohit

 CONFIDENTIALITY NOTICE
 NOTICE: This message is intended for the use of the individual or entity
 to which it is addressed and may contain information that is confidential,
 privileged and exempt from disclosure under applicable law. If the reader
 of this message is not the intended recipient, you are hereby notified that
 any printing, copying, dissemination, distribution, disclosure or
 forwarding of this communication is strictly prohibited. If you have
 received this communication in error, please contact the sender immediately
 and delete it from your system. Thank You.


Re: Spark SQL parser bug?

2014-10-10 Thread Cheng Lian
Hmm, there is a “T” in the timestamp string, which makes it an invalid 
timestamp string representation. Internally Spark SQL uses 
java.sql.Timestamp.valueOf to cast a string to a timestamp.
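
A quick way to see the difference (a sketch, run in the shell):

// Illustration of the valueOf behavior described above:
java.sql.Timestamp.valueOf("2012-01-01 00:00:00")    // parses: "yyyy-mm-dd hh:mm:ss"
// java.sql.Timestamp.valueOf("2012-01-01T00:00:00") // throws IllegalArgumentException

// So, as a sketch, writing the literal with a space instead of the "T" should
// let the predicate match against the table registered below:
// sqlContext.sql("select a from x where ts = '2012-01-01 00:00:00'").collect()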


On 10/11/14 2:08 AM, Mohammed Guller wrote:


scala> rdd.registerTempTable("x")

scala> val sRdd = sqlContext.sql("select a from x where ts = 
'2012-01-01*T*00:00:00';")


sRdd: org.apache.spark.sql.SchemaRDD =

SchemaRDD[4] at RDD at SchemaRDD.scala:103

== Query Plan ==

== Physical Plan ==

Project [a#0]

ExistingRdd [a#0,ts#1], MapPartitionsRDD[6] at mapPartitions at 
basicOperators.scala:208


scala> sRdd.collect

res2: Array[org.apache.spark.sql.Row] = Array()


​


RDD size in memory - Array[String] vs. case classes

2014-10-10 Thread Liam Clarke-Hutchinson
Hi all,

I'm playing with Spark currently as a possible solution at work, and I've
been recently working out a rough correlation between our input data size
and RAM needed to cache an RDD that will be used multiple times in a job.

As part of this I've been trialling different methods of representing the
data, and I came across a result that surprised me, so I just wanted to
check what I was seeing.

So my data set is comprised of CSV with appx. 17 fields. When I load my
sample data set locally, and cache it after splitting on the comma as an
RDD[Array[String]], the Spark UI shows 8% of the RDD can be cached in
available RAM.

When I cache it as an RDD of a case class, 11% of the RDD is cacheable, so
case classes are actually taking up less serialized space than an array.

Is it because case class represents numbers as numbers, as opposed to the
string array keeping them as strings?
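
For concreteness, roughly the two representations I am comparing (a sketch with hypothetical field names):

// Sketch: the Array[String] form keeps all 17 fields as String objects, while
// the case class can hold the numeric fields as primitives once parsed.
case class Record(id: Long, price: Double, country: String /* , ... more fields */)

// val asArrays  = sc.textFile("data.csv").map(_.split(",")).cache()
// val asRecords = sc.textFile("data.csv").map(_.split(","))
//                   .map(f => Record(f(0).toLong, f(1).toDouble, f(2)))
//                   .cache()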

Cheers,

Liam Clarke


Re: where are my python lambda functions run in yarn-client mode?

2014-10-10 Thread Davies Liu
This is a kind of implementation detail, so it's not documented :-(

If you think this is a blocker for you, you could create a JIRA; maybe
it could be fixed in 1.0.3+.

Davies

On Fri, Oct 10, 2014 at 5:11 PM, Evan evan.sama...@gmail.com wrote:
 Thank you!  I was looking for a config variable to that end, but I was
 looking in Spark 1.0.2 documentation, since that was the version I had the
 problem with.  Is this behavior documented in 1.0.2's documentation?

 Evan

 On 10/09/2014 04:12 PM, Davies Liu wrote:

 When you call rdd.take() or rdd.first(), it may [1] execute the job
 locally (in the driver);
 otherwise, all the jobs are executed in the cluster.

 There is a config called `spark.localExecution.enabled` (since 1.1+) to
 change this.
 It's not enabled by default, so all the functions will be executed in the
 cluster.
 If you set this to `true`, then you get the same behavior as 1.0.

 [1] If it did not get enough items from the first partitions, it will
 try multiple partitions
 at a time, so they will be executed in the cluster.

 On Thu, Oct 9, 2014 at 12:14 PM, esamanas evan.sama...@gmail.com wrote:

 Hi,

 I am using pyspark and I'm trying to support both Spark 1.0.2 and 1.1.0
 with
 my app, which will run in yarn-client mode.  However, it appears when I
 use
 'map' to run a python lambda function over an RDD, they appear to be run
 on
 different machines, and this is causing problems.

 In both cases, I am using a Hadoop cluster that runs linux on all of its
 nodes.  I am submitting my jobs with a machine running Mac OS X 10.9.  As
 a
 reproducer, here is my script:

 import platform
 print sc.parallelize([1]).map(lambda x: platform.system()).take(1)[0]

 The answer in Spark 1.1.0:
 'Linux'

 The answer in Spark 1.0.2:
 'Darwin'

 In other experiments I changed the size of the list that gets
 parallelized,
 thinking maybe 1.0.2 just runs jobs on the driver node if they're small
 enough.  I got the same answer (with only 1 million numbers).

 This is a troubling difference.  I would expect all functions run on an
 RDD
 to be executed on my worker nodes in the Hadoop cluster, but this is
 clearly
 not the case for 1.0.2.  Why does this difference exist?  How can I
 accurately detect which jobs will run where?

 Thank you,

 Evan




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/where-are-my-python-lambda-functions-run-in-yarn-client-mode-tp16059.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org