It looks like there's an issue with the 'Parameters' pojo I'm using within my driver program. For some reason it needs to be serializable, which is odd.
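For reference, the usual fix as far as I can tell is to have the pojo implement java.io.Serializable, since anything captured by a closure that Spark ships to the executors has to be serializable. A minimal sketch (the fields here are just placeholders, not the real Parameters class):

```java
import java.io.Serializable;

// Hypothetical sketch of the Parameters pojo; the real fields will differ.
public class Parameters implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String topic;      // e.g. the Kafka topic name (placeholder)
    private final int batchSeconds;  // streaming batch duration in seconds (placeholder)

    public Parameters(String topic, int batchSeconds) {
        this.topic = topic;
        this.batchSeconds = batchSeconds;
    }

    public String getTopic() { return topic; }
    public int getBatchSeconds() { return batchSeconds; }
}
```

Alternatively, any non-serializable fields can be marked transient, or the pojo can be constructed inside the closure instead of being captured from the driver.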
java.io.NotSerializableException: com.kona.consumer.kafka.spark.Parameters

Giving it another whirl, though having to make it serializable seems odd to me..

On Fri, Jul 31, 2015 at 1:52 PM, Sean Owen <so...@cloudera.com> wrote:
> If you've set the checkpoint dir, it seems like indeed the intent is
> to use a default checkpoint interval in DStream:
>
> private[streaming] def initialize(time: Time) {
>   ...
>   // Set the checkpoint interval to be slideDuration or 10 seconds,
>   // whichever is larger
>   if (mustCheckpoint && checkpointDuration == null) {
>     checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
>     logInfo("Checkpoint interval automatically set to " + checkpointDuration)
>   }
>
> Do you see that log message? What's the interval? That could at least
> explain why it's not doing anything, if it's quite long.
>
> It sort of seems wrong, though, since
> https://spark.apache.org/docs/latest/streaming-programming-guide.html
> suggests it was intended to be a multiple of the batch interval. The
> slide duration wouldn't always be relevant anyway.
>
> On Fri, Jul 31, 2015 at 6:16 PM, Dmitry Goldenberg
> <dgoldenberg...@gmail.com> wrote:
> > I've instrumented checkpointing per the programming guide and I can tell
> > that Spark Streaming is creating the checkpoint directories, but I'm not
> > seeing any content being created in those directories, nor am I seeing
> > the effects I'd expect from checkpointing. I'd expect any data that comes
> > into Kafka while the consumers are down to get picked up when the
> > consumers are restarted; I'm not seeing that.
> >
> > For now my checkpoint directory is set to the local file system, with the
> > directory URI being in this form: file:///mnt/dir1/dir2. I see a
> > subdirectory named with a UUID being created under there, but no files.
> >
> > I'm using a custom JavaStreamingContextFactory which creates a
> > JavaStreamingContext with the directory set into it via the
> > checkpoint(String) method.
> > I'm currently not invoking the checkpoint(Duration) method on the
> > DStream, since I want to first rely on Spark's default checkpointing
> > interval. My streaming batch duration is set to 1 second (1000 millis).
> >
> > Anyone have any idea what might be going wrong?
> >
> > Also, at which point does Spark delete files from checkpointing?
> >
> > Thanks.
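To double-check Sean's point about the default interval: with my 1-second batches (slideDuration = 1000 ms), the quoted formula works out to 1000 * ceil(10000 / 1000) = 10 seconds, so checkpoint data would only be written every 10 batches. A quick sketch of that arithmetic in plain Java, just mirroring the quoted Scala (not actual Spark code):

```java
// Mirrors the auto-interval logic quoted from DStream.initialize:
// checkpointDuration = slideDuration * ceil(Seconds(10) / slideDuration)
public class CheckpointIntervalSketch {

    /** Default checkpoint interval in ms for a given slide duration in ms. */
    static long defaultCheckpointIntervalMs(long slideDurationMs) {
        long tenSecondsMs = 10_000L;
        int multiple = (int) Math.ceil((double) tenSecondsMs / slideDurationMs);
        return slideDurationMs * multiple;
    }

    public static void main(String[] args) {
        // With a 1-second batch/slide duration, the interval comes out to 10 seconds.
        System.out.println(defaultCheckpointIntervalMs(1_000L));  // 10000
        // With a slide duration over 10 seconds, the interval is the slide duration itself.
        System.out.println(defaultCheckpointIntervalMs(15_000L)); // 15000
    }
}
```

So if the log line "Checkpoint interval automatically set to ..." shows 10000 ms, that alone wouldn't explain seeing no checkpoint files at all, only a 10-second delay before the first write.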