Is it that in Scala a derived class is allowed to declare a more specific return type for an overridden method?
And the streaming jar is originally written in Scala, so its DirectKafkaInputDStream is allowed to declare compute as returning Option[KafkaRDD[K, V, U, T, R]]?

On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora <shushantaror...@gmail.com> wrote:

> Looking at the source code of org.apache.spark.streaming.kafka.DirectKafkaInputDStream:
>
>   override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
>     val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
>     val rdd = KafkaRDD[K, V, U, T, R](
>       context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
>
>     currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
>     Some(rdd)
>   }
>
> But in DStream it is def compute(validTime: Time): Option[RDD[T]].
>
> So what should the return type be for a custom DStream that extends
> DirectKafkaInputDStream? I want the behaviour to be the same as
> DirectKafkaInputDStream in normal scenarios, and to return None in one specific scenario.
>
> And why did the same error not come up when DirectKafkaInputDStream itself extends
> InputDStream? Its return type Option[KafkaRDD[K, V, U, T, R]] is not a subclass of
> Option[RDD[T]], so shouldn't that have failed as well?
>
> On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> The superclass method in DStream is defined as returning an Option[RDD[T]].
>>
>> On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>
>>> I am getting a compilation error while overriding the compute method of
>>> DirectKafkaInputDStream:
>>>
>>> [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
>>> compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
>>> cannot override compute(org.apache.spark.streaming.Time) in
>>> org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
>>> return type
>>> [ERROR] found   : scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
>>> [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>
>>>
>>> The class:
>>>
>>> public class CustomDirectKafkaInputDstream extends
>>>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>         kafka.serializer.DefaultDecoder, byte[][]> {
>>>
>>>   @Override
>>>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> compute(
>>>       Time validTime) {
>>>     int processed = processedCounter.value();
>>>     int failed = failedProcessingsCounter.value();
>>>     if (processed == failed) {
>>>       System.out.println("backing off since its 100 % failure");
>>>       return Option.empty();
>>>     } else {
>>>       System.out.println("starting the stream ");
>>>       return super.compute(validTime);
>>>     }
>>>   }
>>> }
>>>
>>> What should the return type of the compute method be? The superclass returns
>>> Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>>, but the
>>> compiler expects scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from the derived
>>> class. Is there something wrong with the code?
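(My current guess at how to make that override compile from Java, completely unverified: declare the return type exactly as DStream does, Option<RDD<byte[][]>>, and cast the result of super.compute(), since Java generics are invariant even though Scala's Option is covariant. A rough sketch of the whole class follows; the backOff flag is just a placeholder for the accumulator check in my earlier code:)

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.DefaultDecoder;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.kafka.DirectKafkaInputDStream;
import scala.Function1;
import scala.Option;
import scala.reflect.ClassTag;

public class CustomDirectKafkaInputDstream
    extends DirectKafkaInputDStream<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]> {

  // Placeholder for the "100% failure" accumulator check from my earlier code.
  private volatile boolean backOff = false;

  public CustomDirectKafkaInputDstream(
      StreamingContext ssc,
      scala.collection.immutable.Map<String, String> kafkaParams,
      scala.collection.immutable.Map<TopicAndPartition, Object> fromOffsets,
      Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
      ClassTag<byte[]> keyTag, ClassTag<byte[]> valueTag,
      ClassTag<DefaultDecoder> keyDecoderTag, ClassTag<DefaultDecoder> valueDecoderTag,
      ClassTag<byte[][]> recordTag) {
    super(ssc, kafkaParams, fromOffsets, messageHandler,
        keyTag, valueTag, keyDecoderTag, valueDecoderTag, recordTag);
  }

  // Declared return type matches DStream's compute, not the narrowed one in
  // DirectKafkaInputDStream.
  @SuppressWarnings("unchecked")
  @Override
  public Option<RDD<byte[][]>> compute(Time validTime) {
    if (backOff) {
      return Option.empty();   // consume nothing in this batch
    }
    // In Scala, DirectKafkaInputDStream.compute returns Option[KafkaRDD[...]], which already
    // is an Option[RDD[byte[][]]] there because Option is covariant. Java generics are
    // invariant, so from Java the result has to be cast explicitly (KafkaRDD still extends RDD).
    return (Option<RDD<byte[][]>>) (Option<?>) super.compute(validTime);
  }
}

Does that look right, or is there a cleaner way to do it from Java?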
>>>
>>> On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>>> Look at the definitions of the java-specific KafkaUtils.createDirectStream methods
>>>> (the ones that take a JavaStreamingContext).
>>>>
>>>> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>
>>>>> How do I create a ClassTag in Java? Also, the constructor of
>>>>> DirectKafkaInputDStream takes a Function1, not a Function, whereas
>>>>> KafkaUtils.createDirectStream accepts a Function.
>>>>>
>>>>> I have the following as the overridden DirectKafkaInputDStream:
>>>>>
>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>         kafka.serializer.DefaultDecoder, byte[][]> {
>>>>>
>>>>>   public CustomDirectKafkaInputDstream(
>>>>>       StreamingContext ssc_,
>>>>>       Map<String, String> kafkaParams,
>>>>>       Map<TopicAndPartition, Object> fromOffsets,
>>>>>       Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
>>>>>       ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>>>>>       ClassTag<DefaultDecoder> evidence$3,
>>>>>       ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
>>>>>     super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1, evidence$2,
>>>>>         evidence$3, evidence$4, evidence$5);
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> compute(
>>>>>       Time validTime) {
>>>>>     int processed = processedCounter.value();
>>>>>     int failed = failedProcessingsCounter.value();
>>>>>     if (processed == failed) {
>>>>>       System.out.println("backing off since its 100 % failure");
>>>>>       return Option.empty();
>>>>>     } else {
>>>>>       System.out.println("starting the stream ");
>>>>>       return super.compute(validTime);
>>>>>     }
>>>>>   }
>>>>> }
>>>>>
>>>>> To create this stream I am using:
>>>>>
>>>>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>>>>>     JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala()
>>>>>         .toMap(Predef.<Tuple2<String, String>>conforms());
>>>>> scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap =
>>>>>     JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala()
>>>>>         .toMap(Predef.<Tuple2<TopicAndPartition, Long>>conforms());
>>>>>
>>>>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
>>>>>     new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() { .. });
>>>>>
>>>>> JavaDStream<byte[][]> directKafkaStream = new CustomDirectKafkaInputDstream(
>>>>>     jssc, scalakafkaParams, scalaktopicOffsetMap, handler,
>>>>>     byte[].class, byte[].class, kafka.serializer.DefaultDecoder.class,
>>>>>     kafka.serializer.DefaultDecoder.class, byte[][].class);
>>>>>
>>>>> How do I pass a ClassTag to the constructor of CustomDirectKafkaInputDstream?
>>>>> And how do I use a Function instead of a Function1?
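(For the ClassTag / Function1 part above, what I am planning to try, again just my own guess at mimicking what the java-specific KafkaUtils.createDirectStream overloads do internally: build the ClassTags from plain Class objects via the ClassTag companion object, write the message handler as a Serializable subclass of scala.runtime.AbstractFunction1, and wrap the custom stream in a JavaDStream myself. ByteArrayMessageHandler and CustomStreamFactory are names I made up; the constructor is the one from my CustomDirectKafkaInputDstream sketch above:)

import java.io.Serializable;
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.DefaultDecoder;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction1;

// A scala.Function1 that is also Serializable, since Spark ships the message handler to executors.
class ByteArrayMessageHandler
    extends AbstractFunction1<MessageAndMetadata<byte[], byte[]>, byte[][]>
    implements Serializable {
  @Override
  public byte[][] apply(MessageAndMetadata<byte[], byte[]> mmd) {
    return new byte[][] { mmd.key(), mmd.message() };
  }
}

public final class CustomStreamFactory {
  public static JavaDStream<byte[][]> create(
      JavaStreamingContext jssc,
      scala.collection.immutable.Map<String, String> scalaKafkaParams,
      scala.collection.immutable.Map<TopicAndPartition, Object> scalaTopicOffsetMap) {

    // ClassTags built from plain Class objects through the companion object.
    ClassTag<byte[]> bytesTag = ClassTag$.MODULE$.apply(byte[].class);
    ClassTag<DefaultDecoder> decoderTag = ClassTag$.MODULE$.apply(DefaultDecoder.class);
    ClassTag<byte[][]> recordTag = ClassTag$.MODULE$.apply(byte[][].class);

    // jssc.ssc() exposes the underlying Scala StreamingContext that the constructor wants.
    CustomDirectKafkaInputDstream stream = new CustomDirectKafkaInputDstream(
        jssc.ssc(), scalaKafkaParams, scalaTopicOffsetMap,
        new ByteArrayMessageHandler(),
        bytesTag, bytesTag, decoderTag, decoderTag, recordTag);

    // Wrap it for the Java API; JavaDStream's constructor also takes the element ClassTag.
    return new JavaDStream<byte[][]>(stream, recordTag);
  }
}

I went with AbstractFunction1 rather than implementing Function1 directly so I don't have to provide compose/andThen by hand. Please correct me if this wiring is wrong.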
>>>>>
>>>>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>
>>>>>> I'm not aware of an existing api per se, but you could create your own subclass of
>>>>>> the DStream that returns None for compute() under certain conditions.
>>>>>>
>>>>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Cody
>>>>>>>
>>>>>>> Can you help here: does streaming 1.3 have any api for not consuming any messages
>>>>>>> in the next few runs?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> ---------- Forwarded message ----------
>>>>>>> From: Shushant Arora <shushantaror...@gmail.com>
>>>>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>>>>> Subject: spark streaming 1.3 doubts(force it to not consume anything)
>>>>>>> To: user <user@spark.apache.org>
>>>>>>>
>>>>>>> I can't make my streaming application's batch interval change at run time. It is
>>>>>>> always fixed: it always creates jobs at the specified batch interval and enqueues
>>>>>>> them if the earlier batch is not finished.
>>>>>>>
>>>>>>> My requirement is to process the events and post them to an external server, and
>>>>>>> if the external server is down I want to increase the batch time. Since that is
>>>>>>> not possible, can I instead make it not consume any messages in, say, the next 5
>>>>>>> successive runs?
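(Coming back to that original requirement: the rough idea I have now, combining Cody's suggestion with the compute() sketch earlier in this mail, is to keep a simple skip counter inside the custom DStream. Completely untested; skipNextBatches() would have to be called from driver-side code, e.g. inside a foreachRDD block, when the external server is detected to be down:)

// Additional members for the CustomDirectKafkaInputDstream sketch above,
// with compute() reworked to skip a fixed number of batches.

private int batchesToSkip = 0;

// Called from driver-side code when the external server is down.
public void skipNextBatches(int n) {
  batchesToSkip = n;
}

@SuppressWarnings("unchecked")
@Override
public Option<RDD<byte[][]>> compute(Time validTime) {
  if (batchesToSkip > 0) {
    batchesToSkip--;
    // Returning None means nothing is read from Kafka for this interval. currentOffsets
    // only advance inside super.compute(), so the skipped data should be picked up once
    // the stream resumes.
    return Option.empty();
  }
  return (Option<RDD<byte[][]>>) (Option<?>) super.compute(validTime);
}

Is that roughly what you meant by returning None from compute() under certain conditions?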