Comments inline:

> On 5 Jun 2017, at 18:19, Jan Filipiak <jan.filip...@trivago.com> wrote:
>
> Hi
>
> just my few thoughts
>
> On 05.06.2017 11:44, Eno Thereska wrote:
>> Hi there,
>>
>> Sorry for the late reply, I was out this past week. Looks like good progress
>> was made with the discussions either way. Let me recap a couple of points I
>> saw into one big reply:
>>
>> 1. Jan mentioned CRC errors. I think this is a good point. As these happen
>> in Kafka, before Kafka Streams gets a chance to inspect anything, I'd like
>> to hear the opinion of more Kafka folks like Ismael or Jason on this one.
>> Currently the documentation is not great on what to do once a CRC check
>> has failed. From looking at the code, it looks like the client gets a
>> KafkaException (bubbled up from the fetcher) and currently we in Streams
>> catch this as part of poll() and fail. It might be advantageous to treat CRC
>> handling in a similar way to serialisation handling (e.g., have the option
>> to fail/skip). Let's see what the other folks say. Worst case, we can do a
>> separate KIP for that if it proves too hard to do in one go.
>
> There is no reasonable way to "skip" a CRC error. How can you know the length
> you read was anything reasonable? You might be completely lost inside your
> response.
On the client side, every record received is checked for validity. As it happens, if the
CRC check fails, the exception is wrapped in a KafkaException that is thrown all the way up
to poll(). Assuming we change that and poll() throws a CRC exception, I was thinking we
could treat it similarly to a deserialisation exception and pass it to the exception handler
to decide what to do. The default would be to fail. This might need a Kafka KIP btw and can
be done separately from this KIP, but Jan, would you find this useful?

>> At a minimum, handling this type of exception will need to involve the
>> exactly-once (EoS) logic. We'd still allow the option of failing or
>> skipping, but EoS would need to clean up by rolling back all the side
>> effects from the processing so far. Matthias, how does this sound?
>
> EoS will not help; the record might be 5 or 6 repartitions down into the
> topology. I haven't followed, but I pray you made EoS optional! We don't need
> this and we don't want this, and we will turn it off if it comes. So I
> wouldn't recommend relying on it. The option to turn it off is better than
> forcing it and still being unable to roll back bad pills (as explained before).

Yeah, as Matthias mentioned, EoS is optional.

Thanks,
Eno

>> 6. Will add an end-to-end example as Michael suggested.
>>
>> Thanks
>> Eno
>>
>>> On 4 Jun 2017, at 02:35, Matthias J. Sax <matth...@confluent.io> wrote:
>>>
>>> What I don't understand is this:
>>>
>>>> From there on it's the easiest way forward: fix, redeploy, start => done
>>>
>>> If you have many producers that work fine and a new "bad" producer
>>> starts up and writes bad data into your input topic, your Streams app
>>> dies but all your producers, including the bad one, keep writing.
>>>
>>> Thus, how would you fix this, as you cannot "remove" the corrupted data
>>> from the topic? It might take some time to identify the root cause and
>>> stop the bad producer. Up to this point you get good and bad data into
>>> your Streams input topic. If the Streams app is not able to skip over those
>>> bad records, how would you get all the good data from the topic? Not
>>> saying it's not possible, but it's extra work copying the data with a
>>> new non-Streams consumer-producer app into a new topic and then feeding
>>> your Streams app from this new topic -- and you also need to update all your
>>> upstream producers to write to the new topic.
>>>
>>> Thus, if you want to fail fast, you can still do this. And after you have
>>> detected and fixed the bad producer, you might just reconfigure your app
>>> to skip bad records until it reaches the good part of the data.
>>> Afterwards, you could redeploy with fail-fast again.
>>>
>>> Thus, for this pattern, I actually don't see any reason to stop the
>>> Streams app at all. If you have a callback, and use the callback to
>>> raise an alert (and maybe get the bad data into a bad-record queue), it
>>> will not take longer to identify and stop the "bad" producer. But in
>>> this case, you have zero downtime for your Streams app.
>>>
>>> This seems to be much simpler. Or do I miss anything?
>>>
>>> Having said this, I agree that the "threshold based callback" might be
>>> questionable. But as you argue for strict "fail-fast", I want to argue
>>> that this must not always be the best pattern to apply and that the
>>> overall KIP idea is super useful from my point of view.
>>>
>>> -Matthias
>>>
>>> On 6/3/17 11:57 AM, Jan Filipiak wrote:
>>>> Could not agree more!
>>>>
>>>> But then I think the easiest is still: print exception and die.
>>>> From there on it's the easiest way forward: fix, redeploy, start => done
>>>>
>>>> All the other ways to recover a pipeline that was processing partially
>>>> all the time and suddenly went over an "I can't take it anymore" threshold
>>>> are not straightforward IMO.
>>>>
>>>> How to find the offset where it became too bad, when it is not the latest
>>>> committed one? How to reset there, with some reasonable stuff in your RocksDB stores?
>>>>
>>>> If one would do the following: the continuing handler would measure against
>>>> a threshold and would terminate after a certain threshold has been passed (per task).
>>>> Then one can use offset commit / flush intervals to make a reasonable assumption
>>>> of how much is slipping by, + you get an easy recovery when it gets too bad,
>>>> + you could also account for "in processing" records.
>>>>
>>>> Setting this threshold to zero would cover all cases with one
>>>> implementation. It is still beneficial to have it pluggable.
>>>>
>>>> Again, CRC errors are the only bad pills we have seen in production so far.
>>>>
>>>> Best Jan
>>>>
>>>> On 02.06.2017 17:37, Jay Kreps wrote:
>>>>> Jan, I agree with you philosophically. I think one practical challenge has
>>>>> to do with data formats. Many people use untyped events, so there is simply
>>>>> no guarantee on the form of the input. E.g. many companies use JSON without
>>>>> any kind of schema, so it becomes very hard to assert anything about the
>>>>> input, which makes these programs very fragile to the one accidental
>>>>> message publication that creates an unsolvable problem.
>>>>>
>>>>> For that reason I do wonder if limiting to just serialization actually gets
>>>>> you a useful solution. For JSON it will help with the problem of
>>>>> non-parseable JSON, but it sounds like it won't help in the case where the
>>>>> JSON is well-formed but does not have any of the fields you expect and
>>>>> depend on for your processing. I expect the reason for limiting the scope
>>>>> is that it is pretty hard to reason about correctness for anything that stops
>>>>> in the middle of processing an operator DAG?
>>>>>
>>>>> -Jay
>>>>>
>>>>> On Fri, Jun 2, 2017 at 4:50 AM, Jan Filipiak <jan.filip...@trivago.com>
>>>>> wrote:
>>>>>
>>>>>> IMHO you're doing it wrong then. + Building too much support into the Kafka
>>>>>> ecosystem is very counterproductive in fostering a happy userbase.
>>>>>>
>>>>>> On 02.06.2017 13:15, Damian Guy wrote:
>>>>>>
>>>>>>> Jan, you have a choice to fail fast if you want. This is about giving
>>>>>>> people options, and there are times when you don't want to fail fast.
>>>>>>>
>>>>>>> On Fri, 2 Jun 2017 at 11:00 Jan Filipiak <jan.filip...@trivago.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi
>>>>>>>>
>>>>>>>> 1.
>>>>>>>> That greatly complicates monitoring. Fail fast gives you that: when you
>>>>>>>> monitor only the lag of all your apps, you are completely covered. With
>>>>>>>> that sort of new application, monitoring is very much more complicated, as
>>>>>>>> you now need to monitor the fail % of some special apps as well. In my
>>>>>>>> opinion that is a huge downside already.
>>>>>>>>
>>>>>>>> 2.
>>>>>>>> Using a schema registry like the Avro stuff, it might not even be the record
>>>>>>>> that is broken; it might just be your app being unable to fetch a schema it
>>>>>>>> now needs to know. Maybe you got partitioned away from that registry.
>>>>>>>>
>>>>>>>> 3.
>>>>>>>> When you get alerted because of a too-high fail percentage, what are the
>>>>>>>> steps you are gonna take? Shut it down to buy time. Fix the problem. Spend
>>>>>>>> way too much time finding a good reprocessing offset. Your time windows are
>>>>>>>> in bad shape anyway, and you have pretty much lost. This routine is nonsense.
>>>>>>>>
>>>>>>>> Dead letter queues would be the worst possible addition to the Kafka
>>>>>>>> toolkit that I can think of. It just doesn't fit the architecture,
>>>>>>>> where having clients falling behind is a valid option.
>>>>>>>>
>>>>>>>> Further, I mentioned already that the only bad pill I've seen so far is CRC
>>>>>>>> errors. Any plans for those?
>>>>>>>>
>>>>>>>> Best Jan
>>>>>>>>
>>>>>>>> On 02.06.2017 11:34, Damian Guy wrote:
>>>>>>>>> I agree with what Matthias has said w.r.t. failing fast. There are plenty of
>>>>>>>>> times when you don't want to fail fast and must attempt to make progress.
>>>>>>>>> The dead-letter queue is exactly for these circumstances. Of course, if
>>>>>>>>> every record is failing, then you probably do want to give up.
>>>>>>>>>
>>>>>>>>> On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax <matth...@confluent.io>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> First a meta comment. KIP discussion should take place on the dev list
>>>>>>>>>> -- if the user list is cc'ed, please make sure to reply to both lists.
>>>>>>>>>> Thanks.
>>>>>>>>>>
>>>>>>>>>> Thanks for making the scope of the KIP clear. Makes a lot of sense to
>>>>>>>>>> focus on deserialization exceptions for now.
>>>>>>>>>>
>>>>>>>>>> With regard to corrupted state stores, would it make sense to fail a
>>>>>>>>>> task and wipe out the store to repair it via recreation from the
>>>>>>>>>> changelog? That's of course a quite advanced pattern, but I want to bring
>>>>>>>>>> it up to design the first step in a way such that we can get there (if
>>>>>>>>>> we think it's a reasonable idea).
>>>>>>>>>>
>>>>>>>>>> I also want to comment about fail fast vs. making progress. I think that
>>>>>>>>>> fail-fast must not always be the best option. The scenario I have in
>>>>>>>>>> mind is like this: you have a bunch of producers that feed the Streams
>>>>>>>>>> input topic. Most producers work fine, but maybe one producer misbehaves
>>>>>>>>>> and the data it writes is corrupted. You might not even be able
>>>>>>>>>> to recover this lost data at any point -- thus, there is no reason to
>>>>>>>>>> stop processing; you just skip over those records. Of course, you
>>>>>>>>>> need to fix the root cause, and thus you need to alert (either via logs
>>>>>>>>>> or the exception handler directly) and you need to start investigating
>>>>>>>>>> to find the bad producer, shut it down and fix it.
>>>>>>>>>>
>>>>>>>>>> Here the dead letter queue comes into play. From my understanding, the
>>>>>>>>>> purpose of this feature is solely to enable post-mortem debugging. I don't
>>>>>>>>>> think those records would be fed back at any point in time (so I don't see
>>>>>>>>>> any ordering issue -- a skipped record, in this regard, is just "fully
>>>>>>>>>> processed"). Thus, the dead letter queue should actually encode the
>>>>>>>>>> original record's metadata (topic, partition, offset, etc.) to enable such
>>>>>>>>>> debugging.
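For illustration, a minimal sketch of a handler that quarantines the raw record together
with its source coordinates could look like the following. The RecordExceptionHandler
interface, its Response enum and the DeadLetterQueueHandler class are hypothetical names,
not the API proposed in the KIP; the producer and quarantine topic are assumed to be set up
by the application.

    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Hypothetical handler contract; the real interface name and shape were still
    // under discussion in this thread.
    interface RecordExceptionHandler {
        enum Response { CONTINUE, FAIL }
        Response handle(ConsumerRecord<byte[], byte[]> record, Exception exception);
    }

    // Sketch of a dead-letter-queue handler that preserves the original record's
    // coordinates (topic, partition, offset) so the bad record can be debugged later.
    class DeadLetterQueueHandler implements RecordExceptionHandler {

        private final KafkaProducer<byte[], byte[]> dlqProducer; // assumed configured elsewhere
        private final String dlqTopic;                           // e.g. "my-app-quarantine"

        DeadLetterQueueHandler(KafkaProducer<byte[], byte[]> dlqProducer, String dlqTopic) {
            this.dlqProducer = dlqProducer;
            this.dlqTopic = dlqTopic;
        }

        @Override
        public Response handle(ConsumerRecord<byte[], byte[]> record, Exception exception) {
            // Copy the raw bytes unchanged and attach the source coordinates as headers.
            ProducerRecord<byte[], byte[]> quarantined =
                new ProducerRecord<>(dlqTopic, record.key(), record.value());
            quarantined.headers().add("source.topic",
                record.topic().getBytes(StandardCharsets.UTF_8));
            quarantined.headers().add("source.partition",
                Integer.toString(record.partition()).getBytes(StandardCharsets.UTF_8));
            quarantined.headers().add("source.offset",
                Long.toString(record.offset()).getBytes(StandardCharsets.UTF_8));
            dlqProducer.send(quarantined);
            return Response.CONTINUE; // skip the bad record and keep processing
        }
    }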
>>>>>>>>>> I guess this might also be possible if you just log the bad
>>>>>>>>>> records, but it would be harder to access (you first must find the
>>>>>>>>>> Streams instance that did write the log and extract the information from
>>>>>>>>>> there). Reading it from a topic is much simpler.
>>>>>>>>>>
>>>>>>>>>> I also want to mention the following. Assume you have such a topic with
>>>>>>>>>> some bad records and some good records. If we always fail fast, it's
>>>>>>>>>> going to be super hard to process the good data. You would need to write
>>>>>>>>>> an extra app that copies the data into a new topic, filtering out the bad
>>>>>>>>>> records (or apply the map() workaround within Streams). So I don't think
>>>>>>>>>> that "failing fast is most likely the best option in production" is
>>>>>>>>>> necessarily true.
>>>>>>>>>>
>>>>>>>>>> Or do you think there are scenarios for which you can recover the
>>>>>>>>>> corrupted records successfully? And even if this is possible, it might
>>>>>>>>>> be a case for reprocessing instead of failing the whole application?
>>>>>>>>>> Also, if you think you can "repair" a corrupted record, should the
>>>>>>>>>> handler allow returning a "fixed" record? This would solve the ordering
>>>>>>>>>> problem.
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 5/30/17 1:47 AM, Michael Noll wrote:
>>>>>>>>>>> Thanks for your work on this KIP, Eno -- much appreciated!
>>>>>>>>>>>
>>>>>>>>>>> - I think it would help to improve the KIP by adding an end-to-end code
>>>>>>>>>>> example that demonstrates, with the DSL and with the Processor API, how the
>>>>>>>>>>> user would write a simple application that would then be augmented with the
>>>>>>>>>>> proposed KIP changes to handle exceptions. It should also become much
>>>>>>>>>>> clearer then that e.g. the KIP would lead to different code paths for the
>>>>>>>>>>> happy case and any failure scenarios.
>>>>>>>>>>>
>>>>>>>>>>> - Do we have sufficient information available to make informed decisions on
>>>>>>>>>>> what to do next? For example, do we know in which part of the topology the
>>>>>>>>>>> record failed? `ConsumerRecord` gives us access to topic, partition,
>>>>>>>>>>> offset, timestamp, etc., but what about topology-related information (e.g.
>>>>>>>>>>> what is the associated state store, if any)?
>>>>>>>>>>>
>>>>>>>>>>> - Only partly on-topic for the scope of this KIP, but this is about the
>>>>>>>>>>> bigger picture: this KIP would give users the option to send corrupted
>>>>>>>>>>> records to a dead letter queue (quarantine topic). But what pattern would
>>>>>>>>>>> we advocate to process such a dead letter queue then, e.g. how to allow for
>>>>>>>>>>> retries with backoff ("if the first record in the dead letter queue fails
>>>>>>>>>>> again, then try the second record for the time being and go back to the
>>>>>>>>>>> first record at a later time")? Jay and Jan already alluded to ordering
>>>>>>>>>>> problems that will be caused by dead letter queues.
>>>>>>>>>>> As I said, retries
>>>>>>>>>>> might be out of scope, but perhaps the implications should be considered if
>>>>>>>>>>> possible?
>>>>>>>>>>>
>>>>>>>>>>> Also, I wrote the text below before reaching the point in the conversation
>>>>>>>>>>> where this KIP's scope was limited to exceptions in the category of
>>>>>>>>>>> poison pills / deserialization errors. But since Jay brought up user code
>>>>>>>>>>> errors again, I decided to include it again.
>>>>>>>>>>>
>>>>>>>>>>> ----------------------------snip----------------------------
>>>>>>>>>>> A meta comment: I am not sure about this split of the code for the
>>>>>>>>>>> happy path (e.g. map/filter/... in the DSL) from the failure path (using
>>>>>>>>>>> exception handlers). In Scala, for example, we can do:
>>>>>>>>>>>
>>>>>>>>>>>     scala> val computation = scala.util.Try(1 / 0)
>>>>>>>>>>>     computation: scala.util.Try[Int] =
>>>>>>>>>>>     Failure(java.lang.ArithmeticException: / by zero)
>>>>>>>>>>>
>>>>>>>>>>>     scala> computation.getOrElse(42)
>>>>>>>>>>>     res2: Int = 42
>>>>>>>>>>>
>>>>>>>>>>> Another example with Scala's pattern matching, which is similar to
>>>>>>>>>>> `KStream#branch()`:
>>>>>>>>>>>
>>>>>>>>>>>     computation match {
>>>>>>>>>>>       case scala.util.Success(x) => x * 5
>>>>>>>>>>>       case scala.util.Failure(_) => 42
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>> (The above isn't the most idiomatic way to handle this in Scala, but that's
>>>>>>>>>>> not the point I'm trying to make here.)
>>>>>>>>>>>
>>>>>>>>>>> Hence the question I'm raising here is: do we want to have an API where you
>>>>>>>>>>> code "the happy path" and then have a different code path for failures
>>>>>>>>>>> (using exceptions and handlers), or should we treat both Success and
>>>>>>>>>>> Failure in the same way?
>>>>>>>>>>>
>>>>>>>>>>> I think the failure/exception handling approach (as proposed in this KIP)
>>>>>>>>>>> is well-suited for errors in the category of deserialization problems, aka
>>>>>>>>>>> poison pills, partly because the (default) serdes are defined through
>>>>>>>>>>> configuration (explicit serdes, however, are defined through API calls).
>>>>>>>>>>>
>>>>>>>>>>> However, I'm not yet convinced that the failure/exception handling approach
>>>>>>>>>>> is the best idea for user code exceptions, e.g. if you fail to guard
>>>>>>>>>>> against NPE in your lambdas or divide a number by zero.
>>>>>>>>>>>
>>>>>>>>>>>     scala> val stream = Seq(1, 2, 3, 4, 5)
>>>>>>>>>>>     stream: Seq[Int] = List(1, 2, 3, 4, 5)
>>>>>>>>>>>
>>>>>>>>>>>     // Here: fall back to a sane default when encountering failed records
>>>>>>>>>>>     scala> stream.map(x => Try(1/(3 - x))).flatMap(t => Seq(t.getOrElse(42)))
>>>>>>>>>>>     res19: Seq[Int] = List(0, 1, 42, -1, 0)
>>>>>>>>>>>
>>>>>>>>>>>     // Here: skip over failed records
>>>>>>>>>>>     scala> stream.map(x => Try(1/(3 - x))).collect{ case Success(s) => s }
>>>>>>>>>>>     res20: Seq[Int] = List(0, 1, -1, 0)
>>>>>>>>>>>
>>>>>>>>>>> The above is more natural to me than using error handlers to define how to
>>>>>>>>>>> deal with failed records (here, the value `3` causes an arithmetic
>>>>>>>>>>> exception). Again, it might help the KIP if we added an end-to-end example
>>>>>>>>>>> for such user code errors.
>>>>>>>>>>> ----------------------------snip----------------------------
>>>>>>>>>>>
>>>>>>>>>>> On Tue, May 30, 2017 at 9:24 AM, Jan Filipiak <jan.filip...@trivago.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Jay,
>>>>>>>>>>>>
>>>>>>>>>>>> Eno mentioned that he will narrow down the scope to only ConsumerRecord
>>>>>>>>>>>> deserialisation.
>>>>>>>>>>>>
>>>>>>>>>>>> I am working with database changelogs only. I would really not like to see
>>>>>>>>>>>> a dead letter queue or something similar. How am I expected to get these
>>>>>>>>>>>> back in order? Just grind to a halt and call me on the weekend. I'll fix it
>>>>>>>>>>>> then in a few minutes, rather than spend 2 weeks ordering dead letters
>>>>>>>>>>>> (where reprocessing might even be the faster fix).
>>>>>>>>>>>>
>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>
>>>>>>>>>>>> On 29.05.2017 20:23, Jay Kreps wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> - I think we should hold off on retries unless we have worked out the
>>>>>>>>>>>>> full usage pattern; people can always implement their own. I think the idea
>>>>>>>>>>>>> is that you send the message to some kind of dead letter queue and then
>>>>>>>>>>>>> replay these later. This obviously destroys all semantic guarantees
>>>>>>>>>>>>> we are working hard to provide right now, which may be okay.
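For illustration, Jan's threshold idea from earlier in the thread (keep processing until a
per-task failure threshold is exceeded, where a threshold of zero reduces to plain fail-fast)
could be expressed against the same hypothetical RecordExceptionHandler sketched above. The
class name and the per-task instantiation are assumptions, not part of the KIP.

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // Sketch of a threshold-based handler: continue past bad records until a per-task
    // limit is reached, then fail so commit/flush intervals bound how much slips by.
    class ThresholdExceptionHandler implements RecordExceptionHandler {

        private final int maxSkippedRecords; // 0 == classic fail-fast behaviour
        private int skippedSoFar = 0;        // one handler instance per task is assumed

        ThresholdExceptionHandler(int maxSkippedRecords) {
            this.maxSkippedRecords = maxSkippedRecords;
        }

        @Override
        public Response handle(ConsumerRecord<byte[], byte[]> record, Exception exception) {
            skippedSoFar++;
            if (skippedSoFar > maxSkippedRecords) {
                return Response.FAIL; // past the threshold: stop the task
            }
            // Below the threshold: a real implementation would also log or alert here.
            return Response.CONTINUE;
        }
    }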