Awesome! Just going to add a few colleagues (who are subscribed anyhow) to
make sure this hits the top of their inbox.

+Robert Burke <r...@google.com>
+Chamikara Jayalath <chamik...@google.com>
+Kyle Weaver <kcwea...@google.com>

Kenn

On Thu, Nov 4, 2021 at 11:40 PM Jeff Rhyason <jrhya...@gmail.com> wrote:

> I'm interested in seeing the Go SDK work with the Spark runner. Following the
> instructions at https://beam.apache.org/get-started/quickstart-go/, I ran
> these commands and got the following failure:
>
> $ ./gradlew :runners:spark:2:job-server:runShadow
>
> Then, in another window:
>
> $ cd sdks
> $ go1.16.4 run ./go/examples/wordcount/wordcount.go --output foo \
>     --runner spark --endpoint localhost:8099
> 2021/11/04 22:06:46 No environment config specified. Using default config:
> 'apache/beam_go_sdk:2.35.0.dev'
> 2021/11/04 22:06:46 Failed to execute job:      generating model pipeline
> failed to add scope tree: &{{CountWords root/CountWords} [{main.extractFn
> 5: ParDo [In(Main): string <- {4: string/string GLO}] -> [Out: string ->
> {5: string/string GLO}]}] [0xc000096cd0]}
>         caused by:
> failed to add input kind: {main.extractFn 5: ParDo [In(Main): string <-
> {4: string/string GLO}] -> [Out: string -> {5: string/string GLO}]}
>         caused by:
> failed to serialize 5: ParDo [In(Main): string <- {4: string/string GLO}]
> -> [Out: string -> {5: string/string GLO}]
>         caused by:
>         encoding userfn 5: ParDo [In(Main): string <- {4: string/string
> GLO}] -> [Out: string -> {5: string/string GLO}]
> bad userfn
>         caused by:
>         encoding structural DoFn &{<nil> 0xc000460ae8 <nil>
> map[ProcessElement:0xc0004fcac0] map[]}
> receiver type *main.extractFn must be registered
> exit status 1
>
> I was able to register that type, like this:
>
> diff --git a/sdks/go/examples/wordcount/wordcount.go
> b/sdks/go/examples/wordcount/wordcount.go
> index 4d54db9a2d..6db99d6220 100644
> --- a/sdks/go/examples/wordcount/wordcount.go
> +++ b/sdks/go/examples/wordcount/wordcount.go
> @@ -60,6 +60,7 @@ import (
>         "flag"
>         "fmt"
>         "log"
> +       "reflect"
>         "regexp"
>         "strings"
>
> @@ -107,6 +108,7 @@ var (
>  // by calling beam.RegisterFunction in an init() call.
>  func init() {
>         beam.RegisterFunction(formatFn)
> +       beam.RegisterDoFn(reflect.TypeOf((*extractFn)(nil)).Elem())
>  }
>
>  var (
>
>
> Then I encountered:
>
> $ go1.16.4 run ./go/examples/wordcount/wordcount.go --output foo \
>     --runner spark --endpoint localhost:8099
> ...
> 2021/11/04 23:07:26  (): java.lang.IllegalArgumentException: Unsupported
> class file major version 55
> 2021/11/04 23:07:26 Job state: FAILED
> 2021/11/04 23:07:26 Failed to execute job: job
> go0job0101636092444228385176-jar-1105060726-caffd2f4_ef37c3a9-b2a8-47bd-b1c7-a6e2771263f2
> failed
> exit status 1
>
>
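A note on the error above: class file major version 55 corresponds to Java 11, so the message suggests that something on the Spark 2 job server's classpath was handed Java 11 bytecode it could not parse. That reading is an assumption based on the log line alone. The sketch below shows how the major version maps to a Java release and how it can be read from a class file header; `javaRelease` and `classMajor` are illustrative names.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// javaRelease converts a class file major version to the Java release
// number, e.g. 52 -> 8 and 55 -> 11.
func javaRelease(major uint16) int {
	return int(major) - 44
}

// classMajor reads the major version from a class file header. The layout is
// big-endian: u4 magic (0xCAFEBABE), u2 minor version, u2 major version.
func classMajor(header []byte) (uint16, error) {
	if len(header) < 8 {
		return 0, errors.New("header too short")
	}
	if binary.BigEndian.Uint32(header[0:4]) != 0xCAFEBABE {
		return 0, errors.New("not a class file")
	}
	return binary.BigEndian.Uint16(header[6:8]), nil
}

func main() {
	// First eight bytes of a class file compiled for Java 11 (0x37 == 55).
	header := []byte{0xCA, 0xFE, 0xBA, 0xBE, 0x00, 0x00, 0x00, 0x37}
	major, err := classMajor(header)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("major %d -> Java %d\n", major, javaRelease(major))
}
```

Running `java -version` in the shell that starts the job server is a quick way to check which JVM is actually in use.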
> Switching to the Spark 3.0 job server changed things:
> $ cd .. ;  ./gradlew :runners:spark:3:job-server:runShadow
> ...
> $ cd sdks ; go1.16.4 run ./go/examples/wordcount/wordcount.go --output foo \
>     --runner spark --endpoint localhost:8099
> ...
> 2021/11/04 23:12:04 Staged binary artifact with token:
> 2021/11/04 23:12:04 Submitted job:
> go0job0101636092722590274154-jar-1105061204-7f1d879e_28e28e06-1331-41c6-8288-4dcfa87afd13
> 2021/11/04 23:12:04 Job state: STOPPED
> 2021/11/04 23:12:04 Job state: STARTING
> 2021/11/04 23:12:04 Job state: RUNNING
> 2021/11/04 23:12:17 Job state: DONE
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>  payload:"\x01"  labels:{key:"PCOLLECTION"  value:"n1"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>  payload:"\x01"  labels:{key:"PCOLLECTION"  value:"n3"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>  payload:"\x81\xdc\x01"  labels:{key:"PCOLLECTION"  value:"n5"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>  payload:"\x01"  labels:{key:"PCOLLECTION"  value:"n2"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>  payload:"\x95+"  labels:{key:"PCOLLECTION"  value:"n4"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>  payload:"\x81\xdc\x01"  labels:{key:"PCOLLECTION"  value:"n6"}
> 2021/11/04 23:12:17 unknown metric type
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:sampled_byte_size:v1"
>  type:"beam:metrics:distribution_int64:v1"  payload:"\x01\x01\x01\x01"
>  labels:{key:"PCOLLECTION"  value:"n1"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:sampled_byte_size:v1"
>  type:"beam:metrics:distribution_int64:v1"  payload:"\x01222"
>  labels:{key:"PCOLLECTION"  value:"n3"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:sampled_byte_size:v1"
>  type:"beam:metrics:distribution_int64:v1"  payload:"\x01222"
>  labels:{key:"PCOLLECTION"  value:"n2"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:sampled_byte_size:v1"
>  type:"beam:metrics:distribution_int64:v1"
>  payload:"\x88\x01\xd9\x05\x02\x0b"  labels:{key:"PCOLLECTION"  value:"n5"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:sampled_byte_size:v1"
>  type:"beam:metrics:distribution_int64:v1"  payload:"y\xfb\x16\x01="
>  labels:{key:"PCOLLECTION"  value:"n4"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:sampled_byte_size:v1"
>  type:"beam:metrics:distribution_int64:v1"
>  payload:"\x81\xdc\x01\xff\xd5\"\x11\x1f"  labels:{key:"PCOLLECTION"
>  value:"n6"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>  payload:"\x8d%"  labels:{key:"PCOLLECTION"  value:"n9"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>  payload:"\x8d%"  labels:{key:"PCOLLECTION"  value:"n10"}
> 2021/11/04 23:12:17 unknown metric type
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>  payload:"\x8d%"  labels:{key:"PCOLLECTION"  value:"n8"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>  payload:"\x8d%"  labels:{key:"PCOLLECTION"  value:"n7"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:sampled_byte_size:v1"
>  type:"beam:metrics:distribution_int64:v1"
>  payload:"\x8d%\xfa\xbf\x05\x04\xa8\x0c"  labels:{key:"PCOLLECTION"
>  value:"n7"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:sampled_byte_size:v1"
>  type:"beam:metrics:distribution_int64:v1"
>  payload:"\xbc\x02\x9b\x19\x06\x13"  labels:{key:"PCOLLECTION"  value:"n9"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:sampled_byte_size:v1"
>  type:"beam:metrics:distribution_int64:v1"
>  payload:"\xb5\x02\xf1\x15\x05\x10"  labels:{key:"PCOLLECTION"  value:"n8"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:sampled_byte_size:v1"
>  type:"beam:metrics:distribution_int64:v1"
>  payload:"\x8d%\xb3\xa7\x07\x14\""  labels:{key:"PCOLLECTION"  value:"n10"}
> 2021/11/04 23:12:17 unknown metric type
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>  payload:"\x01"  labels:{key:"PCOLLECTION"  value:"n11"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:sampled_byte_size:v1"
>  type:"beam:metrics:distribution_int64:v1"
>  payload:"\x01\xf2\xfa\x02\xf2\xfa\x02\xf2\xfa\x02"
>  labels:{key:"PCOLLECTION"  value:"n11"}
>
> Misleading as those "Failed to deduce Step" messages are, the job reaches
> DONE and the process exits successfully. I still need to work out where
> the output went.
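The payloads in the MonitoringInfo log lines above can be decoded by hand, which helps confirm they are ordinary per-PCollection metrics rather than real failures. The sketch below assumes, based on my understanding of Beam's metrics encoding, that a `sum_int64` payload is a single varint and a `distribution_int64` payload is four varints (count, sum, min, max). `decodeSum`, `decodeDistribution`, and `readVarint` are illustrative helpers, not SDK functions.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// readVarint decodes one unsigned LEB128 varint and returns the remaining bytes.
func readVarint(b []byte) (int64, []byte, error) {
	v, n := binary.Uvarint(b)
	if n <= 0 {
		return 0, nil, errors.New("malformed varint")
	}
	return int64(v), b[n:], nil
}

// decodeSum decodes a sum_int64 payload (assumed: one varint).
func decodeSum(payload []byte) (int64, error) {
	v, _, err := readVarint(payload)
	return v, err
}

// distribution holds the four values of a distribution_int64 payload.
type distribution struct {
	Count, Sum, Min, Max int64
}

// decodeDistribution decodes a distribution_int64 payload
// (assumed: count, sum, min, max, each a varint, in that order).
func decodeDistribution(payload []byte) (distribution, error) {
	var vals [4]int64
	rest := payload
	for i := range vals {
		v, r, err := readVarint(rest)
		if err != nil {
			return distribution{}, err
		}
		vals[i], rest = v, r
	}
	return distribution{vals[0], vals[1], vals[2], vals[3]}, nil
}

func main() {
	// Payloads copied from the log lines for PCollection n5.
	sum, err := decodeSum([]byte("\x81\xdc\x01"))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("n5 element_count:", sum)

	d, err := decodeDistribution([]byte("\x88\x01\xd9\x05\x02\x0b"))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("n5 sampled_byte_size: %+v\n", d)
}
```

Under those assumptions, the n5 element count decodes to 28161 and its sampled byte sizes to a count of 136, a sum of 729, a minimum of 2, and a maximum of 11.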
>
> It's really neat to see this working.
>
> Would you be interested in PRs for these?
> * Updating the Go examples to register all the types needed by other runners
> * Updating the Go Quick Start to use the Spark 3 runner so it plays better
> with the embedded Spark cluster
>
> Jeff
