Re: Checkpointing doesn't appear to be working for direct streaming from Kafka

2015-08-14 Thread Dmitry Goldenberg
Our additional question on checkpointing is basically about the logistics of
it --

At what point does the data get written to the checkpoint?  Is it written as
soon as the driver program retrieves an RDD from Kafka (or another source)?
Or is it written after that RDD has been processed and we're basically moving
on to the next RDD?

What I'm driving at is: what happens if the driver program is killed?  The
next time it's started, will it know, from Spark Streaming's checkpointing,
to resume from the same RDD that was being processed when the program was
killed?  In other words, upon restarting the consumer, will we resume from
the unfinished RDD, or will we be looking at the next one?

Will we pick up from the last known *successfully processed* topic offset?

Thanks.






Re: Checkpointing doesn't appear to be working for direct streaming from Kafka

2015-08-14 Thread Cody Koeninger
You'll resume and re-process the RDD that didn't finish.





Re: Checkpointing doesn't appear to be working for direct streaming from Kafka

2015-08-14 Thread Dmitry Goldenberg
Thanks, Cody. It sounds like Spark Streaming keeps enough state to know how
many batches have been processed, and that if not all of them have been, the
RDD is 'unfinished'. I wonder whether it would know if the last micro-batch
had been fully and successfully processed. Hypothetically, the driver program
could terminate while the last batch is being processed...
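
One mitigation on our side, if the unfinished batch does get re-processed
after a restart, would be to make the output operation idempotent so that a
replay is harmless. A minimal sketch of what I mean (hypothetical names, not
our actual code; assumes Function and VoidFunction from
org.apache.spark.api.java.function plus scala.Tuple2 are imported, and
"messages" is the direct Kafka stream):

// Sketch: an idempotent write (e.g. an upsert keyed on a stable document id)
// makes re-processing a replayed batch harmless.
messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
  @Override
  public Void call(JavaPairRDD<String, String> rdd) {
    rdd.foreach(new VoidFunction<Tuple2<String, String>>() {
      @Override
      public void call(Tuple2<String, String> record) {
        // docStore.upsert(idFor(record), record._2());  // hypothetical upsert
      }
    });
    return null;
  }
});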







Re: Checkpointing doesn't appear to be working for direct streaming from Kafka

2015-07-31 Thread Sean Owen
If you've set the checkpoint dir, it seems like indeed the intent is
to use a default checkpoint interval in DStream:

private[streaming] def initialize(time: Time) {
...
  // Set the checkpoint interval to be slideDuration or 10 seconds,
  // whichever is larger
  if (mustCheckpoint && checkpointDuration == null) {
    checkpointDuration =
      slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
    logInfo("Checkpoint interval automatically set to " + checkpointDuration)
  }

Do you see that log message? What's the interval? That could at least
explain why it's not doing anything, if it's quite long.

It sort of seems wrong though since
https://spark.apache.org/docs/latest/streaming-programming-guide.html
suggests it was intended to be a multiple of the batch interval. The
slide duration wouldn't always be relevant anyway.
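
For what it's worth, if the derived default turns out to be the issue, the
checkpoint interval can also be set explicitly on the stream as a multiple of
the batch interval. A sketch, assuming "messages" is the direct Kafka DStream
and the 1-second batch interval described below:

// Checkpoint the DStream every 10 batches (10 seconds at a 1-second batch
// interval) rather than relying on the derived default.
messages.checkpoint(Durations.seconds(10));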

On Fri, Jul 31, 2015 at 6:16 PM, Dmitry Goldenberg
dgoldenberg...@gmail.com wrote:
 I've instrumented checkpointing per the programming guide and I can tell
 that Spark Streaming is creating the checkpoint directories but I'm not
 seeing any content being created in those directories nor am I seeing the
 effects I'd expect from checkpointing.  I'd expect any data that comes into
 Kafka while the consumers are down, to get picked up when the consumers are
 restarted; I'm not seeing that.

 For now my checkpoint directory is set to the local file system with the
 directory URI being in this form:   file:///mnt/dir1/dir2.  I see a
 subdirectory named with a UUID being created under there but no files.

 I'm using a custom JavaStreamingContextFactory which creates a
 JavaStreamingContext with the directory set into it via the
 checkpoint(String) method.

 I'm currently not invoking the checkpoint(Duration) method on the DStream
 since I want to first rely on Spark's default checkpointing interval.  My
 streaming batch duration millis is set to 1 second.

 Anyone have any idea what might be going wrong?

 Also, at which point does Spark delete files from checkpointing?

 Thanks.




Re: Checkpointing doesn't appear to be working for direct streaming from Kafka

2015-07-31 Thread Dmitry Goldenberg
It looks like there's an issue with the 'Parameters' POJO I'm using in my
driver program. For some reason it needs to be serializable, which seems odd:

java.io.NotSerializableException: com.kona.consumer.kafka.spark.Parameters

Giving it another whirl, though having to make it serializable still seems
odd to me...
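
My working assumption (not yet confirmed against the full stack trace): with
checkpointing enabled, whatever the functions on the DStream reference gets
serialized along with the checkpointed DStream graph, so a config object
captured there either has to implement Serializable or has to be reduced to
final local variables before the closures are defined. The first option is
what I'm trying; roughly this, with hypothetical field names:

// Sketch only: making the config POJO serializable lets it be captured by
// functions that Spark serializes when writing checkpoints (and when shipping
// closures to executors).
public class Parameters implements java.io.Serializable {
  private static final long serialVersionUID = 1L;

  private String checkpointDir;        // hypothetical fields, for illustration
  private long batchDurationMillis;
  private long checkpointMillis;
  // ... getters/setters elided ...
}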




Re: Checkpointing doesn't appear to be working for direct streaming from Kafka

2015-07-31 Thread Dmitry Goldenberg
I'll check for that logInfo message...

Meanwhile, the code is basically:

public class KafkaSparkStreamingDriver implements Serializable {

..

    SparkConf sparkConf = createSparkConf(appName, kahunaEnv);

    JavaStreamingContext jssc = params.isCheckpointed()
        ? createCheckpointedContext(sparkConf, params)
        : createContext(sparkConf, params);

    jssc.start();

    jssc.awaitTermination();

    jssc.close();

..

  private JavaStreamingContext createCheckpointedContext(SparkConf sparkConf,
      Parameters params) {
    JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
      @Override
      public JavaStreamingContext create() {
        return createContext(sparkConf, params);
      }
    };
    return JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
  }

...

  private JavaStreamingContext createContext(SparkConf sparkConf,
      Parameters params) {
    // Create context with the specified batch interval, in milliseconds.
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
        Durations.milliseconds(params.getBatchDurationMillis()));

    // Set the checkpoint directory, if we're checkpointing.
    if (params.isCheckpointed()) {
      jssc.checkpoint(params.getCheckpointDir());
    }

    Set<String> topicsSet =
        new HashSet<String>(Arrays.asList(params.getTopic()));

    // Set the Kafka parameters.
    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST,
        params.getBrokerList());
    if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
      kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET,
          params.getAutoOffsetReset());
    }

    // Create direct Kafka stream with the brokers and the topic.
    JavaPairInputDStream<String, String> messages =
        KafkaUtils.createDirectStream(
            jssc,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            topicsSet);

    // See if there's an override of the default checkpoint duration.
    if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
      messages.checkpoint(Durations.milliseconds(params.getCheckpointMillis()));
    }

.
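
The rest of the job is elided above. Purely as an illustration of where an
output operation would go, and of how the direct API exposes the offsets each
batch covered, here is a sketch (assumed names, not the actual elided code;
HasOffsetRanges and OffsetRange come from org.apache.spark.streaming.kafka):

// Sketch only: RDDs produced by the direct stream know their Kafka offset
// ranges, which is handy for verifying what gets re-processed after a
// restart from a checkpoint.
messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
  @Override
  public Void call(JavaPairRDD<String, String> rdd) {
    OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    for (OffsetRange range : ranges) {
      System.out.println(range.topic() + "/" + range.partition()
          + " offsets [" + range.fromOffset() + ", " + range.untilOffset() + ")");
    }
    // ... process the records here ...
    return null;
  }
});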







Re: Checkpointing doesn't appear to be working for direct streaming from Kafka

2015-07-31 Thread Cody Koeninger
Show us the relevant code
