Re: Issue in spark batches
Unfortunately, you will have to write that code yourself.

TD

On Tue, Oct 20, 2015 at 11:28 PM, varun sharma wrote:
> Hi TD,
> Is there any way in Spark to fail/retry a batch in case of exceptions, or do
> I have to write code to explicitly keep retrying?
> Also, if a batch fails, I want to block further batches from being processed,
> as that would make the ZooKeeper offset updates inconsistent, and maybe kill
> the job itself after, say, 3 retries.
>
> Any pointers to achieve this are appreciated.
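The retry-and-abort logic TD says you have to write yourself could be hand-rolled as a small helper like the following. This is only a sketch: `RetryUtil` and `retryWithLimit` are illustrative names, not part of Spark or the Cassandra connector.

```scala
object RetryUtil {
  // Run `body` up to `maxAttempts` times; return the first successful result,
  // or rethrow the last exception once all attempts are exhausted.
  def retryWithLimit[T](maxAttempts: Int)(body: => T): T = {
    require(maxAttempts > 0, "maxAttempts must be positive")
    var lastError: Throwable = null
    var attempt = 0
    while (attempt < maxAttempts) {
      try {
        return body
      } catch {
        case e: Exception =>
          lastError = e
          attempt += 1
      }
    }
    throw lastError
  }
}
```

Inside foreachRDD, the Cassandra write and the ZooKeeper offset commit would be wrapped in `RetryUtil.retryWithLimit(3) { ... }`; if the call still throws after 3 attempts, the exception propagates and the job can be shut down (e.g. via `StreamingContext.stop()`).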
Re: Issue in spark batches
Hi TD,
Is there any way in Spark to fail/retry a batch in case of exceptions, or do I have to write code to explicitly keep retrying?
Also, if a batch fails, I want to block further batches from being processed, as that would make the ZooKeeper offset updates inconsistent, and maybe kill the job itself after, say, 3 retries.

Any pointers to achieve this are appreciated.

On Wed, Oct 21, 2015 at 1:15 AM, Tathagata Das wrote:
> That is actually a bug in the UI that got fixed in 1.5.1. The batch is
> actually completing with an exception; the UI just does not update correctly.

--
VARUN SHARMA
Flipkart
Bangalore
Re: Issue in spark batches
Hi TD,
Yes, saveToCassandra throws an exception. How do I fail that task explicitly if I catch the exception?
Right now that batch doesn't fail and remains in a hung state. Is there any way to fail that batch so that it can be tried again?

Thanks
Varun

On Tue, Oct 20, 2015 at 2:50 AM, Tathagata Das wrote:
> If Cassandra is down, does saveToCassandra throw an exception? If it does,
> you can catch that exception and write your own logic to retry and/or skip
> the offset update. Once the foreachRDD function completes, that batch will
> be internally marked as completed.

--
VARUN SHARMA
Flipkart
Bangalore
Re: Issue in spark batches
That is actually a bug in the UI that got fixed in 1.5.1. The batch is actually completing with an exception; the UI just does not update correctly.

On Tue, Oct 20, 2015 at 8:38 AM, varun sharma wrote:
> Also, as you can see from the timestamps in the attached image, batches
> coming in after the Cassandra server comes back up (21:04) are processed,
> while batches that are in the hung state (21:03) never get processed.
> So, how do I fail those batches so that they can be processed again?
Re: Issue in spark batches
If Cassandra is down, does saveToCassandra throw an exception? If it does, you can catch that exception and write your own logic to retry and/or skip the offset update. Once the foreachRDD function completes, that batch will be internally marked as completed.

TD

On Mon, Oct 19, 2015 at 5:48 AM, varun sharma <varunsharman...@gmail.com> wrote:
> Hi,
> I am facing this issue consistently in a Spark-Cassandra-Kafka streaming job.
> Spark 1.4.0
> Cassandra connector 1.4.0-M3
>
> The issue:
> I am reading data from Kafka using DirectStream, writing to Cassandra after
> parsing the JSON, and subsequently updating the offsets in ZooKeeper.
> If the Cassandra cluster is down, it throws an exception, but the batch that
> arrives in that time window is never processed, even though its offsets are
> updated in ZooKeeper. This results in data loss. Once the Cassandra cluster
> is back up, the job processes data normally.
> PFA the screenshots of the hung batches and code.
>
> Code:
>
> data_rdd.foreachRDD(rdd => {
>   val stream = rdd.map(x => JsonUtility.deserialize(x))
>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE, StreamModel.getColumns)
>
>   // commit the offsets once everything is done
>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
> })
>
> I have even tried this variant:
>
> data_rdd.foreachRDD(rdd => {
>   val stream = rdd.map(x => JsonUtility.deserialize(x))
>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE, StreamModel.getColumns)
> })
>
> data_rdd.foreachRDD(rdd => {
>   // commit the offsets once everything is done
>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
> })
>
> Exception when the Cassandra cluster is down:
> [2015-10-19 12:49:20] [JobScheduler] [ERROR]
> [org.apache.spark.streaming.scheduler.JobScheduler] - Error running job
> streaming job 144523914 ms.3
> java.io.IOException: Failed to open native connection to Cassandra at {..}
>
> --
> VARUN SHARMA
> Flipkart
> Bangalore
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
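TD's suggestion above — catch the save failure inside foreachRDD and only advance the ZooKeeper offsets after the write succeeds — is essentially an ordering guarantee. A minimal, self-contained sketch of that control flow, with the Cassandra write and offset commit abstracted as plain functions (`BatchGuard`, `save`, and `commitOffsets` are illustrative stand-ins, not connector APIs):

```scala
object BatchGuard {
  // Returns true iff the offsets were committed. The commit happens only
  // after the save succeeds, so a Cassandra outage can no longer lose the
  // batch: the offsets stay un-advanced and the batch can be replayed.
  def processBatch(maxAttempts: Int)(save: () => Unit, commitOffsets: () => Unit): Boolean = {
    var attempt = 0
    while (attempt < maxAttempts) {
      try {
        save()            // e.g. stream.saveToCassandra(...)
        commitOffsets()   // e.g. ZookeeperManager.updateOffsetsinZk(...)
        return true
      } catch {
        case _: Exception => attempt += 1
      }
    }
    false                 // caller can stop the StreamingContext here
  }
}
```

With the commit moved inside the success path, a restarted job replays the lost batch instead of skipping it; when `processBatch` returns false after its retries, the caller can stop the StreamingContext, as the thread discusses.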