Awesome! Just going to add a few colleagues (who are subscribed anyhow) to make sure this hits the top of their inbox.
+Robert Burke <r...@google.com> +Chamikara Jayalath <chamik...@google.com> +Kyle Weaver <kcwea...@google.com>

Kenn

On Thu, Nov 4, 2021 at 11:40 PM Jeff Rhyason <jrhya...@gmail.com> wrote:

> I'm interested to see the Go SDK work with the Spark runner. Based on the
> instructions at https://beam.apache.org/get-started/quickstart-go/, I run
> these commands and get the following failure:
>
> $ ./gradlew :runners:spark:2:job-server:runShadow
>
> in another window:
>
> $ cd sdks
> $ go1.16.4 run ./go/examples/wordcount/wordcount.go --output foo --runner spark --endpoint localhost:8099
> 2021/11/04 22:06:46 No environment config specified. Using default config: 'apache/beam_go_sdk:2.35.0.dev'
> 2021/11/04 22:06:46 Failed to execute job: generating model pipeline
> failed to add scope tree: &{{CountWords root/CountWords} [{main.extractFn 5: ParDo [In(Main): string <- {4: string/string GLO}] -> [Out: string -> {5: string/string GLO}]}] [0xc000096cd0]}
> caused by:
>         failed to add input kind: {main.extractFn 5: ParDo [In(Main): string <- {4: string/string GLO}] -> [Out: string -> {5: string/string GLO}]}
> caused by:
>         failed to serialize 5: ParDo [In(Main): string <- {4: string/string GLO}] -> [Out: string -> {5: string/string GLO}]
> caused by:
>         encoding userfn 5: ParDo [In(Main): string <- {4: string/string GLO}] -> [Out: string -> {5: string/string GLO}]
>         bad userfn
> caused by:
>         encoding structural DoFn &{<nil> 0xc000460ae8 <nil> map[ProcessElement:0xc0004fcac0] map[]}
>         receiver type *main.extractFn must be registered
> exit status 1
>
> I was able to register that type, like this:
>
> diff --git a/sdks/go/examples/wordcount/wordcount.go b/sdks/go/examples/wordcount/wordcount.go
> index 4d54db9a2d..6db99d6220 100644
> --- a/sdks/go/examples/wordcount/wordcount.go
> +++ b/sdks/go/examples/wordcount/wordcount.go
> @@ -60,6 +60,7 @@ import (
>         "flag"
>         "fmt"
>         "log"
> +       "reflect"
>         "regexp"
>         "strings"
>
> @@ -107,6 +108,7 @@ var (
>  // by calling beam.RegisterFunction in an init() call.
>  func init() {
>         beam.RegisterFunction(formatFn)
> +       beam.RegisterDoFn(reflect.TypeOf((*extractFn)(nil)).Elem())
>  }
>
>  var (
>
> Then I encountered:
>
> $ go1.16.4 run ./go/examples/wordcount/wordcount.go --output foo --runner spark --endpoint localhost:8099
> ...
> 2021/11/04 23:07:26 (): java.lang.IllegalArgumentException: Unsupported class file major version 55
> 2021/11/04 23:07:26 Job state: FAILED
> 2021/11/04 23:07:26 Failed to execute job: job go0job0101636092444228385176-jar-1105060726-caffd2f4_ef37c3a9-b2a8-47bd-b1c7-a6e2771263f2 failed
> exit status 1
>
> Switching to the Spark 3.0 job server changed things:
>
> $ cd .. ; ./gradlew :runners:spark:3:job-server:runShadow
> ...
> $ cd sdks ; go1.16.4 run ./go/examples/wordcount/wordcount.go --output foo --runner spark --endpoint localhost:8099
> ...
> 2021/11/04 23:12:04 Staged binary artifact with token:
> 2021/11/04 23:12:04 Submitted job: go0job0101636092722590274154-jar-1105061204-7f1d879e_28e28e06-1331-41c6-8288-4dcfa87afd13
> 2021/11/04 23:12:04 Job state: STOPPED
> 2021/11/04 23:12:04 Job state: STARTING
> 2021/11/04 23:12:04 Job state: RUNNING
> 2021/11/04 23:12:17 Job state: DONE
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:element_count:v1" type:"beam:metrics:sum_int64:v1" payload:"\x01" labels:{key:"PCOLLECTION" value:"n1"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:element_count:v1" type:"beam:metrics:sum_int64:v1" payload:"\x01" labels:{key:"PCOLLECTION" value:"n3"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:element_count:v1" type:"beam:metrics:sum_int64:v1" payload:"\x81\xdc\x01" labels:{key:"PCOLLECTION" value:"n5"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:element_count:v1" type:"beam:metrics:sum_int64:v1" payload:"\x01" labels:{key:"PCOLLECTION" value:"n2"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:element_count:v1" type:"beam:metrics:sum_int64:v1" payload:"\x95+" labels:{key:"PCOLLECTION" value:"n4"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:element_count:v1" type:"beam:metrics:sum_int64:v1" payload:"\x81\xdc\x01" labels:{key:"PCOLLECTION" value:"n6"}
> 2021/11/04 23:12:17 unknown metric type
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:sampled_byte_size:v1" type:"beam:metrics:distribution_int64:v1" payload:"\x01\x01\x01\x01" labels:{key:"PCOLLECTION" value:"n1"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:sampled_byte_size:v1" type:"beam:metrics:distribution_int64:v1" payload:"\x01222" labels:{key:"PCOLLECTION" value:"n3"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:sampled_byte_size:v1" type:"beam:metrics:distribution_int64:v1" payload:"\x01222" labels:{key:"PCOLLECTION" value:"n2"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:sampled_byte_size:v1" type:"beam:metrics:distribution_int64:v1" payload:"\x88\x01\xd9\x05\x02\x0b" labels:{key:"PCOLLECTION" value:"n5"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:sampled_byte_size:v1" type:"beam:metrics:distribution_int64:v1" payload:"y\xfb\x16\x01=" labels:{key:"PCOLLECTION" value:"n4"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:sampled_byte_size:v1" type:"beam:metrics:distribution_int64:v1" payload:"\x81\xdc\x01\xff\xd5\"\x11\x1f" labels:{key:"PCOLLECTION" value:"n6"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:element_count:v1" type:"beam:metrics:sum_int64:v1" payload:"\x8d%" labels:{key:"PCOLLECTION" value:"n9"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:element_count:v1" type:"beam:metrics:sum_int64:v1" payload:"\x8d%" labels:{key:"PCOLLECTION" value:"n10"}
> 2021/11/04 23:12:17 unknown metric type
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:element_count:v1" type:"beam:metrics:sum_int64:v1" payload:"\x8d%" labels:{key:"PCOLLECTION" value:"n8"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:element_count:v1" type:"beam:metrics:sum_int64:v1" payload:"\x8d%" labels:{key:"PCOLLECTION" value:"n7"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:sampled_byte_size:v1" type:"beam:metrics:distribution_int64:v1" payload:"\x8d%\xfa\xbf\x05\x04\xa8\x0c" labels:{key:"PCOLLECTION" value:"n7"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:sampled_byte_size:v1" type:"beam:metrics:distribution_int64:v1" payload:"\xbc\x02\x9b\x19\x06\x13" labels:{key:"PCOLLECTION" value:"n9"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:sampled_byte_size:v1" type:"beam:metrics:distribution_int64:v1" payload:"\xb5\x02\xf1\x15\x05\x10" labels:{key:"PCOLLECTION" value:"n8"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:sampled_byte_size:v1" type:"beam:metrics:distribution_int64:v1" payload:"\x8d%\xb3\xa7\x07\x14\"" labels:{key:"PCOLLECTION" value:"n10"}
> 2021/11/04 23:12:17 unknown metric type
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:element_count:v1" type:"beam:metrics:sum_int64:v1" payload:"\x01" labels:{key:"PCOLLECTION" value:"n11"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:sampled_byte_size:v1" type:"beam:metrics:distribution_int64:v1" payload:"\x01\xf2\xfa\x02\xf2\xfa\x02\xf2\xfa\x02" labels:{key:"PCOLLECTION" value:"n11"}
>
> However misleading those failures are, the process exits successfully. I have more to learn about where the output went.
>
> It's really neat to see this working.
>
> Would you be interested in PRs for these?
> * Go examples to register all the types needed for other runners
> * updating the Go Quick Start to use the Spark 3 runner, so it plays better with the embedded Spark cluster
>
> Jeff
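[Editor's note] The fix in the diff above hinges on the `reflect.TypeOf((*extractFn)(nil)).Elem()` idiom: a typed nil pointer gives `reflect` the pointer type `*main.extractFn` without allocating a value, and `Elem()` dereferences that pointer type to the struct type itself, which is what `beam.RegisterDoFn` keys the registration on. A stdlib-only sketch of the idiom, using a local `extractFn` stand-in rather than the real DoFn from wordcount.go:

```go
package main

import (
	"fmt"
	"reflect"
)

// extractFn is a stand-in for the structural DoFn in wordcount.go;
// the field is illustrative only.
type extractFn struct {
	SmallWordLength int
}

func main() {
	// (*extractFn)(nil) is a typed nil pointer: no value is allocated,
	// but reflect still sees the static type *main.extractFn.
	ptrType := reflect.TypeOf((*extractFn)(nil))
	fmt.Println(ptrType) // *main.extractFn

	// Elem() on a pointer type yields the pointed-to type, i.e. the
	// struct type the Beam registry needs.
	structType := ptrType.Elem()
	fmt.Println(structType)        // main.extractFn
	fmt.Println(structType.Kind()) // struct
}
```

The same pointer-dereference trick works for any struct DoFn, which is why it is the conventional argument to `beam.RegisterDoFn` in `init()`.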
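[Editor's note] On the `Unsupported class file major version 55` failure above: JVM class files carry a major version number in their header, and each Java release emits its own (the release number plus 44, so Java 8 emits 52 and Java 11 emits 55). The error typically means some bytecode-reading component in the Spark 2 job-server path predates Java 11 class files, which would explain why the Spark 3 job server succeeds. A stdlib-only sketch decoding the version fields from a class-file header; the bytes here are hand-written for illustration, not read from a real `.class` file:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// javaRelease is a hypothetical helper mapping a class-file major version
// to the Java release that produces it: Java N emits major version N + 44.
func javaRelease(major uint16) uint16 { return major - 44 }

func main() {
	// First 8 bytes of a class-file header: u4 magic (0xCAFEBABE),
	// u2 minor_version, u2 major_version, all big-endian per the JVM spec.
	header := []byte{0xCA, 0xFE, 0xBA, 0xBE, 0x00, 0x00, 0x00, 0x37}

	magic := binary.BigEndian.Uint32(header[0:4])
	major := binary.BigEndian.Uint16(header[6:8]) // 0x37 = 55

	fmt.Printf("magic=%#x major=%d java=%d\n", magic, major, javaRelease(major))
	// major 55 corresponds to Java 11, matching the error in the logs
}
```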