Re: createDirectStream with offsets
Thanks Cody. It turns out that there was an even simpler explanation (the flaw you pointed out was accurate too): I had mutable.Map instances being passed where KafkaUtils wants immutable ones.

On Fri, May 6, 2016 at 8:32 AM, Cody Koeninger wrote:
> Look carefully at the error message; the types you're passing in don't
> match. For instance, you're passing in a message handler that returns
> a tuple, but the RDD return type you're specifying (the 5th type
> argument) is just String.
>
> [earlier quoting snipped; the full messages appear below]
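For readers hitting the same mismatch: the Scala overload of createDirectStream is typed against scala.collection.immutable.Map, so mutable maps have to be converted with .toMap before the call. A minimal sketch of that conversion (the broker address, topic name, and offset are placeholder values, and TopicAndPartition comes from the Kafka 0.8 client already on the classpath here):

```scala
import scala.collection.mutable
import kafka.common.TopicAndPartition

// Parameters built up in mutable maps, as in the failing code...
val kafkaParamsM = mutable.Map("metadata.broker.list" -> "localhost:9092")
val fromOffsetsM = mutable.Map(TopicAndPartition("offset", 0) -> 11L)

// ...must be converted before the call: .toMap on a mutable map
// returns a scala.collection.immutable.Map, which is what the
// Scala createDirectStream overload expects.
val kafkaParams = kafkaParamsM.toMap
val fromOffsets = fromOffsetsM.toMap
```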
Re: createDirectStream with offsets
Look carefully at the error message; the types you're passing in don't match. For instance, you're passing in a message handler that returns a tuple, but the RDD return type you're specifying (the 5th type argument) is just String.

On Fri, May 6, 2016 at 9:49 AM, Eric Friedman wrote:
> [quoting snipped; the full messages appear below]
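Concretely: since the handler returns (mmd.key, mmd.message), the fifth type argument has to be the tuple type (String, String) rather than String. A sketch of the call with that one change, assuming ssc and kafkaParams are already in scope as in the original snippet:

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val messageHandler = (mmd: MessageAndMetadata[String, String]) =>
  (mmd.key, mmd.message)

// The 5th type argument is the record type produced by the handler,
// so it must be (String, String) here, not String.
val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, Map(TopicAndPartition("offset", 0) -> 11L), messageHandler)
```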
Re: createDirectStream with offsets
My build dependencies:

    compile 'org.scala-lang:scala-library:2.10.4'
    compile 'org.apache.spark:spark-core_2.10:1.6.1'
    compile 'org.apache.spark:spark-sql_2.10:1.6.1'
    compile 'org.apache.spark:spark-hive_2.10:1.6.1'
    compile 'org.apache.spark:spark-streaming_2.10:1.6.1'
    compile 'com.databricks:spark-avro_2.10:2.0.1'
    compile 'org.apache.spark:spark-streaming-kafka_2.10:1.6.1'
    compile 'org.apache.kafka:kafka-clients:0.8.2.1'
    compile 'org.apache.kafka:kafka_2.10:0.8.2.1'
    compile 'com.yammer.metrics:metrics-core:2.2.0'

On Fri, May 6, 2016 at 7:47 AM, Eric Friedman wrote:
> [quoting snipped; the original message appears below]
createDirectStream with offsets
Hello,

I've been using createDirectStream with Kafka and now need to switch to the version of that API that lets me supply offsets for my topics. I'm unable to get this to compile for some reason, even if I lift the very same usage from the Spark test suite.

I'm calling it like this:

    val topic = "offset"

    val topicPartition = TopicAndPartition(topic, 0)

    val messageHandler = (mmd: MessageAndMetadata[String, String]) =>
      (mmd.key, mmd.message)

    val stream = KafkaUtils.createDirectStream[String, String,
        StringDecoder, StringDecoder, String](
      ssc, kafkaParams, Map(topicPartition -> 11L), messageHandler)

Error:

    MyCode.scala:97: overloaded method value createDirectStream with alternatives:

      (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,
       keyClass: Class[String], valueClass: Class[String],
       keyDecoderClass: Class[kafka.serializer.StringDecoder],
       valueDecoderClass: Class[kafka.serializer.StringDecoder],
       recordClass: Class[String],
       kafkaParams: java.util.Map[String,String],
       fromOffsets: java.util.Map[kafka.common.TopicAndPartition,java.lang.Long],
       messageHandler: org.apache.spark.api.java.function.Function[kafka.message.MessageAndMetadata[String,String],String])
      : org.apache.spark.streaming.api.java.JavaInputDStream[String]

      (ssc: org.apache.spark.streaming.StreamingContext,
       kafkaParams: scala.collection.immutable.Map[String,String],
       fromOffsets: scala.collection.immutable.Map[kafka.common.TopicAndPartition,scala.Long],
       messageHandler: kafka.message.MessageAndMetadata[String,String] => String)
      (implicit evidence$14: scala.reflect.ClassTag[String],
       implicit evidence$15: scala.reflect.ClassTag[String],
       implicit evidence$16: scala.reflect.ClassTag[kafka.serializer.StringDecoder],
       implicit evidence$17: scala.reflect.ClassTag[kafka.serializer.StringDecoder],
       implicit evidence$18: scala.reflect.ClassTag[String])
      : org.apache.spark.streaming.dstream.InputDStream[String]

    cannot be applied to (org.apache.spark.streaming.StreamingContext,
      scala.collection.mutable.Map[String,String],
      scala.collection.mutable.Map[kafka.common.TopicAndPartition,scala.Long],
      kafka.message.MessageAndMetadata[String,String] => (String, String))

    val stream = KafkaUtils.createDirectStream[String, String,
                            ^

    one error found
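The "cannot be applied to ... mutable.Map" lines in the error are the real tell, as the replies further up the thread confirm. One common way this happens (an inferred mechanism, not stated in the thread): an import of scala.collection.mutable.Map somewhere in the file shadows the immutable default, so bare Map(...) literals become mutable. A self-contained sketch of that shadowing, independent of Spark:

```scala
// `import scala.collection.mutable.Map` shadows scala.Predef.Map
// (the immutable default), so Map(...) literals become mutable.
import scala.collection.mutable.Map

val m = Map("a" -> 1) // inferred as scala.collection.mutable.Map[String,Int]

// A parameter typed against the immutable Map no longer accepts `m`:
def needsImmutable(p: Predef.Map[String, Int]): Int = p.size

// needsImmutable(m)    // type mismatch: mutable.Map is not immutable.Map
needsImmutable(m.toMap) // compiles: .toMap yields an immutable Map
```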