Re: jar changed on src filesystem

2014-07-17 Thread Chester@work
Since you are running in yarn-cluster mode and you are supplying the Spark 
assembly jar file, there is no need to install Spark on each node. Is it 
possible the two Spark jars have different versions? 

Chester

Sent from my iPad

On Jul 16, 2014, at 22:49, cmti95035 cmti95...@gmail.com wrote:

 Hi,
 
 I need some help with running Spark over YARN:
 
 I set up a cluster running HDP 2.0.6 with 6 nodes, and then installed the
 spark-1.0.1-bin-hadoop2 on each node. When running the SparkPi example with
 the following command:
 
 ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
 yarn-cluster --num-executors 5 --driver-memory 4g --executor-memory
 2g --executor-cores 1  --jars lib/spark-assembly-1.0.1-hadoop2.2.0.jar  
 lib/spark-examples*.jar 10
 
 The job failed with the following error:
 
 INFO yarn.Client: Application report from ASM: 
application identifier: application_1405545630872_0023
appId: 23
clientToAMToken: null
appDiagnostics: Application application_1405545630872_0023 failed 2 
 times
 due to AM Container for appattempt_1405545630872_0023_02 exited with 
 exitCode: -1000 due to: Resource
 hdfs://ip-172-31-9-187.us-west-1.compute.internal:8020/user/hdfs/.sparkStaging/application_1405545630872_0023/spark-assembly-1.0.1-hadoop2.2.0.jar
 changed on src filesystem (expected 1405574940411, was 1405574941940
 .Failing this attempt.. Failing the application.
 
 I searched online for solutions and tried to sync up ntp but it doesn't seem
 to work. 
 
 Can someone help? Your help is highly appreciated!
 
 Thanks,
 
 Jian
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/jar-changed-on-src-filesystem-tp10011.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Using RDD in RDD transformation

2014-07-17 Thread tbin
I implemented a simple KNN classifier. I can run it successfully on a single
sample, but it raises an error when it is run on an RDD of test samples. The
source code is attached. Looking forward to your reply! Best wishes to you!

The following is the source code.


import math
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib._common import _dot

class KNN(object):
    def __init__(self, data, k):
        '''
        data: RDD of LabeledPoint
        '''
        self._data = data
        self._k = k
        self._data.cache()

    def predict(self, x):
        topksamples = self._data.map(lambda point: (
            _dot(point.features, x) / math.sqrt(_dot(point.features, x) * _dot(point.features, x)),
            point.label)).sortByKey(False).top(self._k)
        labeldict = {}
        for score, label in topksamples:
            labeldict.setdefault(label, 0)
            labeldict[label] += 1
        label = sorted([(label, count) for label, count in labeldict.items()],
                       key=lambda x: x[1], reverse=True)[0][0]
        return label

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])

sc = SparkContext(appName="PythonLR")
data = sc.textFile("file:///home/hadoop/spark/lr_data.txt")
parsedData = data.map(parsePoint)

# Build the model
model = KNN(parsedData, 20)

# Evaluating a single sample of the training data
print 'Predicted label is: %s ' % model.predict(parsedData.first().features)

# Evaluating a sample set which is represented as an RDD
testData = sc.textFile("file:///home/hadoop/spark/lr_data.txt")
testData = testData.map(parsePoint)
labelsAndPreds = testData.map(lambda p: model.predict(p.features))
print labelsAndPreds.collect()




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-RDD-in-RDD-transformation-tp10014.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: can we insert and update with spark sql

2014-07-17 Thread Akhil Das
Is this what you are looking for?

https://spark.apache.org/docs/1.0.0/api/java/org/apache/spark/sql/parquet/InsertIntoParquetTable.html

According to the doc, it says: "Operator that acts as a sink for queries on
RDDs and can be used to store the output inside a directory of Parquet
files. This operator is similar to Hive's INSERT INTO TABLE operation in
the sense that one can choose to either overwrite or append to a directory.
Note that consecutive insertions to the same table must have compatible
(source) schemas."

Thanks
Best Regards


On Thu, Jul 17, 2014 at 11:42 AM, Hu, Leo leo.h...@sap.com wrote:

  Hi

As for spark 1.0, can we insert and update a table with SPARK SQL, and
 how?



 Thanks

 Best Regard



Re: jar changed on src filesystem

2014-07-17 Thread cmti95035
They're all the same version. Actually, even without the --jars parameter it
got the same error. It looks like it needs to copy the assembly jar during
staging in order to run the example jar anyway.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/jar-changed-on-src-filesystem-tp10011p10017.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Errors accessing hdfs while in local mode

2014-07-17 Thread Akhil Das
You can try the following in the spark-shell:

1. Run it in *Cluster mode* by going inside the spark directory:

$ SPARK_MASTER=spark://masterip:7077 ./bin/spark-shell

val textFile = sc.textFile("hdfs://masterip/data/blah.csv")

textFile.take(10).foreach(println)


2. Now try running it in *Local mode*:

$ SPARK_MASTER=local ./bin/spark-shell

val textFile = sc.textFile("hdfs://masterip/data/blah.csv")

textFile.take(10).foreach(println)


Both should print the first 10 lines from your blah.csv file.


preservesPartitioning

2014-07-17 Thread Kamal Banga
Hi All,

The function *mapPartitions* in RDD.scala
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
takes a boolean parameter *preservesPartitioning*. It seems that if the parameter
is passed as *false*, the passed function f will operate on the data only once,
whereas if it's passed as *true* the function will operate on each partition of
the data.

In my case, whatever boolean value I pass, *f* operates on each partition of
the data.

Any help regarding why I am getting this unexpected behaviour?


Re: preservesPartitioning

2014-07-17 Thread Matei Zaharia
Hi Kamal,

This is not what preservesPartitioning does -- actually what it means is that 
if the RDD has a Partitioner set (which means it's an RDD of key-value pairs 
and the keys are grouped into a known way, e.g. hashed or range-partitioned), 
your map function is not changing the partition of keys. This lets the job 
scheduler know that downstream operations, like joins or reduceByKey, can be 
optimized assuming that all the data for a given partition is located on the 
same machine. In both cases though, your function f operates on each partition.

Just in case it's not clear, each RDD is composed of multiple blocks that are 
called the partitions. Each partition may be located on a different machine. 
mapPartitions is a way for you to operate on a whole partition at once, which 
is useful if you want to amortize a certain cost across the elements (e.g. you 
open a database connection and test each of them against the database). If you 
just want to see each element once and don't care about sharing stuff across 
them, use map().
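
For example, here is a minimal sketch of that distinction (the hash-partitioned
pair RDD is just an assumed setup, not something from your code):

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._   // pair-RDD functions

val pairs = sc.parallelize(1 to 100).map(i => (i % 10, i))
  .partitionBy(new HashPartitioner(4))   // pairs now has a known Partitioner

// f runs once per partition in both cases; the flag only promises that the keys
// were not changed, so the HashPartitioner is kept and the reduceByKey below
// does not need another shuffle.
val doubled = pairs.mapPartitions(
  iter => iter.map { case (k, v) => (k, v * 2) },   // keys untouched
  preservesPartitioning = true)

val sums = doubled.reduceByKey(_ + _)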

Matei

On Jul 17, 2014, at 12:02 AM, Kamal Banga banga.ka...@gmail.com wrote:

 Hi All,
 
 The function mapPartitions in RDD.scala takes a boolean parameter 
 preservesPartitioning. It seems if that parameter is passed as false, the 
 passed function f will operate on the data only once, whereas if it's passed 
 as true the function will operate on each partition of the data. 
 
 In my case, whatever boolean value I pass, f operates on each partition of 
 data. 
 
 Any help, regarding why I am getting this unexpected behaviour?



Re: Kmeans

2014-07-17 Thread Xiangrui Meng
Yes, both run in parallel. Random is a baseline implementation of
initialization, which may ignore small clusters. k-means++ improves
random initialization by adding weights to points far away from the
current candidates. You can view k-means|| as a more scalable version
of K-means++. We don't provide k-means++ for initialization, but we
used it as part of k-means||. Please check the papers for more
details. -Xiangrui
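
For reference, choosing the initialization mode in MLlib looks roughly like this
(a sketch; parsedData and the parameter values are placeholders, and run() expects
RDD[Array[Double]] in 0.9 and RDD[Vector] in 1.0):

import org.apache.spark.mllib.clustering.KMeans

val modelParallel = new KMeans()
  .setK(200)
  .setMaxIterations(10)
  .setInitializationMode(KMeans.K_MEANS_PARALLEL)   // "k-means||", the default
  .run(parsedData)

val modelRandom = new KMeans()
  .setK(200)
  .setMaxIterations(10)
  .setInitializationMode(KMeans.RANDOM)             // plain random initialization
  .run(parsedData)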

On Wed, Jul 16, 2014 at 10:27 PM, amin mohebbi aminn_...@yahoo.com wrote:
 Thank you for the response. Can we say that both implementations compute
 the centroids in parallel? I mean, in both cases will the data and code be
 sent to the workers and the results be collected and passed to the driver?
 And why do we have three types of initialization in MLlib?
 Initialization:
 • random
 • k-means++
 • k-means||


 Best Regards

 ...

 Amin Mohebbi

 PhD candidate in Software Engineering
  at university of Malaysia

 H/P : +60 18 2040 017



 E-Mail : tp025...@ex.apiit.edu.my

   amin_...@me.com


 On Thursday, July 17, 2014 11:57 AM, Xiangrui Meng men...@gmail.com wrote:


 kmeans.py contains a naive implementation of k-means in python, served
 as an example of how to use pyspark. Please use MLlib's implementation
 in practice. There is a JIRA for making it clear:
 https://issues.apache.org/jira/browse/SPARK-2434

 -Xiangrui

 On Wed, Jul 16, 2014 at 8:16 PM, amin mohebbi aminn_...@yahoo.com wrote:
 Can anyone explain to me what is difference between kmeans in Mlib and
 kmeans in examples/src/main/python/kmeans.py?


 Best Regards

 ...

 Amin Mohebbi

 PhD candidate in Software Engineering
  at university of Malaysia

 H/P : +60 18 2040 017



 E-Mail : tp025...@ex.apiit.edu.my

  amin_...@me.com




class after join

2014-07-17 Thread Luis Guerra
Hi all,

I am a newbie Spark user with many doubts, so sorry if this is a silly
question.

I am dealing with tabular data formatted as text files, so when I first
load the data, my code is like this:

case class data_class(
   V1: String,
   V2: String,
   V3: String,
   V4: String,
   V5: String,
   V6: String,
   V7: String)

val data = sc.textFile(data_path)
  .map(x => {
    val fields = (x + " ").split("\t")
    data_class(fields(0).trim(), fields(1).trim(), fields(2).trim(), fields(3).trim(),
               fields(4).trim(), fields(5).trim(), fields(6).trim())
  })

I am doing this because I would like to access to each position using the
variable name (V1...V7). Is there any other way of doing this?

Also related to this question, if I have data with more than 22 variables,
I am restricted to using a class instead of a case class. However, this kind of
solution has many restrictions, mainly related to getter methods. Is there
any other way of doing this?

And finally, one of my main problems comes after operations of different
data variables. For instance, if I have two different variables (data1 and
data2), and I want to join them both as:

val data3 = data1.keyBy(_.V1).leftOuterJoin(data2.keyBy(_.V1))

Then I have to post process data3 in order to obtain a new class that
contains those variables from data1 and also those variables from data2. As
data3 is (key, (data1, data2)), do I have to create a new different class
with all these attributes from data1 and data2? This is kind of annoying
when there are many attributes.

Thanks in advance,

Best


Re: MLLib - Regularized logistic regression in python

2014-07-17 Thread Xiangrui Meng
1) This is a miss, unfortunately ... We will add support for
regularization and intercept in the coming v1.1. (JIRA:
https://issues.apache.org/jira/browse/SPARK-2550)
2) It has overflow problems in Python but not in Scala. We can
stabilize the computation by ensuring exp only takes a negative value:
1 / (1 + e^x) = 1 - 1 / (1 + e^{-x}). (JIRA:
https://issues.apache.org/jira/browse/SPARK-2552)
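
In Scala the rewriting is just (a sketch of the identity above, not the MLlib code itself):

def sigmoid(x: Double): Double =
  if (x >= 0) {
    1.0 / (1.0 + math.exp(-x))   // -x <= 0, so exp cannot overflow
  } else {
    val e = math.exp(x)          // x < 0, so e <= 1
    e / (1.0 + e)                // = 1 - 1 / (1 + e^x), per the identity above
  }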

-Xiangrui

On Wed, Jul 16, 2014 at 7:58 PM, Yanbo Liang yanboha...@gmail.com wrote:
 AFAIK for question 2, there is no built-in method to account for that
 problem.
 At right now, we can only perform one type of regularization.
 However, the elastic net implementation is just underway.
 You can refer this topic for further discussion.
 https://issues.apache.org/jira/browse/SPARK-1543


 2014-07-17 2:08 GMT+08:00 fjeg francisco.gime...@gmail.com:

 1) Okay, to clarify, there is *no* way to regularize logistic regression
 in
 python (sorry if I'm repeating your answer).

 2) This method you described will have overflow errors when abs(margin) >
 750. Is there a built-in method to account for this? Otherwise, I will
 probably have to implement something like this:

 http://lingpipe-blog.com/2012/02/16/howprevent-overflow-underflow-logistic-regression

 Also, another question about the Scala implementation:
 Can we only do one type of regularization? Is there any way to perform
 elastic net which is a combination of L1 and L2?





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/MLLib-Regularized-logistic-regression-in-python-tp9780p9963.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Kyro deserialisation error

2014-07-17 Thread Tathagata Das
Seems like there is some sort of stream corruption, causing Kryo read to
read a weird class name from the stream (the name "arl Fridtjof Rode" in
the exception cannot be a class!).
Not sure how to debug this.

@Patrick: Any idea?



On Wed, Jul 16, 2014 at 10:14 PM, Hao Wang wh.s...@gmail.com wrote:

 I am not sure. Not every task fails with this Kryo exception. Most of the
 time, the cluster can successfully finish the WikipediaPageRank.
 How could I debug this exception?

 Thanks

 Regards,
 Wang Hao(王灏)

 CloudTeam | School of Software Engineering
 Shanghai Jiao Tong University
 Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
 Email:wh.s...@gmail.com


 On Thu, Jul 17, 2014 at 2:58 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Is the class that is not found in the wikipediapagerank jar?

 TD


 On Wed, Jul 16, 2014 at 12:32 AM, Hao Wang wh.s...@gmail.com wrote:

 Thanks for your reply. The SparkContext is configured as below:



 sparkConf.setAppName("WikipediaPageRank")
 sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
 sparkConf.set("spark.kryo.registrator", classOf[PRKryoRegistrator].getName)
 val inputFile = args(0)
 val threshold = args(1).toDouble
 val numPartitions = args(2).toInt
 val usePartitioner = args(3).toBoolean
 sparkConf.setAppName("WikipediaPageRank")
 sparkConf.set("spark.executor.memory", "60g")
 sparkConf.set("spark.cores.max", "48")
 sparkConf.set("spark.kryoserializer.buffer.mb", "24")
 val sc = new SparkContext(sparkConf)
 sc.addJar("~/Documents/Scala/WikiPageRank/target/scala-2.10/wikipagerank_2.10-1.0.jar")

 And I use spark-submit to run the application:

 ./bin/spark-submit --master spark://sing12:7077 --total-executor-cores 40 \
   --executor-memory 40g --class org.apache.spark.examples.bagel.WikipediaPageRank \
   ~/Documents/Scala/WikiPageRank/target/scala-2.10/wikipagerank_2.10-1.0.jar \
   hdfs://192.168.1.12:9000/freebase-26G 1 200 True







 Regards,
 Wang Hao(王灏)

 CloudTeam | School of Software Engineering
 Shanghai Jiao Tong University
 Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
 Email:wh.s...@gmail.com


 On Wed, Jul 16, 2014 at 1:41 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you using classes from external libraries that have not been added
 to the sparkContext, using sparkcontext.addJar()?

 TD


 On Tue, Jul 15, 2014 at 8:36 PM, Hao Wang wh.s...@gmail.com wrote:

 I am running the WikipediaPageRank Spark example and have the same
 problem as you:

 4/07/16 11:31:06 DEBUG DAGScheduler: submitStage(Stage 6)
 14/07/16 11:31:06 ERROR TaskSetManager: Task 6.0:450 failed 4 times;
 aborting job
 14/07/16 11:31:06 INFO DAGScheduler: Failed to run foreach at
 Bagel.scala:251
 Exception in thread main 14/07/16 11:31:06 INFO TaskSchedulerImpl:
 Cancelling stage 6
 org.apache.spark.SparkException: Job aborted due to stage failure:
 Task 6.0:450 failed 4 times, most recent failure: Exception failure in TID
 1330 on host sing11: com.esotericsoftware.kryo.KryoException: Unable to
 find class: arl Fridtjof Rode

 com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)

 com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
 com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)

 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)

 com.twitter.chill.TraversableSerializer.read(Traversable.scala:44)

 com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)

 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)

 org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:115)

 org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)

 org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)

 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)

 org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)

 org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)

 org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
 org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)

 Can anyone help?

 Regards,
 Wang Hao(王灏)

 CloudTeam | School of Software Engineering
 Shanghai Jiao Tong University
 Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
 Email:wh.s...@gmail.com


 On Tue, Jun 3, 2014 at 8:02 PM, Denes te...@outlook.com wrote:

 I tried to use Kryo as a serialiser in Spark Streaming, and did everything
 according to the guide posted on the Spark website, i.e. added the
 following lines:

 conf.set("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer");
 conf.set("spark.kryo.registrator", 

Re: Kyro deserialisation error

2014-07-17 Thread Sean Owen
Not sure if this helps, but it does seem to be part of a name in a
Wikipedia article, and Wikipedia is the data set. So something is
reading this class name from the data.

http://en.wikipedia.org/wiki/Carl_Fridtjof_Rode

On Thu, Jul 17, 2014 at 9:40 AM, Tathagata Das
tathagata.das1...@gmail.com wrote:
 Seems like there is some sort of stream corruption, causing Kryo read to
 read a weird class name from the stream (the name arl Fridtjof Rode in the
 exception cannot be a class!).
 Not sure how to debug this.

 @Patrick: Any idea?


Re: class after join

2014-07-17 Thread Luis Guerra
Thank you for your fast reply.

We are considering this Map[String, String] solution, but there are some
details that we have not worked out yet. What would happen if we have different
data types for different fields? Also, with this solution, we have to
repeat the field names for every row that we have; is this efficient?

Regarding the solution with composition, the key would be repeated in the
new class, whereas it is only needed once after the join, isn't it?
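
For concreteness, the Map merge suggested below might look something like this
(a sketch, assuming data1 and data2 are RDD[Map[String, String]] keyed by "V1"):

  import org.apache.spark.SparkContext._   // pair-RDD functions

  val data3 = data1.keyBy(_("V1")).leftOuterJoin(data2.keyBy(_("V1")))
  val merged = data3.mapValues { case (m1, m2opt) =>
    m1 ++ m2opt.getOrElse(Map.empty[String, String])   // right side wins on duplicate names
  }

And the composition alternative, keeping the compile-time checking (here data1 and
data2 are the original case-class RDDs, and data_class2 is a hypothetical class for
the second dataset):

  case class Joined(key: String, a: data_class, b: Option[data_class2])
  val joined = data1.keyBy(_.V1).leftOuterJoin(data2.keyBy(_.V1))
    .map { case (k, (a, b)) => Joined(k, a, b) }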


On Thu, Jul 17, 2014 at 10:25 AM, Sean Owen so...@cloudera.com wrote:

 If what you have is a large number of named strings, why not use a
 Map[String,String] to represent them? If you're approaching a class
 with 22 String fields anyway, it probably makes more sense. You lose
 a bit of compile-time checking, but gain flexibility.

 Also, merging two Maps to make a new one is pretty simple, compared to
 making many of these values classes.

 (Although, if you otherwise needed a class that represented all of
 the things in class A and class B, this could be done easily with
 composition, a class with an A and a B inside.)

 On Thu, Jul 17, 2014 at 9:15 AM, Luis Guerra luispelay...@gmail.com
 wrote:
  Hi all,
 
  I am a newbie Spark user with many doubts, so sorry if this is a silly
  question.
 
  I am dealing with tabular data formatted as text files, so when I first
 load
  the data, my code is like this:
 
  case class data_class(
V1: String,
V2: String,
V3: String,
V4: String,
V5: String,
V6: String,
V7: String)
 
   val data = sc.textFile(data_path)
     .map(x => {
       val fields = (x + " ").split("\t")
       data_class(fields(0).trim(), fields(1).trim(), fields(2).trim(), fields(3).trim(),
                  fields(4).trim(), fields(5).trim(), fields(6).trim())
     })
 
  I am doing this because I would like to access to each position using the
  variable name (V1...V7). Is there any other way of doing this?
 
  Also related to this question, if I have data with more than 22
 variables, I
  am restringed to use class instead of case class. However, this kind of
  solution has many restrictions mainly related to getter methods. Is there
  any other way of doing this?
 
  And finally, one of my main problems comes after operations of different
  data variables. For instance, if I have two different variables (data1
 and
  data2), and I want to join them both as:
 
  val data3 = data1.keyBy(_.V1).leftOuterJoin(data2.keyBy(_.V1))
 
  Then I have to post process data3 in order to obtain a new class that
  contains those variables from data1 and also those variables from data2.
 As
  data3 is (key, (data1, data2)), do I have to create a new different class
  with all these attributes from data1 and data2? This is kind of annoying
  when there are many attributes.
 
  Thanks in advance,
 
  Best



Re: Pysparkshell are not listing in the web UI while running

2014-07-17 Thread Akhil Das
Hi Neethu,

Your application is running in local mode and that's the reason why you are
not seeing the driver app in the 8080 web UI. You can pass the master URL to
your pyspark and get it running in cluster mode.

eg: IPYTHON_OPTS="notebook --pylab inline" $SPARK_HOME/bin/pyspark --master
spark://master:7077


Replace master:7077 with the Spark URI that you see in the top left of
the 8080 web UI.


Thanks
Best Regards


On Thu, Jul 17, 2014 at 1:35 PM, MEETHU MATHEW meethu2...@yahoo.co.in
wrote:


  Hi all,

 I just upgraded to Spark 1.0.1. In Spark 1.0.0, when I started the IPython
 notebook using the following command, it used to appear in the running
 applications tab in the master:8080 web UI.

 IPYTHON_OPTS="notebook --pylab inline" $SPARK_HOME/bin/pyspark

 But now when I run it, it's not getting listed under running
 applications/completed applications (once it's closed). But I am able to see the
 Spark stages at master:4040 while it's running

 Anyone have any idea why this



 Thanks  Regards,
 Meethu M



Bad Digest error while doing aws s3 put

2014-07-17 Thread lmk
Hi,
I am getting the following error while trying to save a large dataset to S3
using the saveAsHadoopFile command with Apache Spark 1.0.
org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException:
S3 PUT failed for
'/spark_test%2Fsmaato_one_day_phase_2%2Fsmaato_2014_05_17%2F_temporary%2F_attempt_201407170658__m_36_276%2Fpart-00036'
XML Error Message: <?xml version="1.0"
encoding="UTF-8"?><Error><Code>BadDigest</Code><Message>The Content-MD5 you
specified did not match what we
received.</Message><ExpectedDigest>N808DtNfYiTFzI+i2HxLEw==</ExpectedDigest><CalculatedDigest>66nS+2C1QqQmmcTeFpXOjw==</CalculatedDigest><RequestId>4FB3A3D60B187CE7</RequestId><HostId>H2NznP+RvwspekVHBMvgWGYAupKuO5YceSgmiLym6rOajOh5v5GnyM0VkO+dadyG</HostId></Error>

I have used the same command to write similar content with less data to S3
without any problem. When I googled this error message, the suggestions say it
might be due to an MD5 checksum mismatch. But could this happen due to load?

Regards,
lmk 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Bad-Digest-error-while-doing-aws-s3-put-tp10036.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Apache kafka + spark + Parquet

2014-07-17 Thread Mahebub Sayyed
Hi All,

Currently we are reading (multiple) topics from Apache Kafka and storing
them in HBase (multiple tables) using Twitter Storm (1 tuple is stored in 4
different tables),
but we are facing some performance issues with HBase,
so we are replacing *HBase* with *Parquet* files and *Storm* with *Apache
Spark*.

Difficulties:
1. How to read multiple topics from Kafka using Spark?
2. One tuple belongs to multiple tables; how to write one topic to multiple
Parquet files with proper partitioning using Spark?

Please help me
Thanks in advance.

-- 
*Regards,*

*Mahebub *


Spark scheduling with Capacity scheduler

2014-07-17 Thread Konstantin Kudryavtsev
Hi all,

I'm using HDP 2.0 with YARN. I'm running both MapReduce and Spark jobs on this
cluster. Is it possible to somehow use the Capacity Scheduler for Spark job
management as well as for MR jobs? I mean, I'm able to send an MR job to a
specific queue; may I do the same with a Spark job?
thank you in advance

Thank you,
Konstantin Kudryavtsev


Re: Apache kafka + spark + Parquet

2014-07-17 Thread Tathagata Das
1. You can put in multiple kafka topics in the same Kafka input stream. See
the example KafkaWordCount
https://github.com/apache/spark/blob/68f28dabe9c7679be82e684385be216319beb610/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
.
However they will all be read through a single receiver (though multiple
threads, one per topic). To parallelize the read (for increasing
throughput), you can create multiple Kafka input streams, and splits the
topics appropriately between them.
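
For example, something like this (ssc, the ZooKeeper address and the topic names
are placeholders):

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

val group = "my-consumer-group"
// two receivers, each reading half of the topics, unioned into one DStream
val streams: Seq[DStream[(String, String)]] = Seq(
  KafkaUtils.createStream(ssc, "zkhost:2181", group, Map("topicA" -> 1, "topicB" -> 1)),
  KafkaUtils.createStream(ssc, "zkhost:2181", group, Map("topicC" -> 1, "topicD" -> 1)))
val unified = ssc.union(streams)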

2. You can easily read and write to parquet files in Spark. Any RDD
(generated through DStreams in Spark Streaming, or otherwise), can be
converted to a SchemaRDD and then saved in the parquet format as
rdd.saveAsParquetFile. See the Spark SQL guide
http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files
for
more details. So if you want to write a same dataset (as RDDs) to two
different parquet files, you just have to call saveAsParquetFile twice (on
same or transformed versions of the RDD), as shown in the guide.
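
A sketch of that, reusing the unified stream from above (the case class, the parse
function and the output path are made-up placeholders):

import org.apache.spark.sql.SQLContext

case class Hashtag(key: String, text: String, year: Int, month: Int, day: Int)
def parseHashtag(line: String): Hashtag = ???   // hypothetical parser for your messages

val sqlContext = new SQLContext(ssc.sparkContext)
import sqlContext.createSchemaRDD               // implicit RDD[case class] -> SchemaRDD

unified.foreachRDD { rdd =>
  val records = rdd.map { case (_, line) => parseHashtag(line) }
  // one directory per batch; partition columns such as year/month/day would have
  // to be encoded in the output path yourself in Spark 1.0
  records.saveAsParquetFile("hdfs:///data/hashtags/batch-" + System.currentTimeMillis)
}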

Hope this helps!

TD


On Thu, Jul 17, 2014 at 2:19 AM, Mahebub Sayyed mahebub...@gmail.com
wrote:

 Hi All,

 Currently we are reading (multiple) topics from Apache kafka and storing
 that in HBase (multiple tables) using twitter storm (1 tuple stores in 4
 different tables).
  but we are facing some performance issue with HBase.
 so we are replacing* HBase* with *Parquet* file and *storm* with *Apache
 Spark*.

 difficulties:
  1. How to read multiple topics from kafka using spark?
 2. One tuple belongs to multiple tables, How to write one topic to
 multiple parquet files with proper partitioning using spark??

 Please help me
 Thanks in advance.

 --
 *Regards,*

 *Mahebub *



Re: Speeding up K-Means Clustering

2014-07-17 Thread Xiangrui Meng
Is it v0.9? Did you run in local mode? Try to set --driver-memory 4g
and repartition your data to match the number of CPU cores such that the
data is evenly distributed. You need 1m * 50 * 8 ~ 400MB to store
the data. Make sure there is enough memory for caching. -Xiangrui
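
For example, the repartition-and-cache step might look like this (12 is just an
assumed core count):

val data3 = sc.textFile("hdfs://...inputData.txt")
val parsedData3 = data3.map(_.split('\t').map(_.toDouble))
  .repartition(12)   // roughly one partition per core
  .cache()
parsedData3.count()  // materialize the cache once, before timing KMeans.train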

On Thu, Jul 17, 2014 at 1:48 AM, Ravishankar Rajagopalan
viora...@gmail.com wrote:
 I am trying to use MLlib for K-Means clustering on a data set with 1 million
 rows and 50 columns (all columns have double values) which is on HDFS (raw
 txt file is 28 MB)

 I initially tried the following:

 val data3 = sc.textFile("hdfs://...inputData.txt")
 val parsedData3 = data3.map( _.split('\t').map(_.toDouble))
 val numIterations = 10
 val numClusters = 200
 val clusters = KMeans.train(parsedData3, numClusters, numIterations)

 This took me nearly 850 seconds.

 I tried using persist with MEMORY_ONLY option hoping that this would
 significantly speed up the algorithm:

 val data3 = sc.textFile("hdfs://...inputData.txt")
 val parsedData3 = data3.map( _.split('\t').map(_.toDouble))
 parsedData3.persist(MEMORY_ONLY)
 val numIterations = 10
 val numClusters = 200
 val clusters = KMeans.train(parsedData3, numClusters, numIterations)

 This resulted in only a marginal improvement and took around 720 seconds.

 Is there any other way to speed up the algorithm further?

 Thank you.

 Regards,
 Ravi


Re: Kyro deserialisation error

2014-07-17 Thread Hao Wang
Hi, all

Yes, it's the name of a Wikipedia article. I am running the WikipediaPageRank
example from Spark Bagel.
I am wondering whether there is any relation to the buffer size of Kryo.

The PageRank can finish successfully, but sometimes it does not, because this kind
of Kryo exception happens too many times, which exceeds the maxTaskFailures.

I find this *Kryo exception: unable to find class* in my successful cases,
too.


Regards,
Wang Hao(王灏)

CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
Email:wh.s...@gmail.com


On Thu, Jul 17, 2014 at 4:44 PM, Sean Owen so...@cloudera.com wrote:

 Not sure if this helps, but it does seem to be part of a name in a
 Wikipedia article, and Wikipedia is the data set. So something is
 reading this class name from the data.

 http://en.wikipedia.org/wiki/Carl_Fridtjof_Rode

 On Thu, Jul 17, 2014 at 9:40 AM, Tathagata Das
 tathagata.das1...@gmail.com wrote:
  Seems like there is some sort of stream corruption, causing Kryo read to
  read a weird class name from the stream (the name arl Fridtjof Rode in
 the
  exception cannot be a class!).
  Not sure how to debug this.
 
  @Patrick: Any idea?



Re: Pysparkshell are not listing in the web UI while running

2014-07-17 Thread MEETHU MATHEW
Hi Akhil,

That fixed the problem...Thanks

 
Thanks  Regards, 
Meethu M


On Thursday, 17 July 2014 2:26 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
 


Hi Neethu,

Your application is running on local mode and that's the reason why you are not 
seeing the driver app in the 8080 webUI. You can pass the Master IP to your 
pyspark and get it running in cluster mode. 

eg: IPYTHON_OPTS="notebook --pylab inline" $SPARK_HOME/bin/pyspark --master 
spark://master:7077


Replace master:7077 with the spark uri that you are seeing in top left of the 
8080 webui.



Thanks
Best Regards


On Thu, Jul 17, 2014 at 1:35 PM, MEETHU MATHEW meethu2...@yahoo.co.in wrote:



 Hi all,


I just upgraded to spark 1.0.1. In spark 1.0.0 when I start Ipython notebook 
using the following command,it used to come in the running applications tab in 
master:8080 web UI.


IPYTHON_OPTS="notebook --pylab inline" $SPARK_HOME/bin/pyspark


But now when I run it,its not getting listed under running 
application/completed application(once its closed).But I am able to see the 
spark stages at master:4040 while its running


Anyone have any idea why this 






Thanks  Regards, 
Meethu M

GraphX Pragel implementation

2014-07-17 Thread Arun Kumar
Hi



I am trying to implement the belief propagation algorithm in GraphX using the
Pregel API.

def pregel[A]
    (initialMsg: A,
     maxIter: Int = Int.MaxValue,
     activeDir: EdgeDirection = EdgeDirection.Out)
    (vprog: (VertexId, VD, A) => VD,
     sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
     mergeMsg: (A, A) => A)

With this, can we create messages in the vprog function (from the incoming
messages) and send them using sendMsg?
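
For reference, a minimal skeleton of how the three functions fit together,
assuming a hypothetical Graph[Double, Double] where the vertex attribute is the
current belief value:

val result = graph.pregel(0.0, 10)(
  (id, attr, msg) => attr + msg,   // vprog: fold the merged incoming message into the vertex value
  triplet =>                       // sendMsg: build the next round's messages from the updated triplet
    if (triplet.srcAttr > triplet.dstAttr) Iterator((triplet.dstId, triplet.srcAttr))
    else Iterator.empty,
  (a, b) => a + b)                 // mergeMsg: combine messages addressed to the same vertex

So the messages are built in sendMsg from the vertex values that vprog has just
updated, rather than inside vprog itself.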



Regards
Arun


Re: Apache kafka + spark + Parquet

2014-07-17 Thread Mahebub Sayyed
Hi,

To migrate data from *HBase* to *Parquet* we used the following query through
*Impala*:

INSERT INTO table PARQUET_HASHTAGS(
key, city_name, country_name, hashtag_date, hashtag_text,
hashtag_source, hashtag_month, posted_time, hashtag_time,
tweet_id, user_id, user_name,
hashtag_year
) *partition(year, month, day)* SELECT key, city_name, country_name,
hashtag_date, hashtag_text, hashtag_source, hashtag_month, posted_time,
hashtag_time, tweet_id, user_id, user_name,hashtag_year, cast(hashtag_year
as int),cast(hashtag_month as int), cast(hashtag_date as int) from HASHTAGS
where hashtag_year='2014' and hashtag_month='04' and hashtag_date='01'
ORDER BY key,hashtag_year,hashtag_month,hashtag_date LIMIT 1000 offset
0;

Using the above query we have successfully migrated from HBase to Parquet files
with proper partitions.

Now we are storing data directly from *Kafka* to *Parquet*.

*How is it possible to create partitions while storing data directly from
Kafka to Parquet files?*
*(likewise created in the above query)*


On Thu, Jul 17, 2014 at 12:35 PM, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 1. You can put in multiple kafka topics in the same Kafka input stream.
 See the example KafkaWordCount
 https://github.com/apache/spark/blob/68f28dabe9c7679be82e684385be216319beb610/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
  .
 However they will all be read through a single receiver (though multiple
 threads, one per topic). To parallelize the read (for increasing
 throughput), you can create multiple Kafka input streams, and splits the
 topics appropriately between them.

 2. You can easily read and write to parquet files in Spark. Any RDD
 (generated through DStreams in Spark Streaming, or otherwise), can be
 converted to a SchemaRDD and then saved in the parquet format as
 rdd.saveAsParquetFile. See the Spark SQL guide
 http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files
  for
 more details. So if you want to write a same dataset (as RDDs) to two
 different parquet files, you just have to call saveAsParquetFile twice (on
 same or transformed versions of the RDD), as shown in the guide.

 Hope this helps!

 TD


 On Thu, Jul 17, 2014 at 2:19 AM, Mahebub Sayyed mahebub...@gmail.com
 wrote:

 Hi All,

 Currently we are reading (multiple) topics from Apache kafka and storing
 that in HBase (multiple tables) using twitter storm (1 tuple stores in 4
 different tables).
  but we are facing some performance issue with HBase.
 so we are replacing* HBase* with *Parquet* file and *storm* with *Apache
 Spark*.

 difficulties:
  1. How to read multiple topics from kafka using spark?
 2. One tuple belongs to multiple tables, How to write one topic to
 multiple parquet files with proper partitioning using spark??

 Please help me
 Thanks in advance.

 --
 *Regards,*

 *Mahebub *





-- 
*Regards,*
*Mahebub Sayyed*


Re: Speeding up K-Means Clustering

2014-07-17 Thread Ravishankar Rajagopalan
Hi Xiangrui,

Yes I am using Spark v0.9 and am not running it in local mode.

I did the memory setting using export SPARK_MEM=4G before starting the  Spark
instance.

Also previously, I was starting it with -c 1 but changed it to -c 12 since
it is a 12 core machine. It did bring down the time taken to less than 200
seconds from over 700 seconds.

I am not sure how to repartition the data to match the CPU cores. How do I
do it?

Thank you.

Ravi


On Thu, Jul 17, 2014 at 3:17 PM, Xiangrui Meng men...@gmail.com wrote:

 Is it v0.9? Did you run in local mode? Try to set --driver-memory 4g
 and repartition your data to match number of CPU cores such that the
 data is evenly distributed. You need 1m * 50 * 8 ~ 400MB to storage
 the data. Make sure there are enough memory for caching. -Xiangrui

 On Thu, Jul 17, 2014 at 1:48 AM, Ravishankar Rajagopalan
 viora...@gmail.com wrote:
  I am trying to use MLlib for K-Means clustering on a data set with 1
 million
  rows and 50 columns (all columns have double values) which is on HDFS
 (raw
  txt file is 28 MB)
 
  I initially tried the following:
 
  val data3 = sc.textFile("hdfs://...inputData.txt")
  val parsedData3 = data3.map( _.split('\t').map(_.toDouble))
  val numIterations = 10
  val numClusters = 200
  val clusters = KMeans.train(parsedData3, numClusters, numIterations)
 
  This took me nearly 850 seconds.
 
  I tried using persist with MEMORY_ONLY option hoping that this would
  significantly speed up the algorithm:
 
  val data3 = sc.textFile("hdfs://...inputData.txt")
  val parsedData3 = data3.map( _.split('\t').map(_.toDouble))
  parsedData3.persist(MEMORY_ONLY)
  val numIterations = 10
  val numClusters = 200
  val clusters = KMeans.train(parsedData3, numClusters, numIterations)
 
  This resulted in only a marginal improvement and took around 720 seconds.
 
  Is there any other way to speed up the algorithm further?
 
  Thank you.
 
  Regards,
  Ravi



Re: Getting pyspark.resultiterable.ResultIterable at xxxxxx in local shell

2014-07-17 Thread newbee88
Could someone please help me resolve the "This post has NOT been accepted by the
mailing list yet." issue. I registered and subscribed to the mailing list
many days ago but my post is still in an unaccepted state.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Getting-pyspark-resultiterable-ResultIterable-at-xx-in-local-shell-tp9490p10046.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming timing considerations

2014-07-17 Thread Laeeq Ahmed
Hi TD,

I have been able to filter the first windowed RDD, but I am not sure how to make
a generic filter. The larger window is 8 seconds and I want to fetch 4 seconds
based on the application timestamp. I have seen an earlier post which suggests a
timestamp-based window, but I am not sure how to build that window in the
following example.


 val transformed = keyAndValues.window(Seconds(8), Seconds(4)).transform(windowedRDD => {
   //val timeStampBasedWindow = ???                 // define the window over the timestamp that you want to process
   val filteredRDD = windowedRDD.filter(_._1 < 4)   // filter and retain only the records that fall in the timestamp-based window
   return filteredRDD
 })
Consider the input tuples as (1,23), (1.2,34) . . . . . (3.8,54), (4,413) . . .
where the key is the timestamp.
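
Would something like the two-argument transform work, which also hands you the
batch time? E.g. (the slack value and the epoch-based timestamps are just
assumptions here):

 val slackSeconds = 60.0   // hypothetical slack
 val transformed = keyAndValues.window(Seconds(8), Seconds(4))
   .transform((windowedRDD, batchTime) => {
     val windowEnd   = batchTime.milliseconds / 1000.0 - slackSeconds   // end of the 4s application-time window
     val windowStart = windowEnd - 4.0
     windowedRDD.filter { case (ts, _) => ts >= windowStart && ts < windowEnd }
   })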

Regards,
Laeeq
 



On Saturday, July 12, 2014 8:29 PM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:
 


Hi,
Thanks I will try to implement it.

Regards,
Laeeq



 On Saturday, July 12, 2014 4:37 AM, Tathagata Das 
tathagata.das1...@gmail.com wrote:
 


This is not in the current streaming API.

Queue stream is useful for testing with generated RDDs, but not for actual 
data. For actual data stream, the slack time can be implemented by doing 
DStream.window on a larger window that take slack time in consideration, and 
then the required application-time-based-window of data filtered out. For 
example, if you want a slack time of 1 minute and batches of 10 seconds, then 
do a window operation of 70 seconds, then in each RDD filter out the records 
with the desired application time and process them. 

TD



On Fri, Jul 11, 2014 at 7:44 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:

Hi,


In the spark streaming paper, slack time has been suggested for delaying the 
batch creation in case of external timestamps. I don't see any such option in 
streamingcontext. Is it available in the API?



Also going through the previous posts, queueStream has been suggested for 
this. I looked into to queueStream example.



     // Create and push some RDDs into Queue
    for (i <- 1 to 30) {
    rddQueue += ssc.sparkContext.makeRDD(1 to 10)
    Thread.sleep(1000)
    }

The only thing I am unsure is how to make batches(basic RDD) out of stream 
coming on a port.


Regards,
Laeeq

 

Equivalent functions for NVL() and CASE expressions in Spark SQL

2014-07-17 Thread pandees waran
Do we have any equivalent Scala functions available for NVL() and CASE
expressions to use in Spark SQL?


Re: Simple record matching using Spark SQL

2014-07-17 Thread Sarath Chandra
Added the below 2 lines just before the SQL query line -
*...*
*file1_schema.count;*
*file2_schema.count;*
*...*
and it started working. But I couldn't figure out the reason.

Can someone please explain? What was happening earlier, and what is
happening with the addition of these 2 lines?

~Sarath


On Thu, Jul 17, 2014 at 1:13 PM, Sarath Chandra 
sarathchandra.jos...@algofusiontech.com wrote:

 No Sonal, I'm not doing any explicit call to stop context.

 If you see my previous post to Michael, the commented portion of the code
 is my requirement. When I run this over standalone spark cluster, the
 execution keeps running with no output or error. After waiting for several
 minutes I'm killing it by pressing Ctrl+C in the terminal.

 But the same code runs perfectly when executed from spark shell.

 ~Sarath


 On Thu, Jul 17, 2014 at 1:05 PM, Sonal Goyal sonalgoy...@gmail.com
 wrote:

 Hi Sarath,

 Are you explicitly stopping the context?

 sc.stop()




 Best Regards,
 Sonal
 Nube Technologies http://www.nubetech.co

 http://in.linkedin.com/in/sonalgoyal




 On Thu, Jul 17, 2014 at 12:51 PM, Sarath Chandra 
 sarathchandra.jos...@algofusiontech.com wrote:

 Hi Michael, Soumya,

 Can you please check and let me know what is the issue? what am I
 missing?
 Let me know if you need any logs to analyze.

 ~Sarath


 On Wed, Jul 16, 2014 at 8:24 PM, Sarath Chandra 
 sarathchandra.jos...@algofusiontech.com wrote:

 Hi Michael,

 Tried it. It's correctly printing the line counts of both the files.
 Here's what I tried -

 Code:
 package test
 object Test4 {
   case class Test(fld1: String,
                   fld2: String,
                   fld3: String,
                   fld4: String,
                   fld5: String,
                   fld6: Double,
                   fld7: String);
   def main(args: Array[String]) {
     val conf = new SparkConf()
       .setMaster(args(0))
       .setAppName("SQLTest")
       .setSparkHome(args(1))
       .set("spark.executor.memory", "2g");
     val sc = new SparkContext(conf);
     sc.addJar("test1-0.1.jar");
     val file1 = sc.textFile(args(2));
     println(file1.count());
     val file2 = sc.textFile(args(3));
     println(file2.count());
     //val sq = new SQLContext(sc);
     //import sq._
     //val file1_recs: RDD[Test] = file1.map(_.split(",")).map(l =>
     //  Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)));
     //val file2_recs: RDD[Test] = file2.map(_.split(",")).map(s =>
     //  Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)));
     //val file1_schema = sq.createSchemaRDD(file1_recs);
     //val file2_schema = sq.createSchemaRDD(file2_recs);
     //file1_schema.registerAsTable("file1_tab");
     //file2_schema.registerAsTable("file2_tab");
     //val matched = sq.sql("select * from file1_tab l join file2_tab s on " +
     //  "l.fld7=s.fld7 where l.fld2=s.fld2 and " +
     //  "l.fld3=s.fld3 and l.fld4=s.fld4 and " +
     //  "l.fld6=s.fld6");
     //matched.collect().foreach(println);
   }
 }

 Execution:
 export CLASSPATH=$HADOOP_PREFIX/conf:$SPARK_HOME/lib/*:test1-0.1.jar
 export CONFIG_OPTS="-Dspark.jars=test1-0.1.jar"
 java -cp $CLASSPATH $CONFIG_OPTS test.Test4 spark://master:7077 \
   /usr/local/spark-1.0.1-bin-hadoop1 \
   hdfs://master:54310/user/hduser/file1.csv \
   hdfs://master:54310/user/hduser/file2.csv

 ~Sarath

 On Wed, Jul 16, 2014 at 8:14 PM, Michael Armbrust 
 mich...@databricks.com wrote:

 What if you just run something like:
 sc.textFile("hdfs://localhost:54310/user/hduser/file1.csv").count()


 On Wed, Jul 16, 2014 at 10:37 AM, Sarath Chandra 
 sarathchandra.jos...@algofusiontech.com wrote:

 Yes Soumya, I did it.

 First I tried with the example available in the documentation
 (example using people table and finding teenagers). After successfully
 running it, I moved on to this one which is starting point to a bigger
 requirement for which I'm evaluating Spark SQL.


 On Wed, Jul 16, 2014 at 7:59 PM, Soumya Simanta 
 soumya.sima...@gmail.com wrote:



 Can you try submitting a very simple job to the cluster.

 On Jul 16, 2014, at 10:25 AM, Sarath Chandra 
 sarathchandra.jos...@algofusiontech.com wrote:

 Yes it is appearing on the Spark UI, and remains there with state as
 RUNNING till I press Ctrl+C in the terminal to kill the execution.

 Barring the statements to create the spark context, if I copy paste
 the lines of my code in spark shell, runs perfectly giving the desired
 output.

 ~Sarath

 On Wed, Jul 16, 2014 at 7:48 PM, Soumya Simanta 
 soumya.sima...@gmail.com wrote:

 When you submit your job, it should appear on the Spark UI. Same
 with the REPL. Make sure you job is submitted to the cluster properly.


 On Wed, Jul 16, 2014 at 10:08 AM, Sarath Chandra 
 sarathchandra.jos...@algofusiontech.com wrote:

 Hi Soumya,

 Data is very small, 500+ lines in each file.

 Removed last 2 lines and placed this at the end
 matched.collect().foreach(println);. Still no luck. It's been more 
 than
 5min, the execution is still running.

 Checked logs, nothing in stdout. In stderr I don't see anything
 

Re: Simple record matching using Spark SQL

2014-07-17 Thread Michael Armbrust
What version are you running?  Could you provide a jstack of the driver and
executor when it is hanging?


On Thu, Jul 17, 2014 at 10:55 AM, Sarath Chandra 
sarathchandra.jos...@algofusiontech.com wrote:

 Added below 2 lines just before the sql query line -
 *...*
 *file1_schema.count;*
 *file2_schema.count;*
 *...*
 and it started working. But I couldn't get the reason.

 Can someone please explain me? What was happening earlier and what is
 happening with addition of these 2 lines?

 ~Sarath


 On Thu, Jul 17, 2014 at 1:13 PM, Sarath Chandra 
 sarathchandra.jos...@algofusiontech.com wrote:

 No Sonal, I'm not doing any explicit call to stop context.

 If you see my previous post to Michael, the commented portion of the code
 is my requirement. When I run this over standalone spark cluster, the
 execution keeps running with no output or error. After waiting for several
 minutes I'm killing it by pressing Ctrl+C in the terminal.

 But the same code runs perfectly when executed from spark shell.

 ~Sarath


 On Thu, Jul 17, 2014 at 1:05 PM, Sonal Goyal sonalgoy...@gmail.com
 wrote:

 Hi Sarath,

 Are you explicitly stopping the context?

 sc.stop()




 Best Regards,
 Sonal
 Nube Technologies http://www.nubetech.co

 http://in.linkedin.com/in/sonalgoyal




 On Thu, Jul 17, 2014 at 12:51 PM, Sarath Chandra 
 sarathchandra.jos...@algofusiontech.com wrote:

 Hi Michael, Soumya,

 Can you please check and let me know what is the issue? what am I
 missing?
 Let me know if you need any logs to analyze.

 ~Sarath


 On Wed, Jul 16, 2014 at 8:24 PM, Sarath Chandra 
 sarathchandra.jos...@algofusiontech.com wrote:

 Hi Michael,

 Tried it. It's correctly printing the line counts of both the files.
 Here's what I tried -

  Code:
  package test
  object Test4 {
    case class Test(fld1: String,
                    fld2: String,
                    fld3: String,
                    fld4: String,
                    fld5: String,
                    fld6: Double,
                    fld7: String);
    def main(args: Array[String]) {
      val conf = new SparkConf()
        .setMaster(args(0))
        .setAppName("SQLTest")
        .setSparkHome(args(1))
        .set("spark.executor.memory", "2g");
      val sc = new SparkContext(conf);
      sc.addJar("test1-0.1.jar");
      val file1 = sc.textFile(args(2));
      println(file1.count());
      val file2 = sc.textFile(args(3));
      println(file2.count());
      //val sq = new SQLContext(sc);
      //import sq._
      //val file1_recs: RDD[Test] = file1.map(_.split(",")).map(l =>
      //  Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)));
      //val file2_recs: RDD[Test] = file2.map(_.split(",")).map(s =>
      //  Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)));
      //val file1_schema = sq.createSchemaRDD(file1_recs);
      //val file2_schema = sq.createSchemaRDD(file2_recs);
      //file1_schema.registerAsTable("file1_tab");
      //file2_schema.registerAsTable("file2_tab");
      //val matched = sq.sql("select * from file1_tab l join file2_tab s on " +
      //  "l.fld7=s.fld7 where l.fld2=s.fld2 and " +
      //  "l.fld3=s.fld3 and l.fld4=s.fld4 and " +
      //  "l.fld6=s.fld6");
      //matched.collect().foreach(println);
    }
  }

  Execution:
  export CLASSPATH=$HADOOP_PREFIX/conf:$SPARK_HOME/lib/*:test1-0.1.jar
  export CONFIG_OPTS="-Dspark.jars=test1-0.1.jar"
  java -cp $CLASSPATH $CONFIG_OPTS test.Test4 spark://master:7077 \
    /usr/local/spark-1.0.1-bin-hadoop1 \
    hdfs://master:54310/user/hduser/file1.csv \
    hdfs://master:54310/user/hduser/file2.csv

 ~Sarath

 On Wed, Jul 16, 2014 at 8:14 PM, Michael Armbrust 
 mich...@databricks.com wrote:

 What if you just run something like:
  sc.textFile("hdfs://localhost:54310/user/hduser/file1.csv").count()


 On Wed, Jul 16, 2014 at 10:37 AM, Sarath Chandra 
 sarathchandra.jos...@algofusiontech.com wrote:

 Yes Soumya, I did it.

 First I tried with the example available in the documentation
 (example using people table and finding teenagers). After successfully
 running it, I moved on to this one which is starting point to a bigger
 requirement for which I'm evaluating Spark SQL.


 On Wed, Jul 16, 2014 at 7:59 PM, Soumya Simanta 
 soumya.sima...@gmail.com wrote:



 Can you try submitting a very simple job to the cluster.

 On Jul 16, 2014, at 10:25 AM, Sarath Chandra 
 sarathchandra.jos...@algofusiontech.com wrote:

 Yes it is appearing on the Spark UI, and remains there with state
 as RUNNING till I press Ctrl+C in the terminal to kill the execution.

 Barring the statements to create the spark context, if I copy paste
 the lines of my code in spark shell, runs perfectly giving the desired
 output.

 ~Sarath

 On Wed, Jul 16, 2014 at 7:48 PM, Soumya Simanta 
 soumya.sima...@gmail.com wrote:

 When you submit your job, it should appear on the Spark UI. Same
 with the REPL. Make sure you job is submitted to the cluster properly.


 On Wed, Jul 16, 2014 at 10:08 AM, Sarath Chandra 
 sarathchandra.jos...@algofusiontech.com wrote:

 Hi Soumya,

 Data is very small, 500+ lines in each file.

 Removed last 

Re: class after join

2014-07-17 Thread Michael Armbrust
If you intern the string it will be more efficient, but still significantly
more expensive than the class based approach.

** VERY EXPERIMENTAL **
We are working with EPFL on a lightweight syntax for naming the results of
spark transformations in scala (and are going to make it interoperate with
SQL).  Sparse details here: https://github.com/scala-records/scala-records

Stay tuned for more...

Michael


On Thu, Jul 17, 2014 at 4:47 AM, Luis Guerra luispelay...@gmail.com wrote:

 Thank you for your fast reply.

 We are considering this Map[String, String] solution, but there are some
 details that we do not control yet. What would happen if we have different
 data types for different fields? Also, with this solution, we have to
 repeat the field names for every row that we have, is this efficient?

 Regarding the solution with composition, the key would be repeated in the
 new class, whereas it is only necessary once after the join, isn't it?


 On Thu, Jul 17, 2014 at 10:25 AM, Sean Owen so...@cloudera.com wrote:

 If what you have is a large number of named strings, why not use a
 Map[String,String] to represent them? If you're approaching a class
 with 22 String fields anyway, it probably makes more sense. You lose
 a bit of compile-time checking, but gain flexibility.

 Also, merging two Maps to make a new one is pretty simple, compared to
 making many of these values classes.

 (Although, if you otherwise needed a class that represented all of
 the things in class A and class B, this could be done easily with
 composition, a class with an A and a B inside.)

 On Thu, Jul 17, 2014 at 9:15 AM, Luis Guerra luispelay...@gmail.com
 wrote:
  Hi all,
 
  I am a newbie Spark user with many doubts, so sorry if this is a silly
  question.
 
  I am dealing with tabular data formatted as text files, so when I first
 load
  the data, my code is like this:
 
  case class data_class(
V1: String,
V2: String,
V3: String,
V4: String,
V5: String,
V6: String,
V7: String)
 
  val data = sc.textFile(data_path)
    .map(x => {
      val fields = (x + " ").split("\t")
      data_class(fields(0).trim(), fields(1).trim(), fields(2).trim(), fields(3).trim(),
                 fields(4).trim(), fields(5).trim(), fields(6).trim())
    })
 
  I am doing this because I would like to access to each position using
 the
  variable name (V1...V7). Is there any other way of doing this?
 
  Also related to this question, if I have data with more than 22
 variables, I
  am restringed to use class instead of case class. However, this kind of
  solution has many restrictions mainly related to getter methods. Is
 there
  any other way of doing this?
 
  And finally, one of my main problems comes after operations of different
  data variables. For instance, if I have two different variables (data1
 and
  data2), and I want to join them both as:
 
  val data3 = data1.keyBy(_.V1).leftOuterJoin(data2.keyBy(_.V1))
 
  Then I have to post process data3 in order to obtain a new class that
  contains those variables from data1 and also those variables from
 data2. As
  data3 is (key, (data1, data2)), do I have to create a new different
 class
  with all these attributes from data1 and data2? This is kind of annoying
  when there are many attributes.
 
  Thanks in advance,
 
  Best





Is there a way to get previous/other keys' state in Spark Streaming?

2014-07-17 Thread Yan Fang
Hi guys,

I wonder whether you have a similar use case, and I want to know how you deal with it. In
our application, we want to check the previous state of some keys and
compare it with their current state.

AFAIK, Spark Streaming does not have key-value access. So currently what I am
doing is storing the previous and current data as one data type in the
state, calling updateStateByKey in every interval, and working on the state (which
holds the previous and current data) of the generated new DStream. But it has
limitations:

1. cannot access keys that do not appear in this time interval.
2. cannot update key A's state from key B's if only key B appears in this
time interval.
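
A sketch of that approach (keyedStream and the Int value type are placeholders):

import org.apache.spark.streaming.StreamingContext._   // pair-DStream functions

val stateStream = keyedStream.updateStateByKey[(Option[Int], Option[Int])] {
  (newValues: Seq[Int], state: Option[(Option[Int], Option[Int])]) =>
    val previous = state.flatMap(_._2)                  // last interval's value becomes "previous"
    val current  = newValues.lastOption.orElse(previous)
    Some((previous, current))                           // each key's state carries both values
}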

Am I doing something wrong? Any suggestions? Thank you.

Cheers,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


Re: Spark scheduling with Capacity scheduler

2014-07-17 Thread Matei Zaharia
It's possible using the --queue argument of spark-submit. Unfortunately this is 
not documented on http://spark.apache.org/docs/latest/running-on-yarn.html but 
it appears if you just type spark-submit --help or spark-submit with no 
arguments.
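
For example (the queue name is just a placeholder):

./bin/spark-submit --master yarn-cluster --queue myqueue --class org.apache.spark.examples.SparkPi lib/spark-examples*.jar 10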

Matei

On Jul 17, 2014, at 2:33 AM, Konstantin Kudryavtsev 
kudryavtsev.konstan...@gmail.com wrote:

 Hi all,
 
 I'm using HDP 2.0, YARN. I'm running both MapReduce and Spark jobs on this 
 cluster, is it possible somehow use Capacity scheduler for Spark jobs 
 management as well as MR jobs? I mean, I'm able to send MR job to specific 
 queue, may I do the same with Spark job?
 thank you in advance
 
 Thank you,
 Konstantin Kudryavtsev



Re: Spark scheduling with Capacity scheduler

2014-07-17 Thread Derek Schoettle
unsubscribe



From:   Matei Zaharia matei.zaha...@gmail.com
To: user@spark.apache.org
Date:   07/17/2014 12:41 PM
Subject:Re: Spark scheduling with Capacity scheduler



It's possible using the --queue argument of spark-submit. Unfortunately 
this is not documented on 
http://spark.apache.org/docs/latest/running-on-yarn.html but it appears if 
you just type spark-submit --help or spark-submit with no arguments.

Matei

On Jul 17, 2014, at 2:33 AM, Konstantin Kudryavtsev 
kudryavtsev.konstan...@gmail.com wrote:

 Hi all,
 
 I'm using HDP 2.0, YARN. I'm running both MapReduce and Spark jobs on 
this cluster, is it possible somehow use Capacity scheduler for Spark jobs 
management as well as MR jobs? I mean, I'm able to send MR job to specific 
queue, may I do the same with Spark job?
 thank you in advance
 
 Thank you,
 Konstantin Kudryavtsev




Error while running example/scala application using spark-submit

2014-07-17 Thread ShanxT
Hi,

I am receiving the below error while submitting any Spark example or Scala
application. I'd really appreciate any help.

spark version = 1.0.0
hadoop version = 2.4.0
Windows/Standalone mode

14/07/17 22:13:19 INFO TaskSchedulerImpl: Cancelling stage 0
Exception in thread main org.apache.spark.SparkException: Job aborted due
to stage failure: Task 0.0:0 failed 4 times, most recent failure: Exception
failure in TID 6 o
n host java.lang.NullPointerException
java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
org.apache.hadoop.util.Shell.runCommand(Shell.java:445)
org.apache.hadoop.util.Shell.run(Shell.java:418)
   
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:650)
org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:873)
org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853)
org.apache.spark.util.Utils$.fetchFile(Utils.scala:421)
   
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:332)
   
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330)
   
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
   
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
   
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
   
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:330)
   
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168)
   
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Exception in thread delete Spark temp dir
C:\Users\~1\AppData\Local\Temp\spark-88e26679-5a8f-4a37-bf02-41f4b2b46d8f
java.io.IOException: Failed to delete: C:\User
s\~1\AppData\Local\Temp\spark-88e26679-5a8f-4a37-bf02-41f4b2b46d8f\jars\spark-examples-1.0.0-hadoop2.4.0.jar
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:599)
at
org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:593)
at
org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:592)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:592)
at
org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:593)
at
org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:592)
at

Need help on Spark UDF (Join) Performance tuning .

2014-07-17 Thread S Malligarjunan
Hello Experts,

I am facing performance problem when I use the UDF function call. Please help 
me to tune the query.
Please find the details below

shark> select count(*) from table1;
OK
151096
Time taken: 7.242 seconds
shark> select count(*) from table2;
OK
938
Time taken: 1.273 seconds

Without UDF:
shark> SELECT
      count(pvc1.time)
    FROM table2 pvc2 JOIN table1 pvc1
    WHERE pvc1.col1 = pvc2.col2
    AND unix_timestamp(pvc2.time, 'yyyy-MM-dd HH:mm:ss,SSS') >
        unix_timestamp(pvc1.time, 'yyyy-MM-dd HH:mm:ss,SSS');
OK
328
Time taken: 200.487 seconds


shark> SELECT
      count(pvc1.time)
    FROM table2 pvc2 JOIN table1 pvc1
    WHERE (pvc1.col1 = pvc2.col1 OR pvc1.col1 = pvc2.col2)
    AND unix_timestamp(pvc2.time, 'yyyy-MM-dd HH:mm:ss,SSS') >
        unix_timestamp(pvc1.time, 'yyyy-MM-dd HH:mm:ss,SSS');
OK
331
Time taken: 292.86 seconds

With UDF:
shark> SELECT
      count(pvc1.time)
    FROM table2 pvc2 JOIN table1 pvc1
    WHERE testCompare(pvc1.col1, pvc1.col2, pvc2.col1, pvc2.col2)
    AND unix_timestamp(pvc2.time, 'yyyy-MM-dd HH:mm:ss,SSS') >
        unix_timestamp(pvc1.time, 'yyyy-MM-dd HH:mm:ss,SSS');

OK
331
Time taken: 3718.23 seconds

The query with the UDF takes far longer to run.


where testCompare is a UDF; the function just evaluates pvc1.col1 = 
pvc2.col1 OR pvc1.col1 = pvc2.col2.

Please let me know what the issue is here.

 
Thanks and Regards,
Sankar S.  


Re: can't get jobs to run on cluster (enough memory and cpus are available on worker)

2014-07-17 Thread Marcelo Vanzin
On Wed, Jul 16, 2014 at 12:36 PM, Matt Work Coarr
mattcoarr.w...@gmail.com wrote:
 Thanks Marcelo, I'm not seeing anything in the logs that clearly explains
 what's causing this to break.

 One interesting point that we just discovered is that if we run the driver
 and the slave (worker) on the same host it runs, but if we run the driver on
 a separate host it does not run.

When I mentioned the executor log, I meant the log of the process launched
by the worker, not the worker's own log. In my CDH-based Spark install, those
end up in /var/run/spark/work.

If you look at your worker log, you'll see it's launching the executor
process. So there should be something there.

Since you say it works when both are run in the same node, that
probably points to some communication issue, since the executor needs
to connect back to the driver. Check to see if you don't have any
firewalls blocking the ports Spark tries to use. (That's one of the
non-resource-related cases that will cause that message.)
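
As a rough sketch (the property name is from the standalone configuration docs;
the port value is arbitrary), you can at least pin the driver's listening port so
that a specific firewall rule can cover it:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("MyApp")
  .set("spark.driver.port", "51000")  // fixed port that executors connect back to
val sc = new SparkContext(conf)

Executor-side ports are still picked randomly in 1.0, so within a cluster it is
usually simplest to allow all traffic between the nodes in the same security group.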

-- 
Marcelo


Re: Speeding up K-Means Clustering

2014-07-17 Thread Xiangrui Meng
Please try

val parsedData3 =
data3.repartition(12).map(_.split("\t").map(_.toDouble)).cache()

and check the storage and driver/executor memory in the WebUI. Make
sure the data is fully cached.
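
Putting it together with the code from your original post, a sketch of the whole
pipeline (Spark 0.9 API; the path and partition count are illustrative) would be:

import org.apache.spark.mllib.clustering.KMeans

val data3 = sc.textFile("hdfs://...inputData.txt")
val parsedData3 = data3
  .repartition(12)                       // roughly match the number of cores
  .map(_.split("\t").map(_.toDouble))
  .cache()                               // MEMORY_ONLY by default
parsedData3.count()                      // force loading so the cache is populated
val clusters = KMeans.train(parsedData3, 200, 10)  // k = 200, 10 iterations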

-Xiangrui


On Thu, Jul 17, 2014 at 5:09 AM, Ravishankar Rajagopalan
viora...@gmail.com wrote:
 Hi Xiangrui,

 Yes I am using Spark v0.9 and am not running it in local mode.

 I did the memory setting using export SPARK_MEM=4G before starting the
 Spark instance.

 Also previously, I was starting it with -c 1 but changed it to -c 12 since
 it is a 12 core machine. It did bring down the time taken to less than 200
 seconds from over 700 seconds.

 I am not sure how to repartition the data to match the CPU cores. How do I
 do it?

 Thank you.

 Ravi


 On Thu, Jul 17, 2014 at 3:17 PM, Xiangrui Meng men...@gmail.com wrote:

 Is it v0.9? Did you run in local mode? Try to set --driver-memory 4g
 and repartition your data to match number of CPU cores such that the
 data is evenly distributed. You need 1m * 50 * 8 ~ 400MB to store
 the data. Make sure there is enough memory for caching. -Xiangrui

 On Thu, Jul 17, 2014 at 1:48 AM, Ravishankar Rajagopalan
 viora...@gmail.com wrote:
  I am trying to use MLlib for K-Means clustering on a data set with 1
  million
  rows and 50 columns (all columns have double values) which is on HDFS
  (raw
  txt file is 28 MB)
 
  I initially tried the following:
 
   val data3 = sc.textFile("hdfs://...inputData.txt")
  val parsedData3 = data3.map( _.split('\t').map(_.toDouble))
  val numIterations = 10
  val numClusters = 200
  val clusters = KMeans.train(parsedData3, numClusters, numIterations)
 
  This took me nearly 850 seconds.
 
  I tried using persist with MEMORY_ONLY option hoping that this would
  significantly speed up the algorithm:
 
   val data3 = sc.textFile("hdfs://...inputData.txt")
  val parsedData3 = data3.map( _.split('\t').map(_.toDouble))
  parsedData3.persist(MEMORY_ONLY)
  val numIterations = 10
  val numClusters = 200
  val clusters = KMeans.train(parsedData3, numClusters, numIterations)
 
  This resulted in only a marginal improvement and took around 720
  seconds.
 
  Is there any other way to speed up the algorithm further?
 
  Thank you.
 
  Regards,
  Ravi




Re: Spark Streaming Json file groupby function

2014-07-17 Thread srinivas
hi TD,

  Thanks for the solutions for my previous post... I am running into another
issue: I am getting data from a json file, and I am trying to parse it and
map it to the record given below:

 val jsonf =
lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[Any,Any]]).map(data => Record(data("ID").toString, data("name").toString, data("score").toInt, data("school").toString))



case class Record(ID:String,name:String,score:Int,school:String)


When I try to do this I get an error:

[error]
/home/ubuntu/spark-1.0.0/external/jsonfile2/src/main/scala/jsonfile.scala:36:
value toInt is not a member of Any
[error]
lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[Any,Any]]).map(data => Record(data("ID").toString, data("name").toString, data("score").toInt, data("school").toString))
[error]
/home/ubuntu/spark-1.0.0/external/jsonfile2/src/main/scala/jsonfile.scala:36:
value toInt is not a member of Any

I tried using immutable.Map[Any,Int] and also tried converting the Int to a String; my
application compiled, but I get an exception when I run it:

14/07/17 17:11:30 ERROR Executor: Exception in task ID 6
java.lang.ClassCastException: java.lang.String cannot be cast to
java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)

Basically I am trying to do a max operation in my Spark SQL query.
Please let me know if there is any workaround for this.

Thanks,
-Srinivas.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p10060.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Is there a way to get previous/other keys' state in Spark Streaming?

2014-07-17 Thread Tathagata Das
For accessing previous version, I would do it the same way. :)

1. Can you elaborate on what you mean by that with an example? What do you
mean by accessing keys?

2. Yeah, that is hard to do without the ability to do point lookups into an
RDD, which we don't support yet. You could try embedding the related key in
the values of the keys that need it. That is, B is present in the
value of key A. Then put this transformed DStream through updateStateByKey.

TD


Re: Spark Streaming Json file groupby function

2014-07-17 Thread Tathagata Das
This is a basic scala problem. You cannot apply toInt to Any. Try doing
toString.toInt

For such scala issues, I recommend trying it out in the Scala shell. For
example, you could have tried this out as the following.

[tdas @ Xion streaming] scala
Welcome to Scala version 2.10.3 (Java HotSpot(TM) 64-Bit Server VM, Java
1.7.0_45).
Type in expressions to have them evaluated.
Type :help for more information.

scala> 12.asInstanceOf[Any].toInt
<console>:8: error: value toInt is not a member of Any
              12.asInstanceOf[Any].toInt
                                   ^

scala> 12.asInstanceOf[Any].toString.toInt
res1: Int = 12

scala>
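
Applied to the snippet from your original post, a minimal sketch of the corrected
mapping (field names taken from that post) would be:

import scala.util.parsing.json.JSON

case class Record(ID: String, name: String, score: Int, school: String)

// `lines` is the input DStream[String] from the original code
val jsonf = lines.map(JSON.parseFull(_))
  .map(_.get.asInstanceOf[scala.collection.immutable.Map[Any, Any]])
  .map(data => Record(
    data("ID").toString,
    data("name").toString,
    // the map value is Any, so convert via toString; if the JSON number comes
    // back as a Double (e.g. "86.0"), use .toString.toDouble.toInt instead
    data("score").toString.toInt,
    data("school").toString))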



On Thu, Jul 17, 2014 at 10:32 AM, srinivas kusamsrini...@gmail.com wrote:

 hi TD,

   Thanks for the solutions for my previous post...I am running into other
 issue..i am getting data from json file and i am trying to parse it and
 trying to map it to a record given below

  val jsonf =
 
  lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[Any,Any]]).map(data => Record(data("ID").toString, data("name").toString, data("score").toInt, data("school").toString))



 case class Record(ID:String,name:String,score:Int,school:String)


 when i am trying to do this i am getting an error

 [error]

 /home/ubuntu/spark-1.0.0/external/jsonfile2/src/main/scala/jsonfile.scala:36:
 value toInt is not a member of Any
 [error]

  lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[Any,Any]]).map(data => Record(data("ID").toString, data("name").toString, data("score").toInt, data("school").toString))
 [error]

 /home/ubuntu/spark-1.0.0/external/jsonfile2/src/main/scala/jsonfile.scala:36:
 value toInt is not a member of Any

 I tried giving immutable.Map[Any,Int] and tried converting Int to string my
 application compiled but i am getting exception when i am running it

 14/07/17 17:11:30 ERROR Executor: Exception in task ID 6
 java.lang.ClassCastException: java.lang.String cannot be cast to
 java.lang.Integer
 at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)

 Basically i am trying to do max operation in my sparksql.
 please let me know if their any work around solution for this.

 Thanks,
 -Srinivas.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p10060.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Error while running example/scala application using spark-submit

2014-07-17 Thread ShanxT
Thanks Sean,

1) Yes, I am trying to run locally without Hadoop.
2) I also see the error from the provided link while launching spark-shell, but
after launch I am able to execute the same code I have in the sample application:
read a local file and perform some reduction operation. It just does not work through
the spark-submit command.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-while-running-example-scala-application-using-spark-submit-tp10056p10064.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming timing considerations

2014-07-17 Thread Tathagata Das
You have to define what range of records needs to be filtered out
in every windowed RDD, right? For example, when the DStream.window has data
from times 0 - 8 seconds by DStream time, you only want to retain the
data that falls into, say, 4 - 8 seconds by application time. This latter is
the application-level time window that you need to define in the transform
function. What may help is that there is another version of transform which
allows you to get the current DStream time (that is, it will give the value
8) from which you can calculate the app-time-window 4 - 8.


val transformed = keyAndValues.window(Seconds(8), Seconds(4)).transform(
  (windowedRDD: RDD[...], dstreamTime: Time) => {
    // define the window over the timestamp that you want to process
    val currentAppTimeWindowStart = dstreamTime - appTimeWindowSize
    val currentAppTimeWindowEnd = dstreamTime
    // filter and retain only the records that fall in the current app-time window
    val filteredRDD = windowedRDD.filter(r =>
      r._1 <= currentAppTimeWindowEnd && r._1 > currentAppTimeWindowStart)
    filteredRDD
  })

Hope this helps!

TD


On Thu, Jul 17, 2014 at 5:54 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:

 Hi TD,

 I have been able to filter the first WindowedRDD, but I am not sure how
 to make a generic filter. The larger window is 8 seconds and I want to
 fetch 4 seconds based on the application timestamp. I have seen an earlier post
 which suggest timeStampBasedwindow but I am not sure how to make
 timestampBasedwindow in the following example.

  val transformed = keyAndValues.window(Seconds(8), Seconds(4)).transform(windowedRDD => {
   //val timeStampBasedWindow = ???   // define the window over the timestamp that you want to process
   val filteredRDD = windowedRDD.filter(_._1 > 4) // filter and retain only the records that fall in the timestamp-based window
   filteredRDD
  })

 Consider the input tuples as (1,23), (1.2,34) . . . . . (3.8,54), (4,413) . .
 . where the key is the timestamp.

 Regards,
 Laeeq




   On Saturday, July 12, 2014 8:29 PM, Laeeq Ahmed laeeqsp...@yahoo.com
 wrote:


 Hi,
 Thanks I will try to implement it.

 Regards,
 Laeeq



   On Saturday, July 12, 2014 4:37 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:


 This is not in the current streaming API.

 Queue stream is useful for testing with generated RDDs, but not for actual
 data. For actual data stream, the slack time can be implemented by doing
 DStream.window on a larger window that take slack time in consideration,
 and then the required application-time-based-window of data filtered out.
 For example, if you want a slack time of 1 minute and batches of 10
 seconds, then do a window operation of 70 seconds, then in each RDD filter
 out the records with the desired application time and process them.

 TD


 On Fri, Jul 11, 2014 at 7:44 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:

 Hi,

 In the spark streaming paper, slack time has been suggested for delaying
 the batch creation in case of external timestamps. I don't see any such
 option in streamingcontext. Is it available in the API?

  Also going through the previous posts, queueStream has been suggested for
 this. I looked into to queueStream example.

  // Create and push some RDDs into Queue
 for (i <- 1 to 30) {
 rddQueue += ssc.sparkContext.makeRDD(1 to 10)
 Thread.sleep(1000)
 }

 The only thing I am unsure of is how to make batches (basic RDDs) out of a stream
 coming in on a port.

 Regards,
 Laeeq










Re: Error while running example/scala application using spark-submit

2014-07-17 Thread Stephen Boesch
Hi Sean
 RE: Windows and hadoop 2.4.x

HortonWorks - all the hype aside - only supports Windows Server 2008/2012.
So this general concept of supporting Windows is bunk.

Given that - and since the vast majority of Windows users do not happen to
have Windows Server on their laptop - do you have any further insight into
what it means to say that hadoop 2.4.x supports Windows ?   Are you
referring to cygwin support?



2014-07-17 11:13 GMT-07:00 Sean Owen so...@cloudera.com:

 I imagine the issue is ultimately a combination of Windows and (stock?)
 Apache Hadoop. I know that in the past, operations like 'chmod' didn't
 work on Windows since it assumed the existence of POSIX binaries. That
 should be long since patched up for 2.4.x but there may be a gotcha
 here that others can comment on.

 Do I understand that you're trying to run entirely locally, without
 Hadoop at all?
 Then I think this sounds like
 https://issues.apache.org/jira/browse/SPARK-2356 which does deserve
 attention. The Hadoop APIs get tickled even when they're not used, and
 this can cause some initialization gotchas on Windows in particular.

 On Thu, Jul 17, 2014 at 6:16 PM, ShanxT mail4.shash...@gmail.com wrote:
  Hi,
 
  I am receiving below error while submitting any spark example or scala
  application. Really appreciate any help.
 
  spark version = 1.0.0
  hadoop version = 2.4.0
  Windows/Standalone mode
 
  14/07/17 22:13:19 INFO TaskSchedulerImpl: Cancelling stage 0
  Exception in thread main org.apache.spark.SparkException: Job aborted
 due
  to stage failure: Task 0.0:0 failed 4 times, most recent failure:
 Exception
  failure in TID 6 on host java.lang.NullPointerException
  java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
  org.apache.hadoop.util.Shell.runCommand(Shell.java:445)
  org.apache.hadoop.util.Shell.run(Shell.java:418)
 
  org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:650)
  org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:873)
  org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853)
  org.apache.spark.util.Utils$.fetchFile(Utils.scala:421)
 
 
 org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:332)
 
 
 org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330)
 
 
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 
 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 
 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 
 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
  scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
  scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
 
 
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 
  org.apache.spark.executor.Executor.org
 $apache$spark$executor$Executor$$updateDependencies(Executor.scala:330)
 
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168)
 
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  java.lang.Thread.run(Thread.java:745)
  Driver stacktrace:
  at
  org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
  at
 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at
  scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at
 
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
  at scala.Option.foreach(Option.scala:236)
  at
 
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
  at
 
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at
 
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  at
  

Re: GraphX Pragel implementation

2014-07-17 Thread Ankur Dave
If your sendMsg function needs to know the incoming messages as well as the
vertex value, you could define VD to be a tuple of the vertex value and the
last received message. The vprog function would then store the incoming
messages into the tuple, allowing sendMsg to access them.

For example, if the vertex value was a String and the message type was an
Int, you could call Pregel as follows:

val graph: Graph[String, _] = ...

graph.mapVertices((id, attr) => (attr, 0)).pregel(0)(
  (id, attr: (String, Int), msg: Int) => (attr._1, msg),
  edge => Iterator(...), // can use edge.srcAttr._2 and edge.dstAttr._2 to access the messages
  (a: Int, b: Int) => a + b)


Ankur http://www.ankurdave.com/


Custom Metrics Sink

2014-07-17 Thread jjaffe
What is the preferred way of adding a custom metrics sink to Spark? I noticed
that the Sink Trait has been private since April, so I cannot simply extend
Sink in an outside package, but I would like to avoid having to create a
custom build of Spark. Is this possible?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Metrics-Sink-tp10068.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Error while running example/scala application using spark-submit

2014-07-17 Thread Sean Owen
I am probably the wrong person to ask as I never use Hadoop on Windows. But
from looking at the code just now it is clearly trying to accommodate
Windows shell commands. Yes I would not be surprised if it still needs
Cygwin.

A slightly broader point is that ideally it doesn't matter whether Hadoop
works on your platform if you are using Spark locally without Hadoop. I don't know
how feasible it is to separate but there may be some tweaks to avoid
initializing Hadoop in more cases. See the JIRA.
On Jul 17, 2014 7:52 PM, Stephen Boesch java...@gmail.com wrote:

 Hi Sean
  RE: Windows and hadoop 2.4.x

 HortonWorks - all the hype aside - only supports Windows Server 2008/2012.
 So this general concept of supporting Windows is bunk.

 Given that - and since the vast majority of Windows users do not happen to
 have Windows Server on their laptop - do you have any further insight into
 what it means to say that hadoop 2.4.x supports Windows ?   Are you
 referring to cygwin support?



 2014-07-17 11:13 GMT-07:00 Sean Owen so...@cloudera.com:

 I imagine the issue is ultimately combination of Windows and (stock?)
 Apache Hadoop. I know that in the past, operations like 'chmod' didn't
 work on Windows since it assumed the existence of POSIX binaries. That
 should be long since patched up for 2.4.x but there may be a gotcha
 here that others can comment on.

 Do I understand that you're trying to run entirely locally, without
 Hadoop at all?
 Then I think this sounds like
 https://issues.apache.org/jira/browse/SPARK-2356 which does deserve
 attention. The Hadoop APIs get tickled even when they're not used, and
 this can cause some initialization gotchas on Windows in particular.

 On Thu, Jul 17, 2014 at 6:16 PM, ShanxT mail4.shash...@gmail.com wrote:
  Hi,
 
  I am receiving below error while submitting any spark example or scala
  application. Really appreciate any help.
 
  spark version = 1.0.0
  hadoop version = 2.4.0
  Windows/Standalone mode
 
  14/07/17 22:13:19 INFO TaskSchedulerImpl: Cancelling stage 0
  Exception in thread main org.apache.spark.SparkException: Job aborted
 due
  to stage failure: Task 0.0:0 failed 4 times, most recent failure:
 Exception
  failure in TID 6 on host java.lang.NullPointerException
  java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
  org.apache.hadoop.util.Shell.runCommand(Shell.java:445)
  org.apache.hadoop.util.Shell.run(Shell.java:418)
 
 
 org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:650)
  org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:873)
  org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853)
  org.apache.spark.util.Utils$.fetchFile(Utils.scala:421)
 
 
 org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:332)
 
 
 org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330)
 
 
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 
 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 
 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 
 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
  scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
  scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
 
 
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 
  org.apache.spark.executor.Executor.org
 $apache$spark$executor$Executor$$updateDependencies(Executor.scala:330)
 
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168)
 
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  java.lang.Thread.run(Thread.java:745)
  Driver stacktrace:
  at
  org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
  at
 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at
  scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at
 
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
  at scala.Option.foreach(Option.scala:236)
  at
 
 

Re: Is there a way to get previous/other keys' state in Spark Streaming?

2014-07-17 Thread Yan Fang
Hi TD,

Thank you for the quick replying and backing my approach. :)

1) The example is this:

1. In the first 2 second interval, after updateStateByKey, I get a few keys
and their states, say, (a -> 1, b -> 2, c -> 3)
2. In the following 2 second interval, I only receive c and d and their
values. But I want to update/display the states of a and b as well.
* It seems I have no way to access a and b and get their states.
* Also, do I have a way to show all the existing states?

I guess the approach to solve this will be similar to what you mentioned
for 2). But the difficulty is that, if I want to display all the existing
states, I need to bundle all the remaining keys into one key.

Thank you.

Cheers,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


On Thu, Jul 17, 2014 at 11:36 AM, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 For accessing previous version, I would do it the same way. :)

 1. Can you elaborate on what you mean by that with an example? What do you
 mean by accessing keys?

 2. Yeah, that is hard to do without the ability to do point lookups into an
 RDD, which we don't support yet. You could try embedding the related key in
 the values of the keys that need it. That is, B is present in the
 value of key A. Then put this transformed DStream through updateStateByKey.

 TD



Re: Difference among batchDuration, windowDuration, slideDuration

2014-07-17 Thread hsy...@gmail.com
Thanks Tathagata. So can I say the RDD size (from the stream) is the window size,
and the overlap between 2 adjacent RDDs is the slide size?

But I still don't understand what the batch size is. Why do we need it, since
data processing is done RDD by RDD, right?

And does Spark chop the data into RDDs at the very beginning? Do you allow
event-by-event processing, for example filtering?




On Wed, Jul 16, 2014 at 6:47 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 I guess this is better explained in the streaming programming guide's
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations
 window operation subsection.

 For completeness' sake, it's worth mentioning the following. Window
 operations can be applied on other windowed DStreams as well. So the
 correct thing to say is that the slide duration of a window operation
 must be a multiple of the sliding interval of the parent DStream. For a simple,
 non-windowed DStream, this sliding interval is the same as the batch interval.

 // say batch interval is 2 seconds
 inputstream// moves every batch interval 2
 seconds
 inputstream.window(Seconds(3))  // not allowed, must be multiple of 2
 seconds
 inputstream.window(Seconds(4))  // allowed, moves every 2 seconds
 (therefore sliding interval is 2 seconds)
 inputstream.window(Seconds(10), Seconds(4))// allowed, moves every 4
 seconds (therefore sliding interval is 4 seconds)
 inputstream.window(Seconds(10), Seconds(4)).window(Seconds(6))// not
 allowed, as window interval must be multiple of parent's sliding interval
 which is 4 seconds
 inputstream.window(Seconds(10), Seconds(4)).window(Seconds(8))//
 allowed

 Hopefully that made sense :)

 TD




 On Wed, Jul 16, 2014 at 12:41 PM, Walrus theCat walrusthe...@gmail.com
 wrote:

 I did not!


 On Wed, Jul 16, 2014 at 12:31 PM, aaronjosephs aa...@placeiq.com wrote:

 The only other thing to keep in mind is that window duration and slide
 duration have to be multiples of batch duration, IDK if you made that
 fully
 clear



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Difference-among-batchDuration-windowDuration-slideDuration-tp9966p9973.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: Error: No space left on device

2014-07-17 Thread Chris DuBois
Hi Xiangrui,

Thanks. I have taken your advice and set all 5 of my slaves to be
c3.4xlarge. In this case /mnt and /mnt2 have plenty of space by default. I
now do sc.textFile(blah).repartition(N).map(...).cache() with N=80,
spark.executor.memory set to 20g, and --driver-memory 20g. So far things
seem more stable.

Thanks for the help,
Chris


On Thu, Jul 17, 2014 at 12:56 AM, Xiangrui Meng men...@gmail.com wrote:

 Set N to be the total number of cores on the cluster or less. sc.textFile
 doesn't always give you that number; it depends on the block size. For
 MovieLens, I think the default behavior should be 2~3 partitions. You
 need to call repartition to ensure the right number of partitions.

 Which EC2 instance type did you use? I usually use m3.2xlarge or c?
 instances that come with SSD and 1G or 10G network. For those
 instances, you should see local drives mounted at /mnt, /mnt2, /mnt3,
 ... Make sure there was no error when you used the ec2 script to
 launch the cluster.

 It is a little strange to see 94% of / was used on a slave. Maybe
 shuffle data went to /. I'm not sure which settings went wrong. I
 recommend trying re-launching a cluster with m3.2xlarge instances and
 using the default settings (don't set anything in SparkConf). Submit
 the application with --driver-memory 20g.

 The running times are slower than what I remember, but it depends on
 the instance type.

 Best,
 Xiangrui



 On Wed, Jul 16, 2014 at 10:18 PM, Chris DuBois chris.dub...@gmail.com
 wrote:
  Hi Xiangrui,
 
  I will try this shortly. When using N partitions, do you recommend N be
 the
  number of cores on each slave or the number of cores on the master?
 Forgive
  my ignorance, but is this best achieved as an argument to sc.textFile?
 
  The slaves on the EC2 clusters start with only 8gb of storage, and it
  doesn't seem that /mnt/spark and /mnt2/spark are mounted anywhere else by
  default. Looking at spark-ec2/setup-slaves.sh, it appears that these are
  only mounted if the instance type begins with r3. (Or am I not reading
 that
  right?) My slaves are a different instance type, and currently look like
  this:
  FilesystemSize  Used Avail Use% Mounted on
  /dev/xvda17.9G  7.3G  515M  94% /
  tmpfs 7.4G  4.0K  7.4G   1% /dev/shm
  /dev/xvdv 500G  2.5G  498G   1% /vol
 
  I have been able to finish ALS on MovieLens 10M only twice, taking 221s
 and
  315s for 20 iterations at K=20 and lambda=0.01. Does that timing sound
 about
  right, or does it point to a poor configuration? The same script with
  MovieLens 1M runs fine in about 30-40s with the same settings. (In both
  cases I'm training on 70% of the data.)
 
  Thanks for your help!
  Chris
 
 
  On Wed, Jul 16, 2014 at 4:29 PM, Xiangrui Meng men...@gmail.com wrote:
 
  For ALS, I would recommend repartitioning the ratings to match the
  number of CPU cores or even less. ALS is not computation heavy for
  small k but communication heavy. Having small number of partitions may
  help. For EC2 clusters, we use /mnt/spark and /mnt2/spark as the
  default local directory because they are local hard drives. Did your
  last run of ALS on MovieLens 10M-100K with the default settings
  succeed? -Xiangrui
 
  On Wed, Jul 16, 2014 at 8:00 AM, Chris DuBois chris.dub...@gmail.com
  wrote:
   Hi Xiangrui,
  
   I accidentally did not send df -i for the master node. Here it is at
 the
   moment of failure:
  
   FilesystemInodes   IUsed   IFree IUse% Mounted on
   /dev/xvda1524288  280938  243350   54% /
   tmpfs3845409   1 38454081% /dev/shm
   /dev/xvdb100024321027 100014051% /mnt
   /dev/xvdf10002432  16 100024161% /mnt2
   /dev/xvdv524288000  13 5242879871% /vol
  
   I am using default settings now, but is there a way to make sure that
   the
   proper directories are being used? How many blocks/partitions do you
   recommend?
  
   Chris
  
  
   On Wed, Jul 16, 2014 at 1:09 AM, Chris DuBois chris.dub...@gmail.com
 
   wrote:
  
   Hi Xiangrui,
  
   Here is the result on the master node:
   $ df -i
   FilesystemInodes   IUsed   IFree IUse% Mounted on
   /dev/xvda1524288  273997  250291   53% /
   tmpfs1917974   1 19179731% /dev/shm
   /dev/xvdv524288000  30 5242879701% /vol
  
   I have reproduced the error while using the MovieLens 10M data set
 on a
   newly created cluster.
  
   Thanks for the help.
   Chris
  
  
   On Wed, Jul 16, 2014 at 12:22 AM, Xiangrui Meng men...@gmail.com
   wrote:
  
   Hi Chris,
  
   Could you also try `df -i` on the master node? How many
   blocks/partitions did you set?
  
   In the current implementation, ALS doesn't clean the shuffle data
   because the operations are chained together. But it shouldn't run
 out
   of disk space on the MovieLens dataset, which is small. spark-ec2
   script sets /mnt/spark and 

Re: using multiple dstreams together (spark streaming)

2014-07-17 Thread Walrus theCat
Thanks!


On Wed, Jul 16, 2014 at 6:34 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Have you taken a look at DStream.transformWith( ... ) . That allows you
 apply arbitrary transformation between RDDs (of the same timestamp) of two
 different streams.

 So you can do something like this.

 2s-window-stream.transformWith(1s-window-stream, (rdd1: RDD[...], rdd2:
 RDD[...]) => {
  ...
   // return a new RDD
 })


 And streamingContext.transform() extends it to N DStreams. :)

 Hope this helps!

 TD




 On Wed, Jul 16, 2014 at 10:42 AM, Walrus theCat walrusthe...@gmail.com
 wrote:

 hey at least it's something (thanks!) ... not sure what i'm going to do
 if i can't find a solution (other than not use spark) as i really need
 these capabilities.  anyone got anything else?


 On Wed, Jul 16, 2014 at 10:34 AM, Luis Ángel Vicente Sánchez 
 langel.gro...@gmail.com wrote:

 hum... maybe consuming all streams at the same time with an actor that
 would act as a new DStream source... but this is just a random idea... I
 don't really know if that would be a good idea or even possible.


 2014-07-16 18:30 GMT+01:00 Walrus theCat walrusthe...@gmail.com:

 Yeah -- I tried the .union operation and it didn't work for that
 reason.  Surely there has to be a way to do this, as I imagine this is a
 commonly desired goal in streaming applications?


 On Wed, Jul 16, 2014 at 10:10 AM, Luis Ángel Vicente Sánchez 
 langel.gro...@gmail.com wrote:

 I'm joining several kafka dstreams using the join operation but you
 have the limitation that the duration of the batch has to be the same, i.e. a 1
 second window for all dstreams... so it would not work for you.


 2014-07-16 18:08 GMT+01:00 Walrus theCat walrusthe...@gmail.com:

 Hi,

 My application has multiple dstreams on the same inputstream:

 dstream1 // 1 second window
 dstream2 // 2 second window
 dstream3 // 5 minute window


 I want to write logic that deals with all three windows (e.g. when
 the 1 second window differs from the 2 second window by some delta ...)

 I've found some examples online (there's not much out there!), and I
 can only see people transforming a single dstream.  In conventional 
 spark,
 we'd do this sort of thing with a cartesian on RDDs.

 How can I deal with multiple Dstreams at once?

 Thanks









Re: Supported SQL syntax in Spark SQL

2014-07-17 Thread Nicholas Chammas
FYI: I've created SPARK-2560
https://issues.apache.org/jira/browse/SPARK-2560 to track creating SQL
reference docs for Spark SQL.


On Mon, Jul 14, 2014 at 2:06 PM, Michael Armbrust mich...@databricks.com
wrote:

 You can find the parser here:
 https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala

 In general the hive parser provided by HQL is much more complete at the
 moment.  Long term we will likely stop using parser combinators and either
 write a more complete parser, or adopt one from an existing project.


 On Mon, Jul 14, 2014 at 12:25 AM, Martin Gammelsæter 
 martingammelsae...@gmail.com wrote:

 I am very interested in the original question as well, is there any
 list (even if it is simply in the code) of all supported syntax for
 Spark SQL?

 On Mon, Jul 14, 2014 at 6:41 AM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:
  Are you sure the code running on the cluster has been updated?
 
  I launched the cluster using spark-ec2 from the 1.0.1 release, so I’m
  assuming that’s taken care of, at least in theory.
 
  I just spun down the clusters I had up, but I will revisit this
 tomorrow and
  provide the information you requested.
 
  Nick



 --
 Mvh.
 Martin Gammelsæter
 92209139





replacement for SPARK_LIBRARY_PATH ?

2014-07-17 Thread Eric Friedman
I used to use SPARK_LIBRARY_PATH to specify the location of native libs
for lzo compression when using spark 0.9.0.

The references to that environment variable have disappeared from the docs
for
spark 1.0.1 and it's not clear how to specify the location for lzo.

Any guidance?


Re: Error: No space left on device

2014-07-17 Thread Bill Jay
Hi,

I also have some issues with repartition. In my program, I consume data
from Kafka. After I consume data, I use repartition(N). However, although I
set N to be 120, there are around 18 executors allocated for my reduce
stage. I am not sure how the repartition command works ton ensure the
parallelism.

Bill


On Thu, Jul 17, 2014 at 12:56 AM, Xiangrui Meng men...@gmail.com wrote:

 Set N to be the total number of cores on the cluster or less. sc.textFile
 doesn't always give you that number; it depends on the block size. For
 MovieLens, I think the default behavior should be 2~3 partitions. You
 need to call repartition to ensure the right number of partitions.

 Which EC2 instance type did you use? I usually use m3.2xlarge or c?
 instances that come with SSD and 1G or 10G network. For those
 instances, you should see local drives mounted at /mnt, /mnt2, /mnt3,
 ... Make sure there was no error when you used the ec2 script to
 launch the cluster.

 It is a little strange to see 94% of / was used on a slave. Maybe
 shuffle data went to /. I'm not sure which settings went wrong. I
 recommend trying re-launching a cluster with m3.2xlarge instances and
 using the default settings (don't set anything in SparkConf). Submit
 the application with --driver-memory 20g.

 The running times are slower than what I remember, but it depends on
 the instance type.

 Best,
 Xiangrui



 On Wed, Jul 16, 2014 at 10:18 PM, Chris DuBois chris.dub...@gmail.com
 wrote:
  Hi Xiangrui,
 
  I will try this shortly. When using N partitions, do you recommend N be
 the
  number of cores on each slave or the number of cores on the master?
 Forgive
  my ignorance, but is this best achieved as an argument to sc.textFile?
 
  The slaves on the EC2 clusters start with only 8gb of storage, and it
  doesn't seem that /mnt/spark and /mnt2/spark are mounted anywhere else by
  default. Looking at spark-ec2/setup-slaves.sh, it appears that these are
  only mounted if the instance type begins with r3. (Or am I not reading
 that
  right?) My slaves are a different instance type, and currently look like
  this:
  FilesystemSize  Used Avail Use% Mounted on
  /dev/xvda17.9G  7.3G  515M  94% /
  tmpfs 7.4G  4.0K  7.4G   1% /dev/shm
  /dev/xvdv 500G  2.5G  498G   1% /vol
 
  I have been able to finish ALS on MovieLens 10M only twice, taking 221s
 and
  315s for 20 iterations at K=20 and lambda=0.01. Does that timing sound
 about
  right, or does it point to a poor configuration? The same script with
  MovieLens 1M runs fine in about 30-40s with the same settings. (In both
  cases I'm training on 70% of the data.)
 
  Thanks for your help!
  Chris
 
 
  On Wed, Jul 16, 2014 at 4:29 PM, Xiangrui Meng men...@gmail.com wrote:
 
  For ALS, I would recommend repartitioning the ratings to match the
  number of CPU cores or even less. ALS is not computation heavy for
  small k but communication heavy. Having small number of partitions may
  help. For EC2 clusters, we use /mnt/spark and /mnt2/spark as the
  default local directory because they are local hard drives. Did your
  last run of ALS on MovieLens 10M-100K with the default settings
  succeed? -Xiangrui
 
  On Wed, Jul 16, 2014 at 8:00 AM, Chris DuBois chris.dub...@gmail.com
  wrote:
   Hi Xiangrui,
  
   I accidentally did not send df -i for the master node. Here it is at
 the
   moment of failure:
  
   FilesystemInodes   IUsed   IFree IUse% Mounted on
   /dev/xvda1524288  280938  243350   54% /
   tmpfs3845409   1 38454081% /dev/shm
   /dev/xvdb100024321027 100014051% /mnt
   /dev/xvdf10002432  16 100024161% /mnt2
   /dev/xvdv524288000  13 5242879871% /vol
  
   I am using default settings now, but is there a way to make sure that
   the
   proper directories are being used? How many blocks/partitions do you
   recommend?
  
   Chris
  
  
   On Wed, Jul 16, 2014 at 1:09 AM, Chris DuBois chris.dub...@gmail.com
 
   wrote:
  
   Hi Xiangrui,
  
   Here is the result on the master node:
   $ df -i
   FilesystemInodes   IUsed   IFree IUse% Mounted on
   /dev/xvda1524288  273997  250291   53% /
   tmpfs1917974   1 19179731% /dev/shm
   /dev/xvdv524288000  30 5242879701% /vol
  
   I have reproduced the error while using the MovieLens 10M data set
 on a
   newly created cluster.
  
   Thanks for the help.
   Chris
  
  
   On Wed, Jul 16, 2014 at 12:22 AM, Xiangrui Meng men...@gmail.com
   wrote:
  
   Hi Chris,
  
   Could you also try `df -i` on the master node? How many
   blocks/partitions did you set?
  
   In the current implementation, ALS doesn't clean the shuffle data
   because the operations are chained together. But it shouldn't run
 out
   of disk space on the MovieLens dataset, which is small. spark-ec2
   script sets /mnt/spark and /mnt/spark2 as the local.dir by 

Re: can't get jobs to run on cluster (enough memory and cpus are available on worker)

2014-07-17 Thread Matt Work Coarr
Thanks Marcelo!  This is a huge help!!

Looking at the executor logs (in a vanilla spark install, I'm finding them
in $SPARK_HOME/work/*)...

It launches the executor, but it looks like the
CoarseGrainedExecutorBackend is having trouble talking to the driver
(exactly what you said!!!).

Do you know the range of random ports that is used for the
executor-to-driver connection?  Is that range adjustable?  Any config setting or
environment variable?

I manually set up my ec2 security group to include all the ports that the
spark ec2 script ($SPARK_HOME/ec2/spark_ec2.py) sets up in its security
groups.  They included (for those listed above 1):
1
50060
50070
50075
60060
60070
60075

Obviously I'll need to make some adjustments to my EC2 security group!
 Just need to figure out exactly what should be in there.  To keep things
simple, I just have one security group for the master, slaves, and the
driver machine.

In listing the port ranges in my current security group I looked at the
ports that spark_ec2.py sets up as well as the ports listed in the spark
standalone mode documentation page under configuring ports for network
security:

http://spark.apache.org/docs/latest/spark-standalone.html


Here are the relevant fragments from the executor log:

Spark Executor Command: /cask/jdk/bin/java -cp
::/cask/spark/conf:/cask/spark/lib/spark-assembly-1.0.0-hadoop2.2.0.jar:/cask/spark/lib/datanucleus-api-jdo-3.

2.1.jar:/cask/spark/lib/datanucleus-rdbms-3.2.1.jar:/cask/spark/lib/datanucleus-core-3.2.2.jar
-XX:MaxPermSize=128m -Dspark.akka.frameSize=100 -Dspark.akka.

frameSize=100 -Xms512M -Xmx512M
org.apache.spark.executor.CoarseGrainedExecutorBackend
akka.tcp://spark@ip-10-202-11-191.ec2.internal:46787/user/CoarseGra

inedScheduler 0 ip-10-202-8-45.ec2.internal 8
akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker
app-20140717195146-


...

14/07/17 19:51:47 DEBUG NativeCodeLoader: Trying to load the custom-built
native-hadoop library...

14/07/17 19:51:47 DEBUG NativeCodeLoader: Failed to load native-hadoop with
error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path

14/07/17 19:51:47 DEBUG NativeCodeLoader:
java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib

14/07/17 19:51:47 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable

14/07/17 19:51:47 DEBUG JniBasedUnixGroupsMappingWithFallback: Falling back
to shell based

14/07/17 19:51:47 DEBUG JniBasedUnixGroupsMappingWithFallback: Group
mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping

14/07/17 19:51:48 DEBUG Groups: Group mapping
impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback;
cacheTimeout=30

14/07/17 19:51:48 DEBUG SparkHadoopUtil: running as user: ec2-user

...


14/07/17 19:51:48 INFO CoarseGrainedExecutorBackend: Connecting to driver:
akka.tcp://spark@ip-10-202-11-191.ec2.internal
:46787/user/CoarseGrainedScheduler

14/07/17 19:51:48 INFO WorkerWatcher: Connecting to worker
akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker

14/07/17 19:51:49 INFO WorkerWatcher: Successfully connected to
akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker

14/07/17 19:53:29 ERROR CoarseGrainedExecutorBackend: Driver Disassociated
[akka.tcp://sparkExecutor@ip-10-202-8-45.ec2.internal:55670] -
[akka.tcp://spark@ip-10-202-11-191.ec2.internal:46787] disassociated!
Shutting down.


Thanks a bunch!
Matt


On Thu, Jul 17, 2014 at 1:21 PM, Marcelo Vanzin van...@cloudera.com wrote:

 When I mentioned the executor log, I meant the log of the process launched
 by the worker, not the worker's own log. In my CDH-based Spark install, those
 end up in /var/run/spark/work.

 If you look at your worker log, you'll see it's launching the executor
 process. So there should be something there.

 Since you say it works when both are run in the same node, that
 probably points to some communication issue, since the executor needs
 to connect back to the driver. Check to see if you don't have any
 firewalls blocking the ports Spark tries to use. (That's one of the
 non-resource-related cases that will cause that message.)



Re: replacement for SPARK_LIBRARY_PATH ?

2014-07-17 Thread Zongheng Yang
One way is to set this in your conf/spark-defaults.conf:

spark.executor.extraLibraryPath /path/to/native/lib

The key is documented here:
http://spark.apache.org/docs/latest/configuration.html
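
For example, in conf/spark-defaults.conf (the paths below are placeholders for
wherever the lzo native libraries live; spark.driver.extraLibraryPath covers the
driver side, and spark-submit's --driver-library-path flag does the same thing):

spark.executor.extraLibraryPath  /path/to/native/lzo/libs
spark.driver.extraLibraryPath    /path/to/native/lzo/libs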

On Thu, Jul 17, 2014 at 1:25 PM, Eric Friedman
eric.d.fried...@gmail.com wrote:
 I used to use SPARK_LIBRARY_PATH to specify the location of native libs
 for lzo compression when using spark 0.9.0.

 The references to that environment variable have disappeared from the docs
 for
 spark 1.0.1 and it's not clear how to specify the location for lzo.

 Any guidance?


unserializable object in Spark Streaming context

2014-07-17 Thread Yan Fang
Hi guys,

need some help with this problem. In our use case, we need to continuously
insert values into the database. So our approach is to create the jdbc
object in the main method and then do the inserting operation in the
DStream foreachRDD operation. Is this approach reasonable?

Then the problem comes: since we are using com.mysql.jdbc.java, which is
unserializable, we keep seeing the notSerializableException. I think that
is because Spark Streaming is trying to serialize and then checkpoint the
whole class which contains the StreamingContext, not only the
StreamingContext object, right? Or is there another reason that triggers the
serialization? Any workaround for this? (except not using the
com.mysql.jdbc.java)

Thank you.

Cheers,
Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


Re: unserializable object in Spark Streaming context

2014-07-17 Thread Marcelo Vanzin
Could you share some code (or pseudo-code)?

Sounds like you're instantiating the JDBC connection in the driver,
and using it inside a closure that would be run in a remote executor.
That means that the connection object would need to be serializable.
If that sounds like what you're doing, it won't work.


On Thu, Jul 17, 2014 at 1:37 PM, Yan Fang yanfang...@gmail.com wrote:
 Hi guys,

 need some help in this problem. In our use case, we need to continuously
 insert values into the database. So our approach is to create the jdbc
 object in the main method and then do the inserting operation in the DStream
 foreachRDD operation. Is this approach reasonable?

 Then the problem comes: since we are using com.mysql.jdbc.java, which is
 unserializable, we keep seeing the notSerializableException. I think that is
 because Spark Streaming is trying to serialize and then checkpoint the whole
 class which contains the StreamingContext, not only the StreamingContext
 object, right? Or other reason to trigger the serialize operation? Any
 workaround for this? (except not using the com.mysql.jdbc.java)

 Thank you.

 Cheers,
 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108



-- 
Marcelo


Re: Include permalinks in mail footer

2014-07-17 Thread Matei Zaharia
Good question.. I'll ask INFRA because I haven't seen other Apache mailing 
lists provide this. It would indeed be helpful.

Matei

On Jul 17, 2014, at 12:59 PM, Nick Chammas nicholas.cham...@gmail.com wrote:

 Can we modify the mailing list to include permalinks to the thread in the 
 footer of every email? Or at least of the initial email in a thread?
 
 I often find myself wanting to reference one thread from another, or from a 
 JIRA issue. Right now I have to google the thread subject and find the link 
 that way.
 
 It would be nice to be able to find the permalink I need from the thread 
 itself. 
 
 It might also be helpful for people to include an unsubscribe link in the 
 footer. That is a common practice in most mailing lists.
 
 Nick
 
 
 View this message in context: Include permalinks in mail footer
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Is there a way to get previous/other keys' state in Spark Streaming?

2014-07-17 Thread Tathagata Das
The updateFunction given in updateStateByKey should be called on ALL the
keys that are in the state, even if there is no new data in the batch for some
key. Is that not the behavior you see?

What do you mean by show all the existing states? You have access to the
latest state RDD by doing stateStream.foreachRDD(...). There you can do
whatever operation on all the key-state pairs.
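
A rough sketch of what that could look like (the key/value types and the update
function here are just assumptions for illustration; keyValueStream is assumed to
be a DStream[(String, Int)]):

// running count per key; the update function is invoked for every key in the
// state, with an empty Seq of new values when a key had no data in the batch
val updateFunc = (newValues: Seq[Int], state: Option[Int]) =>
  Some(state.getOrElse(0) + newValues.sum)

val stateStream = keyValueStream.updateStateByKey[Int](updateFunc)

// dump the full state (all keys, including a and b) every batch interval
stateStream.foreachRDD { rdd =>
  rdd.collect().foreach { case (k, v) => println(k + " -> " + v) }  // fine for a small state
}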

TD




On Thu, Jul 17, 2014 at 11:58 AM, Yan Fang yanfang...@gmail.com wrote:

 Hi TD,

 Thank you for the quick replying and backing my approach. :)

 1) The example is this:

 1. In the first 2 second interval, after updateStateByKey, I get a few
 keys and their states, say, (a - 1, b - 2, c - 3)
 2. In the following 2 second interval, I only receive c and d and
 their value. But I want to update/display the state of a and b
 accordingly.
 * It seems I have no way to access the a and b and get their states.
 * also, do I have a way to show all the existing states?

 I guess the approach to solve this will be similar to what you mentioned
 for 2). But the difficulty is that, if I want to display all the existing
 states, need to bundle all the rest keys to one key.

 Thank you.

 Cheers,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Thu, Jul 17, 2014 at 11:36 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 For accessing previous version, I would do it the same way. :)

 1. Can you elaborate on what you mean by that with an example? What do
 you mean by accessing keys?

  2. Yeah, that is hard to do without the ability to do point lookups into an
  RDD, which we don't support yet. You could try embedding the related key in
  the values of the keys that need it. That is, B is present in the
  value of key A. Then put this transformed DStream through updateStateByKey.

 TD





Re: unserializable object in Spark Streaming context

2014-07-17 Thread Tathagata Das
And if Marcelo's guess is correct, then the right way to do this would be
to lazily / dynamically create the jdbc connection as a singleton
in the workers/executors and use that. Something like this.


dstream.foreachRDD(rdd => {
  rdd.foreachPartition((iterator: Iterator[...]) => {
    // this will create the single jdbc connection in the worker, if it does not exist
    val driver = JDBCDriver.getSingleton()
    // loop through the iterator to get the records in the partition and use
    // the driver to push them out to the DB
  })
})

This will avoid the JDBC server being serialized as part of the closure /
DStream checkpoint.
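
For reference, JDBCDriver.getSingleton() above is not an existing Spark API; a
bare-bones version of such a helper (the JDBC URL and credentials are placeholders)
could look like:

object JDBCDriver {
  // created lazily, once per executor JVM; it never gets serialized because it
  // lives in this object on the executor, not inside the closure
  private lazy val connection: java.sql.Connection = {
    Class.forName("com.mysql.jdbc.Driver")
    java.sql.DriverManager.getConnection("jdbc:mysql://dbhost:3306/mydb", "user", "password")
  }

  def getSingleton(): java.sql.Connection = connection
}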

TD


On Thu, Jul 17, 2014 at 1:42 PM, Marcelo Vanzin van...@cloudera.com wrote:

 Could you share some code (or pseudo-code)?

 Sounds like you're instantiating the JDBC connection in the driver,
 and using it inside a closure that would be run in a remote executor.
 That means that the connection object would need to be serializable.
 If that sounds like what you're doing, it won't work.


 On Thu, Jul 17, 2014 at 1:37 PM, Yan Fang yanfang...@gmail.com wrote:
  Hi guys,
 
  need some help in this problem. In our use case, we need to continuously
  insert values into the database. So our approach is to create the jdbc
  object in the main method and then do the inserting operation in the
 DStream
  foreachRDD operation. Is this approach reasonable?
 
  Then the problem comes: since we are using com.mysql.jdbc.java, which is
  unserializable, we keep seeing the notSerializableException. I think
 that is
  because Spark Streaming is trying to serialize and then checkpoint the
 whole
  class which contains the StreamingContext, not only the StreamingContext
  object, right? Or other reason to trigger the serialize operation? Any
  workaround for this? (except not using the com.mysql.jdbc.java)
 
  Thank you.
 
  Cheers,
  Fang, Yan
  yanfang...@gmail.com
  +1 (206) 849-4108



 --
 Marcelo



Re: Spark Streaming timestamps

2014-07-17 Thread Bill Jay
Hi Tathagata,

Thanks for your answer. Please see my further question below:


On Wed, Jul 16, 2014 at 6:57 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Answers inline.


 On Wed, Jul 16, 2014 at 5:39 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am currently using Spark Streaming to conduct a real-time data
 analytics. We receive data from Kafka. We want to generate output files
 that contain results that are based on the data we receive from a specific
 time interval.

 I have several questions on Spark Streaming's timestamp:

 1) If I use saveAsTextFiles, it seems Spark streaming will generate files
 in complete minutes, such as 5:00:01, 5:00:02 (converted from Unix time),
 etc. Does this mean the results are based on the data from 5:00:01 to
 5:00:02, 5:00:02 to 5:00:03, etc. Or the time stamps just mean the time the
 files are generated?

 File named  5:00:01 contains results from data received between  5:00:00
 and  5:00:01 (based on system time of the cluster).



 2) If I do not use saveAsTextFiles, how do I get the exact time interval
 of the RDD when I use foreachRDD to do custom output of the results?

 There is a version of foreachRDD which allows you to specify a function
 that takes in a Time object.
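
 For reference, that overload looks roughly like this (a sketch, with a
 hypothetical DStream and a placeholder output path):

 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.Time

 myDStream.foreachRDD((rdd: RDD[String], time: Time) => {
   // time identifies the batch this RDD was generated for;
   // time.milliseconds is a Unix timestamp in milliseconds
   rdd.saveAsTextFile("/output/batch-" + time.milliseconds)
 })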


 3) How can we specify the starting time of the batches?


 What do you mean? Batches are timed based on the system time of the
 cluster.

I would like to control the starting time and ending time of each batch.
For example, if I use saveAsTextFiles as output method and the batch size
is 1 minute, Spark will align time intervals to complete minutes, such as
5:01:00, 5:02:00, 5:03:00. It will not have results that are 5:01:03,
5:02:03, 5:03:03, etc. My goal is to generate output for a customized
interval such as from 5:01:30 to 5:02:29, 5:02:30 to 5:03:29, etc.

I checked the API of foreachRDD with the time parameter. It seems there is no
explanation of what that parameter means. Does it mean the starting
time of the first batch?




 Thanks!

 Bill





how to pass extra Java opts to workers for spark streaming jobs

2014-07-17 Thread Chen Song
I am using spark 0.9.0 and I am able to submit job to YARN,
https://spark.apache.org/docs/0.9.0/running-on-yarn.html.

I am trying to turn on gc logging on executors but could not find a way to
set extra Java opts for workers.

I tried to set spark.executor.extraJavaOptions but that did not work.

Any idea on how I should do this?

-- 
Chen Song


Re: spark streaming rate limiting from kafka

2014-07-17 Thread Chen Song
Thanks Luis and Tobias.


On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com wrote:

 * Is there a way to control how far Kafka Dstream can read on
 topic-partition (via offset for example). By setting this to a small
 number, it will force DStream to read less data initially.


 Please see the post at

 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E
 Kafka's auto.offset.reset parameter may be what you are looking for.

 Tobias




-- 
Chen Song


Re: Apache kafka + spark + Parquet

2014-07-17 Thread Tathagata Das
val kafkaStream = KafkaUtils.createStream(... ) // see the example in my
previous post

val transformedStream = kafkaStream.map ...   // whatever transformation
you want to do

transformedStream.foreachRDD((rdd: RDD[...], time: Time) => {
 // save the rdd to parquet file, using time as the file name, see
other link i sent in how to do it
 // every batch of data will create a new parquet file
})


Maybe michael (cc'ed) will be able to give more insights about the parquet
stuff.

TD




On Thu, Jul 17, 2014 at 3:59 AM, Mahebub Sayyed mahebub...@gmail.com
wrote:

 Hi,

 To migrate data from *HBase* to *Parquet* we used the following query through
 *Impala*:

 INSERT INTO table PARQUET_HASHTAGS(

 key, city_name, country_name, hashtag_date, hashtag_text, hashtag_source, 
 hashtag_month, posted_time, hashtag_time,
 tweet_id, user_id, user_name,
 hashtag_year
 ) *partition(year, month, day)* SELECT key, city_name, country_name,
 hashtag_date, hashtag_text, hashtag_source, hashtag_month, posted_time,
 hashtag_time, tweet_id, user_id, user_name,hashtag_year, cast(hashtag_year
 as int),cast(hashtag_month as int), cast(hashtag_date as int) from HASHTAGS
 where hashtag_year='2014' and hashtag_month='04' and hashtag_date='01'
 ORDER BY key,hashtag_year,hashtag_month,hashtag_date LIMIT 1000 offset
 0;

 Using the above query we have successfully migrated from HBase to Parquet
 files with proper partitions.

 Now we are storing data directly from *Kafka* to *Parquet*.

 *How is it possible to create partitions while storing data directly from
 Kafka to Parquet files?*
 *(like the partitions created in the above query)*


 On Thu, Jul 17, 2014 at 12:35 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 1. You can put in multiple kafka topics in the same Kafka input stream.
 See the example KafkaWordCount
 https://github.com/apache/spark/blob/68f28dabe9c7679be82e684385be216319beb610/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
  .
 However they will all be read through a single receiver (though multiple
 threads, one per topic). To parallelize the read (for increasing
 throughput), you can create multiple Kafka input streams, and split the
 topics appropriately between them.

 2. You can easily read and write to parquet files in Spark. Any RDD
 (generated through DStreams in Spark Streaming, or otherwise), can be
 converted to a SchemaRDD and then saved in the parquet format as
 rdd.saveAsParquetFile. See the Spark SQL guide
 http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files
  for
 more details. So if you want to write the same dataset (as RDDs) to two
 different parquet files, you just have to call saveAsParquetFile twice (on
 same or transformed versions of the RDD), as shown in the guide.
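
 A rough sketch of that flow for this thread, picking up the transformedStream
 from the earlier snippet (Spark 1.0-era SQL API; the Record case class, the
 field mapping and the output path are placeholders):

 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.Time

 case class Record(key: String, text: String)

 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 import sqlContext.createSchemaRDD  // implicitly converts RDD[Record] to a SchemaRDD

 transformedStream.foreachRDD((rdd: RDD[(String, String)], time: Time) => {
   val records = rdd.map { case (k, v) => Record(k, v) }
   // one parquet output (directory) per batch, named by the batch time
   records.saveAsParquetFile("/data/parquet/batch-" + time.milliseconds)
 })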

 Hope this helps!

 TD


 On Thu, Jul 17, 2014 at 2:19 AM, Mahebub Sayyed mahebub...@gmail.com
 wrote:

 Hi All,

 Currently we are reading (multiple) topics from Apache Kafka and storing
 that in HBase (multiple tables) using Twitter Storm (1 tuple is stored in 4
 different tables).
  But we are facing some performance issues with HBase,
 so we are replacing *HBase* with *Parquet* files and *Storm* with *Apache
 Spark*.

 Difficulties:
  1. How to read multiple topics from Kafka using Spark?
 2. One tuple belongs to multiple tables; how to write one topic to
 multiple Parquet files with proper partitioning using Spark?

 Please help me
 Thanks in advance.

 --
 *Regards,*

 *Mahebub *





 --
 *Regards,*
 *Mahebub Sayyed*



Re: can't get jobs to run on cluster (enough memory and cpus are available on worker)

2014-07-17 Thread Marcelo Vanzin
Hi Matt,

I'm not very familiar with setup on ec2; the closest I can point you
at is to look at the launch_cluster in ec2/spark_ec2.py, where the
ports seem to be configured.


On Thu, Jul 17, 2014 at 1:29 PM, Matt Work Coarr
mattcoarr.w...@gmail.com wrote:
 Thanks Marcelo!  This is a huge help!!

 Looking at the executor logs (in a vanilla spark install, I'm finding them
 in $SPARK_HOME/work/*)...

 It launches the executor, but it looks like the CoarseGrainedExecutorBackend
 is having trouble talking to the driver (exactly what you said!!!).

 Do you know the range of random ports that is used for the
 executor-to-driver communication?  Is that range adjustable?  Any config setting or
 environment variable?

 I manually setup my ec2 security group to include all the ports that the
 spark ec2 script ($SPARK_HOME/ec2/spark_ec2.py) sets up in its security
 groups.  They included (for those listed above 1):
 1
 50060
 50070
 50075
 60060
 60070
 60075

 Obviously I'll need to make some adjustments to my EC2 security group!  Just
 need to figure out exactly what should be in there.  To keep things simple,
 I just have one security group for the master, slaves, and the driver
 machine.

 In listing the port ranges in my current security group I looked at the
 ports that spark_ec2.py sets up as well as the ports listed in the spark
 standalone mode documentation page under configuring ports for network
 security:

 http://spark.apache.org/docs/latest/spark-standalone.html


 Here are the relevant fragments from the executor log:

 Spark Executor Command: /cask/jdk/bin/java -cp
 ::/cask/spark/conf:/cask/spark/lib/spark-assembly-1.0.0-hadoop2.2.0.jar:/cask/spark/lib/datanucleus-api-jdo-3.

 2.1.jar:/cask/spark/lib/datanucleus-rdbms-3.2.1.jar:/cask/spark/lib/datanucleus-core-3.2.2.jar
 -XX:MaxPermSize=128m -Dspark.akka.frameSize=100 -Dspark.akka.

 frameSize=100 -Xms512M -Xmx512M
 org.apache.spark.executor.CoarseGrainedExecutorBackend
 akka.tcp://spark@ip-10-202-11-191.ec2.internal:46787/user/CoarseGra

 inedScheduler 0 ip-10-202-8-45.ec2.internal 8
 akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker
 app-20140717195146-

 

 ...

 14/07/17 19:51:47 DEBUG NativeCodeLoader: Trying to load the custom-built
 native-hadoop library...

 14/07/17 19:51:47 DEBUG NativeCodeLoader: Failed to load native-hadoop with
 error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path

 14/07/17 19:51:47 DEBUG NativeCodeLoader:
 java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib

 14/07/17 19:51:47 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable

 14/07/17 19:51:47 DEBUG JniBasedUnixGroupsMappingWithFallback: Falling back
 to shell based

 14/07/17 19:51:47 DEBUG JniBasedUnixGroupsMappingWithFallback: Group mapping
 impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping

 14/07/17 19:51:48 DEBUG Groups: Group mapping
 impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback;
 cacheTimeout=30

 14/07/17 19:51:48 DEBUG SparkHadoopUtil: running as user: ec2-user

 ...


 14/07/17 19:51:48 INFO CoarseGrainedExecutorBackend: Connecting to driver:
 akka.tcp://spark@ip-10-202-11-191.ec2.internal:46787/user/CoarseGrainedScheduler

 14/07/17 19:51:48 INFO WorkerWatcher: Connecting to worker
 akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker

 14/07/17 19:51:49 INFO WorkerWatcher: Successfully connected to
 akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker

 14/07/17 19:53:29 ERROR CoarseGrainedExecutorBackend: Driver Disassociated
 [akka.tcp://sparkExecutor@ip-10-202-8-45.ec2.internal:55670] -
 [akka.tcp://spark@ip-10-202-11-191.ec2.internal:46787] disassociated!
 Shutting down.


 Thanks a bunch!
 Matt


 On Thu, Jul 17, 2014 at 1:21 PM, Marcelo Vanzin van...@cloudera.com wrote:

 When I meant the executor log, I meant the log of the process launched
 by the worker, not the worker. In my CDH-based Spark install, those
 end up in /var/run/spark/work.

 If you look at your worker log, you'll see it's launching the executor
 process. So there should be something there.

 Since you say it works when both are run in the same node, that
 probably points to some communication issue, since the executor needs
 to connect back to the driver. Check to see if you don't have any
 firewalls blocking the ports Spark tries to use. (That's one of the
 non-resource-related cases that will cause that message.)



-- 
Marcelo


Re: unserializable object in Spark Streaming context

2014-07-17 Thread Yan Fang
Hi Marcelo and TD,

Thank you for the help. If I use TD's approach, it works and there is no
exception. The only drawback is that it will create many connections to the DB,
which I was trying to avoid.

Here is a snapshot of my code (the important code is marked in red). What I
was thinking is that, if I call the collect() method, Spark Streaming will
bring the data to the driver and then the db object does not need to be
sent to executors. My observation is that, though exceptions are thrown,
the insert function still works. Any thought about that? I also pasted the log
in case it helps: http://pastebin.com/T1bYvLWB

== code ==

   SparkConf sparkConf = new SparkConf().setAppName("balababala");
   JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
new Duration(2000));

final MySQLHelper db = new MySQLHelper();  // this class instantiates the jdbc driver.

   /*
    * a few DStream transformations
    */

JavaPairDStream<String, MachineState> noiseState = machineIdNoise
.updateStateByKey(getUpdateFunction());

JavaPairDStream<String, Tuple2<MachineState, Integer>>
noiseStateTemperature = noiseState.join(machineIdTemperature);

noiseStateTemperature
.foreachRDD(new Function<JavaPairRDD<String, Tuple2<MachineState,
Integer>>, Void>() {
@Override
public Void call(JavaPairRDD<String, Tuple2<MachineState,
Integer>> arg0)
throws Exception {
List<Tuple2<String, Tuple2<MachineState, Integer>>> list =
arg0
.collect();
for (Tuple2<String, Tuple2<MachineState, Integer>> tuple :
list) {
String machineId
String machineState
db.insertAverages(machineId, machineState);
}
return null;
}
});

 end code ===

Thank you. If there is no other workaround, I may use TD's approach because
it is the only option.

Best,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


On Thu, Jul 17, 2014 at 1:54 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 And if Marcelo's guess is correct, then the right way to do this would be
 to lazily  / dynamically create the jdbc connection server as a singleton
 in the workers/executors and use that. Something like this.


 dstream.foreachRDD(rdd => {
rdd.foreachPartition((iterator: Iterator[...]) => {
val driver = JDBCDriver.getSingleton()   // this will create the
 // single jdbc server in the worker, if it does not exist
// loop through iterator to get the records in the partition and
 // use the driver to push them out to the DB
})
 })

 This will avoid the JDBC server being serialized as part of the closure /
 DStream checkpoint.

 TD


 On Thu, Jul 17, 2014 at 1:42 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 Could you share some code (or pseudo-code)?

 Sounds like you're instantiating the JDBC connection in the driver,
 and using it inside a closure that would be run in a remote executor.
 That means that the connection object would need to be serializable.
 If that sounds like what you're doing, it won't work.


 On Thu, Jul 17, 2014 at 1:37 PM, Yan Fang yanfang...@gmail.com wrote:
  Hi guys,
 
  need some help in this problem. In our use case, we need to continuously
  insert values into the database. So our approach is to create the jdbc
  object in the main method and then do the inserting operation in the
 DStream
  foreachRDD operation. Is this approach reasonable?
 
  Then the problem comes: since we are using com.mysql.jdbc.java, which is
  unserializable, we keep seeing the notSerializableException. I think
 that is
  because Spark Streaming is trying to serialize and then checkpoint the
 whole
  class which contains the StreamingContext, not only the StreamingContext
  object, right? Or other reason to trigger the serialize operation? Any
  workaround for this? (except not using the com.mysql.jdbc.java)
 
  Thank you.
 
  Cheers,
  Fang, Yan
  yanfang...@gmail.com
  +1 (206) 849-4108



 --
 Marcelo





Re: Apache kafka + spark + Parquet

2014-07-17 Thread Michael Armbrust
We don't have support for partitioned parquet yet.  There is a JIRA here:
https://issues.apache.org/jira/browse/SPARK-2406


On Thu, Jul 17, 2014 at 5:00 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 val kafkaStream = KafkaUtils.createStream(... ) // see the example in my
 previous post

 val transformedStream = kafkaStream.map ...   // whatever transformation
 you want to do

 transformedStream.foreachRDD((rdd: RDD[...], time: Time) => {
  // save the rdd to parquet file, using time as the file name, see
 other link i sent in how to do it
  // every batch of data will create a new parquet file
 })


 Maybe michael (cc'ed) will be able to give more insights about the parquet
 stuff.

 TD




 On Thu, Jul 17, 2014 at 3:59 AM, Mahebub Sayyed mahebub...@gmail.com
 wrote:

 Hi,

 To migrate data from *HBase *to *Parquet* we used following query
 through * Impala*:

 INSERT INTO table PARQUET_HASHTAGS(

 key, city_name, country_name, hashtag_date, hashtag_text, hashtag_source, 
 hashtag_month, posted_time, hashtag_time,
 tweet_id, user_id, user_name,
 hashtag_year
 ) *partition(year, month, day)* SELECT key, city_name, country_name,
 hashtag_date, hashtag_text, hashtag_source, hashtag_month, posted_time,
 hashtag_time, tweet_id, user_id, user_name,hashtag_year, cast(hashtag_year
 as int),cast(hashtag_month as int), cast(hashtag_date as int) from HASHTAGS
 where hashtag_year='2014' and hashtag_month='04' and hashtag_date='01'
 ORDER BY key,hashtag_year,hashtag_month,hashtag_date LIMIT 1000 offset
 0;

 using above query we have successfully migrated form HBase to Parquet
 files with proper partitions.

 Now we are storing Data direct from *Kafka *to *Parquet.*

 *How is it possible to create partitions while storing data direct from
 kafka to Parquet files??*
 *(likewise created in above query)*


 On Thu, Jul 17, 2014 at 12:35 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 1. You can put in multiple kafka topics in the same Kafka input stream.
 See the example KafkaWordCount
 https://github.com/apache/spark/blob/68f28dabe9c7679be82e684385be216319beb610/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
  .
 However they will all be read through a single receiver (though multiple
 threads, one per topic). To parallelize the read (for increasing
 throughput), you can create multiple Kafka input streams, and splits the
 topics appropriately between them.

 2. You can easily read and write to parquet files in Spark. Any RDD
 (generated through DStreams in Spark Streaming, or otherwise), can be
 converted to a SchemaRDD and then saved in the parquet format as
 rdd.saveAsParquetFile. See the Spark SQL guide
 http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files
  for
 more details. So if you want to write a same dataset (as RDDs) to two
 different parquet files, you just have to call saveAsParquetFile twice (on
 same or transformed versions of the RDD), as shown in the guide.

 Hope this helps!

 TD


 On Thu, Jul 17, 2014 at 2:19 AM, Mahebub Sayyed mahebub...@gmail.com
 wrote:

 Hi All,

 Currently we are reading (multiple) topics from Apache kafka and
 storing that in HBase (multiple tables) using twitter storm (1 tuple stores
 in 4 different tables).
  but we are facing some performance issue with HBase.
 so we are replacing* HBase* with *Parquet* file and *storm* with *Apache
 Spark*.

 difficulties:
  1. How to read multiple topics from kafka using spark?
 2. One tuple belongs to multiple tables, How to write one topic to
 multiple parquet files with proper partitioning using spark??

 Please help me
 Thanks in advance.

 --
 *Regards,*

 *Mahebub *





 --
 *Regards,*
 *Mahebub Sayyed*





Re: Retrieve dataset of Big Data Benchmark

2014-07-17 Thread Tom
Hi Burak,

I tried running it through the Spark shell, but I still ended with the same
error message as in Hadoop:
java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key
must be specified as the username or password (respectively) of a s3n URL,
or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey
properties (respectively).
I guess the files are publicly available, but only to registered AWS users,
so I caved in and registered for the service. Using the credentials that I
got I was able to download the files using the local spark shell. 
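
For anyone else hitting this, a minimal sketch of supplying the two properties
named in the error from the Spark shell (placeholder credentials and bucket path):

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY_ID")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")
val rankings = sc.textFile("s3n://some-bucket/path/to/rankings")
rankings.take(5).foreach(println)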

Thanks!

Tom



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Retrieve-dataset-of-Big-Data-Benchmark-tp9821p10096.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Error with spark-submit

2014-07-17 Thread ranjanp
Hi,
I am new to Spark and trying out with a stand-alone, 3-node (1 master, 2
workers) cluster.

From the Web UI at the master, I see that the workers are registered. But
when I try running the SparkPi example from the master node, I get the
following message and then an exception.

14/07/17 01:20:36 INFO AppClient$ClientActor: Connecting to master
spark://10.1.3.7:7077...
14/07/17 01:20:46 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory

I searched a bit for the above warning, and found that others have
encountered this problem before, but did not see a clear resolution except
for this link:
http://apache-spark-user-list.1001560.n3.nabble.com/TaskSchedulerImpl-Initial-job-has-not-accepted-any-resources-check-your-cluster-UI-to-ensure-that-woy-tt8247.html#a8444

Based on the suggestion there I tried supplying the --executor-memory option to
spark-submit but that did not help.

Any suggestions? Here are the details of my set up.
- 3 nodes (each with 4 CPU cores and 7 GB memory)
- 1 node configured as Master, and the other two configured as workers
- Firewall is disabled on all nodes, and network communication between the
nodes is not a problem
- Edited the conf/spark-env.sh on all nodes to set the following:
    SPARK_WORKER_CORES=3
    SPARK_WORKER_MEMORY=5G
- The Web UI as well as logs on master show that Workers were able to
register correctly. Also the Web UI correctly shows the aggregate available
memory and CPU cores on the workers:

URL: spark://vmsparkwin1:7077
Workers: 2
Cores: 6 Total, 0 Used
Memory: 10.0 GB Total, 0.0 B Used
Applications: 0 Running, 0 Completed
Drivers: 0 Running, 0 Completed
Status: ALIVE

I try running the SparkPi example first using the run-example (which was
failing) and later directly using the spark-submit as shown below:

azureuser@vmsparkwin1 /cygdrive/c/opt/spark-1.0.0
$ export MASTER=spark://vmsparkwin1:7077

$ echo $MASTER
spark://vmsparkwin1:7077

azureuser@vmsparkwin1 /cygdrive/c/opt/spark-1.0.0
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
spark://10.1.3.7:7077 --executor-memory 1G --total-executor-cores 2
./lib/spark-examples-1.0.0-hadoop2.2.0.jar 10

The following is the full screen output:

14/07/17 01:20:13 INFO SecurityManager: Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
14/07/17 01:20:13 INFO SecurityManager: Changing view acls to: azureuser
14/07/17 01:20:13 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(azureuser)
14/07/17 01:20:14 INFO Slf4jLogger: Slf4jLogger started
14/07/17 01:20:14 INFO Remoting: Starting remoting
14/07/17 01:20:14 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sp...@vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49839]
14/07/17 01:20:14 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sp...@vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49839]
14/07/17 01:20:14 INFO SparkEnv: Registering MapOutputTracker
14/07/17 01:20:14 INFO SparkEnv: Registering BlockManagerMaster
14/07/17 01:20:14 INFO DiskBlockManager: Created local directory at
C:\cygwin\tmp\spark-local-20140717012014-b606
14/07/17 01:20:14 INFO MemoryStore: MemoryStore started with capacity 294.9 MB.
14/07/17 01:20:14 INFO ConnectionManager: Bound socket to port 49842 with id =
ConnectionManagerId(vmsparkwin1.cssparkwin.b1.internal.cloudapp.net,49842)
14/07/17 01:20:14 INFO BlockManagerMaster: Trying to register BlockManager
14/07/17 01:20:14 INFO BlockManagerInfo: Registering block manager
vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49842 with 294.9 MB RAM
14/07/17 01:20:14 INFO BlockManagerMaster: Registered BlockManager
14/07/17 01:20:14 INFO HttpServer: Starting HTTP Server
14/07/17 01:20:14 INFO HttpBroadcast: Broadcast server started at
http://10.1.3.7:49843
14/07/17 01:20:14 INFO HttpFileServer: HTTP File server directory is
C:\cygwin\tmp\spark-6a076e92-53bb-4c7a-9e27-ce53a818146d
14/07/17 01:20:14 INFO HttpServer: Starting HTTP Server
14/07/17 01:20:15 INFO SparkUI: Started SparkUI at
http://vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:4040
14/07/17 01:20:15 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
14/07/17 01:20:16 INFO SparkContext: Added JAR
file:/C:/opt/spark-1.0.0/./lib/spark-examples-1.0.0-hadoop2.2.0.jar at
http://10.1.3.7:49844/jars/spark-examples-1.0.0-hadoop2.2.0.jar with
timestamp 1405560016316
14/07/17 01:20:16 INFO AppClient$ClientActor: Connecting to master
spark://10.1.3.7:7077...
14/07/17 01:20:16 INFO SparkContext: Starting job: reduce at SparkPi.scala:35
14/07/17 01:20:16 INFO DAGScheduler: Got job 0 (reduce at SparkPi.scala:35)
with 10 output partitions (allowLocal=false)
14/07/17 01:20:16 INFO DAGScheduler: Final stage: Stage 0(reduce at
SparkPi.scala:35)
14/07/17 01:20:16 INFO DAGScheduler: Parents of final stage: List()
14/07/17 01:20:16 INFO

Large scale ranked recommendation

2014-07-17 Thread m3.sharma
Hi,

I am trying to develop a recommender system for about 1 million users and 10
thousand items. Currently it's a simple regression-based model where for
every (user, item) pair in the dataset we generate some features and learn a model
from them. Up to training and evaluation everything is fine; the bottleneck is
prediction and ranking for deployment, as at the end of the day we need to
recommend each user the top 10 personalized items. To do this, for every user I
need to use the model to predict his rating/preference on all items and take the top
10 items from the list. Hence after learning the model I need to do 10K x
1 million predictions (model.predict(featureVector)).

Currently I have the following process; feature vectors are sparse and of
length ~300 each.
*1. userFeatures:RDD[(Int, Vector)] , itemFeatures:RDD[(Int, Vector)]*
I do a cartesian product of the above to generate every (user, item) combination
and the corresponding feature vector:
*2. val allUIFeat:RDD[(Int, Int, Vector)] =
userFeatures.cartesian(itemFeatures).map(...)*
Then I use the model to do the prediction as follows:
*3. val allUIPred:RDD[(Int, Int, Double)] = allUIFeat.map{x => (x._1, x._2,
model.predict(x._3))}*
*4. Then we do a group-by on user and sort to get the top 10 items.*
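
Pulled together, the pipeline is essentially the following sketch (assuming, e.g.,
an mllib LinearRegressionModel; combineFeatures is a placeholder for however the
per-pair features are built):

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LinearRegressionModel
import org.apache.spark.rdd.RDD

def scoreAllPairs(model: LinearRegressionModel,
                  userFeatures: RDD[(Int, Vector)],
                  itemFeatures: RDD[(Int, Vector)],
                  combineFeatures: (Vector, Vector) => Vector): RDD[(Int, Int, Double)] = {
  // step 2: every (user, item) combination with its feature vector (10K x 1M pairs)
  val allUIFeat = userFeatures.cartesian(itemFeatures).map {
    case ((user, uf), (item, itf)) => (user, item, combineFeatures(uf, itf))
  }
  // step 3: one local model.predict call per pair
  allUIFeat.map { case (user, item, f) => (user, item, model.predict(f)) }
}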

We are not able to complete step 3 above; it's taking a really long time
(~5 hrs) to get all the predictions, which is really long considering we
already have the model and it just needs to do some computation for each
prediction. I have tried partitioning userFeatures across 800 partitions
before doing the above steps, but it was of no help.

I am using about 100 executors with 2 cores and 2 GB RAM each.

Are there any suggestions to make these predictions fast?






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Release date for new pyspark

2014-07-17 Thread Paul Wais
Thanks all!  (And thanks Matei for the developer link!)  I was able to
build using maven[1] but `./sbt/sbt assembly` results in build errors.
(Not familiar enough with the build to know why; in the past sbt
worked for me and maven did not).

I was able to run the master version of pyspark, which was what I
wanted, though I discovered a bug when trying to read spark-pickled
data from HDFS.  (Looks similar to
https://spark-project.atlassian.net/browse/SPARK-1034 from my naive
point of view).  For the curious:

Code:

from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.set('spark.local.dir', '/nail/tmp')
conf.set('spark.executor.memory', '28g')
conf.set('spark.app.name', 'test')

sc = SparkContext(conf=conf)

sc.parallelize(range(10)).saveAsPickleFile('hdfs://host:9000/test_pickle')
unpickled_rdd = sc.pickleFile('hdfs://host:9000/test_pickle')
print unpickled_rdd.takeSample(False, 3)

Traceback (most recent call last):
  File /path/to/my/home/spark-master/tast.py, line 33, in module
print unpickled_rdd.takeSample(False, 3)
  File /path/to/my/home/spark-master/python/pyspark/rdd.py, line
391, in takeSample
initialCount = self.count()
  File /path/to/my/home/spark-master/python/pyspark/rdd.py, line 791, in count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File /path/to/my/home/spark-master/python/pyspark/rdd.py, line 782, in sum
return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
  File /path/to/my/home/spark-master/python/pyspark/rdd.py, line
703, in reduce
vals = self.mapPartitions(func).collect()
  File /path/to/my/home/spark-master/python/pyspark/rdd.py, line
667, in collect
bytesInJava = self._jrdd.collect().iterator()
  File /path/to/my/home/spark-master/python/pyspark/rdd.py, line
1600, in _jrdd
class_tag)
  File 
/path/to/my/home/spark-master/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
line 669, in __call__
  File 
/path/to/my/home/spark-master/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py,
line 304, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling
None.org.apache.spark.api.python.PythonRDD. Trace:
py4j.Py4JException: Constructor
org.apache.spark.api.python.PythonRDD([class
org.apache.spark.rdd.FlatMappedRDD, class [B, class java.util.HashMap,
class java.util.ArrayList, class java.lang.Boolean, class
java.lang.String, class java.util.ArrayList, class
org.apache.spark.Accumulator, class
scala.reflect.ManifestFactory$$anon$2]) does not exist
at 
py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:184)
at 
py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:202)
at py4j.Gateway.invoke(Gateway.java:213)
at 
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:662)


[1] mvn -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package

On Wed, Jul 16, 2014 at 8:39 PM, Michael Armbrust
mich...@databricks.com wrote:
 You should try cleaning and then building.  We have recently hit a bug in
 the scala compiler that sometimes causes non-clean builds to fail.


 On Wed, Jul 16, 2014 at 7:56 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 Yeah, we try to have a regular 3 month release cycle; see
 https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage for the
 current window.

 Matei

 On Jul 16, 2014, at 4:21 PM, Mark Hamstra m...@clearstorydata.com wrote:

 You should expect master to compile and run: patches aren't merged unless
 they build and pass tests on Jenkins.

 You shouldn't expect new features to be added to stable code in
 maintenance releases (e.g. 1.0.1).

 AFAIK, we're still on track with Spark 1.1.0 development, which means that
 it should be released sometime in the second half of next month (or shortly
 thereafter).


 On Wed, Jul 16, 2014 at 4:03 PM, Paul Wais pw...@yelp.com wrote:

 Dear List,

 The version of pyspark on master has a lot of nice new features, e.g.
 SequenceFile reading, pickle i/o, etc:
 https://github.com/apache/spark/blob/master/python/pyspark/context.py#L353

 I downloaded the recent 1.0.1 release and was surprised to see the
 distribution did not include these changes in master.  (I've tried pulling
 master [ 9c249743ea ] and compiling from source, but I get a build failure
 in TestSQLContext.scala FWIW).

 Is an updated pyspark scheduled for the next release?  (Also, am I wrong
 in expecting HEAD on master should probably compile and run?)

 Best Regards,
 -Paul Wais






Error with spark-submit (formatting corrected)

2014-07-17 Thread ranjanp
Hi, 
I am new to Spark and trying out with a stand-alone, 3-node (1 master, 2
workers) cluster. 

From the Web UI at the master, I see that the workers are registered. But
when I try running the SparkPi example from the master node, I get the
following message and then an exception. 

14/07/17 01:20:36 INFO AppClient$ClientActor: Connecting to master
spark://10.1.3.7:7077... 
14/07/17 01:20:46 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory 

I searched a bit for the above warning, and found that others have
encountered this problem before, but did not see a clear resolution except
for this link:
http://apache-spark-user-list.1001560.n3.nabble.com/TaskSchedulerImpl-Initial-job-has-not-accepted-any-resources-check-your-cluster-UI-to-ensure-that-woy-tt8247.html#a8444

Based on the suggestion there I tried supplying --executor-memory option to
spark-submit but that did not help. 

Any suggestions. Here are the details of my set up. 
- 3 nodes (each with 4 CPU cores and 7 GB memory) 
- 1 node configured as Master, and the other two configured as workers 
- Firewall is disabled on all nodes, and network communication between the
nodes is not a problem 
- Edited the conf/spark-env.sh on all nodes to set the following: 
  SPARK_WORKER_CORES=3 
  SPARK_WORKER_MEMORY=5G 
- The Web UI as well as logs on master show that Workers were able to
register correctly. Also the Web UI correctly shows the aggregate available
memory and CPU cores on the workers: 

URL: spark://vmsparkwin1:7077
Workers: 2
Cores: 6 Total, 0 Used
Memory: 10.0 GB Total, 0.0 B Used
Applications: 0 Running, 0 Completed
Drivers: 0 Running, 0 Completed
Status: ALIVE

I try running the SparkPi example first using the run-example (which was
failing) and later directly using the spark-submit as shown below: 

$ export MASTER=spark://vmsparkwin1:7077

$ echo $MASTER
spark://vmsparkwin1:7077

azureuser@vmsparkwin1 /cygdrive/c/opt/spark-1.0.0
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
spark://10.1.3.7:7077 --executor-memory 1G --total-executor-cores 2
./lib/spark-examples-1.0.0-hadoop2.2.0.jar 10


The following is the full screen output:

14/07/17 01:20:13 INFO SecurityManager: Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
14/07/17 01:20:13 INFO SecurityManager: Changing view acls to: azureuser
14/07/17 01:20:13 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(azureuser)
14/07/17 01:20:14 INFO Slf4jLogger: Slf4jLogger started
14/07/17 01:20:14 INFO Remoting: Starting remoting
14/07/17 01:20:14 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sp...@vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49839]
14/07/17 01:20:14 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sp...@vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49839]
14/07/17 01:20:14 INFO SparkEnv: Registering MapOutputTracker
14/07/17 01:20:14 INFO SparkEnv: Registering BlockManagerMaster
14/07/17 01:20:14 INFO DiskBlockManager: Created local directory at
C:\cygwin\tmp\spark-local-20140717012014-b606
14/07/17 01:20:14 INFO MemoryStore: MemoryStore started with capacity 294.9
MB.
14/07/17 01:20:14 INFO ConnectionManager: Bound socket to port 49842 with id
= ConnectionManagerId(vmsparkwin1.cssparkwin.b1.internal.cloudapp.net,49842)
14/07/17 01:20:14 INFO BlockManagerMaster: Trying to register BlockManager
14/07/17 01:20:14 INFO BlockManagerInfo: Registering block manager
vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49842 with 294.9 MB RAM
14/07/17 01:20:14 INFO BlockManagerMaster: Registered BlockManager
14/07/17 01:20:14 INFO HttpServer: Starting HTTP Server
14/07/17 01:20:14 INFO HttpBroadcast: Broadcast server started at
http://10.1.3.7:49843
14/07/17 01:20:14 INFO HttpFileServer: HTTP File server directory is
C:\cygwin\tmp\spark-6a076e92-53bb-4c7a-9e27-ce53a818146d
14/07/17 01:20:14 INFO HttpServer: Starting HTTP Server
14/07/17 01:20:15 INFO SparkUI: Started SparkUI at
http://vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:4040
14/07/17 01:20:15 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
14/07/17 01:20:16 INFO SparkContext: Added JAR
file:/C:/opt/spark-1.0.0/./lib/spark-examples-1.0.0-hadoop2.2.0.jar at
http://10.1.3.7:49844/jars/spark-examples-1.0.0-hadoop2.2.0.jar with
timestamp 1405560016316
14/07/17 01:20:16 INFO AppClient$ClientActor: Connecting to master
spark://10.1.3.7:7077...
14/07/17 01:20:16 INFO SparkContext: Starting job: reduce at
SparkPi.scala:35
14/07/17 01:20:16 INFO DAGScheduler: Got job 0 (reduce at SparkPi.scala:35)
with 10 output partitions (allowLocal=false)
14/07/17 01:20:16 INFO DAGScheduler: Final stage: Stage 0(reduce at
SparkPi.scala:35)
14/07/17 01:20:16 INFO DAGScheduler: Parents of final stage: List()

Re: Large scale ranked recommendation

2014-07-17 Thread m3.sharma
We are using RegressionModels that comes with *mllib* package in SPARK.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098p10103.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Large scale ranked recommendation

2014-07-17 Thread Shuo Xiang
Hi,
  Are you suggesting that taking simple vector dot products or sigmoid
function on 10K * 1M data takes 5hrs?


On Thu, Jul 17, 2014 at 3:59 PM, m3.sharma sharm...@umn.edu wrote:

 We are using RegressionModels that comes with *mllib* package in SPARK.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098p10103.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: spark streaming rate limiting from kafka

2014-07-17 Thread Bill Jay
I also have an issue consuming from Kafka. When I consume from Kafka, there
is always a single executor working on this job. Even if I use repartition,
it seems that there is still a single executor. Does anyone have an idea how
to add parallelism to this job?



On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com wrote:

 Thanks Luis and Tobias.


 On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com wrote:

 * Is there a way to control how far Kafka Dstream can read on
 topic-partition (via offset for example). By setting this to a small
 number, it will force DStream to read less data initially.


 Please see the post at

 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E
 Kafka's auto.offset.reset parameter may be what you are looking for.

 Tobias




 --
 Chen Song




Re: Large scale ranked recommendation

2014-07-17 Thread m3.sharma
Yes, that's what prediction should be doing: taking dot products or applying a
sigmoid function for each (user, item) pair. For 1 million users and 10K items
there are 10 billion pairs.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098p10107.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark Streaming

2014-07-17 Thread Guangle Fan
Hi, All

When I run spark streaming, in one of the flatMap stage, I want to access
database.

Code looks like :

stream.flatMap(
new FlatMapFunction {
call () {
//access database cluster
}
  }
)

Since I don't want to create a database connection every time call() is
called, where is the best place to create the connection and reuse it on a
per-host basis (like one database connection per Mapper/Reducer)?
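
I.e., something along these lines (sketch only; ConnectionPool here is a
hypothetical lazily initialized, per-JVM singleton, and queryDatabase is a
placeholder for the actual lookup):

stream.mapPartitions { records =>
  val conn = ConnectionPool.getOrCreate()  // reused for every record in this partition
  records.flatMap { record =>
    queryDatabase(conn, record)
  }
}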

Regards,

Guangle


Re: unserializable object in Spark Streaming context

2014-07-17 Thread Yan Fang
Hi Sean,

Thank you. I see your point. What I was thinking was to do the computation in
a distributed fashion and do the storing from a single place. But you are
right, having multiple DB connections actually is fine.

Thanks for answering my questions. That helps me understand the system.

Cheers,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


On Thu, Jul 17, 2014 at 2:53 PM, Sean Owen so...@cloudera.com wrote:

 On Thu, Jul 17, 2014 at 10:39 PM, Yan Fang yanfang...@gmail.com wrote:
  Thank you for the help. If I use TD's approache, it works and there is no
  exception. Only drawback is that it will create many connections to the
 DB,
  which I was trying to avoid.

 Connection-like objects aren't data that can be serialized. What would
 it mean to share one connection with N workers? that they all connect
 back to the driver, and through one DB connection there? this defeats
 the purpose of distributed computing. You want multiple DB
 connections. You can limit the number of partitions if needed.


  Here is a snapshot of my code. Mark as red for the important code. What I
  was thinking is that, if I call the collect() method, Spark Streaming
 will
  bring the data to the driver and then the db object does not need to be
 sent

 The Function you pass to foreachRDD() has a reference to db though.
 That's what is making it be serialized.

  to executors. My observation is that, thought exceptions are thrown, the
  insert function still works. Any thought about that? Also paste the log
 in
  case it helps .http://pastebin.com/T1bYvLWB

 Any executors that run locally might skip the serialization and
 succeed (?) but I don't think the remote executors can be succeeding.



Hive From Spark

2014-07-17 Thread JiajiaJing
Hello Spark Users,

I am new to Spark SQL and now trying to first get the HiveFromSpark example
working. 
However, I got the following error when running HiveFromSpark.scala program.
May I get some help on this please?

ERROR MESSAGE:

org.apache.thrift.TApplicationException: Invalid method name: 'get_table'
at
org.apache.thrift.TApplicationException.read(TApplicationException.java:108)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:71)
at
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_table(ThriftHiveMetastore.java:936)
at
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_table(ThriftHiveMetastore.java:922)
at
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:854)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
at $Proxy9.getTable(Unknown Source)
at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950)
at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:905)
at
org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:8999)
at
org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:8313)
at
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:284)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:441)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:342)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:977)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:888)
at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:186)
at 
org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:160)
at
org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:250)
at
org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:247)
at org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:85)
at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:90)
at HiveFromSpark$.main(HiveFromSpark.scala:38)
at HiveFromSpark.main(HiveFromSpark.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)



Thank you very much!

JJing



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Hive-From-Spark-tp10110.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: replacement for SPARK_LIBRARY_PATH ?

2014-07-17 Thread Koert Kuipers
but be aware that spark-defaults.conf is only used if you use spark-submit
On Jul 17, 2014 4:29 PM, Zongheng Yang zonghen...@gmail.com wrote:

 One way is to set this in your conf/spark-defaults.conf:

 spark.executor.extraLibraryPath /path/to/native/lib

 The key is documented here:
 http://spark.apache.org/docs/latest/configuration.html

 On Thu, Jul 17, 2014 at 1:25 PM, Eric Friedman
 eric.d.fried...@gmail.com wrote:
  I used to use SPARK_LIBRARY_PATH to specify the location of native libs
  for lzo compression when using spark 0.9.0.
 
  The references to that environment variable have disappeared from the
 docs
  for
  spark 1.0.1 and it's not clear how to specify the location for lzo.
 
  Any guidance?



Re: Is there a way to get previous/other keys' state in Spark Streaming?

2014-07-17 Thread Yan Fang
Hi TD,

Thank you. Yes, it behaves as you described. Sorry for missing this point.

Then my only concern is on the performance side - since Spark Streaming
operates on all the keys every time a new batch comes, I think it is fine
when the state size is small. When the state size becomes big, say, a few
GBs, if we still go through the whole key list, would the operation be a
little inefficient then? Maybe I am missing some points in Spark Streaming
that handle this situation.

Cheers,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


On Thu, Jul 17, 2014 at 1:47 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 The updateFunction given in updateStateByKey should be called on ALL the
 keys are in the state, even if there is no new data in the batch for some
 key. Is that not the behavior you see?

 What do you mean by show all the existing states? You have access to the
 latest state RDD by doing stateStream.foreachRDD(...). There you can do
 whatever operation on all the key-state pairs.

 TD




 On Thu, Jul 17, 2014 at 11:58 AM, Yan Fang yanfang...@gmail.com wrote:

 Hi TD,

 Thank you for the quick replying and backing my approach. :)

 1) The example is this:

 1. In the first 2 second interval, after updateStateByKey, I get a few
 keys and their states, say, (a -> 1, b -> 2, c -> 3)
 2. In the following 2 second interval, I only receive c and d and
 their value. But I want to update/display the state of a and b
 accordingly.
 * It seems I have no way to access the a and b and get their
 states.
 * also, do I have a way to show all the existing states?

 I guess the approach to solve this will be similar to what you mentioned
 for 2). But the difficulty is that, if I want to display all the existing
 states, need to bundle all the rest keys to one key.

 Thank you.

 Cheers,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Thu, Jul 17, 2014 at 11:36 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 For accessing previous version, I would do it the same way. :)

 1. Can you elaborate on what you mean by that with an example? What do
 you mean by accessing keys?

 2. Yeah, that is hard to do with the ability to do point lookups into an
 RDD, which we dont support yet. You could try embedding the related key in
 the values of the keys that need it. That is, B will is present in the
 value of key A. Then put this transformed DStream through updateStateByKey.

 TD






Re: can't get jobs to run on cluster (enough memory and cpus are available on worker)

2014-07-17 Thread Andrew Or
Hi Matt,

The security group shouldn't be an issue; the ports listed in
`spark_ec2.py` are only for communication with the outside world.

How did you launch your application? I notice you did not launch your
driver from your Master node. What happens if you did? Another thing is
that there seems to be some inconsistency or missing pieces in the logs you
posted. After an executor says driver disassociated, what happens in the
driver logs? Is an exception thrown or something?

It would be useful if you could also post your conf/spark-env.sh.

Andrew


2014-07-17 14:11 GMT-07:00 Marcelo Vanzin van...@cloudera.com:

 Hi Matt,

 I'm not very familiar with setup on ec2; the closest I can point you
 at is to look at the launch_cluster in ec2/spark_ec2.py, where the
 ports seem to be configured.


 On Thu, Jul 17, 2014 at 1:29 PM, Matt Work Coarr
 mattcoarr.w...@gmail.com wrote:
  Thanks Marcelo!  This is a huge help!!
 
  Looking at the executor logs (in a vanilla spark install, I'm finding
 them
  in $SPARK_HOME/work/*)...
 
  It launches the executor, but it looks like the
 CoarseGrainedExecutorBackend
  is having trouble talking to the driver (exactly what you said!!!).
 
  Do you know what the range of random ports that is used for the the
  executor-to-driver?  Is that range adjustable?  Any config setting or
  environment variable?
 
  I manually setup my ec2 security group to include all the ports that the
  spark ec2 script ($SPARK_HOME/ec2/spark_ec2.py) sets up in it's security
  groups.  They included (for those listed above 1):
  1
  50060
  50070
  50075
  60060
  60070
  60075
 
  Obviously I'll need to make some adjustments to my EC2 security group!
  Just
  need to figure out exactly what should be in there.  To keep things
 simple,
  I just have one security group for the master, slaves, and the driver
  machine.
 
  In listing the port ranges in my current security group I looked at the
  ports that spark_ec2.py sets up as well as the ports listed in the spark
  standalone mode documentation page under configuring ports for network
  security:
 
  http://spark.apache.org/docs/latest/spark-standalone.html
 
 
  Here are the relevant fragments from the executor log:
 
  Spark Executor Command: /cask/jdk/bin/java -cp
 
 ::/cask/spark/conf:/cask/spark/lib/spark-assembly-1.0.0-hadoop2.2.0.jar:/cask/spark/lib/datanucleus-api-jdo-3.
 
 
 2.1.jar:/cask/spark/lib/datanucleus-rdbms-3.2.1.jar:/cask/spark/lib/datanucleus-core-3.2.2.jar
  -XX:MaxPermSize=128m -Dspark.akka.frameSize=100 -Dspark.akka.
 
  frameSize=100 -Xms512M -Xmx512M
  org.apache.spark.executor.CoarseGrainedExecutorBackend
  akka.tcp://spark@ip-10-202-11-191.ec2.internal:46787/user/CoarseGra
 
  inedScheduler 0 ip-10-202-8-45.ec2.internal 8
  akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker
  app-20140717195146-
 
  
 
  ...
 
  14/07/17 19:51:47 DEBUG NativeCodeLoader: Trying to load the custom-built
  native-hadoop library...
 
  14/07/17 19:51:47 DEBUG NativeCodeLoader: Failed to load native-hadoop
 with
  error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
 
  14/07/17 19:51:47 DEBUG NativeCodeLoader:
 
 java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
 
  14/07/17 19:51:47 WARN NativeCodeLoader: Unable to load native-hadoop
  library for your platform... using builtin-java classes where applicable
 
  14/07/17 19:51:47 DEBUG JniBasedUnixGroupsMappingWithFallback: Falling
 back
  to shell based
 
  14/07/17 19:51:47 DEBUG JniBasedUnixGroupsMappingWithFallback: Group
 mapping
  impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping
 
  14/07/17 19:51:48 DEBUG Groups: Group mapping
  impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback;
  cacheTimeout=30
 
  14/07/17 19:51:48 DEBUG SparkHadoopUtil: running as user: ec2-user
 
  ...
 
 
  14/07/17 19:51:48 INFO CoarseGrainedExecutorBackend: Connecting to
 driver:
  akka.tcp://spark@ip-10-202-11-191.ec2.internal
 :46787/user/CoarseGrainedScheduler
 
  14/07/17 19:51:48 INFO WorkerWatcher: Connecting to worker
  akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker
 
  14/07/17 19:51:49 INFO WorkerWatcher: Successfully connected to
  akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker
 
  14/07/17 19:53:29 ERROR CoarseGrainedExecutorBackend: Driver
 Disassociated
  [akka.tcp://sparkExecutor@ip-10-202-8-45.ec2.internal:55670] -
  [akka.tcp://spark@ip-10-202-11-191.ec2.internal:46787] disassociated!
  Shutting down.
 
 
  Thanks a bunch!
  Matt
 
 
  On Thu, Jul 17, 2014 at 1:21 PM, Marcelo Vanzin van...@cloudera.com
 wrote:
 
  When I meant the executor log, I meant the log of the process launched
  by the worker, not the worker. In my CDH-based Spark install, those
  end up in /var/run/spark/work.
 
  If you look at your worker log, you'll see it's launching the executor
  process. So there should be something there.
 

Re: Include permalinks in mail footer

2014-07-17 Thread Tobias Pfeiffer

 On Jul 17, 2014, at 12:59 PM, Nick Chammas nicholas.cham...@gmail.com
 wrote:

 I often find myself wanting to reference one thread from another, or from
 a JIRA issue. Right now I have to google the thread subject and find the
 link that way.


+1


Re: spark streaming rate limiting from kafka

2014-07-17 Thread Tobias Pfeiffer
Bill,

are you saying, after repartition(400), you have 400 partitions on one host
and the other hosts receive nothing of the data?

Tobias


On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 I also have an issue consuming from Kafka. When I consume from Kafka,
 there are always a single executor working on this job. Even I use
 repartition, it seems that there is still a single executor. Does anyone
 has an idea how to add parallelism to this job?



 On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com wrote:

 Thanks Luis and Tobias.


 On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Hi,

 On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com
 wrote:

 * Is there a way to control how far Kafka Dstream can read on
 topic-partition (via offset for example). By setting this to a small
 number, it will force DStream to read less data initially.


 Please see the post at

 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E
 Kafka's auto.offset.reset parameter may be what you are looking for.

 Tobias




 --
 Chen Song





Re: how to pass extra Java opts to workers for spark streaming jobs

2014-07-17 Thread Tathagata Das
Can you check in the environment tab of Spark web ui to see whether this
configuration parameter is in effect?

TD


On Thu, Jul 17, 2014 at 2:05 PM, Chen Song chen.song...@gmail.com wrote:

 I am using spark 0.9.0 and I am able to submit job to YARN,
 https://spark.apache.org/docs/0.9.0/running-on-yarn.html.

 I am trying to turn on gc logging on executors but could not find a way to
 set extra Java opts for workers.

 I tried to set spark.executor.extraJavaOptions but that did not work.

 Any idea on how I should do this?

 --
 Chen Song




Re: spark streaming rate limiting from kafka

2014-07-17 Thread Tathagata Das
You can create multiple Kafka streams to partition your topics across them,
which will run multiple receivers, potentially on multiple executors. This is
covered in the Spark Streaming guide.
http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
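
A minimal sketch of that pattern (placeholder ZooKeeper quorum, consumer group
and topic name; ssc is your StreamingContext):

import org.apache.spark.streaming.kafka.KafkaUtils

val numStreams = 4
val kafkaStreams = (1 to numStreams).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost:2181", "my-consumer-group", Map("my-topic" -> 1))
}
val unified = ssc.union(kafkaStreams)  // one DStream backed by 4 receivers
unified.map(_._2).count().print()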

And for the purpose of this thread, to answer the original question, we now
have the ability
https://issues.apache.org/jira/browse/SPARK-1854?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20priority%20DESC
to limit the receiving rate. It's in the master branch, and will be
available in Spark 1.1. It basically sets the limit at the receiver level
(so it applies to all sources) on the max number of records per second that
will be received by the receiver.
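
(If I remember the name right, the new setting is spark.streaming.receiver.maxRate,
in records per second per receiver; roughly:)

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("rate-limited-app")
  .set("spark.streaming.receiver.maxRate", "10000")  // cap each receiver at 10k records/sec
val ssc = new StreamingContext(conf, Seconds(2))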

TD


On Thu, Jul 17, 2014 at 6:15 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Bill,

 are you saying, after repartition(400), you have 400 partitions on one
 host and the other hosts receive nothing of the data?

 Tobias


 On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 I also have an issue consuming from Kafka. When I consume from Kafka,
 there is always a single executor working on this job. Even if I use
 repartition, it seems that there is still a single executor. Does anyone
 have an idea how to add parallelism to this job?



 On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com
 wrote:

 Thanks Luis and Tobias.


 On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Hi,

 On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com
 wrote:

 * Is there a way to control how far the Kafka DStream can read on a
 topic-partition (via offset, for example)? By setting this to a small
 number, it would force the DStream to read less data initially.


 Please see the post at

 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E
 Kafka's auto.offset.reset parameter may be what you are looking for.

 Tobias




 --
 Chen Song






Re: Error with spark-submit (formatting corrected)

2014-07-17 Thread Andrew Or
Hi ranjanp,

If you go to the master UI (masterIP:8080), what does the first line say?
Verify that this is the same as what you expect. Another thing is that
--master in spark submit overwrites whatever you set MASTER to, so the
environment variable won't actually take effect. Another obvious thing to
check is whether the node from which you launch spark submit can access the
internal address of the master (and port 7077). One quick way to verify
that is to attempt a telnet into it.

Let me know if you find anything.
Andrew


2014-07-17 15:57 GMT-07:00 ranjanp piyush_ran...@hotmail.com:

 Hi,
 I am new to Spark and trying out with a stand-alone, 3-node (1 master, 2
 workers) cluster.

 From the Web UI at the master, I see that the workers are registered. But
 when I try running the SparkPi example from the master node, I get the
 following message and then an exception.

 14/07/17 01:20:36 INFO AppClient$ClientActor: Connecting to master
 spark://10.1.3.7:7077...
 14/07/17 01:20:46 WARN TaskSchedulerImpl: Initial job has not accepted any
 resources; check your cluster UI to ensure that workers are registered and
 have sufficient memory

 I searched a bit for the above warning, and found that others
 have
 encountered this problem before, but did not see a clear resolution except
 for this link:

 http://apache-spark-user-list.1001560.n3.nabble.com/TaskSchedulerImpl-Initial-job-has-not-accepted-any-resources-check-your-cluster-UI-to-ensure-that-woy-tt8247.html#a8444

 Based on the suggestion there I tried supplying --executor-memory option to
 spark-submit but that did not help.

 Any suggestions? Here are the details of my setup.
 - 3 nodes (each with 4 CPU cores and 7 GB memory)
 - 1 node configured as Master, and the other two configured as workers
 - Firewall is disabled on all nodes, and network communication between the
 nodes is not a problem
 - Edited the conf/spark-env.sh on all nodes to set the following:
   SPARK_WORKER_CORES=3
   SPARK_WORKER_MEMORY=5G
 - The Web UI as well as logs on master show that Workers were able to
 register correctly. Also the Web UI correctly shows the aggregate available
 memory and CPU cores on the workers:

 URL: spark://vmsparkwin1:7077
 Workers: 2
 Cores: 6 Total, 0 Used
 Memory: 10.0 GB Total, 0.0 B Used
 Applications: 0 Running, 0 Completed
 Drivers: 0 Running, 0 Completed
 Status: ALIVE

 I tried running the SparkPi example first using run-example (which was
 failing) and later directly using spark-submit as shown below:

 $ export MASTER=spark://vmsparkwin1:7077

 $ echo $MASTER
 spark://vmsparkwin1:7077

 azureuser@vmsparkwin1 /cygdrive/c/opt/spark-1.0.0
 $ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
 spark://10.1.3.7:7077 --executor-memory 1G --total-executor-cores 2
 ./lib/spark-examples-1.0.0-hadoop2.2.0.jar 10


 The following is the full screen output:

 14/07/17 01:20:13 INFO SecurityManager: Using Spark's default log4j
 profile:
 org/apache/spark/log4j-defaults.properties
 14/07/17 01:20:13 INFO SecurityManager: Changing view acls to: azureuser
 14/07/17 01:20:13 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(azureuser)
 14/07/17 01:20:14 INFO Slf4jLogger: Slf4jLogger started
 14/07/17 01:20:14 INFO Remoting: Starting remoting
 14/07/17 01:20:14 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sp...@vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49839]
 14/07/17 01:20:14 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://sp...@vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49839]
 14/07/17 01:20:14 INFO SparkEnv: Registering MapOutputTracker
 14/07/17 01:20:14 INFO SparkEnv: Registering BlockManagerMaster
 14/07/17 01:20:14 INFO DiskBlockManager: Created local directory at
 C:\cygwin\tmp\spark-local-20140717012014-b606
 14/07/17 01:20:14 INFO MemoryStore: MemoryStore started with capacity 294.9
 MB.
 14/07/17 01:20:14 INFO ConnectionManager: Bound socket to port 49842 with
 id
 = ConnectionManagerId(vmsparkwin1.cssparkwin.b1.internal.cloudapp.net
 ,49842)
 14/07/17 01:20:14 INFO BlockManagerMaster: Trying to register BlockManager
 14/07/17 01:20:14 INFO BlockManagerInfo: Registering block manager
 vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49842 with 294.9 MB RAM
 14/07/17 01:20:14 INFO BlockManagerMaster: Registered BlockManager
 14/07/17 01:20:14 INFO HttpServer: Starting HTTP Server
 14/07/17 01:20:14 INFO HttpBroadcast: Broadcast server started at
 http://10.1.3.7:49843
 14/07/17 01:20:14 INFO HttpFileServer: HTTP File server directory is
 C:\cygwin\tmp\spark-6a076e92-53bb-4c7a-9e27-ce53a818146d
 14/07/17 01:20:14 INFO HttpServer: Starting HTTP Server
 14/07/17 01:20:15 INFO SparkUI: Started SparkUI at
 http://vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:4040
 14/07/17 01:20:15 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where 

Re: how to pass extra Java opts to workers for spark streaming jobs

2014-07-17 Thread Andrew Or
Hi Chen,

spark.executor.extraJavaOptions was introduced in Spark 1.0, not in Spark
0.9. You need to

export SPARK_JAVA_OPTS="-Dspark.config1=value1 -Dspark.config2=value2"

in conf/spark-env.sh.

Let me know if that works.
Andrew


2014-07-17 18:15 GMT-07:00 Tathagata Das tathagata.das1...@gmail.com:

 Can you check in the environment tab of Spark web ui to see whether this
 configuration parameter is in effect?

 TD


 On Thu, Jul 17, 2014 at 2:05 PM, Chen Song chen.song...@gmail.com wrote:

 I am using spark 0.9.0 and I am able to submit job to YARN,
 https://spark.apache.org/docs/0.9.0/running-on-yarn.html.

 I am trying to turn on gc logging on executors but could not find a way
 to set extra Java opts for workers.

 I tried to set spark.executor.extraJavaOptions but that did not work.

 Any idea on how I should do this?

 --
 Chen Song





Re: jar changed on src filesystem

2014-07-17 Thread Andrew Or
Hi Jian,

In yarn-cluster mode, Spark submit automatically uploads the assembly jar
to a distributed cache that all executor containers read from, so there is
no need to manually copy the assembly jar to all nodes (or pass it through
--jars).
It seems there are two versions of the same jar in your HDFS. Try removing
all old jars from your .sparkStaging directory and try again?

Let me know if that does the job,
Andrew


2014-07-16 23:42 GMT-07:00 cmti95035 cmti95...@gmail.com:

 They're all the same version. Actually even without the --jars parameter
 it
 got the same error. Looks like it needs to copy the assembly jar for
 running
 the example jar anyway during the staging.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/jar-changed-on-src-filesystem-tp10011p10017.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Spark Streaming

2014-07-17 Thread Tathagata Das
Step 1: use rdd.mapPartitions(). That's the equivalent of the mapper in
MapReduce. You can open a connection, get all the data and buffer it, close
the connection, and return an iterator over the buffer.
Step 2: Make step 1 better by making it reuse connections. You can use
singletons / static vars to lazily initialize and reuse a pool of
connections. You will have to take care of concurrency, as multiple tasks
may be using the database in parallel in the same worker JVM.

TD
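
A rough Scala sketch of both steps (the ConnectionPool object and the
DummyConnection type below are hypothetical stand-ins for whatever database
client is actually used):

import org.apache.spark.streaming.dstream.DStream

// Stand-in for a real database connection (hypothetical API).
class DummyConnection {
  def lookup(record: String): Seq[String] = Seq.empty  // pretend DB call
}

// Step 2: a JVM-wide, lazily populated pool shared by all tasks running in
// the same worker JVM, so it must be thread-safe.
object ConnectionPool {
  private val pool = new java.util.concurrent.ConcurrentLinkedQueue[DummyConnection]()
  def borrow(): DummyConnection = Option(pool.poll()).getOrElse(new DummyConnection())
  def give(c: DummyConnection): Unit = pool.offer(c)
}

object EnrichSketch {
  // Step 1: one connection per partition instead of one per record; buffer
  // the results so the connection can be returned before downstream iteration.
  def enrich(stream: DStream[String]): DStream[String] =
    stream.mapPartitions { records =>
      val conn = ConnectionPool.borrow()
      val buffered = records.flatMap(conn.lookup).toList  // forces evaluation
      ConnectionPool.give(conn)
      buffered.iterator
    }
}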


On Thu, Jul 17, 2014 at 4:41 PM, Guangle Fan fanguan...@gmail.com wrote:

 Hi, All

 When I run spark streaming, in one of the flatMap stage, I want to access
 database.

 Code looks like :

 stream.flatMap(
 new FlatMapFunction {
 call () {
 //access database cluster
 }
   }
 )

 Since I don't want to create a database connection every time call() is
 called, where is the best place to create the connection and reuse it on a
 per-host basis (like one database connection per Mapper/Reducer)?

 Regards,

 Guangle




Re: Errors accessing hdfs while in local mode

2014-07-17 Thread Andrew Or
Hi Chris,

Did you ever figure this out? It should just work provided that your HDFS
is set up correctly. If you don't call setMaster, it actually uses the
spark://[master-node-ip]:7077 by default (this is configured in your
conf/spark-env.sh). However, even if you use a local master, it should
still work (I just tried this on my own EC2 cluster). By the way,
SPARK_MASTER is actually deprecated. Instead, please use bin/spark-submit
--master [your master].

Andrew


2014-07-16 23:46 GMT-07:00 Akhil Das ak...@sigmoidanalytics.com:

 You can try the following in the spark-shell:

 1. Run it in *Cluster mode* by going inside the spark directory:

 $ SPARK_MASTER=spark://masterip:7077 ./bin/spark-shell

 val textFile = sc.textFile("hdfs://masterip/data/blah.csv")

 textFile.take(10).foreach(println)


 2. Now try running in *Local mode*:

 $ SPARK_MASTER=local ./bin/spark-shell

 val textFile = sc.textFile("hdfs://masterip/data/blah.csv")

 textFile.take(10).foreach(println)


 Both should print the first 10 lines from your blah.csv file.






Re: Is there a way to get previous/other keys' state in Spark Streaming?

2014-07-17 Thread Tathagata Das
Yes, this is the limitation of the current implementation. But this will be
improved a lot when we have IndexedRDD
(https://github.com/apache/spark/pull/1297) in Spark, which allows faster
single-value updates to a key-value RDD (within each partition, without
processing the entire partition).

Soon.

TD
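
A small Scala sketch of the behavior discussed in this thread: the update
function is invoked for every key already in the state (with an empty
newValues when a key got no data in the batch), and foreachRDD on the state
stream exposes all current key-state pairs. The running-count logic and
checkpoint directory are illustrative assumptions only:

import org.apache.spark.streaming.StreamingContext._   // pair-DStream operations (Spark 1.x)
import org.apache.spark.streaming.dstream.DStream

object StateSketch {
  // Called every batch for ALL keys in the state, not just the keys received;
  // newValues is empty for a key with no new data in this batch.
  def updateFunc(newValues: Seq[Int], state: Option[Int]): Option[Int] =
    Some(state.getOrElse(0) + newValues.sum)

  def run(pairs: DStream[(String, Int)], checkpointDir: String): Unit = {
    pairs.context.checkpoint(checkpointDir)  // required by updateStateByKey
    val stateStream = pairs.updateStateByKey(updateFunc _)

    // Access to every existing key-state pair, e.g. to display or export it.
    stateStream.foreachRDD { rdd =>
      rdd.take(20).foreach { case (k, v) => println(s"$k -> $v") }
    }
  }
}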


On Thu, Jul 17, 2014 at 5:57 PM, Yan Fang yanfang...@gmail.com wrote:

 Hi TD,

 Thank you. Yes, it behaves as you described. Sorry for missing this point.

 Then my only concern is on the performance side - since Spark Streaming
 operates on all the keys every time a new batch comes, I think it is fine
 when the state size is small. When the state size becomes big, say, a few
 GBs, if we still go through the whole key list, would the operation be a
 little inefficient then? Maybe I am missing something in Spark Streaming
 that handles this situation.

 Cheers,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Thu, Jul 17, 2014 at 1:47 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 The updateFunction given in updateStateByKey should be called on ALL the
 keys that are in the state, even if there is no new data in the batch for
 some key. Is that not the behavior you see?

 What do you mean by "show all the existing states"? You have access to
 the latest state RDD by doing stateStream.foreachRDD(...). There you can do
 whatever operation you want on all the key-state pairs.

 TD




 On Thu, Jul 17, 2014 at 11:58 AM, Yan Fang yanfang...@gmail.com wrote:

 Hi TD,

 Thank you for the quick replying and backing my approach. :)

 1) The example is this:

 1. In the first 2-second interval, after updateStateByKey, I get a few
 keys and their states, say, (a -> 1, b -> 2, c -> 3).
 2. In the following 2-second interval, I only receive c and d and
 their values. But I want to update/display the state of a and b
 accordingly.
 * It seems I have no way to access a and b and get their
 states.
 * Also, do I have a way to show all the existing states?

 I guess the approach to solve this will be similar to what you mentioned
 for 2). But the difficulty is that, if I want to display all the existing
 states, I need to bundle all the remaining keys into one key.

 Thank you.

 Cheers,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Thu, Jul 17, 2014 at 11:36 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 For accessing previous version, I would do it the same way. :)

 1. Can you elaborate on what you mean by that with an example? What do
 you mean by accessing keys?

 2. Yeah, that is hard to do without the ability to do point lookups into
 an RDD, which we don't support yet. You could try embedding the related key
 in the values of the keys that need it. That is, B will be present in the
 value of key A. Then put this transformed DStream through updateStateByKey.

 TD







Re: Error with spark-submit (formatting corrected)

2014-07-17 Thread Jay Vyas
I think I know what is happening to you. I've looked into this a bit just this
week, and so it's fresh in my brain :) hope this helps.


When no workers are known to the master, iirc, you get this message.

I think  this is how it works.

1) You start your master.
2) You start a slave, and give it the master URL as an argument.
3) The slave then binds to a random port.
4) The slave then does a handshake with the master, which you can see in the slave
logs (it says something like "successfully connected to master at …").
  Actually, I think the master also logs that it is now aware of a slave running
on ip:port…

So in your case, I suspect, none of the slaves have connected to the master, so
the job sits idle.

This is similar to the YARN scenario of submitting a job to a resource manager
with no node-managers running.



On Jul 17, 2014, at 6:57 PM, ranjanp piyush_ran...@hotmail.com wrote:

 Hi, 
 I am new to Spark and trying out with a stand-alone, 3-node (1 master, 2
 workers) cluster. 
 
 From the Web UI at the master, I see that the workers are registered. But
 when I try running the SparkPi example from the master node, I get the
 following message and then an exception. 
 
 14/07/17 01:20:36 INFO AppClient$ClientActor: Connecting to master
 spark://10.1.3.7:7077... 
 14/07/17 01:20:46 WARN TaskSchedulerImpl: Initial job has not accepted any
 resources; check your cluster UI to ensure that workers are registered and
 have sufficient memory 
 
 I searched a bit for the above warning, and found that others have
 encountered this problem before, but did not see a clear resolution except
 for this link:
 http://apache-spark-user-list.1001560.n3.nabble.com/TaskSchedulerImpl-Initial-job-has-not-accepted-any-resources-check-your-cluster-UI-to-ensure-that-woy-tt8247.html#a8444
 
 Based on the suggestion there I tried supplying --executor-memory option to
 spark-submit but that did not help. 
 
 Any suggestions? Here are the details of my setup.
 - 3 nodes (each with 4 CPU cores and 7 GB memory) 
 - 1 node configured as Master, and the other two configured as workers 
 - Firewall is disabled on all nodes, and network communication between the
 nodes is not a problem 
 - Edited the conf/spark-env.sh on all nodes to set the following: 
  SPARK_WORKER_CORES=3 
  SPARK_WORKER_MEMORY=5G 
 - The Web UI as well as logs on master show that Workers were able to
 register correctly. Also the Web UI correctly shows the aggregate available
 memory and CPU cores on the workers: 
 
 URL: spark://vmsparkwin1:7077
 Workers: 2
 Cores: 6 Total, 0 Used
 Memory: 10.0 GB Total, 0.0 B Used
 Applications: 0 Running, 0 Completed
 Drivers: 0 Running, 0 Completed
 Status: ALIVE
 
 I tried running the SparkPi example first using run-example (which was
 failing) and later directly using spark-submit as shown below:
 
 $ export MASTER=spark://vmsparkwin1:7077
 
 $ echo $MASTER
 spark://vmsparkwin1:7077
 
 azureuser@vmsparkwin1 /cygdrive/c/opt/spark-1.0.0
 $ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
 spark://10.1.3.7:7077 --executor-memory 1G --total-executor-cores 2
 ./lib/spark-examples-1.0.0-hadoop2.2.0.jar 10
 
 
 The following is the full screen output:
 
 14/07/17 01:20:13 INFO SecurityManager: Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties
 14/07/17 01:20:13 INFO SecurityManager: Changing view acls to: azureuser
 14/07/17 01:20:13 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(azureuser)
 14/07/17 01:20:14 INFO Slf4jLogger: Slf4jLogger started
 14/07/17 01:20:14 INFO Remoting: Starting remoting
 14/07/17 01:20:14 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sp...@vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49839]
 14/07/17 01:20:14 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://sp...@vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49839]
 14/07/17 01:20:14 INFO SparkEnv: Registering MapOutputTracker
 14/07/17 01:20:14 INFO SparkEnv: Registering BlockManagerMaster
 14/07/17 01:20:14 INFO DiskBlockManager: Created local directory at
 C:\cygwin\tmp\spark-local-20140717012014-b606
 14/07/17 01:20:14 INFO MemoryStore: MemoryStore started with capacity 294.9
 MB.
 14/07/17 01:20:14 INFO ConnectionManager: Bound socket to port 49842 with id
 = ConnectionManagerId(vmsparkwin1.cssparkwin.b1.internal.cloudapp.net,49842)
 14/07/17 01:20:14 INFO BlockManagerMaster: Trying to register BlockManager
 14/07/17 01:20:14 INFO BlockManagerInfo: Registering block manager
 vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49842 with 294.9 MB RAM
 14/07/17 01:20:14 INFO BlockManagerMaster: Registered BlockManager
 14/07/17 01:20:14 INFO HttpServer: Starting HTTP Server
 14/07/17 01:20:14 INFO HttpBroadcast: Broadcast server started at
 http://10.1.3.7:49843
 14/07/17 01:20:14 INFO HttpFileServer: HTTP File server directory is
 

iScala or Scala-notebook

2014-07-17 Thread ericjohnston1989
Hey everyone,

I know this was asked before but I'm wondering if there have since been any
updates. Are there any plans to integrate iScala/Scala-notebook with spark
in the near future?

This seems like something a lot of people would find very useful, so I was
just wondering if anyone has started working on it.

Thanks,

Eric



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/iScala-or-Scala-notebook-tp10127.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Cannot connect to hive metastore

2014-07-17 Thread linkpatrickliu
Seems like the mysql connector jar is not included in the classpath.
Where can I add the jar to the classpath?

hive-site.xml:
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://localhost:3306/metastore?createDatabaseIfNotExist=true&amp;characterEncoding=UTF-8</value>
    <description>JDBC connect string for a JDBC metastore</description>
  </property>

Log:
14/07/18 11:46:58 ERROR DDLTask:
org.apache.hadoop.hive.ql.metadata.HiveException:
java.lang.RuntimeException: Unable to instantiate
org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at org.apache.hadoop.hive.ql.metadata.Hive.getDatabase(Hive.java:1143)
at 
org.apache.hadoop.hive.ql.metadata.Hive.databaseExists(Hive.java:1128)
at org.apache.hadoop.hive.ql.exec.DDLTask.showTables(DDLTask.java:2236)
at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:333)
at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:151)
at
org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:65)
at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1414)
at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1192)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1020)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:888)
at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:189)
at 
org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:163)
at
org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35)
at
org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35)
at
org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:38)
at
org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:250)
at
org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:250)
at 
org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:100)
at org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:75)
at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:78)
at $line9.$read$$iwC$$iwC$$iwC$$iwC.init(console:15)
at $line9.$read$$iwC$$iwC$$iwC.init(console:20)
at $line9.$read$$iwC$$iwC.init(console:22)
at $line9.$read$$iwC.init(console:24)
at $line9.$read.init(console:26)
at $line9.$read$.init(console:30)
at $line9.$read$.clinit(console)
at $line9.$eval$.init(console:7)
at $line9.$eval$.clinit(console)
at $line9.$eval.$print(console)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)
at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
  

Re: how to pass extra Java opts to workers for spark streaming jobs

2014-07-17 Thread Chen Song
Thanks Andrew.

Say that I want to turn on CMS gc for each worker.

All I need to do is add the following line to conf/spark-env.sh on the node
where I submit the application.

-XX:+UseConcMarkSweepGC

Is that correct?

Will this option be propagated to each worker in YARN?



On Thu, Jul 17, 2014 at 9:26 PM, Andrew Or and...@databricks.com wrote:

 Hi Chen,

 spark.executor.extraJavaOptions was introduced in Spark 1.0, not in Spark
 0.9. You need to

 export SPARK_JAVA_OPTS="-Dspark.config1=value1 -Dspark.config2=value2"

 in conf/spark-env.sh.

 Let me know if that works.
 Andrew


 2014-07-17 18:15 GMT-07:00 Tathagata Das tathagata.das1...@gmail.com:

 Can you check in the environment tab of Spark web ui to see whether this
 configuration parameter is in effect?

 TD


 On Thu, Jul 17, 2014 at 2:05 PM, Chen Song chen.song...@gmail.com
 wrote:

 I am using spark 0.9.0 and I am able to submit job to YARN,
 https://spark.apache.org/docs/0.9.0/running-on-yarn.html.

 I am trying to turn on gc logging on executors but could not find a way
 to set extra Java opts for workers.

 I tried to set spark.executor.extraJavaOptions but that did not work.

 Any idea on how I should do this?

 --
 Chen Song






-- 
Chen Song


Re: Spark Streaming Json file groupby function

2014-07-17 Thread srinivas
Hi TD,
It Worked...Thank you so much for all your help.

Thanks,
-Srinivas.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p10132.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: how to pass extra Java opts to workers for spark streaming jobs

2014-07-17 Thread Andrew Or
You will need to include that in the SPARK_JAVA_OPTS environment variable,
so add the following line to spark-env.sh:

export SPARK_JAVA_OPTS="-XX:+UseConcMarkSweepGC"

This should propagate to the executors. (Though you should double check,
since 0.9 is a little old and I could be forgetting something) If you wish
to add spark options in addition to this, simply append them to the
environment variable:

export SPARK_JAVA_OPTS="-XX:+UseConcMarkSweepGC -Dspark.config.one=value
-Dspark.config.two=value"

(Please note that this is only for Spark 0.9. The part where we set Spark
options within SPARK_JAVA_OPTS is deprecated as of 1.0)
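
For Spark 1.0+, where SPARK_JAVA_OPTS is deprecated, a minimal sketch of doing
the same thing through spark.executor.extraJavaOptions (the specific GC flags
are just examples; the same key can also be set in conf/spark-defaults.conf):

import org.apache.spark.{SparkConf, SparkContext}

object GcLoggingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("gc-logging-sketch")
      // Extra JVM options passed to every executor (Spark 1.0+); GC log
      // output ends up in the executors' stdout/stderr logs.
      .set("spark.executor.extraJavaOptions",
           "-XX:+UseConcMarkSweepGC -verbose:gc -XX:+PrintGCDetails")

    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 1000).reduce(_ + _))  // trivial job to exercise executors
    sc.stop()
  }
}

As suggested earlier in the thread, the Environment tab of the web UI is the
place to verify that the setting took effect.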


2014-07-17 21:08 GMT-07:00 Chen Song chen.song...@gmail.com:

 Thanks Andrew.

 Say that I want to turn on CMS gc for each worker.

 All I need to do is add the following line to conf/spark-env.sh on the node
 where I submit the application.

 -XX:+UseConcMarkSweepGC

 Is that correct?

 Will this option be propagated to each worker in YARN?



 On Thu, Jul 17, 2014 at 9:26 PM, Andrew Or and...@databricks.com wrote:

 Hi Chen,

 spark.executor.extraJavaOptions was introduced in Spark 1.0, not in Spark
 0.9. You need to

 export SPARK_JAVA_OPTS="-Dspark.config1=value1 -Dspark.config2=value2"

 in conf/spark-env.sh.

 Let me know if that works.
 Andrew


 2014-07-17 18:15 GMT-07:00 Tathagata Das tathagata.das1...@gmail.com:

 Can you check in the environment tab of Spark web ui to see whether this
 configuration parameter is in effect?

 TD


 On Thu, Jul 17, 2014 at 2:05 PM, Chen Song chen.song...@gmail.com
 wrote:

 I am using spark 0.9.0 and I am able to submit job to YARN,
 https://spark.apache.org/docs/0.9.0/running-on-yarn.html.

 I am trying to turn on gc logging on executors but could not find a way
 to set extra Java opts for workers.

 I tried to set spark.executor.extraJavaOptions but that did not work.

 Any idea on how I should do this?

 --
 Chen Song






 --
 Chen Song




Last step of processing is using too much memory.

2014-07-17 Thread Roch Denis
Hello,

I have an issue where my Spark code is using too much memory in the final
step (a count for testing purposes; it will write the result to a db when it
works). I'm really not too sure how I can break down the last step to use
less RAM.

So, basically my data is log lines and each log line has a session id. I
want to group by session to reconstruct the events of a session for BI
purposes.

So my steps are:

-Load the loglines
-Do a map to create a K,V for each log line
-Do a groupByKey.
-Do a final map on the log lines to rebuild my session.
-Do a count to trigger everything.

That did not work at all: I let it run for 35 minutes and all it was doing
was disk reads/writes; all the CPUs were blocked on IO wait and I had 1%
free memory.

So, I thought that I could help by reading my log lines in chunks of
1,200,000 lines and THEN doing a groupByKey on that subset. After everything
was done, I would just combine all my RDDs with + and do a final groupByKey
pass. The result is still the same: heavy disk swapping, 1% memory left, and
all the CPUs doing IO wait.

It looks like:
-Load subset
-Do a map to create a K,V for each log line
-Do a groupByKey.
-Add all the subset rdd together.
-Do a final groupByKey.
-Do a count.

I can post the code if it would help, but there's a lot of code used to extract
the logs from mongodb with a flatMap, which would only confuse the issue.


This is the memory usage of each process; it's an issue because I have 12GB
of RAM on that machine:
    VIRT    RES  SHR S %CPU   TIME+ COMMAND
 3378712 2.646g  700 D  0.3 0:21.30 python
 3377568 2.566g  700 D  0.0 0:20.80 python
 3374984 2.485g  700 D  0.0 0:20.29 python
 3375588 2.449g  700 D  0.3 0:20.62 python
 3495560 206908 3920 S  1.3 0:45.36 java

If I look at the swap space with free, it's the same thing: there's no memory
left to swap out from buffers/cache.

             total       used       free     shared    buffers     cached
Mem:      12305524   12159320     146204         20       1072      29036
-/+ buffers/cache:    12129212     176312
Swap:      5857276    3885296    1971980


In the screenshot below, you can see the step it's stuck at. The
substeps are in groups of 4 because I break down each sub-chunk into blocks of
4.
http://apache-spark-user-list.1001560.n3.nabble.com/file/n10134/issue.png 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Last-step-of-processing-is-using-too-much-memory-tp10134.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


spark1.0.1 spark sql error java.lang.NoClassDefFoundError: Could not initialize class $line11.$read$

2014-07-17 Thread Victor Sheng
When I run a query against a Hadoop file:
mobile.registerAsTable("mobile")
val count = sqlContext.sql("select count(1) from mobile")
res5: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[21] at RDD at SchemaRDD.scala:100
== Query Plan ==
ExistingRdd [data_date#0,mobile#1,create_time#2], MapPartitionsRDD[4] at
mapPartitions at basicOperators.scala:176

When I run collect:
count.collect()

It throws exceptions. Can anyone help me?

Job aborted due to stage failure: Task 3.0:22 failed 4 times, most recent
failure: Exception failure in TID 153 on host wh-8-210:
java.lang.NoClassDefFoundError: Could not initialize class $line11.$read$
$line12.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:19)
$line12.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:19)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$1.next(Iterator.scala:853)
scala.collection.Iterator$$anon$1.head(Iterator.scala:840)
org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:181)
org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:176)
org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
org.apache.spark.scheduler.Task.run(Task.scala:51)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
java.lang.Thread.run(Thread.java:722) Driver stacktrace:



java.lang.ExceptionInInitializerError
at $line11.$read$$iwC.init(console:6)
at $line11.$read.init(console:26)
at $line11.$read$.init(console:30)
at $line11.$read$.clinit(console)
at $line12.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:19)
at $line12.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:19)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$1.next(Iterator.scala:853)
at scala.collection.Iterator$$anon$1.head(Iterator.scala:840)
at
org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:181)
at
org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:176)
at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)


My classpath is:
/app/hadoop/spark-1.0.1/assembly/target/scala-2.10/spark-assembly-1.0.1-hadoop0.20.2-cdh3u5.jar
System Classpath
/app/hadoop/spark-1.0.1/conf System Classpath
/app/hadoop/spark-1.0.1/lib_managed/jars/JavaEWAH-0.3.2.jar System Classpath
/app/hadoop/spark-1.0.1/lib_managed/jars/JavaEWAH-0.6.6.jar System Classpath
/app/hadoop/spark-1.0.1/lib_managed/jars/ST4-4.0.4.jar  System Classpath
/app/hadoop/spark-1.0.1/lib_managed/jars/activation-1.1.jar System Classpath
/app/hadoop/spark-1.0.1/lib_managed/jars/akka-actor_2.10-2.2.3-shaded-protobuf.jar
System Classpath
/app/hadoop/spark-1.0.1/lib_managed/jars/algebird-core_2.10-0.1.11.jar
System Classpath
