Hi,

The latest changes for Kafka message re-play by manipulating the ZK offset
seem to be working fine for us. This gives us some relief until the actual
issue is fixed in Spark 1.2.

I have a question on how Spark processes the received data. The logic I
used is basically to pull messages from individual partitions using
dedicated Receivers and to union these streams. After that I process the
unioned stream.
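
For reference, here is a minimal Scala sketch of that receiver-per-partition
plus union shape. It uses the built-in KafkaUtils high-level receiver purely
for illustration (my consumer at https://github.com/dibbhatt/kafka-spark-consumer
plugs in its own low-level receiver instead), and the topic name, ZK quorum
and group id are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object UnionSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("KafkaUnionSketch"), Seconds(10))

    val numReceivers = 3 // one dedicated receiver per Kafka partition

    // One input DStream (and hence one Receiver) per partition.
    val perPartitionStreams = (1 to numReceivers).map { _ =>
      KafkaUtils.createStream(ssc, "zkhost:2181", "my-group", Map("myTopic" -> 1))
    }

    // Union the per-partition streams and process them as a single DStream.
    val unioned = ssc.union(perPartitionStreams)
    unioned.map(_._2).foreachRDD { rdd =>
      println("Records in this batch: " + rdd.count())
    }

    ssc.start()
    ssc.awaitTermination()
  }
}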

Today I wanted to test this consumer against our internal Kafka cluster,
which has around 50 million records. With this huge backlog I found Spark
only running the Receiver tasks and not running the processing tasks (or
rather running them very slowly). Is this an issue with the consumer, or is
it an issue on the Spark side? Ideally, once the Receivers durably write
data to the "Store", the processing should start in parallel. Why does the
processing need to wait until the Receiver consumes all 50 million
messages? ...Or maybe I am doing something wrong? I can share the driver
log if you want.

In the driver log I can see only "storage.BlockManagerInfo: Added input..."
type messages, but I hardly see any "scheduler.TaskSetManager: Starting
task..." messages. I see data getting written to the target system at a
very, very slow pace.


Regards,
Dibyendu






On Mon, Sep 8, 2014 at 12:08 AM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> Hi Tathagata,
>
> I have managed to implement the logic in the Kafka-Spark consumer to
> recover from driver failure. This is just an interim fix until the actual
> fix is done on the Spark side.
>
> The logic is something like this.
>
> 1. When the individual Receiver starts for each topic partition, it
> writes the Kafka messages along with certain metadata to the Block Store.
> This metadata contains the message offset, partition id, topic name and
> consumer id. You can see this logic in the PartitionManager.java next()
> method.
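>
> Just to illustrate the kind of record being stored, here is a rough Scala
> sketch (the field names are illustrative, not necessarily the exact
> classes in the project):
>
> case class MessageAndMetadata(
>     topic: String,
>     partitionId: Int,
>     offset: Long,
>     consumerId: String,
>     payload: Array[Byte])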
>
> 2. In the driver code (Consumer.java), I am creating the union of all
> these individual DStreams and processing the data using a foreachRDD call.
> Each RDD contains the Kafka messages along with the metadata details, and
> in the driver I periodically commit the "processed" offset of the Kafka
> messages to ZK.
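>
> Roughly, the driver-side shape is something like this sketch (process and
> commitProcessedOffset are hypothetical stand-ins for the application logic
> and the ZK commit helper; MessageAndMetadata is the record type sketched
> above):
>
> unionedStream.foreachRDD { rdd =>
>   // Application processing happens first ...
>   process(rdd)
>
>   // ... then record, per topic partition, the highest offset processed.
>   val maxOffsets = rdd
>     .map(m => ((m.topic, m.partitionId), m.offset))
>     .reduceByKey((a, b) => math.max(a, b))
>     .collect()
>
>   maxOffsets.foreach { case ((topic, partition), offset) =>
>     commitProcessedOffset(topic, partition, offset) // write "processed" offset to ZK
>   }
> }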
>
> 3. When the driver stops and restarts, the Receiver starts again, and
> this time in PartitionManager.java I check what the actual "committed"
> offset for the partition is and what the actual "processed" offset of the
> same partition is. This logic is in the PartitionManager constructor.
>
> If this is a Receiver restart and the "processed" offset is less than the
> "committed" offset, I start fetching again from the "processed" offset.
> This may lead to duplicate records, but our system can handle duplicates.
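>
> In rough pseudo-Scala, that start-offset decision looks something like
> this (readCommittedOffset / readProcessedOffset are hypothetical names for
> the ZK reads, and the exact off-by-one handling depends on whether the
> stored value is "last consumed" or "next to fetch"):
>
> val committed = readCommittedOffset(topic, partitionId) // offset committed in ZK
> val processed = readProcessedOffset(topic, partitionId) // offset the driver marked as processed
>
> // If the driver restarted before everything received was processed,
> // rewind to the last processed offset and accept possible duplicates.
> val startOffset =
>   if (processed < committed) processed
>   else committed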
>
> I have tested with multiple driver kill/stops and I found no data loss in
> Kafka consumer.
>
> In the Driver code, I have not done any "checkpointing" yet, will test
> that tomorrow.
>
>
> One interesting thing I found: if I "repartition" the original stream, I
> can still see the data-loss issue with this logic. What I believe is that
> during repartitioning Spark might be changing the order of the data
> relative to the way it was generated from the Kafka stream, so in the
> repartition case, even though I am committing the processed offsets, they
> are no longer in order and I still see the issue. Not sure if this
> understanding is correct, but I am not able to find any other explanation.
>
> But if I do not use repartition, this solution works fine.
>
> I can make this configurable, so that when the actual fix is available
> this feature in the consumer can be turned off, as it is an overhead for
> the consumer. Let me know what you think.
>
> Regards,
> Dibyendu
>
>
>
>
> On Fri, Sep 5, 2014 at 11:14 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Some thoughts on this thread to clarify the doubts.
>>
>> 1. Driver recovery: The current release (1.1, to be released) does not
>> recover the raw data that has been received but not yet processed. This is
>> because when the driver dies, the executors die, and so does the raw data
>> that was stored in them. Only for HDFS is the data not lost on driver
>> recovery, as the data is already present reliably in HDFS. This is
>> something we want to fix by Spark 1.2 (3 months from now). Regarding
>> recovery by replaying the data from Kafka, it is possible but tricky. Our
>> goal is to provide a strong guarantee: exactly-once semantics in all
>> transformations. To guarantee this for all kinds of streaming computations,
>> stateful and non-stateful, it requires that the data be replayed through
>> Kafka in exactly the same order, and that the underlying blocks of data in
>> Spark be regenerated in exactly the way they would have been if there had
>> been no driver failure. This is quite tricky to implement; it requires
>> manipulation of ZooKeeper offsets, etc., which is hard to do with the
>> high-level consumer that KafkaUtils uses. Dibyendu's low-level Kafka
>> receiver may enable such approaches in the future. For now we definitely
>> plan to solve the first problem very, very soon.
>>
>> 3. Repartitioning: I am trying to understand the repartition issue. One
>> common mistake I have seen is that developers repartition a stream but do
>> not use the repartitioned stream.
>>
>> WRONG:
>> inputDStream.repartition(100)
>> inputDStream.map(...).count().print()
>>
>> RIGHT:
>> val repartitionedDStream = inputDStream.repartition(100)
>> repartitionedDStream.map(...).count().print()
>>
>> Not sure if this helps solve the problem that you all are facing. I am
>> going to add this to the streaming programming guide to make sure this
>> common mistake is avoided.
>>
>> TD
>>
>>
>>
>>
>> On Wed, Sep 3, 2014 at 10:38 AM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Sorry for the little delay. As discussed in this thread, I have modified
>>> the Kafka-Spark-Consumer (
>>> https://github.com/dibbhatt/kafka-spark-consumer) code to have a
>>> dedicated Receiver for every topic partition. You can see an example of
>>> how to create the union of these receivers in
>>> consumer.kafka.client.Consumer.java.
>>>
>>> Thanks to Chris for suggesting this change.
>>>
>>> Regards,
>>> Dibyendu
>>>
>>>
>>> On Mon, Sep 1, 2014 at 2:55 AM, RodrigoB <rodrigo.boav...@aspect.com>
>>> wrote:
>>>
>>>> Just a comment on the recovery part.
>>>>
>>>> Is it correct to say that the current Spark Streaming recovery design
>>>> does not consider re-computations (upon metadata lineage recovery) that
>>>> depend on blocks of data of the received stream?
>>>> https://issues.apache.org/jira/browse/SPARK-1647
>>>>
>>>> Just to illustrate a real use case (mine):
>>>> - We have object states with a Duration field per state which is
>>>> incremented on every batch interval. This object state is also reset to
>>>> 0 upon incoming state-changing events. Let's suppose there is at least
>>>> one event since the last data checkpoint. This will lead to
>>>> inconsistency upon driver recovery: the Duration field will get
>>>> incremented from the data checkpoint version until the recovery moment,
>>>> but the state change event will never be re-processed... so in the end
>>>> we have the old state with the wrong Duration value. (A rough sketch of
>>>> this kind of state update follows below.)
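>>>>
>>>> A rough, hypothetical Scala sketch of that kind of state update (the
>>>> names and the updateStateByKey formulation are mine, just to make the
>>>> scenario concrete):
>>>>
>>>> case class ObjState(state: String, durationBatches: Long)
>>>>
>>>> def updateFn(events: Seq[String], current: Option[ObjState]): Option[ObjState] = {
>>>>   val prev = current.getOrElse(ObjState("initial", 0L))
>>>>   events.lastOption match {
>>>>     // A state-changing event arrived this batch: switch state, reset Duration.
>>>>     case Some(newState) => Some(ObjState(newState, 0L))
>>>>     // No event this batch: just age the existing state by one interval.
>>>>     case None => Some(prev.copy(durationBatches = prev.durationBatches + 1))
>>>>   }
>>>> }
>>>>
>>>> // eventsByKey: DStream[(String, String)] of (objectId, newStateName) events.
>>>> // On recovery, the "age" path gets replayed but a lost event does not,
>>>> // which is exactly the inconsistency described above.
>>>> val states = eventsByKey.updateStateByKey(updateFn)
>>>>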
>>>> To make things worse, let's imagine we're dumping the Duration
>>>> increases somewhere... which means we're spreading the problem across
>>>> our system. Re-computation awareness is something I've commented on in
>>>> another thread and would rather treat separately.
>>>>
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-td12568.html#a13205
>>>>
>>>> Re-computations do occur, but the only RDDs that are recovered are the
>>>> ones from the data checkpoint. This is what we've seen. That is not
>>>> enough by itself to ensure recovery of computed data, and this partial
>>>> recovery leads to inconsistency in some cases.
>>>>
>>>> Roger - I share the same question with you - I'm just not sure if the
>>>> replicated data really gets persisted on every batch. The execution
>>>> lineage is checkpointed, but if we have big chunks of data being
>>>> consumed by the Receiver node on, let's say, a one-second basis, then
>>>> having it persisted to HDFS every second could be a big challenge for
>>>> keeping JVM performance - maybe that could be the reason why it's not
>>>> really implemented... assuming it isn't.
>>>>
>>>> Dibyendu has made a great effort with the offset-controlling code, but
>>>> the general state-consistent recovery feels to me like another big
>>>> issue to address.
>>>>
>>>> I plan on diving into the Streaming code and trying to at least
>>>> contribute some ideas. More insight from anyone on the dev team would
>>>> be much appreciated.
>>>>
>>>> tnks,
>>>> Rod
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13208.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>
>
