Re: Spark and KafkaUtils

2016-03-15 Thread Vinti Maheshwari
Hi Cody,

I wanted to share my updated build.sbt, which now works with Kafka without
any errors. It may help other users who face a similar issue.

name := "NetworkStreaming"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0", // kafka
  "org.apache.spark" %% "spark-mllib" % "1.6.0",
  "org.codehaus.groovy" % "groovy-all" % "1.8.6",
  "org.apache.hbase" % "hbase-server" % "1.1.2",
  "org.apache.spark" %% "spark-sql" % "1.6.0",
  "org.apache.hbase" % "hbase-common" % "1.1.2" excludeAll(
    ExclusionRule(organization = "javax.servlet", name = "javax.servlet-api"),
    ExclusionRule(organization = "org.mortbay.jetty", name = "jetty"),
    ExclusionRule(organization = "org.mortbay.jetty", name = "servlet-api-2.5")),
  "org.apache.hbase" % "hbase-client" % "1.1.2" excludeAll(
    ExclusionRule(organization = "javax.servlet", name = "javax.servlet-api"),
    ExclusionRule(organization = "org.mortbay.jetty", name = "jetty"),
    ExclusionRule(organization = "org.mortbay.jetty", name = "servlet-api-2.5"))
)

// The strategies on the right (and the "manifest.mf" / "log4j.properties" names)
// were lost from the archived message; these values are assumed, following the
// usual sbt-assembly pattern.
assemblyMergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf")          => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$")      => MergeStrategy.discard
  case "log4j.properties"                                  => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
  case "reference.conf"                                    => MergeStrategy.concat
  case _                                                   => MergeStrategy.first
}
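
The `case` clauses in `assemblyMergeStrategy` are tried top to bottom, and the first clause that matches a path decides its strategy. The toy Java model below mimics that ordering. It is not sbt-assembly's code: the rule set in `typical()` is an assumed example shaped like the common pattern, and strategies are plain strings rather than `MergeStrategy` objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Predicate;

// Toy model of first-match-wins merge-strategy dispatch (not sbt-assembly's code).
final class MergeRuleChain {
    private static final class Rule {
        final Predicate<String> matches;
        final String strategy;

        Rule(Predicate<String> matches, String strategy) {
            this.matches = matches;
            this.strategy = strategy;
        }
    }

    private final List<Rule> rules = new ArrayList<>();

    MergeRuleChain add(Predicate<String> matches, String strategy) {
        rules.add(new Rule(matches, strategy));
        return this;
    }

    // Returns the strategy of the first rule whose predicate accepts the path.
    String strategyFor(String path) {
        for (Rule r : rules) {
            if (r.matches.test(path)) {
                return r.strategy;
            }
        }
        throw new IllegalStateException("no rule matched " + path);
    }

    // Hypothetical rule set shaped like a typical assemblyMergeStrategy block.
    static MergeRuleChain typical() {
        return new MergeRuleChain()
            .add(p -> p.toLowerCase(Locale.ROOT).endsWith("manifest.mf"), "discard")
            .add(p -> p.toLowerCase(Locale.ROOT).matches("meta-inf.*\\.sf$"), "discard")
            .add(p -> p.toLowerCase(Locale.ROOT).startsWith("meta-inf/services/"), "filterDistinctLines")
            .add(p -> p.equals("reference.conf"), "concat")
            .add(p -> true, "first"); // catch-all, like `case _ =>`
    }
}
```

A `.sf` file under `META-INF/services/` is caught by the earlier signature rule and discarded, so reordering the rules changes the outcome for any path that matches more than one of them.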

Thanks & Regards,



Re: Spark and KafkaUtils

2016-02-24 Thread Cody Koeninger
Looks like conflicting versions of the same dependency.
If you look at the mergeStrategy section of the build file I posted, you
can add additional lines for whatever dependencies are causing issues, e.g.

  case PathList("org", "jboss", "netty", _*) => MergeStrategy.first
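
`MergeStrategy.first` keeps the copy of a path from the first jar on the classpath and ignores the others, and `PathList("org", "jboss", "netty", _*)` matches any path whose leading segments are `org/jboss/netty`. A toy Java sketch of both ideas, with `discard` and `concat` for comparison; the `Entry` class and the method names are hypothetical, and the real plugin works on files rather than strings.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Toy model of what a few merge strategies do with duplicate copies of one path.
final class MergeCandidates {
    // One copy of a path as found inside one jar (hypothetical model).
    static final class Entry {
        final String jar;
        final String content;

        Entry(String jar, String content) {
            this.jar = jar;
            this.content = content;
        }
    }

    // Rough analogue of PathList("org", "jboss", "netty", _*):
    // the path's leading segments must equal the given ones.
    static boolean underPrefix(String path, String... segments) {
        List<String> parts = Arrays.asList(path.split("/"));
        return parts.size() >= segments.length
            && parts.subList(0, segments.length).equals(Arrays.asList(segments));
    }

    // "first": keep the copy from the first jar in classpath order.
    static Optional<String> first(List<Entry> copies) {
        return copies.isEmpty() ? Optional.empty() : Optional.of(copies.get(0).content);
    }

    // "discard": leave the path out of the fat jar entirely.
    static Optional<String> discard(List<Entry> copies) {
        return Optional.empty();
    }

    // "concat": toy version that appends the copies back to back
    // (the real strategy may insert line separators between them).
    static Optional<String> concat(List<Entry> copies) {
        StringBuilder sb = new StringBuilder();
        for (Entry e : copies) {
            sb.append(e.content);
        }
        return Optional.of(sb.toString());
    }
}
```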


Re: Spark and KafkaUtils

2016-02-24 Thread Vinti Maheshwari
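The error message is:

[error] deduplicate: different file contents found in the following:

I tried adding the block below, found on Stack Overflow, but still no luck.

// Exclude jars whose file names start with "sbt" or contain "macros" from the assembly.
excludedJars in assembly <<= (fullClasspath in assembly) map { cp =>
  cp filter { x => x.data.getName.matches("sbt.*") || x.data.getName.matches(".*macros.*") }
}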
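
For most paths sbt-assembly's default strategy is `deduplicate`: if every jar's copy of a path has identical contents it keeps one, otherwise it stops with the "different file contents" error above. A toy Java model of that check, comparing SHA-256 hashes; the class and method names are illustrative, not sbt-assembly's.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of a "deduplicate" merge: identical copies collapse to one,
// differing copies are a conflict that needs an explicit strategy.
final class Deduplicate {
    static String sha256Hex(byte[] data) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256").digest(data);
            StringBuilder sb = new StringBuilder();
            for (byte b : digest) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // every JDK ships SHA-256
        }
    }

    // copies: jar name -> bytes of the same path inside that jar (non-empty).
    // Returns the single content to keep, or throws when the contents differ.
    static byte[] merge(String path, Map<String, byte[]> copies) {
        if (copies.isEmpty()) {
            throw new IllegalArgumentException("no copies of " + path);
        }
        Set<String> hashes = new HashSet<>();
        byte[] kept = null;
        for (byte[] c : copies.values()) {
            hashes.add(sha256Hex(c));
            if (kept == null) {
                kept = c;
            }
        }
        if (hashes.size() > 1) {
            throw new IllegalStateException("deduplicate: different file contents found in the following: "
                + path + " in " + copies.keySet());
        }
        return kept;
    }
}
```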



Re: Spark and KafkaUtils

2016-02-24 Thread Vinti Maheshwari
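Thanks much Cody, I added assembly.sbt and modified build.sbt with the
Ivy-bug-related content.

It's giving lots of errors related to Ivy, for example:

[error] /Users/vintim/.ivy2/cache/javax.activation/activation/jars/activation-1.1.jar:javax/activation/ActivationDataFlavor.class

Here is the complete error log: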



Re: Spark and KafkaUtils

2016-02-24 Thread Cody Koeninger
OK, the build file I linked earlier has a minimal example of use. Just
running 'sbt assembly' with a similar build file should build a jar with
all the dependencies.


Re: Spark and KafkaUtils

2016-02-24 Thread Vinti Maheshwari
I am not using sbt assembly currently. I need to check how to use sbt
assembly.



Re: Spark and KafkaUtils

2016-02-24 Thread Cody Koeninger
Are you using sbt assembly?  That's what will include all of the
non-provided dependencies in a single jar along with your code.  Otherwise
you'd have to specify each separate jar in your spark-submit line, which is
a pain.
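
Roughly, `sbt assembly` bundles your classes plus the dependencies on the runtime classpath, leaving out anything marked `provided` because the cluster supplies it at run time. The toy Java model below shows only that scope filter; the `Dependency` class is hypothetical, and the real plugin works on resolved jars, including transitive dependencies.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of which declared dependencies end up in an assembly jar.
final class AssemblySelection {
    static final class Dependency {
        final String coordinates; // e.g. "org.apache.spark:spark-streaming-kafka_2.10:1.5.2"
        final String scope;       // "compile", "runtime", "provided", "test", ...

        Dependency(String coordinates, String scope) {
            this.coordinates = coordinates;
            this.scope = scope;
        }
    }

    // Only compile- and runtime-scoped dependencies are bundled;
    // "provided" and "test" dependencies are left out.
    static List<String> bundled(List<Dependency> deps) {
        List<String> out = new ArrayList<>();
        for (Dependency d : deps) {
            if (d.scope.equals("compile") || d.scope.equals("runtime")) {
                out.add(d.coordinates);
            }
        }
        return out;
    }
}
```

Marking the Kafka integration `provided` would drop it from this list, which is consistent with the class being missing at run time.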


Re: Spark and KafkaUtils

2016-02-24 Thread Vinti Maheshwari
Hi Cody,

I tried the build file you provided, but it's not working for me; I'm
getting the same error:
Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/spark/streaming/kafka/KafkaUtils$

I don't get this error while building (sbt package); I get it when I run
my Spark Streaming program.
Do I need to specify the Kafka jar path manually with the spark-submit --jars flag?
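
`sbt package` compiles against the full dependency list but puts only the project's own classes into the jar, so a build can succeed while the Kafka classes are still missing at run time. A small Java probe like the one below (a hypothetical helper, not part of Spark) reports whether a class is visible on the runtime classpath; `KafkaUtils$` is the Scala companion-object class named in the error.

```java
// Checks, at run time, whether a class can be loaded from the current classpath.
final class ClasspathProbe {
    static boolean isLoadable(String className) {
        try {
            // initialize = false: only locate and load the class, don't run static initializers
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String kafkaUtils = "org.apache.spark.streaming.kafka.KafkaUtils$";
        System.out.println(kafkaUtils + " loadable: " + isLoadable(kafkaUtils));
    }
}
```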

My build.sbt:

name := "NetworkStreaming"
libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"

libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"

libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.0.0"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2"
)


Re: Spark and KafkaUtils

2016-02-24 Thread Cody Koeninger
spark streaming is provided, kafka is not.

This build file

includes some hacks for ivy issues that may no longer be strictly
necessary, but try that build and see if it works for you.


Spark and KafkaUtils

2016-02-24 Thread Vinti Maheshwari
Hello,

I have tried multiple different settings in build.sbt, but nothing seems to work.
Can anyone suggest the right syntax/way to include Kafka with Spark?

Error:
Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/spark/streaming/kafka/KafkaUtils$

build.sbt:
libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"
libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"
libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.0.0"
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2",
  "org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.5.2" % "provided"
)
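
In sbt, `%%` appends the Scala binary version to the artifact name, so, assuming `scalaVersion` is a 2.10.x release, `% "spark-streaming-kafka_2.10"` and `%% "spark-streaming-kafka"` name the same artifact, and the list above declares it twice, once as `provided`. A toy Java version of that naming rule, assuming a Scala 2.x version string in major.minor.patch form:

```java
// Toy model of sbt's %% cross-version suffix for Scala 2.x versions.
final class CrossVersion {
    // "2.10.5" -> "2.10": Scala 2.x binary compatibility is per major.minor.
    static String binaryVersion(String scalaVersion) {
        String[] parts = scalaVersion.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("expected major.minor[.patch]: " + scalaVersion);
        }
        return parts[0] + "." + parts[1];
    }

    // Artifact name that `"org" %% "name" % "ver"` refers to.
    static String crossArtifact(String name, String scalaVersion) {
        return name + "_" + binaryVersion(scalaVersion);
    }
}
```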

Spark Streaming KafkaUtils Issue

2014-10-10 Thread Abraham Jacob
Hi Folks,

I am seeing some strange behavior when using the Spark Kafka connector in
Spark Streaming.

I have a Kafka topic with 8 partitions, and a Kafka producer that pumps
messages into this topic.

On the consumer side I have a Spark Streaming application with 8 executors
on 8 worker nodes and 8 ReceiverInputDStreams that share the same Kafka
group id and are connected to the 8 partitions of the topic. The Kafka
consumer property auto.offset.reset is set to smallest.
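
When several consumer threads share one group id, the old Kafka high-level consumer balances the topic's partitions across them; its default `range` strategy gives each thread a contiguous block. A sketch of that arithmetic, assuming the range strategy with thread ids sorted lexicographically; the thread ids in the test are made up.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of range-style partition assignment within one consumer group.
final class RangeAssignment {
    // Splits partitions 0..numPartitions-1 into contiguous ranges, one per
    // thread (sorted by id); the first (numPartitions % n) threads get one extra.
    static Map<String, List<Integer>> assign(int numPartitions, List<String> threadIds) {
        if (threadIds.isEmpty()) {
            throw new IllegalArgumentException("no consumer threads");
        }
        List<String> sorted = new ArrayList<>(threadIds);
        Collections.sort(sorted);
        int n = sorted.size();
        int perThread = numPartitions / n;
        int extra = numPartitions % n;
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (int i = 0; i < n; i++) {
            int start = perThread * i + Math.min(i, extra);
            int count = perThread + (i < extra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                parts.add(p);
            }
            result.put(sorted.get(i), parts);
        }
        return result;
    }
}
```

With 8 threads and 8 partitions each thread gets exactly one partition; with fewer threads, the lower-sorted ids pick up the extra partitions.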

Here is the sequence of steps:

(1) Start the Spark Streaming app.
(2) Start the producer.

At this point I see the messages pumped by the producer arriving in
Spark Streaming. Then I:

(1) Stop the producer.
(2) Wait for all the messages to be consumed.
(3) Stop the Spark Streaming app.

Now when I restart the Spark Streaming app (note: the producer is still
down and no messages are being pumped into the topic), I observe the
following:

(1) Spark Streaming starts reading from each partition right from the
beginning.

This is not what I was expecting. I was expecting the consumers started by
Spark Streaming to start from where they left off.

Is it wrong to assume that the consumers (the Kafka/Spark connector) will
start reading from the topic where they last left off?

Has anyone else seen this behavior? Is there a way to make it start from
where it left off?

- Abraham
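
Restart behavior with the old ZooKeeper-based consumer comes down to one rule: if the group has a committed offset for a partition, the consumer resumes there; only when there is none (or it is out of range) does `auto.offset.reset` apply, with `smallest` meaning the earliest offset still in the log. A sketch of that decision; the class, method and the numbers in the test are illustrative.

```java
// Sketch of how a starting offset is chosen for one partition.
final class StartOffset {
    static final long NO_COMMIT = -1L;

    // committed: offset stored for this group and partition, or NO_COMMIT.
    // earliest/latest: the log's current bounds for the partition.
    static long resolve(long committed, String autoOffsetReset, long earliest, long latest) {
        boolean usable = committed != NO_COMMIT && committed >= earliest && committed <= latest;
        if (usable) {
            return committed; // resume where the group left off
        }
        switch (autoOffsetReset) {
            case "smallest":
                return earliest;
            case "largest":
                return latest;
            default:
                throw new IllegalArgumentException("unknown auto.offset.reset: " + autoOffsetReset);
        }
    }
}
```

Under this rule, a restart that re-reads every partition from the beginning suggests that no offsets were committed for the group before the app stopped.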

Re: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Sean McNamara
Would you mind sharing the code leading to your createStream? Are you also setting group.id?




Re: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Abraham Jacob
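Sure... I do set the group.id for all the consumers to be the same. Here is
the code:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.serializer.DefaultDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

// zookeeper, consumerGrp, topic, numPartitions and PayloadDeSerializer are defined
// elsewhere in the application. batchIntervalMs is a placeholder: the batch
// interval was cut off in the archived message.
SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(batchIntervalMs));

Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id", consumerGrp);   // same group id for every receiver
kafkaConf.put("auto.offset.reset", "smallest");
// kafkaConf.put(<key lost in the archived message>, "1000");
kafkaConf.put("rebalance.max.retries", "4");
kafkaConf.put("rebalance.backoff.ms", "3000");

Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);   // one consumer thread per receiver

// One receiver-based stream per partition; all of them join the same consumer group.
List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
  kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
      DefaultDecoder.class, PayloadDeSerializer.class,
      kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER())
    .mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], String>() {
      private static final long serialVersionUID = -1936810126415608167L;

      @Override
      public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
        return tuple2;
      }
    }));
}

// Union the per-receiver streams into a single DStream.
JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
  unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
  unifiedStream = kafkaStreams.get(0);
}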



Re: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Sean McNamara
How long do you let the consumers run? Is it less than 60 seconds, by 
chance? The offset commit interval defaults to 60 seconds, so if the 
consumers run for less than that, it may explain why you are seeing that behavior.



RE: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Shao, Saisai
Hi Abraham,

You are correct: the “auto.offset.reset” behavior in KafkaReceiver is different 
from the original Kafka semantics. If you set this configuration, KafkaReceiver will 
clean the related ZK metadata immediately, whereas for Kafka this configuration is 
just a hint that takes effect only when no offset is stored or the stored offset is 
out of range. So with “smallest” you will always read data from the beginning, and 
with “largest” you will always get data from the end immediately.
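
A minimal Java sketch of the two behaviors described above, for a single partition. Everything in it is hypothetical (the class and method names are illustrative, not Spark or Kafka APIs): it assumes the Kafka 0.8 high-level consumer consults auto.offset.reset only when no offset is stored or the stored offset is out of range, and it models the receiver as deleting the stored offset whenever the property is present, as described in this thread.

```java
// Hypothetical model of where a consumer starts reading one partition.
// Not Spark or Kafka code; it only illustrates the semantics discussed above.
final class OffsetSemantics {

    // Kafka 0.8 style: auto.offset.reset is only a fallback hint.
    // A null reset means the property was not set (Kafka's default is "largest").
    static long kafkaStartOffset(Long committed, long earliest, long latest, String reset) {
        boolean usable = committed != null && committed >= earliest && committed <= latest;
        if (usable) {
            return committed; // resume from the last commit point
        }
        return "smallest".equals(reset) ? earliest : latest;
    }

    // Receiver as described in this thread: if auto.offset.reset is set, the group's
    // committed offsets are deleted first, so the reset policy always applies.
    static long sparkReceiverStartOffset(Long committed, long earliest, long latest, String reset) {
        Long effective = (reset != null) ? null : committed;
        return kafkaStartOffset(effective, earliest, latest, reset);
    }
}
```

With a committed offset of 42 in a partition holding offsets 0 to 100, the Kafka rule resumes at 42 for either setting, while the receiver model restarts at 0 with “smallest” and at 100 with “largest”.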

There’s a JIRA and a PR addressing this, but they have not been merged into master 
yet; you can check them (


From: Abraham Jacob
Sent: Saturday, October 11, 2014 6:57 AM
To: Sean McNamara
Subject: Re: Spark Streaming KafkaUtils Issue

Probably this is the issue:

• Spark’s usage of the Kafka consumer parameter auto.offset.reset 
is different from Kafka’s semantics. In Kafka, setting 
auto.offset.reset to “smallest” makes the consumer automatically reset 
the offset to the smallest offset when a) there is no existing offset stored in 
ZooKeeper or b) there is an existing offset but it is out of range. Spark, 
however, will always remove existing offsets and then start all the way from 
zero again. This means that whenever you restart your application with 
auto.offset.reset = smallest, your application will completely re-process all 
available Kafka data. Doh! See this discussion and that discussion.

Hmm, interesting... I wonder what happens if I set it to largest...?

Re: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Abraham Jacob
Thanks, Jerry. So, from what I can understand from the code, if I leave out
auto.offset.reset, it should theoretically read from the last commit
point... Correct?


Re: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Sean McNamara
This JIRA and comment sum up the issue:

Basically, the offset parameter was renamed and had slightly different semantics 
between Kafka 0.7 and 0.8. It was also useful because earlier versions of the 
Spark Streaming receiver could be overwhelmed after a streaming job had been down 
for a period of time.

I think this PR quite nicely addresses the issue:



RE: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Shao, Saisai
Hi abe,

You can see the details in KafkaInputDStream.scala; here is the snippet:

// When auto.offset.reset is defined, it is our responsibility to try and whack the
// consumer group zk node.
if (kafkaParams.contains("auto.offset.reset")) {
  tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
}

KafkaReceiver checks your kafkaParams: if “auto.offset.reset” is set, it cleans 
the ZK metadata immediately, so you will always read data from the beginning 
(with “smallest”) or from the end (with “largest”), because the ZK metadata has 
been deleted beforehand.

If you do not set this parameter, this code path is not triggered, so data 
will be read from the last commit point. And if the last commit point is not yet 
available, Kafka will move the offset to the end of the partition (Kafka sets 
“auto.offset.reset” to “largest” by default).

If you want to keep the same semantics as Kafka, you need to remove the above 
code path manually and recompile Spark.
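
Following this advice, a receiver that should resume from the last commit point simply never puts auto.offset.reset into its parameter map. The sketch below is a hypothetical helper (the class name, method name and argument values are illustrative, not part of Spark) that builds such a map and strips the key defensively. Keep in mind that when the group has no committed offset yet, Kafka's default of “largest” applies, so the very first run starts at the end of each partition.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds receiver parameters without auto.offset.reset,
// so the receiver never takes the ZK cleanup path shown in the snippet above.
final class ResumeParams {

    static Map<String, String> build(String zookeeper, String groupId, Map<String, String> extra) {
        Map<String, String> params = new HashMap<String, String>();
        params.put("zookeeper.connect", zookeeper);
        params.put("group.id", groupId);
        if (extra != null) {
            params.putAll(extra); // e.g. rebalance.max.retries, rebalance.backoff.ms
        }
        // Setting this key at all triggers the cleanup, whatever its value; drop it.
        params.remove("auto.offset.reset");
        return params;
    }
}
```

The resulting map would take the place of kafkaConf in the createStream call from earlier in the thread.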


Re: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Abraham Jacob
Ah, I see... much clearer now...

Because auto.offset.reset triggers KafkaReceiver to delete the ZK
metadata, when control passes to the Kafka consumer API it sees
that there is no offset available for the partition. This then triggers
the smallest or largest logic in Kafka, depending on what we
set for auto.offset.reset...
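
The sequence above can be simulated in a few lines of Java. This is a hypothetical model, not Spark's KafkaInputDStream: a plain map stands in for the consumer group's offset node in ZooKeeper, start() clears it when auto.offset.reset is present (mirroring the snippet Jerry quoted), and the consumer then falls back to the reset policy because no offset is left. Out-of-range offsets are not modeled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation of one consumer group across application restarts.
final class RestartSimulation {

    // Stand-in for the group's offsets in ZooKeeper (partition -> committed offset).
    final Map<Integer, Long> zkOffsets = new HashMap<Integer, Long>();

    // Returns the offset the consumer starts from after a (re)start.
    long start(int partition, Map<String, String> kafkaParams, long earliest, long latest) {
        if (kafkaParams.containsKey("auto.offset.reset")) {
            zkOffsets.clear(); // the receiver deletes the consumer group node
        }
        Long committed = zkOffsets.get(partition);
        if (committed != null) {
            return committed; // resume from the last commit point
        }
        // No offset left: the reset policy applies (Kafka's default is "largest").
        String reset = kafkaParams.getOrDefault("auto.offset.reset", "largest");
        return "smallest".equals(reset) ? earliest : latest;
    }

    void commit(int partition, long offset) {
        zkOffsets.put(partition, offset);
    }
}
```

For example, after committing offset 100, a restart with auto.offset.reset=smallest starts again at 0 (full re-processing), while a restart without the key resumes at 100.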

Thanks for explaining this clearly! I appreciate your effort.
