Hi Tathagata,

I have managed to implement logic in the Kafka-Spark consumer to recover from driver failure. This is just an interim fix until the actual fix is done on the Spark side.
The logic is something like this:

1. When the individual Receiver starts for each topic partition, it writes the Kafka messages along with certain metadata to the block store. This metadata contains the message offset, the partition id, the topic name and the consumer id. You can see this logic in the next() method of PartitionManager.java.

2. In the driver code (Consumer.java), I create the union of all these individual DStreams and process the data using a foreachRDD call. In the driver I receive the RDDs containing the Kafka messages along with their metadata, and periodically I commit the "processed" offset of the Kafka messages to ZK.

3. When the driver stops and restarts, the Receiver starts again, and this time in PartitionManager.java I check what the "committed" offset for the partition is and what the "processed" offset for the same partition is. This logic is in the PartitionManager constructor. If this is a Receiver restart and the "processed" offset is less than the "committed" offset, I start fetching again from the "processed" offset. This may lead to duplicate records, but our system can handle duplicates.

I have tested with multiple driver kills/stops and found no data loss in the Kafka consumer. I have not done any "checkpointing" in the driver code yet; I will test that tomorrow.

One interesting thing I found: if I "repartition" the original stream, I can still see data loss with this logic. What I believe is that during repartitioning Spark might change the order of the RDDs from the way they were generated from the Kafka stream. So in the repartition case, even though I am committing the processed offsets, they are not committed in order and I still see the issue. I am not sure whether this understanding is correct, but I cannot find any other explanation. If I do not use repartition, this solution works fine.

I can make this configurable, so that when the actual fix is available this feature in the consumer can be turned off, as it is an overhead for the consumer.
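For illustration, here is a rough Scala sketch of the commit-and-resume idea described above. Every name in it (KafkaMessage, ZkOffsetStore, resumeOffset, and so on) is a placeholder invented for the sketch, not an actual class in kafka-spark-consumer; the real driver and receiver logic live in Consumer.java and PartitionManager.java.

import org.apache.spark.SparkContext._               // pair-RDD functions such as reduceByKey (Spark 1.x)
import org.apache.spark.streaming.dstream.DStream

// Placeholder for the metadata the receiver stores with every message (step 1).
case class KafkaMessage(topic: String, partitionId: Int, offset: Long, payload: Array[Byte])

// Placeholder for whatever wraps the ZooKeeper offset paths.
trait ZkOffsetStore {
  def commitProcessedOffset(consumerId: String, topic: String, partition: Int, offset: Long): Unit
}

// Step 2 (driver side): track the highest offset seen per (topic, partition) in each
// processed batch and write it to ZK as the "processed" offset. The sketch commits on
// every batch; the real logic could commit periodically instead.
def commitProcessedOffsets(unionedStream: DStream[KafkaMessage],
                           zk: ZkOffsetStore,
                           consumerId: String): Unit = {
  unionedStream.foreachRDD { rdd =>
    val highestProcessed = rdd
      .map(m => ((m.topic, m.partitionId), m.offset))
      .reduceByKey((a, b) => math.max(a, b))
      .collect()

    // Commit only after this batch's output has been produced, so a restart
    // never skips data that was received but not yet fully processed.
    highestProcessed.foreach { case ((topic, partition), offset) =>
      zk.commitProcessedOffset(consumerId, topic, partition, offset)
    }
  }
}

// Step 3 (receiver restart): pick the offset to resume fetching from. If the driver
// processed less than the receiver had committed, rewind to the processed offset;
// this can re-deliver some messages, which our system can tolerate.
def resumeOffset(committedOffset: Long, processedOffset: Long): Long =
  if (processedOffset >= 0 && processedOffset < committedOffset) processedOffset else committedOffset

Again, this only shows the shape of the logic, not the actual implementation in the repo.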
Let me know what you think.

Regards,
Dibyendu

On Fri, Sep 5, 2014 at 11:14 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
> Some thoughts on this thread to clarify the doubts.
>
> 1. Driver recovery: The current release (1.1, to be released) does not recover the raw data that has been received but not processed. This is because when the driver dies, the executors die, and so does the raw data that was stored in them. Only for HDFS is the data not lost on driver recovery, as the data is already reliably present in HDFS. This is something we want to fix by Spark 1.2 (3 months from now). Regarding recovery by replaying the data from Kafka, it is possible but tricky. Our goal is to provide a strong guarantee: exactly-once semantics in all transformations. To guarantee this for all kinds of streaming computations, stateful and non-stateful, it is required that the data be replayed through Kafka in exactly the same order, and that the underlying blocks of data in Spark be regenerated in exactly the way they would have been if there had been no driver failure. This is quite tricky to implement; it requires manipulation of ZooKeeper offsets, etc., which is hard to do with the high-level consumer that KafkaUtils uses. Dibyendu's low-level Kafka receiver may enable such approaches in the future. For now we definitely plan to solve the first problem very soon.
>
> 3. Repartitioning: I am trying to understand the repartition issue. One common mistake I have seen is that developers repartition a stream but do not use the repartitioned stream.
>
> WRONG:
> inputDStream.repartition(100)
> inputDStream.map(...).count().print()
>
> RIGHT:
> val repartitionedDStream = inputDStream.repartition(100)
> repartitionedDStream.map(...).count().print()
>
> Not sure if this helps solve the problem that you all are facing. I am going to add this to the streaming programming guide to make sure this common mistake is avoided.
>
> TD
>
> On Wed, Sep 3, 2014 at 10:38 AM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:
>> Hi,
>>
>> Sorry for the little delay. As discussed in this thread, I have modified the Kafka-Spark-Consumer (https://github.com/dibbhatt/kafka-spark-consumer) code to have a dedicated Receiver for every topic partition. You can see an example of how to create a union of these receivers in consumer.kafka.client.Consumer.java.
>>
>> Thanks to Chris for suggesting this change.
>>
>> Regards,
>> Dibyendu
>>
>> On Mon, Sep 1, 2014 at 2:55 AM, RodrigoB <rodrigo.boav...@aspect.com> wrote:
>>> Just a comment on the recovery part.
>>>
>>> Is it correct to say that currently the Spark Streaming recovery design does not consider re-computations (upon metadata lineage recovery) that depend on blocks of data of the received stream?
>>> https://issues.apache.org/jira/browse/SPARK-1647
>>>
>>> Just to illustrate a real use case (mine):
>>> - We have object states which have a Duration field per state which is incremented on every batch interval. This object state is also reset to 0 upon incoming state-changing events. Let's suppose there is at least one event since the last data checkpoint. This will lead to inconsistency upon driver recovery: the Duration field will get incremented from the data checkpoint version until the recovery moment, but the state-change event will never be re-processed, so in the end we have the old state with the wrong Duration value.
>>> To make things worse, let's imagine we're dumping the Duration increases somewhere, which means we're spreading the problem across our system.
>>> Re-computation awareness is something I've commented on in another thread and would rather treat separately:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-td12568.html#a13205
>>>
>>> Re-computations do occur, but the only RDDs that are recovered are the ones from the data checkpoint. This is what we've seen. It is not enough by itself to ensure recovery of computed data, and this partial recovery leads to inconsistency in some cases.
>>>
>>> Roger - I share the same question with you - I'm just not sure if the replicated data really gets persisted on every batch. The execution lineage is checkpointed, but if we have big chunks of data being consumed by the Receiver node on, let's say, a per-second basis, then having it persisted to HDFS every second could be a big challenge for keeping JVM performance - maybe that could be the reason why it's not really implemented... assuming it isn't.
>>>
>>> Dibyendu has made a great effort with the offset-controlling code, but general state-consistent recovery feels to me like another big issue to address.
>>>
>>> I plan on having a dive into the Streaming code and trying to at least contribute some ideas. Some more insight from anyone on the dev team will be very appreciated.
>>>
>>> tnks,
>>> Rod
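To make the recovery gap concrete, here is a minimal Scala sketch of the kind of stateful computation described in Rod's message above: a per-key state with an illustrative duration counter that is reset whenever a state-change event arrives. All class and field names are made up for the illustration.

import org.apache.spark.streaming.StreamingContext._   // pair-DStream functions such as updateStateByKey (Spark 1.x)
import org.apache.spark.streaming.dstream.DStream

// Illustrative event and state types.
case class StateChangeEvent(newState: String)
case class TrackedState(state: String, durationBatches: Long)

// events is keyed by some entity id. Every batch either resets the duration
// (a state-change event arrived) or ages the existing state by one interval.
// (updateStateByKey requires a checkpoint directory on the StreamingContext.)
def trackState(events: DStream[(String, StateChangeEvent)]): DStream[(String, TrackedState)] =
  events.updateStateByKey[TrackedState] {
    (newEvents: Seq[StateChangeEvent], old: Option[TrackedState]) =>
      newEvents.lastOption match {
        case Some(ev) => Some(TrackedState(ev.newState, 0L))                           // reset on event
        case None     => old.map(s => s.copy(durationBatches = s.durationBatches + 1)) // no event: increment
      }
  }

On driver recovery only the checkpointed state is restored; if a reset event arrived after that checkpoint and the received block holding it is lost, the counter keeps growing from the stale state, which is exactly the inconsistency described above.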