Re: Spark Kinesis Checkpointing/Processing Delay

2015-08-10 Thread Tathagata Das
You are correct. The earlier Kinesis receiver (as of Spark 1.4) was not
saving checkpoints correctly and was in general not reliable (even with the
WAL enabled). We have improved this in Spark 1.5 with an updated Kinesis
receiver that keeps track of the Kinesis sequence numbers as part of Spark
Streaming's DStream checkpointing; the KCL checkpoint is updated only after
the sequence numbers have been written to the DStream checkpoints. This
allows a recovered streaming program (that is, one restarted from a
checkpoint) to recover the sequence numbers from the checkpoint information
and reprocess the corresponding records (those which had not been
successfully processed). This gives better guarantees.

If you are interested in learning more, see the JIRA:
https://issues.apache.org/jira/browse/SPARK-9215

Related to this, for your scenario, you should set a receiver rate limit
(spark.streaming.receiver.maxRate) to prevent Spark from receiving data
faster than it can process.
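
For example, something along these lines (a minimal sketch only; the 400
records/sec cap, the app name and the local[*] master are placeholders, and
spark.streaming.backpressure.enabled is only available from 1.5):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: cap how fast each receiver ingests records so that ingestion
// cannot outrun the downstream (e.g. ElasticSearch) write rate.
val conf = new SparkConf()
  .setAppName("kinesis-to-es")   // placeholder app name
  .setMaster("local[*]")         // placeholder master, for illustration only
  // Maximum records per second, per receiver (receiver-based streams).
  .set("spark.streaming.receiver.maxRate", "400")
  // Spark 1.5+: adapt the ingestion rate to the observed processing rate.
  .set("spark.streaming.backpressure.enabled", "true")

val ssc = new StreamingContext(conf, Seconds(10))  // placeholder 10s batch interval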

On Mon, Aug 10, 2015 at 4:40 PM, Phil Kallos  wrote:



Spark Kinesis Checkpointing/Processing Delay

2015-08-10 Thread Phil Kallos
Hi! Sorry if this is a repost.

I'm using Spark + Kinesis ASL to process and persist stream data to
ElasticSearch. For the most part it works nicely.

There is a subtle issue I'm running into about how failures are handled.

For example's sake, let's say I am processing a Kinesis stream that
produces 400 records per second, continuously.

Kinesis provides a 24hr buffer of data, and I'm setting my Kinesis DStream
consumer to use "TRIM_HORIZON", to mean "go as far back as possible and
start processing the stream data as quickly as possible, until you catch up
to the tip of the stream".
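
For reference, the consumer is created roughly like this (a sketch based on
the Spark 1.4 Kinesis ASL API; all names, the endpoint, the region and the
intervals are placeholders):

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

// Placeholder app name and local master, for illustration only.
val conf = new SparkConf().setAppName("kinesis-to-es").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(10))   // 10s batch interval

val kinesisStream = KinesisUtils.createStream(
  ssc,
  "my-kinesis-app",                          // KCL application name (names the DynamoDB checkpoint table)
  "my-stream",                               // Kinesis stream name
  "https://kinesis.us-east-1.amazonaws.com", // Kinesis endpoint URL
  "us-east-1",                               // region name
  InitialPositionInStream.TRIM_HORIZON,      // start from the oldest record still in the 24h buffer
  Seconds(10),                               // how often the KCL checkpoints to DynamoDB
  StorageLevel.MEMORY_AND_DISK_2)            // replicate received blocks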

This means that for some period of time, Spark will suck in data from
Kinesis as quickly as it can, let's say at 5000 records per second.

In my specific case, ElasticSearch can gracefully handle 400 writes per
second, but is NOT happy to process 5000 writes per second. Let's say it
only handles 2000 wps. This means that the processing time will exceed the
batch interval, the scheduling delay in the stream will rise steadily, and
batches of data will get "backlogged" for some period of time.

In normal circumstances, this is fine. When the Spark consumers catch up to
"real-time", the data input rate slows to 400rps and the backlogged batches
eventually get flushed to ES. The system stabilizes.

However! It appears to me that the Kinesis consumer actively submits
checkpoints, even though the records may not have been processed yet (since
they are backlogged). If for some reason there is processing delay and the
Spark process crashes, the checkpoint will have advanced too far. If I
resume the Spark Streaming process, there is essentially a gap in my
ElasticSearch data.

In principle I understand the reason for this, but is there a way to adjust
this behavior? Or is there another way to handle this specific problem?
Ideally I would be able to configure the process to submit Kinesis
checkpoints only after my data is successfully written to ES.

Thanks,
Phil


Re: Spark Kinesis Checkpointing/Processing Delay

2015-08-06 Thread Patanachai Tangchaisin

Hi,

I actually ran into the same problem, although our endpoint is not
ElasticSearch. When the Spark job dies, we lose some data because the
Kinesis checkpoint is already beyond the last point that Spark has processed.


Currently, our workaround is to use Spark's checkpoint mechanism with the
write ahead log (WAL):


https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
https://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications

Using checkpointing comes with some disadvantages, such as the application
code not being upgradable, etc.
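
Roughly, the workaround looks like this (a sketch only; the checkpoint
directory, batch interval and job body are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder checkpoint directory; it has to be on fault-tolerant storage
// (HDFS, S3, ...) so the driver can recover from it after a crash.
val checkpointDir = "hdfs:///spark/checkpoints/kinesis-app"

def createContext(): StreamingContext = {
  val conf = new SparkConf()
    .setAppName("kinesis-app")   // placeholder; master is supplied via spark-submit
    // Persist received blocks to a write ahead log so they can be replayed.
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // ... build the Kinesis DStream and the output operations here ...
  ssc
}

// Fresh start: builds a new context. After a failure: rebuilds the context,
// including not-yet-completed batches, from the checkpoint directory.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()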


I believe there is some work underway to fix this problem, similar to the
Kafka direct API. Not sure if this is it: https://issues.apache.org/jira/browse/SPARK-9215

Thanks,
Patanachai




Spark Kinesis Checkpointing/Processing Delay

2015-08-06 Thread phibit
Hi! I'm using Spark + Kinesis ASL to process and persist stream data to
ElasticSearch. For the most part it works nicely.

There is a subtle issue I'm running into about how failures are handled.

For example's sake, let's say I am processing a Kinesis stream that produces
400 records per second, continuously.

Kinesis provides a 24hr buffer of data, and I'm setting my Kinesis DStream
consumer to use "TRIM_HORIZON", to mean "go as far back as possible and
start processing the stream data as quickly as possible, until you catch up
to the tip of the stream".

This means that for some period of time, Spark will suck in data from
Kinesis as quickly as it can, let's say at 5000 records per second.

In my specific case, ElasticSearch can gracefully handle 400 writes per
second, but is NOT happy to process 5000 writes per second. Let's say it
only handles 2000 wps. This means that the processing time will exceed the
batch interval, the scheduling delay in the stream will rise steadily, and
batches of data will get "backlogged" for some period of time.

In normal circumstances, this is fine. When the Spark consumers catch up to
"real-time", the data input rate slows to 400rps and the backlogged batches
eventually get flushed to ES. The system stabilizes.

However! It appears to me that the Kinesis consumer actively submits
checkpoints, even though the records may not have been processed yet (since
they are backlogged). If for some reason there is processing delay and the
Spark process crashes, the checkpoint will have advanced too far. If I
resume the Spark Streaming process, there is essentially a gap in my
ElasticSearch data.

In principle I understand the reason for this, but is there a way to adjust
this behavior? Or is there another way to handle this specific problem?
Ideally I would be able to configure the process to submit Kinesis
checkpoints only after my data is successfully written to ES.

Thanks,
Phil




