[GitHub] [beam] lostluck commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

2020-08-17 Thread GitBox


lostluck commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r471837313



##
File path: sdks/go/pkg/beam/core/graph/xlang.go
##
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package graph
+
+import (
+   "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+   "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+   pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+type ExpandedTransform struct {
+   Components_ interface{} // *pipepb.Components
+   Transform_  interface{} //*pipepb.PTransform
+   Requirements_   []string
+   BoundedOutputs_ map[string]bool
+}
+
+func (exp *ExpandedTransform) Components() *pipepb.Components {
+   if c, ok := exp.Components_.(*pipepb.Components); ok {
+   return c
+   }
+   panic(errors.Errorf("malformed components; %v lacks a conforming 
pipeline component", exp))
+}
+
+func (exp *ExpandedTransform) Transform() *pipepb.PTransform {
+   if t, ok := exp.Transform_.(*pipepb.PTransform); ok {
+   return t
+   }
+   panic(errors.Errorf("malformed transform; %v lacks a conforming 
pipeline ptransform", exp))
+}
+
+func (exp *ExpandedTransform) Requirements() []string {
+   if exp.Requirements_ != nil {
+   return exp.Requirements_
+   }
+   return nil
+}
+
+func (exp *ExpandedTransform) BoundedOutputs() map[string]bool {
+   if exp.BoundedOutputs_ != nil {
+   return exp.BoundedOutputs_
+   }
+   return nil
+}
+
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+   Urn   string
+   Payload   []byte
+   ExpansionAddr string
+
+   //replace all input/output fields with Inbound and Outbound id maps 
referencing the orginal Multiedge
+
+   inputs  map[string]*Node
+   Outputs map[string]*Node
+   outputTypes map[string]typex.FullType
+
+   Expanded_ *ExpandedTransform
+}
+
+func (ext ExternalTransform) WithNamedInputs(inputs map[string]*Node) 
ExternalTransform {
+   if ext.inputs != nil {
+   panic(errors.Errorf("inputs already set as: \n%v", ext.inputs))
+   }
+   ext.inputs = inputs
+   return ext
+}
+
+func (ext ExternalTransform) WithNamedOutputs(outputTypes 
map[string]typex.FullType) ExternalTransform {

Review comment:
   Good use of a value receiver here and in the similar functions. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

2020-08-06 Thread GitBox


lostluck commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r466552912



##
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// xlang_wordcount exemplifies using a cross language transform from Python to 
count words
+package main
+
+import (
+   "context"
+   "flag"
+   "fmt"
+   "log"
+   "regexp"
+   "strings"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+
+   "github.com/apache/beam/sdks/go/pkg/beam"
+   "github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+   "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+   // Imports to enable correct filesystem access and runner setup in 
LOOPBACK mode
+   _ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+   _ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+   _ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+   // Set this option to choose a different input file or glob.
+   input = flag.String("input", "./input", "File(s) to read.")
+
+   // Set this required option to specify where to write the output.
+   output = flag.String("output", "./output", "Output file (required).")
+)
+
+var (
+   wordRE  = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+   empty   = beam.NewCounter("extract", "emptyLines")
+   lineLen = beam.NewDistribution("extract", "lineLenDistro")
+)
+
+// extractFn is a DoFn that emits the words in a given line.
+func extractFn(ctx context.Context, line string, emit func(string)) {
+   lineLen.Update(ctx, int64(len(line)))
+   if len(strings.TrimSpace(line)) == 0 {
+   empty.Inc(ctx, 1)
+   }
+   for _, word := range wordRE.FindAllString(line, -1) {
+   emit(word)
+   }
+}
+
+// formatFn is a DoFn that formats a word and its count as a string.
+func formatFn(w string, c int64) string {
+   return fmt.Sprintf("%s: %v", w, c)
+}
+
+func init() {
+   beam.RegisterFunction(extractFn)
+   beam.RegisterFunction(formatFn)
+}
+
+func main() {
+   flag.Parse()
+   beam.Init()
+
+   if *output == "" {
+   log.Fatal("No output provided")
+   }
+
+   p := beam.NewPipeline()
+   s := p.Root()
+
+   lines := textio.Read(s, *input)
+   col := beam.ParDo(s, extractFn, lines)
+
+   // Using Cross-language Count from Python's test expansion service
+   // TODO(pskevin): Cleaner using-face API
+   outputType := typex.NewKV(typex.New(reflectx.String), 
typex.New(reflectx.Int64))
+   external := {
+   In:[]beam.PCollection{col},
+   Urn:   "beam:transforms:xlang:count",
+   ExpansionAddr: "localhost:8118",

Review comment:
   Alas, it's got to be a parameter. Users may request transforms from 
multiple independent expansion servers (e.g. a Java one, a Python one, a 
Dataflow one...).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

2020-08-05 Thread GitBox


lostluck commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465879720



##
File path: sdks/go/pkg/beam/external.go
##
@@ -16,10 +16,144 @@
 package beam
 
 import (
+   "context"
+   "fmt"
+
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+   jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+   pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+   "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {

Review comment:
   Per the other thread, please move these to parameters on CrossLanguage 
and TryCrossLanguage instead. Do not try to force in compatibility with the 
legacy External; it's OK for them to have two separate calls and paths.
   With them bundled in a struct, it's not clear what is required and what is 
not, and the compiler won't help the user by failing at compile time. 
   
   An aside: the other issue here is that you've mixed up user-side parameters 
with internal implementation details, and made them part of the API surface. 
APIs are easiest to use when the user knows how to fill in everything and what 
is required or not. The components, expanded transform, and requirements fields, 
for example, are not something that users would be filling in. Types are cheap. 
Make a new type instead of trying to reuse something that almost fits.

##
File path: sdks/go/pkg/beam/external.go
##
@@ -16,10 +16,151 @@
 package beam
 
 import (
+   "context"
+   "fmt"
+
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+   jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+   pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+   "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+   idint
+   Urn   string
+   Payload   []byte
+   In[]PCollection
+   Out   []FullType
+   Bounded   bool
+   ExpansionAddr string
+   Components*pipepb.Components
+   ExpandedTransform *pipepb.PTransform
+   Requirements  []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one 
function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+   if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the 
value was ever set

Review comment:
   Note: with this comment, the intent was for you to remove this dead code, 
as it's unnecessary.

##
File path: sdks/go/pkg/beam/external.go
##
@@ -16,10 +16,144 @@
 package beam
 
 import (
+   "context"
+   "fmt"
+
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+   jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+   pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+   "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+   idint
+   Urn   string
+   Payload   []byte
+   In[]PCollection
+   Out   []FullType
+   Bounded   bool
+   ExpansionAddr string
+   Components*pipepb.Components
+   ExpandedTransform *pipepb.PTransform
+   Requirements  []string

Review comment:
   Move these to a graph.CrossLanguage struct, but have their proto types 
be interface{} instead, with a comment about what the types should be. Given 
that those fields are only used by Beam framework internals, there's little risk 
in using type assertions for them in the right places, such as the graphx package.

##
File path: sdks/go/pkg/beam/external.go
##
@@ -16,10 +16,151 @@
 package beam
 
 import (
+   "context"
+   "fmt"
+
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+

[GitHub] [beam] lostluck commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

2020-08-05 Thread GitBox


lostluck commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r464721180



##
File path: sdks/go/pkg/beam/external.go
##
@@ -16,10 +16,151 @@
 package beam
 
 import (
+   "context"
+   "fmt"
+
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+   jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+   pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+   "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+   idint
+   Urn   string
+   Payload   []byte
+   In[]PCollection
+   Out   []FullType
+   Bounded   bool
+   ExpansionAddr string
+   Components*pipepb.Components
+   ExpandedTransform *pipepb.PTransform
+   Requirements  []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one 
function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+   if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the 
value was ever set
+   // return Legacy External API
+   }
+
+   /*
+   Add ExternalTranform to the Graph
+   */
+   // Validating scope and inputs
+   if !s.IsValid() {
+   // return nil, errors.New("invalid scope")
+   fmt.Println("invalid scope")
+   }
+   for i, col := range e.In {
+   if !col.IsValid() {
+   // return nil, errors.Errorf("invalid pcollection to 
external: index %v", i)
+   fmt.Printf("\ninvalid pcollection to external: index 
%v", i)
+
+   }
+   }
+
+   // Using exisiting MultiEdge format to represent ExternalTransform 
(already backwards compatible)
+   payload := {
+   URN:  e.Urn,
+   Data: e.Payload,
+   }
+   var ins []*graph.Node
+   for _, col := range e.In {
+   ins = append(ins, col.n)
+   }
+   edge := graph.NewCrossLanguage(s.real, s.scope, ins, payload)
+
+   // TODO(pskevin): There needs to be a better way of associating this 
ExternalTransform to the pipeline
+   // Adding ExternalTransform to pipeline referenced by MultiEdge ID
+   if p.ExpandedTransforms == nil {
+   p.ExpandedTransforms = make(map[string]*ExternalTransform)
+   }
+   p.ExpandedTransforms[fmt.Sprintf("e%v", edge.ID())] = e

Review comment:
   As discussed, this data can be part of the graph.External node (or a new 
graph.CrossLanguage struct, if desired), which keeps it as part of the graph so 
it can be handled appropriately in graphx/translate.go. There's absolutely no 
need to add a new way to pass information through the pipeline, nor to adopt 
the suggestion you made for scope.
   
   Use the existing abstraction. If it's not sufficient, please articulate why.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

2020-08-05 Thread GitBox


lostluck commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r464721180



##
File path: sdks/go/pkg/beam/external.go
##
@@ -16,10 +16,151 @@
 package beam
 
 import (
+   "context"
+   "fmt"
+
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+   jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+   pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+   "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+   idint
+   Urn   string
+   Payload   []byte
+   In[]PCollection
+   Out   []FullType
+   Bounded   bool
+   ExpansionAddr string
+   Components*pipepb.Components
+   ExpandedTransform *pipepb.PTransform
+   Requirements  []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one 
function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+   if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the 
value was ever set
+   // return Legacy External API
+   }
+
+   /*
+   Add ExternalTranform to the Graph
+   */
+   // Validating scope and inputs
+   if !s.IsValid() {
+   // return nil, errors.New("invalid scope")
+   fmt.Println("invalid scope")
+   }
+   for i, col := range e.In {
+   if !col.IsValid() {
+   // return nil, errors.Errorf("invalid pcollection to 
external: index %v", i)
+   fmt.Printf("\ninvalid pcollection to external: index 
%v", i)
+
+   }
+   }
+
+   // Using exisiting MultiEdge format to represent ExternalTransform 
(already backwards compatible)
+   payload := {
+   URN:  e.Urn,
+   Data: e.Payload,
+   }
+   var ins []*graph.Node
+   for _, col := range e.In {
+   ins = append(ins, col.n)
+   }
+   edge := graph.NewCrossLanguage(s.real, s.scope, ins, payload)
+
+   // TODO(pskevin): There needs to be a better way of associating this 
ExternalTransform to the pipeline
+   // Adding ExternalTransform to pipeline referenced by MultiEdge ID
+   if p.ExpandedTransforms == nil {
+   p.ExpandedTransforms = make(map[string]*ExternalTransform)
+   }
+   p.ExpandedTransforms[fmt.Sprintf("e%v", edge.ID())] = e

Review comment:
   As discussed, this data can be part of the graph.External node (or a new 
graph.CrossLanguage struct, if desired), which keeps it as part of the graph so 
it can be handled appropriately in graphx/translate.go. There's absolutely no 
need to add a new way to pass information through the pipeline.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

2020-08-04 Thread GitBox


lostluck commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465380821



##
File path: sdks/go/pkg/beam/external.go
##
@@ -16,10 +16,151 @@
 package beam
 
 import (
+   "context"
+   "fmt"
+
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+   jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+   pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+   "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+   idint
+   Urn   string
+   Payload   []byte
+   In[]PCollection
+   Out   []FullType
+   Bounded   bool
+   ExpansionAddr string
+   Components*pipepb.Components
+   ExpandedTransform *pipepb.PTransform
+   Requirements  []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one 
function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+   if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the 
value was ever set
+   // return Legacy External API
+   }
+
+   /*
+   Add ExternalTranform to the Graph

Review comment:
   To be pedantically clear: "no block comments" doesn't mean "no comments". 
In my original comment, I suggested the line-comment alternative.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

2020-08-04 Thread GitBox


lostluck commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465380563



##
File path: sdks/go/pkg/beam/external.go
##
@@ -16,10 +16,151 @@
 package beam
 
 import (
+   "context"
+   "fmt"
+
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+   jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+   pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+   "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+   idint
+   Urn   string
+   Payload   []byte
+   In[]PCollection
+   Out   []FullType
+   Bounded   bool
+   ExpansionAddr string
+   Components*pipepb.Components
+   ExpandedTransform *pipepb.PTransform
+   Requirements  []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one 
function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+   if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the 
value was ever set
+   // return Legacy External API
+   }
+
+   /*
+   Add ExternalTranform to the Graph

Review comment:
   ExternalTranform should be ExternalTransform.
   
   I'd prefer no block comments. They're uncommon in Go code.  
   
   To be clear, explaining what the next section of code does is exactly what 
comments are for. They don't need to take up so much space, though.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

2020-08-03 Thread GitBox


lostluck commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r464717996



##
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##
@@ -0,0 +1,100 @@
+package main
+
+import (
+   "context"
+   "flag"
+   "fmt"
+   "log"
+   "regexp"
+   "strings"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+
+   "github.com/apache/beam/sdks/go/pkg/beam"
+   "github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+   "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+   // Imports to enable correct filesystem access and runner setup in 
LOOPBACK mode
+   _ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+   _ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+   _ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+   // Set this option to choose a different input file or glob.
+   input = flag.String("input", "./input", "File(s) to read.")
+
+   // Set this required option to specify where to write the output.
+   output = flag.String("output", "./output", "Output file (required).")
+)
+
+var (
+   wordRE  = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+   empty   = beam.NewCounter("extract", "emptyLines")
+   lineLen = beam.NewDistribution("extract", "lineLenDistro")
+)
+
+// extractFn is a DoFn that emits the words in a given line.
+func extractFn(ctx context.Context, line string, emit func(string)) {
+   lineLen.Update(ctx, int64(len(line)))
+   if len(strings.TrimSpace(line)) == 0 {
+   empty.Inc(ctx, 1)
+   }
+   for _, word := range wordRE.FindAllString(line, -1) {
+   emit(word)
+   }
+}
+
+// formatFn is a DoFn that formats a word and its count as a string.
+func formatFn(w string, c int64) string {
+   fmt.Println(w, c)

Review comment:
   Arguably, this debugging line can be removed.

##
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##
@@ -0,0 +1,100 @@
+package main
+
+import (
+   "context"
+   "flag"
+   "fmt"
+   "log"
+   "regexp"
+   "strings"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+
+   "github.com/apache/beam/sdks/go/pkg/beam"
+   "github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+   "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+   // Imports to enable correct filesystem access and runner setup in 
LOOPBACK mode
+   _ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+   _ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+   _ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+   // Set this option to choose a different input file or glob.
+   input = flag.String("input", "./input", "File(s) to read.")
+
+   // Set this required option to specify where to write the output.
+   output = flag.String("output", "./output", "Output file (required).")
+)
+
+var (
+   wordRE  = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+   empty   = beam.NewCounter("extract", "emptyLines")
+   lineLen = beam.NewDistribution("extract", "lineLenDistro")
+)
+
+// extractFn is a DoFn that emits the words in a given line.
+func extractFn(ctx context.Context, line string, emit func(string)) {
+   lineLen.Update(ctx, int64(len(line)))
+   if len(strings.TrimSpace(line)) == 0 {
+   empty.Inc(ctx, 1)
+   }
+   for _, word := range wordRE.FindAllString(line, -1) {
+   emit(word)
+   }
+}
+
+// formatFn is a DoFn that formats a word and its count as a string.
+func formatFn(w string, c int64) string {
+   fmt.Println(w, c)
+   return fmt.Sprintf("%s: %v", w, c)
+}
+
+func init() {
+   beam.RegisterFunction(extractFn)
+   beam.RegisterFunction(formatFn)
+}
+
+func main() {
+   // If beamx or Go flags are used, flags must be parsed first.

Review comment:
   Feel free to delete the copy-pasted documentation from the original 
wordcount here. It doesn't need to be repeated, and it draws focus away from the 
important part: Cross Language.

##
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##
@@ -0,0 +1,100 @@
+package main

Review comment:
   Go packages and binaries should have a doc comment, separated from the 
Apache license header by a blank line.
   
   // xlang_wordcount uses a cross-language transform from Python to count words 
from a file.

##
File path: sdks/go/pkg/beam/external.go
##
@@ -16,10 +16,151 @@
 package beam
 
 import (
+   "context"
+   "fmt"
+
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"