Re: Issue in spark batches

2015-10-21 Thread Tathagata Das
Unfortunately, you will have to write that code yourself.

TD

On Tue, Oct 20, 2015 at 11:28 PM, varun sharma wrote:

> Hi TD,
> Is there any way in Spark to fail/retry a batch in case of an exception,
> or do I have to write code to explicitly keep retrying?
> Also, if a batch fails, I want to block further batches from being
> processed, since proceeding would leave the ZooKeeper offset updates
> inconsistent, and maybe kill the job itself after, say, 3 retries.
>
> Any pointers on how to achieve this are appreciated.
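
(One possible shape for that retry code — a sketch only, since Spark
Streaming has no built-in per-batch retry for user output operations.
saveToCassandra, JsonUtility, ZookeeperManager, zkProperties and the
constants come from the code quoted further down this thread; ssc (the
StreamingContext), maxRetries and the backoff delay are illustrative
assumptions.)

data_rdd.foreachRDD { rdd =>
  val parsed = rdd.map(x => JsonUtility.deserialize(x))
  val maxRetries = 3
  var attempt = 0
  var saved = false
  while (!saved && attempt < maxRetries) {
    try {
      parsed.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE,
        StreamModel.getColumns)
      saved = true
    } catch {
      case e: java.io.IOException =>
        attempt += 1
        Thread.sleep(5000L * attempt) // crude linear backoff before retrying
    }
  }
  if (saved) {
    // advance the ZooKeeper offsets only after a confirmed write
    ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
  } else {
    // after maxRetries failures, stop the job rather than skip data;
    // stopping from inside a batch is delicate, so doing it from a
    // separate thread may be safer in practice
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}

Since the foreachRDD body runs on the driver, blocking in the retry loop also
holds back the batches queued behind this one, which gives the "block further
batches" behavior asked for above.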
>
> On Wed, Oct 21, 2015 at 1:15 AM, Tathagata Das wrote:
>
>> That is actually a bug in the UI that was fixed in 1.5.1. The batch
>> actually completes with an exception; the UI just does not update
>> correctly.
>>
>> On Tue, Oct 20, 2015 at 8:38 AM, varun sharma wrote:
>>
>>> Also, as you can see from the timestamps in the attached image, batches
>>> arriving after the Cassandra server comes back up (21:04) are processed,
>>> while the batches in the hung state (21:03) never get processed.
>>> So how do I fail those batches so that they can be processed again?
>>>
>>> On Tue, Oct 20, 2015 at 9:02 PM, varun sharma wrote:
>>>
 Hi TD,
 Yes, saveToCassandra throws an exception. How do I fail that task
 explicitly if I catch the exception?
 Right now the batch doesn't fail and remains in a hung state. Is there
 any way to fail that batch so that it can be tried again?

 Thanks
 Varun

 On Tue, Oct 20, 2015 at 2:50 AM, Tathagata Das wrote:

> If Cassandra is down, does saveToCassandra throw an exception? If it
> does, you can catch that exception and write your own logic to retry
> and/or not update the offsets. Once the foreachRDD function completes,
> that batch will be internally marked as completed.
>
> TD
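
(In code, that suggestion comes down to something like the sketch below. All
names are taken from the snippet quoted underneath; only the try/catch around
the write is new, and whether java.io.IOException is the right exception to
catch should be verified against what saveToCassandra actually throws.)

data_rdd.foreachRDD { rdd =>
  try {
    rdd.map(x => JsonUtility.deserialize(x))
      .saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE,
        StreamModel.getColumns)
    // reached only if the write succeeded, so it is safe to move the offsets
    ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
  } catch {
    case e: java.io.IOException =>
      // Cassandra unreachable: leave the ZooKeeper offsets untouched so the
      // same Kafka offset range is re-read later instead of silently lost
  }
}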
>
> On Mon, Oct 19, 2015 at 5:48 AM, varun sharma <varunsharman...@gmail.com> wrote:
>
>> Hi,
>> I am facing this issue consistently in a Spark-Cassandra-Kafka *streaming
>> job.*
>> *Spark 1.4.0*
>> *Cassandra connector 1.4.0-M3*
>> *The issue:*
>>
>> I am reading data from *Kafka* using DirectStream, writing to
>> *Cassandra* after parsing the JSON, and subsequently updating the
>> offsets in *ZooKeeper*.
>> If the Cassandra cluster is down, the job throws an exception, but the
>> batch that arrives in that time window is never processed, even though
>> its offsets are updated in ZooKeeper.
>> This results in data loss.
>> Once the Cassandra cluster is back up, the job processes data normally.
>> PFA the screenshots of the hung batches and the code.
>>
>> *Code:*
>>
>> data_rdd.foreachRDD(rdd => {
>>   val stream = rdd.map(x => JsonUtility.deserialize(x))
>>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE,
>>     StreamModel.getColumns)
>>
>>   // commit the offsets once everything is done
>>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>> })
>>
>> *I have even tried this variant:*
>>
>> data_rdd.foreachRDD(rdd => {
>>   val stream = rdd.map(x => JsonUtility.deserialize(x))
>>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE,
>>     StreamModel.getColumns)
>> })
>>
>> data_rdd.foreachRDD(rdd => {
>>   // commit the offsets once everything is done
>>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>> })
>>
>> Exception when the Cassandra cluster is down:
>> [2015-10-19 12:49:20] [JobScheduler] [ERROR]
>> [org.apache.spark.streaming.scheduler.JobScheduler] - Error running job
>> streaming job 144523914 ms.3
>> java.io.IOException: Failed to open native connection to Cassandra at
>> {..}
>>
>> --
>> *VARUN SHARMA*
>> *Flipkart*
>> *Bangalore*

