ᐧ
Hi all,

Below is the output that I am getting. My Kinesis stream has 1 shard, and
my Spark cluster on EC2 has 2 slaves (I think that's fine?).
I should mention that my Kinesis producer is written in Python where I
followed the example
http://blogs.aws.amazon.com/bigdata/post/Tx2Z24D4T99AN35/Snakes-in-the-Stream-Feeding-and-Eating-Amazon-Kinesis-Streams-with-Python

I also wrote a Python consumer, again using the example at the above link,
that works fine. But I am unable to display output from my Spark consumer.

I'd appreciate any help.

Thanks,
Vadim

-------------------------------------------

Time: 1428254090000 ms

-------------------------------------------


15/04/05 17:14:50 INFO scheduler.JobScheduler: Finished job streaming job
1428254090000 ms.0 from job set of time 1428254090000 ms

15/04/05 17:14:50 INFO scheduler.JobScheduler: Total delay: 0.099 s for
time 1428254090000 ms (execution: 0.090 s)

15/04/05 17:14:50 INFO rdd.ShuffledRDD: Removing RDD 63 from persistence
list

15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 63

15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 62 from
persistence list

15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 62

15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 61 from
persistence list

15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 61

15/04/05 17:14:50 INFO rdd.UnionRDD: Removing RDD 60 from persistence list

15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 60

15/04/05 17:14:50 INFO rdd.BlockRDD: Removing RDD 59 from persistence list

15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 59

15/04/05 17:14:50 INFO dstream.PluggableInputDStream: Removing blocks of
RDD BlockRDD[59] at createStream at MyConsumer.scala:56 of time
1428254090000 ms

***********

15/04/05 17:14:50 INFO scheduler.ReceivedBlockTracker: Deleting batches
ArrayBuffer(1428254070000 ms)
On Sat, Apr 4, 2015 at 3:13 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com
> wrote:

> Hi all,
>
> More good news! I was able to utilize mergeStrategy to assembly my Kinesis
> consumer into an "uber jar"
>
> Here's what I added to* build.sbt:*
>
> *mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>*
> *  {*
> *  case PathList("com", "esotericsoftware", "minlog", xs @ _*) =>
> MergeStrategy.first*
> *  case PathList("com", "google", "common", "base", xs @ _*) =>
> MergeStrategy.first*
> *  case PathList("org", "apache", "commons", xs @ _*) =>
> MergeStrategy.last*
> *  case PathList("org", "apache", "hadoop", xs @ _*) =>
> MergeStrategy.first*
> *  case PathList("org", "apache", "spark", "unused", xs @ _*) =>
> MergeStrategy.first*
> *        case x => old(x)*
> *  }*
> *}*
>
> Everything appears to be working fine. Right now my producer is pushing
> simple strings through Kinesis,
> which my consumer is trying to print (using Spark's print() method for
> now).
>
> However, instead of displaying my strings, I get the following:
>
> *15/04/04 18:57:32 INFO scheduler.ReceivedBlockTracker: Deleting batches
> ArrayBuffer(1428173848000 ms)*
>
> Any idea on what might be going on?
>
> Thanks,
>
> Vadim
>
> Here's my consumer code (adapted from the WordCount example):
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> *private object MyConsumer extends Logging {  def main(args:
> Array[String]) {    /* Check that all required args were passed in. */
> if (args.length < 2) {      System.err.println(        """          |Usage:
> KinesisWordCount <stream-name> <endpoint-url>          |    <stream-name>
> is the name of the Kinesis stream          |    <endpoint-url> is the
> endpoint of the Kinesis service          |                   (e.g.
> https://kinesis.us-east-1.amazonaws.com
> <https://kinesis.us-east-1.amazonaws.com>)        """.stripMargin)
> System.exit(1)    }    /* Populate the appropriate variables from the given
> args */    val Array(streamName, endpointUrl) = args    /* Determine the
> number of shards from the stream */    val kinesisClient = new
> AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
> kinesisClient.setEndpoint(endpointUrl)    val numShards =
> kinesisClient.describeStream(streamName).getStreamDescription().getShards()
>     .size()    System.out.println("Num shards: " + numShards)    /* In this
> example, we're going to create 1 Kinesis Worker/Receiver/DStream for each
> shard. */    val numStreams = numShards    /* Setup the and SparkConfig and
> StreamingContext */    /* Spark Streaming batch interval */    val
> batchInterval = Milliseconds(2000)    val sparkConfig = new
> SparkConf().setAppName("MyConsumer")    val ssc = new
> StreamingContext(sparkConfig, batchInterval)    /* Kinesis checkpoint
> interval.  Same as batchInterval for this example. */    val
> kinesisCheckpointInterval = batchInterval    /* Create the same number of
> Kinesis DStreams/Receivers as Kinesis stream's shards */    val
> kinesisStreams = (0 until numStreams).map { i =>
> KinesisUtils.createStream(ssc, streamName, endpointUrl,
> kinesisCheckpointInterval,          InitialPositionInStream.LATEST,
> StorageLevel.MEMORY_AND_DISK_2)    }    /* Union all the streams */    val
> unionStreams  = ssc.union(kinesisStreams).map(byteArray => new
> String(byteArray))    unionStreams.print()    ssc.start()
> ssc.awaitTermination()  }}*
>
>
> On Fri, Apr 3, 2015 at 3:48 PM, Tathagata Das <t...@databricks.com> wrote:
>
>> Just remove "provided" for spark-streaming-kinesis-asl
>>
>> libraryDependencies += "org.apache.spark" %%
>> "spark-streaming-kinesis-asl" % "1.3.0"
>>
>> On Fri, Apr 3, 2015 at 12:45 PM, Vadim Bichutskiy <
>> vadim.bichuts...@gmail.com> wrote:
>>
>>> Thanks. So how do I fix it?
>>>
>>> On Fri, Apr 3, 2015 at 3:43 PM, Kelly, Jonathan <jonat...@amazon.com>
>>> wrote:
>>>
>>>>   spark-streaming-kinesis-asl is not part of the Spark distribution on
>>>> your cluster, so you cannot have it be just a "provided" dependency.  This
>>>> is also why the KCL and its dependencies were not included in the assembly
>>>> (but yes, they should be).
>>>>
>>>>
>>>>  ~ Jonathan Kelly
>>>>
>>>>   From: Vadim Bichutskiy <vadim.bichuts...@gmail.com>
>>>> Date: Friday, April 3, 2015 at 12:26 PM
>>>> To: Jonathan Kelly <jonat...@amazon.com>
>>>> Cc: "user@spark.apache.org" <user@spark.apache.org>
>>>> Subject: Re: Spark + Kinesis
>>>>
>>>>   Hi all,
>>>>
>>>>  Good news! I was able to create a Kinesis consumer and assemble it
>>>> into an "uber jar" following
>>>> http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>>>> and example
>>>> https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
>>>> .
>>>>
>>>>  However when I try to spark-submit it I get the following exception:
>>>>
>>>>  *Exception in thread "main" java.lang.NoClassDefFoundError:
>>>> com/amazonaws/auth/AWSCredentialsProvider*
>>>>
>>>>  Do I need to include KCL dependency in *build.sbt*, here's what it
>>>> looks like currently:
>>>>
>>>>  import AssemblyKeys._
>>>> name := "Kinesis Consumer"
>>>> version := "1.0"
>>>> organization := "com.myconsumer"
>>>> scalaVersion := "2.11.5"
>>>>
>>>>  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" %
>>>> "provided"
>>>> libraryDependencies += "org.apache.spark" %% "spark-streaming" %
>>>> "1.3.0" % "provided"
>>>> libraryDependencies += "org.apache.spark" %%
>>>> "spark-streaming-kinesis-asl" % "1.3.0" % "provided"
>>>>
>>>>  assemblySettings
>>>> jarName in assembly :=  "consumer-assembly.jar"
>>>> assemblyOption in assembly := (assemblyOption in
>>>> assembly).value.copy(includeScala=false)
>>>>
>>>>  Any help appreciated.
>>>>
>>>>  Thanks,
>>>> Vadim
>>>>
>>>> On Thu, Apr 2, 2015 at 1:15 PM, Kelly, Jonathan <jonat...@amazon.com>
>>>> wrote:
>>>>
>>>>>  It looks like you're attempting to mix Scala versions, so that's
>>>>> going to cause some problems.  If you really want to use Scala 2.11.5, you
>>>>> must also use Spark package versions built for Scala 2.11 rather than
>>>>> 2.10.  Anyway, that's not quite the correct way to specify Scala
>>>>> dependencies in build.sbt.  Instead of placing the Scala version after the
>>>>> artifactId (like "spark-core_2.10"), what you actually want is to use just
>>>>> "spark-core" with two percent signs before it.  Using two percent signs
>>>>> will make it use the version of Scala that matches your declared
>>>>> scalaVersion.  For example:
>>>>>
>>>>>  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0"
>>>>> % "provided"
>>>>>
>>>>>  libraryDependencies += "org.apache.spark" %% "spark-streaming" %
>>>>> "1.3.0" % "provided"
>>>>>
>>>>>  libraryDependencies += "org.apache.spark" %%
>>>>> "spark-streaming-kinesis-asl" % "1.3.0"
>>>>>
>>>>>  I think that may get you a little closer, though I think you're
>>>>> probably going to run into the same problems I ran into in this thread:
>>>>> https://www.mail-archive.com/user@spark.apache.org/msg23891.html  I
>>>>> never really got an answer for that, and I temporarily moved on to other
>>>>> things for now.
>>>>>
>>>>>
>>>>>  ~ Jonathan Kelly
>>>>>
>>>>>   From: 'Vadim Bichutskiy' <vadim.bichuts...@gmail.com>
>>>>> Date: Thursday, April 2, 2015 at 9:53 AM
>>>>> To: "user@spark.apache.org" <user@spark.apache.org>
>>>>> Subject: Spark + Kinesis
>>>>>
>>>>>   Hi all,
>>>>>
>>>>>  I am trying to write an Amazon Kinesis consumer Scala app that
>>>>> processes data in the
>>>>> Kinesis stream. Is this the correct way to specify *build.sbt*:
>>>>>
>>>>>  -------
>>>>> *import AssemblyKeys._*
>>>>> *name := "Kinesis Consumer"*
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> *version := "1.0" organization := "com.myconsumer" scalaVersion :=
>>>>> "2.11.5" libraryDependencies ++= Seq("org.apache.spark" % 
>>>>> "spark-core_2.10"
>>>>> % "1.3.0" % "provided", "org.apache.spark" % "spark-streaming_2.10" %
>>>>> "1.3.0" "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % 
>>>>> "1.3.0")*
>>>>>
>>>>>
>>>>>
>>>>> * assemblySettings jarName in assembly :=  "consumer-assembly.jar"
>>>>> assemblyOption in assembly := (assemblyOption in
>>>>> assembly).value.copy(includeScala=false)*
>>>>> --------
>>>>>
>>>>>  In *project/assembly.sbt* I have only the following line:
>>>>>
>>>>>  *addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")*
>>>>>
>>>>>  I am using sbt 0.13.7. I adapted Example 7.7 in the Learning Spark
>>>>> book.
>>>>>
>>>>>  Thanks,
>>>>> Vadim
>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to