You don't submit it like that :/ The [N] / [*] suffix (as in local[2] or local[*]) belongs to the local master and is only used when you run the job in local mode, whereas here you are running it in standalone cluster mode.
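Also (an assumption on my part, since the SimpleApp source isn't posted in the thread): make sure the driver doesn't hard-code a master with setMaster(), because a master set on the SparkConf in code takes precedence over the --master flag of spark-submit. Pulling that together with the other suggestions further down the thread (switching off the org/akka logs, awaitTermination(10000)), a minimal sketch of the driver, modeled on the JavaKafkaWordCount example linked in the original post, would look roughly like the following; the topic, group id, ZooKeeper address and 2-second batch size are the ones already mentioned in the thread, the rest is illustrative.

import java.util.HashMap;
import java.util.Map;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class SimpleApp {
  public static void main(String[] args) throws Exception {
    // Silence the noisy INFO output (the log4j suggestion further down the thread).
    Logger.getLogger("org").setLevel(Level.OFF);
    Logger.getLogger("akka").setLevel(Level.OFF);

    // No setMaster() here: the master comes from the --master flag of spark-submit.
    SparkConf conf = new SparkConf().setAppName("SimpleApp");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));

    // One receiver thread on the "test" topic, using the thread's ZooKeeper address and group id.
    Map<String, Integer> topics = new HashMap<String, Integer>();
    topics.put("test", 1);
    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, "10.0.1.232:2181", "c1", topics);

    // The word-count transformations from JavaKafkaWordCount would go here;
    // print() is enough to confirm that records are actually arriving.
    messages.print();

    jssc.start();
    // Run for about 10 seconds and then return, instead of blocking forever.
    jssc.awaitTermination(10000);
    jssc.stop();
  }
}

Nothing about the job logic has to change for the master problem; that part is controlled entirely by spark-submit.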
For the submit command itself, you can try either of these:

1. /opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.10.12/lib/spark/bin/spark-submit --class SimpleApp --master spark://10.0.1.230:7077 *--total-executor-cores 4* --jars $(echo /home/ec2-user/sparkApps/SimpleApp/lib/*.jar | tr ' ' ',') /home/ec2-user/sparkApps/SimpleApp/target/simple-project-1.0.jar

2. /opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.10.12/lib/spark/bin/spark-submit --class SimpleApp --master *local[4]* --jars $(echo /home/ec2-user/sparkApps/SimpleApp/lib/*.jar | tr ' ' ',') /home/ec2-user/sparkApps/SimpleApp/target/simple-project-1.0.jar

Thanks
Best Regards

On Mon, Dec 29, 2014 at 2:36 PM, Suhas Shekar <suhsheka...@gmail.com> wrote: > I tried submitting the application like this with 2 cores as you can see > with the [2]. > > > /opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.10.12/lib/spark/bin/spark-submit > --class SimpleApp --master spark://10.0.1.230:7077[2] --jars $(echo > /home/ec2-user/sparkApps/SimpleApp/lib/*.jar | tr ' ' ',') > /home/ec2-user/sparkApps/SimpleApp/target/simple-project-1.0.jar > > So I checked the url of the master (10.0.1.230). I found the results > interesting... > > Workers: 2 > Cores: 4 Total, 0 Used //so does this mean running it on the localhost has > 0 cores? > > Also when I ran my application, it did not show under "Running > Applications". > > Also, under "Completed Applications" none of my previous runs were > recorded (all were from spark-shell). > > I tried changing my submit script to 10.0.1.231:70877[2], but that did > not change anything. > > Any suggestions on if I should change my submit script or how I could do > so? > > Thanks a lot for the help! > > > Suhas Shekar > > University of California, Los Angeles > B.A. Economics, Specialization in Computing 2014 > > On Mon, Dec 29, 2014 at 12:55 AM, Akhil Das <ak...@sigmoidanalytics.com> > wrote: > >> How many cores are you allocated/seeing in the webui? (that usually runs >> on 8080, for cloudera i think its 18080). Most likely the job is being >> allocated 1 core (should be >= 2 cores) and that's why the count is never >> happening. >> >> Thanks >> Best Regards >> >> On Mon, Dec 29, 2014 at 2:22 PM, Suhas Shekar <suhsheka...@gmail.com> >> wrote: >> >>> So it got rid of the logs, but the problem still persists that : >>> >>> a) The program never terminates (I have pasted all output after the >>> Hello World statements below) >>> >>> b) I am not seeing the word count >>> >>> c) I tried adding [2] next to my 10.0.1.232:2181 looking at this post >>> http://apache-spark-user-list.1001560.n3.nabble.com/streaming-questions-td3281.html, >>> but that did not work as well. >>> >>> Any other suggestions are appreciated. 
>>> >>> Thanks a lot for the time :) >>> >>> 14/12/29 08:46:38 INFO ZookeeperConsumerConnector: >>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], >>> starting auto committer every 60000 ms >>> 14/12/29 08:46:38 INFO ZookeeperConsumerConnector: >>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], begin >>> registering consumer >>> c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 in ZK >>> 14/12/29 08:46:38 INFO ZookeeperConsumerConnector: >>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], end >>> registering consumer >>> c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 in ZK >>> 14/12/29 08:46:38 INFO ZookeeperConsumerConnector: >>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], >>> starting watcher executor thread for consumer >>> c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 >>> 14/12/29 08:46:38 INFO ZookeeperConsumerConnector: >>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], begin >>> rebalancing consumer >>> c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 try #0 >>> 14/12/29 08:46:39 INFO ConsumerFetcherManager: >>> [ConsumerFetcherManager-1419860798873] Stopping leader finder thread >>> 14/12/29 08:46:39 INFO ConsumerFetcherManager: >>> [ConsumerFetcherManager-1419860798873] Stopping all fetchers >>> 14/12/29 08:46:39 INFO ConsumerFetcherManager: >>> [ConsumerFetcherManager-1419860798873] All connections stopped >>> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector: >>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], >>> Cleared all relevant queues for this fetcher >>> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector: >>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], >>> Cleared the data chunks in all the consumer message iterators >>> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector: >>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], >>> Committing all offsets after clearing the fetcher queues >>> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector: >>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], >>> Releasing partition ownership >>> 14/12/29 08:46:39 INFO RangeAssignor: Consumer >>> c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 >>> rebalancing the following partitions: ArrayBuffer(0) for topic test with >>> consumers: >>> List(c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-0) >>> 14/12/29 08:46:39 INFO RangeAssignor: >>> c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-0 >>> attempting to claim partition 0 >>> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector: >>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], >>> c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-0 >>> successfully owned partition 0 for topic test >>> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector: >>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], >>> Consumer c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 >>> selected partitions : test:0: fetched offset = 221: consumed offset = 221 >>> 14/12/29 08:46:39 INFO ConsumerFetcherManager$LeaderFinderThread: >>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-leader-finder-thread], >>> Starting >>> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector: >>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], end >>> 
rebalancing consumer >>> c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 try #0 >>> 14/12/29 08:46:39 INFO VerifiableProperties: Verifying properties >>> 14/12/29 08:46:39 INFO VerifiableProperties: Property client.id is >>> overridden to c1 >>> 14/12/29 08:46:39 INFO VerifiableProperties: Property >>> metadata.broker.list is overridden to >>> ip-10-0-1-232.us-west-1.compute.internal:9092 >>> 14/12/29 08:46:39 INFO VerifiableProperties: Property request.timeout.ms >>> is overridden to 30000 >>> 14/12/29 08:46:39 INFO ClientUtils$: Fetching metadata from broker >>> id:0,host:ip-10-0-1-232.us-west-1.compute.internal,port:9092 with >>> correlation id 0 for 1 topic(s) Set(test) >>> 14/12/29 08:46:39 INFO SyncProducer: Connected to >>> ip-10-0-1-232.us-west-1.compute.internal:9092 for producing >>> 14/12/29 08:46:39 INFO SyncProducer: Disconnecting from >>> ip-10-0-1-232.us-west-1.compute.internal:9092 >>> 14/12/29 08:46:39 INFO ConsumerFetcherThread: >>> [ConsumerFetcherThread-c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-0-0], >>> Starting >>> 14/12/29 08:46:39 INFO ConsumerFetcherManager: >>> [ConsumerFetcherManager-1419860798873] Added fetcher for partitions >>> ArrayBuffer([[test,0], initOffset 221 to broker >>> id:0,host:ip-10-0-1-232.us-west-1.compute.internal,port:9092] ) >>> >>> >>> Suhas Shekar >>> >>> University of California, Los Angeles >>> B.A. Economics, Specialization in Computing 2014 >>> >>> On Mon, Dec 29, 2014 at 12:43 AM, Akhil Das <ak...@sigmoidanalytics.com> >>> wrote: >>> >>>> Now, Add these lines to get ride of those logs >>>> >>>> import org.apache.log4j.Logger >>>> import org.apache.log4j.Level >>>> >>>> Logger.getLogger("org").setLevel(Level.OFF) >>>> Logger.getLogger("akka").setLevel(Level.OFF) >>>> >>>> >>>> Thanks >>>> Best Regards >>>> >>>> On Mon, Dec 29, 2014 at 2:09 PM, Suhas Shekar <suhsheka...@gmail.com> >>>> wrote: >>>> >>>>> Hmmm..soo I added 10000 (10,000) to jssc.awaitTermination , however it >>>>> does not stop. 
When I am not pushing in any data it gives me this: >>>>> >>>>> 14/12/29 08:35:12 INFO ReceiverTracker: Stream 0 received 0 blocks >>>>> 14/12/29 08:35:12 INFO JobScheduler: Added jobs for time 1419860112000 >>>>> ms >>>>> 14/12/29 08:35:14 INFO ReceiverTracker: Stream 0 received 0 blocks >>>>> 14/12/29 08:35:14 INFO JobScheduler: Added jobs for time 1419860114000 >>>>> ms >>>>> 14/12/29 08:35:16 INFO ReceiverTracker: Stream 0 received 0 blocks >>>>> >>>>> When I am pushing in data it does this: >>>>> >>>>> 14/12/29 08:35:08 WARN BlockManager: Block input-0-1419860108200 >>>>> already exists on this machine; not re-adding it >>>>> 14/12/29 08:35:08 INFO BlockGenerator: Pushed block >>>>> input-0-1419860108200 >>>>> 14/12/29 08:35:09 INFO MemoryStore: ensureFreeSpace(80) called with >>>>> curMem=6515, maxMem=277842493 >>>>> 14/12/29 08:35:09 INFO MemoryStore: Block input-0-1419860109200 stored >>>>> as bytes in memory (estimated size 80.0 B, free 265.0 MB) >>>>> 14/12/29 08:35:09 INFO BlockManagerInfo: Added input-0-1419860109200 >>>>> in memory on ip-10-0-1-230.us-west-1.compute.internal:48171 (size: 80.0 B, >>>>> free: 265.0 MB) >>>>> 14/12/29 08:35:09 INFO BlockManagerMaster: Updated info of block >>>>> input-0-1419860109200 >>>>> 14/12/29 08:35:09 WARN BlockManager: Block input-0-1419860109200 >>>>> already exists on this machine; not re-adding it >>>>> 14/12/29 08:35:09 INFO BlockGenerator: Pushed block >>>>> input-0-1419860109200 >>>>> 14/12/29 08:35:10 INFO ReceiverTracker: Stream 0 received 2 blocks >>>>> >>>>> I know I am close as everytime I enter a message in my kafka producer, >>>>> the console reacts as I showed above...do I have to place the >>>>> awaitTermination somewhere else? Or Is the warning saying there is an >>>>> underlying problem? >>>>> >>>>> Thank you for the help...hopefully I am as close as I think I am! >>>>> >>>>> >>>>> >>>>> Suhas Shekar >>>>> >>>>> University of California, Los Angeles >>>>> B.A. Economics, Specialization in Computing 2014 >>>>> >>>>> On Mon, Dec 29, 2014 at 12:28 AM, Akhil Das < >>>>> ak...@sigmoidanalytics.com> wrote: >>>>> >>>>>> If you want to stop the streaming after 10 seconds, then use >>>>>> ssc.awaitTermination(10000). Make sure you push some data to kafka for >>>>>> the >>>>>> streaming to consume within the 10 seconds. >>>>>> >>>>>> Thanks >>>>>> Best Regards >>>>>> >>>>>> On Mon, Dec 29, 2014 at 1:53 PM, Suhas Shekar <suhsheka...@gmail.com> >>>>>> wrote: >>>>>> >>>>>>> I'm very close! So I added that and then I added this: >>>>>>> http://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/0.8.2-beta >>>>>>> >>>>>>> and it seems as though the stream is working as it says Stream 0 >>>>>>> received 1 or 2 blocks as I enter in messages on my kafka producer. >>>>>>> However, the Receiver seems to keep trying every 2 seconds (as I've >>>>>>> included 2000 in my duration in my java app). How can I stop the >>>>>>> Receiver >>>>>>> from consuming messages after 10 seconds and output the word count to >>>>>>> the >>>>>>> console? >>>>>>> >>>>>>> Thanks a lot for all the help! I'm excited to see this word count :) >>>>>>> >>>>>>> Suhas Shekar >>>>>>> >>>>>>> University of California, Los Angeles >>>>>>> B.A. 
Economics, Specialization in Computing 2014 >>>>>>> >>>>>>> On Mon, Dec 29, 2014 at 12:04 AM, Akhil Das < >>>>>>> ak...@sigmoidanalytics.com> wrote: >>>>>>> >>>>>>>> Add this jar in the dependency >>>>>>>> http://mvnrepository.com/artifact/com.yammer.metrics/metrics-core/2.2.0 >>>>>>>> >>>>>>>> Thanks >>>>>>>> Best Regards >>>>>>>> >>>>>>>> On Mon, Dec 29, 2014 at 1:31 PM, Suhas Shekar < >>>>>>>> suhsheka...@gmail.com> wrote: >>>>>>>> >>>>>>>>> Hello Akhil, >>>>>>>>> >>>>>>>>> I chanced my Kafka dependency to 2.10 (which is the version of >>>>>>>>> kafka that was on 10.0.1.232). I am getting a slightly different >>>>>>>>> error, but >>>>>>>>> at the same place as the previous error (pasted below). >>>>>>>>> >>>>>>>>> FYI, when I make these changes to the pom file, I do "mvn clean >>>>>>>>> package" then cp the new jar files from the repository to my lib of >>>>>>>>> jar >>>>>>>>> files which is a argument in my spark-submit script which is in my >>>>>>>>> original >>>>>>>>> post. >>>>>>>>> >>>>>>>>> Thanks again for the time and help...much appreciated. >>>>>>>>> >>>>>>>>> >>>>>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Starting receiver >>>>>>>>> 14/12/29 07:56:00 INFO KafkaReceiver: Starting Kafka Consumer >>>>>>>>> Stream with group: c1 >>>>>>>>> 14/12/29 07:56:00 INFO KafkaReceiver: Connecting to Zookeeper: >>>>>>>>> 10.0.1.232:2181 >>>>>>>>> 14/12/29 07:56:00 INFO BlockGenerator: Started block pushing thread >>>>>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Verifying properties >>>>>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property group.id is >>>>>>>>> overridden to c1 >>>>>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property >>>>>>>>> zookeeper.connect is overridden to 10.0.1.232:2181 >>>>>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property >>>>>>>>> zookeeper.connection.timeout.ms is overridden to 10000 >>>>>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Stopping receiver >>>>>>>>> with message: Error starting receiver 0: >>>>>>>>> java.lang.NoClassDefFoundError: >>>>>>>>> com/yammer/metrics/Metrics >>>>>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Called receiver >>>>>>>>> onStop >>>>>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Deregistering >>>>>>>>> receiver 0 >>>>>>>>> 14/12/29 07:56:00 ERROR ReceiverTracker: Deregistered receiver for >>>>>>>>> stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: >>>>>>>>> com/yammer/metrics/Metrics >>>>>>>>> at >>>>>>>>> kafka.metrics.KafkaMetricsGroup$class.newMeter(KafkaMetricsGroup.scala:51) >>>>>>>>> at >>>>>>>>> kafka.consumer.ZookeeperConsumerConnector.newMeter(ZookeeperConsumerConnector.scala:83) >>>>>>>>> at >>>>>>>>> kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:107) >>>>>>>>> at >>>>>>>>> kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:142) >>>>>>>>> at >>>>>>>>> kafka.consumer.Consumer$.create(ConsumerConnector.scala:89) >>>>>>>>> at >>>>>>>>> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:97) >>>>>>>>> at >>>>>>>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121) >>>>>>>>> at >>>>>>>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106) >>>>>>>>> at >>>>>>>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264) >>>>>>>>> at >>>>>>>>> 
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257) >>>>>>>>> at >>>>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121) >>>>>>>>> at >>>>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121) >>>>>>>>> at >>>>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) >>>>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:54) >>>>>>>>> at >>>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180) >>>>>>>>> at >>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) >>>>>>>>> at >>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) >>>>>>>>> at java.lang.Thread.run(Thread.java:722) >>>>>>>>> Caused by: java.lang.ClassNotFoundException: >>>>>>>>> com.yammer.metrics.Metrics >>>>>>>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366) >>>>>>>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355) >>>>>>>>> at java.security.AccessController.doPrivileged(Native >>>>>>>>> Method) >>>>>>>>> at >>>>>>>>> java.net.URLClassLoader.findClass(URLClassLoader.java:354) >>>>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:423) >>>>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:356) >>>>>>>>> ... 18 more >>>>>>>>> >>>>>>>>> >>>>>>>>> Suhas Shekar >>>>>>>>> >>>>>>>>> University of California, Los Angeles >>>>>>>>> B.A. Economics, Specialization in Computing 2014 >>>>>>>>> >>>>>>>>> On Sun, Dec 28, 2014 at 11:52 PM, Suhas Shekar < >>>>>>>>> suhsheka...@gmail.com> wrote: >>>>>>>>> >>>>>>>>>> I made both versions 1.1.1 and I got the same error. I then tried >>>>>>>>>> making both 1.1.0 as that is the version of my Spark Core, but I got >>>>>>>>>> the >>>>>>>>>> same error. >>>>>>>>>> >>>>>>>>>> I noticed my Kafka dependency is for scala 2.9.2, while my spark >>>>>>>>>> streaming kafka dependency is 2.10.x...I will try changing that >>>>>>>>>> next, but >>>>>>>>>> don't think that will solve the error as I dont think the >>>>>>>>>> application had >>>>>>>>>> got to level yet. >>>>>>>>>> >>>>>>>>>> Please let me know of any possible next steps. >>>>>>>>>> >>>>>>>>>> Thank you again for the time and the help! >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Suhas Shekar >>>>>>>>>> >>>>>>>>>> University of California, Los Angeles >>>>>>>>>> B.A. Economics, Specialization in Computing 2014 >>>>>>>>>> >>>>>>>>>> On Sun, Dec 28, 2014 at 11:31 PM, Akhil Das < >>>>>>>>>> ak...@sigmoidanalytics.com> wrote: >>>>>>>>>> >>>>>>>>>>> Just looked at the pom file that you are using, why are you >>>>>>>>>>> having different versions in it? >>>>>>>>>>> >>>>>>>>>>> <dependency> >>>>>>>>>>> <groupId>org.apache.spark</groupId> >>>>>>>>>>> <artifactId>spark-streaming-kafka_2.10</artifactId> >>>>>>>>>>> <version>*1.1.1*</version> >>>>>>>>>>> </dependency> >>>>>>>>>>> <dependency> >>>>>>>>>>> <groupId>org.apache.spark</groupId> >>>>>>>>>>> <artifactId>spark-streaming_2.10</artifactId> >>>>>>>>>>> <version>*1.0.2*</version> >>>>>>>>>>> </dependency> >>>>>>>>>>> >>>>>>>>>>> can you make both the versions the same? >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> Thanks >>>>>>>>>>> Best Regards >>>>>>>>>>> >>>>>>>>>>> On Mon, Dec 29, 2014 at 12:44 PM, Suhas Shekar < >>>>>>>>>>> suhsheka...@gmail.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> 1) Could you please clarify on what you mean by checking the >>>>>>>>>>>> Scala version is correct? 
In my pom.xml file it is 2.10.4 (which >>>>>>>>>>>> is the >>>>>>>>>>>> same as when I start spark-shell). >>>>>>>>>>>> >>>>>>>>>>>> 2) The spark master URL is definitely correct as I have run >>>>>>>>>>>> other apps with the same script that use Spark (like a word count >>>>>>>>>>>> with a >>>>>>>>>>>> local file) >>>>>>>>>>>> >>>>>>>>>>>> Thank you for the help! >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Suhas Shekar >>>>>>>>>>>> >>>>>>>>>>>> University of California, Los Angeles >>>>>>>>>>>> B.A. Economics, Specialization in Computing 2014 >>>>>>>>>>>> >>>>>>>>>>>> On Sun, Dec 28, 2014 at 11:04 PM, Akhil Das < >>>>>>>>>>>> ak...@sigmoidanalytics.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Make sure you verify the following: >>>>>>>>>>>>> >>>>>>>>>>>>> - Scala version : I think the correct version would be 2.10.x >>>>>>>>>>>>> - SparkMasterURL: Be sure that you copied the one displayed on >>>>>>>>>>>>> the webui's top left corner (running on port 8080) >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks >>>>>>>>>>>>> Best Regards >>>>>>>>>>>>> >>>>>>>>>>>>> On Mon, Dec 29, 2014 at 12:26 PM, suhshekar52 < >>>>>>>>>>>>> suhsheka...@gmail.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hello Everyone, >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thank you for the time and the help :). >>>>>>>>>>>>>> >>>>>>>>>>>>>> My goal here is to get this program working: >>>>>>>>>>>>>> >>>>>>>>>>>>>> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java >>>>>>>>>>>>>> >>>>>>>>>>>>>> The only lines I do not have from the example are lines >>>>>>>>>>>>>> 62-67. pom.xml >>>>>>>>>>>>>> < >>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml >>>>>>>>>>>>>> > >>>>>>>>>>>>>> >>>>>>>>>>>>>> Background: Have ec2 instances running. The standalone spark >>>>>>>>>>>>>> is running on >>>>>>>>>>>>>> top of Cloudera Manager 5.2. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Pom file is attached and the same for both clusters. 
>>>>>>>>>>>>>> pom.xml >>>>>>>>>>>>>> < >>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml >>>>>>>>>>>>>> > >>>>>>>>>>>>>> >>>>>>>>>>>>>> Here are a few different approaches I have taken and the >>>>>>>>>>>>>> issues I run into: >>>>>>>>>>>>>> >>>>>>>>>>>>>> *Standalone Mode* >>>>>>>>>>>>>> >>>>>>>>>>>>>> 1) Use spark-submit script to run: >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> /opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/bin/spark-submit >>>>>>>>>>>>>> --class SimpleApp --master spark://10.0.1.230:7077 --jars >>>>>>>>>>>>>> $(echo >>>>>>>>>>>>>> /home/ec2-user/sparkApps/SimpleApp/lib/*.jar | tr ' ' ',') >>>>>>>>>>>>>> >>>>>>>>>>>>>> /home/ec2-user/sparkApps/SimpleApp/target/simple-project-1.0.jar >>>>>>>>>>>>>> >>>>>>>>>>>>>> Interesting...I was getting an error like this: Initial job >>>>>>>>>>>>>> has not accepted >>>>>>>>>>>>>> any resources; check your cluster UI >>>>>>>>>>>>>> >>>>>>>>>>>>>> Now, when I run, it prints out the 3 Hello world statements >>>>>>>>>>>>>> in my code: >>>>>>>>>>>>>> KafkaJavaConsumer.txt >>>>>>>>>>>>>> < >>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/KafkaJavaConsumer.txt >>>>>>>>>>>>>> > >>>>>>>>>>>>>> >>>>>>>>>>>>>> and then it seems to try to start the Kafka Stream, but fails: >>>>>>>>>>>>>> >>>>>>>>>>>>>> 14/12/29 05:58:05 INFO KafkaReceiver: Starting Kafka Consumer >>>>>>>>>>>>>> Stream with >>>>>>>>>>>>>> group: c1 >>>>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverTracker: Registered receiver >>>>>>>>>>>>>> for stream 0 >>>>>>>>>>>>>> from akka://sparkDriver >>>>>>>>>>>>>> 14/12/29 05:58:05 INFO KafkaReceiver: Connecting to Zookeeper: >>>>>>>>>>>>>> 10.0.1.232:2181 >>>>>>>>>>>>>> 14/12/29 05:58:05 INFO BlockGenerator: Started block pushing >>>>>>>>>>>>>> thread >>>>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopping >>>>>>>>>>>>>> receiver with >>>>>>>>>>>>>> message: Error starting receiver 0: >>>>>>>>>>>>>> java.lang.NoClassDefFoundError: >>>>>>>>>>>>>> scala/reflect/ClassManifest >>>>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Called >>>>>>>>>>>>>> receiver onStop >>>>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Deregistering >>>>>>>>>>>>>> receiver 0 >>>>>>>>>>>>>> ^C14/12/29 05:58:05 ERROR ReceiverTracker: Deregistered >>>>>>>>>>>>>> receiver for stream >>>>>>>>>>>>>> 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: >>>>>>>>>>>>>> scala/reflect/ClassManifest >>>>>>>>>>>>>> at >>>>>>>>>>>>>> kafka.utils.Log4jController$.<init>(Log4jController.scala:29) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> kafka.utils.Log4jController$.<clinit>(Log4jController.scala) >>>>>>>>>>>>>> at kafka.utils.Logging$class.$init$(Logging.scala:29) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> >>>>>>>>>>>>>> kafka.utils.VerifiableProperties.<init>(VerifiableProperties.scala:26) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:94) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:96) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264) >>>>>>>>>>>>>> at 
>>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) >>>>>>>>>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:54) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> >>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> >>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) >>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:722) >>>>>>>>>>>>>> Caused by: java.lang.ClassNotFoundException: >>>>>>>>>>>>>> scala.reflect.ClassManifest >>>>>>>>>>>>>> at >>>>>>>>>>>>>> java.net.URLClassLoader$1.run(URLClassLoader.java:366) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> java.net.URLClassLoader$1.run(URLClassLoader.java:355) >>>>>>>>>>>>>> at java.security.AccessController.doPrivileged(Native >>>>>>>>>>>>>> Method) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> java.net.URLClassLoader.findClass(URLClassLoader.java:354) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> java.lang.ClassLoader.loadClass(ClassLoader.java:423) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> java.lang.ClassLoader.loadClass(ClassLoader.java:356) >>>>>>>>>>>>>> ... 18 more >>>>>>>>>>>>>> >>>>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopped >>>>>>>>>>>>>> receiver 0 >>>>>>>>>>>>>> 14/12/29 05:58:05 INFO BlockGenerator: Stopping BlockGenerator >>>>>>>>>>>>>> >>>>>>>>>>>>>> I ran into a couple other Class not found errors, and was >>>>>>>>>>>>>> able to solve them >>>>>>>>>>>>>> by adding dependencies on the pom file, but have not found >>>>>>>>>>>>>> such a solution >>>>>>>>>>>>>> to this error. >>>>>>>>>>>>>> >>>>>>>>>>>>>> On the Kafka side of things, I am simply typing in messages >>>>>>>>>>>>>> as soon as I >>>>>>>>>>>>>> start the Java app on another console. Is this okay? >>>>>>>>>>>>>> >>>>>>>>>>>>>> I have not set up an advertised host on the kafka side as I >>>>>>>>>>>>>> was able to >>>>>>>>>>>>>> still receive messages from other consoles by setting up a >>>>>>>>>>>>>> consumer to >>>>>>>>>>>>>> listen to the private ip:port. Is this okay? >>>>>>>>>>>>>> >>>>>>>>>>>>>> Lastly, is there command, like --from-beginning for a >>>>>>>>>>>>>> consumer in the java >>>>>>>>>>>>>> application to get messages from the beginning? >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks a lot for the help and happy holidays! >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> -- >>>>>>>>>>>>>> View this message in context: >>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Setting-up-Simple-Kafka-Consumer-via-Spark-Java-app-tp20879.html >>>>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at >>>>>>>>>>>>>> Nabble.com. 
>>>>>>>>>>>>>> >>>>>>>>>>>>>> --------------------------------------------------------------------- >>>>>>>>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>>>>>>>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
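One question from the original post never got an answer above: whether there is an equivalent of the console consumer's --from-beginning flag inside the Java application. With the high-level consumer behind KafkaUtils.createStream, the closest equivalent is passing auto.offset.reset=smallest through the kafkaParams overload; note that it only kicks in when the consumer group has no offsets committed in ZooKeeper yet. A rough sketch (reusing the topic, group id and ZooKeeper address from the thread; everything else is illustrative) that would drop into the driver sketch above in place of the simpler createStream call:

import java.util.HashMap;
import java.util.Map;

import kafka.serializer.StringDecoder;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

// ... inside the driver, after the JavaStreamingContext (jssc) has been created ...
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", "10.0.1.232:2181");
kafkaParams.put("group.id", "c1");
// Start from the earliest available offset when this group has no committed offsets,
// roughly what --from-beginning does for the console consumer.
kafkaParams.put("auto.offset.reset", "smallest");

Map<String, Integer> topics = new HashMap<String, Integer>();
topics.put("test", 1);

JavaPairReceiverInputDStream<String, String> messages =
    KafkaUtils.createStream(jssc, String.class, String.class,
        StringDecoder.class, StringDecoder.class,
        kafkaParams, topics, StorageLevels.MEMORY_AND_DISK_SER_2);

If the group already has committed offsets (for example from earlier test runs, like the "consumed offset = 221" line in the logs above), use a fresh group.id or clear the group's offsets in ZooKeeper so the reset actually applies.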