I think I understand.

Once split, ranges are never merged. My question was about whether the
runtime would ever merge two ranges again. It sounds like they can only be
processed as a whole or further split.
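
A minimal sketch of that model, assuming a hypothetical ByteRange class
(this is not Beam's own restriction type): a range is either handled whole
or cut into a primary and a residual part, and there is deliberately no
merge operation.

```java
import java.util.Optional;

/** Hypothetical half-open byte range [from, to); not a Beam class. */
final class ByteRange {
    final long from;
    final long to;

    ByteRange(long from, long to) {
        if (from > to) {
            throw new IllegalArgumentException("from must not exceed to");
        }
        this.from = from;
        this.to = to;
    }

    /**
     * Cuts this range at the given offset into {primary, residual}.
     * Returns empty when the cut would leave either side empty, i.e.
     * when the range cannot be split at that offset.
     */
    Optional<ByteRange[]> splitAt(long offset) {
        if (offset <= from || offset >= to) {
            return Optional.empty();
        }
        return Optional.of(new ByteRange[] {
            new ByteRange(from, offset),  // primary: stays with the current call
            new ByteRange(offset, to)     // residual: to be processed elsewhere
        });
    }

    @Override
    public String toString() {
        return "[" + from + ", " + to + ")";
    }
}
```

For example, new ByteRange(0, 1000).splitAt(900) yields [0, 900) and
[900, 1000), matching the [a, b-100) / [b-100, b) example quoted below.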

Is it appropriate to return the same range from splitFunction() if it
*cannot* be split further? Or is there some other indicator / return value
we should use to indicate it can't be split?
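
To make the question concrete, here is a sketch of one possible shape for
blind initial splitting. BlindSplitter and its split method are
hypothetical names, not part of Beam, and returning the range unchanged
when it is too small is only the assumption being asked about, not
confirmed runtime behavior.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; splits by size only and never reads the data. */
final class BlindSplitter {
    /**
     * Splits [from, to) into consecutive chunks of at most chunkSize bytes.
     * Each returned array is {start, end} of a half-open range.
     */
    static List<long[]> split(long from, long to, long chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<long[]> ranges = new ArrayList<>();
        if (to - from <= chunkSize) {
            // Assumption: a range that cannot be split is returned as-is.
            ranges.add(new long[] {from, to});
            return ranges;
        }
        for (long start = from; start < to; start += chunkSize) {
            ranges.add(new long[] {start, Math.min(start + chunkSize, to)});
        }
        return ranges;
    }
}
```

With this sketch, split(0, 25, 10) gives [0, 10), [10, 20), [20, 25), while
split(0, 5, 10) gives back the single range [0, 5).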

Peter

On Tue, Apr 24, 2018 at 4:18 PM, Eugene Kirpichov <[email protected]>
wrote:

> I think the first question that has to be answered here is: Is it possible
> *at all* to implement parallel reading of RFC 4180?
>
> I.e., given a start byte offset, is it possible to reliably locate the
> first record boundary at or after that offset while scanning only a small
> amount of data?
> If it is possible, then that's what the SDF (or BoundedSource, etc.)
> should do - split into blind byte ranges, and use this algorithm to assign
> consistent meaning to byte ranges.
>
> To answer your questions 2 and 3: think of it this way.
> The SDF's ProcessElement takes an element and a restriction.
> ProcessElement must make only one promise: that it will correctly perform
> exactly the work associated with this element and restriction.
> The challenge is that the restriction can become smaller while
> ProcessElement runs - in which case, ProcessElement must also do less
> work. This can happen concurrently with ProcessElement running, so really the
> guarantee should be rephrased as "By the time ProcessElement completes, it
> should have performed exactly the work associated with the element and
> tracker.currentRestriction() at the moment of completion".
>
> This is all that is asked of ProcessElement. If Beam decides to ask the
> tracker to split itself into two ranges (making the current one - "primary"
> - smaller, and producing an additional one - "residual"), Beam of course
> takes the responsibility for executing the residual restriction somewhere
> else: it won't be lost.
>
> E.g. if ProcessElement was invoked with [a, b), but while it was invoked
> it was split into [a, b-100) and [b-100, b), then the current
> ProcessElement call must process [a, b-100), and Beam guarantees that it
> will fire up another ProcessElement call for [b-100, b). (Of course, both of
> these calls may end up being recursively split further.)
>
> I'm not quite sure what you mean by "recombining" - please let me know if
> the explanation above makes things clear enough or not.
>
> On Tue, Apr 24, 2018 at 2:55 PM Peter Brumblay <[email protected]>
> wrote:
>
>> Hi Eugene, thank you for the feedback!
>>
>> TextIO.read() can't handle RFC 4180 in full (at least I don't think it
>> does!) - we have a lot of source data with embedded newlines. These
>> records get split improperly because TextIO.read() blindly looks for
>> newline characters. We need something which natively understands embedded
>> newlines in quoted fields ... like so:
>>
>> foo,bar,"this has an\r\nembedded newline",192928\r\n
>>
>> As for the other feedback:
>>
>> 1. Claiming the entire range - yes, I figured this was a major mistake.
>> Thanks for the confirmation.
>> 2. The code for initial splitting of the restriction seems very complex...
>>
>> Follow-up question: if I process (and claim) only a subset of a range,
>> say [a, b - 100), and [b - 100, b) represents an incomplete block, will
>> Beam SDF dynamically recombine ranges such that [b - 100, b + N) is sent to
>> a worker with a (potentially) complete block?
>>
>> 3. Fine-tuning the evenness .... if Beam SDF re-combines ranges for split
>> blocks then it sounds like arbitrary splits in splitFunction() make more
>> sense.
>>
>> I'll try to take another pass at this with your feedback in mind.
>>
>> Peter
>>
>>
>>
>> On Tue, Apr 24, 2018 at 3:08 PM, Eugene Kirpichov <[email protected]>
>> wrote:
>>
>>> Hi Peter,
>>>
>>> Thanks for experimenting with SDF! However, in this particular case: any
>>> reason why you can't just use TextIO.read() and parse each line as CSV?
>>> Seems like that would require considerably less code.
>>>
>>> A few comments on this code per se:
>>> - The ProcessElement implementation immediately claims the entire range,
>>> which means that there can be no dynamic splitting and the code behaves
>>> equivalently to a regular DoFn
>>> - The code for initial splitting of the restriction seems very complex -
>>> can you just split it blindly into a bunch of byte ranges of about equal
>>> size? Looking at the actual data while splitting should never be necessary
>>> - you should be able to just look at the file size (say, 100MB) and split
>>> it into a bunch of splits, say, [0, 10MB), [10MB, 20MB) etc.
>>> - It seems that the splitting code tries to align splits with record
>>> boundaries - this is not useful: it does not matter whether the split
>>> boundaries fall onto record boundaries or not; instead, the reading code
>>> should be able to read an arbitrary range of bytes in a meaningful way.
>>> That typically means that reading [a, b) means "start at the first record
>>> boundary located at or after "a", end at the first record boundary located
>>> at or after "b""
>>> - Fine-tuning the evenness of initial splitting is also not useful:
>>> dynamic splitting will even things out anyway; moreover, even if you are
>>> able to achieve an equal amount of data read by different restrictions, it
>>> does not translate into equal time to process the data with the ParDo's
>>> fused into the same bundle (and that time is unpredictable).
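
The range-reading convention in the list above can be sketched as follows.
This is an illustration only: RangeReader and both of its methods are
hypothetical names, and it assumes plain newline-delimited records, so it
does NOT handle RFC 4180 quoted fields with embedded newlines.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical reader for '\n'-delimited records held in memory. */
final class RangeReader {
    /**
     * Returns the first record boundary at or after pos. A boundary is
     * offset 0, any offset directly after '\n', or the end of the data.
     */
    static int firstBoundaryAtOrAfter(byte[] data, int pos) {
        if (pos <= 0) {
            return 0;
        }
        int i = pos;
        while (i < data.length && data[i - 1] != '\n') {
            i++;
        }
        return Math.min(i, data.length);
    }

    /** Reads [a, b): from the first boundary >= a up to the first boundary >= b. */
    static List<String> readRange(byte[] data, int a, int b) {
        int start = firstBoundaryAtOrAfter(data, a);
        int end = firstBoundaryAtOrAfter(data, b);
        List<String> records = new ArrayList<>();
        int recordStart = start;
        for (int i = start; i < end; i++) {
            if (data[i] == '\n') {
                records.add(new String(data, recordStart, i - recordStart,
                        StandardCharsets.UTF_8));
                recordStart = i + 1;
            }
        }
        if (recordStart < end) {
            // Last record in the file has no trailing newline.
            records.add(new String(data, recordStart, end - recordStart,
                    StandardCharsets.UTF_8));
        }
        return records;
    }
}
```

Because adjacent ranges agree on where each record starts, every record is
read by exactly one range even when a split offset falls mid-record. A
range that contains no boundary simply produces no records.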
>>>
>>>
>>> On Tue, Apr 24, 2018 at 1:24 PM Peter Brumblay <[email protected]>
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I noticed that there is no support for CSV file reading (e.g. rfc4180)
>>>> in Apache Beam - at least no native transform. There's an issue to add this
>>>> support: https://issues.apache.org/jira/browse/BEAM-51.
>>>>
>>>> I've seen examples which use the apache commons csv parser. I took a
>>>> shot at implementing a SplittableDoFn transform. I have the full code and
>>>> some questions in a gist here:
>>>> https://gist.github.com/pbrumblay/9474dcc6cd238c3f1d26d869a20e863d.
>>>>
>>>> I suspect it could be improved quite a bit. If anyone has time to
>>>> provide feedback I would really appreciate it.
>>>>
>>>> Regards,
>>>>
>>>> Peter Brumblay
>>>> Fearless Technology Group, Inc.
>>>>
>>>>
>>
