Re: error in running StructuredStreaming-Kafka integration code (Spark 2.x & Kafka 10)

2017-07-10 Thread David Newberger
Karen,

It looks like the Kafka version is incorrect. You mention Kafka 0.10;
however, the classpath references Kafka 0.9.

Thanks,

David

On July 10, 2017 at 1:44:06 PM, karan alang (karan.al...@gmail.com) wrote:

Hi All,

I'm running Spark Streaming - Kafka integration using Spark 2.x & Kafka 0.10,
and I seem to be running into issues.

I compiled the program using sbt, and the compilation went through fine.
I was also able to import this into Eclipse & run the program from Eclipse.

However, when I run the program using spark-submit, I'm getting the
following error:

--

>  $SPARK_HOME/bin/spark-submit --class "structuredStreaming.kafka.StructuredKafkaWordCount1" \
>    --master local[2] \
>    /Users/karanalang/Documents/Technology/Coursera_spark_scala/structuredStreamingKafka/target/scala-2.11/StructuredStreamingKafka-assembly-1.0.jar



> java.lang.ClassNotFoundException: structuredStreaming.kafka.StructuredKafkaWordCount1
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:348)
>   at org.apache.spark.util.Utils$.classForName(Utils.scala:229)
>   at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:695)
>   at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
>   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
>   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
--

I've put the jar in the classpath, but I still get the error:

echo $CLASSPATH

.:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-0.9.0.1/lib/jopt-simple-3.2.jar:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-0.9.0.1/lib/kafka-clients-0.9.0.1.jar:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-0.9.0.1/lib/kafka_2.11-0.9.0.1.jar:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-0.9.0.1/lib/log4j-1.2.17.jar:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-0.9.0.1/lib/metrics-core-2.2.0.jar:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-0.9.0.1/lib/scala-library-2.11.7.jar:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-0.9.0.1/lib/slf4j-api-1.7.6.jar:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-0.9.0.1/lib/slf4j-log4j12-1.7.6.jar:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-0.9.0.1/lib/snappy-java-1.1.1.7.jar:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-0.9.0.1/lib/zkclient-0.7.jar:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-0.9.0.1/lib/zookeeper-3.4.6.jar:/Users/karanalang/Documents/Technology/ApacheSpark-v2.1/spark-2.1.0-bin-hadoop2.7/jars/*.jar:/Users/karanalang/Documents/Technology/kafka/mirrormaker_topic_rename-master/target/mmchangetopic-1.0-SNAPSHOT.jar:/Users/karanalang/Documents/Technology/Coursera_spark_scala/structuredStreamingKafka/target/scala-2.11/StructuredStreamingKafka-assembly-1.0.jar


When I look inside the jar (StructuredStreamingKafka-assembly-1.0.jar), I
don't see the file "StructuredKafkaWordCount1.class".

Attaching my build.sbt.

Any ideas on what I need to do?
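
For comparison, a minimal build.sbt that would normally put that class into the assembly jar might look like the following. This is a sketch only, assuming the sbt-assembly plugin is enabled in project/plugins.sbt and the source file declares package structuredStreaming.kafka under src/main/scala; the versions shown are illustrative and not taken from the attached build.sbt:

name := "StructuredStreamingKafka"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  // Spark itself is supplied by spark-submit at runtime, so mark it provided
  "org.apache.spark" %% "spark-sql" % "2.1.0" % "provided",
  // Kafka 0.10 source for Structured Streaming (not the 0.9 client jars on the shell CLASSPATH)
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.1.0"
)

Once the jar is rebuilt, running "jar tf target/scala-2.11/StructuredStreamingKafka-assembly-1.0.jar | grep StructuredKafkaWordCount1" should list the class before spark-submit is tried again.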












-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


RE: HBase-Spark Module

2016-07-29 Thread David Newberger
Hi Ben,

This seems more like a question for community.cloudera.com. However, I believe
it would fall under HBase rather than Spark.

https://repository.cloudera.com/artifactory/webapp/#/artifacts/browse/tree/General/cloudera-release-repo/org/apache/hbase/hbase-spark

David Newberger


-Original Message-
From: Benjamin Kim [mailto:bbuil...@gmail.com] 
Sent: Friday, July 29, 2016 12:57 PM
To: user@spark.apache.org
Subject: HBase-Spark Module

I would like to know if anyone has tried using the hbase-spark module? I tried 
to follow the examples in conjunction with CDH 5.8.0. I cannot find the 
HBaseTableCatalog class in the module or in any of the Spark jars. Can someone 
help?

Thanks,
Ben
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





RE: difference between dataframe and dataframwrite

2016-06-16 Thread David Newberger
A DataFrame is a distributed collection of data organized into named columns.
DataFrame.write returns a DataFrameWriter, the interface for saving the contents
of a DataFrame to external storage.
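
As a small sketch (assuming a Spark 1.6-style sqlContext and illustrative paths):

// df is a DataFrame: a distributed collection of rows with named columns
val df = sqlContext.read.json("/tmp/events.json")

// df.write gives you a DataFrameWriter, used to persist the DataFrame externally
df.write.format("parquet").mode("overwrite").save("/tmp/events_parquet")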

Hope this helps

David Newberger


From: pseudo oduesp [mailto:pseudo20...@gmail.com]
Sent: Thursday, June 16, 2016 9:43 AM
To: user@spark.apache.org
Subject: difference between dataframe and dataframwrite

hi,

What is the difference between a DataFrame and DataFrame.write?



RE: streaming example has error

2016-06-16 Thread David Newberger
Try adding wordCounts.print() before ssc.start()
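
In other words, the tail end of the snippet would look something like this (only the print() line is new):

val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()   // registers an output operation so the DStream graph has something to execute
ssc.start()
ssc.awaitTermination()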


David Newberger

From: Lee Ho Yeung [mailto:jobmatt...@gmail.com]
Sent: Wednesday, June 15, 2016 9:16 PM
To: David Newberger
Cc: user@spark.apache.org
Subject: Re: streaming example has error

got another error StreamingContext: Error starting the context, marking it as 
stopped

/home/martin/Downloads/spark-1.6.1/bin/spark-shell --packages 
com.databricks:spark-csv_2.11:1.4.0
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val conf = new 
SparkConf().setMaster("local[2]").setAppName("NetworkWordCount").set("spark.driver.allowMultipleContexts",
 "true")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", )
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
ssc.start()
ssc.awaitTermination()



scala> val pairs = words.map(word => (word, 1))
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = 
org.apache.spark.streaming.dstream.MappedDStream@61a5e7

scala> val wordCounts = pairs.reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = 
org.apache.spark.streaming.dstream.ShuffledDStream@a522f1

scala> ssc.start()
16/06/15 19:14:10 ERROR StreamingContext: Error starting the context, marking 
it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations 
registered, so nothing to execute
at scala.Predef$.require(Predef.scala:233)
at 
org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161)
at 
org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542)
at 
org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
at 
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
at 
$line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:39)
at 
$line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:44)
at 
$line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:46)
at 
$line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:48)
at 
$line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:50)
at 
$line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:52)
at 
$line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:54)
at 
$line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:56)
at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:58)
at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC.(:60)
at $line42.$read$$iwC$$iwC$$iwC$$iwC.(:62)
at $line42.$read$$iwC$$iwC$$iwC.(:64)
at $line42.$read$$iwC$$iwC.(:66)
at $line42.$read$$iwC.(:68)
at $line42.$read.(:70)
at $line42.$read$.(:74)
at $line42.$read$.()
at $line42.$eval$.(:7)
at $line42.$eval$.()
at $line42.$eval.$print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at 
org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at 
org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at 
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$proces

RE: Limit pyspark.daemon threads

2016-06-15 Thread David Newberger
Have you tried setting spark.cores.max

“When running on a standalone deploy 
cluster<http://spark.apache.org/docs/latest/spark-standalone.html> or a Mesos 
cluster in "coarse-grained" sharing 
mode<http://spark.apache.org/docs/latest/running-on-mesos.html#mesos-run-modes>,
 the maximum amount of CPU cores to request for the application from across the 
cluster (not from each machine). If not set, the default will be
spark.deploy.defaultCores on Spark's standalone cluster manager, or infinite
(all available cores) on Mesos.”
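
As a sketch, that cap can be set on the SparkConf or at submit time (the app name and the value 4 here are illustrative):

val conf = new SparkConf()
  .setAppName("MyPySparkLimitedApp")
  .set("spark.cores.max", "4")   // total cores requested across the cluster
// or equivalently: spark-submit --conf spark.cores.max=4 ...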

David Newberger

From: agateaaa [mailto:agate...@gmail.com]
Sent: Wednesday, June 15, 2016 4:39 PM
To: Gene Pang
Cc: Sven Krasser; Carlile, Ken; user
Subject: Re: Limit pyspark.daemon threads

Thx Gene! But my concern is with CPU usage, not memory. I want to see if there
is any way to control the number of pyspark.daemon processes that get spawned.
We have some restrictions on the number of CPUs we can use on a node, and the
pyspark.daemon processes that get created don't seem to honor the
spark.executor.cores property setting.
Thanks!

On Wed, Jun 15, 2016 at 1:53 PM, Gene Pang <gene.p...@gmail.com> wrote:
As Sven mentioned, you can use Alluxio to store RDDs in off-heap memory, and 
you can then share that RDD across different jobs. If you would like to run 
Spark on Alluxio, this documentation can help: 
http://www.alluxio.org/documentation/master/en/Running-Spark-on-Alluxio.html

Thanks,
Gene

On Tue, Jun 14, 2016 at 12:44 AM, agateaaa <agate...@gmail.com> wrote:
Hi,
I am seeing this issue too with pyspark (using Spark 1.6.1). I have set
spark.executor.cores to 1, but whenever a streaming batch starts processing
data, the python -m pyspark.daemon processes increase gradually to about 5
(increasing CPU% on the box about 4-5 times; each pyspark.daemon takes up
around 100% CPU).
After the processing is done, 4 pyspark.daemon processes go away and we are
left with one until the next batch run. Also, the CPU usage for the executor
process sometimes spikes to about 800% even though spark.executor.cores is set to 1.
e.g. top output:

  PID USER   PR  NI    VIRT    RES   SHR S   %CPU %MEM    TIME+  COMMAND
19634 spark  20   0 8871420 1.790g 32056 S  814.1  2.9   0:39.33 /usr/lib/j+  <-- EXECUTOR
13897 spark  20   0   46576  17916  6720 S  100.0  0.0   0:00.17 python -m +  <-- pyspark.daemon
13991 spark  20   0   46524  15572  4124 S   98.0  0.0   0:08.18 python -m +  <-- pyspark.daemon
14488 spark  20   0   46524  15636  4188 S   98.0  0.0   0:07.25 python -m +  <-- pyspark.daemon
14514 spark  20   0   46524  15636  4188 S   94.0  0.0   0:06.72 python -m +  <-- pyspark.daemon
14526 spark  20   0   48200  17172  4092 S    0.0  0.0   0:00.38 python -m +  <-- pyspark.daemon


Is there any way to control the number of pyspark.daemon processes that get 
spawned ?
Thank you
Agateaaa

On Sun, Mar 27, 2016 at 1:08 AM, Sven Krasser <kras...@gmail.com> wrote:
Hey Ken,

1. You're correct, cached RDDs live on the JVM heap. (There's an off-heap 
storage option using Alluxio, formerly Tachyon, with which I have no experience 
however.)

2. The worker memory setting is not a hard maximum unfortunately. What happens 
is that during aggregation the Python daemon will check its process size. If 
the size is larger than this setting, it will start spilling to disk. I've seen 
many occasions where my daemons grew larger. Also, you're relying on Python's 
memory management to free up space again once objects are evicted. In practice, 
leave this setting reasonably small but make sure there's enough free memory on 
the machine so you don't run into OOM conditions. If the lower memory setting 
causes strains for your users, make sure they increase the parallelism of their 
jobs (smaller partitions meaning less data is processed at a time).

3. I believe that is the behavior you can expect when setting 
spark.executor.cores. I've not experimented much with it and haven't looked at 
that part of the code, but what you describe also reflects my understanding. 
Please share your findings here, I'm sure those will be very helpful to others, 
too.

One more suggestion for your users is to move to the Pyspark DataFrame API. 
Much of the processing will then happen in the JVM, and you will bump into 
fewer Python resource contention issues.

Best,
-Sven


On Sat, Mar 26, 2016 at 1:38 PM, Carlile, Ken <carli...@janelia.hhmi.org> wrote:
This is extremely helpful!

I’ll have to talk to my users about how the python memory limit should be 
adjusted and what their expectations are. I’m fairly certain we bumped it up in 
the dark past when jobs were failing because of insufficient memory for the 
python processes.

So just to make sure I’m understanding correctly:


  *   JVM memory (set by SPARK_EXECUTOR_MEMORY and/or SPARK_WORKER_MEMORY?) is 
wher

RE: Handle empty kafka in Spark Streaming

2016-06-15 Thread David Newberger
Hi Yogesh,

I'm not sure if this is possible or not; I'd be interested in knowing. My gut
says it would be an anti-pattern even if it were possible, which is why I handle
it in either foreachRDD or foreachPartition. The way I look at Spark Streaming
is as an application that is always running and doing something like windowed
batching or micro-batching, or whatever I'm trying to accomplish. If an RDD I
get from Kafka is empty, then I don't run the rest of the job. If the RDD I get
from Kafka has some number of events, then I process the RDD further.
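
A sketch of that pattern, assuming a DStream called dStream coming from the Kafka receiver and a placeholder process() function:

dStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // only do the real work when the batch actually contains events
    rdd.foreachPartition { partition =>
      partition.foreach(record => process(record))   // process() is a placeholder
    }
  }
  // an empty batch simply falls through and we wait for the next interval
}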

David Newberger

-Original Message-
From: Yogesh Vyas [mailto:informy...@gmail.com] 
Sent: Wednesday, June 15, 2016 8:30 AM
To: David Newberger
Subject: Re: Handle empty kafka in Spark Streaming

I am looking for something which checks the JavaPairReceiverInputDStream before
going any further with operations.
For example, if I get a JavaPairReceiverInputDStream in the following
manner:

JavaPairReceiverInputDStream 
message = KafkaUtils.createStream(ssc, zkQuorum, group, topics, 
StorageLevel.MEMORY_AND_DISK_SER());

then I would like to check whether the message is empty or not. If it is not
empty, go on with further operations; otherwise wait for some data in Kafka.

On Wed, Jun 15, 2016 at 6:31 PM, David Newberger wrote:
> If you're asking how to handle no messages in a batch window then I would add 
> an isEmpty check like:
>
> dStream.foreachRDD(rdd => {
> if (!rdd.isEmpty())
> ...
> }
>
> Or something like that.
>
>
> David Newberger
>
> -Original Message-
> From: Yogesh Vyas [mailto:informy...@gmail.com]
> Sent: Wednesday, June 15, 2016 6:31 AM
> To: user
> Subject: Handle empty kafka in Spark Streaming
>
> Hi,
>
> Does anyone knows how to handle empty Kafka while Spark Streaming job is 
> running ?
>
> Regards,
> Yogesh
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
> additional commands, e-mail: user-h...@spark.apache.org
>


RE: Handle empty kafka in Spark Streaming

2016-06-15 Thread David Newberger
If you're asking how to handle no messages in a batch window then I would add 
an isEmpty check like:

dStream.foreachRDD(rdd => {
  if (!rdd.isEmpty())
    ...
})

Or something like that. 


David Newberger

-Original Message-
From: Yogesh Vyas [mailto:informy...@gmail.com] 
Sent: Wednesday, June 15, 2016 6:31 AM
To: user
Subject: Handle empty kafka in Spark Streaming

Hi,

Does anyone knows how to handle empty Kafka while Spark Streaming job is 
running ?

Regards,
Yogesh

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



RE: streaming example has error

2016-06-15 Thread David Newberger
Have you tried to “set spark.driver.allowMultipleContexts = true”?

David Newberger

From: Lee Ho Yeung [mailto:jobmatt...@gmail.com]
Sent: Tuesday, June 14, 2016 8:34 PM
To: user@spark.apache.org
Subject: streaming example has error

When simulating streaming with nc -lk 
I got the error below,
then I tried the example,

martin@ubuntu:~/Downloads$ /home/martin/Downloads/spark-1.6.1/bin/run-example 
streaming.NetworkWordCount localhost 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/06/14 18:33:06 INFO StreamingExamples: Setting log level to [WARN] for 
streaming example. To override add a custom log4j.properties to the classpath.
16/06/14 18:33:06 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
16/06/14 18:33:06 WARN Utils: Your hostname, ubuntu resolves to a loopback 
address: 127.0.1.1; using 192.168.157.134 instead (on interface eth0)
16/06/14 18:33:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another 
address
16/06/14 18:33:13 WARN SizeEstimator: Failed to check whether UseCompressedOops 
is set; assuming yes

got error too.

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", )
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
ssc.start()
ssc.awaitTermination()



scala> import org.apache.spark._
import org.apache.spark._

scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._

scala> import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.StreamingContext._

scala> val conf = new 
SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
conf: org.apache.spark.SparkConf = 
org.apache.spark.SparkConf@67bcaf

scala> val ssc = new StreamingContext(conf, Seconds(1))
16/06/14 18:28:44 WARN AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040:
 java.net.BindException: Address already in use
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:433)
at sun.nio.ch.Net.bind(Net.java:425)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at 
org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187)
at 
org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316)
at 
org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265)
at 
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at org.eclipse.jetty.server.Server.doStart(Server.java:293)
at 
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at 
org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:252)
at org.apache.spark.ui.JettyUtils$$anonfun$5.apply(JettyUtils.scala:262)
at org.apache.spark.ui.JettyUtils$$anonfun$5.apply(JettyUtils.scala:262)
at 
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1988)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1979)
at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:262)
at org.apache.spark.ui.WebUI.bind(WebUI.scala:136)
at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:481)
at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:481)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.SparkContext.(SparkContext.scala:481)
at 
org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:874)
at 
org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:81)
at 
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:36)
at 
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:41)
at 
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:43)
at 
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:45)
at 
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:47)
at 
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:49)
at 
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:53)
  

RE: Creating a Hive table through Spark and potential locking issue (a bug)

2016-06-08 Thread David Newberger
Could you be looking at 2 jobs trying to use the same file and one getting to 
it before the other and finally removing it?

David Newberger

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Wednesday, June 8, 2016 1:33 PM
To: user; user @spark
Subject: Creating a Hive table through Spark and potential locking issue (a bug)


Hi,

I noticed an issue with Spark creating and populating a Hive table.

The process as I see is as follows:


  1.  Spark creates the Hive table. In this case an ORC table in a Hive database.
  2.  Spark uses a JDBC connection to get data out of an Oracle database.
  3.  I create a temp table in Spark through registerTempTable.
  4.  Spark populates that table. That table is actually created in

   hdfs dfs -ls /tmp/hive/hduser
   drwx--   - hduser supergroup
   /tmp/hive/hduser/b1ea6829-790f-4b37-a0ff-3ed218388059

  5.  However, the original table itself does not have any locking on it!
  6.  I log in to Hive and drop that table:

      hive> drop table dummy;
      OK

  7.  That table is dropped OK.
  8.  Spark crashes with the message:
Started at
[08/06/2016 18:37:53.53]
16/06/08 19:13:46 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
 No lease on 
/user/hive/warehouse/oraclehadoop.db/dummy/.hive-staging_hive_2016-06-08_18-38-08_804_3299712811201460314-1/-ext-1/_temporary/0/_temporary/attempt_201606081838_0001_m_00_0/part-0
 (inode 831621): File does not exist. Holder 
DFSClient_NONMAPREDUCE_-1836386597_1 does not have any open files.
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3516)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3313)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3169)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:641)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:482)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033)
at org.apache.hadoop.ipc.Client.call(Client.java:1468)
at org.apache.hadoop.ipc.Client.call(Client.java:1399)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
at com.sun.proxy.$Proxy22.addBlock(Unknown Source)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:399)
at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy23.addBlock(Unknown Source)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1532)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1349)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:588)
16/06/08 19:13:46 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; 
aborting job

Suggested solution:
In a concurrent environment, Spark should apply locks in order to prevent such
operations. Locks are kept in the Hive metadata table HIVE_LOCKS.

HTH
Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com<http://talebzadehmich.wordpress.com/>




RE: Twitter streaming error : No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist.

2016-06-03 Thread David Newberger
Hi Mich,

My gut says you are correct that each application should have its own 
checkpoint directory. Though honestly I’m a bit fuzzy on checkpointing still as 
I’ve not worked with it much yet.
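
A sketch of what that separation could look like, assuming each job builds its own SparkConf and with illustrative HDFS paths (the namenode host and port are placeholders):

// Kafka streaming application
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("hdfs://namenode:9000/checkpoint/kafka-job")

// Twitter streaming application, running separately with its own conf
val sscTwitter = new StreamingContext(twitterConf, Seconds(2))
sscTwitter.checkpoint("hdfs://namenode:9000/checkpoint/twitter-job")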

Cheers,

David Newberger

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Friday, June 3, 2016 3:40 PM
To: David Newberger
Cc: user @spark
Subject: Re: Twitter streaming error : No lease on /user/hduser/checkpoint/temp 
(inode 806125): File does not exist.

Hi David

yes they do

The  first streaming job does

val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")

And the twitter does

  /** Returns the HDFS URL */
  def getCheckpointDirectory(): String = {
    try {
      // requires: import scala.sys.process._
      val name: String = Seq("bash", "-c", "curl -s http://169.254.169.254/latest/meta-data/hostname").!!
      println("Hostname = " + name)
      "hdfs://" + name.trim + ":9000/checkpoint/"
    } catch {
      case e: Exception => {
        "./checkpoint/"
      }
    }
  }

I need to change one of these.

Actually a better alternative would be that each application has its own 
checkpoint?

THanks




Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com<http://talebzadehmich.wordpress.com/>



On 3 June 2016 at 21:23, David Newberger <david.newber...@wandcorp.com> wrote:
I was going to ask if you had 2 jobs running. If the checkpointing for both are 
setup to look at the same location I could see an error like this happening. Do 
both spark jobs have a reference to a checkpointing dir?

David Newberger

From: Mich Talebzadeh 
[mailto:mich.talebza...@gmail.com<mailto:mich.talebza...@gmail.com>]
Sent: Friday, June 3, 2016 3:20 PM
To: user @spark
Subject: Re: Twitter streaming error : No lease on /user/hduser/checkpoint/temp 
(inode 806125): File does not exist.

OK

I was running two spark streaming jobs, one using streaming data from Kafka and 
another from twitter in local mode on the same node.

It is possible that the directory /user/hduser/checkpoint/temp is  shared by 
both spark streaming jobs

any experience on this please?

Thanks


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com<http://talebzadehmich.wordpress.com/>



On 3 June 2016 at 20:48, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
Hi,

Just started seeing these errors:

16/06/03 20:30:01 ERROR DFSClient: Failed to close inode 806125
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
 No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist. 
[Lease.  Holder: DFSClient_NONMAPREDUCE_-907736468_1, pendingcreates: 1]
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3516)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3313)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3169)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:641)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:482)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)


Sounds like a connection is left open but cannot establish why!

Thanks


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com<http://talebzadehmich.wordpress.com/>






RE: Twitter streaming error : No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist.

2016-06-03 Thread David Newberger
I was going to ask if you had 2 jobs running. If the checkpointing for both are 
setup to look at the same location I could see an error like this happening. Do 
both spark jobs have a reference to a checkpointing dir?

David Newberger

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Friday, June 3, 2016 3:20 PM
To: user @spark
Subject: Re: Twitter streaming error : No lease on /user/hduser/checkpoint/temp 
(inode 806125): File does not exist.

OK

I was running two spark streaming jobs, one using streaming data from Kafka and 
another from twitter in local mode on the same node.

It is possible that the directory /user/hduser/checkpoint/temp is  shared by 
both spark streaming jobs

any experience on this please?

Thanks


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com<http://talebzadehmich.wordpress.com/>



On 3 June 2016 at 20:48, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
Hi,

Just started seeing these errors:

16/06/03 20:30:01 ERROR DFSClient: Failed to close inode 806125
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
 No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist. 
[Lease.  Holder: DFSClient_NONMAPREDUCE_-907736468_1, pendingcreates: 1]
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3516)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3313)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3169)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:641)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:482)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)


Sounds like a connection is left open but cannot establish why!

Thanks


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com<http://talebzadehmich.wordpress.com/>





RE: Spark Streaming - long garbage collection time

2016-06-03 Thread David Newberger
Have you tried UseG1GC in place of UseConcMarkSweepGC? This article really 
helped me with GC a few short weeks ago

https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html
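
A sketch of switching the executors over to G1 (the app name is illustrative; the same flags can also go in spark-defaults.conf or on spark-submit with --conf):

val conf = new SparkConf()
  .setAppName("StreamingJoinJob")
  // swap CMS for G1 and log GC activity so the change can be verified in the executor logs
  .set("spark.executor.extraJavaOptions",
    "-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")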
 

David Newberger

-Original Message-
From: Marco1982 [mailto:marco.plata...@yahoo.it] 
Sent: Friday, June 3, 2016 2:19 PM
To: user@spark.apache.org
Subject: Spark Streaming - long garbage collection time

Hi all,

I'm running a Spark Streaming application with 1-hour batches to join two data 
feeds and write the output to disk. The total size of one data feed is about 40 
GB per hour (split in multiple files), while the size of the second data feed 
is about 600-800 MB per hour (also split in multiple files). Due to application 
constraints, I may not be able to run smaller batches.
Currently, it takes about 20 minutes to produce the output in a cluster with
140 cores and 700 GB of RAM. I'm running 7 workers and 28 executors, each with 
5 cores and 22 GB of RAM.

I execute mapToPair(), filter(), and reduceByKeyAndWindow(1 hour batch) on the 
40 GB data feed. Most of the computation time is spent on these operations. 
What worries me is the Garbage Collection (GC) execution time per executor, 
which goes from 25 secs to 9.2 mins. I attach two screenshots
below: one lists the GC time and one prints out GC comments for a single 
executor. I anticipate that the executor that spends 9.2 mins in doing garbage 
collection is eventually killed by the Spark driver.

I think these numbers are too high. Do you have any suggestions about keeping GC
time low? I'm already using the Kryo serializer, -XX:+UseConcMarkSweepGC, and
spark.rdd.compress=true.

Is there anything else that would help?

Thanks
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n27087/gc_time.png>
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n27087/executor_16.png>
 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-long-garbage-collection-time-tp27087.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





RE: About a problem running a spark job in a cdh-5.7.0 vmware image.

2016-06-03 Thread David Newberger
Alonso, I could totally be misunderstanding something or missing a piece of the
puzzle; however, remove .setMaster. If you do that, it will run with whatever the
CDH VM is set up for, which in the out-of-the-box default case is YARN and client
deploy mode.

val sparkConf = new SparkConf().setAppName(“Some App thingy thing”)

From the Spark 1.6.0 Scala API Documentation:
https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.SparkConf


“
Configuration for a Spark application. Used to set various Spark parameters as 
key-value pairs.

Most of the time, you would create a SparkConf object with new SparkConf(), 
which will load values from any spark.* Java system properties set in your 
application as well. In this case, parameters you set directly on the SparkConf 
object take priority over system properties.

For unit tests, you can also call new SparkConf(false) to skip loading external 
settings and get the same configuration no matter what the system properties 
are.

All setter methods in this class support chaining. For example, you can write 
new SparkConf().setMaster("local").setAppName("My app").

Note that once a SparkConf object is passed to Spark, it is cloned and can no 
longer be modified by the user. Spark does not support modifying the 
configuration at runtime.
“
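
A minimal sketch of that for this application, assuming the master and deploy mode are supplied by spark-submit (e.g. --master yarn --deploy-mode client) rather than hard-coded:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

// no setMaster here; spark-submit decides where the application runs
val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
// reuse the existing SparkContext instead of creating a second one
val ssc = new StreamingContext(sc, Seconds(2))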

David Newberger

From: Alonso Isidoro Roman [mailto:alons...@gmail.com]
Sent: Friday, June 3, 2016 10:37 AM
To: David Newberger
Cc: user@spark.apache.org
Subject: Re: About a problem running a spark job in a cdh-5.7.0 vmware image.

Thank you David; so I would have to change the way that I am creating the
SparkConf object, wouldn't I?

I can see in this 
link<http://www.cloudera.com/documentation/enterprise/latest/topics/cdh_ig_running_spark_on_yarn.html#concept_ysw_lnp_h5>
 that the way to run a spark job using YARN is using this kind of command:


spark-submit --class org.apache.spark.examples.SparkPi --master yarn \

--deploy-mode client SPARK_HOME/lib/spark-examples.jar 10

Can I do this programmatically? Maybe by changing setMaster to something
like setMaster("yarn:quickstart.cloudera:8032")?

I have seen the port in this guide: 
http://www.cloudera.com/documentation/enterprise/5-6-x/topics/cdh_ig_ports_cdh5.html






RE: [REPOST] Severe Spark Streaming performance degradation after upgrading to 1.6.1

2016-06-03 Thread David Newberger
What does your processing time look like? Is it consistently within that 20-second
micro-batch window?

David Newberger

From: Adrian Tanase [mailto:atan...@adobe.com]
Sent: Friday, June 3, 2016 8:14 AM
To: user@spark.apache.org
Cc: Cosmin Ciobanu
Subject: [REPOST] Severe Spark Streaming performance degradation after 
upgrading to 1.6.1

Hi all,

Trying to repost this question from a colleague on my team, somehow his 
subscription is not active:
http://apache-spark-user-list.1001560.n3.nabble.com/Severe-Spark-Streaming-performance-degradation-after-upgrading-to-1-6-1-td27056.html

Appreciate any thoughts,
-adrian


RE: About a problem running a spark job in a cdh-5.7.0 vmware image.

2016-06-03 Thread David Newberger
Alonso,

The CDH VM uses YARN and the default deploy mode is client. I’ve been able to 
use the CDH VM for many learning scenarios.


http://www.cloudera.com/documentation/enterprise/latest.html
http://www.cloudera.com/documentation/enterprise/latest/topics/spark.html

David Newberger

From: Alonso [mailto:alons...@gmail.com]
Sent: Friday, June 3, 2016 5:39 AM
To: user@spark.apache.org
Subject: About a problem running a spark job in a cdh-5.7.0 vmware image.

Hi, I am developing a project that needs to use Kafka, Spark Streaming and
Spark MLlib; this is the GitHub
project<https://github.com/alonsoir/awesome-recommendation-engine/tree/develop>.

I am using a VMware cdh-5.7.0 image with 4 cores and 8 GB of RAM, and the file
that I want to use is only 16 MB. I am finding problems that seem related to
resources, because the process outputs this message:



16/06/03 11:58:09 WARN TaskSchedulerImpl: Initial job has not accepted any 
resources; check your cluster UI to ensure that workers are registered and have 
sufficient resources



When I go to the Spark master page, I can see this:


Spark Master at spark://192.168.30.137:7077

URL: spark://192.168.30.137:7077
REST URL: spark://192.168.30.137:6066 (cluster mode)
Alive Workers: 0
Cores in use: 0 Total, 0 Used
Memory in use: 0.0 B Total, 0.0 B Used
Applications: 2 Running, 0 Completed
Drivers: 0 Running, 0 Completed
Status: ALIVE

Workers
Worker Id Address State Cores Memory
Running Applications
Application ID Name Cores Memory per Node Submitted Time User State Duration
app-20160603115752-0001
(kill)
AmazonKafkaConnector 0 1024.0 MB 2016/06/03 11:57:52 cloudera WAITING 2.0 min
app-20160603115751-
(kill)
AmazonKafkaConnector 0 1024.0 MB 2016/06/03 11:57:51 cloudera WAITING 2.0 min


And this is the spark-worker output:

Spark Worker at 192.168.30.137:7078

ID: worker-20160603115937-192.168.30.137-7078
Master URL:
Cores: 4 (0 Used)
Memory: 6.7 GB (0.0 B Used)

Back to Master
Running Executors (0)
ExecutorID Cores State Memory Job Details Logs

It is weird, isn't it? The master URL is not set up and there isn't any ExecutorID,
Cores, and so on...

If i do a ps xa | grep spark, this is the output:

[cloudera@quickstart bin]$ ps xa | grep spark
 6330 ?Sl 0:11 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp 
/usr/lib/spark/conf/:/usr/lib/spark/lib/spark-assembly-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar:/etc/hadoop/conf/:/usr/lib/spark/lib/spark-assembly.jar:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/*
 -Dspark.deploy.defaultCores=4 -Xms1g -Xmx1g -XX:MaxPermSize=256m 
org.apache.spark.deploy.master.Master

 6674 ?Sl 0:12 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp 
/etc/spark/conf/:/usr/lib/spark/lib/spark-assembly-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar:/etc/hadoop/conf/:/usr/lib/spark/lib/spark-assembly.jar:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/*
 -Dspark.history.fs.logDirectory=hdfs:///user/spark/applicationHistory 
-Dspark.history.ui.port=18088 -Xms1g -Xmx1g -XX:MaxPermSize=256m 
org.apache.spark.deploy.history.HistoryServer

 8153 pts/1Sl+0:14 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp 
/home/cloudera/awesome-recommendation-engine/target/pack/lib/* 
-Dprog.home=/home/cloudera/awesome-recommendation-engine/target/pack 
-Dprog.version=1.0-SNAPSHOT example.spark.AmazonKafkaConnector 
192.168.1.35:9092 amazonRatingsTopic

 8413 ?Sl 0:04 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp 
/usr/lib/spark/conf/:/usr/lib/spark/lib/spark-assembly-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar:/etc/hadoop/conf/:/usr/lib/spark/lib/spark-assembly.jar:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/*
 -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker 
spark://quickstart.cloudera:7077

 8619 pts/3S+ 0:00 grep spark

The master is set up with four cores and 1 GB, and the worker has no dedicated
cores and is using 1 GB; that is weird, isn't it? I have configured the VMware image
with 4 cores (from eight) 

RE: About a problem when mapping a file located within a HDFS vmware cdh-5.7 image

2016-05-31 Thread David Newberger
Have you tried it without either of the setMaster lines?

Also, CDH 5.7 uses spark 1.6.0 with some patches. I would recommend using the 
cloudera repo for spark files in build sbt. I’d also check other files in the 
build sbt to see if there are cdh specific versions.
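
As a sketch, the relevant build.sbt lines could look roughly like this (the repository URL and the exact CDH version string are assumptions to check against the Cloudera documentation):

resolvers += "Cloudera repo" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.6.0-cdh5.7.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.6.0-cdh5.7.0" % "provided"
)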

David Newberger

From: Alonso Isidoro Roman [mailto:alons...@gmail.com]
Sent: Tuesday, May 31, 2016 1:23 PM
To: David Newberger
Cc: user@spark.apache.org
Subject: Re: About a problem when mapping a file located within a HDFS vmware 
cdh-5.7 image

Hi David, the one on the develop branch. I think it should be the same, but
actually I'm not sure...

Regards

Alonso Isidoro Roman
about.me/alonso.isidoro.roman


2016-05-31 19:40 GMT+02:00 David Newberger <david.newber...@wandcorp.com>:
Is 
https://github.com/alonsoir/awesome-recommendation-engine/blob/master/build.sbt 
  the build.sbt you are using?

David Newberger
QA Analyst
WAND  -  The Future of Restaurant Technology
(W)  www.wandcorp.com<http://www.wandcorp.com/>
(E)   david.newber...@wandcorp.com<mailto:david.newber...@wandcorp.com>
(P)   952.361.6200

From: Alonso [mailto:alons...@gmail.com<mailto:alons...@gmail.com>]
Sent: Tuesday, May 31, 2016 11:11 AM
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: About a problem when mapping a file located within a HDFS vmware 
cdh-5.7 image


I have a VMware Cloudera image, cdh-5.7, running with CentOS 6.8. I am using OS X
as my development machine and the CDH image to run the code; I upload the code
using git to the CDH image. I have modified my /etc/hosts file located in the
CDH image with a line like this:

127.0.0.1   quickstart.cloudera quickstart  localhost   
localhost.domain



192.168.30.138   quickstart.cloudera quickstart  localhost   
localhost.domain

The cloudera version that i am running is:

[cloudera@quickstart bin]$ cat /usr/lib/hadoop/cloudera/cdh_version.properties



# Autogenerated build properties

version=2.6.0-cdh5.7.0

git.hash=c00978c67b0d3fe9f3b896b5030741bd40bf541a

cloudera.hash=c00978c67b0d3fe9f3b896b5030741bd40bf541a

cloudera.cdh.hash=e7465a27c5da4ceee397421b89e924e67bc3cbe1

cloudera.cdh-packaging.hash=8f9a1632ebfb9da946f7d8a3a8cf86efcdccec76

cloudera.base-branch=cdh5-base-2.6.0

cloudera.build-branch=cdh5-2.6.0_5.7.0

cloudera.pkg.version=2.6.0+cdh5.7.0+1280

cloudera.pkg.release=1.cdh5.7.0.p0.92

cloudera.cdh.release=cdh5.7.0

cloudera.build.time=2016.03.23-18:30:29GMT

I can do a ls command in the vmware machine:

[cloudera@quickstart ~]$ hdfs dfs -ls /user/cloudera/ratings.csv

-rw-r--r-- 1 cloudera cloudera 16906296 2016-05-30 11:29 
/user/cloudera/ratings.csv

I can read its content:

[cloudera@quickstart ~]$ hdfs dfs -cat /user/cloudera/ratings.csv | wc -l

568454

The code is quite simple, just trying to map its content:

val ratingFile="hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv"



case class AmazonRating(userId: String, productId: String, rating: Double)



val NumRecommendations = 10

val MinRecommendationsPerUser = 10

val MaxRecommendationsPerUser = 20

val MyUsername = "myself"

val NumPartitions = 20





println("Using this ratingFile: " + ratingFile)

  // first create an RDD out of the rating file

val rawTrainingRatings = sc.textFile(ratingFile).map {

line =>

  val Array(userId, productId, scoreStr) = line.split(",")

  AmazonRating(userId, productId, scoreStr.toDouble)

}



  // only keep users that have rated between MinRecommendationsPerUser and 
MaxRecommendationsPerUser products

val trainingRatings = rawTrainingRatings.groupBy(_.userId).filter(r => 
MinRecommendationsPerUser <= r._2.size  && r._2.size < 
MaxRecommendationsPerUser).flatMap(_._2).repartition(NumPartitions).cache()



println(s"Parsed $ratingFile. Kept ${trainingRatings.count()} ratings out of 
${rawTrainingRatings.count()}")

I am getting this message:

Parsed hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv. Kept 0 
ratings out of 568454

because if i run the exact code within the spark-shell, i got this message:

Parsed hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv. Kept 73279 
ratings out of 568454

Why is it working fine within the spark-shell but it is not running fine 
programmatically  in the vmware image?

I am running the code using sbt-pack plugin to generate unix commands and run 
them within the vmware image which has the spark pseudocluster,

This is the code i use to instantiate the sparkconf:

val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector")

   
.setMaster("local[4]").set("spark.driver.allowMultipleContexts", "true")

val sc = new SparkContext(sparkConf)

val sqlContext = new SQLContext(sc)

val ssc = new StreamingContext(sparkConf, Seconds(2))

//this checkpointdir should be in a conf file

RE: About a problem when mapping a file located within a HDFS vmware cdh-5.7 image

2016-05-31 Thread David Newberger
Is 
https://github.com/alonsoir/awesome-recommendation-engine/blob/master/build.sbt 
  the build.sbt you are using?

David Newberger
QA Analyst
WAND  -  The Future of Restaurant Technology
(W)  www.wandcorp.com<http://www.wandcorp.com/>
(E)   david.newber...@wandcorp.com<mailto:david.newber...@wandcorp.com>
(P)   952.361.6200

From: Alonso [mailto:alons...@gmail.com]
Sent: Tuesday, May 31, 2016 11:11 AM
To: user@spark.apache.org
Subject: About a problem when mapping a file located within a HDFS vmware 
cdh-5.7 image


I have a VMware Cloudera image, cdh-5.7, running with CentOS 6.8. I am using OS X
as my development machine and the CDH image to run the code; I upload the code
using git to the CDH image. I have modified my /etc/hosts file located in the
CDH image with a line like this:

127.0.0.1   quickstart.cloudera quickstart  localhost   
localhost.domain



192.168.30.138   quickstart.cloudera quickstart  localhost   
localhost.domain

The cloudera version that i am running is:

[cloudera@quickstart bin]$ cat /usr/lib/hadoop/cloudera/cdh_version.properties



# Autogenerated build properties

version=2.6.0-cdh5.7.0

git.hash=c00978c67b0d3fe9f3b896b5030741bd40bf541a

cloudera.hash=c00978c67b0d3fe9f3b896b5030741bd40bf541a

cloudera.cdh.hash=e7465a27c5da4ceee397421b89e924e67bc3cbe1

cloudera.cdh-packaging.hash=8f9a1632ebfb9da946f7d8a3a8cf86efcdccec76

cloudera.base-branch=cdh5-base-2.6.0

cloudera.build-branch=cdh5-2.6.0_5.7.0

cloudera.pkg.version=2.6.0+cdh5.7.0+1280

cloudera.pkg.release=1.cdh5.7.0.p0.92

cloudera.cdh.release=cdh5.7.0

cloudera.build.time=2016.03.23-18:30:29GMT

I can do a ls command in the vmware machine:

[cloudera@quickstart ~]$ hdfs dfs -ls /user/cloudera/ratings.csv

-rw-r--r-- 1 cloudera cloudera 16906296 2016-05-30 11:29 
/user/cloudera/ratings.csv

I can read its content:

[cloudera@quickstart ~]$ hdfs dfs -cat /user/cloudera/ratings.csv | wc -l

568454

The code is quite simple, just trying to map its content:

val ratingFile="hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv"



case class AmazonRating(userId: String, productId: String, rating: Double)



val NumRecommendations = 10

val MinRecommendationsPerUser = 10

val MaxRecommendationsPerUser = 20

val MyUsername = "myself"

val NumPartitions = 20





println("Using this ratingFile: " + ratingFile)

  // first create an RDD out of the rating file

val rawTrainingRatings = sc.textFile(ratingFile).map {

line =>

  val Array(userId, productId, scoreStr) = line.split(",")

  AmazonRating(userId, productId, scoreStr.toDouble)

}



  // only keep users that have rated between MinRecommendationsPerUser and 
MaxRecommendationsPerUser products

val trainingRatings = rawTrainingRatings.groupBy(_.userId).filter(r => 
MinRecommendationsPerUser <= r._2.size  && r._2.size < 
MaxRecommendationsPerUser).flatMap(_._2).repartition(NumPartitions).cache()



println(s"Parsed $ratingFile. Kept ${trainingRatings.count()} ratings out of 
${rawTrainingRatings.count()}")

I am getting this message:

Parsed hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv. Kept 0 
ratings out of 568454

because if i run the exact code within the spark-shell, i got this message:

Parsed hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv. Kept 73279 
ratings out of 568454

Why is it working fine within the spark-shell but it is not running fine 
programmatically  in the vmware image?

I am running the code using sbt-pack plugin to generate unix commands and run 
them within the vmware image which has the spark pseudocluster,

This is the code i use to instantiate the sparkconf:

val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector")

   
.setMaster("local[4]").set("spark.driver.allowMultipleContexts", "true")

val sc = new SparkContext(sparkConf)

val sqlContext = new SQLContext(sc)

val ssc = new StreamingContext(sparkConf, Seconds(2))

//this checkpointdir should be in a conf file, for now it is hardcoded!

val streamingCheckpointDir = 
"/home/cloudera/my-recommendation-spark-engine/checkpoint"

ssc.checkpoint(streamingCheckpointDir)

I have tried to use this way of setting the Spark master, but an exception is raised;
I suspect that this is symptomatic of my problem.
//.setMaster("spark://quickstart.cloudera:7077")

The exception when i try to use the fully qualified domain name:

.setMaster("spark://quickstart.cloudera:7077")



java.io.IOException: Failed to connect to quickstart.cloudera/127.0.0.1:7077

at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)

at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)

at 
org.a

RE: Can not set spark dynamic resource allocation

2016-05-20 Thread David Newberger
Hi All,

The error you are seeing looks really similar to Spark-13514 to me. I could be 
wrong though

https://issues.apache.org/jira/browse/SPARK-13514

Can you check yarn.nodemanager.local-dirs in your YARN configuration for
"file://"?


Cheers!
David Newberger

-Original Message-
From: Cui, Weifeng [mailto:weife...@a9.com] 
Sent: Friday, May 20, 2016 4:26 PM
To: Marcelo Vanzin
Cc: Ted Yu; Rodrick Brown; user; Zhao, Jun; Aulakh, Sahib; Song, Yiwei
Subject: Re: Can not set spark dynamic resource allocation

Sorry, here is the node-manager log. application_1463692924309_0002 is my test. 
Hope this will help.
http://pastebin.com/0BPEcgcW



On 5/20/16, 2:09 PM, "Marcelo Vanzin"  wrote:

>Hi Weifeng,
>
>That's the Spark event log, not the YARN application log. You get the 
>latter using the "yarn logs" command.
>
>On Fri, May 20, 2016 at 1:14 PM, Cui, Weifeng  wrote:
>> Here is the application log for this spark job.
>>
>> http://pastebin.com/2UJS9L4e
>>
>>
>>
>> Thanks,
>> Weifeng
>>
>>
>>
>>
>>
>> From: "Aulakh, Sahib" 
>> Date: Friday, May 20, 2016 at 12:43 PM
>> To: Ted Yu 
>> Cc: Rodrick Brown , Cui Weifeng 
>> , user , "Zhao, Jun"
>> 
>> Subject: Re: Can not set spark dynamic resource allocation
>>
>>
>>
>> Yes it is yarn. We have configured spark shuffle service w yarn node 
>> manager but something must be off.
>>
>>
>>
>> We will send u app log on paste bin.
>>
>> Sent from my iPhone
>>
>>
>> On May 20, 2016, at 12:35 PM, Ted Yu  wrote:
>>
>> Since yarn-site.xml was cited, I assume the cluster runs YARN.
>>
>>
>>
>> On Fri, May 20, 2016 at 12:30 PM, Rodrick Brown 
>>  wrote:
>>
>> Is this Yarn or Mesos? For the latter you need to start an external 
>> shuffle service.
>>
>> Get Outlook for iOS
>>
>>
>>
>>
>>
>> On Fri, May 20, 2016 at 11:48 AM -0700, "Cui, Weifeng" 
>> 
>> wrote:
>>
>> Hi guys,
>>
>>
>>
>> Our team has a hadoop 2.6.0 cluster with Spark 1.6.1. We want to set 
>> dynamic resource allocation for spark and we followed the following 
>> link. After the changes, all spark jobs failed.
>>
>> https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-reso
>> urce-allocation
>>
>> This test was on a test cluster which has 1 master machine (running 
>> namenode, resourcemanager and hive server), 1 worker machine (running 
>> datanode and nodemanager) and 1 machine as client( running spark shell).
>>
>>
>>
>> What I updated in config :
>>
>>
>>
>> 1. Update in spark-defaults.conf
>>
>> spark.dynamicAllocation.enabled true
>>
>> spark.shuffle.service.enabled    true
>>
>>
>>
>> 2. Update yarn-site.xml
>>
>> <property>
>>   <name>yarn.nodemanager.aux-services</name>
>>   <value>mapreduce_shuffle,spark_shuffle</value>
>> </property>
>>
>> <property>
>>   <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
>>   <value>org.apache.spark.network.yarn.YarnShuffleService</value>
>> </property>
>>
>> <property>
>>   <name>spark.shuffle.service.enabled</name>
>>   <value>true</value>
>> </property>
>>
>> 3. Copy  spark-1.6.1-yarn-shuffle.jar to yarn.application.classpath
>> ($HADOOP_HOME/share/hadoop/yarn/*) in python code
>>
>> 4. Restart namenode, datanode, resourcemanager, nodemanger... retart 
>> everything
>>
>> 5. The config will update in all machines, resourcemanager and nodemanager.
>> We update the config in one place and copy to all machines.
>>
>>
>>
>> What I tested:
>>
>>
>>
>> 1. I started a scala spark shell and check its environment variables, 
>> spark.dynamicAllocation.enabled is true.
>>
>> 2. I used the following code:
>>
>> scala > val line =
>> sc.textFile("/spark-events/application_1463681113470_0006")
>>
>> line: org.apache.spark.rdd.RDD[String] =
>> /spark-events/application_1463681113470_0006 MapPartitionsRDD[1] at 
>> textFile at :27
>>
>> scala > line.count # This command just stuck here
>>
>>
>>
>> 3. In the beginning, there is only 1 executor(this is for driver) and 
>> after line.count, I could see 3 executors, then dropped to 1.
>>
>> 4. Several jobs were launched and all of them failed.   Tasks (for all
>&

RE: Spark replacing Hadoop

2016-04-14 Thread David Newberger
Can we assume your question is “Will Spark replace Hadoop MapReduce?” or do you 
literally mean replacing the whole of Hadoop?

David

From: Ashok Kumar [mailto:ashok34...@yahoo.com.INVALID]
Sent: Thursday, April 14, 2016 2:13 PM
To: User
Subject: Spark replacing Hadoop

Hi,

I hear some saying that Hadoop is getting old and out of date and will be
replaced by Spark!

Does this make sense and if so how accurate is it?

Best


RE: DStream how many RDD's are created by batch

2016-04-12 Thread David Newberger
Hi Natu,

I believe you are correct; one RDD would be created for each file.

Cheers,

David

From: Natu Lauchande [mailto:nlaucha...@gmail.com]
Sent: Tuesday, April 12, 2016 1:48 PM
To: David Newberger
Cc: user@spark.apache.org
Subject: Re: DStream how many RDD's are created by batch

Hi David,
Thanks for you answer.
I have a follow up question :
I am using textFileStream and listening on an S3 bucket for new files to
process. Files are created every 5 minutes and my batch interval is 2 minutes.

Does it mean that each file will be for one RDD ?

Thanks,
Natu

On Tue, Apr 12, 2016 at 7:46 PM, David Newberger 
mailto:david.newber...@wandcorp.com>> wrote:
Hi,

Time is usually the criteria if I’m understanding your question. An RDD is 
created for each batch interval. If your interval is 500ms then an RDD would be 
created every 500ms. If it’s 2 seconds then an RDD is created every 2 seconds.

Cheers,

David

From: Natu Lauchande [mailto:nlaucha...@gmail.com<mailto:nlaucha...@gmail.com>]
Sent: Tuesday, April 12, 2016 7:09 AM
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: DStream how many RDD's are created by batch

Hi,
What's the criteria for the number of RDD's created for each micro batch
iteration?

Thanks,
Natu



RE: DStream how many RDD's are created by batch

2016-04-12 Thread David Newberger
Hi,

Time is usually the criteria if I’m understanding your question. An RDD is 
created for each batch interval. If your interval is 500ms then an RDD would be 
created every 500ms. If it’s 2 seconds then an RDD is created every 2 seconds.
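
A small sketch of the relationship (the app name, directory path, and interval are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("BatchPerInterval")
val ssc = new StreamingContext(sparkConf, Seconds(2))     // one RDD is produced per 2-second batch
val lines = ssc.textFileStream("hdfs:///incoming/")       // new files under the directory are picked up as they appear
lines.foreachRDD(rdd => println("batch RDD with " + rdd.partitions.length + " partitions"))
ssc.start()
ssc.awaitTermination()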

Cheers,

David

From: Natu Lauchande [mailto:nlaucha...@gmail.com]
Sent: Tuesday, April 12, 2016 7:09 AM
To: user@spark.apache.org
Subject: DStream how many RDD's are created by batch

Hi,
What's the criteria for the number of RDD's created for each micro batch
iteration?

Thanks,
Natu


Using Experimental Spark Features

2015-12-30 Thread David Newberger
Hi All,

I've been looking at the Direct Approach for streaming Kafka integration
(http://spark.apache.org/docs/latest/streaming-kafka-integration.html) because
it looks like a good fit for our use cases. My concern is that the feature is
experimental according to the documentation. Has anyone used this approach yet,
and if so, what has your experience been with using it? If it helps, we'd be
looking to implement it using Scala. Secondly, in general, what has people's
experience been with using experimental features in Spark?
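
For reference, the direct approach being considered looks roughly like the following sketch in Scala (the broker address, topic, and batch interval are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("DirectKafkaSketch")
val ssc = new StreamingContext(conf, Seconds(10))

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")   // placeholder broker list
val topics = Set("events")                                        // placeholder topic

// direct (receiver-less) stream; Spark tracks the Kafka offsets itself
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

stream.map(_._2).count().print()
ssc.start()
ssc.awaitTermination()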

Cheers,

David Newberger



RE: fishing for help!

2015-12-21 Thread David Newberger
Hi Eran,

Based on the limited information, the first things that come to mind are
processor, RAM, and disk speed.

David Newberger
QA Analyst

WAND - The Future of Restaurant Technology
(W) www.wandcorp.com<http://www.wandcorp.com/>
(E) david.newber...@wandcorp.com<mailto:david.newber...@wandcorp.com>
(P) 952.361.6200

From: Eran Witkon [mailto:eranwit...@gmail.com]
Sent: Monday, December 21, 2015 6:54 AM
To: user
Subject: fishing for help!

Hi,
I know it is a broad question, but can you think of reasons why a pyspark job
which runs from server 1 using user 1 will run faster than the same job when
running on server 2 with user 1?
Eran