Re: Update Batch DF with Streaming

2016-06-19 Thread Amit Assudani
Please help

From: amit assudani 
Date: Thursday, June 16, 2016 at 6:11 PM
To: "user@spark.apache.org" 
Subject: Update Batch DF with Streaming


Hi All,



Can I update batch data frames loaded in memory with streaming data?



For eg,



I have an employee DF registered as a temporary table; it has EmployeeID, Name, 
Address, etc. fields. Assume it is very big and takes time to load into 
memory.



I've two types of employee events (both having empID bundled in payload) coming 
in streams,



1) which looks up a particular empID in the batch data, does some 
calculation, and persists the results,

2) which has updated values of some of the fields for an empID,



Now I want to keep the employee DF up to date with the updates coming in type 2 
events for future type 1 events to use,



Now the question is: can I update the employee DF with type 2 events in memory? 
Or do I need to refresh the whole DF?



P.S. I can join the stream with the batch and get the joined table, but I am not 
sure how to get and use the handle of the joined data for subsequent events.
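
For reference, a minimal sketch of one way to do this with the Spark 1.6-style API: keep the 
master DataFrame in a driver-side variable, fold the type 2 updates into it every batch, and 
re-register the temp table so later type 1 lookups see the merged view. Names such as 
employeeDF, updateStream and the JSON payload layout are assumptions, not part of the question.

import org.apache.spark.sql.functions.col

var employeeDF = sqlContext.read.parquet("hdfs:///data/employees").cache()   // hypothetical source
employeeDF.registerTempTable("employee")

updateStream.foreachRDD { rdd =>                         // type 2 events, assumed DStream[String] of JSON
  if (!rdd.isEmpty()) {
    val updates = sqlContext.read.json(rdd)
    val updatedIds = updates.select(col("EmployeeID").as("updId"))
    // rows not touched by this batch of updates
    val untouched = employeeDF
      .join(updatedIds, employeeDF("EmployeeID") === col("updId"), "left_outer")
      .filter(col("updId").isNull)
      .drop("updId")
    val merged = untouched.unionAll(updates.select(employeeDF.columns.map(col): _*)).cache()
    merged.count()                                       // materialize before dropping the old copy
    employeeDF.unpersist()
    employeeDF = merged
    employeeDF.registerTempTable("employee")             // refresh the view used by type 1 events
  }
}

This avoids reloading the whole DF from the source, but it is still a driver-coordinated swap 
of cached data, not an in-place update.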



Regards,

Amit










Update Batch DF with Streaming

2016-06-16 Thread Amit Assudani
Hi All,


Can I update batch data frames loaded in memory with streaming data?


For eg,


I have an employee DF registered as a temporary table; it has EmployeeID, Name, 
Address, etc. fields. Assume it is very big and takes time to load into 
memory.


I've two types of employee events (both having empID bundled in payload) coming 
in streams,


1) which looks up a particular empID in the batch data, does some 
calculation, and persists the results,

2) which has updated values of some of the fields for an empID,


Now I want to keep the employee DF up to date with the updates coming in type 2 
events for future type 1 events to use,


Now the question is: can I update the employee DF with type 2 events in memory? 
Or do I need to refresh the whole DF?


P.S. I can join the stream with the batch and get the joined table, but I am not 
sure how to get and use the handle of the joined data for subsequent events.


Regards,

Amit










Re: Spark streaming not remembering previous state

2016-02-27 Thread Amit Assudani
Your context is not being created from the checkpoint; use getOrCreate.
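
For reference, a minimal sketch of the getOrCreate pattern (the checkpoint directory below is an 
assumption, and the whole DStream graph must be built inside the factory function):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///user/spark/checkpoints_dir"   // must match ssc.checkpoint(...)

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("HBaseStream")
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint(checkpointDir)
  // define input streams, updateStateByKey and output operations here
  ssc
}

// Recovers from the checkpoint if one exists, otherwise builds a fresh context.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()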

From: Vinti Maheshwari <vinti.u...@gmail.com>
Date: Saturday, February 27, 2016 at 3:28 PM
To: user <user@spark.apache.org>
Subject: Spark streaming not remembering previous state

Hi All,

I wrote a Spark Streaming program with a stateful transformation.
It seems like my Spark Streaming application is doing the computation correctly 
with checkpointing.
But when I terminate my program and start it again, it does not read the previous 
checkpoint data and starts from the beginning. Is this the expected behaviour?

Do I need to change anything in my program so that it will remember the 
previous data and start the computation from there?

Thanks in advance.

For reference my program:

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    val inputStream = ssc.socketTextStream("ttsv-vccp-01.juniper.net", )
    ssc.checkpoint("hdfs://ttsv-lab-vmdb-01.englab.juniper.net:8020/user/spark/checkpoints_dir")
    inputStream.print(1)
    val parsedStream = inputStream
      .map(line => {
        val splitLines = line.split(",")
        (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
      })
    import breeze.linalg.{DenseVector => BDV}
    import scala.util.Try

    val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
      (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
        prev.map(_ +: current).orElse(Some(current))
          .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
      })
    state.checkpoint(Duration(1))
    state.foreachRDD(rdd => rdd.foreach(Blaher.blah))

    // Start the computation
    ssc.start()
    // Wait for the computation to terminate
    ssc.awaitTermination()
  }
}


Regards,

~Vinti










Re: How to recover in case user errors in streaming

2015-07-01 Thread Amit Assudani
Hi TD,

Why don't we have an onBatchError or similar method in StreamingListener?

Also, is StreamingListener only for the receiver-based approach, or does it work for 
the Kafka Direct API / file-based streaming as well?

Regards,
Amit
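
For reference, there is no onBatchError today, but batch-level failures can be surfaced from 
onBatchCompleted. A minimal sketch, assuming a Spark release (1.6+) where BatchInfo exposes 
outputOperationInfos with a failureReason; since it listens to batches rather than receivers, 
it is not tied to the receiver-based approach:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class BatchErrorListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    info.outputOperationInfos.values.filter(_.failureReason.isDefined).foreach { op =>
      // replace println with whatever alerting / dead-letter handling is needed
      println(s"Batch ${info.batchTime} output op '${op.name}' failed: ${op.failureReason.get}")
    }
  }
}

// ssc.addStreamingListener(new BatchErrorListener())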

From: Tathagata Das <t...@databricks.com>
Date: Monday, June 29, 2015 at 5:24 PM
To: amit assudani <aassud...@impetus.com>
Cc: Cody Koeninger <c...@koeninger.org>, "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: How to recover in case user errors in streaming

I recommend writing with dstream.foreachRDD and calling rdd.saveAsNewAPIHadoopFile 
inside a try/catch. See the implementation of dstream.saveAsNewAPIHadoopFiles:

https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L716
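
For reference, a minimal sketch of that pattern; the output path, key/value types and record 
formatting are assumptions:

import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

dstream.foreachRDD { (rdd, time) =>
  try {
    rdd.map(r => (NullWritable.get(), new Text(r.toString)))
      .saveAsNewAPIHadoopFile(
        s"hdfs:///tmp/output-${time.milliseconds}",
        classOf[NullWritable], classOf[Text],
        classOf[TextOutputFormat[NullWritable, Text]])
  } catch {
    case e: Exception =>
      // batch-level failure: log it, record the batch time / offsets, or raise an alert
      println(s"Write for batch $time failed: ${e.getMessage}")
  }
}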

On Mon, Jun 29, 2015 at 8:44 AM, Amit Assudani <aassud...@impetus.com> wrote:
Also, how do you suggest catching exceptions while using with connector API 
like, saveAsNewAPIHadoopFiles ?

From: amit assudani <aassud...@impetus.com>
Date: Monday, June 29, 2015 at 9:55 AM
To: Tathagata Das <t...@databricks.com>
Cc: Cody Koeninger <c...@koeninger.org>, "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: How to recover in case user errors in streaming

Thanks TD, this helps.

Looking forward to some fix where framework handles the batch failures by some 
callback methods. This will help not having to write try/catch in every 
transformation / action.

Regards,
Amit

From: Tathagata Das <t...@databricks.com>
Date: Saturday, June 27, 2015 at 5:14 AM
To: amit assudani <aassud...@impetus.com>
Cc: Cody Koeninger <c...@koeninger.org>, "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: How to recover in case user errors in streaming

I looked at the code and found that batch exceptions are indeed ignored. This 
is something that is worth fixing, that batch exceptions should not be silently 
ignored.

Also, you can catch failed batch jobs (irrespective of the number of retries) 
by catching the exception in foreachRDD. Here is an example.

dstream.foreachRDD { rdd =>

   try {

   } catch {

   }
}


This will catch failures at the granularity of the job, after all the max 
retries of a task have been exhausted. But it will be hard to filter and 
push the failed record(s) somewhere. To do that, I would use rdd.foreach or 
rdd.foreachPartition, inside which I would catch the exception and push that 
record out to another Kafka topic, and continue normal processing of the other 
records. This would prevent the task processing the partition from failing (as you 
are catching the bad records).

dstream.foreachRDD {  rdd =>

rdd.foreachPartition { iterator =>

 // Create Kafka producer for bad records

iterator.foreach { record =>
 try {
 // process record
 } catch {
case ExpectedException =>
// publish bad record to error topic in Kafka using above 
producer
 }
}
}
}


TD

PS: Apologies for the Scala examples, hope you get the idea :)

On Fri, Jun 26, 2015 at 9:56 AM, Amit Assudani <aassud...@impetus.com> wrote:
Also, I get TaskContext.get() null when used in foreach function below ( I get 
it when I use it in map, but the whole point here is to handle something that 
is breaking in action ). Please help. :(

From: amit assudani <aassud...@impetus.com>
Date: Friday, June 26, 2015 at 11:41 AM
To: Cody Koeninger <c...@koeninger.org>
Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <t...@databricks.com>
Subject: Re: How to recover in case user errors in streaming

Hmm, not sure why, but when I run this code, it always keeps on consuming from 
Kafka and proceeds ignoring the previous failed batches,

Also, Now that I get the attempt number from TaskContext and I have information 
of max retries, I am supposed to handle it in the try/catch block, but does it 
mean I’ve to handle these kind of exceptions / errors in every transformation 
step ( map, reduce, transform, etc. ), isn’t there any callback where it says 
it has been retried max number of times and before being ignored you’ve a 
handle to do whatever you want to do with the batch / message in hand.
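
For what it's worth, a minimal sketch of that check using the task's attempt number; process() 
and sendToErrorQueue() are hypothetical, and maxFailures mirrors spark.task.maxFailures (default 
4), which would normally be read from the SparkConf:

import org.apache.spark.TaskContext

val maxFailures = 4
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    val ctx = TaskContext.get()
    records.foreach { record =>
      try {
        process(record)
      } catch {
        case e: Exception if ctx.attemptNumber() >= maxFailures - 1 =>
          sendToErrorQueue(record, e)    // last attempt: capture it rather than fail the task again
        case e: Exception =>
          throw e                        // earlier attempts: let Spark retry the task
      }
    }
  }
}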

Regards,
Amit

From: Cody Koeninger <c...@koeninger.org>
Date: Friday, June 26, 2015 at 11:32 AM
To: amit assudani <aassud...@impetus.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <t...@databricks.com>

Checkpoint FS failure or connectivity issue

2015-06-29 Thread Amit Assudani
Hi All,

While using checkpoints (on HDFS), if connectivity to the Hadoop cluster is 
lost for a while and gets restored some time later, what happens to the running 
streaming job?

Is it always assumed that the connection to the checkpoint FS (in this case HDFS) would 
ALWAYS be HA and never fail for any reason?

Regards,
Amit

P.s. I've tried using Kafka Direct and Kafka Receiver using WAL. Faced issues 
in both of 'em.












Re: How to recover in case user errors in streaming

2015-06-29 Thread Amit Assudani
Also, how do you suggest catching exceptions while using with connector API 
like, saveAsNewAPIHadoopFiles ?

From: amit assudani <aassud...@impetus.com>
Date: Monday, June 29, 2015 at 9:55 AM
To: Tathagata Das <t...@databricks.com>
Cc: Cody Koeninger <c...@koeninger.org>, "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: How to recover in case user errors in streaming

Thanks TD, this helps.

Looking forward to some fix where framework handles the batch failures by some 
callback methods. This will help not having to write try/catch in every 
transformation / action.

Regards,
Amit

From: Tathagata Das <t...@databricks.com>
Date: Saturday, June 27, 2015 at 5:14 AM
To: amit assudani <aassud...@impetus.com>
Cc: Cody Koeninger <c...@koeninger.org>, "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: How to recover in case user errors in streaming

I looked at the code and found that batch exceptions are indeed ignored. This 
is something that is worth fixing, that batch exceptions should not be silently 
ignored.

Also, you can catch failed batch jobs (irrespective of the number of retries) 
by catching the exception in foreachRDD. Here is an example.

dstream.foreachRDD { rdd =>

   try {

   } catch {

   }
}


This will catch failures at the granularity of the job, after all the max 
retries of a task have been exhausted. But it will be hard to filter and 
push the failed record(s) somewhere. To do that, I would use rdd.foreach or 
rdd.foreachPartition, inside which I would catch the exception and push that 
record out to another Kafka topic, and continue normal processing of the other 
records. This would prevent the task processing the partition from failing (as you 
are catching the bad records).

dstream.foreachRDD {  rdd =>

rdd.foreachPartition { iterator =>

 // Create Kafka producer for bad records

iterator.foreach { record =>
 try {
 // process record
 } catch {
case ExpectedException =>
// publish bad record to error topic in Kafka using above 
producer
 }
}
}
}


TD

PS: Apologies for the Scala examples, hope you get the idea :)

On Fri, Jun 26, 2015 at 9:56 AM, Amit Assudani <aassud...@impetus.com> wrote:
Also, I get TaskContext.get() null when used in foreach function below ( I get 
it when I use it in map, but the whole point here is to handle something that 
is breaking in action ). Please help. :(

From: amit assudani <aassud...@impetus.com>
Date: Friday, June 26, 2015 at 11:41 AM
To: Cody Koeninger <c...@koeninger.org>
Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <t...@databricks.com>
Subject: Re: How to recover in case user errors in streaming

Hmm, not sure why, but when I run this code, it always keeps on consuming from 
Kafka and proceeds ignoring the previous failed batches,

Also, Now that I get the attempt number from TaskContext and I have information 
of max retries, I am supposed to handle it in the try/catch block, but does it 
mean I’ve to handle these kind of exceptions / errors in every transformation 
step ( map, reduce, transform, etc. ), isn’t there any callback where it says 
it has been retried max number of times and before being ignored you’ve a 
handle to do whatever you want to do with the batch / message in hand.

Regards,
Amit

From: Cody Koeninger <c...@koeninger.org>
Date: Friday, June 26, 2015 at 11:32 AM
To: amit assudani <aassud...@impetus.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <t...@databricks.com>
Subject: Re: How to recover in case user errors in streaming

No, if you have a bad message that you are continually throwing exceptions on, 
your stream will not progress to future batches.

On Fri, Jun 26, 2015 at 10:28 AM, Amit Assudani <aassud...@impetus.com> wrote:
Also, what I understand is, max failures doesn’t stop the entire stream, it 
fails the job created for the specific batch, but the subsequent batches still 
proceed, isn’t it right ? And question still remains, how to keep track of 
those failed batches ?

From: amit assudani <aassud...@impetus.com>
Date: Friday, June 26, 2015 at 11:21 AM
To: Cody Koeninger <c...@koeninger.org>
Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <t...@databricks.com>
Subject: Re: How to recover in case user errors in streaming

Thanks for quick response,

My question here

Re: How to recover in case user errors in streaming

2015-06-29 Thread Amit Assudani
Thanks TD, this helps.

Looking forward to some fix where framework handles the batch failures by some 
callback methods. This will help not having to write try/catch in every 
transformation / action.

Regards,
Amit

From: Tathagata Das <t...@databricks.com>
Date: Saturday, June 27, 2015 at 5:14 AM
To: amit assudani <aassud...@impetus.com>
Cc: Cody Koeninger <c...@koeninger.org>, "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: How to recover in case user errors in streaming

I looked at the code and found that batch exceptions are indeed ignored. This 
is something that is worth fixing, that batch exceptions should not be silently 
ignored.

Also, you can catch failed batch jobs (irrespective of the number of retries) 
by catching the exception in foreachRDD. Here is an example.

dstream.foreachRDD { rdd =>

   try {

   } catch {

   }
}


This will catch failures at the granularity of the job, after all the max 
retries of a task have been exhausted. But it will be hard to filter and 
push the failed record(s) somewhere. To do that, I would use rdd.foreach or 
rdd.foreachPartition, inside which I would catch the exception and push that 
record out to another Kafka topic, and continue normal processing of the other 
records. This would prevent the task processing the partition from failing (as you 
are catching the bad records).

dstream.foreachRDD {  rdd =>

rdd.foreachPartition { iterator =>

 // Create Kafka producer for bad records

iterator.foreach { record =>
 try {
 // process record
 } catch {
case ExpectedException =>
// publish bad record to error topic in Kafka using above 
producer
 }
}
}
}


TD

PS: Apologies for the Scala examples, hope you get the idea :)

On Fri, Jun 26, 2015 at 9:56 AM, Amit Assudani <aassud...@impetus.com> wrote:
Also, I get TaskContext.get() null when used in foreach function below ( I get 
it when I use it in map, but the whole point here is to handle something that 
is breaking in action ). Please help. :(

From: amit assudani <aassud...@impetus.com>
Date: Friday, June 26, 2015 at 11:41 AM
To: Cody Koeninger <c...@koeninger.org>
Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <t...@databricks.com>
Subject: Re: How to recover in case user errors in streaming

Hmm, not sure why, but when I run this code, it always keeps on consuming from 
Kafka and proceeds ignoring the previous failed batches,

Also, Now that I get the attempt number from TaskContext and I have information 
of max retries, I am supposed to handle it in the try/catch block, but does it 
mean I’ve to handle these kind of exceptions / errors in every transformation 
step ( map, reduce, transform, etc. ), isn’t there any callback where it says 
it has been retried max number of times and before being ignored you’ve a 
handle to do whatever you want to do with the batch / message in hand.

Regards,
Amit

From: Cody Koeninger <c...@koeninger.org>
Date: Friday, June 26, 2015 at 11:32 AM
To: amit assudani <aassud...@impetus.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <t...@databricks.com>
Subject: Re: How to recover in case user errors in streaming

No, if you have a bad message that you are continually throwing exceptions on, 
your stream will not progress to future batches.

On Fri, Jun 26, 2015 at 10:28 AM, Amit Assudani <aassud...@impetus.com> wrote:
Also, what I understand is, max failures doesn’t stop the entire stream, it 
fails the job created for the specific batch, but the subsequent batches still 
proceed, isn’t it right ? And question still remains, how to keep track of 
those failed batches ?

From: amit assudani <aassud...@impetus.com>
Date: Friday, June 26, 2015 at 11:21 AM
To: Cody Koeninger <c...@koeninger.org>
Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <t...@databricks.com>
Subject: Re: How to recover in case user errors in streaming

Thanks for quick response,

My question here is how do I know that the max retries are done ( because in my 
code I never know whether it is failure of first try or the last try ) and I 
need to handle this message, is there any callback ?

Also, I know the limitation of checkpoint in upgrading the code, but my main 
focus here to mitigate the connectivity issues to persistent store which gets 
resolved in a while, but how do I know which all messages failed and need 
rework ?

Regards,
Amit

From: Cody Koeninger 

Re: How to recover in case user errors in streaming

2015-06-26 Thread Amit Assudani
Also, I get TaskContext.get() null when used in foreach function below ( I get 
it when I use it in map, but the whole point here is to handle something that 
is breaking in action ). Please help. :(

From: amit assudani <aassud...@impetus.com>
Date: Friday, June 26, 2015 at 11:41 AM
To: Cody Koeninger <c...@koeninger.org>
Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <t...@databricks.com>
Subject: Re: How to recover in case user errors in streaming

Hmm, not sure why, but when I run this code, it always keeps on consuming from 
Kafka and proceeds ignoring the previous failed batches,

Also, Now that I get the attempt number from TaskContext and I have information 
of max retries, I am supposed to handle it in the try/catch block, but does it 
mean I’ve to handle these kind of exceptions / errors in every transformation 
step ( map, reduce, transform, etc. ), isn’t there any callback where it says 
it has been retried max number of times and before being ignored you’ve a 
handle to do whatever you want to do with the batch / message in hand.

Regards,
Amit

From: Cody Koeninger <c...@koeninger.org>
Date: Friday, June 26, 2015 at 11:32 AM
To: amit assudani <aassud...@impetus.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <t...@databricks.com>
Subject: Re: How to recover in case user errors in streaming

No, if you have a bad message that you are continually throwing exceptions on, 
your stream will not progress to future batches.

On Fri, Jun 26, 2015 at 10:28 AM, Amit Assudani <aassud...@impetus.com> wrote:
Also, what I understand is, max failures doesn’t stop the entire stream, it 
fails the job created for the specific batch, but the subsequent batches still 
proceed, isn’t it right ? And question still remains, how to keep track of 
those failed batches ?

From: amit assudani <aassud...@impetus.com>
Date: Friday, June 26, 2015 at 11:21 AM
To: Cody Koeninger <c...@koeninger.org>
Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <t...@databricks.com>
Subject: Re: How to recover in case user errors in streaming

Thanks for quick response,

My question here is how do I know that the max retries are done ( because in my 
code I never know whether it is failure of first try or the last try ) and I 
need to handle this message, is there any callback ?

Also, I know the limitation of checkpoint in upgrading the code, but my main 
focus here to mitigate the connectivity issues to persistent store which gets 
resolved in a while, but how do I know which all messages failed and need 
rework ?

Regards,
Amit

From: Cody Koeninger <c...@koeninger.org>
Date: Friday, June 26, 2015 at 11:16 AM
To: amit assudani <aassud...@impetus.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <t...@databricks.com>
Subject: Re: How to recover in case user errors in streaming

If you're consistently throwing exceptions and thus failing tasks, once you 
reach max failures the whole stream will stop.

It's up to you to either catch those exceptions, or restart your stream 
appropriately once it stops.

Keep in mind that if you're relying on checkpoints, and fixing the error 
requires changing your code, you may not be able to recover the checkpoint.

On Fri, Jun 26, 2015 at 9:05 AM, Amit Assudani <aassud...@impetus.com> wrote:
Problem: how do we recover from user errors (connectivity issues / storage 
service down / etc.)?
Environment: Spark streaming using Kafka Direct Streams
Code Snippet:

HashSet<String> topicsSet = new HashSet<String>(Arrays.asList("kafkaTopic1"));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("auto.offset.reset", "smallest");

JavaPairInputDStream<String, String> messages = KafkaUtils
    .createDirectStream(jssc, String.class, String.class, StringDecoder.class,
        StringDecoder.class, kafkaParams, topicsSet);

JavaDStream<String> inputStream = messages
    .map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });

inputStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        if (!rdd.isEmpty()) {
            rdd.foreach(new VoidFunction<String>() {
                @Override
                public void call(String arg0) throws Exception {
                    System.out.println("rdd--" + arg0);
                    Thread.sleep(1000);
                    throw new Exception(" :::user and/or se

Re: How to recover in case user errors in streaming

2015-06-26 Thread Amit Assudani
Hmm, not sure why, but when I run this code, it always keeps on consuming from 
Kafka and proceeds ignoring the previous failed batches,

Also, Now that I get the attempt number from TaskContext and I have information 
of max retries, I am supposed to handle it in the try/catch block, but does it 
mean I’ve to handle these kind of exceptions / errors in every transformation 
step ( map, reduce, transform, etc. ), isn’t there any callback where it says 
it has been retried max number of times and before being ignored you’ve a 
handle to do whatever you want to do with the batch / message in hand.

Regards,
Amit

From: Cody Koeninger <c...@koeninger.org>
Date: Friday, June 26, 2015 at 11:32 AM
To: amit assudani <aassud...@impetus.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <t...@databricks.com>
Subject: Re: How to recover in case user errors in streaming

No, if you have a bad message that you are continually throwing exceptions on, 
your stream will not progress to future batches.

On Fri, Jun 26, 2015 at 10:28 AM, Amit Assudani <aassud...@impetus.com> wrote:
Also, what I understand is, max failures doesn’t stop the entire stream, it 
fails the job created for the specific batch, but the subsequent batches still 
proceed, isn’t it right ? And question still remains, how to keep track of 
those failed batches ?

From: amit assudani <aassud...@impetus.com>
Date: Friday, June 26, 2015 at 11:21 AM
To: Cody Koeninger <c...@koeninger.org>
Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <t...@databricks.com>
Subject: Re: How to recover in case user errors in streaming

Thanks for quick response,

My question here is how do I know that the max retries are done ( because in my 
code I never know whether it is failure of first try or the last try ) and I 
need to handle this message, is there any callback ?

Also, I know the limitation of checkpoint in upgrading the code, but my main 
focus here to mitigate the connectivity issues to persistent store which gets 
resolved in a while, but how do I know which all messages failed and need 
rework ?

Regards,
Amit

From: Cody Koeninger <c...@koeninger.org>
Date: Friday, June 26, 2015 at 11:16 AM
To: amit assudani <aassud...@impetus.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <t...@databricks.com>
Subject: Re: How to recover in case user errors in streaming

If you're consistently throwing exceptions and thus failing tasks, once you 
reach max failures the whole stream will stop.

It's up to you to either catch those exceptions, or restart your stream 
appropriately once it stops.

Keep in mind that if you're relying on checkpoints, and fixing the error 
requires changing your code, you may not be able to recover the checkpoint.

On Fri, Jun 26, 2015 at 9:05 AM, Amit Assudani <aassud...@impetus.com> wrote:
Problem: how do we recover from user errors (connectivity issues / storage 
service down / etc.)?
Environment: Spark streaming using Kafka Direct Streams
Code Snippet:

HashSet<String> topicsSet = new HashSet<String>(Arrays.asList("kafkaTopic1"));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("auto.offset.reset", "smallest");

JavaPairInputDStream<String, String> messages = KafkaUtils
    .createDirectStream(jssc, String.class, String.class, StringDecoder.class,
        StringDecoder.class, kafkaParams, topicsSet);

JavaDStream<String> inputStream = messages
    .map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });

inputStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        if (!rdd.isEmpty()) {
            rdd.foreach(new VoidFunction<String>() {
                @Override
                public void call(String arg0) throws Exception {
                    System.out.println("rdd--" + arg0);
                    Thread.sleep(1000);
                    throw new Exception(" :::user and/or service exception::" + arg0);
                }
            });
        }
        return null;
    }
});

Detailed Description: Using spark streaming I read the text messages from kafka 
using direct API. For sake of simplicity, all I do in processing is printing 
each message on console and sleep of 1 sec. as a placeholder for actual 
processing. Assuming we get a user error may be due to bad record, format error 
or the service connectivity issues or let’s say the persistent store downtime. 
I’ve represented that with throwing an Exception from 

Re: How to recover in case user errors in streaming

2015-06-26 Thread Amit Assudani
Also, what I understand is, max failures doesn’t stop the entire stream, it 
fails the job created for the specific batch, but the subsequent batches still 
proceed, isn’t it right ? And question still remains, how to keep track of 
those failed batches ?

From: amit assudani <aassud...@impetus.com>
Date: Friday, June 26, 2015 at 11:21 AM
To: Cody Koeninger <c...@koeninger.org>
Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <t...@databricks.com>
Subject: Re: How to recover in case user errors in streaming

Thanks for quick response,

My question here is how do I know that the max retries are done ( because in my 
code I never know whether it is failure of first try or the last try ) and I 
need to handle this message, is there any callback ?

Also, I know the limitation of checkpoint in upgrading the code, but my main 
focus here to mitigate the connectivity issues to persistent store which gets 
resolved in a while, but how do I know which all messages failed and need 
rework ?

Regards,
Amit

From: Cody Koeninger <c...@koeninger.org>
Date: Friday, June 26, 2015 at 11:16 AM
To: amit assudani <aassud...@impetus.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <t...@databricks.com>
Subject: Re: How to recover in case user errors in streaming

If you're consistently throwing exceptions and thus failing tasks, once you 
reach max failures the whole stream will stop.

It's up to you to either catch those exceptions, or restart your stream 
appropriately once it stops.

Keep in mind that if you're relying on checkpoints, and fixing the error 
requires changing your code, you may not be able to recover the checkpoint.

On Fri, Jun 26, 2015 at 9:05 AM, Amit Assudani <aassud...@impetus.com> wrote:
Problem: how do we recover from user errors (connectivity issues / storage 
service down / etc.)?
Environment: Spark streaming using Kafka Direct Streams
Code Snippet:

HashSet<String> topicsSet = new HashSet<String>(Arrays.asList("kafkaTopic1"));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("auto.offset.reset", "smallest");

JavaPairInputDStream<String, String> messages = KafkaUtils
    .createDirectStream(jssc, String.class, String.class, StringDecoder.class,
        StringDecoder.class, kafkaParams, topicsSet);

JavaDStream<String> inputStream = messages
    .map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });

inputStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        if (!rdd.isEmpty()) {
            rdd.foreach(new VoidFunction<String>() {
                @Override
                public void call(String arg0) throws Exception {
                    System.out.println("rdd--" + arg0);
                    Thread.sleep(1000);
                    throw new Exception(" :::user and/or service exception::" + arg0);
                }
            });
        }
        return null;
    }
});

Detailed Description: Using spark streaming I read the text messages from kafka 
using direct API. For sake of simplicity, all I do in processing is printing 
each message on console and sleep of 1 sec. as a placeholder for actual 
processing. Assuming we get a user error may be due to bad record, format error 
or the service connectivity issues or let’s say the persistent store downtime. 
I’ve represented that with throwing an Exception from foreach block. I 
understand spark retries this configurable number of times and  proceeds ahead. 
The question is what happens to those failed messages, does ( if yes when ) 
spark re-tries those ? If not, does it have any callback method so as user can 
log / dump it in error queue and provision it for further analysis and / or 
retrials manually. Also, fyi, checkpoints are enabled and above code is in 
create context method to recover from spark driver / worker failures.









Re: How to recover in case user errors in streaming

2015-06-26 Thread Amit Assudani
Thanks for quick response,

My question here is how do I know that the max retries are done ( because in my 
code I never know whether it is failure of first try or the last try ) and I 
need to handle this message, is there any callback ?

Also, I know the limitation of checkpoint in upgrading the code, but my main 
focus here to mitigate the connectivity issues to persistent store which gets 
resolved in a while, but how do I know which all messages failed and need 
rework ?

Regards,
Amit

From: Cody Koeninger <c...@koeninger.org>
Date: Friday, June 26, 2015 at 11:16 AM
To: amit assudani <aassud...@impetus.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <t...@databricks.com>
Subject: Re: How to recover in case user errors in streaming

If you're consistently throwing exceptions and thus failing tasks, once you 
reach max failures the whole stream will stop.

It's up to you to either catch those exceptions, or restart your stream 
appropriately once it stops.

Keep in mind that if you're relying on checkpoints, and fixing the error 
requires changing your code, you may not be able to recover the checkpoint.

On Fri, Jun 26, 2015 at 9:05 AM, Amit Assudani <aassud...@impetus.com> wrote:
Problem: how do we recover from user errors (connectivity issues / storage 
service down / etc.)?
Environment: Spark streaming using Kafka Direct Streams
Code Snippet:

HashSet<String> topicsSet = new HashSet<String>(Arrays.asList("kafkaTopic1"));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("auto.offset.reset", "smallest");

JavaPairInputDStream<String, String> messages = KafkaUtils
    .createDirectStream(jssc, String.class, String.class, StringDecoder.class,
        StringDecoder.class, kafkaParams, topicsSet);

JavaDStream<String> inputStream = messages
    .map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });

inputStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        if (!rdd.isEmpty()) {
            rdd.foreach(new VoidFunction<String>() {
                @Override
                public void call(String arg0) throws Exception {
                    System.out.println("rdd--" + arg0);
                    Thread.sleep(1000);
                    throw new Exception(" :::user and/or service exception::" + arg0);
                }
            });
        }
        return null;
    }
});

Detailed Description: Using spark streaming I read the text messages from kafka 
using direct API. For sake of simplicity, all I do in processing is printing 
each message on console and sleep of 1 sec. as a placeholder for actual 
processing. Assuming we get a user error may be due to bad record, format error 
or the service connectivity issues or let's say the persistent store downtime. 
I've represented that with throwing an Exception from foreach block. I 
understand spark retries this configurable number of times and  proceeds ahead. 
The question is what happens to those failed messages, does ( if yes when ) 
spark re-tries those ? If not, does it have any callback method so as user can 
log / dump it in error queue and provision it for further analysis and / or 
retrials manually. Also, fyi, checkpoints are enabled and above code is in 
create context method to recover from spark driver / worker failures.



















How to recover in case user errors in streaming

2015-06-26 Thread Amit Assudani
Problem: how do we recover from user errors (connectivity issues / storage 
service down / etc.)?
Environment: Spark streaming using Kafka Direct Streams
Code Snippet:

HashSet<String> topicsSet = new HashSet<String>(Arrays.asList("kafkaTopic1"));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("auto.offset.reset", "smallest");

JavaPairInputDStream<String, String> messages = KafkaUtils
    .createDirectStream(jssc, String.class, String.class, StringDecoder.class,
        StringDecoder.class, kafkaParams, topicsSet);

JavaDStream<String> inputStream = messages
    .map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });

inputStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        if (!rdd.isEmpty()) {
            rdd.foreach(new VoidFunction<String>() {
                @Override
                public void call(String arg0) throws Exception {
                    System.out.println("rdd--" + arg0);
                    Thread.sleep(1000);
                    throw new Exception(" :::user and/or service exception::" + arg0);
                }
            });
        }
        return null;
    }
});

Detailed Description: Using spark streaming I read the text messages from kafka 
using direct API. For sake of simplicity, all I do in processing is printing 
each message on console and sleep of 1 sec. as a placeholder for actual 
processing. Assuming we get a user error may be due to bad record, format error 
or the service connectivity issues or let's say the persistent store downtime. 
I've represented that with throwing an Exception from foreach block. I 
understand spark retries this configurable number of times and  proceeds ahead. 
The question is what happens to those failed messages, does ( if yes when ) 
spark re-tries those ? If not, does it have any callback method so as user can 
log / dump it in error queue and provision it for further analysis and / or 
retrials manually. Also, fyi, checkpoints are enabled and above code is in 
create context method to recover from spark driver / worker failures.









Re: Lookup / Access of master data in spark streaming

2015-04-09 Thread Amit Assudani
Thanks a lot TD for the detailed answers. The answers lead to a few more questions:


  1.  "the transform RDD-to-RDD function runs on the driver" - I didn't 
understand this; does it mean that when I use the transform function on a DStream, it is 
not parallelized? Surely I am missing something here.
  2.  updateStateByKey I think won't work in this use case: I have three 
separate attribute streams (with different frequencies) that make up the combined 
state (i.e. the entity) at a point in time, on which I want to do some processing. 
Do you think otherwise?
  3.  transform+join seems the only option so far, but any guesstimate how 
this would perform / react on a cluster? Assume master data in 100s of GBs, and the join 
is based on some row key. We are talking about a slice of stream data to be 
joined continuously with 100s of GBs of master data. Is it something that can be 
done but should not be done?

Regards,
Amit

From: Tathagata Das <t...@databricks.com>
Date: Thursday, April 9, 2015 at 3:13 PM
To: amit assudani <aassud...@impetus.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: Lookup / Access of master data in spark streaming

Responses inline. Hope they help.

On Thu, Apr 9, 2015 at 8:20 AM, Amit Assudani <aassud...@impetus.com> wrote:
Hi Friends,

I am trying to solve a use case in Spark Streaming, and I need help getting to the 
right approach for looking up / updating the master data.

Use case (simplified)
I have a dataset of entities with three attributes and an identifier/row key in a 
persistent store.

Each attribute, along with the row key, comes from a different stream - 
effectively 3 source streams.

Now whenever any attribute comes up, I want to update/sync the persistent store 
and do some processing, but the processing requires the latest state of the 
entity with the latest values of the three attributes.

I wish I had all the entities cached in some sort of centralized cache 
(like we have data in HDFS) within Spark Streaming, which could be used for 
data-local processing. But I assume there is no such thing.

Potential approaches I am thinking of (I suspect the first two are not feasible, but 
I want to confirm):
  1.  Are Broadcast Variables mutable? If yes, can I use one as a cache for 
all entities, sizing around 100s of GBs, provided I have a cluster with enough 
RAM?

Broadcast variables are not mutable. But you can always create a new broadcast 
variable when you want and use the "latest" broadcast variable in your 
computation.

dstream.transform { rdd =>
  val latestBroadcast = getLatestBroadcastVariable()  // fetch existing, or update+create new and return
  val transformedRDD = rdd. ..  // use latestBroadcast in the RDD transformations
  transformedRDD
}

Since the transform RDD-to-RDD function runs on the driver every batch 
interval, it will always use the latest broadcast variable that you want. 
Though note that whenever you create a new broadcast, the next batch may take a 
little longer, as the data needs to be actually broadcast out. That can 
also be made asynchronous by running a simple task (to force the broadcasting 
out) on any new broadcast variable in a different thread from the Spark Streaming 
batch schedule, but using the same underlying SparkContext.
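
For reference, a driver-side sketch of that getLatestBroadcastVariable() idea - rebuild the 
broadcast when it is older than some TTL; loadMasterData() and enrich() are hypothetical:

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

class RefreshableBroadcast[T](sc: SparkContext, load: () => T, ttlMs: Long) {
  private var bc: Broadcast[T] = sc.broadcast(load())
  private var loadedAt: Long = System.currentTimeMillis()

  def get(): Broadcast[T] = synchronized {
    if (System.currentTimeMillis() - loadedAt > ttlMs) {
      bc.unpersist(blocking = false)     // release the stale copy on the executors
      bc = sc.broadcast(load())
      loadedAt = System.currentTimeMillis()
    }
    bc
  }
}

// Usage inside transform, which runs on the driver each batch:
// val master = new RefreshableBroadcast(sc, () => loadMasterData(), ttlMs = 10 * 60 * 1000)
// dstream.transform { rdd => val m = master.get(); rdd.map(r => enrich(r, m.value)) }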



  2.  Is there any kind of sticky partitioning possible, so that I can route my stream 
data through the same node where I have the corresponding entities (a subset 
of the entire store) cached in memory within the JVM / off heap on that node? This would 
avoid lookups from the store.

You could use updateStateByKey. That is quite sticky, but it does not eliminate 
the possibility that it can run on a different node. In fact this is necessary 
for fault tolerance - what if the node it was supposed to run on goes down? The 
task will be run on a different node, and you have to design your application 
so that it can handle that.


  3.  If I stream the entities from the persistent store into the engine, this becomes 
a 4th stream - the entity stream. How do I use join / merge to enable streams 
1, 2, 3 to look up and update the data from stream 4? Would DStream.join work for 
a few seconds' worth of data in the attribute streams against all data in the entity stream? 
Or do I use transform and within that an RDD join? I have doubts whether I am leaning 
towards a core Spark approach in Spark Streaming.

It depends on what kind of join! If you want to join every batch in the stream with a 
static data set (or a rarely updated dataset), then transform+join is the way to 
go. If you want to join one stream with a window of data from another stream, 
then DStream.join is the way to go.
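
For reference, a minimal sketch of the transform+join option; masterRdd, keyOf() and the 
partition count are assumptions:

import org.apache.spark.HashPartitioner

// Master data keyed by row key, partitioned and cached once (or rebuilt when it changes).
val masterRdd = sc.textFile("hdfs:///data/master")
  .map(line => (keyOf(line), line))
  .partitionBy(new HashPartitioner(200))
  .cache()

// Per-batch RDD-to-RDD join of the attribute stream against the master data.
val enriched = attributeStream
  .map(event => (keyOf(event), event))
  .transform(rdd => rdd.join(masterRdd))

enriched.foreachRDD { rdd =>
  // update the persistent store / downstream processing here
}

Pre-partitioning and caching the master RDD should keep the per-batch join from reshuffling 
the 100s of GBs every interval; only the (small) batch side is shuffled to the master's 
partitioning.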

  4.  The last approach, which I think will surely work but I want to avoid, is to 
keep the entities in an IMDB and do lookup/update calls from streams 1, 2 and 3.

Any help is deeply appreciated as this would help me design my system 
efficiently and the solution approach may become a 

Lookup / Access of master data in spark streaming

2015-04-09 Thread Amit Assudani
Hi Friends,

I am trying to solve a use case in Spark Streaming, and I need help getting to the 
right approach for looking up / updating the master data.

Use case (simplified)
I have a dataset of entities with three attributes and an identifier/row key in a 
persistent store.

Each attribute, along with the row key, comes from a different stream - 
effectively 3 source streams.

Now whenever any attribute comes up, I want to update/sync the persistent store 
and do some processing, but the processing requires the latest state of the 
entity with the latest values of the three attributes.

I wish I had all the entities cached in some sort of centralized cache 
(like we have data in HDFS) within Spark Streaming, which could be used for 
data-local processing. But I assume there is no such thing.

Potential approaches I am thinking of (I suspect the first two are not feasible, but 
I want to confirm):
  1.  Are Broadcast Variables mutable? If yes, can I use one as a cache for 
all entities, sizing around 100s of GBs, provided I have a cluster with enough 
RAM?

  2.  Is there any kind of sticky partitioning possible, so that I can route my stream 
data through the same node where I have the corresponding entities (a subset 
of the entire store) cached in memory within the JVM / off heap on that node? This would 
avoid lookups from the store.
  3.  If I stream the entities from the persistent store into the engine, this becomes 
a 4th stream - the entity stream. How do I use join / merge to enable streams 
1, 2, 3 to look up and update the data from stream 4? Would DStream.join work for 
a few seconds' worth of data in the attribute streams against all data in the entity stream? 
Or do I use transform and within that an RDD join? I have doubts whether I am leaning 
towards a core Spark approach in Spark Streaming.

  4.  The last approach, which I think will surely work but I want to avoid, is to 
keep the entities in an IMDB and do lookup/update calls from streams 1, 2 and 3.

Any help is deeply appreciated as this would help me design my system 
efficiently and the solution approach may become a beacon for lookup master 
data sort of problems.

Regards,
Amit







