Re: CSVSplitter - Splittable DoFn

2018-06-18 Thread Robert Bradshaw
Anecdotal evidence is that most people are reading the csv files
line-by-line with TextIO and then parsing into columns in a subsequent
DoFn, ignoring (or asserting) that quoted newlines won't occur in their
data.
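
A minimal sketch of that line-by-line approach in plain Python, standing in for the parsing DoFn. This is not Beam API, and `parse_csv_line` is a hypothetical name. It takes the "asserting" route. Escaped quotes ("") always come in pairs, so a line with an odd number of `"` characters must have opened a quoted field that continues on the next line. Such a line is rejected rather than emitted as a truncated record.

```python
import csv
from typing import List


def parse_csv_line(line: str) -> List[str]:
    """Parse one physical line (as a line-oriented reader emits it) as one record.

    Only valid if no quoted field contains a newline. An odd quote count means
    a quoted field was left open at the end of this line.
    """
    if line.count('"') % 2 != 0:
        raise ValueError(f"unbalanced quotes (embedded newline?): {line!r}")
    # csv.reader accepts any iterable of strings; a one-element list is one line.
    return next(csv.reader([line]))


if __name__ == "__main__":
    print(parse_csv_line('a,"b ""quoted"" c",3'))  # ['a', 'b "quoted" c', '3']
    try:
        parse_csv_line('foo,bar,"this has an')  # first half of a split record
    except ValueError as err:
        print("rejected:", err)
```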

On Mon, Jun 18, 2018 at 11:27 AM Austin Bennett 
wrote:

> Hi Beam Users/Dev,
>
> How are people currently handling CSVs as input to Beam (or not
> really doing so)?

Re: CSVSplitter - Splittable DoFn

2018-06-18 Thread Austin Bennett
Hi Beam Users/Dev,

How are people currently handling CSVs as input to Beam (or not
really doing so)?

I see the things listed at the start of this thread -- any others?

I have many batch workflows that involve getting multi-GB CSV files from
third-party data aggregators (e.g. hourly) and ingesting them. Currently
this goes to S3/Redshift, and I have written some Spark to S3/Parquet. It'd
be great to take the csv.gz and write to BigQuery. Is Beam not up to the
task yet (and should I then use something else to transform to
newline-delimited JSON, Avro, or Parquet on GCS and run bq load from
there)? Is there much thought on development to support/formalize these
workflows?

Thanks for any additional info beyond what is already in this thread (and
thanks to Peter for prelim conversation),

Austin




On Wed, Apr 25, 2018 at 1:01 PM, Peter Brumblay 
wrote:

> This blog post was an excellent find. If I had infinite time I'd take a
> stab at implementing this. They basically outline an algorithm which
> *might* be appropriate for a generalized solution. It certainly beats my
> "try to parse 3 records and if you do pretend you're good" method.
>
> Peter
>
> On Tue, Apr 24, 2018 at 4:46 PM, Eugene Kirpichov 
> wrote:
>
>> Actually, you're right, this is not a pathological case. If we take a
>> regular 1TB-sized CSV file that actually doesn't have any quotes, and start
>> looking somewhere in the middle of it, there is no way to know whether
>> we're currently inside or outside quotes without scanning the whole file -
>> in theory there might be a quote lurking a few GB back. I suppose this can
>> be addressed by specifying limits on field sizes in bytes: e.g. with a
>> limit of 1 KB, if there are no quotes in the preceding 1 KB, then we're
>> definitely in an unquoted context. However, if there is a quote, it may be
>> either opening or closing the quoted context. There might be some way to
>> resolve the ambiguity;
>> https://blog.etleap.com/2016/11/27/distributed-csv-parsing/ seems to
>> discuss this in detail.
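
The field-size idea can be sketched in plain Python. Everything here is an illustrative assumption. `max_field_bytes` is a limit the user would have to promise, since RFC 4180 itself imposes none. `quote_context` and `first_boundary_at_or_after` are hypothetical helpers, not Beam APIs. The lookback can only prove "unquoted"; once a quote appears in the window, the answer is "ambiguous", as described above. When the context is known to be unquoted, a forward scan that tracks quote state finds the next real record start, skipping quoted newlines.

```python
QUOTE = ord('"')
NEWLINE = ord("\n")


def quote_context(data: bytes, offset: int, max_field_bytes: int) -> str:
    """Classify the parser state just before data[offset].

    Assumes (hypothetically) that no quoted field, quotes included, is longer
    than max_field_bytes, so the opening quote of any field still open at
    offset lies in [offset - max_field_bytes, offset).
    """
    window = data[max(0, offset - max_field_bytes):offset]
    if QUOTE not in window:
        return "unquoted"  # no quote could still be open here
    return "ambiguous"  # the nearest quote may open or close a field


def first_boundary_at_or_after(data: bytes, offset: int) -> int:
    """Return the first record start >= offset, given that offset is unquoted."""
    if offset == 0 or data[offset - 1] == NEWLINE:
        return offset  # an unquoted newline just ended the previous record
    in_quotes = False
    for i in range(offset, len(data)):
        if data[i] == QUOTE:
            in_quotes = not in_quotes  # an escaped "" toggles twice: no net change
        elif data[i] == NEWLINE and not in_quotes:
            return i + 1
    return len(data)


if __name__ == "__main__":
    data = b'id,text\n1,"a\nb"\n2,plain\n'
    print(quote_context(data, 9, 8))   # unquoted
    print(quote_context(data, 20, 8))  # ambiguous: the closing quote at 14 is in the window
    print(first_boundary_at_or_after(data, 9))  # 16, skipping the quoted newline at 12
```

A naive "next newline" search from offset 9 would land at 13, in the middle of the quoted field.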

Re: CSVSplitter - Splittable DoFn

2018-04-25 Thread Peter Brumblay
I think I understand.

Once split, ranges are never merged. My question was about whether the
runtime would ever merge two ranges again. It sounds like they can only be
processed as a whole or further split.

Is it appropriate to return the same range from splitFunction() if it
*cannot* be split further? Or is there some other indicator / return value
we should use to indicate it can't be split?

Peter

On Tue, Apr 24, 2018 at 4:18 PM, Eugene Kirpichov 
wrote:

> I think the first question that has to be answered here is: Is it possible
> *at all* to implement parallel reading of RFC 4180?
>
> I.e., given a start byte offset, is it possible to reliably locate the
> first record boundary at or after that offset while scanning only a small
> amount of data?
> If it is possible, then that's what the SDF (or BoundedSource, etc.)
> should do - split into blind byte ranges, and use this algorithm to assign
> consistent meaning to byte ranges.
>
> To answer your questions 2 and 3: think of it this way.
> The SDF's ProcessElement takes an element and a restriction.
> ProcessElement must make only one promise: that it will correctly perform
> exactly the work associated with this element and restriction.
> The challenge is that the restriction can become smaller while
> ProcessElement runs - in which case, ProcessElement must also do less
> work. This can happen concurrently with ProcessElement running, so really the
> guarantee should be rephrased as "By the time ProcessElement completes, it
> should have performed exactly the work associated with the element and
> tracker.currentRestriction() at the moment of completion".
>
> This is all that is asked of ProcessElement. If Beam decides to ask the
> tracker to split itself into two ranges (making the current one - "primary"
> - smaller, and producing an additional one - "residual"), Beam of course
> takes the responsibility for executing the residual restriction somewhere
> else: it won't be lost.
>
> E.g. if ProcessElement was invoked with [a, b), but while it was invoked
> it was split into [a, b-100) and [b-100, b), then the current
> ProcessElement call must process [a, b-100), and Beam guarantees that it
> will fire up another ProcessElement call for [b-100, b) (Of course, both of
> these calls may end up being recursively split further).
>
> I'm not quite sure what you mean by "recombining" - please let me know if
> the explanation above makes things clear enough or not.
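
The claim/split contract described above can be simulated without Beam. `ToyOffsetRangeTracker` and `process_records` are hypothetical, simplified stand-ins, loosely shaped like a byte-offset restriction tracker but not Beam's actual classes or signatures. The loop plays ProcessElement. It claims each record's start offset before emitting it and stops as soon as a claim fails. Partway through, a simulated runner splits [0, 1000) into a primary [0, 900) and a residual [900, 1000), mirroring the [a, b-100) / [b-100, b) example. A second tracker shows why claiming the whole range up front leaves nothing to split.

```python
import threading
from typing import List, Optional, Tuple


class ToyOffsetRangeTracker:
    """Hypothetical tracker for a restriction [start, end) of byte offsets."""

    def __init__(self, start: int, end: int) -> None:
        self.start, self.end = start, end
        self.last_claimed: Optional[int] = None
        self._lock = threading.Lock()  # a runner may call try_split concurrently

    def try_claim(self, pos: int) -> bool:
        """Claim the work at pos. False means pos is outside the current
        restriction, so the caller must stop."""
        with self._lock:
            if pos >= self.end:
                return False
            self.last_claimed = pos
            return True

    def try_split(self, pos: int) -> Optional[Tuple[int, int]]:
        """Shrink the primary to [start, pos) and return the residual
        [pos, end), or None if that would give away claimed work."""
        with self._lock:
            if self.last_claimed is None:
                first_unclaimed = self.start
            else:
                first_unclaimed = self.last_claimed + 1
            if not (first_unclaimed <= pos < self.end):
                return None
            residual = (pos, self.end)
            self.end = pos
            return residual


def process_records(tracker: ToyOffsetRangeTracker, record_starts: List[int],
                    split_after: Optional[int] = None, split_pos: int = 0):
    """Simulated ProcessElement: claim each record's start before emitting it.

    After emitting the record at split_after, a simulated runner calls
    try_split(split_pos). Returns (emitted offsets, residual or None).
    """
    emitted: List[int] = []
    residual = None
    for pos in record_starts:
        if not tracker.try_claim(pos):
            break  # restriction shrank: whoever owns the residual does the rest
        emitted.append(pos)
        if pos == split_after:
            residual = tracker.try_split(split_pos)
    return emitted, residual


if __name__ == "__main__":
    tracker = ToyOffsetRangeTracker(0, 1000)
    emitted, residual = process_records(
        tracker, list(range(0, 1000, 10)), split_after=500, split_pos=900)
    print(emitted[-1], residual)  # 890 (900, 1000)

    greedy = ToyOffsetRangeTracker(0, 1000)
    greedy.try_claim(999)  # "claim the entire range" up front
    print(greedy.try_split(900))  # None: nothing left to hand off
```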

Re: CSVSplitter - Splittable DoFn

2018-04-24 Thread Eugene Kirpichov
Robert - you're right, but this is a pathological case. It signals that
there *might* be cases where we'll need to scan the whole file; however, for
practical purposes it's more important whether we need to scan the whole
file in *all* (or most) cases - i.e. whether no amount of backward scanning
of a non-pathological file can give us confidence that we're truly located
at a record boundary.

On Tue, Apr 24, 2018 at 3:21 PM Robert Bradshaw  wrote:

> On Tue, Apr 24, 2018 at 3:18 PM Eugene Kirpichov 
> wrote:
>
> > I think the first question that has to be answered here is: Is it
> possible *at all* to implement parallel reading of RFC 4180?
>
> No. Consider a multi-record CSV file with no quotes. Placing a quote at the
> start and end gives a new CSV file with exactly one element.

Re: CSVSplitter - Splittable DoFn

2018-04-24 Thread Robert Bradshaw
On Tue, Apr 24, 2018 at 3:18 PM Eugene Kirpichov 
wrote:

> I think the first question that has to be answered here is: Is it
possible *at all* to implement parallel reading of RFC 4180?

No. Consider a multi-record CSV file with no quotes. Placing a quote at the
start and end gives a new CSV file with exactly one element.
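
The counterexample is easy to check with Python's standard `csv` module, used here only as a convenient RFC 4180-style parser; the row format is made up. The two inputs share every interior byte, yet one parses as 1000 records and the other as a single field. So a reader that only looks back a bounded distance from the middle of the file cannot tell them apart.

```python
import csv
import io


def count_records(text: str) -> int:
    # newline="" keeps line breaks inside quoted fields intact, as the csv docs recommend.
    return sum(1 for _ in csv.reader(io.StringIO(text, newline="")))


plain = "".join(f"r{i},v{i}\n" for i in range(1000))  # 1000 records, no quotes
wrapped = '"' + plain + '"'  # the same bytes with one quote added at each end

print(count_records(plain))    # 1000
print(count_records(wrapped))  # 1: everything is one quoted field
```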


Re: CSVSplitter - Splittable DoFn

2018-04-24 Thread Peter Brumblay
Hi Eugene, thank you for the feedback!

TextIO.read() can't handle RFC 4180 in full (at least I don't think it
does!) - we have a lot of source data with embedded newlines. These records
get split improperly because TextIO.read() blindly looks for newline
characters. We need something which natively understands embedded newlines
in quoted fields ... like so:

foo,bar,"this has an\r\nembedded newline",192928\r\n
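
The record above shows the problem concretely. In this small standard-library Python check, `splitlines()` stands in for a reader that splits purely on line breaks, roughly what TextIO.read() does. The newline-based view yields two fragments. An RFC 4180-style parser yields one four-field record with the CRLF preserved inside the third field.

```python
import csv
import io

record = 'foo,bar,"this has an\r\nembedded newline",192928\r\n'

# A purely newline-oriented reader sees two fragments, neither a full record.
fragments = record.splitlines()
print(fragments)  # ['foo,bar,"this has an', 'embedded newline",192928']

# A quote-aware parser sees one record with the CRLF kept inside field 3.
rows = list(csv.reader(io.StringIO(record, newline="")))
print(len(rows), len(rows[0]))  # 1 4
```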

As for the other feedback:

1. Claiming the entire range - yes, I figured this was a major mistake.
Thanks for the confirmation.
2. The code for initial splitting of the restriction seems very complex...

Follow-up question: if I process (and claim) only a subset of a range, say
[a, b - 100), and [b - 100, b) represents an incomplete block, will Beam
SDF dynamically recombine ranges such that [b - 100, b + N) is sent to a
worker with a (potentially) complete block?

3. Fine-tuning the evenness: if Beam SDF recombines ranges for split
blocks, then it sounds like arbitrary splits in splitFunction() make more
sense.

I'll try to take another pass at this with your feedback in mind.

Peter



On Tue, Apr 24, 2018 at 3:08 PM, Eugene Kirpichov 
wrote:

> Hi Peter,
>
> Thanks for experimenting with SDF! However, in this particular case: any
> reason why you can't just use TextIO.read() and parse each line as CSV?
> Seems like that would require considerably less code.
>
> A few comments on this code per se:
> - The ProcessElement implementation immediately claims the entire range,
> which means that there can be no dynamic splitting and the code behaves
> equivalently to a regular DoFn
> - The code for initial splitting of the restriction seems very complex -
> can you just split it blindly into a bunch of byte ranges of about equal
> size? Looking at the actual data while splitting should never be necessary:
> you should be able to just look at the file size (say, 100MB) and split
> it into a bunch of splits, say, [0, 10MB), [10MB, 20MB) etc.
> - It seems that the splitting code tries to align splits with record
> boundaries - this is not useful: it does not matter whether the split
> boundaries fall onto record boundaries or not; instead, the reading code
> should be able to read an arbitrary range of bytes in a meaningful way.
> That typically means that reading [a, b) means "start at the first record
> boundary located at or after a, end at the first record boundary located
> at or after b".
> - Fine-tuning the evenness of initial splitting is also not useful:
> dynamic splitting will even things out anyway; moreover, even if you are
> able to achieve an equal amount of data read by different restrictions, it
> does not translate into equal time to process the data with the ParDos
> fused into the same bundle (and that time is unpredictable).
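
A plain-Python sketch of the blind-splitting suggestion and the [a, b) reading rule in the bullets above. Each range is cut by file size alone and emits exactly the records that start inside it. To keep it short, it assumes newline-terminated records with no quoted newlines (the TextIO case). For RFC 4180 data, `next_record_start` would need the quote-aware boundary search this thread is about. All function names are hypothetical.

```python
from typing import List, Tuple


def blind_splits(size: int, num_splits: int) -> List[Tuple[int, int]]:
    """Cut [0, size) into about-equal byte ranges without reading any data."""
    if size <= 0:
        return []
    step = -(-size // num_splits)  # ceiling division
    return [(a, min(a + step, size)) for a in range(0, size, step)]


def next_record_start(data: bytes, offset: int) -> int:
    """First record boundary at or after offset (newline-terminated records)."""
    if offset == 0 or data[offset - 1:offset] == b"\n":
        return offset
    nl = data.find(b"\n", offset)
    return len(data) if nl == -1 else nl + 1


def read_range(data: bytes, a: int, b: int) -> List[bytes]:
    """Start at the first boundary >= a and stop at the first boundary >= b.

    Equivalently: emit the records that start in [a, b). The last one may run
    past b; the next range skips it because it starts before that range.
    """
    records: List[bytes] = []
    pos = next_record_start(data, a)
    while pos < min(b, len(data)):
        end = next_record_start(data, pos + 1)
        records.append(data[pos:end])
        pos = end
    return records


if __name__ == "__main__":
    data = b"".join(b"row-%d,value\n" % i for i in range(100))
    parts = [read_range(data, a, b) for a, b in blind_splits(len(data), 7)]
    print([len(p) for p in parts])  # records per split; they sum to 100
    assert b"".join(r for p in parts for r in p) == data  # nothing lost or duplicated
```

Because every range applies the same rule, split points can fall anywhere, including points chosen later by dynamic splitting, and each record is still read exactly once.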


CSVSplitter - Splittable DoFn

2018-04-24 Thread Peter Brumblay
Hi All,

I noticed that there is no support for CSV file reading (e.g. RFC 4180) in
Apache Beam - at least no native transform. There's an issue to add this
support: https://issues.apache.org/jira/browse/BEAM-51.

I've seen examples which use the Apache Commons CSV parser. I took a shot
at implementing a SplittableDoFn transform. I have the full code and some
questions in a gist here:
https://gist.github.com/pbrumblay/9474dcc6cd238c3f1d26d869a20e863d.

I suspect it could be improved quite a bit. If anyone has time to provide
feedback I would really appreciate it.

Regards,

Peter Brumblay
Fearless Technology Group, Inc.