Re: correct and fast way to stop streaming application

2015-10-27 Thread Krot Viacheslav
Any ideas? This is important for us because we use Kafka direct streaming and
save the processed offsets manually as the last step in the job, which is how we
achieve at-least-once semantics.
But look at what happens when a new batch is scheduled after a job fails:
- suppose we start from offset 10, loaded from zookeeper
- a job starts for offsets 10-20
- the job fails N times; awaitTermination notices that and stops the context (or
even the JVM with System.exit), but the scheduler has already started the next
job, for offsets 20-30, and sent it to an executor
- the executor runs all the steps (if there is only one stage) and saves offset
30 to zookeeper

This way I lose the data in offsets 10-20.

How should this be handled correctly?
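
For context, our commit flow looks roughly like this (a simplified sketch, not
our exact code; processPartition and saveOffsetsToZookeeper are hypothetical
helpers):

import org.apache.spark.streaming.kafka.HasOffsetRanges

inputStream.foreachRDD { rdd =>
  // remember which offsets this batch covers
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // do all the processing first
  rdd.foreachPartition(iter => processPartition(iter))   // hypothetical processing

  // ...and only as the very last step persist the offsets
  saveOffsetsToZookeeper(offsetRanges)                    // hypothetical zookeeper write
}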

On Mon, Oct 26, 2015 at 18:37, varun sharma <varunsharman...@gmail.com> wrote:

> +1, wanted to do the same.
>
> On Mon, Oct 26, 2015 at 8:58 PM, Krot Viacheslav <
> krot.vyaches...@gmail.com> wrote:
>
>> Hi all,
>>
>> I wonder what is the correct way to stop a streaming application if some
>> job fails?
>> What I have now:
>>
>> val ssc = new StreamingContext
>> 
>> ssc.start()
>> try {
>>ssc.awaitTermination()
>> } catch {
>>case e => ssc.stop(stopSparkContext = true, stopGracefully = false)
>> }
>>
>> It works, but one problem still exists - after a job fails and before the
>> streaming context is stopped, it manages to start a job for the next batch.
>> That is not desirable for me.
>> It works like this because JobScheduler is an actor and, after it reports an
>> error, it goes on with the next message, which starts the next batch job.
>> Meanwhile, ssc.awaitTermination() runs in another thread and happens after the
>> next batch starts.
>>
>> Is there a way to stop before the next job is submitted?
>>
>
>
>
> --
> *VARUN SHARMA*
> *Flipkart*
> *Bangalore*
>


Re: correct and fast way to stop streaming application

2015-10-27 Thread Krot Viacheslav
Actually a great idea, I didn't even think about that. Thanks a lot!
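
Something like this is what I'm going to try (a rough sketch; readOffset and
saveOffset stand in for our zookeeper helpers):

import org.apache.spark.streaming.kafka.HasOffsetRanges

inputStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch ...

  offsetRanges.foreach { range =>
    val stored = readOffset(range.topic, range.partition)          // hypothetical zookeeper read
    if (stored == range.fromOffset) {
      // nothing was skipped - safe to advance the committed offset
      saveOffset(range.topic, range.partition, range.untilOffset)  // hypothetical zookeeper write
    } else {
      // an earlier range was never committed - do not overwrite, let it be replayed on restart
      throw new IllegalStateException(
        s"stored offset $stored != fromOffset ${range.fromOffset} for ${range.topic}-${range.partition}")
    }
  }
}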

On Tue, Oct 27, 2015 at 17:29, varun sharma <varunsharman...@gmail.com> wrote:

> One more thing we can try: before committing an offset, verify the
> latest offset of that partition (in zookeeper) against the fromOffset in the
> OffsetRange.
> Just a thought...
>
> Let me know if it works.
>
> On Tue, Oct 27, 2015 at 9:00 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> If you want to make sure that your offsets are increasing without gaps...
>> one way to do that is to enforce that invariant when you're saving to your
>> database.  That would probably mean using a real database instead of
>> zookeeper though.
>>
>> On Tue, Oct 27, 2015 at 4:13 AM, Krot Viacheslav <
>> krot.vyaches...@gmail.com> wrote:
>>
>>> Any ideas? This is important for us because we use Kafka direct streaming
>>> and save the processed offsets manually as the last step in the job, which is
>>> how we achieve at-least-once semantics.
>>> But look at what happens when a new batch is scheduled after a job fails:
>>> - suppose we start from offset 10, loaded from zookeeper
>>> - a job starts for offsets 10-20
>>> - the job fails N times; awaitTermination notices that and stops the context
>>> (or even the JVM with System.exit), but the scheduler has already started the
>>> next job, for offsets 20-30, and sent it to an executor
>>> - the executor runs all the steps (if there is only one stage) and saves
>>> offset 30 to zookeeper
>>>
>>> This way I lose the data in offsets 10-20.
>>>
>>> How should this be handled correctly?
>>>
>>> On Mon, Oct 26, 2015 at 18:37, varun sharma <varunsharman...@gmail.com> wrote:
>>>
>>>> +1, wanted to do the same.
>>>>
>>>> On Mon, Oct 26, 2015 at 8:58 PM, Krot Viacheslav <
>>>> krot.vyaches...@gmail.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I wonder what is the correct way to stop a streaming application if some
>>>>> job fails?
>>>>> What I have now:
>>>>>
>>>>> val ssc = new StreamingContext
>>>>> 
>>>>> ssc.start()
>>>>> try {
>>>>>ssc.awaitTermination()
>>>>> } catch {
>>>>>case e => ssc.stop(stopSparkContext = true, stopGracefully = false)
>>>>> }
>>>>>
>>>>> It works, but one problem still exists - after a job fails and before the
>>>>> streaming context is stopped, it manages to start a job for the next batch.
>>>>> That is not desirable for me.
>>>>> It works like this because JobScheduler is an actor and, after it reports
>>>>> an error, it goes on with the next message, which starts the next batch job.
>>>>> Meanwhile, ssc.awaitTermination() runs in another thread and happens after
>>>>> the next batch starts.
>>>>>
>>>>> Is there a way to stop before the next job is submitted?
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> *VARUN SHARMA*
>>>> *Flipkart*
>>>> *Bangalore*
>>>>
>>>
>>
>
>
> --
> *VARUN SHARMA*
> *Flipkart*
> *Bangalore*
>


correct and fast way to stop streaming application

2015-10-26 Thread Krot Viacheslav
Hi all,

I wonder what is the correct way to stop a streaming application if some job
fails?
What I have now:

val ssc = new StreamingContext

ssc.start()
try {
   ssc.awaitTermination()
} catch {
   case e => ssc.stop(stopSparkContext = true, stopGracefully = false)
}

It works, but one problem still exists - after a job fails and before the
streaming context is stopped, it manages to start a job for the next batch.
That is not desirable for me.
It works like this because JobScheduler is an actor and, after it reports an
error, it goes on with the next message, which starts the next batch job.
Meanwhile, ssc.awaitTermination() runs in another thread and happens after the
next batch starts.

Is there a way to stop before the next job is submitted?


stop streaming context of job failure

2015-06-16 Thread Krot Viacheslav
Hi all,

Is there a way to stop the streaming context when some batch processing fails?
I want to set a reasonable retries count, say 10, and if it still fails - stop
the context completely.
Is that possible?
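
What I have in mind is roughly this (a sketch; it assumes the retry count maps
to spark.task.maxFailures, and the app name and batch interval are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("my-streaming-app")          // illustrative name
  .set("spark.task.maxFailures", "10")     // give up after 10 task attempts

val ssc = new StreamingContext(conf, Seconds(10))   // illustrative batch interval
// ... set up the streams ...
ssc.start()
try {
  ssc.awaitTermination()   // rethrows the failure once retries are exhausted
} catch {
  case e: Throwable => ssc.stop(stopSparkContext = true, stopGracefully = false)
}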


Re: updateStateByKey and kafka direct approach without shuffle

2015-06-02 Thread Krot Viacheslav
Thanks, this works.
Hopefully I didn't miss something important with this approach.
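
For reference, it now looks roughly like this (a sketch; PartitionIdPartitioner
and updateFunc are illustrative names, and the partition count is hardcoded):

import org.apache.spark.{Partitioner, TaskContext}

// a single partitioner instance created up front; it only needs to know the
// total number of partitions and uses the spark partition id (the key) directly
class PartitionIdPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}

val partitioner = new PartitionIdPartitioner(2)   // = count of all kafka topic/partitions

inputStream
  .map(m => (TaskContext.get().partitionId(), m.value))
  .transform(rdd => new ProxyRDDWithPartitioner(rdd, partitioner))
  .updateStateByKey(updateFunc, partitioner)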

On Tue, Jun 2, 2015 at 20:15, Cody Koeninger c...@koeninger.org wrote:

 If you're using the spark partition id directly as the key, then you don't
 need to access offset ranges at all, right?
 You can create a single instance of a partitioner in advance, and all it
 needs to know is the number of partitions (which is just the count of all
 the kafka topic/partitions).

 On Tue, Jun 2, 2015 at 12:40 PM, Krot Viacheslav 
 krot.vyaches...@gmail.com wrote:

 Cody,

 Thanks, good point. I fixed getting partition id to:

 class MyPartitioner(offsetRanges: Array[OffsetRange]) extends Partitioner {
   override def numPartitions: Int = offsetRanges.size

   override def getPartition(key: Any): Int = {
     // the key is set in .map(m => (TaskContext.get().partitionId(), m.value))
     key.asInstanceOf[Int]
   }
 }

 inputStream
   .map(m => (TaskContext.get().partitionId(), m.value))
   .transform { rdd =>
     val part = new MyPartitioner(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
     new ProxyRDDWithPartitioner(rdd, part)
   }
 ...

 But how can I create the same partitioner during the updateStateByKey call? I
 have no idea how to access the rdd when calling updateStateByKey.

 On Tue, Jun 2, 2015 at 19:15, Cody Koeninger c...@koeninger.org wrote:

 I think the general idea is worth pursuing.

 However, this line:

   override def getPartition(key: Any): Int = {
     key.asInstanceOf[(String, Int)]._2
   }

 is using the kafka partition id, not the spark partition index, so it's
 going to give you fewer partitions / incorrect index

 Cast the rdd to HasOffsetRanges, get the offsetRanges from it.  The
 index into the offset range array matches the (spark) partition id.  That
 will also tell you what the value of numPartitions should be.







 On Tue, Jun 2, 2015 at 11:50 AM, Krot Viacheslav 
 krot.vyaches...@gmail.com wrote:

 Hi all,
 In my streaming job I'm using the Kafka direct streaming approach and want
 to maintain state with updateStateByKey. My PairRDD has the message's topic
 name + partition id as a key. So, I assumed that updateStateByKey could work
 within the same partitions as the KafkaRDD and not lead to shuffles. Actually
 this is not true, because updateStateByKey leads to a cogroup transformation
 that thinks the state rdd and the kafka rdd are not co-partitioned, as the
 kafka rdd does not have a partitioner at all. So, the dependency is considered
 to be wide and leads to a shuffle.

 I tried to avoid shuffling by providing a custom partitioner to
 updateStateByKey, but the KafkaRDD needs to use the same partitioner. For this
 I created a proxy RDD that just returns my partitioner.

 class ProxyRDDWithPartitioner[T: ClassTag](prev: RDD[T], part: Partitioner)
   extends RDD[T](prev) {

   override val partitioner = Some(part)

   override def compute(split: Partition, context: TaskContext): Iterator[T] =
     prev.compute(split, context)

   override protected def getPartitions: Array[Partition] = prev.partitions

   override def getPreferredLocations(thePart: Partition): Seq[String] =
     prev.preferredLocations(thePart)
 }

 I use it as:
 val partitioner = new Partitioner {
   // TODO this should be retrieved from kafka
   override def numPartitions: Int = 2

   override def getPartition(key: Any): Int = {
     key.asInstanceOf[(String, Int)]._2
   }
 }

 inputStream
   .map(m => ((m.topic, m.partition), m.value))
   .transform(new ProxyRDDWithPartitioner(_, partitioner))
   .updateStateByKey(func, partitioner)
   

 The question is - is it safe to do such a trick?






updateStateByKey and kafka direct approach without shuffle

2015-06-02 Thread Krot Viacheslav
Hi all,
In my streaming job I'm using the Kafka direct streaming approach and want to
maintain state with updateStateByKey. My PairRDD has the message's topic name +
partition id as a key. So, I assumed that updateStateByKey could work within the
same partitions as the KafkaRDD and not lead to shuffles. Actually this is not
true, because updateStateByKey leads to a cogroup transformation that thinks the
state rdd and the kafka rdd are not co-partitioned, as the kafka rdd does not
have a partitioner at all. So, the dependency is considered to be wide and leads
to a shuffle.

I tried to avoid shuffling by providing a custom partitioner to
updateStateByKey, but the KafkaRDD needs to use the same partitioner. For this I
created a proxy RDD that just returns my partitioner.

import scala.reflect.ClassTag

import org.apache.spark.{Partition, Partitioner, TaskContext}
import org.apache.spark.rdd.RDD

class ProxyRDDWithPartitioner[T: ClassTag](prev: RDD[T], part: Partitioner)
  extends RDD[T](prev) {

  // expose the given partitioner so downstream operations see this RDD as partitioned
  override val partitioner = Some(part)

  // everything else simply delegates to the wrapped RDD
  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    prev.compute(split, context)

  override protected def getPartitions: Array[Partition] = prev.partitions

  override def getPreferredLocations(thePart: Partition): Seq[String] =
    prev.preferredLocations(thePart)
}

I use it as:
val partitioner = new Partitioner {
  // TODO this should be retrieved from kafka
  override def numPartitions: Int = 2

  override def getPartition(key: Any): Int = {
    key.asInstanceOf[(String, Int)]._2
  }
}

inputStream
  .map(m => ((m.topic, m.partition), m.value))
  .transform(new ProxyRDDWithPartitioner(_, partitioner))
  .updateStateByKey(func, partitioner)
  

The question is - is it safe to do such a trick?


Re: updateStateByKey and kafka direct approach without shuffle

2015-06-02 Thread Krot Viacheslav
Cody,

Thanks, good point. I fixed getting partition id to:

import org.apache.spark.Partitioner
import org.apache.spark.streaming.kafka.OffsetRange

class MyPartitioner(offsetRanges: Array[OffsetRange]) extends Partitioner {
  override def numPartitions: Int = offsetRanges.size

  override def getPartition(key: Any): Int = {
    // the key is set upstream via .map(m => (TaskContext.get().partitionId(), m.value))
    key.asInstanceOf[Int]
  }
}

inputStream
  .map(m => (TaskContext.get().partitionId(), m.value))
  .transform { rdd =>
    val part = new MyPartitioner(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
    new ProxyRDDWithPartitioner(rdd, part)
  }
  ...

But how can I create the same partitioner during the updateStateByKey call? I
have no idea how to access the rdd when calling updateStateByKey.

On Tue, Jun 2, 2015 at 19:15, Cody Koeninger c...@koeninger.org wrote:

 I think the general idea is worth pursuing.

 However, this line:

   override def getPartition(key: Any): Int = {
     key.asInstanceOf[(String, Int)]._2
   }

 is using the kafka partition id, not the spark partition index, so it's
 going to give you fewer partitions / incorrect index

 Cast the rdd to HasOffsetRanges, get the offsetRanges from it.  The index
 into the offset range array matches the (spark) partition id.  That will
 also tell you what the value of numPartitions should be.







 On Tue, Jun 2, 2015 at 11:50 AM, Krot Viacheslav 
 krot.vyaches...@gmail.com wrote:

 Hi all,
 In my streaming job I'm using the Kafka direct streaming approach and want to
 maintain state with updateStateByKey. My PairRDD has the message's topic name +
 partition id as a key. So, I assumed that updateStateByKey could work within
 the same partitions as the KafkaRDD and not lead to shuffles. Actually this is
 not true, because updateStateByKey leads to a cogroup transformation that
 thinks the state rdd and the kafka rdd are not co-partitioned, as the kafka rdd
 does not have a partitioner at all. So, the dependency is considered to be wide
 and leads to a shuffle.

 I tried to avoid shuffling by providing a custom partitioner to
 updateStateByKey, but the KafkaRDD needs to use the same partitioner. For this
 I created a proxy RDD that just returns my partitioner.

 class ProxyRDDWithPartitioner[T: ClassTag](prev: RDD[T], part: Partitioner)
   extends RDD[T](prev) {

   override val partitioner = Some(part)

   override def compute(split: Partition, context: TaskContext): Iterator[T] =
     prev.compute(split, context)

   override protected def getPartitions: Array[Partition] = prev.partitions

   override def getPreferredLocations(thePart: Partition): Seq[String] =
     prev.preferredLocations(thePart)
 }

 I use it as:
 val partitioner = new Partitioner {
   // TODO this should be retrieved from kafka
   override def numPartitions: Int = 2

   override def getPartition(key: Any): Int = {
     key.asInstanceOf[(String, Int)]._2
   }
 }

 inputStream
   .map(m => ((m.topic, m.partition), m.value))
   .transform(new ProxyRDDWithPartitioner(_, partitioner))
   .updateStateByKey(func, partitioner)
   

 The question is - is it safe to do such a trick?