Re: MLUtils.kFold generates overlapping training and validation sets?
1. No.
2. The seed per partition is fixed, so it should generate non-overlapping subsets.
3. There was a bug in 1.0, which was fixed in 1.0.1 and 1.1.

Best,
Xiangrui

On Thu, Oct 9, 2014 at 11:05 AM, Nan Zhu zhunanmcg...@gmail.com wrote:

Hi, all

When we use MLUtils.kFold to generate training and validation sets for cross validation, we found that the two sets overlap. From the code, it samples the same dataset twice:

    @Experimental
    def kFold[T: ClassTag](rdd: RDD[T], numFolds: Int, seed: Int): Array[(RDD[T], RDD[T])] = {
      val numFoldsF = numFolds.toFloat
      (1 to numFolds).map { fold =>
        val sampler = new BernoulliSampler[T]((fold - 1) / numFoldsF, fold / numFoldsF, complement = false)
        val validation = new PartitionwiseSampledRDD(rdd, sampler, true, seed)
        val training = new PartitionwiseSampledRDD(rdd, sampler.cloneComplement(), true, seed)
        (training, validation)
      }.toArray
    }

Although the second sampler is the complement, it still seems possible to generate overlapping training and validation sets, because the sampling method looks like this:

    override def sample(items: Iterator[T]): Iterator[T] = {
      items.filter { item =>
        val x = rng.nextDouble()
        (x >= lb && x < ub) ^ complement
      }
    }

I'm not a machine learning guy, so I guess I must fall into one of the following three situations:

1. Does it mean we actually allow overlapping training and validation sets? (Counter-intuitive to me.)
2. Did I misunderstand the code?
3. Is it a bug?

Can anyone explain it to me?

Best,
--
Nan Zhu

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
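You can check Xiangrui's second point without Spark. The following is a plain-Scala simulation of one partition, not MLlib code. It assumes each sampler re-creates its random generator from the same seed and draws exactly one number per item, in the same order. That is what a fixed per-partition seed buys. The `sample` helper mirrors the filter in `BernoulliSampler.sample` quoted above; `KFoldSketch` and its parameters are illustrative only.

```scala
import scala.util.Random

object KFoldSketch {
  // Mirrors the BernoulliSampler filter: keep an item when its uniform draw
  // falls in [lb, ub), or outside that range when complement = true.
  def sample[T](items: Seq[T], lb: Double, ub: Double, complement: Boolean, seed: Long): Seq[T] = {
    val rng = new Random(seed) // same seed => same sequence of draws, item by item
    items.filter { _ =>
      val x = rng.nextDouble()
      (x >= lb && x < ub) ^ complement
    }
  }

  def main(args: Array[String]): Unit = {
    val data = 1 to 1000
    val numFolds = 3
    val seed = 42L
    for (fold <- 1 to numFolds) {
      val lb = (fold - 1).toDouble / numFolds
      val ub = fold.toDouble / numFolds
      val validation = sample(data, lb, ub, complement = false, seed = seed)
      val training = sample(data, lb, ub, complement = true, seed = seed)
      val overlap = validation.toSet.intersect(training.toSet)
      println(s"fold $fold: validation=${validation.size}, training=${training.size}, overlap=${overlap.size}")
    }
  }
}
```

Because both calls see identical draws, every item lands in exactly one of the two sets. If you give the training call a different seed (for example `seed + 1`), overlap becomes possible. This is why the shared seed matters.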
Re: Help with using combineByKey
Maybe this version is easier to use:

    plist.mapValues((v) => (if (v > 0) 1 else 0, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

It behaves similarly to combineByKey() and will be faster than the groupByKey() version.

On Thu, Oct 9, 2014 at 9:28 PM, HARIPRIYA AYYALASOMAYAJULA aharipriy...@gmail.com wrote:

Sean, thank you. It works, but I am still confused about the function. Can you kindly throw some light on it? I was going through the example in https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html. Is there a better source through which I can learn more about these functions? It would be helpful to look at more examples. Also, I assume using combineByKey helps us solve it in parallel, unlike the plain Scala functions Yana mentioned. Am I correct?

On Thu, Oct 9, 2014 at 12:30 PM, Sean Owen so...@cloudera.com wrote:

Oh duh, sorry. The initialization should of course be

    (v) => (if (v > 0) 1 else 0, 1)

This gives the answer you are looking for. I don't see what Part2 is supposed to do differently.

On Thu, Oct 9, 2014 at 6:14 PM, HARIPRIYA AYYALASOMAYAJULA aharipriy...@gmail.com wrote:

Hello Sean,

Thank you, but changing from v to 1 doesn't help me either. I am trying to count the number of non-zero values using the first accumulator.

    val newlist = List(("LAX", 6), ("LAX", 0), ("LAX", 7), ("SFO", 0), ("SFO", 0), ("SFO", 9))
    val plist = sc.parallelize(newlist)
    val part1 = plist.combineByKey(
      (v) => (1, 1),
      (acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
      (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
    )
    val Part2 = part1.map { case (key, value) => (key, (value._1, value._2)) }

This should give me the result (LAX,(2,3)) (SFO,(1,3)).

On Thu, Oct 9, 2014 at 11:48 AM, Sean Owen so...@cloudera.com wrote:

You have a typo in your code at "var acc:", and the map from opPart1 to opPart2 looks like a no-op, but I don't think those are the problem. It sounds like you intend the first element of each pair to be a count of non-zero values, but you initialize the first element of the pair to v, not 1, in v => (v, 1). Try v => (1, 1).

On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA aharipriy...@gmail.com wrote:

I am a beginner to Spark and am finding it difficult to implement a very simple reduce operation. I read that it is ideal to use combineByKey for complex reduce operations.

My input:

    val input = sc.parallelize(List(("LAX", 6), ("LAX", 8), ("LAX", 7), ("SFO", 0), ("SFO", 1), ("SFO", 9), ("PHX", 65), ("PHX", 88), ("KX", 7), ("KX", 6), ("KX", 1), ("KX", 9), ("HOU", 56), ("HOU", 5), ("HOU", 59), ("HOU", 0), ("MA", 563), ("MA", 545), ("MA", 5), ("MA", 0), ("MA", 0)))
    val opPart1 = input.combineByKey(
      (v) => (v, 1),
      (var acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
      (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
    )
    val opPart2 = opPart1.map { case (key, value) => (key, (value._1, value._2)) }
    opPart2.collectAsMap().map(println(_))

If the value is greater than 0, the first accumulator should be incremented by 1; otherwise it stays the same. The second accumulator is a simple counter for each value. I am getting incorrect output (garbage values) for the first accumulator. Please help.

The equivalent reduce operation in Hadoop MapReduce is:

    public static class PercentageCalcReducer extends Reducer<Text, IntWritable, Text, FloatWritable> {
      private FloatWritable pdelay = new FloatWritable();

      public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int acc2 = 0;
        float frac_delay, percentage_delay;
        int acc1 = 0;
        for (IntWritable val : values) {
          if (val.get() > 0) {
            acc1++;
          }
          acc2++;
        }
        frac_delay = (float) acc1 / acc2;
        percentage_delay = frac_delay * 100;
        pdelay.set(percentage_delay);
        context.write(key, pdelay);
      }
    }

Please help. Thank you for your time.
--
Regards,
Haripriya Ayyalasomayajula
contact : 650-796-7112
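It can help to see the three functions passed to combineByKey in isolation. The following is a plain-Scala model, not Spark. It assumes the data arrives as two in-memory "partitions". It folds each partition with `createCombiner`/`mergeValue`, then merges the per-partition results with `mergeCombiners`, which mirrors how combineByKey splits the work. `CombineByKeySketch`, its helpers and the partitioning are illustrative. `mapValuesThenReduce` models the mapValues/reduceByKey alternative suggested at the top of the thread, and `percentages` repeats the Hadoop reducer's arithmetic.

```scala
object CombineByKeySketch {
  // Accumulator: (number of values > 0, number of values)
  type Acc = (Int, Int)

  // The three functions one would pass to rdd.combineByKey(...)
  val createCombiner: Int => Acc = v => (if (v > 0) 1 else 0, 1)
  val mergeValue: (Acc, Int) => Acc = (acc, v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1)
  val mergeCombiners: (Acc, Acc) => Acc = (a, b) => (a._1 + b._1, a._2 + b._2)

  // Models combineByKey: fold each partition on its own, then merge per key.
  def combineByKey(partitions: Seq[Seq[(String, Int)]]): Map[String, Acc] = {
    val perPartition = partitions.map { part =>
      part.foldLeft(Map.empty[String, Acc]) { case (m, (k, v)) =>
        m.updated(k, m.get(k).map(acc => mergeValue(acc, v)).getOrElse(createCombiner(v)))
      }
    }
    perPartition.foldLeft(Map.empty[String, Acc]) { (merged, m) =>
      m.foldLeft(merged) { case (acc, (k, c)) =>
        acc.updated(k, acc.get(k).map(prev => mergeCombiners(prev, c)).getOrElse(c))
      }
    }
  }

  // Models mapValues(createCombiner).reduceByKey(mergeCombiners).
  def mapValuesThenReduce(records: Seq[(String, Int)]): Map[String, Acc] =
    records
      .map { case (k, v) => (k, createCombiner(v)) }
      .groupBy(_._1)
      .map { case (k, pairs) => k -> pairs.map(_._2).reduce(mergeCombiners) }

  // Same arithmetic as the Hadoop reducer: percentage of values > 0 per key.
  def percentages(counts: Map[String, Acc]): Map[String, Float] =
    counts.map { case (k, (positive, total)) => k -> positive.toFloat / total * 100 }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(
      Seq(("LAX", 6), ("LAX", 0), ("SFO", 0)),
      Seq(("LAX", 7), ("SFO", 0), ("SFO", 9))
    )
    val counts = combineByKey(partitions) // LAX -> (2,3), SFO -> (1,3)
    println(counts)
    println(mapValuesThenReduce(partitions.flatten) == counts)
    println(percentages(counts))
  }
}
```

Sean's fix shows up in `createCombiner`. The first value of a key must go through the same `v > 0` test as every later value. Otherwise a leading zero is counted as non-zero.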
Re: How to save ReceiverInputDStream to Hadoop using saveAsNewAPIHadoopFile
You can convert this ReceiverInputDStream (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.ReceiverInputDStream) into PairRDDFunctions (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions) and call saveAsNewAPIHadoopFile.

Thanks
Best Regards

On Fri, Oct 10, 2014 at 11:28 AM, Buntu Dev buntu...@gmail.com wrote:

Basically I'm attempting to convert a JSON stream to Parquet, and I get this error without the .values or .map(_._2):

    value saveAsNewAPIHadoopFile is not a member of org.apache.spark.streaming.dstream.ReceiverInputDStream[(String, String)]

On Thu, Oct 9, 2014 at 10:15 PM, Sean Owen so...@cloudera.com wrote:

Your RDD does not contain pairs, since you .map(_._2) (BTW, that can just be .values). Hadoop files means SequenceFiles, and those store key-value pairs. That's why the method only appears for RDD[(K,V)].

On Fri, Oct 10, 2014 at 3:50 AM, Buntu Dev buntu...@gmail.com wrote:

Thanks Sean, but I'm importing org.apache.spark.streaming.StreamingContext._

Here are the Spark imports:

    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.SparkConf

    val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    stream.saveAsNewAPIHadoopFile(destination, classOf[Void], classOf[Group], classOf[ExampleOutputFormat], conf)

Anything else I might be missing?
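Sean's point is that the save method only exists when the elements are pairs. That behavior comes from Scala's implicit-conversion pattern. In Spark 1.x, the `PairRDDFunctions` methods are attached to `RDD[(K, V)]` through an implicit conversion, which is why imports such as `SparkContext._` matter. The sketch below reproduces the pattern with hypothetical stand-ins (`Dataset`, `PairFunctions`, `saveAsKeyValueLines`), not Spark's classes. It shows why a collection of plain values cannot see the method, and how re-keying the values restores it.

```scala
import scala.language.implicitConversions

object PairOpsSketch {
  // Hypothetical stand-in for an RDD or DStream: just wraps a Seq.
  final case class Dataset[T](items: Seq[T]) {
    def map[U](f: T => U): Dataset[U] = Dataset(items.map(f))
  }

  // Hypothetical stand-in for PairRDDFunctions: only built from pairs.
  final class PairFunctions[K, V](ds: Dataset[(K, V)]) {
    // Pretend "save": render each key-value record as a tab-separated line.
    def saveAsKeyValueLines(): Seq[String] = ds.items.map { case (k, v) => s"$k\t$v" }
  }

  // The conversion only applies when the element type is a Tuple2.
  implicit def toPairFunctions[K, V](ds: Dataset[(K, V)]): PairFunctions[K, V] =
    new PairFunctions(ds)

  def main(args: Array[String]): Unit = {
    val messages = Dataset(Seq(("k1", "{\"a\":1}"), ("k2", "{\"a\":2}")))
    println(messages.saveAsKeyValueLines())

    val values = messages.map(_._2)
    // values.saveAsKeyValueLines()  // does not compile: Dataset[String] holds no pairs

    // Re-keying each value (here with an empty-string key) brings the method back.
    val rekeyed = values.map(v => ("", v))
    println(rekeyed.saveAsKeyValueLines())
  }
}
```

Applied to the Spark code above, the analogous move would be to keep or rebuild `(key, value)` pairs before calling the pair-only save method. The `classOf[Void]` key class in that call suggests placeholder keys. Treat this as a direction to verify against the Spark streaming API docs rather than a drop-in fix.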
How to patch sparkSQL on EC2?
Hi,

I have written a few extensions for Spark SQL (for version 1.1.0) and I am trying to deploy my new jar files (one for catalyst and one for sql/core) on EC2. My approach was to create a new spark/lib/spark-assembly-1.1.0-hadoop1.0.4.jar that merged the contents of the old one with the contents of my new jar files, and I propagated the changes to the workers. However, when I tried the code snippet below, I received the error message pasted at the end of this email. Do you have any suggestions on how to fix this?

Thanks,
Christos

The code is:

    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.sql.SQLContext
    val sqlContext = new SQLContext(sc)

The error message is:

    error: bad symbolic reference. A signature in package.class refers to term scalalogging in package com.typesafe which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling package.class.
    <console>:14: error: bad symbolic reference. A signature in package.class refers to term slf4j in value com.typesafe.scalalogging which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling package.class.
           val sqlContext = new SQLContext(sc)
Re: IOException and appcache FileNotFoundException in Spark 1.02
You could be hitting this issue: https://issues.apache.org/jira/browse/SPARK-3633 (or something similar). You can try the following workarounds, set on the SparkConf before the SparkContext is created:

    conf.set("spark.core.connection.ack.wait.timeout", "600")
    conf.set("spark.akka.frameSize", "50")

Also reduce the number of partitions; you could be hitting the kernel's ulimit. I faced this issue, and it went away when I dropped the partitions from 1600 to 200.

Thanks
Best Regards

On Fri, Oct 10, 2014 at 5:58 AM, Ilya Ganelin ilgan...@gmail.com wrote:

Hi all, I could use some help figuring out a couple of exceptions I've been getting regularly.

I have been running on a fairly large dataset (150 GB). With smaller datasets I don't have any issues. My sequence of operations is as follows (unless otherwise specified, I am not caching):

1. Map a 30 million row x 70 column string table to approximately 30 million x 5 strings (for the read via textFile I am using 1500 partitions).
2. From that, map to ((a, b), score) and reduceByKey, numPartitions = 180.
3. Extract the distinct values for A and the distinct values for B (I cache the output of distinct), numPartitions = 180.
4. Zip with index for A and for B (to remap strings to ints).
5. Join the remapped IDs with the original table.
6. Feed the result into MLlib's ALS algorithm.

I am running with:

- Spark 1.0.2 with CDH 5.1
- numExecutors = 8, numCores = 14
- Memory = 12g
- MemoryFraction = 0.7
- Kryo serialization

My issue is that the code runs fine for a while but then non-deterministically crashes with either file IOExceptions or the following obscure error:

    14/10/08 13:29:59 INFO TaskSetManager: Loss was due to java.io.IOException: Filesystem closed [duplicate 10]
    14/10/08 13:30:08 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
    java.io.FileNotFoundException: /opt/cloudera/hadoop/1/yarn/nm/usercache/zjb238/appcache/application_1412717093951_0024/spark-local-20141008131827-c082/1c/shuffle_3_117_354 (No such file or directory)

Looking through the logs, I see the IOException in other places, but it appears to be non-catastrophic. The FileNotFoundException, however, is. I have found the following Stack Overflow question, which at least seems to address the IOException: http://stackoverflow.com/questions/24038908/spark-fails-on-big-shuffle-jobs-with-java-io-ioexception-filesystem-closed

But I have not found anything useful at all regarding the appcache error. Any help would be much appreciated.
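Here is a rough count of why the partition count can run into the open-file ulimit. It rests on one assumption: that the job uses Spark 1.0's hash-based shuffle without file consolidation. In that setup, each map task writes one file per reduce partition and keeps those writers open while it runs. The inputs reuse figures from this thread: 1500 input partitions, 180 reduce partitions and 14 cores per executor, plus 1600 versus 200 from Akhil's report. The object and method names are illustrative only.

```scala
object ShuffleFileEstimate {
  // Assumption: hash-based shuffle without consolidation, i.e. one output
  // file per (map task, reduce partition) pair.
  def totalShuffleFiles(mapTasks: Long, reducePartitions: Long): Long =
    mapTasks * reducePartitions

  // Assumption: each running map task holds one open writer per reduce
  // partition, and an executor runs one task per core at a time.
  def openFilesPerExecutor(coresPerExecutor: Int, reducePartitions: Int): Int =
    coresPerExecutor * reducePartitions

  def main(args: Array[String]): Unit = {
    println(totalShuffleFiles(1500, 180))   // 270000 files for one 1500 -> 180 shuffle
    println(openFilesPerExecutor(14, 180))  // 2520 writers open at once on a 14-core executor
    println(openFilesPerExecutor(14, 1600)) // 22400 with 1600 reduce partitions
    println(openFilesPerExecutor(14, 200))  // 2800 with 200
  }
}
```

Compare these figures with `ulimit -n` on the worker nodes. Because the count scales linearly with the number of reduce partitions, cutting partitions by 8x cuts concurrently open shuffle files by 8x under these assumptions.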
Re: Spark in cluster [ remote.EndpointWriter: AssociationError]
Can you paste your spark-env.sh file? It looks like you have it misconfigured.

Thanks
Best Regards

On Fri, Oct 10, 2014 at 1:43 AM, Morbious knowledgefromgro...@gmail.com wrote:

Hi,

Recently I've configured Spark in a cluster with ZooKeeper. I have 2 masters (active/standby) and 6 workers. I began my installation with the samples from the examples directory. Everything worked fine when I only used memory. When I used the word count example, I got messages like the ones below:

    14/10/09 19:37:19 ERROR remote.EndpointWriter: AssociationError [akka.tcp://sparkwor...@spark-slave1.domain.org:7078] -> [akka.tcp://sparkexecu...@spark-slave1.domain.org:53757]: Error [Association failed with [akka.tcp://sparkexecu...@spark-slave1.domain.org:53757]] [
    akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkexecu...@spark-slave1.domain.org:53757]
    Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: spark-slave1.domain.org/10.0.6.228:53757
    ]
    14/10/09 19:37:19 ERROR remote.EndpointWriter: AssociationError [akka.tcp://sparkwor...@spark-slave1.domain.org:7078] -> [akka.tcp://sparkexecu...@spark-slave1.domain.org:53757]: Error [Association failed with [akka.tcp://sparkexecu...@spark-slave1.domain.org:53757]] [
    akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkexecu...@spark-slave1.domain.org:53757]
    Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: spark-slave1.domain.org/10.0.6.228:53757
    ]

I'm a little confused because I can't find any solution to my problem. I use Cloudera Hadoop with Spark.

Best regards,
Morbious
Delayed hotspot optimizations in Spark
Hello Spark users and developers!

I am using HDFS + Spark SQL + Hive schema + Parquet as the storage format. I have a lot of Parquet files; each file fits in one HDFS block and holds one day of data. The strange thing is that the first Spark SQL query is very slow. To reproduce the situation I use only one core: the first query takes 97 s, and every subsequent query takes only 13 s. Of course I query different data each time, but it has the same structure and size. The situation can be reproduced after restarting the Thrift server.

Here is the information about Parquet file reading from the worker node:

Slow one:

    Oct 10, 2014 2:26:53 PM INFO: parquet.hadoop.InternalParquetRecordReader: Assembled and processed 1560251 records from 30 columns in 11686 ms: 133.51454 rec/ms, 4005.4363 cell/ms

Fast one:

    Oct 10, 2014 2:31:30 PM INFO: parquet.hadoop.InternalParquetRecordReader: Assembled and processed 1568899 records from 1 columns in 1373 ms: 1142.6796 rec/ms, 1142.6796 cell/ms

As you can see, the second read is 10x faster than the first. Most of the query time is spent working with the Parquet file. This problem is really annoying, because most of my Spark tasks contain just one SQL query plus data processing, and to speed up my jobs I put a special warm-up query in front of every job.

My assumption is that this is caused by HotSpot optimizations happening during the first read. Do you have any idea how to confirm or solve this performance problem?

Thanks for any advice!

P.S. -XX:+PrintCompilation shows a huge number of HotSpot optimizations, but I cannot figure out which ones are important and which are not.
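One rough way to test the JIT hypothesis outside Spark is to time the same CPU-bound work several times in one JVM. If only the first run is slow, warm-up (JIT compilation, class loading) is a plausible cause. The following is a naive timer around a hypothetical stand-in workload, not Parquet decoding. For trustworthy micro-benchmark numbers, a harness such as JMH is the usual tool.

```scala
object WarmupSketch {
  // Hypothetical CPU-bound stand-in for per-record decoding work.
  def workload(n: Int): Long = {
    var acc = 0L
    var i = 0
    while (i < n) {
      acc = acc * 31 + (i ^ (acc >>> 7))
      i += 1
    }
    acc
  }

  // Keeps results observable so the JIT cannot drop the work as dead code.
  @volatile private var sink: Long = 0L

  // Runs `body` `runs` times; returns each run's wall-clock time in milliseconds.
  def timeRuns(runs: Int)(body: => Long): Seq[Double] =
    (1 to runs).map { _ =>
      val start = System.nanoTime()
      sink += body
      (System.nanoTime() - start) / 1e6
    }

  def main(args: Array[String]): Unit = {
    val times = timeRuns(5)(workload(20000000))
    times.zipWithIndex.foreach { case (ms, i) => println(f"run ${i + 1}: $ms%.1f ms") }
  }
}
```

You can run it once normally and once with `-Xint` (interpreter only) or `-Xcomp`. The comparison gives a feel for how large a first-run gap the JIT alone can produce on that JVM.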
Re: Delayed hotspot optimizations in Spark
You could try setting -Xcomp for the executors to force JIT compilation up front. I don't know whether it's a good idea overall, but it might show whether up-front compilation really helps. I doubt it.

However, isn't this almost surely due to caching somewhere, in Spark SQL or HDFS? I really doubt HotSpot makes a difference compared to these much larger factors.

On Fri, Oct 10, 2014 at 8:49 AM, Alexey Romanchuk alexey.romanc...@gmail.com wrote:

Hello spark users and developers! [...]
Re: Delayed hotspot optimizations in Spark
Hey Sean and Spark users!

Thanks for the reply. I tried -Xcomp just now. Startup took a few minutes (as expected), but the first query was as slow as before:

    Oct 10, 2014 3:03:41 PM INFO: parquet.hadoop.InternalParquetRecordReader: Assembled and processed 1568899 records from 30 columns in 12897 ms: 121.64837 rec/ms, 3649.451 cell/ms

and the next one:

    Oct 10, 2014 3:05:03 PM INFO: parquet.hadoop.InternalParquetRecordReader: Assembled and processed 1568899 records from 1 columns in 1757 ms: 892.94196 rec/ms, 892.94196 cell/ms

I have no idea about caching or anything else, because CPU load is 100% on the worker and jstack shows that the worker is reading from the Parquet file. Any ideas?

Thanks!

On Fri, Oct 10, 2014 at 2:55 PM, Sean Owen so...@cloudera.com wrote:

You could try setting -Xcomp for executors to force JIT compilation upfront. [...]
Re: Spark SQL - Exception only when using cacheTable
Can you try checking whether the table is being cached? You can use the isCached method. More details are here: http://spark.apache.org/docs/1.0.2/api/java/org/apache/spark/sql/SQLContext.html

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Exception-only-when-using-cacheTable-tp16031p16123.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Can I run examples on cluster?
Hi all,

I want to use two nodes for testing, one as master and the other as worker. Can I submit the example application included in the Spark source tarball on the master and have it run on the worker? What should I do?

BR,
Theo
Re: Delayed hotspot optimizations in Spark
Hi,

Could it be due to GC? I read that it may happen if your program starts with a small heap. What are your -Xms and -Xmx values?

Print GC stats with -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps

Guillaume
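Guillaume's flags log every collection. As a complement, the JVM exposes cumulative GC counters through `java.lang.management`, so a job can compare GC activity around its first and later queries in-process. This is a minimal sketch. The `GcSnapshot` object and the allocation-heavy stand-in workload are illustrative, not part of Spark.

```scala
import java.lang.management.ManagementFactory

object GcSnapshot {
  // Sums collection counts and accumulated collection time (ms) over all collectors.
  // A bean may report -1 when a value is undefined; those are treated as 0.
  def totals(): (Long, Long) = {
    val beans = ManagementFactory.getGarbageCollectorMXBeans
    (0 until beans.size()).foldLeft((0L, 0L)) { case ((count, time), i) =>
      val b = beans.get(i)
      (count + math.max(0L, b.getCollectionCount()), time + math.max(0L, b.getCollectionTime()))
    }
  }

  // Runs `body` and returns its result with the GC count and GC time (ms) seen meanwhile.
  def measure[A](body: => A): (A, Long, Long) = {
    val (c0, t0) = totals()
    val result = body
    val (c1, t1) = totals()
    (result, c1 - c0, t1 - t0)
  }

  def main(args: Array[String]): Unit = {
    for (run <- 1 to 3) {
      val (_, gcCount, gcMillis) = measure {
        // Hypothetical allocation-heavy stand-in for a query.
        (1 to 2000000).map(i => i.toString.length).sum
      }
      println(s"run $run: $gcCount collections, $gcMillis ms in GC")
    }
  }
}
```

Suppose the first run shows far more GC time than later ones. That points to the small-initial-heap scenario Guillaume describes, and comparing runs with different -Xms settings would be the next experiment.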
Re: Windowed Operations
Hi Diego,

I have the same problem.

    // reduce by key in the first window
    val w1 = one.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
    w1.count().print()

    // reduce by key in the second window, based on the results of the first window
    val w2 = w1.reduceByKeyAndWindow(_ + _, Seconds(120), Seconds(30))
    w2.print()

w1 works, but w2 never prints anything. Do you have any update on this issue?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Windowed-Operations-tp15133p16128.html
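To make the two windows concrete, here is a plain-Scala model of what `reduceByKeyAndWindow(_ + _, window, slide)` computes. It assumes a 10 s batch interval, which the thread does not state, and it counts whole batches only. `WindowSketch` and its helpers are hypothetical. It models the arithmetic, not Spark Streaming's scheduling, so it does not by itself explain why w2 prints nothing.

```scala
object WindowSketch {
  type Counts = Map[String, Int]

  // Key-wise sum: the equivalent of the reduce function _ + _.
  def merge(a: Counts, b: Counts): Counts =
    b.foldLeft(a) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0) + v) }

  // Models reduceByKeyAndWindow over a stream whose elements arrive every
  // `intervalSec` seconds: every `slideSec` seconds, merge the elements of the
  // last `windowSec` seconds.
  def reduceByKeyAndWindow(stream: Seq[Counts], intervalSec: Int, windowSec: Int, slideSec: Int): Seq[Counts] = {
    require(windowSec % intervalSec == 0 && slideSec % intervalSec == 0,
      "window and slide must be multiples of the parent interval")
    val perWindow = windowSec / intervalSec
    val perSlide = slideSec / intervalSec
    (perSlide to stream.size by perSlide).map { end =>
      stream.slice(math.max(0, end - perWindow), end).foldLeft(Map.empty[String, Int])(merge)
    }
  }

  def main(args: Array[String]): Unit = {
    // Six 10-second batches, each containing the key "a" once.
    val batches = Seq.fill(6)(Map("a" -> 1))
    val w1 = reduceByKeyAndWindow(batches, 10, 30, 10) // emits every 10 s
    val w2 = reduceByKeyAndWindow(w1, 10, 120, 30)     // w1's interval is its 10 s slide
    println(w1.map(_("a"))) // counts 1, 2, 3, 3, 3, 3
    println(w2.map(_("a"))) // counts 6, 15
  }
}
```

In this model, consecutive 30 s windows of w1 overlap. As a result, w2 sees each original batch up to three times: 15 here versus 6 actual events. That is a property of windowing an already windowed stream, separate from the printing problem.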
Spark on Mesos Issue - Do I need to install Spark on Mesos slaves
Hi,

I am trying to submit a Spark job on Mesos using spark-submit from my Mesos master machine. My SPARK_HOME = /vol1/spark/spark-1.0.2-bin-hadoop2. I have uploaded spark-1.0.2-bin-hadoop2.tgz to HDFS so that the Mesos slaves can download it to invoke the Mesos Spark backend executor. But on submitting the job, I see the error below in the stderr logs on the Mesos slave machine:

    sh: /vol1/spark/spark-1.0.2-bin-hadoop2/sbin/spark-executor: No such file or directory

Based on the documentation, I understand that if I keep the Spark Mesos binary in HDFS, I don't need to install Spark separately on the slave nodes. So the SPARK_HOME path /vol1/spark/spark-1.0.2-bin-hadoop2/ does not exist on any of my slave machines, hence the error.

Now, my question is: shouldn't the Mesos slave look for the spark-executor command in the temporary directory where it is supposed to extract spark-1.0.2-bin-hadoop2.tgz from HDFS, instead of in the SPARK_HOME directory? What am I doing wrong here?

Any help would be really appreciated.

Thanks,
Bijoy
Re: Can I run examples on cluster?
This is how a Spark cluster looks: [image not preserved]

So your driver program (the example application) can be run on the master (or anywhere that has access to the master, i.e. the cluster manager), and the workers will execute it.

Thanks
Best Regards

On Fri, Oct 10, 2014 at 2:47 PM, Theodore Si sjyz...@gmail.com wrote:

Hi all, I want to use two nodes for test, one as master, the other worker. [...]
Re: Spark SQL parser bug?
Hi Mohammed,

Would you mind sharing the DDL of the table x and the complete stack trace of the exception you got? A full Spark shell session history would be more than helpful. PR #2084 was merged into master in August, and the timestamp type is supported in 1.1.

I tried the following snippets in the Spark shell (v1.1) and didn't observe this issue:

    scala> import org.apache.spark.sql._
    import org.apache.spark.sql._

    scala> import sc._
    import sc._

    scala> val sqlContext = new SQLContext(sc)
    sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@6c3441c5

    scala> import sqlContext._
    import sqlContext._

    scala> case class Record(a: Int, ts: java.sql.Timestamp)
    defined class Record

    scala> makeRDD(Seq.empty[Record], 1).registerTempTable("x")

    scala> sql("SELECT a FROM x WHERE ts >= '2012-01-01T00:00:00' AND ts <= '2012-03-31T23:59:59'")
    res1: org.apache.spark.sql.SchemaRDD = SchemaRDD[3] at RDD at SchemaRDD.scala:103
    == Query Plan ==
    == Physical Plan ==
    Project [a#0]
     ExistingRdd [a#0,ts#1], MapPartitionsRDD[5] at mapPartitions at basicOperators.scala:208

    scala> res1.collect()
    ...
    res2: Array[org.apache.spark.sql.Row] = Array()

Cheng

On 10/9/14 10:26 AM, Mohammed Guller wrote:

Hi,

When I run the following Spark SQL query in spark-shell (version 1.1.0):

    val rdd = sqlContext.sql("SELECT a FROM x WHERE ts >= '2012-01-01T00:00:00' AND ts <= '2012-03-31T23:59:59'")

it gives the following error:

    rdd: org.apache.spark.sql.SchemaRDD = SchemaRDD[294] at RDD at SchemaRDD.scala:103
    == Query Plan ==
    == Physical Plan ==
    java.util.NoSuchElementException: head of empty list

The ts column in the WHERE clause holds timestamp data and is of type timestamp. If I replace the string '2012-01-01T00:00:00' in the WHERE clause with its epoch value, the query works fine.

It looks like I have run into the issue described in this pull request: https://github.com/apache/spark/pull/2084

Is that PR not merged in Spark version 1.1.0? Or am I missing something?

Thanks,
Mohammed
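Mohammed's workaround needs numeric bounds in place of the ISO strings. Below is a minimal `java.time` sketch. It assumes the timestamps are meant as UTC. It also leaves open whether a numeric literal compared against a timestamp column is read as epoch seconds or milliseconds, which is worth checking for the Spark version in use, so both conversions are provided. `EpochBounds` and `rangePredicate` are hypothetical helpers.

```scala
import java.time.{LocalDateTime, ZoneOffset}

object EpochBounds {
  // Assumption: the ISO-8601 literals denote UTC wall-clock times.
  def epochSeconds(iso: String): Long =
    LocalDateTime.parse(iso).toEpochSecond(ZoneOffset.UTC)

  def epochMillis(iso: String): Long = epochSeconds(iso) * 1000L

  // Builds "col >= lo AND col <= hi" with numeric bounds instead of string literals.
  def rangePredicate(column: String, fromIso: String, toIso: String, toEpoch: String => Long): String =
    s"$column >= ${toEpoch(fromIso)} AND $column <= ${toEpoch(toIso)}"

  def main(args: Array[String]): Unit = {
    val where = rangePredicate("ts", "2012-01-01T00:00:00", "2012-03-31T23:59:59", epochSeconds)
    println(s"SELECT a FROM x WHERE $where")
    // prints: SELECT a FROM x WHERE ts >= 1325376000 AND ts <= 1333238399
  }
}
```

If the data was written in a local time zone, replace `ZoneOffset.UTC` with that zone's offset. Otherwise the bounds shift by the offset.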
Re: Spark SQL - Exception only when using cacheTable
Hi Poiuytrez, what version of Spark are you using? Exception details such as the stack trace are really needed to investigate this issue. You can find them in the executor logs, or just browse the application's stderr/stdout link from the Spark Web UI.

On 10/9/14 9:37 PM, poiuytrez wrote:

Hello,

I have a weird issue. This request works fine:

    sqlContext.sql("SELECT customer_id FROM transactions WHERE purchaseamount = 200").count()

However, when I cache the table before making the request:

    sqlContext.cacheTable("transactions")
    sqlContext.sql("SELECT customer_id FROM transactions WHERE purchaseamount = 200").count()

I am getting an exception on one of the tasks:

    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 120 in stage 104.0 failed 4 times, most recent failure: Lost task 120.3 in stage 104.0 (TID 20537, spark-w-0.c.internal): java.lang.ClassCastException:

(I have no details after the ':')

Any ideas what could be wrong?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Exception-only-when-using-cacheTable-tp16031.html
Re: Unable to share Sql between HiveContext and JDBC Thrift Server
Which version are you using? Also, .saveAsTable() saves the table to the Hive metastore, so you need to make sure your Spark application points to the same Hive metastore instance as the JDBC Thrift server. For example, putting hive-site.xml under $SPARK_HOME/conf and running spark-shell and start-thriftserver.sh under the same $SPARK_HOME should work. I just verified this against Spark 1.1.

On 10/10/14 9:32 AM, Steve Arnold wrote:

I am writing a Spark job to persist data using HiveContext so that it can be accessed via the JDBC Thrift server. Although my code doesn't throw an error, I am unable to see my persisted data when I query from the Thrift server. I tried three different ways to get this to work:

1)

    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    val schemaRdd: SchemaRDD = sqlContext.createSchemaRDD(rdd)
    schemaRdd.saveAsTable("test_table")

rdd: RDD of a case class
sc: SparkContext

Case class used in all my examples:

    case class SomeClass(key: String, value: String) extends Serializable

2) I then created a table called test_table after logging in to the Thrift server and added two dummy records to it.

    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    import sqlContext._
    val usermeta = hql("SELECT key, value FROM test_table")
    val rdd = usermeta.map(t => SomeClass("3", "i"))
    val schemaRdd = createSchemaRDD(rdd)
    schemaRdd.insertInto("test_table")

3) I tried the approach documented in the Spark SQL programming guide:

    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    sqlContext.sql("CREATE TABLE IF NOT EXISTS test_table (key String, value String)")

The Spark job does other computations, which complete and return the correct results; only the SQL part doesn't work. What am I doing wrong? I thought that data written through the HiveContext could be accessed by running command-line queries in the Thrift server.

Cheers,
Steve
Re: Spark in cluster [ remote.EndpointWriter: AssociationError]
Sorry, but your solution doesn't work. I can see that port 7077 on my master is open and listening, and the workers are connected, but I don't understand why it's trying to connect to itself...

=> Check that the master is running on the specific host:

    netstat -at | grep 7077

You will get something similar to:

    tcp        0      0 akhldz.master.io:7077       *:*       LISTEN

If that is the case, then from your worker machine run

    host akhldz.master.io

(replace akhldz.master.io with your master host; if something goes wrong, add a host entry to your /etc/hosts file), and

    telnet akhldz.master.io 7077

(if this does not connect, your worker won't connect either).

=> Adding a host entry in /etc/hosts

Open /etc/hosts on your worker machine and add the following entry (example):

    192.168.100.20 akhldz.master.io

PS: In the above case, Pillis had two IP addresses with the same host name, e.g.:

    192.168.100.40 s1.machine.org
    192.168.100.41 s1.machine.org

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-in-cluster-remote-EndpointWriter-AssociationError-tp16063p16134.html
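The two checks above can also be scripted from a worker with any JVM available. The first is whether the master port is reachable; the second is whether /etc/hosts maps one name to several addresses. Here is a minimal sketch with hypothetical helpers: `canConnect` plays the role of the telnet test, and `duplicateHostNames` flags the two-IPs-one-name case from the PS.

```scala
import java.net.{InetSocketAddress, Socket}

object ClusterNetCheck {
  // Like `telnet host port`: true if a TCP connection opens within the timeout.
  def canConnect(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers refused connections, timeouts and unknown hosts.
      case _: java.io.IOException => false
    } finally {
      socket.close()
    }
  }

  // Parses /etc/hosts-style text; returns host names mapped to more than one address.
  def duplicateHostNames(hostsText: String): Map[String, Set[String]] = {
    val pairs = for {
      rawLine <- hostsText.split("\n").toSeq
      line = rawLine.takeWhile(_ != '#').trim // drop comments and surrounding blanks
      if line.nonEmpty
      fields = line.split("\\s+").toSeq
      name <- fields.drop(1)
    } yield (name, fields.head)
    pairs.groupBy(_._1)
      .map { case (name, ps) => name -> ps.map(_._2).toSet }
      .filter { case (_, addrs) => addrs.size > 1 }
  }

  def main(args: Array[String]): Unit = {
    val hosts =
      """192.168.100.40 s1.machine.org
        |192.168.100.41 s1.machine.org
        |192.168.100.20 akhldz.master.io""".stripMargin
    println(duplicateHostNames(hosts)) // flags s1.machine.org
    // Usage: ClusterNetCheck <master-host> <port>
    if (args.length == 2) println(canConnect(args(0), args(1).toInt))
  }
}
```

A successful `canConnect` only shows that the port accepts TCP connections. As with telnet, it says nothing about whether the Akka addresses on both sides resolve to the same names.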
Re: IOException and appcache FileNotFoundException in Spark 1.02
Thank you - I will try this. If I drop the partition count, am I not more likely to hit memory issues, especially if the dataset is rather large?

On Oct 10, 2014 3:19 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
You could be hitting this issue https://issues.apache.org/jira/browse/SPARK-3633 (or similar). You can try the following workarounds on your SparkConf:
conf.set("spark.core.connection.ack.wait.timeout", "600")
conf.set("spark.akka.frameSize", "50")
Also reduce the number of partitions; you could be hitting the kernel's ulimit. I faced this issue and it was gone when I dropped the partitions from 1600 to 200.
Thanks
Best Regards

On Fri, Oct 10, 2014 at 5:58 AM, Ilya Ganelin ilgan...@gmail.com wrote:
Hi all – I could use some help figuring out a couple of exceptions I've been getting regularly. I have been running on a fairly large dataset (150 GB). With smaller datasets I don't have any issues. My sequence of operations is as follows (unless otherwise specified, I am not caching):
1. Map a 30 million row x 70 column string table to approx. 30 million x 5 strings (for the read with textFile I am using 1500 partitions).
2. From that, map to ((a, b), score) and reduceByKey, numPartitions = 180.
3. Then extract the distinct values for A and the distinct values for B (I cache the output of distinct), numPartitions = 180.
4. Zip with index for A and for B (to remap strings to ints).
5. Join the remapped IDs with the original table.
This is then fed into MLlib's ALS algorithm.
I am running with:
Spark version 1.0.2 with CDH 5.1
numExecutors = 8, numCores = 14
Memory = 12g
MemoryFraction = 0.7
Kryo serialization
My issue is that the code runs fine for a while but then non-deterministically crashes with either file IOExceptions or the following obscure error:
14/10/08 13:29:59 INFO TaskSetManager: Loss was due to java.io.IOException: Filesystem closed [duplicate 10]
14/10/08 13:30:08 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
java.io.FileNotFoundException: /opt/cloudera/hadoop/1/yarn/nm/usercache/zjb238/appcache/application_1412717093951_0024/spark-local-20141008131827-c082/1c/shuffle_3_117_354 (No such file or directory)
Looking through the logs, I see the IOException in other places, but it appears to be non-catastrophic. The FileNotFoundException, however, is. I have found the following Stack Overflow question, which at least seems to address the IOException: http://stackoverflow.com/questions/24038908/spark-fails-on-big-shuffle-jobs-with-java-io-ioexception-filesystem-closed
But I have not found anything useful at all with regard to the appcache error. Any help would be much appreciated.
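Akhil's ulimit remark and Ilya's memory question pull in opposite directions, and a back-of-the-envelope calculation makes the trade-off concrete. The sketch assumes Spark 1.0's default hash-based shuffle with spark.shuffle.consolidateFiles left off, where every map task writes one file per reduce partition and keeps those writers open while it runs. The function names are hypothetical, and 150 GB is treated as 150 GiB for simplicity.

```python
def shuffle_files_written(map_tasks, reduce_partitions):
    """Hash shuffle without consolidation: one file per (map task, reducer) pair."""
    return map_tasks * reduce_partitions


def open_files_per_executor(concurrent_tasks, reduce_partitions):
    """Each running map task holds one open writer per reduce partition."""
    return concurrent_tasks * reduce_partitions


def mib_per_partition(total_gib, partitions):
    """Average data per partition, assuming it is spread evenly."""
    return total_gib * 1024 / partitions


# Figures from the thread: 1500 input partitions reduced into 180,
# 14 cores (concurrent tasks) per executor, roughly 150 GB of data.
total_files = shuffle_files_written(1500, 180)
open_files = open_files_per_executor(14, 180)
# For comparison, 1600 reduce partitions (the count Akhil mentions):
open_files_at_1600 = open_files_per_executor(14, 1600)
per_partition_mib = mib_per_partition(150, 180)
```

Fewer reduce partitions cut the number of simultaneously open files (the ulimit concern) but make each partition larger, so each task needs more memory; where the balance lies depends on the job.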
Re: Spark SQL - Exception only when using cacheTable
I am using the Python API. Unfortunately, I cannot find an equivalent of the isCached method in the SQLContext section of the documentation: https://spark.apache.org/docs/1.1.0/api/python/index.html
Re: Spark SQL - Exception only when using cacheTable
Hi Cheng, I am using Spark 1.1.0. This is the stack trace:
14/10/10 12:17:40 WARN TaskSetManager: Lost task 120.0 in stage 7.0 (TID 2235, spark-w-0.c.db.internal): java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Integer
        scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
        org.apache.spark.sql.catalyst.expressions.GenericRow.getInt(Row.scala:146)
        org.apache.spark.sql.columnar.INT$.getField(ColumnType.scala:105)
        org.apache.spark.sql.columnar.INT$.getField(ColumnType.scala:92)
        org.apache.spark.sql.columnar.BasicColumnBuilder.appendFrom(ColumnBuilder.scala:72)
        org.apache.spark.sql.columnar.NativeColumnBuilder.org$apache$spark$sql$columnar$NullableColumnBuilder$$super$appendFrom(ColumnBuilder.scala:88)
        org.apache.spark.sql.columnar.NullableColumnBuilder$class.appendFrom(NullableColumnBuilder.scala:57)
        org.apache.spark.sql.columnar.NativeColumnBuilder.org$apache$spark$sql$columnar$compression$CompressibleColumnBuilder$$super$appendFrom(ColumnBuilder.scala:88)
        org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.appendFrom(CompressibleColumnBuilder.scala:76)
        org.apache.spark.sql.columnar.NativeColumnBuilder.appendFrom(ColumnBuilder.scala:88)
        org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryColumnarTableScan.scala:65)
        org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryColumnarTableScan.scala:50)
        org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:236)
        org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
        org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
        org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
This was also printed on the driver:
14/10/10 12:17:43 ERROR TaskSetManager: Task 120 in stage 7.0 failed 4 times; aborting job
14/10/10 12:17:43 INFO TaskSchedulerImpl: Cancelling stage 7
14/10/10 12:17:43 INFO TaskSchedulerImpl: Stage 7 was cancelled
14/10/10 12:17:43 INFO DAGScheduler: Failed to run collect at SparkPlan.scala:85
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/hadoop/spark-install/python/pyspark/sql.py", line 1606, in count
    return self._jschema_rdd.count()
  File "/home/hadoop/spark-install/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/home/hadoop/spark-install/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o100.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 120 in stage 7.0 failed 4 times, most recent failure: Lost task 120.3 in stage 7.0 (TID 2248, spark-w-0.c.db.internal): java.lang.ClassCastException:
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at
Disabling log4j in Spark-Shell on ec2 stopped working on Wednesday (Oct 8)
For weeks, I've been using the following trick to successfully disable log4j INFO messages in the spark-shell when running a cluster on EC2 that was started by the Spark-provided ec2 scripts:
cp ./conf/log4j.properties.template ./conf/log4j.properties
I then change log4j.rootCategory=INFO to log4j.rootCategory=WARN.
This all stopped working on Wednesday, when I could no longer successfully start a cluster on EC2 (using the Spark-provided ec2 scripts). I noticed that this problem was caused by a change to a script referenced by the ec2 scripts, and that this referenced script has since been reverted. I mention this because I don't know whether it is related to my problem, but it's interesting that the problems started at the same time.
When I now start up the cluster on EC2 and subsequently start the spark-shell, I can no longer suppress the log4j messages using the above trick. I'm using Apache Spark 1.1.0.
What's interesting is that I can start the cluster locally on my laptop (using Spark 1.1.0), and the above trick for disabling log4j in the spark-shell works. So the issue appears to be related to EC2 and potentially something referenced by the Spark-provided ec2 startup script. But that is purely a guess on my part.
I'm wondering whether anyone else has noticed this issue and, if so, has a workaround.
Thanks.
Darin.
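The trick above boils down to rewriting a single property. As a hedged sketch, the same change can be applied programmatically while preserving any appender list that follows the level (for example a trailing ", console"). set_root_level is a hypothetical helper that only edits text; it does not interact with Spark.

```python
import re

# Matches "log4j.rootCategory=<LEVEL>[, appender, ...]"; group 3 keeps the appender list.
_ROOT_CATEGORY = re.compile(r"^(log4j\.rootCategory\s*=\s*)([A-Za-z]+)(.*)$", re.MULTILINE)


def set_root_level(properties_text, level="WARN"):
    """Return properties_text with the log4j.rootCategory level replaced by `level`."""
    return _ROOT_CATEGORY.sub(lambda m: m.group(1) + level + m.group(3), properties_text)


# Usage (paths relative to the Spark directory, as in the cp command above):
# with open("conf/log4j.properties.template") as src:
#     text = set_root_level(src.read())
# with open("conf/log4j.properties", "w") as dst:
#     dst.write(text)
```

Using a replacement function instead of a replacement string keeps the level text from being interpreted as a regex backreference.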
Re: Can I run examples on cluster?
But I cannot do this using ./bin/run-example SparkPi 10, right?

On Fri, Oct 10, 2014 at 6:04 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
This is how the Spark cluster looks. So your driver program (the example application) can be run on the master (or anywhere that has access to the master, i.e. the cluster manager), and the workers will execute it.
Thanks
Best Regards

On Fri, Oct 10, 2014 at 2:47 PM, Theodore Si sjyz...@gmail.com wrote:
Hi all, I want to use two nodes for testing, one as master and the other as worker. Can I submit the example application included in the Spark source code tarball on the master to let it run on the worker? What should I do?
BR,
Theo
Re: Can I run examples on cluster?
Should I pack the example into a jar file and submit it on master?
Re: Can I run examples on cluster?
Yes, you can run it with --master=spark://your-spark-uri:7077, I believe.
Thanks
Best Regards
Re: Can I run examples on cluster?
spark-submit --class "Classname" --master yarn-cluster jarfile (with the complete path)
This should work.
-- 
Regards,
Haripriya Ayyalasomayajula
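The replies above come down to passing the master URL and the main class to spark-submit. spark-submit expects its own options before the application jar and passes everything after the jar to the application. The sketch below only assembles such a command line. The master URL spark://master-host:7077 and the jar path are hypothetical placeholders; org.apache.spark.examples.SparkPi is the class behind run-example SparkPi. For bin/run-example itself, the Spark 1.x script reads the master from a MASTER environment variable (e.g. MASTER=spark://master-host:7077 ./bin/run-example SparkPi 10); check the script shipped with your version.

```python
import shlex


def spark_submit_argv(main_class, master, app_jar, app_args=(), spark_home="."):
    """Assemble a spark-submit command line (hypothetical helper; nothing is executed).

    spark-submit's own options (--class, --master) come before the application
    jar; everything after the jar is passed to the application's main method.
    """
    argv = [
        f"{spark_home}/bin/spark-submit",
        "--class", main_class,
        "--master", master,
        app_jar,
    ]
    argv.extend(str(arg) for arg in app_args)
    return argv


# Hypothetical master URL and jar path: substitute your own.
argv = spark_submit_argv(
    "org.apache.spark.examples.SparkPi",
    "spark://master-host:7077",
    "lib/spark-examples.jar",
    app_args=[10],
)
print(shlex.join(argv))  # shell-quoted form, ready to paste into a terminal
```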
How does the Spark Accumulator work under the covers?
Hello,
I was wondering what the Spark accumulator does under the covers. I've implemented my own associative addInPlace function for the accumulator; where is this function being run? Let's say you call something like myRdd.map(x => sum += x): is "sum" being accumulated locally in any way, for each element, partition, or node? Is "sum" a broadcast variable? Or does it only exist on the driver node? How does the driver node get access to "sum"?
Thanks,
Areg
Re: MLUtil.kfold generates overlapped training and validation set?
Thanks, Xiangrui. I found the reason for the overlap between the training and test sets… It is another counter-intuitive issue, related to https://github.com/apache/spark/pull/2508
Best,
-- 
Nan Zhu
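Xiangrui's point 2 (a fixed seed per partition yields non-overlapping subsets) can be illustrated with a small pure-Python model of a single partition. Because the validation sampler and its complement are seeded identically, they make the same random draw for every item, so each item lands in exactly one of the two sets. This sketches the idea only; it is not Spark's BernoulliSampler or PartitionwiseSampledRDD, and random.Random stands in for Spark's RNG.

```python
import random


def bernoulli_sample(items, lb, ub, seed, complement=False):
    """Keep items whose random draw x satisfies lb <= x < ub, or the opposite
    when complement=True (the `^ complement` test in the sampler code quoted
    at the start of this thread)."""
    rng = random.Random(seed)  # same seed => same sequence of draws
    kept = []
    for item in items:
        x = rng.random()  # exactly one draw per item, in input order
        if (lb <= x < ub) != complement:
            kept.append(item)
    return kept


def k_fold(items, num_folds, seed):
    """Return (training, validation) pairs, one per fold, for a single partition."""
    folds = []
    for fold in range(1, num_folds + 1):
        lb, ub = (fold - 1) / num_folds, fold / num_folds
        validation = bernoulli_sample(items, lb, ub, seed)
        training = bernoulli_sample(items, lb, ub, seed, complement=True)
        folds.append((training, validation))
    return folds


items = list(range(1000))
for training, validation in k_fold(items, num_folds=3, seed=42):
    assert set(training).isdisjoint(validation)    # no overlap
    assert sorted(training + validation) == items  # nothing lost
```

If the two samplers were seeded differently, as in the kind of bug Xiangrui says was fixed after 1.0, the draws would differ and the two sets could overlap.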
Re: How does the Spark Accumulator work under the covers?
If you use parallelize, the data is distributed across the available nodes, and sum is computed individually within each partition and merged later. The driver manages the entire process. Is my understanding correct? Can someone please correct me if I am wrong?
-- 
Regards,
Haripriya Ayyalasomayajula
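Roughly, Spark's accumulators work as follows: sum is not a broadcast variable. Each task works on its own copy, initialised to the zero value, and applies addInPlace locally as it processes its partition; the partial value travels back to the driver with the task's result, and the driver merges the partials (again with addInPlace) into the value, which only the driver can read. The toy model below sketches that flow in plain Python; ToyAccumulator, run_task and merge_on_driver are hypothetical names, not Spark API.

```python
class ToyAccumulator:
    """A pure-Python model of accumulator semantics (not Spark's class).

    Each task starts from `zero`, folds its own partition locally with
    `add_in_place`, and returns the partial result; only the driver merges
    the partials and reads the final value. `zero` is assumed immutable.
    """

    def __init__(self, zero, add_in_place):
        self.zero = zero
        self.add_in_place = add_in_place
        self.value = zero  # the driver-side value

    def run_task(self, partition):
        local = self.zero  # per-task copy, never shared across tasks
        for x in partition:
            local = self.add_in_place(local, x)
        return local  # shipped back to the driver with the task result

    def merge_on_driver(self, partials):
        for partial in partials:
            self.value = self.add_in_place(self.value, partial)
        return self.value


partitions = [[1, 2, 3], [4, 5], [6]]
acc = ToyAccumulator(0, lambda a, b: a + b)
partials = [acc.run_task(p) for p in partitions]  # [6, 9, 6]
total = acc.merge_on_driver(partials)             # 21
```

Two caveats apply to the myRdd.map(x => sum += x) example: map is lazy, so nothing accumulates until an action runs, and updates made inside a transformation can be applied more than once if a task is re-executed.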
RE: Spark on Mesos Issue - Do I need to install Spark on Mesos slaves
In spark-defaults.conf:
spark.executor.uri hdfs://:9000/user//spark-1.1.0-bin-hadoop2.4.tgz

From: Bijoy Deb [mailto:bijoy.comput...@gmail.com]
Sent: 10 October 2014 11:59
To: user@spark.apache.org
Subject: Spark on Mesos Issue - Do I need to install Spark on Mesos slaves

Hi,
I am trying to submit a Spark job on Mesos using spark-submit from my Mesos master machine.
My SPARK_HOME = /vol1/spark/spark-1.0.2-bin-hadoop2
I have uploaded spark-1.0.2-bin-hadoop2.tgz to HDFS so that the Mesos slaves can download it to invoke the Mesos Spark backend executor. But on submitting the job, I can see the error below in the 'stderr' logs on the Mesos slave machine:
sh: /vol1/spark/spark-1.0.2-bin-hadoop2/sbin/spark-executor: No such file or directory
Based on the documentation, I understand that if I keep the Spark Mesos binary file in HDFS, I don't need to install Spark separately on the slave nodes. So the SPARK_HOME path (/vol1/spark/spark-1.0.2-bin-hadoop2/) does not exist on any of my slave machines, hence the error.
Now, my question is: shouldn't the Mesos slave be looking for the spark-executor command in the temporary directory where it is supposed to extract spark-1.0.2-bin-hadoop2.tgz from HDFS, instead of the SPARK_HOME directory? What am I doing wrong here? Any help would be really appreciated.
Thanks,
Bijoy
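spark-defaults.conf holds one property per line: a name, whitespace, then the value. spark-submit reads this file, so, as I read the reply above, setting spark.executor.uri there lets the Mesos slaves fetch the Spark tarball from HDFS instead of relying on a local SPARK_HOME. To check what a given file actually sets, a minimal parser is sketched below. It is a simplification: Spark loads the file with java.util.Properties, which also accepts = or : separators and line continuations that this sketch ignores. parse_spark_defaults is hypothetical, and namenode and me in the example URI are placeholders, since the host and user are elided in the reply.

```python
def parse_spark_defaults(text):
    """Parse spark-defaults.conf-style text into a dict (hypothetical helper).

    Handles only the whitespace-separated form: each non-blank line that does
    not start with '#' holds a property name, whitespace, and the value.
    """
    props = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split(None, 1)  # split on the first run of whitespace
        props[parts[0]] = parts[1].strip() if len(parts) > 1 else ""
    return props


# Placeholder host and user; the real ones are elided in the reply.
example = """
# executors fetch the Spark distribution from here
spark.executor.uri   hdfs://namenode:9000/user/me/spark-1.1.0-bin-hadoop2.4.tgz
"""
conf = parse_spark_defaults(example)
```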
Breaking the previous large-scale sort record with Spark
Hi folks,
I interrupt your regularly scheduled user / dev list to bring you some pretty cool news for the project, which is that we've been able to use Spark to break MapReduce's 100 TB and 1 PB sort records, sorting data 3x faster on 10x fewer nodes. There's a detailed writeup at http://databricks.com/blog/2014/10/10/spark-breaks-previous-large-scale-sort-record.html. Summary: while Hadoop MapReduce held last year's 100 TB world record by sorting 100 TB in 72 minutes on 2100 nodes, we sorted it in 23 minutes on 206 nodes; and we also scaled up to sort 1 PB in 234 minutes.
I want to thank Reynold Xin for leading this effort over the past few weeks, along with Parviz Deyhim, Xiangrui Meng, Aaron Davidson and Ali Ghodsi. In addition, we'd really like to thank Amazon's EC2 team for providing the machines to make this possible. Finally, this result would of course not be possible without the many, many other contributions, testing and feature requests from throughout the community.
For an engine to scale from these multi-hour petabyte batch jobs down to 100-millisecond streaming and interactive queries is quite uncommon, and it's thanks to all of you folks that we are able to make this happen.
Matei
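The "3x faster on 10x fewer nodes" summary follows directly from the figures in the announcement; the arithmetic is below. Multiplying the two ratios gives roughly 32x less total machine time (node-minutes), a derived number that the announcement does not state itself. The throughput figure for the 1 PB run assumes 1 PB = 1000 TB.

```python
def compare_runs(base_minutes, base_nodes, new_minutes, new_nodes):
    """Ratios between two benchmark runs (baseline / new)."""
    return {
        "speedup": base_minutes / new_minutes,
        "node_reduction": base_nodes / new_nodes,
        # total machine time consumed, in node-minutes
        "node_minutes_ratio": (base_minutes * base_nodes) / (new_minutes * new_nodes),
    }


ratios = compare_runs(72, 2100, 23, 206)  # Hadoop MapReduce vs. Spark, 100 TB
spark_tb_per_min_100tb = 100 / 23         # about 4.35 TB/min
spark_tb_per_min_1pb = 1000 / 234         # about 4.27 TB/min, taking 1 PB = 1000 TB
```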
Re: Breaking the previous large-scale sort record with Spark
Awesome news Matei ! Congratulations to the databricks team and all the community members...
Re: Breaking the previous large-scale sort record with Spark
Brilliant stuff ! Congrats all :-) This is indeed really heartening news !
Regards,
Mridul
SparkSQL LEFT JOIN problem
Hi,
I am exploring Spark SQL 1.1.0 and have a problem with LEFT JOIN. Here is the query:
select * from customer left join profile on customer.account_id = profile.account_id
The two tables' schemas are as follows:
// Table: customer
root
 |-- account_id: string (nullable = false)
 |-- birthday: string (nullable = true)
 |-- preferstore: string (nullable = true)
 |-- registstore: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- city_name_en: string (nullable = true)
 |-- register_date: string (nullable = true)
 |-- zip: string (nullable = true)
// Table: profile
root
 |-- account_id: string (nullable = false)
 |-- card_type: string (nullable = true)
 |-- card_upgrade_time_black: string (nullable = true)
 |-- card_upgrade_time_gold: string (nullable = true)
However, I always get an exception:
Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: *, tree:
Project [*]
 Join LeftOuter, Some(('customer.account_id = 'profile.account_id))
  Subquery customer
   SparkLogicalPlan (ExistingRdd [account_id#0,birthday#1,preferstore#2,registstore#3,gender#4,city_name_en#5,register_date#6,zip#7], MappedRDD[5] at map at SQLFetcher.scala:43)
  Subquery profile
   SparkLogicalPlan (ExistingRdd [account_id#8,card_type#9,card_upgrade_time_black#10,card_upgrade_time_gold#11], MappedRDD[12] at map at SQLFetcher.scala:43)
I was not sure where the problem was, so I created two simple tables to isolate it.
// table 1
a b c
4 8 9
1 3 4
3 4 5
// table 2
a b c
1 2 3
4 5 6
This time, it works. So the problem might be in the data. I then sampled some lines of the input tables to create new ones. This also works. I am so confused. The problem is in the data, but the error messages are not enough to find it (unless I am missing something).
Some lines of the sampled tables:
// Table: customer
[50660,1975-06-05 00:00:00.000,13,12,male,ningboshi,2006-12-14 00:00:00.000,]
[50666,1984-02-23 00:00:00.000,72,5,Female,beijingshi,2006-12-14 00:00:00.000,100086]
[50680,1976-11-25 00:00:00.000,59,5,Female,beijingshi,2006-12-14 00:00:00.000,100022]
[85,1971-03-27 00:00:00.000,2,2,Female,shanghaishi,2005-09-20 00:00:00.000,200336]
// Table: profile
[1144681,3,2010-02-18 00:00:00.000,2013-02-28 00:00:00.000]
[50666,2,2010-10-31 00:00:00.000,]
[3930657,1,,]
[1056365,2,2009-12-29 00:00:00.000,]
Any help? =)
Hao
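For reference, the semantics this query asks for can be modelled in a few lines of plain Python. A LEFT OUTER JOIN keeps every customer row, attaches the matching profile columns where account_id matches, and fills them with NULL (None here) otherwise. This sketches the SQL semantics only, not Spark SQL's implementation, and uses just a couple of columns from the sampled rows above; left_outer_join is a hypothetical helper.

```python
def left_outer_join(left, right, key):
    """Model of SQL LEFT OUTER JOIN on equal `key` values (not Spark SQL's code).

    Every left row is kept. Matching right rows contribute their non-key
    columns; left rows without a match get None for those columns.
    Column names other than `key` are assumed not to clash.
    """
    right_columns = []
    index = {}
    for row in right:
        for col in row:
            if col != key and col not in right_columns:
                right_columns.append(col)
        index.setdefault(row[key], []).append(row)

    result = []
    for row in left:
        matches = index.get(row[key], [])
        if matches:
            for match in matches:
                merged = dict(row)
                merged.update({c: match.get(c) for c in right_columns})
                result.append(merged)
        else:
            merged = dict(row)
            merged.update({c: None for c in right_columns})
            result.append(merged)
    return result


# A few columns from the sampled rows above (values kept as strings).
customers = [
    {"account_id": "50660", "gender": "male"},
    {"account_id": "50666", "gender": "Female"},
    {"account_id": "50680", "gender": "Female"},
    {"account_id": "85", "gender": "Female"},
]
profiles = [
    {"account_id": "1144681", "card_type": "3"},
    {"account_id": "50666", "card_type": "2"},
    {"account_id": "3930657", "card_type": "1"},
    {"account_id": "1056365", "card_type": "2"},
]
rows = left_outer_join(customers, profiles, "account_id")
```

With these samples only account 50666 has a profile, so the expected result is four rows, three of them with a NULL card_type.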
Re: Does Ipython notebook work with spark? trivial example does not work. Re: bug with IPython notebook?
PySpark definitely works for me in IPython notebook. A good way to debug is to do setMaster("local") in your Python sc object and see if that works. Then, from there, modify it to point to the real Spark server. Also, I added a hack where I used sys.path.insert to add the path to pyspark in my Python notebook to get it working properly. You can try out these instructions, which I recently put together based on some other material online plus a few minor modifications: http://jayunit100.blogspot.com/2014/07/ipython-on-spark.html

On Thu, Oct 9, 2014 at 2:50 PM, Andy Davidson a...@santacruzintegration.com wrote:
I wonder if I am starting IPython notebook incorrectly. The example in my original email does not work. It looks like stdout is not configured correctly. If I submit it as a python.py file, it works correctly. Any idea what the problem is?
Thanks
Andy

From: Andrew Davidson a...@santacruzintegration.com
Date: Tuesday, October 7, 2014 at 4:23 PM
To: user@spark.apache.org
Subject: bug with IPython notebook?

Hi,
I think I found a bug in the IPython notebook integration. I am not sure how to report it. I am running spark-1.1.0-bin-hadoop2.4 on an AWS EC2 cluster. I start the cluster using the launch script provided by Spark. I start IPython notebook on my cluster master as follows, and use an SSH tunnel to open the notebook in a browser running on my local computer:
[ec2-user@ip-172-31-20-107 ~]$ IPYTHON_OPTS="notebook --pylab inline --no-browser --port=7000" /root/spark/bin/pyspark
Below is the code my notebook executes.
Bug list:
1. Why do I need to create a SparkContext? If I run pyspark interactively, the context is created automatically for me.
2. The print statement causes the output to be displayed in the terminal where I started pyspark, not in the notebook's output.
Any comments or suggestions would be greatly appreciated.
Thanks
Andy

import sys
from operator import add
from pyspark import SparkContext

# only stand-alone jobs should create a SparkContext
sc = SparkContext(appName="pyStreamingSparkRDDPipe")

data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

def echo(data):
    # output winds up in the shell console on my cluster (i.e. the machine I launched pyspark from)
    print("python received: %s" % (data,))

rdd.foreach(echo)
print("we are done")

-- 
jay vyas
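The sys.path hack mentioned above can be sketched as follows. The layout (SPARK_HOME/python plus the bundled py4j zip under python/lib) matches the paths in the PySpark traceback elsewhere in this archive, where Spark 1.1.0 ships py4j-0.8.2.1-src.zip; treat the zip name as an assumption for other versions, and add_pyspark_to_path as a hypothetical helper. /root/spark is the Spark directory from Andy's command line.

```python
import os
import sys


def add_pyspark_to_path(spark_home, py4j_zip="py4j-0.8.2.1-src.zip"):
    """Prepend PySpark and its bundled py4j to sys.path (hypothetical helper).

    Assumes the layout <spark_home>/python and <spark_home>/python/lib/<py4j zip>.
    """
    python_dir = os.path.join(spark_home, "python")
    py4j_path = os.path.join(python_dir, "lib", py4j_zip)
    # Insert in reverse order so that python_dir ends up first on sys.path.
    for path in (py4j_path, python_dir):
        if path not in sys.path:
            sys.path.insert(0, path)
    return python_dir, py4j_path


python_dir, py4j_path = add_pyspark_to_path("/root/spark")
# After this, `from pyspark import SparkContext` should resolve, and a local
# context (master "local") is the quick first test Jay suggests.
```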
Re: Executor and BlockManager memory size
Hey Larry,
I have been trying to figure this out for standalone clusters as well. http://apache-spark-user-list.1001560.n3.nabble.com/What-is-a-Block-Manager-td12833.html has an answer as to what the block manager is for. From the documentation, what I understood was that if you assign X GB to each executor, spark.storage.memoryFraction (default 0.6) * X is assigned to the BlockManager and the rest to the JVM itself(?). However, as you can see, 26.8 GB is assigned to the BM, and assuming a memoryFraction of 0.6, this means the executor sees ~44.7 GB of memory; I am not sure what happens to the difference (5.3 GB).

On Thu, Oct 9, 2014 at 9:40 PM, Larry Xiao xia...@sjtu.edu.cn wrote:
Hi all,
I'm confused about the Executor and the BlockManager: why do they have different amounts of memory?
14/10/10 08:50:02 INFO AppClient$ClientActor: Executor added: app-20141010085001-/2 on worker-20141010004933-brick6-35657 (brick6:35657) with 6 cores
14/10/10 08:50:02 INFO SparkDeploySchedulerBackend: Granted executor ID app-20141010085001-/2 on hostPort brick6:35657 with 6 cores, 50.0 GB RAM
14/10/10 08:50:07 INFO BlockManagerMasterActor: Registering block manager brick6:53296 with 26.8 GB RAM
and on the WebUI:
Executor ID | Address | RDD Blocks | Memory Used | Disk Used | Active Tasks | Failed Tasks | Complete Tasks | Total Tasks | Task Time | Input | Shuffle Read | Shuffle Write
0 | brick3:37607 | 0 | 0.0 B / 26.8 GB | 0.0 B | 6 | 0 | 0 | 6 | 0 ms | 0.0 B | 0.0 B | 0.0 B
1 | brick1:59493 | 0 | 0.0 B / 26.8 GB | 0.0 B | 6 | 0 | 0 | 6 | 0 ms | 0.0 B | 0.0 B | 0.0 B
2 | brick6:53296 | 0 | 0.0 B / 26.8 GB | 0.0 B | 6 | 0 | 0 | 6 | 0 ms | 0.0 B | 0.0 B | 0.0 B
3 | brick5:38543 | 0 | 0.0 B / 26.8 GB | 0.0 B | 6 | 0 | 0 | 6 | 0 ms | 0.0 B | 0.0 B | 0.0 B
4 | brick2:44937 | 0 | 0.0 B / 26.8 GB | 0.0 B | 6 | 0 | 0 | 6 | 0 ms | 0.0 B | 0.0 B | 0.0 B
5 | brick4:46798 | 0 | 0.0 B / 26.8 GB | 0.0 B | 6 | 0 | 0 | 6 | 0 ms | 0.0 B | 0.0 B | 0.0 B
driver | brick0:57692 | 0 | 0.0 B / 274.6 MB | 0.0 B | 0 | 0 | 0 | 0 | 0 ms | 0.0 B | 0.0 B | 0.0 B
As I understand it, a worker consists of a daemon and an executor, and the executor is in charge of both execution and storage. So does it mean that 26.8 GB is reserved for storage and the rest is for execution?
Another question is that, throughout execution, the block manager seems to be almost always free:
14/10/05 14:33:44 INFO BlockManagerInfo: Added broadcast_21_piece0 in memory on brick2:57501 (size: 1669.0 B, free: 21.2 GB)
I don't know what I'm missing here.
Best regards,
Larry
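The gap Haripriya noticed is consistent with one extra factor. Assume Spark 1.1 sizes the storage pool as the JVM's reported maximum heap x spark.storage.memoryFraction (0.6) x spark.storage.safetyFraction (0.9), and that the JVM reports a little less than -Xmx as its maximum heap. Under those assumptions, the numbers in Larry's logs line up, as the arithmetic below shows. The 512 MB driver figure is Spark 1.1's default driver memory, assumed here because the thread does not say what was configured.

```python
def storage_memory(max_heap, memory_fraction=0.6, safety_fraction=0.9):
    """Storage (BlockManager) pool size, assuming the formula
    max_heap * spark.storage.memoryFraction * spark.storage.safetyFraction."""
    return max_heap * memory_fraction * safety_fraction


def implied_heap(storage, memory_fraction=0.6, safety_fraction=0.9):
    """Invert the formula: which heap size would give this storage pool?"""
    return storage / (memory_fraction * safety_fraction)


executor_gb = storage_memory(50.0)  # ~27.0 GB vs. 26.8 GB reported
driver_mb = storage_memory(512.0)   # ~276.5 MB vs. 274.6 MB reported
heap_gb = implied_heap(26.8)        # ~49.6 GB, close to the 50 GB granted
```

Read this way, roughly 54% of each executor heap is set aside for cached blocks, and the remainder is available for the rest of the executor's work.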
Re: Breaking the previous large-scale sort record with Spark
Great! Congratulations!
-- 
Nan Zhu
Re: java.io.IOException Error in task deserialization
I haven't seen this at all since switching to HttpBroadcast. It seems TorrentBroadcast might have some issues?

On Thu, Oct 9, 2014 at 4:28 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote:
I don't think that I saw any other error message. This is all I saw. I'm currently experimenting to see if this can be alleviated by using HttpBroadcastFactory instead of TorrentBroadcast. So far, with HttpBroadcast, I haven't seen this recur. I'll keep you posted.

On Thu, Oct 9, 2014 at 4:21 PM, Davies Liu dav...@databricks.com wrote:
This exception should be caused by another one; could you paste all of them here? Also, it would be great if you could provide a script to reproduce this problem. Thanks!

On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja aahuj...@gmail.com wrote:
Has anyone else seen this error in task deserialization? The task is processing a small amount of data and doesn't seem to have much data hanging on to the closure. I've only seen this with Spark 1.1.
Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times, most recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com): java.io.IOException: unexpected exception type
        java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
        java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
        org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:744)
Re: Breaking the previous large-scale sort record with Spark
Wonderful !! On 11 Oct, 2014, at 12:00 am, Nan Zhu zhunanmcg...@gmail.com wrote: Great! Congratulations! -- Nan Zhu On Friday, October 10, 2014 at 11:19 AM, Mridul Muralidharan wrote: Brilliant stuff ! Congrats all :-) This is indeed really heartening news ! Regards, Mridul On Fri, Oct 10, 2014 at 8:24 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Hi folks, I interrupt your regularly scheduled user / dev list to bring you some pretty cool news for the project, which is that we've been able to use Spark to break MapReduce's 100 TB and 1 PB sort records, sorting data 3x faster on 10x fewer nodes. There's a detailed writeup at http://databricks.com/blog/2014/10/10/spark-breaks-previous-large-scale-sort-record.html. Summary: while Hadoop MapReduce held last year's 100 TB world record by sorting 100 TB in 72 minutes on 2100 nodes, we sorted it in 23 minutes on 206 nodes; and we also scaled up to sort 1 PB in 234 minutes. I want to thank Reynold Xin for leading this effort over the past few weeks, along with Parviz Deyhim, Xiangrui Meng, Aaron Davidson and Ali Ghodsi. In addition, we'd really like to thank Amazon's EC2 team for providing the machines to make this possible. Finally, this result would of course not be possible without the many many other contributions, testing and feature requests from throughout the community. For an engine to scale from these multi-hour petabyte batch jobs down to 100-millisecond streaming and interactive queries is quite uncommon, and it's thanks to all of you folks that we are able to make this happen. Matei
Rdd repartitioning
Hi, I was facing GC overhead errors while executing an application on 570 MB of data (with RDD replication). To fix the heap errors, I repartitioned the RDD into 10 partitions:
val logData = sc.textFile("hdfs:/text_data/text data.txt").persist(StorageLevel.MEMORY_ONLY_2)
val parts = logData.coalesce(10, true)
println(parts.partitions.length)
But the problem is that the web UI still shows the number of partitions as 5, while the print statement outputs 10. I also tried repartition(), but faced the same problem. Also, does the web UI show the storage details of each partition twice when I replicate the RDD? I see that the web UI displays each partition only once, while it says 2 x replicated. Can someone help me out with this? -Karthik
Spark job (not Spark streaming) doesn't delete un-needed checkpoints.
Un-needed checkpoints are not getting deleted automatically in my application. The lineage looks something like this, and checkpoints simply accumulate in a temporary directory (every lineage point, however, does zip with a globally permanent RDD): PermanentRDD:Global zips with all the intermediate ones Intermediate RDDs: A---B---CDEF-G | | | checkpoint checkpoint checkpoint Older intermediate RDDs never get used.
Re: Breaking the previous large-scale sort record with Spark
Great stuff. Wonderful to see such progress in so short a time. How about some links to code and instructions so that these benchmarks can be reproduced? Regards, - Steve From: Debasish Das debasish.da...@gmail.com Date: Friday, October 10, 2014 at 8:17 To: Matei Zaharia matei.zaha...@gmail.com Cc: user user@spark.apache.org, dev d...@spark.apache.org Subject: Re: Breaking the previous large-scale sort record with Spark Awesome news Matei ! Congratulations to the databricks team and all the community members...
Re: How does the Spark Accumulator work under the covers?
Hi Areg, Check out http://spark.apache.org/docs/latest/programming-guide.html#accumulators
val sum = sc.accumulator(0) // accumulator created from an initial value in the driver
The accumulator variable is created in the driver. Tasks running on the cluster can then add to it. However, they cannot read its value. Only the driver program can read the accumulator’s value, using its value method.
sum.value // in the driver
myRdd.map(x => sum += x)
As for "where is this function being run": it is run by the tasks on the workers. The driver accumulates the data from the various workers and merges them to get the final result, as Haripriya mentioned. Thanks, Jayant On Fri, Oct 10, 2014 at 7:46 AM, HARIPRIYA AYYALASOMAYAJULA aharipriy...@gmail.com wrote: If you use parallelize, the data is distributed across the available nodes, and the sum is computed individually within each partition and merged later. The driver manages the entire process. Is my understanding correct? Can someone please correct me if I am wrong? On Fri, Oct 10, 2014 at 9:37 AM, Areg Baghdasaryan (BLOOMBERG/ 731 LEX -) abaghdasa...@bloomberg.net wrote: Hello, I was wondering what the Spark accumulator does under the covers. I’ve implemented my own associative addInPlace function for the accumulator; where is this function being run? Let’s say you call something like myRdd.map(x => sum += x): is “sum” being accumulated locally in any way, for each element or partition or node? Is “sum” a broadcast variable? Or does it only exist on the driver node? How does the driver node get access to the “sum”? Thanks, Areg -- Regards, Haripriya Ayyalasomayajula
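To make the flow Jayant and Haripriya describe concrete, here is a single-JVM model in plain Scala (no Spark needed). It is a sketch, not Spark's implementation: the AddInPlace trait and AccumulatorModel object are hypothetical, loosely shaped like the zero/addInPlace pair a custom AccumulatorParam provides. Each "task" starts from zero and adds only into its own copy; the "driver" then merges the per-task partial results with addInPlace. That is why a custom addInPlace may run both inside tasks and on the driver, and why it should be associative (and, since tasks can finish in any order, effectively commutative).

```scala
object AccumulatorModel {
  // Hypothetical interface, loosely shaped like Spark's AccumulatorParam
  // (zero + addInPlace); it is not Spark's actual class.
  trait AddInPlace[T] {
    def zero(initialValue: T): T
    def addInPlace(a: T, b: T): T
  }

  object IntSum extends AddInPlace[Int] {
    def zero(initialValue: Int): Int = 0
    def addInPlace(a: Int, b: Int): Int = a + b
  }

  // Each inner Seq stands in for one partition processed by one task.
  def run[T](initialValue: T, partitions: Seq[Seq[T]], param: AddInPlace[T]): T = {
    // Task side: every task starts from zero(...) and only updates its own copy;
    // it never reads the driver's running value.
    val perTask: Seq[T] = partitions.map { part =>
      part.foldLeft(param.zero(initialValue))((acc, x) => param.addInPlace(acc, x))
    }
    // Driver side: the per-task partial results are merged into the driver's value.
    perTask.foldLeft(initialValue)((acc, partial) => param.addInPlace(acc, partial))
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(1, 2, 3), Seq(4, 5), Seq(6))
    println(run(0, partitions, IntSum))
  }
}
```

Note that the initial value is counted once, on the driver, rather than once per task; that is the reason the task-side copies start from zero.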
Application failure in yarn-cluster mode
Hi, After updating from spark-1.0.0 to spark-1.1.0, my Spark applications fail most of the time (but not always) in yarn-cluster mode (but not in yarn-client mode). Here is my configuration:
* spark-1.1.0
* hadoop-2.2.0
And the hadoop.tmp.dir definition in the Hadoop core-site.xml file (each directory is on its own partition):
<property>
  <name>hadoop.tmp.dir</name>
  <value>file:/d1/yarn/local,file:/d2/yarn/local,file:/d3/yarn/local,file:/d4/yarn/local,file:/d5/yarn/local,file:/d6/yarn/local,file:/d7/yarn/local</value>
</property>
After investigating, it turns out that the problem occurs when the executor fetches a jar file: the jar is downloaded into a temporary file, always in /d1/yarn/local (see the hadoop.tmp.dir definition above), and then moved into one of the temporary directories defined in hadoop.tmp.dir:
* if it is the same directory as the temporary file (i.e. /d1/yarn/local), the application continues normally
* if it is another one (e.g. /d2/yarn/local, /d3/yarn/local, ...), it fails with the following error:
14/10/10 14:33:51 ERROR executor.Executor: Exception in task 0.0 in stage 1.0 (TID 0) java.io.FileNotFoundException: ./logReader-1.0.10.jar (Permission denied) at java.io.FileOutputStream.open(Native Method) at java.io.FileOutputStream.<init>(FileOutputStream.java:221) at com.google.common.io.Files$FileByteSink.openStream(Files.java:223) at com.google.common.io.Files$FileByteSink.openStream(Files.java:211) at com.google.common.io.ByteSource.copyTo(ByteSource.java:203) at com.google.common.io.Files.copy(Files.java:436) at com.google.common.io.Files.move(Files.java:651) at org.apache.spark.util.Utils$.fetchFile(Utils.scala:440) at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:325) at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:323) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:323) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)
I have no idea why the move fails when the source and target files are not on the same partition. For the moment, I have worked around the problem with the attached patch (i.e. I ensure that the temp file and the moved file are always on the same partition). Any thoughts about this problem? Thanks! Christophe.
--- core/src/main/scala/org/apache/spark/util/Utils.scala.orig 2014-09-03 08:00:33.0 +0200
+++ core/src/main/scala/org/apache/spark/util/Utils.scala 2014-10-10 17:51:59.0 +0200
@@ -349,8 +349,7 @@
    */
   def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager) {
     val filename = url.split("/").last
-    val tempDir = getLocalDir(conf)
-    val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
+    val tempFile = File.createTempFile("fetchFileTemp", null, new File(targetDir.getAbsolutePath))
     val targetFile = new File(targetDir, filename)
     val uri = new URI(url)
     val fileOverwrite = conf.getBoolean("spark.files.overwrite", false)
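The stack trace shows Guava's Files.move falling back to Files.copy, which it does when a plain rename is not possible (for example, across filesystems). The patch avoids that path by creating the temporary file inside targetDir. Below is a sketch of the same idea using java.nio rather than Guava. SameDirFetch and writeThenMove are hypothetical names, not Spark code, and the sketch does not explain the "Permission denied" itself; it only shows the same-directory temp-file pattern.

```scala
import java.nio.file.{Files, Path, StandardCopyOption}

object SameDirFetch {
  // Hypothetical helper, not Spark's Utils.fetchFile: write the downloaded bytes
  // to a temp file created in targetDir itself, then rename it into place.
  def writeThenMove(targetDir: Path, fileName: String, bytes: Array[Byte]): Path = {
    // Same directory means same filesystem, so the move below can be a plain rename.
    val temp = Files.createTempFile(targetDir, "fetchFileTemp", ".tmp")
    try {
      Files.write(temp, bytes)
      // ATOMIC_MOVE throws AtomicMoveNotSupportedException instead of silently
      // degrading to copy + delete when a rename is impossible.
      Files.move(temp, targetDir.resolve(fileName), StandardCopyOption.ATOMIC_MOVE)
    } finally {
      // After a successful move the temp path no longer exists, so this is a no-op;
      // on failure it removes the partial download.
      Files.deleteIfExists(temp)
    }
  }
}
```

Using ATOMIC_MOVE makes a cross-filesystem situation fail loudly at the move, which is easier to diagnose than an error surfacing later from an implicit copy.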
Re: SparkSQL LEFT JOIN problem
Hi, Can you try select birthday from customer left join profile on customer.account_id = profile.account_id to see if the problem remains on your entire data? Thanks, Liquan On Fri, Oct 10, 2014 at 8:20 AM, invkrh inv...@gmail.com wrote: Hi, I am exploring SparkSQL 1.1.0, and I have a problem with LEFT JOIN. Here is the query:
select * from customer left join profile on customer.account_id = profile.account_id
The two tables' schemas are as follows:
// Table: customer
root
 |-- account_id: string (nullable = false)
 |-- birthday: string (nullable = true)
 |-- preferstore: string (nullable = true)
 |-- registstore: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- city_name_en: string (nullable = true)
 |-- register_date: string (nullable = true)
 |-- zip: string (nullable = true)
// Table: profile
root
 |-- account_id: string (nullable = false)
 |-- card_type: string (nullable = true)
 |-- card_upgrade_time_black: string (nullable = true)
 |-- card_upgrade_time_gold: string (nullable = true)
However, I always get an exception:
Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: *, tree:
Project [*]
 Join LeftOuter, Some(('customer.account_id = 'profile.account_id))
  Subquery customer
   SparkLogicalPlan (ExistingRdd [account_id#0,birthday#1,preferstore#2,registstore#3,gender#4,city_name_en#5,register_date#6,zip#7], MappedRDD[5] at map at SQLFetcher.scala:43)
  Subquery profile
   SparkLogicalPlan (ExistingRdd [account_id#8,card_type#9,card_upgrade_time_black#10,card_upgrade_time_gold#11], MappedRDD[12] at map at SQLFetcher.scala:43)
I was not sure where the problem was, so I created two simple tables to isolate it.
// table 1
a b c
4 8 9
1 3 4
3 4 5
// table 2
a b c
1 2 3
4 5 6
This time, it works, so the problem might be in the data. I also sampled some lines of the input tables to create new ones. This also works. I am so confused.
The problem is in the data, but the error messages are not enough to find it (unless I am missing something). Some lines of the sampled tables:
// Table: customer
[50660,1975-06-05 00:00:00.000,13,12,male,ningboshi,2006-12-14 00:00:00.000,]
[50666,1984-02-23 00:00:00.000,72,5,Female,beijingshi,2006-12-14 00:00:00.000,100086]
[50680,1976-11-25 00:00:00.000,59,5,Female,beijingshi,2006-12-14 00:00:00.000,100022]
[85,1971-03-27 00:00:00.000,2,2,Female,shanghaishi,2005-09-20 00:00:00.000,200336]
// Table: profile
[1144681,3,2010-02-18 00:00:00.000,2013-02-28 00:00:00.000]
[50666,2,2010-10-31 00:00:00.000,]
[3930657,1,,]
[1056365,2,2009-12-29 00:00:00.000,]
Any help? =) Hao -- Liquan Pei Department of Physics University of Massachusetts Amherst
RE: Spark on Mesos Issue - Do I need to install Spark on Mesos slaves
I have actually had the same problem. spark.executor.uri on HDFS did not work so I had to put it in a local folder
Re: Help with using combineByKey
Thank you guys! It was very helpful and now I understand it better. On Fri, Oct 10, 2014 at 1:38 AM, Davies Liu dav...@databricks.com wrote: Maybe this version is easier to use:
plist.mapValues((v) => (if (v > 0) 1 else 0, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
It has similar behavior to combineByKey(), and will be faster than the groupByKey() version. On Thu, Oct 9, 2014 at 9:28 PM, HARIPRIYA AYYALASOMAYAJULA aharipriy...@gmail.com wrote: Sean, Thank you. It works. But I am still confused about the function. Can you kindly throw some light on it? I was going through the example mentioned in https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html Is there any better source through which I can learn more about these functions? It would be helpful if I could look at more examples. Also, I assume using combineByKey lets us solve it in parallel, rather than using the simple functions provided by Scala as mentioned by Yana. Am I correct? On Thu, Oct 9, 2014 at 12:30 PM, Sean Owen so...@cloudera.com wrote: Oh duh, sorry. The initialization should of course be (v) => (if (v > 0) 1 else 0, 1). This gives the answer you are looking for. I don't see what Part2 is supposed to do differently. On Thu, Oct 9, 2014 at 6:14 PM, HARIPRIYA AYYALASOMAYAJULA aharipriy...@gmail.com wrote: Hello Sean, Thank you, but changing from v to 1 doesn't help me either. I am trying to count the number of non-zero values using the first accumulator.
val newlist = List(("LAX", 6), ("LAX", 0), ("LAX", 7), ("SFO", 0), ("SFO", 0), ("SFO", 9))
val plist = sc.parallelize(newlist)
val part1 = plist.combineByKey(
  (v) => (1, 1),
  (acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
val Part2 = part1.map { case (key, value) => (key, (value._1, value._2)) }
This should give me the result (LAX,(2,3)) (SFO,(1,3)). On Thu, Oct 9, 2014 at 11:48 AM, Sean Owen so...@cloudera.com wrote: You have a typo in your code at var acc:, and the map from opPart1 to opPart2 looks like a no-op, but those aren't the problem I think. It sounds like you intend the first element of each pair to be a count of nonzero values, but you initialize the first element of the pair to v, not 1, in v => (v,1). Try v => (1,1). On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA aharipriy...@gmail.com wrote: I am a beginner to Spark and am finding it difficult to implement a very simple reduce operation. I read that it is ideal to use combineByKey for complex reduce operations. My input:
val input = sc.parallelize(List(("LAX", 6), ("LAX", 8), ("LAX", 7), ("SFO", 0), ("SFO", 1), ("SFO", 9), ("PHX", 65), ("PHX", 88), ("KX", 7), ("KX", 6), ("KX", 1), ("KX", 9), ("HOU", 56), ("HOU", 5), ("HOU", 59), ("HOU", 0), ("MA", 563), ("MA", 545), ("MA", 5), ("MA", 0), ("MA", 0)))
val opPart1 = input.combineByKey(
  (v) => (v, 1),
  (var acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
val opPart2 = opPart1.map { case (key, value) => (key, (value._1, value._2)) }
opPart2.collectAsMap().map(println(_))
If the value is greater than 0, the first accumulator should be incremented by 1; otherwise it remains the same. The second accumulator is a simple counter for each value. I am getting incorrect output (garbage values) for the first accumulator. Please help.
The equivalent reduce operation in Hadoop MapReduce is:
public static class PercentageCalcReducer extends Reducer<Text, IntWritable, Text, FloatWritable> {
  private FloatWritable pdelay = new FloatWritable();
  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int acc2 = 0;
    float frac_delay, percentage_delay;
    int acc1 = 0;
    for (IntWritable val : values) {
      if (val.get() > 0) {
        acc1++;
      }
      acc2++;
    }
    frac_delay = (float) acc1 / acc2;
    percentage_delay = frac_delay * 100;
    pdelay.set(percentage_delay);
    context.write(key, pdelay);
  }
}
Please help. Thank you for your time. -- Regards, Haripriya Ayyalasomayajula contact : 650-796-7112
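For anyone else puzzled by combineByKey, here is a plain-Scala model (no Spark needed) of how its three functions interact. It is a sketch, not Spark's implementation, and the helper names combineByKey (on a local object) and nonZeroAndTotal are hypothetical. createCombiner runs on the first value seen for a key within a partition, mergeValue folds later values for that key in the same partition, and mergeCombiners merges the per-partition results. That is why createCombiner must apply the same "is it non-zero?" test as mergeValue: with (v) => (v, 1), the first value of each key in each partition contributes v itself to the count, and with (v) => (1, 1) a leading 0 is still counted.

```scala
object CombineByKeyModel {
  // Local model of the combineByKey contract; each inner Seq is one "partition".
  def combineByKey[K, V, C](
      partitions: Seq[Seq[(K, V)]],
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Map side: one combiner per key per partition.
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.get(k) match {
          case Some(c) => acc.updated(k, mergeValue(c, v))  // later value in this partition
          case None    => acc.updated(k, createCombiner(v)) // first value in this partition
        }
      }
    }
    // Reduce side: merge the per-partition combiners for each key.
    perPartition.foldLeft(Map.empty[K, C]) { (acc, m) =>
      m.foldLeft(acc) { case (merged, (k, c)) =>
        merged.updated(k, merged.get(k).map(prev => mergeCombiners(prev, c)).getOrElse(c))
      }
    }
  }

  // (count of values > 0, count of all values) per key, with the corrected createCombiner.
  def nonZeroAndTotal(partitions: Seq[Seq[(String, Int)]]): Map[String, (Int, Int)] =
    combineByKey[String, Int, (Int, Int)](
      partitions,
      v => (if (v > 0) 1 else 0, 1),
      (acc, v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
      (a, b) => (a._1 + b._1, a._2 + b._2))

  def main(args: Array[String]): Unit = {
    // The thread's sample data, split arbitrarily into two partitions.
    val parts = Seq(
      Seq(("LAX", 6), ("LAX", 0), ("SFO", 0)),
      Seq(("LAX", 7), ("SFO", 0), ("SFO", 9)))
    // Percentage of non-zero values per key, like the Hadoop reducer computes.
    nonZeroAndTotal(parts).toSeq.sortBy(_._1).foreach { case (k, (nonZero, total)) =>
      println(s"$k ($nonZero,$total) ${nonZero.toFloat / total * 100}%")
    }
  }
}
```

Splitting the same data differently across partitions should give the same counts; that property is what the three functions together must guarantee.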
Re: java.io.IOException Error in task deserialization
Maybe, TorrentBroadcast is more complicated than HttpBroadcast, could you tell us how to reproduce this issue? That will help us a lot to improve TorrentBroadcast. Thanks! On Fri, Oct 10, 2014 at 8:46 AM, Sung Hwan Chung coded...@cs.stanford.edu wrote: I haven't seen this at all since switching to HttpBroadcast. It seems TorrentBroadcast might have some issues?
RE: Spark SQL parser bug?
Hi Cheng, Thanks for looking into this. It looks like the bug may be in the Spark Cassandra connector code; table x is a table in Cassandra. However, while trying to troubleshoot this issue, I noticed another issue. This time I did not use Cassandra; instead, I created a table on the fly. I am not seeing the same issue, but the results do not look right. Here is my complete Spark shell session:
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.1.0
      /_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
Type in expressions to have them evaluated.
Type :help for more information.
14/10/10 11:05:11 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.59.135 instead (on interface eth0)
14/10/10 11:05:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
14/10/10 11:05:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context available as sc.
scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> val sqlContext = new SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@2be5c74d

scala> import sqlContext.createSchemaRDD
import sqlContext.createSchemaRDD

scala> case class X(a: Int, ts: java.sql.Timestamp)
defined class X

scala> val rdd = sc.parallelize(1 to 5).map { n => X(n, new java.sql.Timestamp(132554880L + n*8640)) }
rdd: org.apache.spark.rdd.RDD[X] = MappedRDD[1] at map at <console>:20

scala> rdd.collect
res0: Array[X] = Array(X(1,2012-01-03 16:00:00.0), X(2,2012-01-04 16:00:00.0), X(3,2012-01-05 16:00:00.0), X(4,2012-01-06 16:00:00.0), X(5,2012-01-07 16:00:00.0))

scala> rdd.registerTempTable("x")

scala> val sRdd = sqlContext.sql("select a from x where ts >= '2012-01-01T00:00:00';")
sRdd: org.apache.spark.sql.SchemaRDD = SchemaRDD[4] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
Project [a#0]
 ExistingRdd [a#0,ts#1], MapPartitionsRDD[6] at mapPartitions at basicOperators.scala:208

scala> sRdd.collect
res2: Array[org.apache.spark.sql.Row] = Array()

Mohammed
From: Cheng Lian [mailto:lian.cs@gmail.com] Sent: Friday, October 10, 2014 4:37 AM To: Mohammed Guller; user@spark.apache.org Subject: Re: Spark SQL parser bug? Hi Mohammed, Would you mind sharing the DDL of the table x and the complete stack trace of the exception you got? A full Spark shell session history would be more than helpful. PR #2084 was merged into master in August, and the timestamp type is supported in 1.1.
I tried the following snippets in the Spark shell (v1.1), and didn’t observe this issue:

scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> import sc._
import sc._

scala> val sqlContext = new SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@6c3441c5

scala> import sqlContext._
import sqlContext._

scala> case class Record(a: Int, ts: java.sql.Timestamp)
defined class Record

scala> makeRDD(Seq.empty[Record], 1).registerTempTable("x")

scala> sql("SELECT a FROM x WHERE ts >= '2012-01-01T00:00:00' AND ts <= '2012-03-31T23:59:59'")
res1: org.apache.spark.sql.SchemaRDD = SchemaRDD[3] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
Project [a#0]
 ExistingRdd [a#0,ts#1], MapPartitionsRDD[5] at mapPartitions at basicOperators.scala:208

scala> res1.collect()
...
res2: Array[org.apache.spark.sql.Row] = Array()

Cheng
On 10/9/14 10:26 AM, Mohammed Guller wrote: Hi, When I run the following Spark SQL query in the Spark shell (version 1.1.0):
val rdd = sqlContext.sql("SELECT a FROM x WHERE ts >= '2012-01-01T00:00:00' AND ts <= '2012-03-31T23:59:59'")
it gives the following error:
rdd: org.apache.spark.sql.SchemaRDD = SchemaRDD[294] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
java.util.NoSuchElementException: head of empty list
The ts column in the where clause has timestamp data and is of type timestamp. If I replace the string '2012-01-01T00:00:00' in the where clause with its epoch value, then the query works fine. It looks like I have run into the issue described in this pull request: https://github.com/apache/spark/pull/2084 Was that PR not merged into Spark 1.1.0? Or am I missing something? Thanks, Mohammed
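One thing worth checking (an assumption, not something established in this thread) is how the string literal becomes a timestamp. java.sql.Timestamp.valueOf, the standard JDBC parser, accepts only yyyy-[m]m-[d]d hh:mm:ss[.f...] and rejects the ISO-8601 'T' separator. If a literal were converted that way and the failure turned into a null, a comparison against it would match no rows, which would fit an empty result for rows that all fall after 2012-01-01 (reading the stripped comparison operator as >=). The plain-Scala sketch below, with hypothetical names and timestamps chosen to match the dates printed by rdd.collect in the session, only demonstrates the parsing difference; it does not reproduce Spark's actual cast logic.

```scala
import java.sql.Timestamp

object TimestampLiteralCheck {
  // Hypothetical helper: Timestamp.valueOf throws IllegalArgumentException for
  // anything outside the JDBC escape format; here a failed parse becomes None.
  def parse(s: String): Option[Timestamp] =
    try Some(Timestamp.valueOf(s))
    catch { case _: IllegalArgumentException => None }

  // Five rows one day apart, starting 2012-01-03 16:00 local time, matching the
  // dates printed in the session (constants chosen for this sketch).
  val rows: Seq[(Int, Timestamp)] = {
    val start = Timestamp.valueOf("2012-01-02 16:00:00").getTime
    (1 to 5).map(n => (n, new Timestamp(start + n * 86400000L)))
  }

  // Rows whose ts is on or after the literal; 0 if the literal does not parse,
  // modelling a comparison against a null bound that matches nothing.
  def countOnOrAfter(literal: String): Int =
    parse(literal) match {
      case Some(lower) => rows.count { case (_, ts) => !ts.before(lower) }
      case None        => 0
    }

  def main(args: Array[String]): Unit = {
    println(parse("2012-01-01T00:00:00"))         // None: the 'T' separator is rejected
    println(countOnOrAfter("2012-01-01 00:00:00")) // 5
  }
}
```

If this turns out to be the cause, writing the literal as '2012-01-01 00:00:00' (space instead of 'T') would be a quick experiment to try in the shell.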
Re: Akka disassociation on Java SE Embedded
How do you increase the spark block manager timeout?
Re: Akka disassociation on Java SE Embedded
https://github.com/CodingCat/spark/commit/c5cee24689ac4ad1187244e6a16537452e99e771 -- Nan Zhu On Friday, October 10, 2014 at 4:31 PM, bhusted wrote: How do you increase the spark block manager timeout?
Issue with java spark broadcast
I'm trying to broadcast an accumulator I generated earlier in my app. However, I get a NullPointerException whenever I reference the value.
// The start of my accumulator generation
LookupKeyToIntMap keyToIntMapper = new LookupKeyToIntMap();
keyToIntMapper.setNumPartitions(intermediatePair.splits().size());
keyToIntMapper.setMapAccumulator(keyToIntMap);
JavaRDD<Tuple2<Integer, Iterable<Long>>> intermediateIntsTuple = intermediatePair.mapPartitionsWithIndex(keyToIntMapper, false);
JavaPairRDD<Integer, Iterable<Long>> intermediatePairInts = JavaPairRDD.fromJavaRDD(intermediateIntsTuple);
JavaPairRDD<Integer, Tuple2<Integer, Integer>> sims = intermediatePairInts.mapValues(new SelfSim());
// I force the RDD to evaluate so as to avoid laziness issues
Map<Integer, Tuple2<Integer, Integer>> simsMap = sims.collectAsMap();
// Broadcast the map
// If I include a print statement here on the accumulator, I can print the map out successfully
broadcastVar = ctx.broadcast(keyToIntMap.value());
// Here I try to access the broadcast map
JavaPairRDD<Integer, Long> indidIntKeyPair = indidKeyPairFiltered.mapToPair(new PairFunction<Tuple2<String, Long>, Integer, Long>() {
  @Override
  public Tuple2<Integer, Long> call(Tuple2<String, Long> keyVal) throws Exception {
    Integer outInt = broadcastVar.value().inverse().get(keyVal._1);
    return new Tuple2<Integer, Long>(outInt, keyVal._2);
  }
});
This works fine when I run it locally, but when I move it to a cluster environment it throws NullPointerExceptions. My question is: why can't I access this map, and what do I have to do to make it accessible? Thanks, Jacob
Re: getting tweets for a specified handle
Thanks. I made the change and ran the code, but I don't get any tweets for my handle, although I do see the tweets when I search for it on Twitter. Does Spark allow us to get tweets from the past (say, the last 100 tweets, or tweets that appeared in the last 10 minutes)? Thanks
Spark Streaming KafkaUtils Issue
Hi Folks,

I am seeing some strange behavior when using the Spark Kafka connector in Spark Streaming. I have a Kafka topic with 8 partitions and a Kafka producer that pumps messages into this topic. On the consumer side I have a Spark Streaming application with 8 executors on 8 worker nodes and 8 ReceiverInputDStreams, all with the same Kafka group id, connected to the 8 partitions of the topic. The Kafka consumer property auto.offset.reset is set to smallest.

Here is the sequence of steps:
(1) I start the Spark Streaming app.
(2) I start the producer. At this point I see the messages being pumped from the producer arriving in Spark Streaming.

Then I:
(1) Stopped the producer.
(2) Waited for all the messages to be consumed.
(3) Stopped the Spark Streaming app.

Now when I restart the Spark Streaming app (note: the producer is still down and no messages are being pumped into the topic), I observe that Spark Streaming starts reading each partition right from the beginning. This is not what I was expecting; I expected the consumers started by Spark Streaming to pick up where they left off.

Is my assumption wrong that the consumers (the Kafka/Spark connector) should resume reading the topic where they last left off? Has anyone else seen this behavior? Is there a way to make it start from where it left off?

Regards,
- Abraham
Re: Spark Streaming KafkaUtils Issue
Would you mind sharing the code leading to your createStream? Are you also setting group.id? Thanks, Sean On Oct 10, 2014, at 4:31 PM, Abraham Jacob abe.jac...@gmail.com wrote: Hi Folks, I am seeing some strange behavior when using the Spark Kafka connector in Spark streaming.
Re: Spark Streaming KafkaUtils Issue
Sure... I do set the group.id for all the consumers to be the same. Here is the code:

    SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
    sparkConf.set("spark.shuffle.manager", "SORT");
    sparkConf.set("spark.streaming.unpersist", "true");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));

    Map<String, String> kafkaConf = new HashMap<String, String>();
    kafkaConf.put("zookeeper.connect", zookeeper);
    kafkaConf.put("group.id", consumerGrp);
    kafkaConf.put("auto.offset.reset", "smallest");
    kafkaConf.put("zookeeper.connection.timeout.ms", "1000");
    kafkaConf.put("rebalance.max.retries", "4");
    kafkaConf.put("rebalance.backoff.ms", "3000");

    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put(topic, 1);

    // One receiver per partition, all in the same consumer group
    List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
    for (int i = 0; i < numPartitions; i++) {
        kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
                DefaultDecoder.class, PayloadDeSerializer.class, kafkaConf, topicMap,
                StorageLevel.MEMORY_ONLY_SER()).mapToPair(
            new PairFunction<Tuple2<byte[], String>, byte[], String>() {
                private static final long serialVersionUID = -1936810126415608167L;
                public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
                    return tuple2;
                }
            }));
    }

    // Merge the per-partition streams into one
    JavaPairDStream<byte[], String> unifiedStream;
    if (kafkaStreams.size() > 1) {
        unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
    } else {
        unifiedStream = kafkaStreams.get(0);
    }
    unifiedStream.print();
    jssc.start();
    jssc.awaitTermination();

-abe

On Fri, Oct 10, 2014 at 3:37 PM, Sean McNamara sean.mcnam...@webtrends.com wrote: Would you mind sharing the code leading to your createStream? Are you also setting group.id? Thanks, Sean On Oct 10, 2014, at 4:31 PM, Abraham Jacob abe.jac...@gmail.com wrote: Hi Folks, I am seeing some strange behavior when using the Spark Kafka connector in Spark streaming.
Re: Spark Streaming KafkaUtils Issue
How long do you let the consumers run for? Is it less than 60 seconds by chance? auto.commit.interval.ms defaults to 60000 (60 seconds). If so, that may explain why you are seeing that behavior. Cheers, Sean On Oct 10, 2014, at 4:47 PM, Abraham Jacob abe.jac...@gmail.com wrote: Sure... I do set the group.id for all the consumers to be the same.
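Sean's hypothesis can be illustrated with a toy model of periodic offset commits. This is only a sketch under stated assumptions, not Kafka code. Offsets are assumed to reach ZooKeeper only when the auto-commit timer fires, and any commit the consumer might make on a clean shutdown is deliberately left out. The helper name `committed_resume_offset` is hypothetical. The 60000 ms interval is the auto.commit.interval.ms default quoted above.

```python
# Toy model of periodic offset auto-commit (hypothetical helper, not Kafka code).
# Assumption: offsets are written to ZooKeeper only when the commit timer fires;
# a commit on clean shutdown, if any, is deliberately not modelled.

AUTO_COMMIT_INTERVAL_MS = 60000  # auto.commit.interval.ms default quoted above


def committed_resume_offset(consume_times_ms, run_time_ms,
                            interval_ms=AUTO_COMMIT_INTERVAL_MS):
    """Return the offset a restarted consumer would resume from, or None.

    consume_times_ms[i] is the time (ms since start) at which the message at
    offset i was consumed. The committed position is one past the last
    consumed offset as of the most recent timer tick before shutdown.
    None means the timer never fired, so nothing was committed.
    """
    committed = None
    tick = interval_ms
    while tick <= run_time_ms:
        consumed = [off for off, ts in enumerate(consume_times_ms) if ts <= tick]
        if consumed:
            committed = max(consumed) + 1
        tick += interval_ms
    return committed


# Three messages consumed in the first few seconds.
times = [1000, 2000, 3000]
print(committed_resume_offset(times, run_time_ms=30000))  # None: stopped before the first commit
print(committed_resume_offset(times, run_time_ms=61000))  # 3: resume after the last consumed message
```

When the result is None, a restarted consumer has no committed position. It then falls back to whatever its auto.offset.reset policy says.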
Running Example in local mode with input files
Hi folks, I have just upgraded to Spark 1.1.0 and tried some examples like: ./run-example SparkPageRank pagerank_data.txt 5 It turns out that Spark keeps trying to connect to my name node and read the file from HDFS rather than from the local FS: Client: Retrying connect to server: Node1/192.168.0.101:9000. Already tried 0 time(s) Even if I use file:// in my data file path, the issue still occurs. This does not happen in spark-shell. Is there anything I am missing in the configuration or in the way I specified the path? Thanks, Max
how to find the sources for spark-project
We have our own customization on top of the Parquet SerDe that we've been using for Hive. To make it work with Spark SQL, we need to be able to rebuild Spark with it. It will be much easier to rebuild Spark with this patch once I can find the sources for org.spark-project.hive, but I'm not sure where to find them. This seems to be the place where we need to put our patch.
Re: where are my python lambda functions run in yarn-client mode?
Thank you! I was looking for a config variable to that end, but I was looking in the Spark 1.0.2 documentation, since that was the version I had the problem with. Is this behavior documented in the 1.0.2 documentation?

Evan

On 10/09/2014 04:12 PM, Davies Liu wrote: When you call rdd.take() or rdd.first(), it may[1] execute the job locally (in the driver); otherwise, all jobs are executed in the cluster. There is a config called `spark.localExecution.enabled` (available since 1.1) to change this. It's not enabled by default, so all the functions will be executed in the cluster. If you set it to `true`, you get the same behavior as 1.0.

[1] If it does not get enough items from the first partition, it will try multiple partitions at a time, so those jobs will be executed in the cluster.

On Thu, Oct 9, 2014 at 12:14 PM, esamanas evan.sama...@gmail.com wrote: Hi, I am using PySpark and I'm trying to support both Spark 1.0.2 and 1.1.0 with my app, which will run in yarn-client mode. However, when I use 'map' to run a Python lambda function over an RDD, the function appears to run on different machines depending on the Spark version, and this is causing problems. In both cases, I am using a Hadoop cluster that runs Linux on all of its nodes. I am submitting my jobs from a machine running Mac OS X 10.9. As a reproducer, here is my script:

    import platform
    print(sc.parallelize([1]).map(lambda x: platform.system()).take(1)[0])

The answer in Spark 1.1.0: 'Linux'
The answer in Spark 1.0.2: 'Darwin'

In other experiments I changed the size of the list that gets parallelized, thinking maybe 1.0.2 just runs jobs on the driver node if they're small enough. I got the same answer (with only 1 million numbers). This is a troubling difference. I would expect all functions run on an RDD to be executed on my worker nodes in the Hadoop cluster, but this is clearly not the case for 1.0.2. Why does this difference exist? How can I accurately detect which jobs will run where?

Thank you,
Evan
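Davies's footnote can be made concrete with a small model of how take(n) might split its work into jobs. This is a simplified sketch, not Spark's implementation, and the helper name `take_plan` is hypothetical.

Two rules in it are assumptions:
- Growth rule: scan one partition first, then four times as many as have already been scanned.
- Local-execution rule: a job runs in the driver only when it touches a single partition and local execution is enabled. The real scheduler checks more conditions than this.

Passing local_execution_enabled=True mimics 1.0, which per Davies behaves like 1.1 with the setting turned on.

```python
def take_plan(partition_sizes, n, local_execution_enabled):
    """Sketch the jobs a take(n) might launch (simplified model, not Spark's code).

    partition_sizes[i] is the number of elements in partition i.
    Returns a list of (partition_indices, runs_on_driver) tuples.
    """
    plan = []
    collected = 0
    scanned = 0
    total = len(partition_sizes)
    while collected < n and scanned < total:
        # Assumed growth rule: one partition first, then 4x the partitions scanned so far.
        batch = 1 if scanned == 0 else scanned * 4
        batch = min(batch, total - scanned)
        parts = list(range(scanned, scanned + batch))
        # Assumed rule: only a single-partition job may run in the driver.
        runs_on_driver = local_execution_enabled and len(parts) == 1
        plan.append((parts, runs_on_driver))
        for p in parts:
            collected += min(partition_sizes[p], n - collected)
        scanned += batch
    return plan


sizes = [1, 0, 0, 5, 5]
print(take_plan(sizes, 1, local_execution_enabled=True))   # [([0], True)]
print(take_plan(sizes, 3, local_execution_enabled=True))   # [([0], True), ([1, 2, 3, 4], False)]
print(take_plan(sizes, 1, local_execution_enabled=False))  # [([0], False)]
```

The second call shows the footnote's point. Once take() has to look past the first partition, it launches multi-partition jobs, and those run in the cluster even when local execution is enabled.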
small bug in pyspark
Hi, I am running Spark on an EC2 cluster and need to update Python to 2.7. I have been following the directions at http://nbviewer.ipython.org/gist/JoshRosen/6856670 and https://issues.apache.org/jira/browse/SPARK-922. I noticed that when I start a shell using pyspark, I correctly get python2.7; however, when I try to start a notebook, I get python2.6. Change "exec ipython $IPYTHON_OPTS" to "exec ipython2 $IPYTHON_OPTS". One clean way to resolve this would be to add another environment variable, like PYSPARK_PYTHON. Andy P.S. Matplotlib does not upgrade because of dependency problems. I'll let you know once I get this resolved.
RE: Spark Streaming KafkaUtils Issue
Hi Abraham, You are correct: the “auto.offset.reset” behavior in KafkaReceiver is different from Kafka’s original semantics. If you set this configuration, KafkaReceiver will immediately clean up the related consumer-group metadata in ZooKeeper, whereas for Kafka this configuration is just a hint that takes effect only when there is no stored offset or the offset is out of range. So you will always read data from the beginning because you set it to “smallest”; if you set it to “largest”, you will always start reading from the end immediately. There is a JIRA and a PR tracking this, but they are not yet merged to master; you can check it here (https://issues.apache.org/jira/browse/SPARK-2492). Thanks Jerry From: Abraham Jacob [mailto:abe.jac...@gmail.com] Sent: Saturday, October 11, 2014 6:57 AM To: Sean McNamara Cc: user@spark.apache.org Subject: Re: Spark Streaming KafkaUtils Issue Probably this is the issue - http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/ : Spark’s usage of the Kafka consumer parameter auto.offset.reset (http://kafka.apache.org/documentation.html#consumerconfigs) is different from Kafka’s semantics. In Kafka, the behavior of setting auto.offset.reset to “smallest” is that the consumer will automatically reset the offset to the smallest offset when a) there is no existing offset stored in ZooKeeper or b) there is an existing offset but it is out of range. Spark however will always remove existing offsets and then start all the way from zero again. This means whenever you restart your application with auto.offset.reset = smallest, your application will completely re-process all available Kafka data. Doh! See this discussion (http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-and-the-spark-shell-tp3347p3387.html) and that discussion (http://markmail.org/message/257a5l3oqyftsjxj). Hmm interesting... Wondering what happens if I set it to largest...? On Fri, Oct 10, 2014 at 3:47 PM, Abraham Jacob abe.jac...@gmail.com wrote: Sure...
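The difference Jerry and the blog post describe can be summarised as a small model of where a consumer starts reading. This is a sketch under stated assumptions, not Kafka or Spark code. The function names are hypothetical and offsets are plain integers. The "parameter not set" branch follows Jerry's later reply in this thread: no ZooKeeper cleanup happens, and Kafka's default of "largest" applies when nothing is committed.

```python
def kafka_start_offset(committed, log_start, log_end, reset="largest"):
    """Kafka semantics as described in the thread: reset is only a fallback.

    committed is the offset stored in ZooKeeper for the group, or None.
    The reset policy applies only when nothing is committed or the committed
    offset is out of range; "largest" is Kafka's default.
    """
    if committed is not None and log_start <= committed <= log_end:
        return committed
    return log_start if reset == "smallest" else log_end


def spark_receiver_start_offset(committed, log_start, log_end, kafka_params):
    """KafkaReceiver behaviour as described in the thread (simplified).

    If auto.offset.reset is present, the group's ZooKeeper node is wiped first,
    so the committed offset is forgotten before Kafka's rules are applied.
    """
    if "auto.offset.reset" in kafka_params:
        committed = None  # models tryZookeeperConsumerGroupCleanup
        return kafka_start_offset(committed, log_start, log_end,
                                  kafka_params["auto.offset.reset"])
    return kafka_start_offset(committed, log_start, log_end)


# Abraham's scenario: 100 messages (offsets 0-99), all already consumed.
print(kafka_start_offset(100, 0, 100, "smallest"))                                  # 100
print(spark_receiver_start_offset(100, 0, 100, {"auto.offset.reset": "smallest"}))  # 0
print(spark_receiver_start_offset(100, 0, 100, {}))                                 # 100
# A group that stopped halfway: "largest" jumps past the 60 unread messages.
print(spark_receiver_start_offset(40, 0, 100, {"auto.offset.reset": "largest"}))    # 100
```

In this model, switching to "largest" avoids re-reading but can silently skip unconsumed data. Leaving the parameter out resumes from the committed offset.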
Re: Spark Streaming KafkaUtils Issue
Thanks Jerry, So, from what I can understand from the code, if I leave out auto.offset.reset, it should theoretically read from the last commit point... Correct? -abe On Fri, Oct 10, 2014 at 5:40 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Abraham, You are correct, the “auto.offset.reset” behavior in KafkaReceiver is different from original Kafka’s semantics.
Maryland Meetup
Please add the Maryland Spark meetup to the Spark website http://www.meetup.com/Apache-Spark-Maryland/ Thanks Brian Husted
Re: Spark Streaming KafkaUtils Issue
This JIRA and comment sum up the issue: https://issues.apache.org/jira/browse/SPARK-2492?focusedCommentId=14069708&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14069708 Basically, the offset param was renamed and had slightly different semantics in Kafka 0.7 than in 0.8. Also, this behavior was useful because earlier versions of the Spark Streaming receiver could be overwhelmed after a streaming job had been down for a period of time. I think this PR addresses the issue quite nicely: https://github.com/apache/spark/pull/1420 Best, Sean On Oct 10, 2014, at 6:48 PM, Abraham Jacob abe.jac...@gmail.com wrote: Thanks Jerry, So, from what I can understand from the code, if I leave out auto.offset.reset, it should theoretically read from the last commit point... Correct? -abe
RE: Spark Streaming KafkaUtils Issue
Hi abe, You can see the details in KafkaInputDStream.scala, here is the snippet // When auto.offset.reset is defined, it is our responsibility to try and whack the // consumer group zk node. if (kafkaParams.contains(auto.offset.reset)) { tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams(group.id)) } KafkaReceiver will check your kafkaParams, if “auto.offset.reset” is set, it will clean ZK metadata immediately, so you will always read data from beginning (set to “smallest”) and end (set to “largest”) immediately, because the ZK metadata is deleted beforehand. If you do not set this parameter, this code path will not be triggered, so data will be read from the last commit point. And if last commit point is not yet available, Kafka will move the offset to the end of partition (Kafka is set “auto.commit.offset” to “largest” by default). If you want to keep the same semantics as Kafka, you need to remove the above code path manually and recompile the Spark. Thanks Jerry From: Abraham Jacob [mailto:abe.jac...@gmail.com] Sent: Saturday, October 11, 2014 8:49 AM To: Shao, Saisai Cc: user@spark.apache.org; Sean McNamara Subject: Re: Spark Streaming KafkaUtils Issue Thanks Jerry, So, from what I can understand from the code, if I leave out auto.offset.reset, it should theoretically read from the last commit point... Correct? -abe On Fri, Oct 10, 2014 at 5:40 PM, Shao, Saisai saisai.s...@intel.commailto:saisai.s...@intel.com wrote: Hi Abraham, You are correct, the “auto.offset.reset“ behavior in KafkaReceiver is different from original Kafka’s semantics, if you set this configure, KafkaReceiver will clean the related immediately, but for Kafka this configuration is just a hint which will be effective only when offset is out-of-range. So you will always read data from the beginning as you set to “smallest”, otherwise if you set to “largest”, you will always get data from the end immediately. 
There’s a JIRA and a PR tracking this, but they are not yet merged to master; you can check it here (https://issues.apache.org/jira/browse/SPARK-2492). Thanks Jerry

From: Abraham Jacob [mailto:abe.jac...@gmail.com] Sent: Saturday, October 11, 2014 6:57 AM To: Sean McNamara Cc: user@spark.apache.org Subject: Re: Spark Streaming KafkaUtils Issue

Probably this is the issue - http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/

• Spark’s usage of the Kafka consumer parameter auto.offset.reset (http://kafka.apache.org/documentation.html#consumerconfigs) is different from Kafka’s semantics. In Kafka, the behavior of setting auto.offset.reset to “smallest” is that the consumer will automatically reset the offset to the smallest offset when a) there is no existing offset stored in ZooKeeper or b) there is an existing offset but it is out of range. Spark however will always remove existing offsets and then start all the way from zero again. This means whenever you restart your application with auto.offset.reset = smallest, your application will completely re-process all available Kafka data. Doh! See this discussion (http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-and-the-spark-shell-tp3347p3387.html) and that discussion (http://markmail.org/message/257a5l3oqyftsjxj).

Hmm interesting... Wondering what happens if I set it as largest...?

On Fri, Oct 10, 2014 at 3:47 PM, Abraham Jacob abe.jac...@gmail.com wrote: Sure... I do set the group.id for all the consumers to be the same.
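Jerry's explanation boils down to a small decision procedure. Below is a simplified model of it in plain Python (not Kafka or Spark code); the function and parameter names are hypothetical, and it assumes Kafka 0.8's default of “largest” when auto.offset.reset is absent.

```python
from typing import Optional


def start_offset(stored: Optional[int], earliest: int, latest: int,
                 reset: Optional[str], spark_receiver: bool) -> int:
    """Simplified model: where a high-level consumer starts reading one partition.

    stored          -- committed offset in ZooKeeper, or None if there is none
    earliest/latest -- first and last available offsets in the partition
    reset           -- value of auto.offset.reset, or None if the key is absent
    spark_receiver  -- True to model Spark's KafkaReceiver, False for plain Kafka
    """
    if spark_receiver and reset is not None:
        # KafkaReceiver deletes the consumer group's ZK node whenever the key is
        # present, so any committed offset is gone before Kafka looks for it.
        stored = None
    if stored is not None and earliest <= stored <= latest:
        return stored  # a valid committed offset: resume from it
    # No offset, or offset out of range: Kafka falls back to auto.offset.reset,
    # whose default is "largest".
    policy = reset if reset is not None else "largest"
    return earliest if policy == "smallest" else latest


# Restarting a Spark app with auto.offset.reset=smallest re-reads everything:
print(start_offset(500, 0, 1000, "smallest", spark_receiver=True))  # 0
```

Leaving the key out (reset=None) is the only case in this model where a Spark receiver resumes from the committed offset, which matches abe's reading of the code.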
Re: Spark Streaming KafkaUtils Issue
Ah I see... much clearer now... Because setting auto.offset.reset triggers KafkaReceiver to delete the ZK metadata, when control passes to the Kafka consumer API it sees that there is no offset available for the partition. This then triggers the smallest or largest logic in Kafka, depending on what we set for auto.offset.reset... Thanks for explaining this clearly! Appreciate your effort.
What if I port Spark from TCP/IP to RDMA?
Hi, Let's say that I managed to port Spark from TCP/IP to RDMA. What tool or benchmark can I use to test the performance improvement? BR, Theo
Re: Window comparison matching using the sliding window functionality: feasibility
Thanks @category_theory, the post was of great help!! I had to learn a few things before I could understand it completely. However, I am facing an issue with partitioning the data (using partitionBy) without providing a hardcoded value for the number of partitions. In my case the partitions need to be driven by the data (the segmentation key I am using). So my question is: say the number of partitions generated by my segmentation key is 1000 and the number given to the partitioner is 2000. In this case, would 2000 partitions be created (which would break the partition boundaries of the segmentation key)? If so, the sliding window would roll over multiple partitions and the computation would generate wrong results. Thanks again for the response!!

On Tue, Sep 30, 2014 at 11:51 AM, category_theory [via Apache Spark User List] wrote: Not sure if this is what you are after, but it's based on a moving average within Spark... I was building an ARIMA model on top of Spark and this helped me out a lot: http://stackoverflow.com/questions/23402303/apache-spark-moving-average -- Jimmy McErlain, Data Scientist (Nerd)

On Tue, Sep 30, 2014 at 8:19 AM, nitinkak001 wrote: Any ideas guys? Trying to find some information online. Not much luck so far.
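Whether a key can straddle partitions depends on the partitioner, not on the partition count. Here is a minimal model in plain Python, assuming Spark's default HashPartitioner (non-negative key.hashCode modulo numPartitions) and ASCII String keys; the seg-N keys and record counts are made up for illustration.

```python
from collections import defaultdict


def java_string_hash(s: str) -> int:
    """Mimic Java's String.hashCode for ASCII strings (signed 32-bit result)."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h


def hash_partition(key: str, num_partitions: int) -> int:
    """Non-negative hashCode modulo num_partitions, as HashPartitioner computes it."""
    return java_string_hash(key) % num_partitions  # Python % is non-negative here


def partition_records(records, num_partitions):
    parts = defaultdict(list)
    for key, value in records:
        parts[hash_partition(key, num_partitions)].append((key, value))
    return parts


# 1000 hypothetical segmentation keys, 5 records each, spread over 2000 partitions
records = [(f"seg-{k}", i) for k in range(1000) for i in range(5)]
parts = partition_records(records, 2000)

# Record which partitions each key ended up in
key_to_parts = defaultdict(set)
for pid, recs in parts.items():
    for key, _ in recs:
        key_to_parts[key].add(pid)

print(len(parts), "non-empty partitions out of 2000")
print("every key in exactly one partition:",
      all(len(p) == 1 for p in key_to_parts.values()))
```

With 2000 partitions and 1000 keys, at least half the partitions end up empty, but no key is split. The converse caveat is that one partition can hold several keys (hash collisions modulo the partition count), so a sliding window computed inside mapPartitions should still group and sort by key within each partition.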
Re: add Boulder-Denver Spark meetup to list on website
Added you, thanks! (You may have to shift-refresh the page to see it updated). Matei On Oct 10, 2014, at 1:52 PM, Michael Oczkowski michael.oczkow...@seeq.com wrote: Please add the Boulder-Denver Spark meetup group to the list on the website. http://www.meetup.com/Boulder-Denver-Spark-Meetup/ Michael Oczkowski, Ph.D. Big Data Architect Chief Data Scientist Seeq Corporation
Re: IOException and appcache FileNotFoundException in Spark 1.02
Hi Akhil - I tried your suggestions and also tried varying my partition sizes. Reducing the number of partitions led to memory errors (presumably; I saw IOExceptions much sooner). With the settings you provided, the program ran for longer but ultimately crashed in the same way. I would like to understand what is going on internally that leads to this. Could this be related to garbage collection?

On Oct 10, 2014 3:19 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You could be hitting this issue https://issues.apache.org/jira/browse/SPARK-3633 (or similar). You can try the following workarounds (set on your SparkConf before creating the SparkContext):

conf.set("spark.core.connection.ack.wait.timeout", "600")
conf.set("spark.akka.frameSize", "50")

Also reduce the number of partitions; you could be hitting the kernel's ulimit. I faced this issue and it went away when I dropped the partitions from 1600 to 200. Thanks Best Regards

On Fri, Oct 10, 2014 at 5:58 AM, Ilya Ganelin ilgan...@gmail.com wrote: Hi all – I could use some help figuring out a couple of exceptions I’ve been getting regularly. I have been running on a fairly large dataset (150 GB). With smaller datasets I don't have any issues. My sequence of operations is as follows (unless otherwise specified, I am not caching):

1. Map a 30 million row x 70 column string table to approx. 30 million x 5 strings (when reading it with textFile I use 1500 partitions).
2. From that, map to ((a, b), score) and reduceByKey, numPartitions = 180.
3. Then, extract distinct values for A and distinct values for B (I cache the output of distinct), numPartitions = 180.
4. Zip with index for A and for B (to remap strings to ints).
5. Join the remapped ids with the original table.
6. This is then fed into MLlib's ALS algorithm.
I am running with: Spark version 1.0.2 with CDH 5.1; numExecutors = 8, numCores = 14; Memory = 12g; MemoryFraction = 0.7; Kryo serialization.

My issue is that the code runs fine for a while but then non-deterministically crashes with either file IOExceptions or the following obscure error:

14/10/08 13:29:59 INFO TaskSetManager: Loss was due to java.io.IOException: Filesystem closed [duplicate 10]
14/10/08 13:30:08 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
java.io.FileNotFoundException: /opt/cloudera/hadoop/1/yarn/nm/usercache/zjb238/appcache/application_1412717093951_0024/spark-local-20141008131827-c082/1c/shuffle_3_117_354 (No such file or directory)

Looking through the logs, I see the IOException in other places, but it appears to be non-catastrophic. The FileNotFoundException, however, is. I have found the following Stack Overflow question, which at least seems to address the IOException: http://stackoverflow.com/questions/24038908/spark-fails-on-big-shuffle-jobs-with-java-io-ioexception-filesystem-closed But I have not found anything useful at all with regard to the appcache error. Any help would be much appreciated.
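Steps 2 to 5 of Ilya's pipeline are the usual string-to-integer id remapping done before ALS, since MLlib's Rating takes integer user and product ids. As a local illustration only (plain Python, no Spark), here is a sketch; summing scores in the reduceByKey and sorting values before numbering them are assumptions (Spark's zipWithIndex numbers elements in partition order instead).

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Triple = Tuple[str, str, float]


def reduce_scores(rows: Iterable[Triple]) -> List[Triple]:
    """Step 2: combine scores per (a, b) pair; summing is an assumed reduce function."""
    totals: Dict[Tuple[str, str], float] = defaultdict(float)
    for a, b, score in rows:
        totals[(a, b)] += score
    return [(a, b, s) for (a, b), s in totals.items()]


def build_index(values: Iterable[str]) -> Dict[str, int]:
    """Steps 3-4: distinct values, then a contiguous integer id per value."""
    return {v: i for i, v in enumerate(sorted(set(values)))}


def remap(triples: List[Triple]):
    """Step 5: join the integer ids back onto the (a, b, score) rows."""
    a_index = build_index(a for a, _, _ in triples)
    b_index = build_index(b for _, b, _ in triples)
    ratings = [(a_index[a], b_index[b], s) for a, b, s in triples]
    return ratings, a_index, b_index


rows = [("u1", "itemX", 1.0), ("u2", "itemX", 2.0),
        ("u1", "itemY", 3.0), ("u1", "itemX", 0.5)]
ratings, a_index, b_index = remap(reduce_scores(rows))
print(ratings)
```

Each distinct/zip/join step above is a shuffle in the Spark version, which is why the partition counts chosen for them matter for shuffle file volume.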
Re: Breaking the previous large-scale sort record with Spark
Hi Matei - I read your post with great interest. Could you comment in more depth on some of the issues you saw when scaling up Spark and how you resolved them? I am specifically interested in Spark-related problems. I'm working on scaling Spark up to very large datasets and have been running into a variety of issues. Thanks in advance! On Oct 10, 2014 10:54 AM, Matei Zaharia matei.zaha...@gmail.com wrote: Hi folks, I interrupt your regularly scheduled user / dev list to bring you some pretty cool news for the project, which is that we've been able to use Spark to break MapReduce's 100 TB and 1 PB sort records, sorting data 3x faster on 10x fewer nodes. There's a detailed writeup at http://databricks.com/blog/2014/10/10/spark-breaks-previous-large-scale-sort-record.html. Summary: while Hadoop MapReduce held last year's 100 TB world record by sorting 100 TB in 72 minutes on 2100 nodes, we sorted it in 23 minutes on 206 nodes; and we also scaled up to sort 1 PB in 234 minutes. I want to thank Reynold Xin for leading this effort over the past few weeks, along with Parviz Deyhim, Xiangrui Meng, Aaron Davidson and Ali Ghodsi. In addition, we'd really like to thank Amazon's EC2 team for providing the machines to make this possible. Finally, this result would of course not be possible without the many many other contributions, testing and feature requests from throughout the community. For an engine to scale from these multi-hour petabyte batch jobs down to 100-millisecond streaming and interactive queries is quite uncommon, and it's thanks to all of you folks that we are able to make this happen. Matei
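A quick check of the headline ratios, using only the 100 TB figures quoted in Matei's note. The per-node throughput ratio is a derived number, not one stated in the post.

```python
# Figures quoted in the announcement (100 TB sort)
DATA_TB = 100
HADOOP_MINUTES, HADOOP_NODES = 72, 2100
SPARK_MINUTES, SPARK_NODES = 23, 206

speedup = HADOOP_MINUTES / SPARK_MINUTES   # wall-clock ratio ("3x faster")
node_ratio = HADOOP_NODES / SPARK_NODES    # cluster-size ratio ("10x fewer nodes")

# Throughput per node in TB per node-minute; the ratio combines both effects
spark_rate = DATA_TB / (SPARK_MINUTES * SPARK_NODES)
hadoop_rate = DATA_TB / (HADOOP_MINUTES * HADOOP_NODES)
per_node_gain = spark_rate / hadoop_rate

print(f"{speedup:.1f}x faster, {node_ratio:.1f}x fewer nodes, "
      f"{per_node_gain:.0f}x per-node throughput")
```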
Re: Debug Spark in Cluster Mode
I would also be interested in knowing more about this. I have used Cloudera Manager and the Spark web UI (clientnode:4040) but would love to know if there are other tools out there, either for post-processing or for better observation during execution. On Oct 9, 2014 4:50 PM, Rohit Pujari rpuj...@hortonworks.com wrote: Hello Folks: What are some best practices to debug Spark in cluster mode? Thanks, Rohit
Re: Spark SQL parser bug?
Hmm, there is a “T” in the timestamp string, which makes the string not a valid timestamp string representation. Internally Spark SQL uses java.sql.Timestamp.valueOf to cast a string to a timestamp.

On 10/11/14 2:08 AM, Mohammed Guller wrote:

scala> rdd.registerTempTable("x")
scala> val sRdd = sqlContext.sql("select a from x where ts = '2012-01-01T00:00:00';")
sRdd: org.apache.spark.sql.SchemaRDD =
SchemaRDD[4] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
Project [a#0]
 ExistingRdd [a#0,ts#1], MapPartitionsRDD[6] at mapPartitions at basicOperators.scala:208

scala> sRdd.collect
res2: Array[org.apache.spark.sql.Row] = Array()
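To see why the ISO-style literal matches nothing, here is a small Python stand-in for the string-to-timestamp cast. It uses datetime.strptime with the JDBC escape format that java.sql.Timestamp.valueOf expects (yyyy-[m]m-[d]d hh:mm:ss[.f...]); returning None for an unparseable string is an assumption about how the failed cast surfaces (as a NULL that no row can equal), and Python's %f only covers up to microseconds, whereas Java allows nanoseconds.

```python
from datetime import datetime
from typing import Optional

# Approximation of the format accepted by java.sql.Timestamp.valueOf
JDBC_TIMESTAMP_FORMATS = ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M:%S.%f")


def cast_to_timestamp(s: str) -> Optional[datetime]:
    """Parse a JDBC-style timestamp string, or return None if it does not match."""
    for fmt in JDBC_TIMESTAMP_FORMATS:
        try:
            return datetime.strptime(s, fmt)
        except ValueError:
            pass
    return None


print(cast_to_timestamp("2012-01-01T00:00:00"))   # the "T" form fails
print(cast_to_timestamp("2012-01-01 00:00:00"))   # a space separator parses
```

So writing the literal as '2012-01-01 00:00:00' (space instead of “T”) should give the comparison a real timestamp to work with.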
RDD size in memory - Array[String] vs. case classes
Hi all, I'm currently playing with Spark as a possible solution at work, and I've recently been working out a rough correlation between our input data size and the RAM needed to cache an RDD that will be used multiple times in a job. As part of this I've been trialling different methods of representing the data, and I came across a result that surprised me, so I just wanted to check what I was seeing. My data set consists of CSV with approx. 17 fields. When I load my sample data set locally and cache it after splitting on the comma as an RDD[Array[String]], the Spark UI shows that 8% of the RDD can be cached in available RAM. When I cache it as an RDD of a case class, 11% of the RDD is cacheable, so case classes are actually taking up less serialized space than an array. Is it because the case class represents numbers as numeric types, whereas the string array keeps them as strings? Cheers, Liam Clarke
Re: where are my python lambda functions run in yarn-client mode?
This is an implementation detail, so it is not documented :-( If you think this is a blocker for you, you could create a JIRA; maybe it could be fixed in 1.0.3+. Davies

On Fri, Oct 10, 2014 at 5:11 PM, Evan evan.sama...@gmail.com wrote: Thank you! I was looking for a config variable to that end, but I was looking in the Spark 1.0.2 documentation, since that was the version I had the problem with. Is this behavior documented in 1.0.2's documentation? Evan

On 10/09/2014 04:12 PM, Davies Liu wrote: When you call rdd.take() or rdd.first(), it may[1] execute the job locally (in the driver); otherwise, all jobs are executed in the cluster. There is a config called `spark.localExecution.enabled` (since 1.1) to change this. It is not enabled by default, so all the functions will be executed in the cluster. If you set this to `true`, you get the same behavior as 1.0.

[1] If it did not get enough items from the first partition, it will try multiple partitions at a time, so they will be executed in the cluster.

On Thu, Oct 9, 2014 at 12:14 PM, esamanas evan.sama...@gmail.com wrote: Hi, I am using pyspark and I'm trying to support both Spark 1.0.2 and 1.1.0 with my app, which will run in yarn-client mode. However, it appears that when I use 'map' to run a Python lambda function over an RDD, the function runs on different machines in the two versions, and this is causing problems. In both cases, I am using a Hadoop cluster that runs Linux on all of its nodes. I am submitting my jobs from a machine running Mac OS X 10.9. As a reproducer, here is my script:

import platform
print sc.parallelize([1]).map(lambda x: platform.system()).take(1)[0]

The answer in Spark 1.1.0: 'Linux'
The answer in Spark 1.0.2: 'Darwin'

In other experiments I changed the size of the list that gets parallelized, thinking maybe 1.0.2 just runs jobs on the driver node if they're small enough. I got the same answer (with only 1 million numbers). This is a troubling difference.
I would expect all functions run on an RDD to be executed on my worker nodes in the Hadoop cluster, but this is clearly not the case for 1.0.2. Why does this difference exist? How can I accurately detect which jobs will run where? Thank you, Evan
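Davies' footnote [1] describes how take() scans partitions in growing batches, each batch being one job. The sketch below models that loop in plain Python to show which jobs touch only a single partition (the only candidates for running in the driver). It is based on my reading of Spark 1.1's take(); the growth rule (4x when nothing was found yet, otherwise an estimate with a 1.5x margin) is an assumption that may differ between versions, and the real scheduler additionally requires the job to have no shuffle dependencies. The function names are hypothetical.

```python
from typing import List


def take_batches(partition_sizes: List[int], num: int) -> List[List[int]]:
    """Model which partition indices a take(num) scans, one job per batch."""
    total = len(partition_sizes)
    found, scanned = 0, 0
    batches: List[List[int]] = []
    while found < num and scanned < total:
        parts_to_try = 1  # the first job looks at a single partition
        if scanned > 0:
            if found == 0:
                parts_to_try = scanned * 4  # nothing found yet: grow aggressively
            else:
                # estimate how many more partitions are needed, with a 1.5x margin
                parts_to_try = max(int(1.5 * num * scanned / found) - scanned, 1)
        batch = list(range(scanned, min(scanned + parts_to_try, total)))
        found += min(num - found, sum(partition_sizes[p] for p in batch))
        batches.append(batch)
        scanned += parts_to_try
    return batches


def runs_on_driver(batch: List[int], local_execution_enabled: bool) -> bool:
    """Only a single-partition job is a candidate for local (driver) execution."""
    return local_execution_enabled and len(batch) == 1


# Ten partitions of one element each, take(5): a 1-partition job, then a wider one
for b in take_batches([1] * 10, 5):
    print(b, runs_on_driver(b, local_execution_enabled=True))
```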