Re: Golang Beam SDK GroupByKey not working when running locally

2018-03-30 Thread Henning Rohde
Hi 8,

 This is a bug in the Go SDK regarding direct output after a GBK. As a
workaround, if you change this signature

func(word string, values func(*int) bool) (string, int)

to

func(word string, values func(*int) bool, emit func(string, int))

and emit the result instead of returning it, it works. I opened
https://issues.apache.org/jira/browse/BEAM-3978.

Thanks,
 Henning

PS: By the way, minimal_wordcount doesn't log the error from direct.Execute
(among other things) and is there mainly to mimic the progression in Java.
It's not a good model for real pipelines.





Re: Golang Beam SDK GroupByKey not working when running locally

2018-03-30 Thread 8 Gianfortoni
Oh, I forgot to mention that I pulled from master with this commit as
latest:
https://github.com/apache/beam/commit/95a524e52606de1467b5d8b2cc99263b8a111a8d





Re: Golang Beam SDK GroupByKey not working when running locally

2018-03-30 Thread 8 Gianfortoni
Fix cc to correct Holden.



Golang Beam SDK GroupByKey not working when running locally

2018-03-30 Thread 8 Gianfortoni
Hi dev team,

I'm having a lot of trouble running any pipeline that calls GroupByKey.
Maybe I'm doing something wrong, but for some reason I cannot get
GroupByKey to run without crashing the program.

I have edited wordcount.go and minimal_wordcount.go to work similarly to my
own program, and it crashes for those as well.

Here is the snippet of code I added to minimal_wordcount (full source
attached):

    // Concept #3: Invoke the stats.Count transform on our PCollection of
    // individual words. The Count transform returns a new PCollection of
    // key/value pairs, where each key represents a unique word in the text.
    // The associated value is the occurrence count for that word.
    singles := beam.ParDo(s, func(word string) (string, int) {
        return word, 1
    }, words)

    grouped := beam.GroupByKey(s, singles)

    counted := beam.ParDo(s, func(word string, values func(*int) bool) (string, int) {
        sum := 0
        for {
            var i int
            if values(&i) {
                sum = sum + i
            } else {
                break
            }
        }
        return word, sum
    }, grouped)

    // Use a ParDo to format our PCollection of word counts into a printable
    // string, suitable for writing to an output file. When each element
    // produces exactly one element, the DoFn can simply return it.
    formatted := beam.ParDo(s, func(w string, c int) string {
        return fmt.Sprintf("%s: %v", w, c)
    }, counted)



I also attached the full source code and output that happens when I run
both wordcount and minimal_wordcount.

Am I just doing something wrong here? In any case, it seems inappropriate
to panic at runtime without any debugging information (save a stack
trace, but only if you call beamx.Run() as opposed to direct.Execute(),
which just dies without any info).

Thank you so much,
8
[{6: KV/GW/KV}]
[{10: KV/GW/KV}]
2018/03/30 16:32:15 Pipeline:
2018/03/30 16:32:15 Nodes: {1: []uint8/GW/bytes}
{2: string/GW/bytes}
{3: string/GW/bytes}
{4: string/GW/bytes}
{5: string/GW/bytes}
{6: KV/GW/KV}
{7: CoGBK/GW/CoGBK}
{8: KV/GW/KV}
{9: string/GW/bytes}
{10: KV/GW/KV}
{11: CoGBK/GW/CoGBK}
Edges: 1: Impulse [] -> [Out: []uint8 -> {1: []uint8/GW/bytes}]
2: ParDo [In(Main): []uint8 <- {1: []uint8/GW/bytes}] -> [Out: T -> {2: string/GW/bytes}]
3: ParDo [In(Main): string <- {2: string/GW/bytes}] -> [Out: string -> {3: string/GW/bytes}]
4: ParDo [In(Main): string <- {3: string/GW/bytes}] -> [Out: string -> {4: string/GW/bytes}]
5: ParDo [In(Main): string <- {4: string/GW/bytes}] -> [Out: string -> {5: string/GW/bytes}]
6: ParDo [In(Main): string <- {5: string/GW/bytes}] -> [Out: KV -> {6: KV/GW/KV}]
7: CoGBK [In(Main): KV <- {6: KV/GW/KV}] -> [Out: CoGBK -> {7: CoGBK/GW/CoGBK}]
8: ParDo [In(Main): CoGBK <- {7: CoGBK/GW/CoGBK}] -> [Out: KV -> {8: KV/GW/KV}]
9: ParDo [In(Main): KV <- {8: KV/GW/KV}] -> [Out: string -> {9: string/GW/bytes}]
10: ParDo [In(Main): T <- {9: string/GW/bytes}] -> [Out: KV -> {10: KV/GW/KV}]
11: CoGBK [In(Main): KV <- {10: KV/GW/KV}] -> [Out: CoGBK -> {11: CoGBK/GW/CoGBK}]
12: ParDo [In(Main): CoGBK <- {11: CoGBK/GW/CoGBK}] -> []
2018/03/30 16:32:16 Reading from gs://apache-beam-samples/shakespeare/1kinghenryiv.txt
2018/03/30 16:32:16 Reading from gs://apache-beam-samples/shakespeare/1kinghenryvi.txt
2018/03/30 16:32:17 Reading from gs://apache-beam-samples/shakespeare/2kinghenryiv.txt
2018/03/30 16:32:17 Reading from gs://apache-beam-samples/shakespeare/2kinghenryvi.txt
2018/03/30 16:32:18 Reading from gs://apache-beam-samples/shakespeare/3kinghenryvi.txt
2018/03/30 16:32:18 Reading from gs://apache-beam-samples/shakespeare/allswellthatendswell.txt
2018/03/30 16:32:19 Reading from gs://apache-beam-samples/shakespeare/antonyandcleopatra.txt
2018/03/30 16:32:19 Reading from gs://apache-beam-samples/shakespeare/asyoulikeit.txt
2018/03/30 16:32:19 Reading from gs://apache-beam-samples/shakespeare/comedyoferrors.txt
2018/03/30 16:32:20 Reading from gs://apache-beam-samples/shakespeare/coriolanus.txt
2018/03/30 16:32:20 Reading from