Hi,

The latest changes for Kafka message re-play by manipulating the ZK offset seem
to be working fine for us. This gives us some relief until the actual issue is
fixed in Spark 1.2.

I have a question on how Spark processes the received data. The logic I used is
basically to pull messages from individual partitions using dedicated
Receivers, take a union of these streams, and then process the unioned stream.
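In outline, the setup looks roughly like this (a simplified Scala sketch;
KafkaPartitionReceiver is a placeholder name, the real receiver lives in the
kafka-spark-consumer project and its fetch loop is omitted here):

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.receiver.Receiver

    // Placeholder for the project's per-partition receiver: onStart would
    // run a Kafka SimpleConsumer fetch loop for exactly this partition and
    // call store() on each message.
    class KafkaPartitionReceiver(topic: String, partition: Int)
        extends Receiver[Array[Byte]](StorageLevel.MEMORY_AND_DISK_2) {
      def onStart(): Unit = { /* fetch loop omitted in this sketch */ }
      def onStop(): Unit = {}
    }

    val conf = new SparkConf().setAppName("LowLevelKafkaConsumer")
    val ssc = new StreamingContext(conf, Seconds(10))

    // One dedicated receiver (and hence one DStream) per topic partition.
    val numPartitions = 3
    val streams = (0 until numPartitions).map { p =>
      ssc.receiverStream(new KafkaPartitionReceiver("mytopic", p))
    }

    // Union the per-partition streams and process them as one.
    ssc.union(streams).foreachRDD { rdd =>
      // downstream processing goes here
    }

    ssc.start()
    ssc.awaitTermination()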
Today I wanted to test this consumer against our internal Kafka cluster, which
has around 50 million records. With this huge backlog, I found Spark running
only the Receiver task and not the processing task (or rather running it very
slowly). Is this an issue with the consumer, or an issue on the Spark side?
Ideally, once the Receivers durably write data to the "Store", the processing
should start in parallel. Why does the processing task need to wait until the
Receiver has consumed all 50 million messages? ...Or maybe I am doing something
wrong? I can share the driver log if you want. In the driver I see mostly
"storage.BlockManagerInfo: Added input..." messages and hardly any
"scheduler.TaskSetManager: Starting task..." messages, and data is written to
the target system at a very slow pace.

Regards,
Dibyendu

On Mon, Sep 8, 2014 at 12:08 AM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> Hi Tathagata,
>
> I have managed to implement the logic in the Kafka-Spark consumer to
> recover from driver failure. This is just an interim fix until the actual
> fix is done on the Spark side.
>
> The logic is something like this:
>
> 1. When the individual Receivers start for every topic partition, they
> write the Kafka messages along with certain metadata to the block store.
> This metadata contains the message offset, partition id, topic name and
> consumer id. You can see this logic in the PartitionManager.java next()
> method.
>
> 2. In the driver code (Consumer.java), I create the union of all these
> individual DStreams and process the data using a foreachRDD call. There
> the driver receives the RDDs containing the Kafka messages along with the
> metadata details, and periodically commits the "processed" offset of the
> Kafka messages to ZK.
>
> 3. When the driver stops and restarts, the Receivers start again, and
> this time in PartitionManager.java I check what the actual "committed"
> offset for the partition is, and what the actual "processed" offset of
> the same partition is. This logic is in the PartitionManager constructor.
>
> If this is a Receiver restart and the "processed" offset is less than the
> "committed" offset, I start fetching again from the "processed" offset.
> This may lead to duplicate records, but our system can handle duplicates.
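> In Scala terms, the restart check amounts to roughly this (an
> illustrative sketch only; the real logic is Java, in the
> PartitionManager constructor, and the two offsets stand for values read
> back from ZooKeeper at receiver start):
>
>     // "committed" = last offset the receiver stored for the partition,
>     // "processed" = last offset the driver finished processing and
>     // wrote back to ZK.
>     def startOffset(committedOffset: Long, processedOffset: Long): Long =
>       if (processedOffset < committedOffset) processedOffset // re-fetch gap
>       else committedOffset
>
> The receiver then resumes fetching the partition from startOffset(...),
> so messages between "processed" and "committed" may be delivered twice.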
> I have tested with multiple driver kills/stops and found no data loss in
> the Kafka consumer.
>
> In the driver code I have not done any "checkpointing" yet; I will test
> that tomorrow.
>
> One interesting thing I found: if I "repartition" the original stream, I
> can still see data loss with this logic. I believe that during
> repartitioning Spark may change the order of the RDDs from the way they
> were generated from the Kafka stream. So in the repartition case, even
> though I am committing the processed offset, it is not committed in
> order, and I still see the issue. I am not sure this understanding is
> correct, but I cannot find any other explanation.
>
> If I do not use repartition, this solution works fine.
>
> I can make this configurable, so that when the actual fix is available
> this feature can be turned off, as it is an overhead for the consumer.
> Let me know what you think.
>
> Regards,
> Dibyendu
>
>
> On Fri, Sep 5, 2014 at 11:14 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Some thoughts on this thread to clarify the doubts.
>>
>> 1. Driver recovery: The current release (1.1, about to be released) does
>> not recover raw data that has been received but not processed. This is
>> because when the driver dies, the executors die, and so does the raw
>> data stored in them. Only for HDFS is data not lost on driver recovery,
>> as the data is already reliably present in HDFS. This is something we
>> want to fix by Spark 1.2 (3 months from now). Regarding recovery by
>> replaying the data from Kafka: it is possible but tricky. Our goal is to
>> provide a strong guarantee, exactly-once semantics, in all
>> transformations. To guarantee this for all kinds of streaming
>> computations, stateful and non-stateful, the data must be replayed
>> through Kafka in exactly the same order, and the underlying blocks of
>> data in Spark must be regenerated exactly as they would have been had
>> there been no driver failure. This is quite tricky to implement: it
>> requires manipulating ZooKeeper offsets, etc., which is hard to do with
>> the high-level consumer that KafkaUtils uses. Dibyendu's low-level Kafka
>> receiver may enable such approaches in the future. For now we definitely
>> plan to solve the first problem very soon.
>>
>> 2. Repartitioning: I am trying to understand the repartition issue. One
>> common mistake I have seen is that developers repartition a stream but
>> do not use the repartitioned stream.
>>
>> WRONG:
>>   inputDStream.repartition(100)
>>   inputDStream.map(...).count().print()
>>
>> RIGHT:
>>   val repartitionedDStream = inputDStream.repartition(100)
>>   repartitionedDStream.map(...).count().print()
>>
>> Not sure if this helps solve the problem that you are all facing. I am
>> going to add this to the streaming programming guide to make sure this
>> common mistake is avoided.
>>
>> TD
>>
>>
>> On Wed, Sep 3, 2014 at 10:38 AM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Sorry for the little delay. As discussed in this thread, I have
>>> modified the Kafka-Spark-Consumer
>>> (https://github.com/dibbhatt/kafka-spark-consumer) code to have a
>>> dedicated Receiver for every topic partition. You can see an example of
>>> how to create the union of these receivers in
>>> consumer.kafka.client.Consumer.java.
>>>
>>> Thanks to Chris for suggesting this change.
>>>
>>> Regards,
>>> Dibyendu
>>>
>>>
>>> On Mon, Sep 1, 2014 at 2:55 AM, RodrigoB <rodrigo.boav...@aspect.com>
>>> wrote:
>>>
>>>> Just a comment on the recovery part.
>>>>
>>>> Is it correct to say that the current Spark Streaming recovery design
>>>> does not consider re-computations (upon metadata lineage recovery)
>>>> that depend on blocks of data of the received stream?
>>>> https://issues.apache.org/jira/browse/SPARK-1647
>>>>
>>>> Just to illustrate a real use case (mine):
>>>> - We have object states with a Duration field per state, which is
>>>> incremented on every batch interval. This object state is also reset
>>>> to 0 upon incoming state-changing events. Let's suppose there is at
>>>> least one event since the last data checkpoint. This will lead to
>>>> inconsistency upon driver recovery: the Duration field will get
>>>> incremented from the data checkpoint version until the recovery
>>>> moment, but the state-change event will never be re-processed... so in
>>>> the end we have the old state with the wrong Duration value.
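>>>> In Scala terms the pattern is roughly this (a simplified sketch, not
>>>> our actual code; Event, batchMillis and the keying by object id are
>>>> illustrative):
>>>>
>>>>     import org.apache.spark.streaming.StreamingContext._
>>>>     import org.apache.spark.streaming.dstream.DStream
>>>>
>>>>     case class Event(id: String) // illustrative event type
>>>>
>>>>     // Duration per key: +batchMillis every batch, reset to 0 when a
>>>>     // state-changing event for that key arrives in the batch.
>>>>     def trackDurations(events: DStream[(String, Event)],
>>>>                        batchMillis: Long): DStream[(String, Long)] =
>>>>       events.updateStateByKey[Long] {
>>>>         (evs: Seq[Event], dur: Option[Long]) =>
>>>>           if (evs.nonEmpty) Some(0L)
>>>>           else Some(dur.getOrElse(0L) + batchMillis)
>>>>       }
>>>>
>>>> After recovery the increments are replayed from the checkpointed
>>>> state, but the lost reset event is not, which is exactly the
>>>> inconsistency described above.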
>>>> To make things worse, let's imagine we're dumping the Duration
>>>> increases somewhere... which means we're spreading the problem across
>>>> our system. Re-computation awareness is something I've commented on in
>>>> another thread and would rather treat separately:
>>>>
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-td12568.html#a13205
>>>>
>>>> Re-computations do occur, but the only RDDs that are recovered are the
>>>> ones from the data checkpoint. This is what we've seen. It is not
>>>> enough by itself to ensure recovery of computed data, and this partial
>>>> recovery leads to inconsistency in some cases.
>>>>
>>>> Roger - I share the same question with you - I'm just not sure the
>>>> replicated data really gets persisted on every batch. The execution
>>>> lineage is checkpointed, but if we have big chunks of data being
>>>> consumed by the Receiver node on, say, a per-second basis, then
>>>> persisting it to HDFS every second could be a big challenge for JVM
>>>> performance - maybe that is the reason why it's not really
>>>> implemented... assuming it isn't.
>>>>
>>>> Dibyendu made a great effort with the offset-controlling code, but
>>>> general consistent state recovery feels to me like another big issue
>>>> to address.
>>>>
>>>> I plan to dive into the Streaming code and try to at least contribute
>>>> some ideas. Any more insight from anyone on the dev team would be very
>>>> appreciated.
>>>>
>>>> tnks,
>>>> Rod
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13208.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org