This blog post was an excellent find. If I had infinite time I'd take a stab at implementing this. They basically outline an algorithm which *might* be appropriate for a generalized solution. It certainly beats my "try to parse 3 records and, if you do, pretend you're good" method.
Peter

On Tue, Apr 24, 2018 at 4:46 PM, Eugene Kirpichov <[email protected]> wrote:

> Actually, you're right, this is not a pathological case. If we take a
> regular 1TB-sized CSV file that actually doesn't have any quotes, and start
> looking somewhere in the middle of it, there is no way to know whether
> we're currently inside or outside quotes without scanning the whole file -
> in theory there might be a quote lurking a few GB back. I suppose this can
> be addressed with specifying limits on field sizes in bytes: e.g. with a
> limit of 1kb, if there's no quotes in the preceding 1kb, then we're
> definitely in an unquoted context. However, if there is a quote, it may be
> either opening or closing the quoted context. There might be some way to
> resolve the ambiguity;
> https://blog.etleap.com/2016/11/27/distributed-csv-parsing/ seems to
> discuss this in detail.
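A minimal sketch of the field-size-limit check Eugene describes above (the helper name and the maxFieldBytes parameter are made up for illustration, not from the thread): scan backwards at most maxFieldBytes from a candidate offset; if no quote character appears in that window, the offset is provably outside a quoted field, otherwise the result stays ambiguous.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SeekableByteChannel;

    /**
     * Hypothetical helper: returns true iff the byte at 'offset' is provably outside
     * any quoted field, assuming no field (including its quotes) exceeds maxFieldBytes.
     */
    static boolean provablyUnquoted(SeekableByteChannel in, long offset, int maxFieldBytes)
        throws IOException {
      long windowStart = Math.max(0, offset - maxFieldBytes);
      ByteBuffer window = ByteBuffer.allocate((int) (offset - windowStart));
      in.position(windowStart);
      while (window.hasRemaining() && in.read(window) >= 0) {}
      window.flip();
      while (window.hasRemaining()) {
        if (window.get() == '"') {
          return false; // a quote in the window could be opening or closing: ambiguous
        }
      }
      return true; // no quote within maxFieldBytes, so we cannot be inside a quoted field
    }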
> On Tue, Apr 24, 2018 at 3:26 PM Eugene Kirpichov <[email protected]> wrote:
>
>> Robert - you're right, but this is a pathological case. It signals that
>> there *might* be cases where we'll need to scan the whole file, however for
>> practical purposes it's more important whether we need to scan the whole
>> file in *all* (or most) cases - i.e. whether no amount of backward scanning
>> of a non-pathological file can give us confidence that we're truly located
>> at a record boundary.
>>
>> On Tue, Apr 24, 2018 at 3:21 PM Robert Bradshaw <[email protected]> wrote:
>>
>>> On Tue, Apr 24, 2018 at 3:18 PM Eugene Kirpichov <[email protected]> wrote:
>>>
>>> > I think the first question that has to be answered here is: Is it
>>> > possible *at all* to implement parallel reading of RFC 4180?
>>>
>>> No. Consider a multi-record CSV file with no quotes. Placing a quote at
>>> the start and end gives a new CSV file with exactly one element.
>>>
>>> > I.e., given a start byte offset, is it possible to reliably locate the
>>> > first record boundary at or after that offset while scanning only a
>>> > small amount of data?
>>> > If it is possible, then that's what the SDF (or BoundedSource, etc.)
>>> > should do - split into blind byte ranges, and use this algorithm to
>>> > assign consistent meaning to byte ranges.
>>>
>>> > To answer your questions 2 and 3: think of it this way.
>>> > The SDF's ProcessElement takes an element and a restriction.
>>> > ProcessElement must make only one promise: that it will correctly
>>> > perform exactly the work associated with this element and restriction.
>>> > The challenge is that the restriction can become smaller while
>>> > ProcessElement runs - in which case, ProcessElement must also do less
>>> > work. This can happen concurrently to ProcessElement running, so really
>>> > the guarantee should be rephrased as "By the time ProcessElement
>>> > completes, it should have performed exactly the work associated with
>>> > the element and tracker.currentRestriction() at the moment of completion".
>>>
>>> > This is all that is asked of ProcessElement. If Beam decides to ask the
>>> > tracker to split itself into two ranges (making the current one -
>>> > "primary" - smaller, and producing an additional one - "residual"),
>>> > Beam of course takes the responsibility for executing the residual
>>> > restriction somewhere else: it won't be lost.
>>>
>>> > E.g. if ProcessElement was invoked with [a, b), but while it was
>>> > invoked it was split into [a, b-100) and [b-100, b), then the current
>>> > ProcessElement call must process [a, b-100), and Beam guarantees that
>>> > it will fire up another ProcessElement call for [b-100, b). (Of course,
>>> > both of these calls may end up being recursively split further.)
>>>
>>> > I'm not quite sure what you mean by "recombining" - please let me know
>>> > if the explanation above makes things clear enough or not.
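A rough sketch of that contract in Beam's Java SDF API, to make the [a, b) / [a, b-100) example concrete. The seekToRecordBoundary and readRecordAt helpers and the Record type are made up, and exact parameter styles differ across Beam versions; the key point is that tryClaim is what makes dynamic splitting safe, because the loop stops as soon as an offset falls outside the (possibly shrunken) current restriction.

    import java.io.IOException;
    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;

    // Inside a SplittableDoFn whose element is a file path:
    @ProcessElement
    public void processElement(ProcessContext c, OffsetRangeTracker tracker) throws IOException {
      String file = c.element();
      // Start at the first record boundary at or after the restriction's start.
      long offset = seekToRecordBoundary(file, tracker.currentRestriction().getFrom());
      // Claim each record's starting offset before emitting it. If Beam trims the
      // restriction from [a, b) down to [a, b-100), then tryClaim(b-100) returns false,
      // this loop stops, and Beam runs [b-100, b) as a residual restriction elsewhere.
      while (tracker.tryClaim(offset)) {
        // readRecordAt is a hypothetical helper that parses exactly one RFC 4180 record
        // (including quoted, embedded newlines) and reports how many bytes it consumed.
        Record record = readRecordAt(file, offset);
        c.output(record);
        offset += record.sizeInBytes();
      }
    }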
>>> > On Tue, Apr 24, 2018 at 2:55 PM Peter Brumblay <[email protected]> wrote:
>>>
>>> >> Hi Eugene, thank you for the feedback!
>>> >>
>>> >> TextIO.read() can't handle RFC 4180 in full (at least I don't think it
>>> >> does!) - we have a lot of source data with embedded newlines. These
>>> >> records get split improperly because TextIO.read() blindly looks for
>>> >> newline characters. We need something which natively understands
>>> >> embedded newlines in quoted fields ... like so:
>>> >>
>>> >> foo,bar,"this has an\r\nembedded newline",192928\r\n
>>> >>
>>> >> As for the other feedback:
>>> >>
>>> >> 1. Claiming the entire range - yes, I figured this was a major mistake.
>>> >> Thanks for the confirmation.
>>> >> 2. The code for initial splitting of the restriction seems very complex...
>>> >>
>>> >> Follow-up question: if I process (and claim) only a subset of a range,
>>> >> say [a, b - 100), and [b - 100, b) represents an incomplete block, will
>>> >> beam SDF dynamically recombine ranges such that [b - 100, b + N) is
>>> >> sent to a worker with a (potentially) complete block?
>>> >>
>>> >> 3. Fine-tuning the evenness ... if beam SDF re-combines ranges for
>>> >> split blocks then it sounds like arbitrary splits in splitFunction()
>>> >> make more sense.
>>> >>
>>> >> I'll try to take another pass at this with your feedback in mind.
>>> >>
>>> >> Peter
>>> >>
>>> >> On Tue, Apr 24, 2018 at 3:08 PM, Eugene Kirpichov <[email protected]> wrote:
>>> >>
>>> >>> Hi Peter,
>>> >>>
>>> >>> Thanks for experimenting with SDF! However, in this particular case:
>>> >>> any reason why you can't just use TextIO.read() and parse each line as
>>> >>> CSV? Seems like that would require considerably less code.
>>> >>>
>>> >>> A few comments on this code per se:
>>> >>>
>>> >>> - The ProcessElement implementation immediately claims the entire
>>> >>> range, which means that there can be no dynamic splitting and the code
>>> >>> behaves equivalently to a regular DoFn.
>>> >>>
>>> >>> - The code for initial splitting of the restriction seems very complex
>>> >>> - can you just split it blindly into a bunch of byte ranges of about
>>> >>> equal size? Looking at the actual data while splitting should never be
>>> >>> necessary - you should be able to just look at the file size (say,
>>> >>> 100MB) and split it into a bunch of splits, say, [0, 10MB), [10MB,
>>> >>> 20MB) etc.
>>> >>>
>>> >>> - It seems that the splitting code tries to align splits with record
>>> >>> boundaries - this is not useful: it does not matter whether the split
>>> >>> boundaries fall onto record boundaries or not; instead, the reading
>>> >>> code should be able to read an arbitrary range of bytes in a
>>> >>> meaningful way. That typically means that reading [a, b) means "start
>>> >>> at the first record boundary located at or after a, end at the first
>>> >>> record boundary located at or after b".
>>> >>>
>>> >>> - Fine-tuning the evenness of initial splitting is also not useful:
>>> >>> dynamic splitting will even things out anyway; moreover, even if you
>>> >>> are able to achieve an equal amount of data read by different
>>> >>> restrictions, it does not translate into equal time to process the
>>> >>> data with the ParDo's fused into the same bundle (and that time is
>>> >>> unpredictable).
>>> >>>
>>> >>> On Tue, Apr 24, 2018 at 1:24 PM Peter Brumblay <[email protected]> wrote:
>>> >>>
>>> >>>> Hi All,
>>> >>>>
>>> >>>> I noticed that there is no support for CSV file reading (e.g.
>>> >>>> RFC 4180) in Apache Beam - at least no native transform. There's an
>>> >>>> issue to add this support: https://issues.apache.org/jira/browse/BEAM-51
>>> >>>>
>>> >>>> I've seen examples which use the Apache Commons CSV parser. I took a
>>> >>>> shot at implementing a SplittableDoFn transform. I have the full code
>>> >>>> and some questions in a gist here:
>>> >>>> https://gist.github.com/pbrumblay/9474dcc6cd238c3f1d26d869a20e863d
>>> >>>>
>>> >>>> I suspect it could be improved quite a bit. If anyone has time to
>>> >>>> provide feedback I would really appreciate it.
>>> >>>>
>>> >>>> Regards,
>>> >>>>
>>> >>>> Peter Brumblay
>>> >>>> Fearless Technology Group, Inc.
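To make the "split blindly into byte ranges" advice above concrete, here is one possible shape for the initial split, again with illustrative numbers and no claim that it matches the gist (the exact @SplitRestriction signature varies a bit between Beam versions):

    import org.apache.beam.sdk.io.range.OffsetRange;

    // Inside the same SplittableDoFn: blind initial split into fixed-size byte ranges.
    // No data is read here; record boundaries are handled later by the reading code,
    // which starts each range at the first record boundary at or after getFrom() and
    // keeps reading past getTo() until the next record boundary.
    @SplitRestriction
    public void splitRestriction(String file, OffsetRange range, OutputReceiver<OffsetRange> out) {
      long splitSize = 64L * 1024 * 1024; // e.g. 64MB; the exact size barely matters
      for (long start = range.getFrom(); start < range.getTo(); start += splitSize) {
        out.output(new OffsetRange(start, Math.min(start + splitSize, range.getTo())));
      }
    }

Under the reading convention quoted above, a record that straddles a range boundary belongs to whichever range contains its first byte, so no recombining of ranges is needed.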

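Lastly, a tiny stand-alone illustration of the embedded-newline point from earlier in the thread, using Apache Commons CSV (the parser Peter mentions); the quoted field contains a real CRLF, so the input is a single four-field record even though a newline-based splitter would see two lines:

    import org.apache.commons.csv.CSVFormat;
    import org.apache.commons.csv.CSVParser;
    import org.apache.commons.csv.CSVRecord;

    public class EmbeddedNewlineDemo {
      public static void main(String[] args) throws Exception {
        String csv = "foo,bar,\"this has an\r\nembedded newline\",192928\r\n";
        try (CSVParser parser = CSVParser.parse(csv, CSVFormat.RFC4180)) {
          for (CSVRecord record : parser) {
            // One record, four fields; the third field still contains the CRLF.
            System.out.println("fields: " + record.size());
          }
        }
      }
    }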