Thanks Guozhang, I’ve updated the KIP and hopefully addressed all the comments so far. In the process I also changed the name of the KIP to reflect its scope better: https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers
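For quick reference, the handler interface in the updated KIP now looks roughly like this (a sketch only; the wiki page above is the authoritative version):

    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.Configurable;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public interface DeserializationExceptionHandler extends Configurable {
        // Inspect a record that failed to deserialize and decide whether
        // processing should continue or the task should fail.
        DeserializationHandlerResponse handle(ProcessorContext context,
                                              ConsumerRecord<byte[], byte[]> record,
                                              Exception exception);

        enum DeserializationHandlerResponse {
            CONTINUE,  // drop the corrupted record and move on
            FAIL       // stop processing
        }
    }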
Any other feedback appreciated, otherwise I’ll start the vote soon.

Thanks
Eno

> On Jun 12, 2017, at 6:28 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> Eno, thanks for bringing this proposal up, and sorry for getting to it late. Here are my two cents:
>
> 1. First some meta comments regarding "fail fast" vs. "making progress". I agree that in general we should "enforce users to do the right thing" in system design, but we also need to keep in mind that Kafka is a multi-tenant system, i.e. from a Streams app's point of view you probably do not control the whole stream processing pipeline end-to-end. E.g. your input data may not be controlled by you; it could be written by another app, another team in your company, or even a different organization, and if an error happens you may not be able to fix it and "do the right thing" just by yourself in time. In such an environment I think it is important to leave the door open to let users be more resilient. So I find the current proposal, which does leave the door open for either failing fast or making progress, quite reasonable.
>
> 2. On the other hand, if the question is whether we should provide a built-in "send to bad queue" handler from the library, I think that might be overkill: with some tweaks (see my detailed comments below) on the API we can allow users to implement such handlers pretty easily. In fact, I feel even "LogAndThresholdExceptionHandler" is not necessary as a built-in handler, as it would then require users to specify the threshold via configs, etc. I think letting people provide such "eco-libraries" may be better.
>
> 3. Regarding the CRC error: today we validate the CRC on both the broker end upon receiving produce requests and on the consumer end upon receiving fetch responses, and if the CRC validation fails in the former case the record is not appended to the broker logs. So if we do see a CRC failure on the consumer side, it has to be that a bit was flipped either on the broker disks or over the wire. The first case is fatal while the second is retriable; unfortunately we cannot tell which case it is when seeing a CRC validation failure. But in either case, just skipping and making progress seems not a good choice here, and hence I would personally exclude these errors from the general serde errors, to NOT leave the door open to making progress.
>
> Currently such errors are thrown as a KafkaException that wraps an InvalidRecordException, which may be too general; we could consider just throwing the InvalidRecordException directly. But that can be an orthogonal discussion if we agree that CRC failures should not be considered in this KIP.
>
> ----------------
>
> Now some detailed comments:
>
> 4. Could we consider adding the processor context to the handle() function as well? This context would be that of the source node that is about to process the record. It could expose more info, like which task / source node sees this error and the timestamp of the message, and it would also allow users to implement their handlers such that they expose some metrics, or call context.forward() to implement the "send to bad queue" behavior, etc.
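> For illustration, a user-side handler along those lines could look roughly like this (just a sketch, assuming the handle() signature gains the context parameter; the dead-letter config key and class name here are made up):
>
>     import java.util.HashMap;
>     import java.util.Map;
>     import org.apache.kafka.clients.consumer.ConsumerRecord;
>     import org.apache.kafka.clients.producer.KafkaProducer;
>     import org.apache.kafka.clients.producer.ProducerRecord;
>     import org.apache.kafka.common.serialization.ByteArraySerializer;
>     import org.apache.kafka.streams.processor.ProcessorContext;
>
>     public class SendToDeadLetterQueueHandler implements DeserializationExceptionHandler {
>         private KafkaProducer<byte[], byte[]> producer;
>         private String deadLetterTopic;
>
>         @Override
>         public void configure(Map<String, ?> configs) {
>             deadLetterTopic = (String) configs.get("dead.letter.topic"); // hypothetical key
>             // simplistic: reuses the app's configs for the producer
>             producer = new KafkaProducer<>(new HashMap<String, Object>(configs),
>                     new ByteArraySerializer(), new ByteArraySerializer());
>         }
>
>         @Override
>         public DeserializationHandlerResponse handle(ProcessorContext context,
>                                                      ConsumerRecord<byte[], byte[]> record,
>                                                      Exception exception) {
>             // forward the raw bytes; a fuller version would also encode
>             // record.topic(), record.partition() and record.offset() for debugging
>             producer.send(new ProducerRecord<>(deadLetterTopic, record.key(), record.value()));
>             return DeserializationHandlerResponse.CONTINUE;
>         }
>     }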
> 5. Could you also add to the KIP the string name of StreamsConfig.DEFAULT_RECORD_EXCEPTION_HANDLER? Personally I find the "default" prefix a bit misleading, since we do not allow users to override it per-node yet. But I'm okay either way, as I can see we may extend it in the future and would probably prefer not to rename the config again. Also, from the experience with `default partitioner` and `default timestamp extractor`, we may want to make sure that the passed-in object can be either a string "class name" or a class object?
>
> Guozhang
>
> On Wed, Jun 7, 2017 at 2:16 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>
>> Hi Eno,
>>
>> On 07.06.2017 22:49, Eno Thereska wrote:
>>
>>> Comments inline:
>>>
>>> On 5 Jun 2017, at 18:19, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> just my few thoughts
>>>>
>>>> On 05.06.2017 11:44, Eno Thereska wrote:
>>>>>
>>>>> Hi there,
>>>>>
>>>>> Sorry for the late reply, I was out this past week. Looks like good progress was made with the discussions either way. Let me recap a couple of points I saw into one big reply:
>>>>>
>>>>> 1. Jan mentioned CRC errors. I think this is a good point. As these happen in Kafka, before Kafka Streams gets a chance to inspect anything, I'd like to hear the opinion of more Kafka folks like Ismael or Jason on this one. Currently the documentation is not great about what to do once a CRC check has failed. From looking at the code, it looks like the client gets a KafkaException (bubbled up from the fetcher) and currently in streams we catch this as part of poll() and fail. It might be advantageous to treat CRC handling in a similar way to serialisation handling (e.g., have the option to fail/skip). Let's see what the other folks say. Worst case, we can do a separate KIP for that if it proves too hard to do in one go.
>>>>
>>>> There is no reasonable way to "skip" a CRC error. How can you know that the length you read was anything reasonable? You might be completely lost inside your response.
>>>
>>> On the client side, every record received is checked for validity. As it happens, if the CRC check fails the exception is wrapped in a KafkaException that is thrown all the way to poll(). Assuming we change that and poll() throws a CRC exception, I was thinking we could treat it similarly to a deserialization exception and pass it to the exception handler to decide what to do. The default would be to fail. This might need a Kafka KIP btw and can be done separately from this KIP, but Jan, would you find this useful?
>>
>> I don't think so. IMO you cannot reasonably continue parsing when the checksum of a message is not correct. If you are not sure you got the correct length, how can you be sure to find the next record? I would always straight fail in all cases. It's too hard for me to understand why one would try to continue. I mentioned CRCs because that's the only kind of bad pill I ever saw so far. But I am happy that it just stopped and I could check what was going on. This would also be invasive in the client code.
>>
>> If you ask me, I am always going to vote for "grind to a halt": let the developers see what happened and let them fix it. It helps building good Kafka experiences and better software and architectures. For me this is "force the user to do the right thing" (https://youtu.be/aAb7hSCtvGw?t=374), e.g. not letting unexpected input slip by. Letting unexpected input slip by is what bought us 15+ years of war with all sorts of ingestion attacks.
>> I don't even dare to estimate how many missing-records search teams are going to be formed, maybe some HackerOne for stream apps :D
>>
>> Best Jan
>>
>>>>> At a minimum, handling this type of exception will need to involve the exactly-once (EoS) logic. We'd still allow the option of failing or skipping, but EoS would need to clean up by rolling back all the side effects from the processing so far. Matthias, how does this sound?
>>>>
>>>> EoS will not help; the record might be 5 or 6 repartitions down in the topology. I haven't followed, but I pray you made EoS optional! We don't need this and we don't want this, and we will turn it off if it comes. So I wouldn't recommend relying on it. The option to turn it off is better than forcing it and still being unable to roll back bad pills (as explained before).
>>>
>>> Yeah, as Matthias mentioned, EoS is optional.
>>>
>>> Thanks,
>>> Eno
>>>
>>> 6. Will add an end-to-end example as Michael suggested.
>>>>>
>>>>> Thanks
>>>>> Eno
>>>>>
>>>>> On 4 Jun 2017, at 02:35, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>
>>>>>> What I don't understand is this:
>>>>>>
>>>>>>> From there on it's the easiest way forward: fix, redeploy, start => done
>>>>>>
>>>>>> If you have many producers that work fine and a new "bad" producer starts up and writes bad data into your input topic, your Streams app dies, but all your producers, including the bad one, keep writing.
>>>>>>
>>>>>> Thus, how would you fix this, as you cannot "remove" the corrupted data from the topic? It might take some time to identify the root cause and stop the bad producer. Up to this point you get good and bad data into your Streams input topic. If the Streams app is not able to skip over those bad records, how would you get all the good data from the topic? Not saying it's not possible, but it's extra work copying the data with a new non-Streams consumer-producer app into a new topic and then feeding your Streams app from this new topic -- and you also need to update all your upstream producers to write to the new topic.
>>>>>>
>>>>>> Thus, if you want to fail fast, you can still do this. And after you have detected and fixed the bad producer, you might just reconfigure your app to skip bad records until it reaches the good part of the data. Afterwards, you could redeploy with fail-fast again.
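>>>>>> Concretely, that reconfiguration could be a single config switch, e.g. something like this (a sketch only: the exact config name is still under discussion in this KIP, and the skip-over handler class here is hypothetical):
>>>>>>
>>>>>>     import java.util.Properties;
>>>>>>     import org.apache.kafka.streams.StreamsConfig;
>>>>>>
>>>>>>     Properties props = new Properties();
>>>>>>     // temporarily skip over bad records instead of failing fast, then redeploy
>>>>>>     props.put(StreamsConfig.DEFAULT_RECORD_EXCEPTION_HANDLER,
>>>>>>               "com.example.LogAndContinueExceptionHandler"); // hypothetical handler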
>>>>>> Thus, for this pattern, I actually don't see any reason to stop the Streams app at all. If you have a callback, and use the callback to raise an alert (and maybe get the bad data into a bad-record queue), it will not take longer to identify and stop the "bad" producer. But in this case, you have zero downtime for your Streams app.
>>>>>>
>>>>>> This seems to be much simpler. Or do I miss anything?
>>>>>>
>>>>>> Having said this, I agree that the "threshold based callback" might be questionable. But as you argue for strict "fail-fast", I want to argue that this need not always be the best pattern to apply, and that the overall KIP idea is super useful from my point of view.
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 6/3/17 11:57 AM, Jan Filipiak wrote:
>>>>>>>
>>>>>>> Could not agree more!
>>>>>>>
>>>>>>> But then I think the easiest is still: print exception and die. From there on it's the easiest way forward: fix, redeploy, start => done.
>>>>>>>
>>>>>>> All the other ways to recover a pipeline that was processing partially all the time and suddenly went over an "I can't take it anymore" threshold are not straightforward, IMO. How do you find the offset where it became too bad, when it is not the latest committed one? How do you reset there, with some reasonable state in your RocksDB stores?
>>>>>>>
>>>>>>> If one would do the following: the continuing handler would measure against a threshold and would terminate after that threshold has been passed (per task). Then one can use offset commit / flush intervals to make a reasonable assumption about how much is slipping by, plus you get an easy recovery when it gets too bad, plus you could also account for "in processing" records. Setting this threshold to zero would cover all cases with one implementation. It is still beneficial to have it pluggable.
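>>>>>>> As a sketch (hypothetical names, and assuming the handler interface sketched earlier in the thread), such a pluggable threshold handler could look like:
>>>>>>>
>>>>>>>     import java.util.Map;
>>>>>>>     import org.apache.kafka.clients.consumer.ConsumerRecord;
>>>>>>>     import org.apache.kafka.streams.processor.ProcessorContext;
>>>>>>>
>>>>>>>     // Skips corrupted records until a configured threshold is crossed, then
>>>>>>>     // fails. A threshold of zero degenerates to plain fail-fast.
>>>>>>>     public class ThresholdExceptionHandler implements DeserializationExceptionHandler {
>>>>>>>         private long maxSkipped;
>>>>>>>         private long skipped = 0;
>>>>>>>
>>>>>>>         @Override
>>>>>>>         public void configure(Map<String, ?> configs) {
>>>>>>>             Object v = configs.get("skip.threshold"); // hypothetical config key
>>>>>>>             maxSkipped = (v == null) ? 0L : Long.parseLong(v.toString());
>>>>>>>         }
>>>>>>>
>>>>>>>         @Override
>>>>>>>         public DeserializationHandlerResponse handle(ProcessorContext context,
>>>>>>>                                                      ConsumerRecord<byte[], byte[]> record,
>>>>>>>                                                      Exception exception) {
>>>>>>>             skipped++;
>>>>>>>             return (skipped > maxSkipped)
>>>>>>>                     ? DeserializationHandlerResponse.FAIL
>>>>>>>                     : DeserializationHandlerResponse.CONTINUE;
>>>>>>>         }
>>>>>>>     }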
>>>>>>> Again, CRC errors are the only bad pills we saw in production for now.
>>>>>>>
>>>>>>> Best Jan
>>>>>>>
>>>>>>> On 02.06.2017 17:37, Jay Kreps wrote:
>>>>>>>>
>>>>>>>> Jan, I agree with you philosophically. I think one practical challenge has to do with data formats. Many people use untyped events, so there is simply no guarantee on the form of the input. E.g. many companies use JSON without any kind of schema, so it becomes very hard to assert anything about the input, which makes these programs very fragile to the "one accidental message publication that creates an unsolvable problem".
>>>>>>>>
>>>>>>>> For that reason I do wonder if limiting to just serialization actually gets you a useful solution. For JSON it will help with the problem of non-parseable JSON, but it sounds like it won't help in the case where the JSON is well-formed but does not have any of the fields you expect and depend on for your processing. I expect the reason for limiting the scope is that it is pretty hard to reason about correctness for anything that stops in the middle of processing an operator DAG?
>>>>>>>>
>>>>>>>> -Jay
>>>>>>>>
>>>>>>>> On Fri, Jun 2, 2017 at 4:50 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>>>>
>>>>>>>>> IMHO you're doing it wrong then. Plus, building too much support into the Kafka ecosystem is very counterproductive in fostering a happy userbase.
>>>>>>>>>
>>>>>>>>> On 02.06.2017 13:15, Damian Guy wrote:
>>>>>>>>>>
>>>>>>>>>> Jan, you have a choice to fail fast if you want. This is about giving people options, and there are times when you don't want to fail fast.
>>>>>>>>>>
>>>>>>>>>> On Fri, 2 Jun 2017 at 11:00 Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> 1. That greatly complicates monitoring. Fail fast gives you that: when you monitor only the lag of all your apps, you are completely covered. With that sort of new application, monitoring is much more complicated, as you now need to monitor the fail % of some special apps as well. In my opinion that is a huge downside already.
>>>>>>>>>>>
>>>>>>>>>>> 2. Using a schema registry like the Avro stuff, it might not even be the record that is broken; it might be just your app being unable to fetch a schema it needs right now. Maybe you got partitioned away from that registry.
>>>>>>>>>>>
>>>>>>>>>>> 3. When you get alerted because of too high a fail percentage, what are the steps you are going to take? Shut it down to buy time. Fix the problem. Spend way too much time finding a good reprocess offset. Your time windows are in bad shape anyway, and you have pretty much lost. This routine is nonsense.
>>>>>>>>>>>
>>>>>>>>>>> Dead letter queues would be the worst possible addition to the Kafka toolkit that I can think of. It just doesn't fit the architecture, in which clients falling behind is a valid option.
>>>>>>>>>>>
>>>>>>>>>>> Further, I mentioned already that the only bad pill I've seen so far is CRC errors. Any plans for those?
>>>>>>>>>>>
>>>>>>>>>>> Best Jan
>>>>>>>>>>>
>>>>>>>>>>> On 02.06.2017 11:34, Damian Guy wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> I agree with what Matthias has said w.r.t. failing fast. There are plenty of times when you don't want to fail fast and must attempt to make progress. The dead-letter queue is exactly for these circumstances. Of course, if every record is failing, then you probably do want to give up.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> First a meta comment: KIP discussions should take place on the dev list -- if the user list is cc'ed, please make sure to reply to both lists. Thanks.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for making the scope of the KIP clear. Makes a lot of sense to focus on deserialization exceptions for now.
>>>>>>>>>>>>>
>>>>>>>>>>>>> With regard to corrupted state stores, would it make sense to fail a task and wipe out the store to repair it via recreation from the changelog? That's of course quite an advanced pattern, but I want to bring it up so that we design the first step in a way such that we can get there (if we think it's a reasonable idea).
>>>>>>>>>>>>>
>>>>>>>>>>>>> I also want to comment on fail fast vs. making progress. I think that fail-fast need not always be the best option. The scenario I have in mind is this: you have a bunch of producers that feed the Streams input topic. Most producers work fine, but maybe one producer misbehaves and the data it writes is corrupted. You might not even be able to recover this lost data at any point -- thus, there is no reason to stop processing; you just skip over those records.
>>>>>>>>>>>>> Of course, you need to fix the root cause, and thus you need to alert (either via logs or the exception handler directly) and start to investigate to find the bad producer, shut it down and fix it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Here the dead letter queue comes into place. From my understanding, the purpose of this feature is solely to enable post-mortem debugging. I don't think those records would be fed back at any point in time (so I don't see any ordering issue -- a skipped record, in this regard, is just "fully processed"). Thus, the dead letter queue should actually encode the original record's metadata (topic, partition, offset, etc.) to enable such debugging. I guess this might also be possible if you just log the bad records, but it would be harder to access (you first must find the Streams instance that wrote the log and extract the information from there). Reading it from a topic is much simpler.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I also want to mention the following. Assume you have such a topic with some bad records and some good records. If we always fail fast, it's going to be super hard to process the good data. You would need to write an extra app that copies the data into a new topic, filtering out the bad records (or apply the map() workaround within the stream). So I don't think it's necessarily true that failing fast is the best option in production.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Or do you think there are scenarios for which you can recover the corrupted records successfully? And even if this is possible, might it be a case for reprocessing instead of failing the whole application? Also, if you think you can "repair" a corrupted record, should the handler allow returning a "fixed" record? This would solve the ordering problem.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 5/30/17 1:47 AM, Michael Noll wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for your work on this KIP, Eno -- much appreciated!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - I think it would help to improve the KIP by adding an end-to-end code example that demonstrates, with the DSL and with the Processor API, how the user would write a simple application that would then be augmented with the proposed KIP changes to handle exceptions. It should also become much clearer then that e.g. the KIP would lead to different code paths for the happy case and any failure scenarios.
>>>>>>>>>>>> >>>>>>>>>>>>> - Do we have sufficient information available to make informed >>>>>>>>>>>>>> >>>>>>>>>>>>>> decisions >>>>>>>>>>>>> >>>>>>>>>>>> on >>>>>>>>>>>> >>>>>>>>>>>>> what to do next? For example, do we know in which part of the >>>>>>>>>>>>>> topology >>>>>>>>>>>>>> >>>>>>>>>>>>>> the >>>>>>>>>>>>> >>>>>>>>>>>>> record failed? `ConsumerRecord` gives us access to topic, >>>>>>>>>>>>>> partition, >>>>>>>>>>>>>> offset, timestamp, etc., but what about topology-related >>>>>>>>>>>>>> information >>>>>>>>>>>>>> >>>>>>>>>>>>>> (e.g. >>>>>>>>>>>>> >>>>>>>>>>>>> what is the associated state store, if any)? >>>>>>>>>>>>>> >>>>>>>>>>>>>> - Only partly on-topic for the scope of this KIP, but this is >>>>>>>>>>>>>> about >>>>>>>>>>>>>> the >>>>>>>>>>>>>> bigger picture: This KIP would give users the option to send >>>>>>>>>>>>>> corrupted >>>>>>>>>>>>>> records to dead letter queue (quarantine topic). But, what >>>>>>>>>>>>>> pattern >>>>>>>>>>>>>> >>>>>>>>>>>>>> would >>>>>>>>>>>>> >>>>>>>>>>>> we advocate to process such a dead letter queue then, e.g. how to >>>>>>>>>>>> allow >>>>>>>>>>>> >>>>>>>>>>>>> for >>>>>>>>>>>>> >>>>>>>>>>>>> retries with backoff ("If the first record in the dead letter >>>>>>>>>>>>>> queue >>>>>>>>>>>>>> >>>>>>>>>>>>>> fails >>>>>>>>>>>>> >>>>>>>>>>>> again, then try the second record for the time being and go back >>>>>>>>>>>> to the >>>>>>>>>>>> >>>>>>>>>>>>> first record at a later time"). Jay and Jan already alluded to >>>>>>>>>>>>>> >>>>>>>>>>>>>> ordering >>>>>>>>>>>>> >>>>>>>>>>>> problems that will be caused by dead letter queues. As I said, >>>>>>>>>>>> retries >>>>>>>>>>>> >>>>>>>>>>>>> might be out of scope but perhaps the implications should be >>>>>>>>>>>>>> considered >>>>>>>>>>>>>> >>>>>>>>>>>>>> if >>>>>>>>>>>>> >>>>>>>>>>>>> possible? >>>>>>>>>>>>>> >>>>>>>>>>>>>> Also, I wrote the text below before reaching the point in the >>>>>>>>>>>>>> >>>>>>>>>>>>>> conversation >>>>>>>>>>>>> >>>>>>>>>>>>> that this KIP's scope will be limited to exceptions in the >>>>>>>>>>>>>> category of >>>>>>>>>>>>>> poison pills / deserialization errors. But since Jay brought >>>>>>>>>>>>>> up >>>>>>>>>>>>>> user >>>>>>>>>>>>>> >>>>>>>>>>>>>> code >>>>>>>>>>>>> >>>>>>>>>>>>> errors again, I decided to include it again. >>>>>>>>>>>>>> >>>>>>>>>>>>>> ----------------------------snip---------------------------- >>>>>>>>>>>>>> A meta comment: I am not sure about this split between the >>>>>>>>>>>>>> code for >>>>>>>>>>>>>> the >>>>>>>>>>>>>> happy path (e.g. map/filter/... in the DSL) from the failure >>>>>>>>>>>>>> path >>>>>>>>>>>>>> >>>>>>>>>>>>>> (using >>>>>>>>>>>>> >>>>>>>>>>>> exception handlers). In Scala, for example, we can do: >>>>>>>>>>>> >>>>>>>>>>>>> scala> val computation = scala.util.Try(1 / 0) >>>>>>>>>>>>>> computation: scala.util.Try[Int] = >>>>>>>>>>>>>> Failure(java.lang.ArithmeticException: / by zero) >>>>>>>>>>>>>> >>>>>>>>>>>>>> scala> computation.getOrElse(42) >>>>>>>>>>>>>> res2: Int = 42 >>>>>>>>>>>>>> >>>>>>>>>>>>>> Another example with Scala's pattern matching, which is >>>>>>>>>>>>>> similar to >>>>>>>>>>>>>> `KStream#branch()`: >>>>>>>>>>>>>> >>>>>>>>>>>>>> computation match { >>>>>>>>>>>>>> case scala.util.Success(x) => x * 5 >>>>>>>>>>>>>> case scala.util.Failure(_) => 42 >>>>>>>>>>>>>> } >>>>>>>>>>>>>> >>>>>>>>>>>>>> (The above isn't the most idiomatic way to handle this in >>>>>>>>>>>>>> Scala, >>>>>>>>>>>>>> but >>>>>>>>>>>>>> >>>>>>>>>>>>>> that's >>>>>>>>>>>>> >>>>>>>>>>>>> not the point I'm trying to make here.) 
>>>>>>>>>>>>>> Hence the question I'm raising here is: do we want to have an API where you code "the happy path", and then have a different code path for failures (using exceptions and handlers); or should we treat both Success and Failure the same way?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think the failure/exception handling approach (as proposed in this KIP) is well-suited for errors in the category of deserialization problems aka poison pills, partly because the (default) serdes are defined through configuration (explicit serdes, however, are defined through API calls).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> However, I'm not yet convinced that the failure/exception handling approach is the best idea for user code exceptions, e.g. if you fail to guard against NPEs in your lambdas or divide a number by zero.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     scala> val stream = Seq(1, 2, 3, 4, 5)
>>>>>>>>>>>>>>     stream: Seq[Int] = List(1, 2, 3, 4, 5)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     // Here: fall back to a sane default when encountering failed records
>>>>>>>>>>>>>>     scala> stream.map(x => Try(1/(3 - x))).flatMap(t => Seq(t.getOrElse(42)))
>>>>>>>>>>>>>>     res19: Seq[Int] = List(0, 1, 42, -1, 0)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     // Here: skip over failed records
>>>>>>>>>>>>>>     scala> stream.map(x => Try(1/(3 - x))).collect{ case Success(s) => s }
>>>>>>>>>>>>>>     res20: Seq[Int] = List(0, 1, -1, 0)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The above is more natural to me than using error handlers to define how to deal with failed records (here, the value `3` causes an arithmetic exception). Again, it might help the KIP if we added an end-to-end example for such user code errors.
>>>>>>>>>>>>>> ----------------------------snip----------------------------
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, May 30, 2017 at 9:24 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Jay,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Eno mentioned that he will narrow down the scope to only ConsumerRecord deserialisation.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am working with database changelogs only. I would really not like to see a dead letter queue or something similar. How am I expected to get these back in order? Just grind to a halt and call me on the weekend; I'll fix it then in a few minutes, rather than spend 2 weeks ordering dead letters
>>>>>>>>>>>>>>> (where reprocessing might even be the faster fix).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 29.05.2017 20:23, Jay Kreps wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> - I think we should hold off on retries unless we have worked out the full usage pattern; people can always implement their own. I think the idea is that you send the message to some kind of dead letter queue and then replay these later. This obviously destroys all the semantic guarantees we are working hard to provide right now, which may be okay.
>
> --
> -- Guozhang