Is it that in Scala a derived class is allowed to have any return type?

And since the streaming jar was originally written in Scala, is that why
DirectKafkaInputDStream is allowed to return Option[KafkaRDD[K, V, U, T, R]]
from its compute method?
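
For context on the question above: Scala declares Option as covariant
(Option[+A]) and KafkaRDD extends RDD, so scalac treats
Option[KafkaRDD[K, V, U, T, R]] as a subtype of Option[RDD[R]] and accepts
the narrower return type. It is not the case that any return type is allowed.
javac, as far as I understand, sees scala.Option as an ordinary invariant
generic class, which would explain the error quoted below. The following is
only a minimal, self-contained Java sketch of that rule. It uses
java.util.Optional and hypothetical stand-in classes (Rdd, KafkaRdd,
BaseStream and KafkaStream are illustrative names, not Spark classes), and it
is not a fix for the Spark code in this thread.

```java
import java.util.Optional;

public class CovariantReturnSketch {
    // Hypothetical stand-ins for RDD and KafkaRDD; these are not Spark classes.
    public static class Rdd {
        String name() { return "Rdd"; }
    }

    public static class KafkaRdd extends Rdd {
        @Override
        String name() { return "KafkaRdd"; }
    }

    // Hypothetical base stream. The wildcard lets the return type accept an
    // Optional of any Rdd subtype, which a plain Optional<Rdd> would not.
    public static class BaseStream {
        Optional<? extends Rdd> compute(boolean backOff) {
            return backOff ? Optional.empty() : Optional.of(new Rdd());
        }
    }

    // Narrowing the return type compiles because Optional<KafkaRdd> is a
    // subtype of Optional<? extends Rdd>. If BaseStream.compute returned
    // Optional<Rdd> instead, javac would reject this override as an
    // incompatible return type, since Java generics are invariant.
    public static class KafkaStream extends BaseStream {
        @Override
        Optional<KafkaRdd> compute(boolean backOff) {
            return backOff ? Optional.empty() : Optional.of(new KafkaRdd());
        }
    }

    // Runs one compute call and reports which kind of result came back.
    public static String describe(BaseStream stream, boolean backOff) {
        return stream.compute(backOff).map(Rdd::name).orElse("none");
    }

    public static void main(String[] args) {
        System.out.println(describe(new KafkaStream(), false));
        System.out.println(describe(new KafkaStream(), true));
    }
}
```

The backOff flag here only mimics the "return None in a specific scenario"
idea discussed in this thread; how that condition is computed is left open.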

On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora <shushantaror...@gmail.com>
wrote:

> Looking at the source code of
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream:
>
> override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
>   val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
>   val rdd = KafkaRDD[K, V, U, T, R](
>     context.sparkContext, kafkaParams, currentOffsets, untilOffsets,
>     messageHandler)
>
>   currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
>   Some(rdd)
> }
>
>
> But in DStream it is declared as
> def compute(validTime: Time): Option[RDD[T]].
>
> So what should be the return type of compute in a custom DStream that
> extends DirectKafkaInputDStream? I want the behaviour to be the same as
> DirectKafkaInputDStream in normal scenarios, and to return None in one
> specific scenario.
>
> And why did the same error not occur when DirectKafkaInputDStream itself
> extends InputDStream? Since the new return type
> Option[KafkaRDD[K, V, U, T, R]] is not a subclass of Option[RDD[T]],
> shouldn't that have failed as well?
>
>
>
>
> On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> The superclass method in DStream is defined as returning an Option[RDD[T]]
>>
>> On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> I am getting a compilation error while overriding the compute method of
>>> DirectKafkaInputDStream.
>>>
>>>
>>> [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
>>> compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
>>> cannot override compute(org.apache.spark.streaming.Time) in
>>> org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
>>> return type
>>>
>>> [ERROR] found   :
>>> scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
>>>
>>> [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>
>>>
>>>
>>> Class:
>>>
>>> public class CustomDirectKafkaInputDstream extends
>>>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>         kafka.serializer.DefaultDecoder, byte[][]> {
>>>
>>>   @Override
>>>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>       byte[][]>> compute(Time validTime) {
>>>     int processed = processedCounter.value();
>>>     int failed = failedProcessingsCounter.value();
>>>     if (processed == failed) {
>>>       System.out.println("backing off since its 100 % failure");
>>>       return Option.empty();
>>>     } else {
>>>       System.out.println("starting the stream ");
>>>       return super.compute(validTime);
>>>     }
>>>   }
>>> }
>>>
>>>
>>> What should be the return type of the compute method? The superclass
>>> returns Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>> byte[][]>>, but the compiler expects
>>> scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from the derived class.
>>> Is there something wrong with the code?
>>>
>>> On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> Look at the definitions of the java-specific
>>>> KafkaUtils.createDirectStream methods (the ones that take a
>>>> JavaStreamingContext)
>>>>
>>>> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <
>>>> shushantaror...@gmail.com> wrote:
>>>>
>>>>> How do I create a ClassTag in Java? Also, the constructor of
>>>>> DirectKafkaInputDStream takes a Function1, not a Function, but
>>>>> KafkaUtils.createDirectStream accepts a Function.
>>>>>
>>>>> Below is my overridden DirectKafkaInputDStream.
>>>>>
>>>>>
>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>         kafka.serializer.DefaultDecoder, byte[][]> {
>>>>>
>>>>>   public CustomDirectKafkaInputDstream(
>>>>>       StreamingContext ssc_,
>>>>>       Map<String, String> kafkaParams,
>>>>>       Map<TopicAndPartition, Object> fromOffsets,
>>>>>       Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
>>>>>       ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>>>>>       ClassTag<DefaultDecoder> evidence$3,
>>>>>       ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
>>>>>     super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>>>>>         evidence$2, evidence$3, evidence$4, evidence$5);
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>>>       byte[][]>> compute(Time validTime) {
>>>>>     int processed = processedCounter.value();
>>>>>     int failed = failedProcessingsCounter.value();
>>>>>     if (processed == failed) {
>>>>>       System.out.println("backing off since its 100 % failure");
>>>>>       return Option.empty();
>>>>>     } else {
>>>>>       System.out.println("starting the stream ");
>>>>>       return super.compute(validTime);
>>>>>     }
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> To create this stream I am using:
>>>>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>>>>> JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala().toMap(Predef.<Tuple2<String,
>>>>> String>>conforms());
>>>>> scala.collection.immutable.Map<TopicAndPartition, Long>
>>>>> scalaktopicOffsetMap=
>>>>> JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala().toMap(Predef.<Tuple2<TopicAndPartition,
>>>>> Long>>conforms());
>>>>>
>>>>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler
>>>>> = new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>>>>>         ..});
>>>>> JavaDStream<byte[][]> directKafkaStream = new
>>>>> CustomDirectKafkaInputDstream(jssc,scalakafkaParams ,scalaktopicOffsetMap,
>>>>> handler,byte[].class,byte[].class, kafka.serializer.DefaultDecoder.class,
>>>>> kafka.serializer.DefaultDecoder.class,byte[][].class);
>>>>>
>>>>>
>>>>>
>>>>> How do I pass a ClassTag to the constructor of
>>>>> CustomDirectKafkaInputDstream? And how can I use Function instead of
>>>>> Function1?
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <c...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> I'm not aware of an existing api per se, but you could create your
>>>>>> own subclass of the DStream that returns None for compute() under certain
>>>>>> conditions.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <
>>>>>> shushantaror...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Cody
>>>>>>>
>>>>>>> Can you tell me whether Spark Streaming 1.3 has any API for not
>>>>>>> consuming any messages in the next few runs?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> ---------- Forwarded message ----------
>>>>>>> From: Shushant Arora <shushantaror...@gmail.com>
>>>>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>>>>> Subject: spark streaming 1.3 doubts(force it to not consume anything)
>>>>>>> To: user <user@spark.apache.org>
>>>>>>>
>>>>>>>
>>>>>>> I can't change my streaming application's batch interval at run
>>>>>>> time. It is always fixed, and it always creates jobs at the specified
>>>>>>> batch interval and enqueues them if an earlier batch has not finished.
>>>>>>>
>>>>>>> My requirement is to process the events and post them to an external
>>>>>>> server. If the external server is down, I want to increase the batch
>>>>>>> time. That is not possible, but can I make it not consume any messages
>>>>>>> for, say, the next 5 successive runs?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
