Hi all,

Below is the output that I am getting. My Kinesis stream has 1 shard, and my Spark cluster on EC2 has 2 slaves (I think that's fine?). I should mention that my Kinesis producer is written in Python, following the example at
http://blogs.aws.amazon.com/bigdata/post/Tx2Z24D4T99AN35/Snakes-in-the-Stream-Feeding-and-Eating-Amazon-Kinesis-Streams-with-Python
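(For reference, the gist of that producer — shown here as a minimal Scala sketch against the AWS SDK rather than the blog's Python/boto code; the stream name and endpoint below are placeholders — is just a putRecord loop:

import java.nio.ByteBuffer
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.PutRecordRequest

object TestProducer {
  def main(args: Array[String]): Unit = {
    // Credentials come from the default chain (env vars, profile, or instance role).
    val client = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    client.setEndpoint("https://kinesis.us-east-1.amazonaws.com")  // placeholder endpoint

    for (i <- 1 to 10) {
      val request = new PutRecordRequest()
        .withStreamName("mystream")       // placeholder stream name
        .withPartitionKey(i.toString)     // partition key routes the record to a shard
        .withData(ByteBuffer.wrap(("message " + i).getBytes("UTF-8")))
      client.putRecord(request)
      Thread.sleep(1000)                  // pace the puts a little
    }
  }
})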
I also wrote a Python consumer, again using the example at the above link, that works fine. But I am unable to display output from my Spark consumer. I'd appreciate any help.

Thanks,
Vadim

-------------------------------------------
Time: 1428254090000 ms
-------------------------------------------
15/04/05 17:14:50 INFO scheduler.JobScheduler: Finished job streaming job 1428254090000 ms.0 from job set of time 1428254090000 ms
15/04/05 17:14:50 INFO scheduler.JobScheduler: Total delay: 0.099 s for time 1428254090000 ms (execution: 0.090 s)
15/04/05 17:14:50 INFO rdd.ShuffledRDD: Removing RDD 63 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 63
15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 62 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 62
15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 61 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 61
15/04/05 17:14:50 INFO rdd.UnionRDD: Removing RDD 60 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 60
15/04/05 17:14:50 INFO rdd.BlockRDD: Removing RDD 59 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 59
15/04/05 17:14:50 INFO dstream.PluggableInputDStream: Removing blocks of RDD BlockRDD[59] at createStream at MyConsumer.scala:56 of time 1428254090000 ms
15/04/05 17:14:50 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428254070000 ms)

On Sat, Apr 4, 2015 at 3:13 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:

> Hi all,
>
> More good news! I was able to use mergeStrategy to assemble my Kinesis
> consumer into an "uber jar".
>
> Here's what I added to build.sbt:
>
> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>   {
>     case PathList("com", "esotericsoftware", "minlog", xs @ _*) => MergeStrategy.first
>     case PathList("com", "google", "common", "base", xs @ _*) => MergeStrategy.first
>     case PathList("org", "apache", "commons", xs @ _*) => MergeStrategy.last
>     case PathList("org", "apache", "hadoop", xs @ _*) => MergeStrategy.first
>     case PathList("org", "apache", "spark", "unused", xs @ _*) => MergeStrategy.first
>     case x => old(x)
>   }
> }
>
> Everything appears to be working fine. Right now my producer is pushing
> simple strings through Kinesis, which my consumer is trying to print
> (using Spark's print() method for now).
>
> However, instead of displaying my strings, I get the following:
>
> 15/04/04 18:57:32 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428173848000 ms)
>
> Any idea on what might be going on?
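>
> (One sanity check — a sketch, using the unionStreams DStream from the
> code below — would be to print a per-batch record count next to the
> print() call:
>
> unionStreams.count().print()
>
> If that shows 0 for every batch, no records are reaching the receiver.
> Note also that InitialPositionInStream.LATEST only picks up records
> sent after the receiver has started.)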
>
> Thanks,
>
> Vadim
>
> Here's my consumer code (adapted from the WordCount example):
>
> private object MyConsumer extends Logging {
>
>   def main(args: Array[String]) {
>     /* Check that all required args were passed in. */
>     if (args.length < 2) {
>       System.err.println(
>         """
>           |Usage: KinesisWordCount <stream-name> <endpoint-url>
>           |    <stream-name> is the name of the Kinesis stream
>           |    <endpoint-url> is the endpoint of the Kinesis service
>           |                   (e.g. https://kinesis.us-east-1.amazonaws.com)
>         """.stripMargin)
>       System.exit(1)
>     }
>
>     /* Populate the appropriate variables from the given args */
>     val Array(streamName, endpointUrl) = args
>
>     /* Determine the number of shards from the stream */
>     val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
>     kinesisClient.setEndpoint(endpointUrl)
>     val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size()
>     System.out.println("Num shards: " + numShards)
>
>     /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
>     val numStreams = numShards
>
>     /* Set up the SparkConf and StreamingContext */
>     /* Spark Streaming batch interval */
>     val batchInterval = Milliseconds(2000)
>     val sparkConfig = new SparkConf().setAppName("MyConsumer")
>     val ssc = new StreamingContext(sparkConfig, batchInterval)
>
>     /* Kinesis checkpoint interval. Same as batchInterval for this example. */
>     val kinesisCheckpointInterval = batchInterval
>
>     /* Create the same number of Kinesis DStreams/Receivers as the Kinesis stream's shards */
>     val kinesisStreams = (0 until numStreams).map { i =>
>       KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
>         InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
>     }
>
>     /* Union all the streams */
>     val unionStreams = ssc.union(kinesisStreams).map(byteArray => new String(byteArray))
>
>     unionStreams.print()
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
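>
> (For reference, a submit command along these lines should run it — a
> sketch: the jar name matches build.sbt below, and the master URL and
> stream name are placeholders:
>
> spark-submit --class MyConsumer --master spark://<master>:7077 \
>   consumer-assembly.jar mystream https://kinesis.us-east-1.amazonaws.com)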
>
> On Fri, Apr 3, 2015 at 3:48 PM, Tathagata Das <t...@databricks.com> wrote:
>
>> Just remove "provided" for spark-streaming-kinesis-asl:
>>
>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"
>>
>> On Fri, Apr 3, 2015 at 12:45 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:
>>
>>> Thanks. So how do I fix it?
>>>
>>> On Fri, Apr 3, 2015 at 3:43 PM, Kelly, Jonathan <jonat...@amazon.com> wrote:
>>>
>>>> spark-streaming-kinesis-asl is not part of the Spark distribution on
>>>> your cluster, so you cannot have it be just a "provided" dependency. This
>>>> is also why the KCL and its dependencies were not included in the assembly
>>>> (but yes, they should be).
>>>>
>>>> ~ Jonathan Kelly
>>>>
>>>> From: Vadim Bichutskiy <vadim.bichuts...@gmail.com>
>>>> Date: Friday, April 3, 2015 at 12:26 PM
>>>> To: Jonathan Kelly <jonat...@amazon.com>
>>>> Cc: "user@spark.apache.org" <user@spark.apache.org>
>>>> Subject: Re: Spark + Kinesis
>>>>
>>>> Hi all,
>>>>
>>>> Good news! I was able to create a Kinesis consumer and assemble it
>>>> into an "uber jar" following
>>>> http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>>>> and the example
>>>> https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala.
>>>>
>>>> However, when I try to spark-submit it I get the following exception:
>>>>
>>>> Exception in thread "main" java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider
>>>>
>>>> Do I need to include the KCL dependency in build.sbt? Here's what it
>>>> looks like currently:
>>>>
>>>> import AssemblyKeys._
>>>>
>>>> name := "Kinesis Consumer"
>>>> version := "1.0"
>>>> organization := "com.myconsumer"
>>>> scalaVersion := "2.11.5"
>>>>
>>>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
>>>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
>>>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" % "provided"
>>>>
>>>> assemblySettings
>>>> jarName in assembly := "consumer-assembly.jar"
>>>> assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
>>>>
>>>> Any help appreciated.
>>>>
>>>> Thanks,
>>>> Vadim
>>>>
>>>> On Thu, Apr 2, 2015 at 1:15 PM, Kelly, Jonathan <jonat...@amazon.com> wrote:
>>>>
>>>>> It looks like you're attempting to mix Scala versions, so that's
>>>>> going to cause some problems. If you really want to use Scala 2.11.5, you
>>>>> must also use Spark package versions built for Scala 2.11 rather than
>>>>> 2.10. Anyway, that's not quite the correct way to specify Scala
>>>>> dependencies in build.sbt. Instead of placing the Scala version after the
>>>>> artifactId (like "spark-core_2.10"), what you actually want is to use just
>>>>> "spark-core" with two percent signs (%%) between the group ID and the
>>>>> artifact name. Using %% will make sbt pick the artifact built for the
>>>>> version of Scala that matches your declared scalaVersion. For example:
>>>>>
>>>>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
>>>>>
>>>>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
>>>>>
>>>>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"
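>>>>>
>>>>> (To make the equivalence concrete — a sketch: with scalaVersion :=
>>>>> "2.11.5", the %% form resolves to the _2.11 artifact, so these two
>>>>> lines request the same thing:
>>>>>
>>>>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0"
>>>>> libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.3.0")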
>>>>>
>>>>> I think that may get you a little closer, though I think you're
>>>>> probably going to run into the same problems I ran into in this thread:
>>>>> https://www.mail-archive.com/user@spark.apache.org/msg23891.html
>>>>> I never really got an answer for that, and I temporarily moved on to
>>>>> other things for now.
>>>>>
>>>>> ~ Jonathan Kelly
>>>>>
>>>>> From: 'Vadim Bichutskiy' <vadim.bichuts...@gmail.com>
>>>>> Date: Thursday, April 2, 2015 at 9:53 AM
>>>>> To: "user@spark.apache.org" <user@spark.apache.org>
>>>>> Subject: Spark + Kinesis
>>>>>
>>>>> Hi all,
>>>>>
>>>>> I am trying to write an Amazon Kinesis consumer Scala app that
>>>>> processes data in the Kinesis stream. Is this the correct way to
>>>>> specify build.sbt:
>>>>>
>>>>> -------
>>>>> import AssemblyKeys._
>>>>>
>>>>> name := "Kinesis Consumer"
>>>>> version := "1.0"
>>>>> organization := "com.myconsumer"
>>>>> scalaVersion := "2.11.5"
>>>>>
>>>>> libraryDependencies ++= Seq(
>>>>>   "org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided",
>>>>>   "org.apache.spark" % "spark-streaming_2.10" % "1.3.0",
>>>>>   "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.3.0")
>>>>>
>>>>> assemblySettings
>>>>> jarName in assembly := "consumer-assembly.jar"
>>>>> assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
>>>>> --------
>>>>>
>>>>> In project/assembly.sbt I have only the following line:
>>>>>
>>>>> addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
>>>>>
>>>>> I am using sbt 0.13.7. I adapted Example 7.7 in the Learning Spark
>>>>> book.
>>>>>
>>>>> Thanks,
>>>>> Vadim