Re: createDirectStream with offsets

2016-05-07 Thread Eric Friedman
Thanks Cody.  It turns out there was an even simpler explanation as well (the
flaw you pointed out was accurate too): I was passing mutable.Map instances
where KafkaUtils wants immutable ones.
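For anyone who hits the same thing: the mismatch is reproducible without Spark, because scala.collection.mutable.Map is not a subtype of the default (immutable) Map that the Scala createDirectStream overload declares. A minimal sketch (requiresImmutable is a hypothetical stand-in that only mirrors the parameter type):

```scala
import scala.collection.mutable

// Stand-in with the same parameter type as the Scala overload of
// createDirectStream: plain `Map` here is scala.collection.immutable.Map.
def requiresImmutable(params: Map[String, String]): Int = params.size

val mutableParams = mutable.Map("bootstrap.servers" -> "localhost:9092")

// requiresImmutable(mutableParams)  // does not compile: type mismatch
val n = requiresImmutable(mutableParams.toMap)  // .toMap builds an immutable copy
```

Calling .toMap at the call site (or simply building the maps with the default Map(...) in the first place) satisfies the overload.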

On Fri, May 6, 2016 at 8:32 AM, Cody Koeninger  wrote:

> Look carefully at the error message, the types you're passing in don't
> match.  For instance, you're passing in a message handler that returns
> a tuple, but the rdd return type you're specifying (the 5th type
> argument) is just String.


Re: createDirectStream with offsets

2016-05-06 Thread Cody Koeninger
Look carefully at the error message, the types you're passing in don't
match.  For instance, you're passing in a message handler that returns
a tuple, but the rdd return type you're specifying (the 5th type
argument) is just String.
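Concretely: in the Scala overload's type parameter list [K, V, KD, VD, R], the last parameter R must equal the messageHandler's return type. The shape can be shown without Spark (MessageAndMetadata and applyHandler below are hypothetical stand-ins, not the real kafka.message API):

```scala
// Hypothetical stand-in for kafka.message.MessageAndMetadata; only the
// type-parameter shape matters here.
case class MessageAndMetadata[K, V](key: K, message: V)

// Stand-in for the overload: R is the record type the handler produces.
def applyHandler[K, V, R](handler: MessageAndMetadata[K, V] => R,
                          sample: MessageAndMetadata[K, V]): R = handler(sample)

val handler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

// With explicit type arguments, as in the failing call, R must be
// (String, String) because that is what `handler` returns:
val record = applyHandler[String, String, (String, String)](
  handler, MessageAndMetadata("k", "v"))
// applyHandler[String, String, String](handler, MessageAndMetadata("k", "v"))
//   ^ does not compile: found (String, String), required String
```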


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: createDirectStream with offsets

2016-05-06 Thread Eric Friedman
My build dependencies:

compile 'org.scala-lang:scala-library:2.10.4'
compile 'org.apache.spark:spark-core_2.10:1.6.1'
compile 'org.apache.spark:spark-sql_2.10:1.6.1'
compile 'org.apache.spark:spark-hive_2.10:1.6.1'
compile 'org.apache.spark:spark-streaming_2.10:1.6.1'
compile 'com.databricks:spark-avro_2.10:2.0.1'
compile 'org.apache.spark:spark-streaming-kafka_2.10:1.6.1'
compile 'org.apache.kafka:kafka-clients:0.8.2.1'
compile 'org.apache.kafka:kafka_2.10:0.8.2.1'
compile 'com.yammer.metrics:metrics-core:2.2.0'



createDirectStream with offsets

2016-05-06 Thread Eric Friedman
Hello,

I've been using createDirectStream with Kafka and now need to switch to the
version of that API that lets me supply offsets for my topics.  I'm unable
to get this to compile for some reason, even if I lift the very same usage
from the Spark test suite.

I'm calling it like this:

val topic = "offset"
val topicPartition = TopicAndPartition(topic, 0)
val messageHandler = (mmd: MessageAndMetadata[String, String]) =>
  (mmd.key, mmd.message)
val stream = KafkaUtils.createDirectStream[String, String,
    StringDecoder, StringDecoder, String](
  ssc, kafkaParams, Map(topicPartition -> 11L), messageHandler)



Error:

MyCode.scala:97: overloaded method value createDirectStream with
alternatives:

  (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,keyClass:
Class[String],valueClass: Class[String],keyDecoderClass:
Class[kafka.serializer.StringDecoder],valueDecoderClass:
Class[kafka.serializer.StringDecoder],recordClass:
Class[String],kafkaParams: java.util.Map[String,String],fromOffsets:
java.util.Map[kafka.common.TopicAndPartition,java.lang.Long],messageHandler:
org.apache.spark.api.java.function.Function[kafka.message.MessageAndMetadata[String,String],String])org.apache.spark.streaming.api.java.JavaInputDStream[String]


  (ssc: org.apache.spark.streaming.StreamingContext,kafkaParams:
scala.collection.immutable.Map[String,String],fromOffsets:
scala.collection.immutable.Map[kafka.common.TopicAndPartition,scala.Long],messageHandler:
kafka.message.MessageAndMetadata[String,String] => String)(implicit
evidence$14: scala.reflect.ClassTag[String], implicit evidence$15:
scala.reflect.ClassTag[String], implicit evidence$16:
scala.reflect.ClassTag[kafka.serializer.StringDecoder], implicit
evidence$17: scala.reflect.ClassTag[kafka.serializer.StringDecoder],
implicit evidence$18:
scala.reflect.ClassTag[String])org.apache.spark.streaming.dstream.InputDStream[String]

 cannot be applied to (org.apache.spark.streaming.StreamingContext,
scala.collection.mutable.Map[String,String],
scala.collection.mutable.Map[kafka.common.TopicAndPartition,scala.Long],
kafka.message.MessageAndMetadata[String,String] => (String, String))

val stream =  KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder, String](

   ^

one error found
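Putting together the two fixes that surface elsewhere in this thread (the fifth type argument must be (String, String) to match what messageHandler returns, and kafkaParams/fromOffsets must be immutable Maps), the call would look like the sketch below. This targets the Spark 1.6 / Kafka 0.8 APIs, assumes ssc and kafkaParams are defined as in the original code, and is not compiled here:

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val topic = "offset"
val topicPartition = TopicAndPartition(topic, 0)
val messageHandler = (mmd: MessageAndMetadata[String, String]) =>
  (mmd.key, mmd.message)

// The default Map(...) constructor yields an immutable Map, which is what
// this overload requires; a mutable.Map would need .toMap first.
val fromOffsets = Map(topicPartition -> 11L)

// The record type argument is (String, String), matching messageHandler.
val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams.toMap, fromOffsets, messageHandler)
```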