Re: Golang Beam SDK GroupByKey not working when running locally
Hi 8,

This is a bug in the Go SDK regarding direct output after GBK. As a workaround, if you change the signature

    func(word string, values func(*int) bool) (string, int)

to

    func(word string, values func(*int) bool, emit func(string, int))

and emit the result instead of returning it, it works. I opened https://issues.apache.org/jira/browse/BEAM-3978.

Thanks,
Henning

PS: Btw, minimal_wordcount doesn't log the direct.Execute error (among other things) and is there mainly to mimic the progression in Java. It's not a good model for real pipelines.

On Fri, Mar 30, 2018 at 5:21 PM 8 Gianfortoni <8...@tokentransit.com> wrote:
> Oh, I forgot to mention that I pulled from master with this commit as
> latest:
> https://github.com/apache/beam/commit/95a524e52606de1467b5d8b2cc99263b8a111a8d
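A minimal standalone sketch of the workaround Henning describes: the DoFn after GroupByKey takes the key, the `func(*int) bool` value iterator, and an emitter `func(string, int)`, and it emits the sum instead of returning it. So that the sketch runs without the Beam module, a hand-made iterator drives the function. `sumCounts` and `sliceIter` are illustrative names and not part of the SDK.

```go
package main

import "fmt"

// sumCounts mirrors the workaround signature: instead of returning
// (string, int), it receives an emitter and calls it once per key.
func sumCounts(word string, values func(*int) bool, emit func(string, int)) {
	sum := 0
	var v int
	// Each call fills v with the next grouped value; false means done.
	for values(&v) {
		sum += v
	}
	emit(word, sum)
}

// sliceIter is a hypothetical stand-in for the iterator Beam supplies
// after GroupByKey; it yields the elements of xs in order.
func sliceIter(xs []int) func(*int) bool {
	i := 0
	return func(v *int) bool {
		if i >= len(xs) {
			return false
		}
		*v = xs[i]
		i++
		return true
	}
}

func main() {
	// Simulate one grouped element: the key "beam" with three 1s.
	sumCounts("beam", sliceIter([]int{1, 1, 1}), func(w string, c int) {
		fmt.Printf("%s: %v\n", w, c) // prints "beam: 3"
	})
}
```

In a pipeline, a function with this shape should be usable as `beam.ParDo(s, sumCounts, grouped)` in place of the return-style closure. The emitter parameter is the only change the workaround needs.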
Re: Golang Beam SDK GroupByKey not working when running locally
Oh, I forgot to mention that I pulled from master with this commit as latest:
https://github.com/apache/beam/commit/95a524e52606de1467b5d8b2cc99263b8a111a8d

On Fri, Mar 30, 2018, 5:09 PM 8 Gianfortoni <8...@tokentransit.com> wrote:
> Fix cc to correct Holden.
Re: Golang Beam SDK GroupByKey not working when running locally
Fix cc to correct Holden.

On Fri, Mar 30, 2018 at 5:05 PM, 8 Gianfortoni <8...@tokentransit.com> wrote:
> Hi dev team,
>
> I'm having a lot of trouble running any pipeline that calls GroupByKey.
> Maybe I'm doing something wrong, but for some reason I cannot stop
> GroupByKey from crashing the program.
>
> I have edited wordcount.go and minimal_wordcount.go to work similarly to
> my own program, and they crash as well.
>
> Here is the snippet of code I added to minimal_wordcount (full source
> attached):
>
>     // Concept #3: Invoke the stats.Count transform on our PCollection of
>     // individual words. The Count transform returns a new PCollection of
>     // key/value pairs, where each key represents a unique word in the text.
>     // The associated value is the occurrence count for that word.
>     singles := beam.ParDo(s, func(word string) (string, int) {
>         return word, 1
>     }, words)
>
>     grouped := beam.GroupByKey(s, singles)
>
>     counted := beam.ParDo(s, func(word string, values func(*int) bool) (string, int) {
>         sum := 0
>         for {
>             var i int
>             if values(&i) {
>                 sum = sum + i
>             } else {
>                 break
>             }
>         }
>         return word, sum
>     }, grouped)
>
>     // Use a ParDo to format our PCollection of word counts into a printable
>     // string, suitable for writing to an output file. When each element
>     // produces exactly one element, the DoFn can simply return it.
>     formatted := beam.ParDo(s, func(w string, c int) string {
>         return fmt.Sprintf("%s: %v", w, c)
>     }, counted)
>
> I also attached the full source code and the output I get when I run
> both wordcount and minimal_wordcount.
>
> Am I just doing something wrong here? In any case, it seems inappropriate
> to panic at runtime without any debugging information (save a stack
> trace, but only if you call beamx.Run() as opposed to direct.Execute(),
> which just dies without any info).
>
> Thank you so much,
> 8
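The value iterator in the snippet has the type `func(*int) bool`: each call needs a pointer that it fills with the next grouped value, and it returns false once the group is exhausted. Below is a standalone sketch of the original return-style DoFn with the loop written around that contract. Per Henning's reply, this return form after GroupByKey hit BEAM-3978 on the direct runner at the time, so the sketch only exercises the plain Go logic. `countWord` and `fakeIter` are illustrative names, not Beam APIs.

```go
package main

import "fmt"

// countWord has the same shape as the return-style DoFn in the snippet:
// it drains the grouped values for one key and returns (key, sum).
func countWord(word string, values func(*int) bool) (string, int) {
	sum := 0
	var i int
	// Each call to values writes the next element into i and reports
	// whether there was one; the loop stops when the group is exhausted.
	for values(&i) {
		sum += i
	}
	return word, sum
}

// fakeIter is a hypothetical stand-in for the iterator Beam passes to a
// DoFn after GroupByKey; it yields xs in order.
func fakeIter(xs ...int) func(*int) bool {
	return func(v *int) bool {
		if len(xs) == 0 {
			return false
		}
		*v, xs = xs[0], xs[1:]
		return true
	}
}

func main() {
	w, c := countWord("beam", fakeIter(1, 1, 1, 1))
	fmt.Printf("%s: %v\n", w, c) // prints "beam: 4"
}
```

The `for values(&i)` form is equivalent to the `for { if ... else break }` loop, just shorter.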