I don't think it's possible to come up with something that will catch failing elements at the level of whole PTransforms, because:

- You can't return elements failing some internal transform of the composite PTransform, because 1) they are a private implementation detail of that transform and should not be exposed, and 2) their type is unknown and generally different from the type of the input elements.
- That leaves you only with the option of returning elements of (one of) the transform's input(s) that in some way caused something else inside the transform to fail. Tracking this across a GroupByKey is, I believe, near-impossible (or, if possible, the result would be very hard to interpret). E.g. imagine you're computing a sum of your elements, and then a downstream function fails declaring that the sum is too large: which of the elements that went into the sum should the failure be attributed to?
I think the only practical thing you can do here is 1) come up with a utility "DoFn that emits failing elements to an additional output tag", and 2) come up with "best practices" for structuring your composite transform in ways that surface failures nicely, without violating abstraction boundaries (per the first bullet above). I'm not sure what the best design for that would look like, but it seems doable; suggestions welcome :) Rough sketches of both ideas are at the bottom of this message, below the quoted thread.

On Thu, Jun 22, 2017 at 6:25 PM Dmitry Demeshchuk <[email protected]> wrote:

> I guess, an even more ideal approach would be something like this, which
> also seems more doable:
>
> def third_char_is_an_a(word):
>     if word[2] == 'a':
>         return [word]
>     return []
>
> output = (p
>           | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
>           | 'recorder' >> CaptureDownstreamFailures('my_failure_key')
>           | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
>           | 'find_words_where_third_char_is_an_a' >> beam.FlatMap(third_char_is_an_a))
>
> output | 'failures' >> Failures('my_failure_key') >> WriteToText('gs://my-bucket/failures')
> output | 'write' >> WriteToText('gs://my-bucket/output')
>
> On Thu, Jun 22, 2017 at 5:34 PM, Dmitry Demeshchuk <[email protected]> wrote:
>
>> Hi folks,
>>
>> I’ve been recently struggling with the following problem.
>>
>> Data is quirky. It can have unicode, it can have poor escaping, it can be
>> truncated, etc. Sometimes that happens due to problems in the processing
>> code, sometimes in the data-producing code, sometimes both.
>>
>> That said, if my data pipeline fails somewhere, I’d like to dump the
>> problematic piece of data somewhere for later analysis. Suppose we have a
>> pipeline:
>>
>> def third_char_is_an_a(word):
>>     if word[2] == 'a':
>>         return [word]
>>     return []
>>
>> output = (p
>>           | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
>>           | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
>>           | 'find_words_where_third_char_is_an_a' >> beam.FlatMap(third_char_is_an_a))
>>
>> output | 'write' >> WriteToText('gs://my-bucket/output')
>>
>> This pipeline will be failing, because I’m an idiot, and English has some
>> words shorter than 3 characters. What I’d like, however, is to be able to
>> easily record these failures.
>> For example, I can just rewrite the whole pipeline:
>>
>> def third_char_is_an_a(word):
>>     try:
>>         if word[2] == 'a':
>>             return []
>>         return []
>>     except Exception:
>>         return [word]
>>
>> output = (p
>>           | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
>>           | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
>>           | 'find_words_where_third_char_is_an_a' >> beam.FlatMap(third_char_is_an_a))
>>
>> output | 'write' >> WriteToText('gs://my-bucket/failed_words')
>>
>> If I wanted to still keep the succeeded results, I’d normally need to
>> write something more complicated:
>>
>> def third_char_is_an_a(word):
>>     try:
>>         if word[2] == 'a':
>>             return [(1, word)]
>>         return []
>>     except Exception:
>>         return [(0, word)]
>>
>> output = (p
>>           | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
>>           | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
>>           | 'find_words_where_third_char_is_an_a' >> beam.FlatMap(third_char_is_an_a)
>>           | 'partition' >> beam.Partition(lambda x, n: x[0], 2))
>>
>> successes = output[1] | 'extract_successes' >> beam.Map(lambda x: x[1])
>> failures = output[0] | 'extract_failures' >> beam.Map(lambda x: x[1])
>>
>> successes | 'write_successes' >> WriteToText('gs://my-bucket/output')
>> failures | 'write_failures' >> WriteToText('gs://my-bucket/failed_words')
>>
>> Would it be possible to instead make a generic PTransform, named something
>> like DoOrReportBadData, that allows doing something like this?
>>
>> def third_char_is_an_a(word):
>>     if word[2] == 'a':
>>         return [word]
>>     return []
>>
>> failure_sink = WriteToText('gs://my-bucket/failed_words')
>>
>> output = (p
>>           | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
>>           | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
>>           | 'find_words_where_third_char_is_an_a' >>
>>               DoOrReportBadData(beam.FlatMap(third_char_is_an_a),
>>                                 failure_sink=failure_sink))
>>
>> output | 'write' >> WriteToText('gs://my-bucket/output')
>>
>> I’ve been trying to think of a way to implement that for any arbitrary
>> PTransform, but in vain. It’s easy enough to implement for a DoFn, and
>> maybe that’s what I should do for starters?
>>
>> This also raises a second question: can we somehow report the failures
>> back to an upstream step? Say, instead of recording the actual word that
>> failed, I’d rather record the initial data. For example:
>>
>> def third_char_is_an_a(word):
>>     if word[2] == 'a':
>>         return [word]
>>     return []
>>
>> output = (p
>>           | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
>>           | 'reporter' >> ReportDownstreamFailures(WriteToText('gs://my-bucket/failed_words'))
>>           | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
>>           | 'find_words_where_third_char_is_an_a' >> beam.FlatMap(third_char_is_an_a))
>>
>> output | 'write' >> WriteToText('gs://my-bucket/output')
>>
>> My guess is that the second option isn’t possible, but I’m still learning
>> Beam, so I may be wrong on that. I think either option (but especially the
>> second one) would be super useful down the road for stream processing, so
>> that pipelines can have some sort of a dumping ground for problematic
>> items (which can then be looked into by human beings), while the pipeline
>> as a whole keeps running.
>>
>> Any thoughts would be much appreciated.
>>
>> --
>> Best regards,
>> Dmitry Demeshchuk.
>>
>
>
> --
> Best regards,
> Dmitry Demeshchuk.
>
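
P.S. To make suggestion 1 concrete, here is a rough, untested sketch of such a utility. The name ParDoWithFailures and the tag name 'failures' are made up; it assumes a Python SDK version where pvalue.TaggedOutput and .with_outputs() are available, and it reuses third_char_is_an_a and a PCollection `words` from your example:

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import WriteToText

class ParDoWithFailures(beam.DoFn):
    """Applies fn to each element; if fn raises, emits the original
    input element unchanged to the 'failures' output tag."""

    def __init__(self, fn):
        self._fn = fn

    def process(self, element):
        try:
            results = self._fn(element)
        except Exception:
            # The failing input element goes to the additional output.
            yield pvalue.TaggedOutput('failures', element)
        else:
            for result in results:
                yield result

outputs = (words
           | 'find_words_where_third_char_is_an_a' >> beam.ParDo(
               ParDoWithFailures(third_char_is_an_a)).with_outputs(
                   'failures', main='output'))

outputs.output | 'write' >> WriteToText('gs://my-bucket/output')
outputs.failures | 'write_failures' >> WriteToText('gs://my-bucket/failed_words')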
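
And for suggestion 2, one possible convention that doesn't violate abstraction boundaries: have the composite transform itself expose a failure output containing elements of its *input* PCollection, so no internal types leak out. Another made-up sketch, building on ParDoWithFailures above, and relying on expand() being allowed to return a dict of PCollections:

class FindWordsWhereThirdCharIsAnA(beam.PTransform):
    """Composite transform that returns its main output together with
    the input elements that failed, without exposing any internals."""

    def expand(self, words):
        results = words | 'filter' >> beam.ParDo(
            ParDoWithFailures(third_char_is_an_a)).with_outputs(
                'failures', main='output')
        # Callers see only two well-defined outputs, keyed by name.
        return {'output': results.output, 'failures': results.failures}

The caller would then write something like:

outputs = words | FindWordsWhereThirdCharIsAnA()
outputs['output'] | 'write' >> WriteToText('gs://my-bucket/output')
outputs['failures'] | 'write_failures' >> WriteToText('gs://my-bucket/failed_words')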
