Hi Robert! Your fix was much more thorough than what I was cooking up. Thanks for such a comprehensive fix.
Also, thanks to you and Kyle for explaining the details of the other things I observed. This is working very well now!

Jeff

On Sun, Nov 14, 2021 at 4:04 PM Robert Burke <[email protected]> wrote:

> With the 2.35.0 cut coming on Wednesday (Nov 17th) I took the liberty to
> fix all the Go SDK examples under Spark 3. I don't like "stealing" work,
> but we had not heard from you since this was brought to our attention. So,
> for that, I'm sorry.
>
> https://github.com/apache/beam/pull/15970
>
> Found a bug with the schema row decoder along the way too.
>
> Since the website tracks live, getting the quick start to use Spark 3
> doesn't have to happen before the cut, so that's still available to do.
>
> I really appreciate the clear errors and repros you provided!
>
> Thanks again
> Robert Burke
>
> A
>
> On Mon, Nov 8, 2021, 1:27 PM Robert Burke <[email protected]> wrote:
>
>> +1 to Kyle's LOOPBACK suggestion. Gives you your local file system, and
>> you can println debug to the console. However, only would be a single
>> worker.
>>
>> On Mon, Nov 8, 2021 at 1:23 PM Robert Burke <[email protected]> wrote:
>>
>>> Oh that's definitely something needs updating. Yes please to those PRs.
>>>
>>> Please add a mention to @lostluck for me to review them.
>>>
>>> The "Unsupported class file major version" is a mismatch between Java8
>>> and Java 11, unrelated to the Go SDK, so I agree that the example should
>>> spin up a spark3 instead of the older version.
>>>
>>> The "Failed to deduce Step from MonitoringInfo" messages are an
>>> unfortunately noisy error message post successful job, because the code
>>> doesn't know how to map PCollections to their Parent DoFn yet. Ritesh is
>>> working on that. They probably need to be consolidated or ignored for now.
>>> Right now, they come from here:
>>> https://github.com/apache/beam/blob/e668460f61540638fb29e05997087b56ebcee4f3/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go#L51
>>>
>>> On Mon, Nov 8, 2021 at 1:12 PM Kenneth Knowles <[email protected]> wrote:
>>>
>>>> Awesome! Just going to add a few colleagues (who are subscribed anyhow)
>>>> to make sure this hits the top of their inbox.
>>>>
>>>> +Robert Burke <[email protected]> +Chamikara Jayalath
>>>> <[email protected]> +Kyle Weaver <[email protected]>
>>>>
>>>> Kenn
>>>>
>>>> On Thu, Nov 4, 2021 at 11:40 PM Jeff Rhyason <[email protected]>
>>>> wrote:
>>>>
>>>>> I'm interested to see the Go SDK work with the Spark runner. Based on
>>>>> the instructions at https://beam.apache.org/get-started/quickstart-go/,
>>>>> I run these commands and get the following failure:
>>>>>
>>>>> $ ./gradlew :runners:spark:2:job-server:runShadow
>>>>> in another window:
>>>>> $ cd sdks
>>>>> $ go1.16.4 run ./go/examples/wordcount/wordcount.go --output foo
>>>>> --runner spark --endpoint localhost:8099
>>>>> 2021/11/04 22:06:46 No environment config specified. Using default
>>>>> config: 'apache/beam_go_sdk:2.35.0.dev'
>>>>> 2021/11/04 22:06:46 Failed to execute job: generating model
>>>>> pipeline
>>>>> failed to add scope tree: &{{CountWords root/CountWords}
>>>>> [{main.extractFn 5: ParDo [In(Main): string <- {4: string/string GLO}] ->
>>>>> [Out: string -> {5: string/string GLO}]}] [0xc000096cd0]}
>>>>> caused by:
>>>>> failed to add input kind: {main.extractFn 5: ParDo [In(Main): string
>>>>> <- {4: string/string GLO}] -> [Out: string -> {5: string/string GLO}]}
>>>>> caused by:
>>>>> failed to serialize 5: ParDo [In(Main): string <- {4: string/string
>>>>> GLO}] -> [Out: string -> {5: string/string GLO}]
>>>>> caused by:
>>>>> encoding userfn 5: ParDo [In(Main): string <- {4:
>>>>> string/string GLO}] -> [Out: string -> {5: string/string GLO}]
>>>>> bad userfn
>>>>> caused by:
>>>>> encoding structural DoFn &{<nil> 0xc000460ae8 <nil>
>>>>> map[ProcessElement:0xc0004fcac0] map[]}
>>>>> receiver type *main.extractFn must be registered
>>>>> exit status 1
>>>>>
>>>>> I was able to register that type, like this:
>>>>>
>>>>> diff --git a/sdks/go/examples/wordcount/wordcount.go
>>>>> b/sdks/go/examples/wordcount/wordcount.go
>>>>> index 4d54db9a2d..6db99d6220 100644
>>>>> --- a/sdks/go/examples/wordcount/wordcount.go
>>>>> +++ b/sdks/go/examples/wordcount/wordcount.go
>>>>> @@ -60,6 +60,7 @@ import (
>>>>>   "flag"
>>>>>   "fmt"
>>>>>   "log"
>>>>> + "reflect"
>>>>>   "regexp"
>>>>>   "strings"
>>>>>
>>>>> @@ -107,6 +108,7 @@ var (
>>>>>  // by calling beam.RegisterFunction in an init() call.
>>>>>  func init() {
>>>>>   beam.RegisterFunction(formatFn)
>>>>> + beam.RegisterDoFn(reflect.TypeOf((*extractFn)(nil)).Elem())
>>>>>  }
>>>>>
>>>>>  var (
>>>>>
>>>>>
>>>>> Then I encountered:
>>>>>
>>>>> $ go1.16.4 run ./go/examples/wordcount/wordcount.go --output foo
>>>>> --runner spark --endpoint localhost:8099
>>>>> ...
>>>>> 2021/11/04 23:07:26 (): java.lang.IllegalArgumentException:
>>>>> Unsupported class file major version 55
>>>>> 2021/11/04 23:07:26 Job state: FAILED
>>>>> 2021/11/04 23:07:26 Failed to execute job: job
>>>>> go0job0101636092444228385176-jar-1105060726-caffd2f4_ef37c3a9-b2a8-47bd-b1c7-a6e2771263f2
>>>>> failed
>>>>> exit status 1
>>>>>
>>>>>
>>>>> Switching to the Spark 3.0 job server changed things:
>>>>> $ cd .. ; ./gradlew :runners:spark:3:job-server:runShadow
>>>>> ...
>>>>> $ cd sdks ; go1.16.4 run ./go/examples/wordcount/wordcount.go
>>>>> --output foo --runner spark --endpoint localhost:8099
>>>>> ...
>>>>> 2021/11/04 23:12:04 Staged binary artifact with token:
>>>>> 2021/11/04 23:12:04 Submitted job:
>>>>> go0job0101636092722590274154-jar-1105061204-7f1d879e_28e28e06-1331-41c6-8288-4dcfa87afd13
>>>>> 2021/11/04 23:12:04 Job state: STOPPED
>>>>> 2021/11/04 23:12:04 Job state: STARTING
>>>>> 2021/11/04 23:12:04 Job state: RUNNING
>>>>> 2021/11/04 23:12:17 Job state: DONE
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:element_count:v1" type:"beam:metrics:sum_int64:v1"
>>>>> payload:"\x01" labels:{key:"PCOLLECTION" value:"n1"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:element_count:v1" type:"beam:metrics:sum_int64:v1"
>>>>> payload:"\x01" labels:{key:"PCOLLECTION" value:"n3"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:element_count:v1" type:"beam:metrics:sum_int64:v1"
>>>>> payload:"\x81\xdc\x01" labels:{key:"PCOLLECTION" value:"n5"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:element_count:v1" type:"beam:metrics:sum_int64:v1"
>>>>> payload:"\x01" labels:{key:"PCOLLECTION" value:"n2"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:element_count:v1" type:"beam:metrics:sum_int64:v1"
>>>>> payload:"\x95+" labels:{key:"PCOLLECTION" value:"n4"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:element_count:v1" type:"beam:metrics:sum_int64:v1"
>>>>> payload:"\x81\xdc\x01" labels:{key:"PCOLLECTION" value:"n6"}
>>>>> 2021/11/04 23:12:17 unknown metric type
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:sampled_byte_size:v1"
>>>>> type:"beam:metrics:distribution_int64:v1" payload:"\x01\x01\x01\x01"
>>>>> labels:{key:"PCOLLECTION" value:"n1"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:sampled_byte_size:v1"
>>>>> type:"beam:metrics:distribution_int64:v1" payload:"\x01222"
>>>>> labels:{key:"PCOLLECTION" value:"n3"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:sampled_byte_size:v1"
>>>>> type:"beam:metrics:distribution_int64:v1" payload:"\x01222"
>>>>> labels:{key:"PCOLLECTION" value:"n2"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:sampled_byte_size:v1"
>>>>> type:"beam:metrics:distribution_int64:v1"
>>>>> payload:"\x88\x01\xd9\x05\x02\x0b" labels:{key:"PCOLLECTION"
>>>>> value:"n5"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:sampled_byte_size:v1"
>>>>> type:"beam:metrics:distribution_int64:v1" payload:"y\xfb\x16\x01="
>>>>> labels:{key:"PCOLLECTION" value:"n4"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:sampled_byte_size:v1"
>>>>> type:"beam:metrics:distribution_int64:v1"
>>>>> payload:"\x81\xdc\x01\xff\xd5\"\x11\x1f" labels:{key:"PCOLLECTION"
>>>>> value:"n6"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:element_count:v1" type:"beam:metrics:sum_int64:v1"
>>>>> payload:"\x8d%" labels:{key:"PCOLLECTION" value:"n9"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:element_count:v1" type:"beam:metrics:sum_int64:v1"
>>>>> payload:"\x8d%" labels:{key:"PCOLLECTION" value:"n10"}
>>>>> 2021/11/04 23:12:17 unknown metric type
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:element_count:v1" type:"beam:metrics:sum_int64:v1"
>>>>> payload:"\x8d%" labels:{key:"PCOLLECTION" value:"n8"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:element_count:v1" type:"beam:metrics:sum_int64:v1"
>>>>> payload:"\x8d%" labels:{key:"PCOLLECTION" value:"n7"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:sampled_byte_size:v1"
>>>>> type:"beam:metrics:distribution_int64:v1"
>>>>> payload:"\x8d%\xfa\xbf\x05\x04\xa8\x0c" labels:{key:"PCOLLECTION"
>>>>> value:"n7"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:sampled_byte_size:v1"
>>>>> type:"beam:metrics:distribution_int64:v1"
>>>>> payload:"\xbc\x02\x9b\x19\x06\x13" labels:{key:"PCOLLECTION"
>>>>> value:"n9"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:sampled_byte_size:v1"
>>>>> type:"beam:metrics:distribution_int64:v1"
>>>>> payload:"\xb5\x02\xf1\x15\x05\x10" labels:{key:"PCOLLECTION"
>>>>> value:"n8"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:sampled_byte_size:v1"
>>>>> type:"beam:metrics:distribution_int64:v1"
>>>>> payload:"\x8d%\xb3\xa7\x07\x14\"" labels:{key:"PCOLLECTION"
>>>>> value:"n10"}
>>>>> 2021/11/04 23:12:17 unknown metric type
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:element_count:v1" type:"beam:metrics:sum_int64:v1"
>>>>> payload:"\x01" labels:{key:"PCOLLECTION" value:"n11"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:sampled_byte_size:v1"
>>>>> type:"beam:metrics:distribution_int64:v1"
>>>>> payload:"\x01\xf2\xfa\x02\xf2\xfa\x02\xf2\xfa\x02"
>>>>> labels:{key:"PCOLLECTION" value:"n11"}
>>>>>
>>>>> However misleading those failures are, the process exits successfully.
>>>>> I have more to learn about where the output went.
>>>>>
>>>>> It's really neat to see this working.
>>>>>
>>>>> Would you be interested in PRs for these?
>>>>> * Go examples to register all the types needed for other runners
>>>>> * updating the Go Quick Start to use the Spark 3 runner so it plays
>>>>> better with the embedded Spark cluster
>>>>>
>>>>> Jeff
