Re: Spark Streaming data checkpoint performance

2015-11-07 Thread trung kien
Hmm,

That seems like just a workaround.
Using this method, it's very hard to recover from a failure, since we don't
know which batches have been completed.

I really want to maintain the whole running stats in memory and achieve full
fault tolerance.

I just wonder whether the performance of data checkpointing is really that bad,
or whether I'm missing something in my setup.

30 seconds for a data checkpoint of 1M keys is too much for me.


On Sat, Nov 7, 2015 at 1:25 PM, Aniket Bhatnagar  wrote:

> It depends on the stats you are collecting. For example, if you are just
> collecting counts, you can do away with updateStateByKey completely by
> doing an insert-or-update operation on the data store after the reduce, i.e.:
>
> For each (key, batchCount):
>   if (key exists in dataStore)
>     update count = count + batchCount for the key
>   else
>     insert (key, batchCount)
>
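(A minimal Scala sketch of this reduce-then-upsert pattern, for illustration only; `batchCounts` and `upsertCount` are hypothetical placeholders for the reduced DStream and for whatever external key-value store is used:)

```
// Hypothetical helper: atomically increment the stored count if the key exists,
// otherwise insert it (e.g. an UPSERT / counter increment in the external store).
def upsertCount(key: String, batchCount: Long): Unit = ???

// batchCounts: DStream[(String, Long)] produced by reduceByKey(_ + _) on each batch
batchCounts.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // open one connection to the store per partition here
    partition.foreach { case (key, count) => upsertCount(key, count) }
    // close the connection here
  }
}
```

With this approach the running totals live in the external store, so there is no Spark state to checkpoint.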
> Thanks,
> Aniket
>
> On Sat, Nov 7, 2015 at 11:38 AM Thúy Hằng Lê 
> wrote:
>
>> Thanks Aniket,
>>
>> I want to store the state to external storage, but that should come in a
>> later step, I think.
>> Basically, I have to use the updateStateByKey function to maintain the
>> running state (which requires checkpointing), and my bottleneck is now the data
>> checkpoint.
>>
>> My pseudo code is like below:
>>
>> JavaStreamingContext jssc = new JavaStreamingContext(
>> sparkConf,Durations.seconds(2));
>> jssc.checkpoint("spark-data/checkpoint");
>> JavaPairInputDStream messages =
>> KafkaUtils.createDirectStream(...);
>> JavaPairDStream stats =
>> messages.mapToPair(parseJson)
>> .reduceByKey(REDUCE_STATS)
>> .updateStateByKey(RUNNING_STATS);
>>
>>    JavaPairDStream newData = stats.filter(NEW_STATS);
>>
>>    newData.foreachRDD {
>>      rdd.foreachPartition {
>>        // Store to external storage.
>>      }
>>    }
>>
>>   Without using updateStateByKey, I only have the stats of the last
>> micro-batch.
>>
>> Any advice on this?
>>
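(For reference, a minimal Scala sketch of the shape of the update function that updateStateByKey expects, using a simple running count; the real RUNNING_STATS function above presumably carries richer statistics, and `batchCounts` is an illustrative name for the reduced DStream:)

```
// Called once per key per batch: newValues holds this batch's reduced values for the key,
// runningCount is the state carried over from previous batches (None on first sight).
val updateRunningCount: (Seq[Long], Option[Long]) => Option[Long] =
  (newValues, runningCount) => Some(newValues.sum + runningCount.getOrElse(0L))

// stats then holds the accumulated value per key across all batches,
// which is why the whole state has to be checkpointed periodically.
val stats = batchCounts.updateStateByKey(updateRunningCount)
```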
>>
>> 2015-11-07 11:35 GMT+07:00 Aniket Bhatnagar :
>>
>>> Can you try storing the state (word count) in an external key value
>>> store?
>>>
>>> On Sat, Nov 7, 2015, 8:40 AM Thúy Hằng Lê  wrote:
>>>
 Hi all,

 Could anyone help me on this? It's a bit urgent for me.
 I'm very confused and curious about Spark data checkpoint performance.
 Is there any detailed description of the checkpoint implementation I can look into?
 Spark Streaming takes only a sub-second to process 20K messages/sec;
 however, it takes 25 seconds for the checkpoint. Now my application has an
 average latency of 30 seconds, and it keeps increasing.


 2015-11-06 11:11 GMT+07:00 Thúy Hằng Lê :

> Thanks all, it would be great to have this feature soon.
> Do you know what the release plan for 1.6 is?
>
> In addition to this, I still have a checkpoint performance problem.
>
> My code is simply like this:
> JavaStreamingContext jssc = new
> JavaStreamingContext(sparkConf,Durations.seconds(2));
> jssc.checkpoint("spark-data/checkpoint");
> JavaPairInputDStream messages =
> KafkaUtils.createDirectStream(...);
> JavaPairDStream stats =
> messages.mapToPair(parseJson)
> .reduceByKey(REDUCE_STATS)
> .updateStateByKey(RUNNING_STATS);
>
> stats.print()
>
>   Now I need to maintain about 800k keys; the stats here only count the
> number of occurrences per key.
>   While running, the cache dir is very small (about 50M). My questions
> are:
>
>   1/ A regular micro-batch takes about 800ms to finish, but every
> 10 seconds, when the data checkpoint is running,
>   it takes 5 seconds to finish a micro-batch of the same size. Why is it
> so high? What kind of work does the checkpoint do?
>   Why does it keep increasing?
>
>   2/ When I change the data checkpoint interval, e.g. using:
>   stats.checkpoint(Durations.seconds(100)); // changed to 100,
> default is 10
>
>   the checkpoint time keeps increasing significantly: the first checkpoint is
> 10s, the second 30s, the third 70s ... and it keeps increasing :)
>   Why is it so high when I increase the checkpoint interval?
>
> It seems that the default interval works more stably.
>
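(For reference: the Spark Streaming programming guide suggests trying a checkpoint interval of roughly 5-10 batch/sliding intervals for stateful streams, which with the 2-second batch above is on the order of 10-20 seconds rather than 100. A one-line sketch, shown with the Scala API:)

```
// With a 2s batch interval, checkpoint the stateful stream every ~10-20s rather than every 100s,
// so each checkpoint covers a bounded amount of accumulated state and lineage.
stats.checkpoint(Seconds(20))  // Java API equivalent: stats.checkpoint(Durations.seconds(20))
```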
> On Nov 4, 2015 9:08 PM, "Adrian Tanase"  wrote:
>
>> Nice! Thanks for sharing, I wasn’t aware of the new API.
>>
>> Left some comments on the JIRA and design doc.
>>
>> -adrian
>>
>> From: Shixiong Zhu
>> Date: Tuesday, November 3, 2015 at 3:32 AM
>> To: Thúy Hằng Lê
>> Cc: Adrian Tanase, "user@spark.apache.org"
>> Subject: Re: Spark Streaming data checkpoint performance
>>
>> "trackStateByKey" is about to be added in 1.6 

Re: Checkpoint not working after driver restart

2015-11-07 Thread vimal dinakaran
I am pasting the code here. Please let me know if anything in the sequence is
wrong.


def createContext(checkpointDirectory: String, config: Config):
StreamingContext = {
println("Creating new context")

val conf = new
SparkConf(true).setAppName(appName).set("spark.streaming.unpersist","true")

val ssc = new StreamingContext(conf,
Seconds(config.getInt(batchIntervalParam)))
ssc.checkpoint(checkpointDirectory)
val isValid = validate(ssc, config)

if (isValid) {
  val result = runJob(ssc, config)
  println("result is " + result)
} else {
  println(isValid.toString)
}

ssc
 }

  def main(args: Array[String]): Unit = {

if (args.length < 1) {
  println("Must specify the path to config file ")
  println("Usage progname  ")
  return
}
val url = args(0)
logger.info("Starting " + appName)
println("Got the path as %s".format(url))
val source = scala.io.Source.fromFile(url)
val lines = try source.mkString finally source.close()
val config = ConfigFactory.parseString(lines)
val directoryPath = config.getString(checkPointParam)

val ssc = StreamingContext.getOrCreate(directoryPath, () => {
  createContext(directoryPath,config)
})

ssc.start()
ssc.awaitTermination()
  }


  def getRabbitMQStream(config: Config, ssc: StreamingContext):
ReceiverInputDStream[String] = {
val rabbitMQHost = config.getString(rabbitmqHostParam)
val rabbitMQPort = config.getInt(rabbitmqPortParam)
val rabbitMQQueue = config.getString(rabbitmqQueueNameParam)
println("changing the memory lvel")
val receiverStream: ReceiverInputDStream[String] = {
  RabbitMQUtils.createStreamFromAQueue(ssc, rabbitMQHost,
rabbitMQPort, rabbitMQQueue,StorageLevel.MEMORY_AND_DISK_SER)
}
receiverStream.start()
receiverStream
  }

  def getBaseDstream(config: Config, ssc: StreamingContext):
ReceiverInputDStream[String] = {
val baseDstream = config.getString(receiverTypeParam) match {
  case "rabbitmq" => getRabbitMQStream(config, ssc)
}
baseDstream
  }

  def runJob(ssc: StreamingContext, config: Config): Any = {

val keyspace = config.getString(keyspaceParam)
val clientStatsTable = config.getString(clientStatsTableParam)
val hourlyStatsTable = config.getString(hourlyStatsTableParam)
val batchInterval = config.getInt(batchIntervalParam)
val windowInterval = config.getInt(windowIntervalParam)
val hourlyInterval = config.getInt(hourlyParam)
val limit = config.getInt(limitParam)

val lines = getBaseDstream(config, ssc)
val statsRDD =
lines.filter(_.contains("client_stats")).map(_.split(",")(1))

val parserFunc = getProtobufParserFunction()
val clientUsageRDD: DStream[((String, String), Double)] =
statsRDD.flatMap(x => parserFunc(x))
val formatterFunc = getJsonFormatterFunc()
val oneMinuteWindowResult = clientUsageRDD.reduceByKeyAndWindow((x:
Double, y: Double) => x + y, Seconds(windowInterval),
Seconds(batchInterval))
  .map(x => ((x._1._2), ArrayBuffer((x._1._1, x._2))))
  .reduceByKey((x, y) => (x ++ y))
  .mapValues(x => (x.toList.sortBy(x => -x._2).take(limit)))

println("Client Usage from rabbitmq ")
oneMinuteWindowResult.map(x => (x._1, DateTime.now,
formatterFunc(x._2))).saveToCassandra(keyspace, clientStatsTable)
oneMinuteWindowResult.print()

val HourlyResult = clientUsageRDD.reduceByKeyAndWindow((x: Double,
y: Double) => x + y, Seconds(hourlyInterval), Seconds(batchInterval))
  .map(x => ((x._1._2), ArrayBuffer((x._1._1, x._2))))
  .reduceByKey((x, y) => (x ++ y))
  .mapValues(x => (x.toList.sortBy(x => -x._2).take(limit)))

HourlyResult.map(x => (x._1, DateTime.now,
formatterFunc(x._2))).saveToCassandra(keyspace, hourlyStatsTable)
HourlyResult.map(x => (x, "hourly")).print()

  }
}
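(As an aside on the windowing above: for long windows such as the hourly one, the inverse-reduce variant of reduceByKeyAndWindow only adds the slide entering the window and subtracts the slide leaving it, instead of re-reducing the whole window on every batch; it requires checkpointing, which is already enabled here. A minimal Scala sketch against the clientUsageRDD stream; `hourlyIncremental` is an illustrative name:)

```
// Incremental hourly window: add values entering the window, subtract values leaving it.
val hourlyIncremental = clientUsageRDD.reduceByKeyAndWindow(
  (a: Double, b: Double) => a + b,   // value enters the window
  (a: Double, b: Double) => a - b,   // value leaves the window
  Seconds(hourlyInterval),
  Seconds(batchInterval))
```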


On Wed, Nov 4, 2015 at 12:27 PM, vimal dinakaran 
wrote:

> I have a simple Spark Streaming application which reads data from
> RabbitMQ and does some aggregation on window intervals of 1 min and 1 hour,
> with a batch interval of 30s.
>
> I have a three-node setup. And to enable checkpointing,
> I have mounted the same directory using sshfs on all worker nodes for
> creating checkpoints.
>
> When I run the Spark Streaming app for the first time it works fine.
> I could see the results being printed on the console and some checkpoints
> happening in the network directory.
>
> But when I run the job for the second time, it fails with the following
> exception:
>
> at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
> at
> 

RE: [sparkR] Any insight on java.lang.OutOfMemoryError: GC overhead limit exceeded

2015-11-07 Thread Sun, Rui
This is probably because your config options do not actually take effect. Please
refer to the email thread titled “How to set memory for SparkR with
master="local[*]"”, which may answer your question.

I recommend that you try SparkR built from the master branch, which
contains two fixes that may help in your use case:
https://issues.apache.org/jira/browse/SPARK-11340
https://issues.apache.org/jira/browse/SPARK-11258

BTW, it seems that there is a config conflict in your settings?
spark.driver.memory="30g",
spark.driver.extraJavaOptions="-Xms5g -Xmx5g


From: Dhaval Patel [mailto:dhaval1...@gmail.com]
Sent: Saturday, November 7, 2015 12:26 AM
To: Spark User Group
Subject: [sparkR] Any insight on java.lang.OutOfMemoryError: GC overhead limit 
exceeded

I have been struggling with this error for the past 3 days and have tried all
the possible ways/suggestions people have provided on Stack Overflow and here
in this group.

I am trying to read a parquet file using SparkR and convert it into an R
dataframe for further use. The file size is not that big: ~4 GB and 250 million
records.

My standalone cluster has more than enough memory and processing power: 24
cores, 128 GB RAM. Here is the configuration to give an idea:

I tried this on both Spark 1.4.1 and 1.5.1. I have attached both stack
traces/logs. The parquet file has 24 partitions.

spark.default.confs=list(spark.cores.max="24",
 spark.executor.memory="50g",
 spark.driver.memory="30g",
 spark.driver.extraJavaOptions="-Xms5g -Xmx5g 
-XX:MaxPermSize=1024M")
sc <- sparkR.init(master="local[24]",sparkEnvir = spark.default.confs)
...
 reading parquet file and storing in R dataframe
med.Rdf <- collect(mednew.DF)




--- Begin Message ---
Hi, Matej,

For the convenience of SparkR users who start SparkR without using
bin/sparkR (for example, in RStudio),
https://issues.apache.org/jira/browse/SPARK-11340 enables the setting of
“spark.driver.memory” (and other similar options, like
spark.driver.extraClassPath, spark.driver.extraJavaOptions and
spark.driver.extraLibraryPath) in the sparkEnvir parameter of sparkR.init() to
take effect.

Would you like to give it a try? Note that the change is on the master branch;
you have to build Spark from source before using it.


From: Sun, Rui [mailto:rui@intel.com]
Sent: Monday, October 26, 2015 10:24 AM
To: Dirceu Semighini Filho
Cc: user
Subject: RE: How to set memory for SparkR with master="local[*]"

As documented in 
http://spark.apache.org/docs/latest/configuration.html#available-properties,
Note for “spark.driver.memory”:
Note: In client mode, this config must not be set through the SparkConf 
directly in your application, because the driver JVM has already started at 
that point. Instead, please set this through the --driver-memory command line 
option or in your default properties file.

If you are to start a SparkR shell using bin/sparkR, then you can use
bin/sparkR --driver-memory. There is no way to set the driver memory size
after the R shell has been launched via bin/sparkR.

But if you start a SparkR shell manually without using bin/sparkR (for
example, in RStudio), you can:
library(SparkR)
Sys.setenv("SPARKR_SUBMIT_ARGS" = "--conf spark.driver.memory=2g sparkr-shell")
sc <- sparkR.init()

From: Dirceu Semighini Filho [mailto:dirceu.semigh...@gmail.com]
Sent: Friday, October 23, 2015 7:53 PM
Cc: user
Subject: Re: How to set memory for SparkR with master="local[*]"

Hi Matej,
I'm also using this and I'm seeing the same behavior here; my driver has only
530 MB, which is the default value.

Maybe this is a bug.

2015-10-23 9:43 GMT-02:00 Matej Holec 
>:
Hello!

How to adjust the memory settings properly for SparkR with master="local[*]"
in R?


*When running from  R -- SparkR doesn't accept memory settings :(*

I use the following commands:

R>  library(SparkR)
R>  sc <- sparkR.init(master = "local[*]", sparkEnvir =
list(spark.driver.memory = "5g"))

Although the variable spark.driver.memory is correctly set (checked at
http://node:4040/environment/), the driver has only the default amount of
memory allocated (Storage Memory 530.3 MB).

*But when running from  spark-1.5.1-bin-hadoop2.6/bin/sparkR -- OK*

The following command:

]$ spark-1.5.1-bin-hadoop2.6/bin/sparkR --driver-memory 5g

creates a SparkR session with properly adjusted driver memory (Storage Memory
2.6 GB).


Any suggestion?

Thanks
Matej



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-memory-for-SparkR-with-master-local-tp25178.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


streaming: missing data. does saveAsTextFile() append or replace?

2015-11-07 Thread Andy Davidson
Hi

I just started a new Spark Streaming project. In this phase of the system
all we want to do is save the data we receive to HDFS. After running for
a couple of days it looks like I am missing a lot of data. I wonder if
saveAsTextFile("hdfs:///rawSteamingData") is overwriting the data I captured
in previous windows? I noticed that after running for a couple of days my
HDFS file system has 25 files. The names are something like "part-6". I
used 'hadoop fs -dus' to check the total data captured. While the system was
running I would periodically call 'dus', and I was surprised that sometimes
the total number of bytes actually dropped.


Is there a better way to write my data to disk?

Any suggestions would be appreciated

Andy


   public static void main(String[] args) {

  SparkConf conf = new SparkConf().setAppName(appName);

JavaSparkContext jsc = new JavaSparkContext(conf);

JavaStreamingContext ssc = new JavaStreamingContext(jsc, new
Duration(5 * 1000));



[ deleted code … ]



data.foreachRDD(new Function<JavaRDD<String>, Void>() {

private static final long serialVersionUID =
-7957854392903581284L;



@Override

public Void call(JavaRDD<String> jsonStr) throws Exception {

jsonStr.saveAsTextFile("hdfs:///rawSteamingData"); //
/rawSteamingData is a directory

return null;

}  

});



ssc.checkpoint(checkPointUri);



ssc.start();

ssc.awaitTermination();

}
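(For reference, a minimal Scala sketch of the DStream-level alternative: DStream.saveAsTextFiles writes each batch to its own directory named <prefix>-<batch time in ms>[.<suffix>], so batches never overwrite one another. `rawJson` and the path prefix are illustrative names; I believe the Java API can reach the same method via the underlying dstream():)

```
// rawJson: DStream[String] of the received records.
// Each micro-batch is written to a new directory such as
// hdfs:///rawStreamingData/batch-<batchTimeMs>.txt, so earlier batches are preserved.
rawJson.saveAsTextFiles("hdfs:///rawStreamingData/batch", "txt")
```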




sqlCtx.sql('some_hive_table') works in pyspark but not spark-submit

2015-11-07 Thread YaoPau
Within a pyspark shell, both of these work for me:

print hc.sql("SELECT * from raw.location_tbl LIMIT 10").collect()
print sqlCtx.sql("SELECT * from raw.location_tbl LIMIT 10").collect()

But when I submit both of those in batch mode (hc and sqlCtx both exist), I
get the following error.  Why is this happening?  I'll note that I'm running
on YARN (CDH) and connecting to the Hive Metastore by setting an environment
variable with export HADOOP_CONF_DIR=/etc/hive/conf/
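(The SimpleCatalog frames in the trace below belong to a plain SQLContext; Hive metastore tables like raw.location_tbl are only resolvable through a Hive-aware context, which is typically what the interactive pyspark shell hands out and may explain the difference. For reference, a minimal Scala sketch of a submitted application using a HiveContext; the object and app names are illustrative:)

```
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object HiveQuery {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("hive-query"))
    // HiveContext picks up hive-site.xml from the classpath and can resolve
    // metastore tables; a plain SQLContext uses SimpleCatalog and cannot.
    val hc = new HiveContext(sc)
    hc.sql("SELECT * FROM raw.location_tbl LIMIT 10").collect().foreach(println)
  }
}
```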

An error occurred while calling o39.sql.
: java.lang.RuntimeException: Table Not Found: raw.location_tbl
at scala.sys.package$.error(package.scala:27)
at
org.apache.spark.sql.catalyst.analysis.SimpleCatalog$$anonfun$1.apply(Catalog.scala:111)
at
org.apache.spark.sql.catalyst.analysis.SimpleCatalog$$anonfun$1.apply(Catalog.scala:111)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.AbstractMap.getOrElse(Map.scala:58)
at
org.apache.spark.sql.catalyst.analysis.SimpleCatalog.lookupRelation(Catalog.scala:111)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:175)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:187)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:182)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:187)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:187)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:50)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:186)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:207)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:236)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:192)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:207)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:236)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:192)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:177)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:182)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:172)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
at
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at

Re: bug: can not run Ipython notebook on cluster

2015-11-07 Thread Andy Davidson
What a BEAR! The following recipe worked for me (it took a couple of days of
hacking).

I hope this improves the out-of-the-box experience for others.

Andy

My test program is now

In [1]:
from pyspark import SparkContext
textFile = sc.textFile("file:///home/ec2-user/dataScience/readme.md")
In [2]:
print("hello world²)

hello world
In [3]:
textFile.take(3)

Out[3]:
[' hello world', '']


Installation instructions

1. Ssh to cluster master
2. Sudo su
3. install python3.4 on all machines
```
yum install -y python34
bash-4.2# which python3
/usr/bin/python3

pssh -h /root/spark-ec2/slaves yum install -y python34

```

4. Install pip on all machines


```
yum list available |grep pip
yum install -y python34-pip

find /usr/bin -name "*pip*" -print
/usr/bin/pip-3.4

pssh -h /root/spark-ec2/slaves yum install -y python34-pip
```

5. install python on master

```
/usr/bin/pip-3.4 install ipython

pssh -h /root/spark-ec2/slaves /usr/bin/pip-3.4 install python
```

6. Install the python development packages and jupyter on master
```
yum install -y python34-devel
/usr/bin/pip-3.4 install jupyter
```

7. Update spark-env.sh on all machines so that python3.4 is used by default

```
cd /root/spark/conf
printf "\n# Set Spark Python version\nexport PYSPARK_PYTHON=python3.4\n" >>
/root/spark/conf/spark-env.sh
for i in `cat slaves` ; do scp spark-env.sh
root@$i:/root/spark/conf/spark-env.sh; done
```

8. Restart cluster

```
/root/spark/sbin/stop-all.sh
/root/spark/sbin/start-all.sh
```

Running ipython notebook

1. set up an ssh tunnel on your local machine
ssh -i $KEY_FILE -N -f -L localhost::localhost:7000
ec2-user@$SPARK_MASTER

2. Log on to cluster master and start ipython notebook server

```
export PYSPARK_PYTHON=python3.4

export PYSPARK_DRIVER_PYTHON=python3.4

export IPYTHON_OPTS="notebook --no-browser --port=7000"

$SPARK_ROOT/bin/pyspark --master local[2]

```

3. On your local machine open http://localhost:



From:  Andrew Davidson 
Date:  Friday, November 6, 2015 at 2:18 PM
To:  "user @spark" 
Subject:  bug: can not run Ipython notebook on cluster

> Does anyone use iPython notebooks?
> 
> I am able to use it on my local machine with Spark, however I can not get it
> to work on my cluster.
> 
> 
> For unknown reason on my cluster I have to manually create the spark context.
> My test code generated this exception
> 
> Exception: Python in worker has different version 2.7 than that in driver 2.6,
> PySpark cannot run with different minor versions
> 
> On my mac I can solve the exception problem by setting
> 
> export PYSPARK_PYTHON=python3
> 
> export PYSPARK_DRIVER_PYTHON=python3
> 
> IPYTHON_OPTS=notebook $SPARK_ROOT/bin/pyspark
> 
> 
> 
> On my cluster I set the values to python2.7. And PYTHON_OPTS="notebook
> --no-browser --port=7000". I connect using an ssh tunnel from my local machine.
> 
> 
> 
> I also tried installing python 3, pip, ipython, and jupyter in/on my cluster
> 
> 
> 
> I tried adding export PYSPARK_PYTHON=python2.7 to the
> /root/spark/conf/spark-env.sh on all my machines
> 
> 
> from pyspark import SparkContext
> textFile = sc.textFile("file:///home/ec2-user/dataScience/readme.md")
> textFile.take(3)
> 
> 
> In [1]:
> from pyspark import SparkContext
> sc = SparkContext("local", "Simple App")
> textFile = sc.textFile("file:///home/ec2-user/dataScience/readme.md")
> textFile.take(3)
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> <ipython-input-1> in <module>()
>       2 sc = SparkContext("local", "Simple App")
>       3 textFile = sc.textFile("file:///home/ec2-user/dataScience/readme.md")
> ----> 4 textFile.take(3)
>
> /root/spark/python/pyspark/rdd.py in take(self, num)
>    1297
>    1298         p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
> -> 1299         res = self.context.runJob(self, takeUpToNumLeft, p)
>    1300
>    1301         items += res
>
> /root/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
>     914         # SparkContext#runJob.
>     915         mappedRDD = rdd.mapPartitions(partitionFunc)
> --> 916         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
>     917         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
>     918
>
> /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
>     536         answer = self.gateway_client.send_command(command)
>     537         return_value = get_return_value(answer, self.gateway_client,
> --> 538             self.target_id, self.name)
>     539
>     540         for temp_arg in temp_args:
>
> /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>     298                 raise Py4JJavaError(
>     299                     'An error occurred while calling {0}{1}{2}.\n'.
> --> 300                     format(target_id, '.', name),

Spark Job failing with exit status 15

2015-11-07 Thread Shashi Vishwakarma
I am trying to run a simple word count job in Spark, but I am getting an
exception while running the job.

For more detailed output, check the application tracking page:
http://quickstart.cloudera:8088/proxy/application_1446699275562_0006/
Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1446699275562_0006_02_01
Exit code: 15
Stack trace: ExitCodeException exitCode=15:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Container exited with a non-zero exit code 15
Failing this attempt. Failing the application.
 ApplicationMaster host: N/A
 ApplicationMaster RPC port: -1
 queue: root.cloudera
 start time: 1446910483956
 final status: FAILED
 tracking URL:
http://quickstart.cloudera:8088/cluster/app/application_1446699275562_0006
 user: cloudera
Exception in thread "main" org.apache.spark.SparkException: Application
finished with failed status
at org.apache.spark.deploy.yarn.Client.run(Client.scala:626)
at org.apache.spark.deploy.yarn.Client$.main(Client.scala:651)
at org.apache.spark.deploy.yarn.Client.main(Client.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I checked the logs with the following command:

yarn logs -applicationId application_1446699275562_0006

Here is the log:

 15/11/07 07:35:09 ERROR yarn.ApplicationMaster: User class threw
exception: Output directory
hdfs://quickstart.cloudera:8020/user/cloudera/WordCountOutput already
exists
org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory
hdfs://quickstart.cloudera:8020/user/cloudera/WordCountOutput already
exists
at 
org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:132)
at 
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1053)
at 
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:954)
at 
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:863)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1290)
at org.com.td.sparkdemo.spark.WordCount$.main(WordCount.scala:23)
at org.com.td.sparkdemo.spark.WordCount.main(WordCount.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:480)
15/11/07 07:35:09 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 15, (reason: User class threw exception: Output directory
hdfs://quickstart.cloudera:8020/user/cloudera/WordCountOutput already exists)
15/11/07 07:35:14 ERROR yarn.ApplicationMaster: SparkContext did not initialize
after waiting for 10 ms. Please check earlier log output for errors. Failing
the application.

The exception clearly indicates that the WordCountOutput directory already
exists, but I made sure that the directory was not there before running the job.

Why am I getting this error even though the directory was not there before
running my job?
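(One way to rule out leftover output from an earlier or retried attempt is to delete the target directory programmatically right before writing. A minimal Scala sketch using the Hadoop FileSystem API; the path is the one from the log, and `sc` and `counts` are illustrative names for the SparkContext and the word-count RDD:)

```
import org.apache.hadoop.fs.{FileSystem, Path}

val outputPath = new Path("hdfs://quickstart.cloudera:8020/user/cloudera/WordCountOutput")
val fs = FileSystem.get(outputPath.toUri, sc.hadoopConfiguration)
if (fs.exists(outputPath)) {
  fs.delete(outputPath, true)  // recursive delete of any previous output
}
counts.saveAsTextFile(outputPath.toString)
```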


Re: Checkpointing an InputDStream from Kafka

2015-11-07 Thread Sandip Mehta
I believe you’ll have to use another way of creating the StreamingContext, by
passing a create function to the getOrCreate function.

private def setupSparkContext(): StreamingContext = {
  val streamingSparkContext = {
val sparkConf = new 
SparkConf().setAppName(config.appName).setMaster(config.master)
new StreamingContext(sparkConf, config.batchInterval)
  }
  streamingSparkContext.checkpoint(config.checkpointDir)
  streamingSparkContext
}

….
val ssc = StreamingContext.getOrCreate(config.checkpointDir, setupSparkContext)

Javadoc for getOrCreate

/**
 * Either recreate a StreamingContext from checkpoint data or create a new 
StreamingContext.
 * If checkpoint data exists in the provided `checkpointPath`, then 
StreamingContext will be
 * recreated from the checkpoint data. If the data does not exist, then the 
StreamingContext
 * will be created by called the provided `creatingFunc`.
 *
 * @param checkpointPath Checkpoint directory used in an earlier 
StreamingContext program
 * @param creatingFunc   Function to create a new StreamingContext
 * @param hadoopConf Optional Hadoop configuration if necessary for reading 
from the
 *   file system
 * @param createOnError  Optional, whether to create a new StreamingContext if 
there is an
 *   error in reading checkpoint data. By default, an 
exception will be
 *   thrown on error.
 */

Hope this helps!

SM



> On 06-Nov-2015, at 8:19 PM, Cody Koeninger  wrote:
> 
> Have you looked at the driver and executor logs?
> 
> Without being able to see what's in the "do stuff with the dstream" section
> of code... I'd suggest starting with a simpler job, e.g. one that does nothing
> but print each message, and verifying whether it checkpoints.
> 
> On Fri, Nov 6, 2015 at 3:59 AM, Kathi Stutz  > wrote:
> Hi all,
> 
> I want to load an InputDStream from a checkpoint, but it doesn't work, and
> after trying several things I have finally run out of ideas.
> 
> So, here's what I do:
> 
> 1. I create the streaming context - or load it from the checkpoint directory.
> 
>   def main(args: Array[String]) {
> val ssc = StreamingContext.getOrCreate("files/checkpoint",
> createStreamingContext _)
> ssc.start()
> ssc.awaitTermination()
>   }
> 
> 2. In the function createStreamingContext(), I first create a new Spark
> config...
> 
>   def createStreamingContext(): StreamingContext = {
> println("New Context")
> 
> val conf = new SparkConf()
>   .setMaster("local[2]")
>   .setAppName("CheckpointTest")
>   .set("spark.streaming.kafka.maxRatePerPartition", "1")
> 
> //...then I create the streaming context...
> val ssc = new StreamingContext(conf, Seconds(1))
> 
> var offsetRanges = Array[OffsetRange]()
> val kafkaParams = Map("metadata.broker.list" ->
> "sandbox.hortonworks.com:6667 ",
>   "auto.offset.reset" -> "smallest") //Start from beginning
> val kafkaTopics = Set("Bla")
> 
> //...then I go and get a DStream from Kafka...
> val directKafkaStream = KafkaUtils.createDirectStream[String,
> Array[Byte], StringDecoder, DefaultDecoder](ssc,
> kafkaParams, kafkaTopics)
> 
> //...I do stuff with the DStream
> ...
> 
> //...and finally I checkpoint the streaming context and return it
> ssc.checkpoint("files/checkpoint")
> ssc
> }
> 
> 3. When I start the application, after a while it creates in
> files/checkpoint/ an empty directory with a name like
> 23207ed2-c021-4a1d-8af8-0620a19a8665. But that's all, no more files or
> directories or whatever appear there.
> 
> 4. When I stop the application and restart it, it creates a new streaming
> context each time. (This also means it starts the Kafka streaming from the
> smallest available offset again and again. The main reason for using
> checkpoints for me was not having to keep track of Kafka offsets.)
> 
> So, what am I doing wrong?
> 
> Thanks a lot!
> 
> Kathi
> 
> 
> 
> 



Re: Whether Spark is appropriate for our use case.

2015-11-07 Thread Igor Berman
1. If you join by some specific field (e.g. user id or account id or
whatever), you may try to partition the parquet files by this field; the join
will then be more efficient (see the sketch below).
2. You need to look at the Spark metrics for the performance of the particular
join: how many partitions there are, what the shuffle size is... In general,
tune for shuffle performance (e.g. the shuffle memory fraction).
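(A minimal Scala sketch of point 1, writing the parquet data partitioned by the join column; `events`, the column name and the paths are illustrative:)

```
// Write the event data partitioned by the join key so that later reads and joins
// which filter on it only touch the matching sub-directories (partition pruning).
events.write.partitionBy("userId").parquet("hdfs:///data/events_by_user")

// A read that filters on the partition column scans only the relevant partitions:
val userEvents = sqlContext.read.parquet("hdfs:///data/events_by_user").filter("userId = 42")
```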

On 21 October 2015 at 00:29, Aliaksei Tsyvunchyk 
wrote:

> Hello all community members,
>
> I need the opinion of people who have used Spark before and can share their
> experience, to help me select a technical approach.
> I have a project in the Proof Of Concept phase, where we are evaluating the
> possibility of using Spark for our use case.
> Here is a brief task description.
> We need to process a big amount of raw data to calculate ratings. We have
> different types of textual source data. These are just text lines which
> represent different types of input data (we call them type 20, type 24,
> type 26, type 33, etc.).
> To perform the calculations we need to make joins between different types of
> raw data: event records (which represent actual user actions) and user
> description records (which represent the person performing the action), and
> sometimes with userGroup records (we group all users by some criteria).
> All ratings are calculated on a daily basis and our dataset can be
> partitioned by date (except probably the reference data).
>
>
> So we have tried to implement it in possibly the most obvious way: we parse
> the text files, store the data in parquet format and try to use Spark SQL to
> query the data and perform the calculations.
> Experimenting with Spark SQL I’ve noticed that SQL query speed decreases
> proportionally to data size growth. Based on this I assume that Spark SQL
> performs a full record scan while servicing my SQL queries.
>
> So here are the questions I’m trying to find answers to:
> 1.  Is the parquet format appropriate for storing data in our case (to
> query the data efficiently)? Would it be more suitable to have some DB as
> storage which could filter the data efficiently before it gets to the Spark
> processing engine?
> 2.  For now we assume that the joins we are doing for the calculations are
> slowing down execution. As an alternative we are considering denormalizing
> the data and joining it in the parsing phase, but this increases the data
> volume Spark has to handle (due to the duplicates we will get). Is that a
> valid approach? Would it be better if we created 2 RDDs from the Parquet
> files, filtered them and then joined them without Spark SQL involvement? Or
> are joins in Spark SQL fine, and should we look for performance bottlenecks
> somewhere else?
> 3.  Should we look closer at Cloudera Impala? As far as I know it works over
> the same parquet files, and I’m wondering whether it gives better
> performance for data querying.
> 4.  90% of the results we need could be pre-calculated, since they do not
> change after a day’s data is loaded. So I think it makes sense to keep
> this pre-calculated data in some DB storage which gives the best performance
> when querying by key. Right now I’m considering Cassandra for this purpose
> due to its scalability and performance. Could somebody suggest any
> other options we could consider?
>
> Thanks in Advance,
> Any opinion will be helpful and greatly appreciated
> --
>
>
> CONFIDENTIALITY NOTICE: This email and files attached to it are
> confidential. If you are not the intended recipient you are hereby notified
> that using, copying, distributing or taking any action in reliance on the
> contents of this information is strictly prohibited. If you have received
> this email in error please notify the sender and delete this email.
>
>
>


Re: Spark Streaming : minimum cores for a Receiver

2015-11-07 Thread Gideon
I'm not a Spark expert but:

What Spark does is run receivers in the executors.
These receivers are long-running tasks; each receiver occupies 1 core in
your executor, and if an executor has more cores than receivers it can also
process (at least some of) the data that it is receiving.

So, "enough cores" basically means allowing executors to process the data as
well as receive it, by giving each executor more cores than receivers (at
least 1 more than the number of receivers used by that executor). By allowing
the same executor to process the received data, you also avoid (again, at
least to some extent) moving the data around inside the cluster, which is
generally a good thing.
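(A minimal Scala sketch of the sizing rule above, using a hypothetical socket source: with N receivers the application needs strictly more than N cores in total, so that data can be processed as well as received. Host and port are placeholders:)

```
// Two receivers => give the application at least 3 cores in total
// (e.g. --master local[3], or more than 2 total executor cores on a cluster).
val numReceivers = 2
val streams = (1 to numReceivers).map(_ => ssc.socketTextStream("stream-host", 9999))
val unified = ssc.union(streams)   // combine the receiver streams before processing
unified.count().print()
```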




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-minimum-cores-for-a-Receiver-tp25307p25316.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
