This is an automated email from the ASF dual-hosted git repository. anton pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 7c92fe0 [BEAM-7154] Updating Go SDK errors (Part 3) new 4f9361b Merge pull request #8560 from youngoli/beam7154-2 7c92fe0 is described below commit 7c92fe0bc3ba73320ee26b6323eb01884381afcc Author: Daniel Oliveira <daniel.o.program...@gmail.com> AuthorDate: Sun May 12 20:51:46 2019 -0700 [BEAM-7154] Updating Go SDK errors (Part 3) --- sdks/go/pkg/beam/core/funcx/fn.go | 10 +- sdks/go/pkg/beam/core/funcx/signature.go | 15 ++- sdks/go/pkg/beam/core/graph/bind.go | 60 +++++---- sdks/go/pkg/beam/core/graph/coder/coder.go | 13 +- sdks/go/pkg/beam/core/graph/coder/registry.go | 7 +- sdks/go/pkg/beam/core/graph/coder/varint.go | 2 +- sdks/go/pkg/beam/core/graph/edge.go | 41 ++++-- sdks/go/pkg/beam/core/graph/fn.go | 16 +-- sdks/go/pkg/beam/core/graph/graph.go | 7 +- sdks/go/pkg/beam/core/runtime/coderx/float.go | 5 +- sdks/go/pkg/beam/core/runtime/coderx/varint.go | 9 +- sdks/go/pkg/beam/core/runtime/exec/coder.go | 3 +- sdks/go/pkg/beam/core/runtime/exec/cogbk.go | 4 +- sdks/go/pkg/beam/core/runtime/exec/combine.go | 23 ++-- sdks/go/pkg/beam/core/runtime/exec/combine_test.go | 3 +- sdks/go/pkg/beam/core/runtime/exec/datasink.go | 3 +- sdks/go/pkg/beam/core/runtime/exec/datasource.go | 17 +-- sdks/go/pkg/beam/core/runtime/exec/fn.go | 15 ++- sdks/go/pkg/beam/core/runtime/exec/input.go | 3 +- sdks/go/pkg/beam/core/runtime/exec/pardo.go | 9 +- sdks/go/pkg/beam/core/runtime/exec/plan.go | 11 +- sdks/go/pkg/beam/core/runtime/exec/translate.go | 33 ++--- sdks/go/pkg/beam/core/runtime/exec/unit_test.go | 12 +- sdks/go/pkg/beam/core/runtime/exec/util.go | 5 +- sdks/go/pkg/beam/core/runtime/graphx/coder.go | 28 ++-- sdks/go/pkg/beam/core/runtime/graphx/dataflow.go | 27 ++-- sdks/go/pkg/beam/core/runtime/graphx/serialize.go | 149 ++++++++++++++------- sdks/go/pkg/beam/core/runtime/graphx/translate.go | 3 +- sdks/go/pkg/beam/core/runtime/harness/datamgr.go | 12 +- sdks/go/pkg/beam/core/runtime/harness/harness.go | 7 +- sdks/go/pkg/beam/core/runtime/harness/logging.go | 3 +- sdks/go/pkg/beam/core/runtime/harness/session.go | 13 +- sdks/go/pkg/beam/core/runtime/harness/statemgr.go | 14 +- sdks/go/pkg/beam/core/runtime/pipelinex/replace.go | 3 +- sdks/go/pkg/beam/core/runtime/symbols.go | 4 +- sdks/go/pkg/beam/core/typex/fulltype.go | 12 +- sdks/go/pkg/beam/core/util/dot/dot.go | 5 +- sdks/go/pkg/beam/core/util/hooks/hooks.go | 12 +- sdks/go/pkg/beam/core/util/ioutilx/read.go | 3 +- sdks/go/pkg/beam/core/util/protox/any.go | 9 +- sdks/go/pkg/beam/core/util/protox/base64.go | 4 +- sdks/go/pkg/beam/core/util/reflectx/call.go | 4 +- sdks/go/pkg/beam/core/util/reflectx/json.go | 5 +- sdks/go/pkg/beam/core/util/symtab/symtab.go | 15 ++- 44 files changed, 376 insertions(+), 282 deletions(-) diff --git a/sdks/go/pkg/beam/core/funcx/fn.go b/sdks/go/pkg/beam/core/funcx/fn.go index 48129a5..c924782 100644 --- a/sdks/go/pkg/beam/core/funcx/fn.go +++ b/sdks/go/pkg/beam/core/funcx/fn.go @@ -16,12 +16,12 @@ package funcx import ( - "errors" "fmt" "reflect" "github.com/apache/beam/sdks/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) // Note that we can't tell the difference between K, V and V, S before binding. @@ -250,7 +250,7 @@ func New(fn reflectx.Func) (*Fn, error) { case IsReIter(t): kind = FnReIter default: - return nil, fmt.Errorf("bad parameter type for %s: %v", fn.Name(), t) + return nil, errors.Errorf("bad parameter type for %s: %v", fn.Name(), t) } param = append(param, FnParam{Kind: kind, T: t}) @@ -269,7 +269,7 @@ func New(fn reflectx.Func) (*Fn, error) { case typex.IsContainer(t), typex.IsConcrete(t), typex.IsUniversal(t): kind = RetValue default: - return nil, fmt.Errorf("bad return type for %s: %v", fn.Name(), t) + return nil, errors.Errorf("bad return type for %s: %v", fn.Name(), t) } ret = append(ret, ReturnParam{Kind: kind, T: t}) @@ -314,14 +314,14 @@ func validateOrder(u *Fn) error { // Validate the parameter ordering. for i, p := range u.Param { if paramState, err = nextParamState(paramState, p.Kind); err != nil { - return fmt.Errorf("%s at parameter %d for %s", err.Error(), i, u.Fn.Name()) + return errors.WithContextf(err, "validating parameter %d for %s", i, u.Fn.Name()) } } // Validate the return value ordering. retState := rsStart for i, r := range u.Ret { if retState, err = nextRetState(retState, r.Kind); err != nil { - return fmt.Errorf("%s for return value %d for %s", err.Error(), i, u.Fn.Name()) + return errors.WithContextf(err, "validating return value %d for %s", i, u.Fn.Name()) } } return nil diff --git a/sdks/go/pkg/beam/core/funcx/signature.go b/sdks/go/pkg/beam/core/funcx/signature.go index 6caa5cd..8bcd386 100644 --- a/sdks/go/pkg/beam/core/funcx/signature.go +++ b/sdks/go/pkg/beam/core/funcx/signature.go @@ -22,6 +22,7 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) // Signature is a concise representation of a group of function types. The @@ -118,7 +119,7 @@ func Satisfy(fn interface{}, sig *Signature) error { default: value := reflect.ValueOf(fn) if value.Kind() != reflect.Func { - return fmt.Errorf("not a function: %v", value) + return errors.Errorf("not a function: %v", value) } typ = value.Type() } @@ -129,11 +130,11 @@ func Satisfy(fn interface{}, sig *Signature) error { out = append(out, typ.Out(i)) } if len(in) < len(sig.Args) || len(out) < len(sig.Return) { - return fmt.Errorf("not enough required parameters: %v", typ) + return errors.Errorf("not enough required parameters: %v", typ) } if len(in) > len(sig.Args)+len(sig.OptArgs) || len(out) > len(sig.Return)+len(sig.OptReturn) { - return fmt.Errorf("too many parameters: %v", typ) + return errors.Errorf("too many parameters: %v", typ) } // (1) Create generic binding. If inconsistent, reject fn. We do not allow @@ -170,7 +171,7 @@ func bind(list, models []reflect.Type, m map[string]reflect.Type) error { name := list[i].Name() if current, ok := m[name]; ok && current != t { - return fmt.Errorf("bind conflict for %v: %v != %v", name, current, t) + return errors.Errorf("bind conflict for %v: %v != %v", name, current, t) } m[name] = t } @@ -210,7 +211,7 @@ func matchOpt(list, models []reflect.Type, m map[string]reflect.Type) error { // Substitute optional types, if bound. subst, ok := m[t.Name()] if !ok { - return fmt.Errorf("optional generic parameter not bound %v", t.Name()) + return errors.Errorf("optional generic parameter not bound %v", t.Name()) } t = subst } @@ -219,7 +220,7 @@ func matchOpt(list, models []reflect.Type, m map[string]reflect.Type) error { } if i == len(models) { - return fmt.Errorf("failed to match optional parameter %v", t) + return errors.Errorf("failed to match optional parameter %v", t) } } return nil @@ -228,6 +229,6 @@ func matchOpt(list, models []reflect.Type, m map[string]reflect.Type) error { // MustSatisfy panics if the given fn does not satisfy the signature. func MustSatisfy(fn interface{}, sig *Signature) { if err := Satisfy(fn, sig); err != nil { - panic(fmt.Sprintf("fn does not satisfy signature %v: %v", sig, err)) + panic(errors.Wrapf(err, "fn does not satisfy signature %v", sig)) } } diff --git a/sdks/go/pkg/beam/core/graph/bind.go b/sdks/go/pkg/beam/core/graph/bind.go index 3aeb3d2..77a6d69 100644 --- a/sdks/go/pkg/beam/core/graph/bind.go +++ b/sdks/go/pkg/beam/core/graph/bind.go @@ -16,11 +16,11 @@ package graph import ( - "fmt" "reflect" "github.com/apache/beam/sdks/go/pkg/beam/core/funcx" "github.com/apache/beam/sdks/go/pkg/beam/core/typex" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) // TODO(herohde) 4/21/2017: Bind is where most user mistakes will likely show @@ -62,27 +62,28 @@ import ( func Bind(fn *funcx.Fn, typedefs map[string]reflect.Type, in ...typex.FullType) ([]typex.FullType, []InputKind, []typex.FullType, []typex.FullType, error) { inbound, kinds, err := findInbound(fn, in...) if err != nil { - return nil, nil, nil, nil, fmt.Errorf("binding fn %v: %v", fn.Fn.Name(), err) + return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name()) } outbound, err := findOutbound(fn) if err != nil { - return nil, nil, nil, nil, fmt.Errorf("binding fn %v: %v", fn.Fn.Name(), err) + return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name()) } subst, err := typex.Bind(inbound, in) if err != nil { - return nil, nil, nil, nil, fmt.Errorf("binding fn %v: %v", fn.Fn.Name(), err) + return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name()) } for k, v := range typedefs { if substK, exists := subst[k]; exists { - return nil, nil, nil, nil, fmt.Errorf("binding fn %v: cannot substitute type %v with %v, already defined as %v", fn.Fn.Name(), k, v, substK) + err := errors.Errorf("cannot substitute type %v with %v, already defined as %v", k, v, substK) + return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name()) } subst[k] = v } out, err := typex.Substitute(outbound, subst) if err != nil { - return nil, nil, nil, nil, fmt.Errorf("binding fn %v: %v", fn.Fn.Name(), err) + return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name()) } return inbound, kinds, outbound, out, nil } @@ -102,7 +103,7 @@ func findOutbound(fn *funcx.Fn) ([]typex.FullType, error) { case 2: outbound = append(outbound, typex.NewKV(typex.New(ret[0]), typex.New(ret[1]))) default: - return nil, fmt.Errorf("too many return values: %v", ret) + return nil, errors.Errorf("too many return values: %v", ret) } for _, param := range params { @@ -135,26 +136,29 @@ func findInbound(fn *funcx.Fn, in ...typex.FullType) ([]typex.FullType, []InputK for _, input := range in { arity, err := inboundArity(input, index == 0) if err != nil { - return nil, nil, fmt.Errorf("binding params %v to input %v: %v", params, input, err) + return nil, nil, errors.WithContextf(err, "binding params %v to input %v", params, input) } if len(params)-index < arity { - return nil, nil, fmt.Errorf("binding params %v to input %v: too few params", params[index:], input) + err := errors.New("too few params") + return nil, nil, errors.WithContextf(err, "binding params %v to input %v", params[index:], input) } paramsToBind := params[index : index+arity] elm, kind, err := tryBindInbound(input, paramsToBind, index == 0) if err != nil { - return nil, nil, fmt.Errorf("binding params %v to input %v: %v", paramsToBind, input, err) + return nil, nil, errors.WithContextf(err, "binding params %v to input %v", paramsToBind, input) } inbound = append(inbound, elm) kinds = append(kinds, kind) index += arity } if index < len(params) { - return nil, nil, fmt.Errorf("binding params %v to inputs %v: too few inputs. Forgot an input or to annotate options?", params, in) + err := errors.New("too few inputs: forgot an input or to annotate options?") + return nil, nil, errors.WithContextf(err, "binding params %v to inputs %v:", params, in) } if index > len(params) { - return nil, nil, fmt.Errorf("binding params %v to inputs %v: too many inputs", params, in) + err := errors.New("too many inputs") + return nil, nil, errors.WithContextf(err, "binding params %v to inputs %v:", params, in) } return inbound, kinds, nil } @@ -188,7 +192,7 @@ func tryBindInbound(t typex.FullType, args []funcx.FnParam, isMain bool) (typex. values, _ := funcx.UnfoldIter(args[0].T) trimmed := trimIllegal(values) if len(trimmed) != 1 { - return nil, kind, fmt.Errorf("%v cannot bind to %v", t, args[0]) + return nil, kind, errors.Errorf("%v cannot bind to %v", t, args[0]) } kind = Iter @@ -198,14 +202,14 @@ func tryBindInbound(t typex.FullType, args []funcx.FnParam, isMain bool) (typex. values, _ := funcx.UnfoldReIter(args[0].T) trimmed := trimIllegal(values) if len(trimmed) != 1 { - return nil, kind, fmt.Errorf("%v cannot bind to %v", t, args[0]) + return nil, kind, errors.Errorf("%v cannot bind to %v", t, args[0]) } kind = ReIter other = typex.New(trimmed[0]) default: - return nil, kind, fmt.Errorf("unexpected param kind: %v", arg) + return nil, kind, errors.Errorf("unexpected param kind: %v", arg) } } case typex.Composite: @@ -213,10 +217,10 @@ func tryBindInbound(t typex.FullType, args []funcx.FnParam, isMain bool) (typex. case typex.KVType: if isMain { if args[0].Kind != funcx.FnValue { - return nil, kind, fmt.Errorf("key of %v cannot bind to %v", t, args[0]) + return nil, kind, errors.Errorf("key of %v cannot bind to %v", t, args[0]) } if args[1].Kind != funcx.FnValue { - return nil, kind, fmt.Errorf("value of %v cannot bind to %v", t, args[1]) + return nil, kind, errors.Errorf("value of %v cannot bind to %v", t, args[1]) } other = typex.NewKV(typex.New(args[0].T), typex.New(args[1].T)) } else { @@ -227,7 +231,7 @@ func tryBindInbound(t typex.FullType, args []funcx.FnParam, isMain bool) (typex. values, _ := funcx.UnfoldIter(args[0].T) trimmed := trimIllegal(values) if len(trimmed) != 2 { - return nil, kind, fmt.Errorf("%v cannot bind to %v", t, args[0]) + return nil, kind, errors.Errorf("%v cannot bind to %v", t, args[0]) } kind = Iter @@ -237,20 +241,20 @@ func tryBindInbound(t typex.FullType, args []funcx.FnParam, isMain bool) (typex. values, _ := funcx.UnfoldReIter(args[0].T) trimmed := trimIllegal(values) if len(trimmed) != 2 { - return nil, kind, fmt.Errorf("%v cannot bind to %v", t, args[0]) + return nil, kind, errors.Errorf("%v cannot bind to %v", t, args[0]) } kind = ReIter other = typex.NewKV(typex.New(trimmed[0]), typex.New(trimmed[1])) default: - return nil, kind, fmt.Errorf("%v cannot bind to %v", t, args[0]) + return nil, kind, errors.Errorf("%v cannot bind to %v", t, args[0]) } } case typex.CoGBKType: if args[0].Kind != funcx.FnValue { - return nil, kind, fmt.Errorf("key of %v cannot bind to %v", t, args[0]) + return nil, kind, errors.Errorf("key of %v cannot bind to %v", t, args[0]) } components := []typex.FullType{typex.New(args[0].T)} @@ -261,7 +265,7 @@ func tryBindInbound(t typex.FullType, args []funcx.FnParam, isMain bool) (typex. values, _ := funcx.UnfoldIter(args[i].T) trimmed := trimIllegal(values) if len(trimmed) != 1 { - return nil, kind, fmt.Errorf("values of %v cannot bind to %v", t, args[i]) + return nil, kind, errors.Errorf("values of %v cannot bind to %v", t, args[i]) } components = append(components, typex.New(trimmed[0])) @@ -269,25 +273,25 @@ func tryBindInbound(t typex.FullType, args []funcx.FnParam, isMain bool) (typex. values, _ := funcx.UnfoldReIter(args[i].T) trimmed := trimIllegal(values) if len(trimmed) != 1 { - return nil, kind, fmt.Errorf("values of %v cannot bind to %v", t, args[i]) + return nil, kind, errors.Errorf("values of %v cannot bind to %v", t, args[i]) } components = append(components, typex.New(trimmed[0])) default: - return nil, kind, fmt.Errorf("values of %v cannot bind to %v", t, args[i]) + return nil, kind, errors.Errorf("values of %v cannot bind to %v", t, args[i]) } } other = typex.NewCoGBK(components...) default: - return nil, kind, fmt.Errorf("unexpected inbound type: %v", t.Type()) + return nil, kind, errors.Errorf("unexpected inbound type: %v", t.Type()) } default: - return nil, kind, fmt.Errorf("unexpected inbound class: %v", t.Class()) + return nil, kind, errors.Errorf("unexpected inbound class: %v", t.Class()) } if !typex.IsStructurallyAssignable(t, other) { - return nil, kind, fmt.Errorf("%v is not assignable to %v", t, other) + return nil, kind, errors.Errorf("%v is not assignable to %v", t, other) } return other, kind, nil } @@ -304,7 +308,7 @@ func inboundArity(t typex.FullType, isMain bool) (int, error) { case typex.CoGBKType: return len(t.Components()), nil default: - return 0, fmt.Errorf("unexpected composite inbound type: %v", t.Type()) + return 0, errors.Errorf("unexpected composite inbound type: %v", t.Type()) } } return 1, nil diff --git a/sdks/go/pkg/beam/core/graph/coder/coder.go b/sdks/go/pkg/beam/core/graph/coder/coder.go index cfcca35..4de630a 100644 --- a/sdks/go/pkg/beam/core/graph/coder/coder.go +++ b/sdks/go/pkg/beam/core/graph/coder/coder.go @@ -26,6 +26,7 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/core/funcx" "github.com/apache/beam/sdks/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) // CustomCoder contains possibly untyped encode/decode user functions that are @@ -106,7 +107,7 @@ type ElementDecoder interface { func validateEncoder(t reflect.Type, encode interface{}) error { // Check if it uses the real type in question. if err := funcx.Satisfy(encode, funcx.Replace(encodeSig, typex.TType, t)); err != nil { - return fmt.Errorf("validateEncoder: incorrect signature: %v", err) + return errors.WithContext(err, "validateEncoder: validating signature") } // TODO(lostluck): 2019.02.03 - Determine if there are encode allocation bottlenecks. return nil @@ -115,7 +116,7 @@ func validateEncoder(t reflect.Type, encode interface{}) error { func validateDecoder(t reflect.Type, decode interface{}) error { // Check if it uses the real type in question. if err := funcx.Satisfy(decode, funcx.Replace(decodeSig, typex.TType, t)); err != nil { - return fmt.Errorf("validateDecoder: incorrect signature: %v", err) + return errors.WithContext(err, "validateDecoder: validating signature") } // TODO(lostluck): 2019.02.03 - Expand cases to avoid []byte -> interface{} conversion // in exec, & a beam Decoder interface. @@ -126,19 +127,19 @@ func validateDecoder(t reflect.Type, decode interface{}) error { // particular encoding strategy. func NewCustomCoder(id string, t reflect.Type, encode, decode interface{}) (*CustomCoder, error) { if err := validateEncoder(t, encode); err != nil { - return nil, fmt.Errorf("NewCustomCoder: %v", err) + return nil, errors.WithContext(err, "NewCustomCoder") } enc, err := funcx.New(reflectx.MakeFunc(encode)) if err != nil { - return nil, fmt.Errorf("bad encode: %v", err) + return nil, errors.Wrap(err, "bad encode") } if err := validateDecoder(t, decode); err != nil { - return nil, fmt.Errorf("NewCustomCoder: %v", err) + return nil, errors.WithContext(err, "NewCustomCoder") } dec, err := funcx.New(reflectx.MakeFunc(decode)) if err != nil { - return nil, fmt.Errorf("bad decode: %v", err) + return nil, errors.Wrap(err, "bad decode") } c := &CustomCoder{ diff --git a/sdks/go/pkg/beam/core/graph/coder/registry.go b/sdks/go/pkg/beam/core/graph/coder/registry.go index d95258d..5be9556 100644 --- a/sdks/go/pkg/beam/core/graph/coder/registry.go +++ b/sdks/go/pkg/beam/core/graph/coder/registry.go @@ -16,8 +16,9 @@ package coder import ( - "fmt" "reflect" + + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) var ( @@ -45,7 +46,7 @@ func RegisterCoder(t reflect.Type, enc, dec interface{}) { key := tkey(t) if _, err := NewCustomCoder(t.String(), t, enc, dec); err != nil { - panic(fmt.Sprintf("RegisterCoder failed for type %v: %v", t, err)) + panic(errors.Wrapf(err, "RegisterCoder failed for type %v", t)) } if t.Kind() == reflect.Interface { @@ -72,7 +73,7 @@ func RegisterCoder(t reflect.Type, enc, dec interface{}) { cc, err := NewCustomCoder(name, rt, enc, dec) if err != nil { // An error on look up shouldn't happen after the validation. - panic(fmt.Sprintf("Creating %v CustomCoder for type %v failed: %v", name, rt, err)) + panic(errors.Wrapf(err, "Creating %v CustomCoder for type %v failed", name, rt)) } return cc } diff --git a/sdks/go/pkg/beam/core/graph/coder/varint.go b/sdks/go/pkg/beam/core/graph/coder/varint.go index 1c93d1d..4f311a0 100644 --- a/sdks/go/pkg/beam/core/graph/coder/varint.go +++ b/sdks/go/pkg/beam/core/graph/coder/varint.go @@ -16,10 +16,10 @@ package coder import ( - "errors" "io" "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) // ErrVarIntTooLong indicates a data corruption issue that needs special diff --git a/sdks/go/pkg/beam/core/graph/edge.go b/sdks/go/pkg/beam/core/graph/edge.go index 24b57fe..846dceb 100644 --- a/sdks/go/pkg/beam/core/graph/edge.go +++ b/sdks/go/pkg/beam/core/graph/edge.go @@ -24,6 +24,7 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) // Opcode represents a primitive Beam instruction kind. @@ -187,10 +188,12 @@ func (e *MultiEdge) String() string { func NewCoGBK(g *Graph, s *Scope, ns []*Node) (*MultiEdge, error) { if len(ns) == 0 { // TODO(BEAM-7086) Reduce the repetition in the context of all the errors in this file. - return nil, fmt.Errorf("creating new CoGBK in scope %v: needs at least 1 input", s) + err := errors.New("needs at least 1 input") + return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s) } if !typex.IsKV(ns[0].Type()) { - return nil, fmt.Errorf("creating new CoGBK in scope %v: input type must be KV: %v", s, ns[0]) + err := errors.Errorf("input type must be KV: %v", ns[0]) + return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s) } // (1) Create CoGBK result type: KV<T,U>, .., KV<T,Z> -> CoGBK<T,U,..,Z>. @@ -203,16 +206,20 @@ func NewCoGBK(g *Graph, s *Scope, ns []*Node) (*MultiEdge, error) { for i := 1; i < len(ns); i++ { n := ns[i] if !typex.IsKV(n.Type()) { - return nil, fmt.Errorf("creating new CoGBK in scope %v: input type must be KV: %v", s, n) + err := errors.Errorf("input type must be KV: %v", n) + return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s) } if !n.Coder.Components[0].Equals(c) { - return nil, fmt.Errorf("creating new CoGBK in scope %v: key coder for %v is %v, want %v", s, n, n.Coder.Components[0], c) + err := errors.Errorf("key coder for %v is %v, want %v", n, n.Coder.Components[0], c) + return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s) } if !w.Equals(n.WindowingStrategy()) { - return nil, fmt.Errorf("creating new CoGBK in scope %v: mismatched CoGBK windowing strategies: %v, want %v", s, n.WindowingStrategy(), w) + err := errors.Errorf("mismatched CoGBK windowing strategies: %v, want %v", n.WindowingStrategy(), w) + return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s) } if bounded != n.Bounded() { - return nil, fmt.Errorf("creating new CoGBK in scope %v: unmatched CoGBK boundedness: %v, want %v", s, n.Bounded(), bounded) + err := errors.Errorf("unmatched CoGBK boundedness: %v, want %v", n.Bounded(), bounded) + return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s) } comp = append(comp, n.Type().Components()[1]) @@ -236,7 +243,8 @@ func NewCoGBK(g *Graph, s *Scope, ns []*Node) (*MultiEdge, error) { // the shared input type. func NewFlatten(g *Graph, s *Scope, in []*Node) (*MultiEdge, error) { if len(in) < 2 { - return nil, fmt.Errorf("creating new Flatten in scope %v: Flatten needs at least 2 input, got %v", s, len(in)) + err := errors.Errorf("Flatten needs at least 2 input, got %v", len(in)) + return nil, errors.WithContextf(err, "creating new Flatten in scope %v", s) } t := in[0].Type() w := inputWindow(in) @@ -252,14 +260,17 @@ func NewFlatten(g *Graph, s *Scope, in []*Node) (*MultiEdge, error) { } for _, n := range in { if !typex.IsEqual(t, n.Type()) { - return nil, fmt.Errorf("creating new Flatten in scope %v: mismatched Flatten input types: %v, want %v", s, n.Type(), t) + err := errors.Errorf("mismatched Flatten input types: %v, want %v", n.Type(), t) + return nil, errors.WithContextf(err, "creating new Flatten in scope %v", s) } if !w.Equals(n.WindowingStrategy()) { - return nil, fmt.Errorf("creating new Flatten in scope %v: mismatched Flatten window types: %v, want %v", s, n.WindowingStrategy(), w) + err := errors.Errorf("mismatched Flatten window types: %v, want %v", n.WindowingStrategy(), w) + return nil, errors.WithContextf(err, "creating new Flatten in scope %v", s) } } if typex.IsCoGBK(t) { - return nil, fmt.Errorf("creating new Flatten in scope %v: Flatten input type cannot be CoGBK: %v", s, t) + err := errors.Errorf("Flatten input type cannot be CoGBK: %v", t) + return nil, errors.WithContextf(err, "creating new Flatten in scope %v", s) } edge := g.NewEdge(s) @@ -300,7 +311,7 @@ func newDoFnNode(op Opcode, g *Graph, s *Scope, u *DoFn, in []*Node, typedefs ma inbound, kinds, outbound, out, err := Bind(u.ProcessElementFn(), typedefs, NodeTypes(in)...) if err != nil { - return nil, fmt.Errorf("creating new DoFn in scope %v: %v", s, err) + return nil, errors.WithContextf(err, "creating new DoFn in scope %v", s) } edge := g.NewEdge(s) @@ -329,10 +340,12 @@ const CombinePerKeyScope = "CombinePerKey" func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node, ac *coder.Coder) (*MultiEdge, error) { inT := in.Type() if !typex.IsCoGBK(inT) { - return nil, fmt.Errorf("creating new Combine in scope %v: Combine requires CoGBK type: %v", s, inT) + err := errors.Errorf("Combine requires CoGBK type: %v", inT) + return nil, errors.WithContextf(err, "creating new Combine in scope %v", s) } if len(inT.Components()) > 2 { - return nil, fmt.Errorf("creating new Combine in scope %v: Combine cannot follow multi-input CoGBK: %v", s, inT) + err := errors.Errorf("Combine cannot follow multi-input CoGBK: %v", inT) + return nil, errors.WithContextf(err, "creating new Combine in scope %v", s) } // Create a synthetic function for binding purposes. It takes main input @@ -381,7 +394,7 @@ func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node, ac *coder.Coder) (*M inbound, kinds, outbound, out, err := Bind(synth, nil, inT) if err != nil { - return nil, fmt.Errorf("creating new Combine in scope %v: %v", s, err) + return nil, errors.WithContextf(err, "creating new Combine in scope %v", s) } edge := g.NewEdge(s) diff --git a/sdks/go/pkg/beam/core/graph/fn.go b/sdks/go/pkg/beam/core/graph/fn.go index 7418c8c..d174c4f 100644 --- a/sdks/go/pkg/beam/core/graph/fn.go +++ b/sdks/go/pkg/beam/core/graph/fn.go @@ -94,7 +94,7 @@ func NewFn(fn interface{}) (*Fn, error) { case reflect.Ptr: if val.Elem().Kind() != reflect.Struct { - return nil, fmt.Errorf("value %v must be ptr to struct", fn) + return nil, errors.Errorf("value %v must be ptr to struct", fn) } // Note that a ptr receiver is necessary if struct fields are updated in the @@ -107,7 +107,7 @@ func NewFn(fn interface{}) (*Fn, error) { for name, mfn := range methodsFuncs { f, err := funcx.New(mfn) if err != nil { - return nil, fmt.Errorf("method %v invalid: %v", name, err) + return nil, errors.Wrapf(err, "method %v invalid", name) } methods[name] = f } @@ -133,14 +133,14 @@ func NewFn(fn interface{}) (*Fn, error) { f, err := funcx.New(reflectx.MakeFunc(val.Method(i).Interface())) if err != nil { - return nil, fmt.Errorf("method %v invalid: %v", m.Name, err) + return nil, errors.Wrapf(err, "method %v invalid", m.Name) } methods[m.Name] = f } return &Fn{Recv: fn, methods: methods}, nil default: - return nil, fmt.Errorf("value %v must be function or (ptr to) struct", fn) + return nil, errors.Errorf("value %v must be function or (ptr to) struct", fn) } } @@ -220,7 +220,7 @@ func AsDoFn(fn *Fn) (*DoFn, error) { } if _, ok := fn.methods[processElementName]; !ok { - return nil, fmt.Errorf("graph.AsDoFn: failed to find %v method: %v", processElementName, fn) + return nil, errors.Errorf("graph.AsDoFn: failed to find %v method: %v", processElementName, fn) } // TODO(herohde) 5/18/2017: validate the signatures, incl. consistency. @@ -296,7 +296,7 @@ func AsCombineFn(fn *Fn) (*CombineFn, error) { mergeFn, ok := fn.methods[mergeAccumulatorsName] if !ok { - return nil, fmt.Errorf("%v: failed to find required %v method on type: %v", fnKind, mergeAccumulatorsName, fn.Name()) + return nil, errors.Errorf("%v: failed to find required %v method on type: %v", fnKind, mergeAccumulatorsName, fn.Name()) } // CombineFn methods must satisfy the following: @@ -306,7 +306,7 @@ func AsCombineFn(fn *Fn) (*CombineFn, error) { // ExtractOutput func(A) (O, error?) // This means that the other signatures *must* match the type used in MergeAccumulators. if len(mergeFn.Ret) <= 0 { - return nil, fmt.Errorf("%v: %v requires at least 1 return value. : %v", fnKind, mergeAccumulatorsName, mergeFn) + return nil, errors.Errorf("%v: %v requires at least 1 return value. : %v", fnKind, mergeAccumulatorsName, mergeFn) } accumType := mergeFn.Ret[0].T @@ -359,7 +359,7 @@ func verifyValidNames(fnKind string, fn *Fn, names ...string) error { for key := range fn.methods { if !m[key] { - return fmt.Errorf("%s: unexpected exported method %v present. Valid methods are: %v", fnKind, key, names) + return errors.Errorf("%s: unexpected exported method %v present. Valid methods are: %v", fnKind, key, names) } } return nil diff --git a/sdks/go/pkg/beam/core/graph/graph.go b/sdks/go/pkg/beam/core/graph/graph.go index dd68885..fb80cba 100644 --- a/sdks/go/pkg/beam/core/graph/graph.go +++ b/sdks/go/pkg/beam/core/graph/graph.go @@ -21,6 +21,7 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/go/pkg/beam/core/typex" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) // Graph represents an in-progress deferred execution graph and is easily @@ -87,7 +88,7 @@ func (g *Graph) Build() ([]*MultiEdge, []*Node, error) { for _, n := range g.nodes { nodes[n] = true if n.Coder == nil { - return nil, nil, fmt.Errorf("node %v in graph has undefined coder", n.id) + return nil, nil, errors.Errorf("node %v in graph has undefined coder", n.id) } } // Build a map of all nodes that are reachable by g.edges. @@ -102,12 +103,12 @@ func (g *Graph) Build() ([]*MultiEdge, []*Node, error) { } for n := range nodes { if _, ok := reachable[n]; !ok { - return nil, nil, fmt.Errorf("node %v in graph is unconnected", n.id) + return nil, nil, errors.Errorf("node %v in graph is unconnected", n.id) } } for n, e := range reachable { if _, ok := nodes[n]; !ok { - return nil, nil, fmt.Errorf("node %v is reachable by edge %v, but it's not in same graph", n.id, e.id) + return nil, nil, errors.Errorf("node %v is reachable by edge %v, but it's not in same graph", n.id, e.id) } } return g.edges, g.nodes, nil diff --git a/sdks/go/pkg/beam/core/runtime/coderx/float.go b/sdks/go/pkg/beam/core/runtime/coderx/float.go index 3179f73..b502d05 100644 --- a/sdks/go/pkg/beam/core/runtime/coderx/float.go +++ b/sdks/go/pkg/beam/core/runtime/coderx/float.go @@ -24,6 +24,7 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) func encFloat(v typex.T) []byte { @@ -43,7 +44,7 @@ func encFloat(v typex.T) []byte { func decFloat(t reflect.Type, data []byte) (typex.T, error) { uval, err := decVarUintZ(reflectx.Uint64, data) if err != nil { - return nil, fmt.Errorf("invalid float encoding for: %v", data) + return nil, errors.Errorf("invalid float encoding for: %v", data) } n := math.Float64frombits(bits.ReverseBytes64(uval.(uint64))) @@ -64,6 +65,6 @@ func NewFloat(t reflect.Type) (*coder.CustomCoder, error) { case reflect.Float32, reflect.Float64: return coder.NewCustomCoder("float", t, encFloat, decFloat) default: - return nil, fmt.Errorf("not a float type: %v", t) + return nil, errors.Errorf("not a float type: %v", t) } } diff --git a/sdks/go/pkg/beam/core/runtime/coderx/varint.go b/sdks/go/pkg/beam/core/runtime/coderx/varint.go index c0b2b79..07e4ce1 100644 --- a/sdks/go/pkg/beam/core/runtime/coderx/varint.go +++ b/sdks/go/pkg/beam/core/runtime/coderx/varint.go @@ -22,6 +22,7 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/go/pkg/beam/core/typex" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) // NewVarIntZ returns a varint coder for the given integer type. It uses a zig-zag scheme, @@ -31,7 +32,7 @@ func NewVarIntZ(t reflect.Type) (*coder.CustomCoder, error) { case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: return coder.NewCustomCoder("varintz", t, encVarIntZ, decVarIntZ) default: - return nil, fmt.Errorf("not a signed integer type: %v", t) + return nil, errors.Errorf("not a signed integer type: %v", t) } } @@ -42,7 +43,7 @@ func NewVarUintZ(t reflect.Type) (*coder.CustomCoder, error) { case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: return coder.NewCustomCoder("varuintz", t, encVarUintZ, decVarUintZ) default: - return nil, fmt.Errorf("not a unsigned integer type: %v", t) + return nil, errors.Errorf("not a unsigned integer type: %v", t) } } @@ -70,7 +71,7 @@ func encVarIntZ(v typex.T) []byte { func decVarIntZ(t reflect.Type, data []byte) (typex.T, error) { n, size := binary.Varint(data) if size <= 0 { - return nil, fmt.Errorf("invalid varintz encoding for: %v", data) + return nil, errors.Errorf("invalid varintz encoding for: %v", data) } switch t.Kind() { case reflect.Int: @@ -112,7 +113,7 @@ func encVarUintZ(v typex.T) []byte { func decVarUintZ(t reflect.Type, data []byte) (typex.T, error) { n, size := binary.Uvarint(data) if size <= 0 { - return nil, fmt.Errorf("invalid varuintz encoding for: %v", data) + return nil, errors.Errorf("invalid varuintz encoding for: %v", data) } switch t.Kind() { case reflect.Uint: diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go b/sdks/go/pkg/beam/core/runtime/exec/coder.go index dfe2ac1..a55d7f7 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/coder.go +++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go @@ -27,6 +27,7 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) // NOTE(herohde) 4/30/2017: The main complication is CoGBK results, which have @@ -119,7 +120,7 @@ func (*bytesEncoder) Encode(val *FullValue, w io.Writer) error { var data []byte data, ok := val.Elm.([]byte) if !ok { - return fmt.Errorf("received unknown value type: want []byte, got %T", val.Elm) + return errors.Errorf("received unknown value type: want []byte, got %T", val.Elm) } size := len(data) diff --git a/sdks/go/pkg/beam/core/runtime/exec/cogbk.go b/sdks/go/pkg/beam/core/runtime/exec/cogbk.go index 40f5636..5f207a2 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/cogbk.go +++ b/sdks/go/pkg/beam/core/runtime/exec/cogbk.go @@ -19,6 +19,8 @@ import ( "bytes" "context" "fmt" + + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) // TODO(BEAM-490): This file contains support for the handling of CoGBK @@ -167,7 +169,7 @@ func (f *filterStream) Read() (*FullValue, error) { v, err := f.dec.Decode(bytes.NewReader(value)) if err != nil { - return nil, fmt.Errorf("failed to decode union value '%v' for key %v: %v", value, key, err) + return nil, errors.Wrapf(err, "failed to decode union value '%v' for key %v", value, key) } v.Timestamp = elm.Timestamp v.Windows = elm.Windows diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine.go b/sdks/go/pkg/beam/core/runtime/exec/combine.go index ef8c3cd..7815ac1 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/combine.go +++ b/sdks/go/pkg/beam/core/runtime/exec/combine.go @@ -27,6 +27,7 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/core/metrics" "github.com/apache/beam/sdks/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" "github.com/apache/beam/sdks/go/pkg/beam/util/errorx" ) @@ -64,7 +65,7 @@ func (n *Combine) ID() UnitID { // Up initializes this CombineFn and runs its SetupFn() method. func (n *Combine) Up(ctx context.Context) error { if n.status != Initializing { - return fmt.Errorf("invalid status for combine %v: %v", n.UID, n.status) + return errors.Errorf("invalid status for combine %v: %v", n.UID, n.status) } n.status = Up @@ -103,7 +104,7 @@ func (n *Combine) mergeAccumulators(ctx context.Context, a, b interface{}) (inte in := &MainInput{Key: FullValue{Elm: a}} val, err := n.mergeInv.InvokeWithoutEventTime(ctx, in, b) if err != nil { - return nil, n.fail(fmt.Errorf("MergeAccumulators failed: %v", err)) + return nil, n.fail(errors.WithContext(err, "invoking MergeAccumulators")) } return val.Elm, nil } @@ -111,7 +112,7 @@ func (n *Combine) mergeAccumulators(ctx context.Context, a, b interface{}) (inte // StartBundle initializes processing this bundle for combines. func (n *Combine) StartBundle(ctx context.Context, id string, data DataContext) error { if n.status != Up { - return fmt.Errorf("invalid status for combine %v: %v", n.UID, n.status) + return errors.Errorf("invalid status for combine %v: %v", n.UID, n.status) } n.status = Active @@ -130,7 +131,7 @@ func (n *Combine) StartBundle(ctx context.Context, id string, data DataContext) // AddInput, MergeAccumulators, and ExtractOutput functions. func (n *Combine) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error { if n.status != Active { - return fmt.Errorf("invalid status for combine %v: %v", n.UID, n.status) + return errors.Errorf("invalid status for combine %v: %v", n.UID, n.status) } // Note that we do not explicitly call merge, although it may @@ -173,7 +174,7 @@ func (n *Combine) ProcessElement(ctx context.Context, value *FullValue, values . // FinishBundle completes this node's processing of a bundle. func (n *Combine) FinishBundle(ctx context.Context) error { if n.status != Active { - return fmt.Errorf("invalid status for combine %v: %v", n.UID, n.status) + return errors.Errorf("invalid status for combine %v: %v", n.UID, n.status) } n.status = Up if n.createAccumInv != nil { @@ -221,7 +222,7 @@ func (n *Combine) newAccum(ctx context.Context, key interface{}) (interface{}, e val, err := n.createAccumInv.InvokeWithoutEventTime(ctx, opt) if err != nil { - return nil, fmt.Errorf("CreateAccumulator failed: %v", err) + return nil, n.fail(errors.WithContext(err, "invoking CreateAccumulator")) } return val.Elm, nil } @@ -264,7 +265,7 @@ func (n *Combine) addInput(ctx context.Context, accum, key, value interface{}, t val, err := n.addInputInv.InvokeWithoutEventTime(ctx, opt, v) if err != nil { - return nil, n.fail(fmt.Errorf("AddInput failed: %v", err)) + return nil, n.fail(errors.WithContext(err, "invoking AddInput")) } return val.Elm, err } @@ -278,7 +279,7 @@ func (n *Combine) extract(ctx context.Context, accum interface{}) (interface{}, val, err := n.extractOutputInv.InvokeWithoutEventTime(ctx, nil, accum) if err != nil { - return nil, n.fail(fmt.Errorf("ExtractOutput failed: %v", err)) + return nil, n.fail(errors.WithContext(err, "invoking ExtractOutput")) } return val.Elm, err } @@ -335,7 +336,7 @@ func (n *LiftedCombine) StartBundle(ctx context.Context, id string, data DataCon // policy is used. func (n *LiftedCombine) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error { if n.status != Active { - return fmt.Errorf("invalid status for precombine %v: %v", n.UID, n.status) + return errors.Errorf("invalid status for precombine %v: %v", n.UID, n.status) } key, err := n.keyHash.Hash(value.Elm) @@ -435,7 +436,7 @@ func (n *MergeAccumulators) String() string { // runs the MergeAccumulatorsFn over them repeatedly. func (n *MergeAccumulators) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error { if n.status != Active { - return fmt.Errorf("invalid status for combine merge %v: %v", n.UID, n.status) + return errors.Errorf("invalid status for combine merge %v: %v", n.UID, n.status) } a, err := n.newAccum(n.Combine.ctx, value.Elm) if err != nil { @@ -490,7 +491,7 @@ func (n *ExtractOutput) String() string { // ProcessElement accepts an accumulator value, and extracts the final return type from it. func (n *ExtractOutput) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error { if n.status != Active { - return fmt.Errorf("invalid status for combine extract %v: %v", n.UID, n.status) + return errors.Errorf("invalid status for combine extract %v: %v", n.UID, n.status) } out, err := n.extract(n.Combine.ctx, value.Elm2) if err != nil { diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine_test.go b/sdks/go/pkg/beam/core/runtime/exec/combine_test.go index c206fc0..c60ab52 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/combine_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/combine_test.go @@ -30,6 +30,7 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/coderx" "github.com/apache/beam/sdks/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) var intInput = []interface{}{int(1), int(2), int(3), int(4), int(5), int(6)} @@ -315,7 +316,7 @@ func (*MyErrorCombine) MergeAccumulators(a, b int64) (int64, error) { func intCoder(t reflect.Type) *coder.Coder { c, err := coderx.NewVarIntZ(t) if err != nil { - panic(fmt.Sprintf("Couldn't get VarInt coder for %v: %v", t, err)) + panic(errors.Wrapf(err, "Couldn't get VarInt coder for %v", t)) } return &coder.Coder{Kind: coder.Custom, T: typex.New(t), Custom: c} } diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasink.go b/sdks/go/pkg/beam/core/runtime/exec/datasink.go index 66eb380..e11c47a 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasink.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasink.go @@ -24,6 +24,7 @@ import ( "time" "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" "github.com/apache/beam/sdks/go/pkg/beam/log" ) @@ -71,7 +72,7 @@ func (n *DataSink) ProcessElement(ctx context.Context, value *FullValue, values return err } if err := n.enc.Encode(value, &b); err != nil { - return fmt.Errorf("failed to encode element %v with coder %v: %v", value, n.enc, err) + return errors.WithContextf(err, "encoding element %v with coder %v", value, n.enc) } if _, err := n.w.Write(b.Bytes()); err != nil { return err diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index a6e141c..5d40b4a 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -23,6 +23,7 @@ import ( "time" "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" "github.com/apache/beam/sdks/go/pkg/beam/log" ) @@ -74,14 +75,14 @@ func (n *DataSource) Process(ctx context.Context) error { if err == io.EOF { return nil } - return fmt.Errorf("source failed: %v", err) + return errors.Wrap(err, "source failed") } // Decode key key, err := ck.Decode(r) if err != nil { - return fmt.Errorf("source decode failed: %v", err) + return errors.Wrap(err, "source decode failed") } key.Timestamp = t key.Windows = ws @@ -94,7 +95,7 @@ func (n *DataSource) Process(ctx context.Context) error { size, err := coder.DecodeInt32(r) if err != nil { - return fmt.Errorf("stream size decoding failed: %v", err) + return errors.Wrap(err, "stream size decoding failed") } if size > -1 { @@ -106,7 +107,7 @@ func (n *DataSource) Process(ctx context.Context) error { for i := int32(0); i < size; i++ { value, err := cv.Decode(r) if err != nil { - return fmt.Errorf("stream value decode failed: %v", err) + return errors.Wrap(err, "stream value decode failed") } buf = append(buf, *value) } @@ -116,7 +117,7 @@ func (n *DataSource) Process(ctx context.Context) error { for { chunk, err := coder.DecodeVarUint64(r) if err != nil { - return fmt.Errorf("stream chunk size decoding failed: %v", err) + return errors.Wrap(err, "stream chunk size decoding failed") } // log.Printf("Chunk size=%v", chunk) @@ -129,7 +130,7 @@ func (n *DataSource) Process(ctx context.Context) error { for i := uint64(0); i < chunk; i++ { value, err := cv.Decode(r) if err != nil { - return fmt.Errorf("stream value decode failed: %v", err) + return errors.Wrap(err, "stream value decode failed") } buf = append(buf, *value) } @@ -152,12 +153,12 @@ func (n *DataSource) Process(ctx context.Context) error { if err == io.EOF { return nil } - return fmt.Errorf("source failed: %v", err) + return errors.Wrap(err, "source failed") } elm, err := ec.Decode(r) if err != nil { - return fmt.Errorf("source decode failed: %v", err) + return errors.Wrap(err, "source decode failed") } elm.Timestamp = t elm.Windows = ws diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go b/sdks/go/pkg/beam/core/runtime/exec/fn.go index 0b13744..b6c479c 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/fn.go +++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go @@ -25,6 +25,7 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime" "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/go/pkg/beam/core/typex" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) //go:generate specialize --input=fn_arity.tmpl @@ -127,7 +128,7 @@ func (n *invoker) Invoke(ctx context.Context, ws []typex.Window, ts typex.EventT } if n.wndIdx >= 0 { if len(ws) != 1 { - return nil, fmt.Errorf("DoFns that observe windows must be invoked with single window: %v", opt.Key.Windows) + return nil, errors.Errorf("DoFns that observe windows must be invoked with single window: %v", opt.Key.Windows) } args[n.wndIdx] = ws[0] } @@ -157,7 +158,7 @@ func (n *invoker) Invoke(ctx context.Context, ws []typex.Window, ts typex.EventT param := fn.Param[in[i]] if param.Kind != funcx.FnIter { - return nil, fmt.Errorf("GBK/CoGBK result values must be iterable: %v", param) + return nil, errors.Errorf("GBK/CoGBK result values must be iterable: %v", param) } // TODO(herohde) 12/12/2017: allow form conversion on GBK results? @@ -250,11 +251,11 @@ func makeSideInputs(fn *funcx.Fn, in []*graph.Inbound, side []ReStream) ([]Reusa } if len(in) != len(side)+1 { - return nil, fmt.Errorf("found %v inbound, want %v", len(in), len(side)+1) + return nil, errors.Errorf("found %v inbound, want %v", len(in), len(side)+1) } param := fn.Params(funcx.FnValue | funcx.FnIter | funcx.FnReIter) if len(param) <= len(side) { - return nil, fmt.Errorf("found %v params, want >%v", len(param), len(side)) + return nil, errors.Errorf("found %v params, want >%v", len(param), len(side)) } // The side input are last of the above params, so we can compute the offset easily. @@ -264,7 +265,7 @@ func makeSideInputs(fn *funcx.Fn, in []*graph.Inbound, side []ReStream) ([]Reusa for i := 0; i < len(side); i++ { s, err := makeSideInput(in[i+1].Kind, fn.Param[param[i+offset]].T, side[i]) if err != nil { - return nil, fmt.Errorf("failed to make side input %v: %v", i, err) + return nil, errors.WithContextf(err, "making side input %v", i) } ret = append(ret, s) } @@ -282,7 +283,7 @@ func makeEmitters(fn *funcx.Fn, nodes []Node) ([]ReusableEmitter, error) { } out := fn.Params(funcx.FnEmit) if len(out) != len(nodes)-offset { - return nil, fmt.Errorf("found %v emitters, want %v", len(out), len(nodes)-offset) + return nil, errors.Errorf("found %v emitters, want %v", len(out), len(nodes)-offset) } var ret []ReusableEmitter @@ -302,7 +303,7 @@ func makeSideInput(kind graph.InputKind, t reflect.Type, values ReStream) (Reusa return nil, err } if len(elms) != 1 { - return nil, fmt.Errorf("singleton side input %v for %v ill-defined", kind, t) + return nil, errors.Errorf("singleton side input %v for %v ill-defined", kind, t) } return &fixedValue{val: Convert(elms[0].Elm, t)}, nil diff --git a/sdks/go/pkg/beam/core/runtime/exec/input.go b/sdks/go/pkg/beam/core/runtime/exec/input.go index c6ddb25..74851c7 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/input.go +++ b/sdks/go/pkg/beam/core/runtime/exec/input.go @@ -23,6 +23,7 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/core/funcx" "github.com/apache/beam/sdks/go/pkg/beam/core/typex" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) // TODO(herohde) 4/26/2017: SideInput representation? We want it to be amenable @@ -158,7 +159,7 @@ func (v *iterValue) invoke(args []reflect.Value) []reflect.Value { if err == io.EOF { return []reflect.Value{reflect.ValueOf(false)} } - panic(fmt.Sprintf("broken stream: %v", err)) + panic(errors.Wrap(err, "broken stream")) } // We expect 1-3 out parameters: func (*int, *string) bool. diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go b/sdks/go/pkg/beam/core/runtime/exec/pardo.go index 49f74c5..612e7ff 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go +++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go @@ -26,6 +26,7 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/go/pkg/beam/core/metrics" "github.com/apache/beam/sdks/go/pkg/beam/core/typex" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" "github.com/apache/beam/sdks/go/pkg/beam/util/errorx" ) @@ -69,7 +70,7 @@ func (n *ParDo) ID() UnitID { // Up initializes this ParDo and does one-time DoFn setup. func (n *ParDo) Up(ctx context.Context) error { if n.status != Initializing { - return fmt.Errorf("invalid status for pardo %v: %v, want Initializing", n.UID, n.status) + return errors.Errorf("invalid status for pardo %v: %v, want Initializing", n.UID, n.status) } n.status = Up n.inv = newInvoker(n.Fn.ProcessElementFn()) @@ -89,7 +90,7 @@ func (n *ParDo) Up(ctx context.Context) error { // StartBundle does pre-bundle processing operation for the DoFn. func (n *ParDo) StartBundle(ctx context.Context, id string, data DataContext) error { if n.status != Up { - return fmt.Errorf("invalid status for pardo %v: %v, want Up", n.UID, n.status) + return errors.Errorf("invalid status for pardo %v: %v, want Up", n.UID, n.status) } n.status = Active n.side = data.SideInput @@ -113,7 +114,7 @@ func (n *ParDo) StartBundle(ctx context.Context, id string, data DataContext) er // ProcessElement processes each parallel element with the DoFn. func (n *ParDo) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error { if n.status != Active { - return fmt.Errorf("invalid status for pardo %v: %v, want Active", n.UID, n.status) + return errors.Errorf("invalid status for pardo %v: %v, want Active", n.UID, n.status) } // If the function observes windows, we must invoke it for each window. The expected fast path // is that either there is a single window or the function doesn't observes windows. @@ -162,7 +163,7 @@ func mustExplodeWindows(fn *funcx.Fn, elm *FullValue, usesSideInput bool) bool { // persisted at this point. func (n *ParDo) FinishBundle(ctx context.Context) error { if n.status != Active { - return fmt.Errorf("invalid status for pardo %v: %v, want Active", n.UID, n.status) + return errors.Errorf("invalid status for pardo %v: %v, want Active", n.UID, n.status) } n.status = Up n.inv.Reset() diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go index 2436f08..5031c3e 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/plan.go +++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go @@ -23,6 +23,7 @@ import ( "strings" "github.com/apache/beam/sdks/go/pkg/beam/core/metrics" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1" ) @@ -55,7 +56,7 @@ func NewPlan(id string, units []Unit) (*Plan, error) { for _, u := range units { if u == nil { - return nil, fmt.Errorf("no <nil> units") + return nil, errors.Errorf("no <nil> units") } if r, ok := u.(Root); ok { roots = append(roots, r) @@ -68,7 +69,7 @@ func NewPlan(id string, units []Unit) (*Plan, error) { } } if len(roots) == 0 { - return nil, fmt.Errorf("no root units") + return nil, errors.Errorf("no root units") } return &Plan{ @@ -102,7 +103,7 @@ func (p *Plan) Execute(ctx context.Context, id string, manager DataContext) erro } if p.status != Up { - return fmt.Errorf("invalid status for plan %v: %v", p.id, p.status) + return errors.Errorf("invalid status for plan %v: %v", p.id, p.status) } // Process bundle. If there are any kinds of failures, we bail and mark the plan broken. @@ -149,9 +150,9 @@ func (p *Plan) Down(ctx context.Context) error { case 0: return nil case 1: - return fmt.Errorf("plan %v failed: %v", p.id, errs[0]) + return errors.Wrapf(errs[0], "plan %v failed", p.id) default: - return fmt.Errorf("plan %v failed with multiple errors: %v", p.id, errs) + return errors.Errorf("plan %v failed with multiple errors: %v", p.id, errs) } } diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go b/sdks/go/pkg/beam/core/runtime/exec/translate.go index df69a9a..e3981d3 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/translate.go +++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go @@ -29,6 +29,7 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox" "github.com/apache/beam/sdks/go/pkg/beam/core/util/stringx" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1" pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1" "github.com/golang/protobuf/proto" @@ -55,7 +56,7 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) (*Plan, error) { continue } if len(transform.GetOutputs()) != 1 { - return nil, fmt.Errorf("expected one output from DataSource, got %v", transform.GetOutputs()) + return nil, errors.Errorf("expected one output from DataSource, got %v", transform.GetOutputs()) } port, cid, err := unmarshalPort(transform.GetSpec().GetPayload()) @@ -85,7 +86,7 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) (*Plan, error) { return nil, err } if !coder.IsW(u.Coder) { - return nil, fmt.Errorf("unwindowed coder %v on DataSource %v: %v", cid, id, u.Coder) + return nil, errors.Errorf("unwindowed coder %v on DataSource %v: %v", cid, id, u.Coder) } } } @@ -165,7 +166,7 @@ func (b *builder) makeWindowingStrategy(id string) (*window.WindowingStrategy, e ws, ok := b.desc.GetWindowingStrategies()[id] if !ok { - return nil, fmt.Errorf("windowing strategy %v not found", id) + return nil, errors.Errorf("windowing strategy %v not found", id) } wfn, err := unmarshalWindowFn(ws.GetWindowFn().GetSpec()) if err != nil { @@ -219,7 +220,7 @@ func unmarshalWindowFn(wfn *pb.FunctionSpec) (*window.Fn, error) { return window.NewSessions(gap), nil default: - return nil, fmt.Errorf("unsupported window type: %v", urn) + return nil, errors.Errorf("unsupported window type: %v", urn) } } @@ -238,7 +239,7 @@ func (b *builder) makePCollections(out []string) ([]Node, error) { func (b *builder) makeCoderForPCollection(id string) (*coder.Coder, *coder.WindowCoder, error) { col, ok := b.desc.GetPcollections()[id] if !ok { - return nil, nil, fmt.Errorf("pcollection %v not found", id) + return nil, nil, errors.Errorf("pcollection %v not found", id) } c, err := b.coders.Coder(col.CoderId) if err != nil { @@ -254,7 +255,7 @@ func (b *builder) makeCoderForPCollection(id string) (*coder.Coder, *coder.Windo ws, ok := b.desc.GetWindowingStrategies()[col.GetWindowingStrategyId()] if !ok { - return nil, nil, fmt.Errorf("windowing strategy %v not found", id) + return nil, nil, errors.Errorf("windowing strategy %v not found", id) } wc, err := b.coders.WindowCoder(ws.GetWindowCoderId()) if err != nil { @@ -342,13 +343,13 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) { case graphx.URNParDo: var pardo pb.ParDoPayload if err := proto.Unmarshal(payload, &pardo); err != nil { - return nil, fmt.Errorf("invalid ParDo payload for %v: %v", transform, err) + return nil, errors.Wrapf(err, "invalid ParDo payload for %v", transform) } data = string(pardo.GetDoFn().GetSpec().GetPayload()) case urnPerKeyCombinePre, urnPerKeyCombineMerge, urnPerKeyCombineExtract: var cmb pb.CombinePayload if err := proto.Unmarshal(payload, &cmb); err != nil { - return nil, fmt.Errorf("invalid CombinePayload payload for %v: %v", transform, err) + return nil, errors.Wrapf(err, "invalid CombinePayload payload for %v", transform) } data = string(cmb.GetCombineFn().GetSpec().GetPayload()) default: @@ -363,7 +364,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) { var tp v1.TransformPayload if err := protox.DecodeBase64(data, &tp); err != nil { - return nil, fmt.Errorf("invalid transform payload for %v: %v", transform, err) + return nil, errors.Wrapf(err, "invalid transform payload for %v", transform) } switch tpUrn := tp.GetUrn(); tpUrn { @@ -422,14 +423,14 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) { case urnPerKeyCombinePre: inputs := unmarshalKeyedValues(transform.GetInputs()) if len(inputs) != 1 { - return nil, fmt.Errorf("unexpected sideinput to combine: got %d, want 1", len(inputs)) + return nil, errors.Errorf("unexpected sideinput to combine: got %d, want 1", len(inputs)) } ec, _, err := b.makeCoderForPCollection(inputs[0]) if err != nil { return nil, err } if !coder.IsKV(ec) { - return nil, fmt.Errorf("unexpected non-KV coder PCollection input to combine: %v", ec) + return nil, errors.Errorf("unexpected non-KV coder PCollection input to combine: %v", ec) } u = &LiftedCombine{Combine: cn, KeyCoder: ec.Components[0]} case urnPerKeyCombineMerge: @@ -452,7 +453,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) { return nil, err } if !coder.IsKV(c) { - return nil, fmt.Errorf("unexpected inject coder: %v", c) + return nil, errors.Errorf("unexpected inject coder: %v", c) } u = &Inject{UID: b.idgen.New(), N: (int)(tp.Inject.N), ValueEncoder: MakeElementEncoder(c.Components[1]), Out: out[0]} @@ -466,7 +467,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) { return nil, err } if !coder.IsCoGBK(c) { - return nil, fmt.Errorf("unexpected expand coder: %v", c) + return nil, errors.Errorf("unexpected expand coder: %v", c) } var decoders []ElementDecoder @@ -476,13 +477,13 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) { u = &Expand{UID: b.idgen.New(), ValueDecoders: decoders, Out: out[0]} default: - return nil, fmt.Errorf("unexpected payload: %v", tp) + return nil, errors.Errorf("unexpected payload: %v", tp) } case graphx.URNWindow: var wp pb.WindowIntoPayload if err := proto.Unmarshal(payload, &wp); err != nil { - return nil, fmt.Errorf("invalid WindowInto payload for %v: %v", transform, err) + return nil, errors.Wrapf(err, "invalid WindowInto payload for %v", transform) } wfn, err := unmarshalWindowFn(wp.GetWindowFn().GetSpec()) if err != nil { @@ -516,7 +517,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) { return nil, err } if !coder.IsW(sink.Coder) { - return nil, fmt.Errorf("unwindowed coder %v on DataSink %v: %v", cid, id, sink.Coder) + return nil, errors.Errorf("unwindowed coder %v on DataSink %v: %v", cid, id, sink.Coder) } } } diff --git a/sdks/go/pkg/beam/core/runtime/exec/unit_test.go b/sdks/go/pkg/beam/core/runtime/exec/unit_test.go index aa0934a..0d17746 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/unit_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/unit_test.go @@ -17,9 +17,9 @@ package exec import ( "context" - "fmt" "github.com/apache/beam/sdks/go/pkg/beam/core/typex" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) // CaptureNode is a test Node that captures all elements for verification. It also @@ -37,7 +37,7 @@ func (n *CaptureNode) ID() UnitID { func (n *CaptureNode) Up(ctx context.Context) error { if n.status != Initializing { - return fmt.Errorf("invalid status for %v: %v, want Initializing", n.UID, n.status) + return errors.Errorf("invalid status for %v: %v, want Initializing", n.UID, n.status) } n.status = Up return nil @@ -45,7 +45,7 @@ func (n *CaptureNode) Up(ctx context.Context) error { func (n *CaptureNode) StartBundle(ctx context.Context, id string, data DataContext) error { if n.status != Up { - return fmt.Errorf("invalid status for %v: %v, want Up", n.UID, n.status) + return errors.Errorf("invalid status for %v: %v, want Up", n.UID, n.status) } n.status = Active return nil @@ -53,7 +53,7 @@ func (n *CaptureNode) StartBundle(ctx context.Context, id string, data DataConte func (n *CaptureNode) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error { if n.status != Active { - return fmt.Errorf("invalid status for pardo %v: %v, want Active", n.UID, n.status) + return errors.Errorf("invalid status for pardo %v: %v, want Active", n.UID, n.status) } n.Elements = append(n.Elements, *elm) @@ -62,7 +62,7 @@ func (n *CaptureNode) ProcessElement(ctx context.Context, elm *FullValue, values func (n *CaptureNode) FinishBundle(ctx context.Context) error { if n.status != Active { - return fmt.Errorf("invalid status for %v: %v, want Active", n.UID, n.status) + return errors.Errorf("invalid status for %v: %v, want Active", n.UID, n.status) } n.status = Up return nil @@ -70,7 +70,7 @@ func (n *CaptureNode) FinishBundle(ctx context.Context) error { func (n *CaptureNode) Down(ctx context.Context) error { if n.status != Up { - return fmt.Errorf("invalid status for %v: %v, want Up", n.UID, n.status) + return errors.Errorf("invalid status for %v: %v, want Up", n.UID, n.status) } n.status = Down return nil diff --git a/sdks/go/pkg/beam/core/runtime/exec/util.go b/sdks/go/pkg/beam/core/runtime/exec/util.go index 1fdfd76..16682b9 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/util.go +++ b/sdks/go/pkg/beam/core/runtime/exec/util.go @@ -17,8 +17,9 @@ package exec import ( "context" - "fmt" "runtime/debug" + + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) // GenID is a simple UnitID generator. @@ -36,7 +37,7 @@ func (g *GenID) New() UnitID { func callNoPanic(ctx context.Context, fn func(context.Context) error) (err error) { defer func() { if r := recover(); r != nil { - err = fmt.Errorf("panic: %v %s", r, debug.Stack()) + err = errors.Errorf("panic: %v %s", r, debug.Stack()) } }() return fn(ctx) diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder.go b/sdks/go/pkg/beam/core/runtime/graphx/coder.go index 43458f9..50e4591 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/coder.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/coder.go @@ -23,6 +23,7 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1" "github.com/apache/beam/sdks/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1" "github.com/golang/protobuf/proto" ) @@ -61,7 +62,7 @@ func UnmarshalCoders(ids []string, m map[string]*pb.Coder) ([]*coder.Coder, erro for _, id := range ids { c, err := b.Coder(id) if err != nil { - return nil, fmt.Errorf("failed to unmarshal coder %v: %v", id, err) + return nil, err } coders = append(coders, c) } @@ -105,12 +106,13 @@ func (b *CoderUnmarshaller) Coder(id string) (*coder.Coder, error) { } c, ok := b.models[id] if !ok { - return nil, fmt.Errorf("coder with id %v not found", id) + err := errors.Errorf("coder with id %v not found", id) + return nil, errors.WithContextf(err, "unmarshalling coder %v", id) } ret, err := b.makeCoder(c) if err != nil { - return nil, fmt.Errorf("failed to unmarshal coder %v: %v", id, err) + return nil, errors.WithContextf(err, "unmarshalling coder %v", id) } b.coders[id] = ret @@ -157,7 +159,7 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) (*coder.Coder, error) { case urnKVCoder: if len(components) != 2 { - return nil, fmt.Errorf("could not unmarshal KV coder from %v, want exactly 2 components but have %d", c, len(components)) + return nil, errors.Errorf("could not unmarshal KV coder from %v, want exactly 2 components but have %d", c, len(components)) } key, err := b.Coder(components[0]) @@ -206,7 +208,7 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) (*coder.Coder, error) { case urnLengthPrefixCoder: if len(components) != 1 { - return nil, fmt.Errorf("could not unmarshal length prefix coder from %v, want a single sub component but have %d", c, len(components)) + return nil, errors.Errorf("could not unmarshal length prefix coder from %v, want a single sub component but have %d", c, len(components)) } elm, err := b.peek(components[0]) @@ -217,7 +219,7 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) (*coder.Coder, error) { // the portable pipeline model directly (BEAM-2885) if elm.GetSpec().GetSpec().GetUrn() != "" && elm.GetSpec().GetSpec().GetUrn() != urnCustomCoder { // TODO(herohde) 11/17/2017: revisit this restriction - return nil, fmt.Errorf("could not unmarshal length prefix coder from %v, want a custom coder as a sub component but got %v", c, elm) + return nil, errors.Errorf("could not unmarshal length prefix coder from %v, want a custom coder as a sub component but got %v", c, elm) } var ref v1.CustomCoder @@ -233,7 +235,7 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) (*coder.Coder, error) { case urnWindowedValueCoder: if len(components) != 2 { - return nil, fmt.Errorf("could not unmarshal windowed value coder from %v, expected two components but got %d", c, len(components)) + return nil, errors.Errorf("could not unmarshal windowed value coder from %v, expected two components but got %d", c, len(components)) } elm, err := b.Coder(components[0]) @@ -248,7 +250,7 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) (*coder.Coder, error) { return &coder.Coder{Kind: coder.WindowedValue, T: t, Components: []*coder.Coder{elm}, Window: w}, nil case streamType: - return nil, fmt.Errorf("could not unmarshal stream type coder from %v, stream must be pair value", c) + return nil, errors.Errorf("could not unmarshal stream type coder from %v, stream must be pair value", c) case "": // TODO(herohde) 11/27/2017: we still see CoderRefs from Dataflow. Handle that @@ -258,23 +260,23 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) (*coder.Coder, error) { var ref CoderRef if err := json.Unmarshal(payload, &ref); err != nil { - return nil, fmt.Errorf("could not unmarshal CoderRef from %v, failed to decode urn-less coder's payload \"%v\": %v", c, string(payload), err) + return nil, errors.Wrapf(err, "could not unmarshal CoderRef from %v, failed to decode urn-less coder's payload \"%v\"", c, string(payload)) } c, err := DecodeCoderRef(&ref) if err != nil { - return nil, fmt.Errorf("could not unmarshal CoderRef from %v, failed to decode CoderRef \"%v\": %v", c, string(payload), err) + return nil, errors.Wrapf(err, "could not unmarshal CoderRef from %v, failed to decode CoderRef \"%v\"", c, string(payload)) } return c, nil default: - return nil, fmt.Errorf("could not unmarshal coder from %v, unknown URN %v", c, urn) + return nil, errors.Errorf("could not unmarshal coder from %v, unknown URN %v", c, urn) } } func (b *CoderUnmarshaller) peek(id string) (*pb.Coder, error) { c, ok := b.models[id] if !ok { - return nil, fmt.Errorf("coder with id %v not found", id) + return nil, errors.Errorf("coder with id %v not found", id) } return c, nil } @@ -326,7 +328,7 @@ func (b *CoderMarshaller) Add(c *coder.Coder) string { } data, err := protox.EncodeBase64(ref) if err != nil { - panic(fmt.Sprintf("Failed to marshal custom coder %v: %v", c, err)) + panic(errors.Wrapf(err, "Failed to marshal custom coder %v", c)) } inner := b.internCoder(&pb.Coder{ Spec: &pb.SdkFunctionSpec{ diff --git a/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go b/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go index 52d5f6a..fd29f40 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go @@ -16,12 +16,11 @@ package graphx import ( - "fmt" - "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1" "github.com/apache/beam/sdks/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) // TODO(herohde) 7/17/2018: move CoderRef to dataflowlib once Dataflow @@ -95,7 +94,7 @@ func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) { case coder.KV: if len(c.Components) != 2 { - return nil, fmt.Errorf("bad KV: %v", c) + return nil, errors.Errorf("bad KV: %v", c) } key, err := EncodeCoderRef(c.Components[0]) @@ -110,7 +109,7 @@ func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) { case coder.CoGBK: if len(c.Components) < 2 { - return nil, fmt.Errorf("bad CoGBK: %v", c) + return nil, errors.Errorf("bad CoGBK: %v", c) } refs, err := EncodeCoderRefs(c.Components) @@ -131,7 +130,7 @@ func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) { case coder.WindowedValue: if len(c.Components) != 1 || c.Window == nil { - return nil, fmt.Errorf("bad windowed value: %v", c) + return nil, errors.Errorf("bad windowed value: %v", c) } elm, err := EncodeCoderRef(c.Components[0]) @@ -152,7 +151,7 @@ func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) { return &CoderRef{Type: varIntType}, nil default: - return nil, fmt.Errorf("bad coder kind: %v", c.Kind) + return nil, errors.Errorf("bad coder kind: %v", c.Kind) } } @@ -180,7 +179,7 @@ func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) { case pairType: if len(c.Components) != 2 { - return nil, fmt.Errorf("bad pair: %+v", c) + return nil, errors.Errorf("bad pair: %+v", c) } key, err := DecodeCoderRef(c.Components[0]) @@ -223,12 +222,12 @@ func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) { case lengthPrefixType: if len(c.Components) != 1 { - return nil, fmt.Errorf("bad length prefix: %+v", c) + return nil, errors.Errorf("bad length prefix: %+v", c) } var ref v1.CustomCoder if err := protox.DecodeBase64(c.Components[0].Type, &ref); err != nil { - return nil, fmt.Errorf("base64 decode for %v failed: %v", c.Components[0].Type, err) + return nil, errors.Wrapf(err, "base64 decode for %v failed", c.Components[0].Type) } custom, err := decodeCustomCoder(&ref) if err != nil { @@ -239,7 +238,7 @@ func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) { case windowedValueType: if len(c.Components) != 2 { - return nil, fmt.Errorf("bad windowed value: %+v", c) + return nil, errors.Errorf("bad windowed value: %+v", c) } elm, err := DecodeCoderRef(c.Components[0]) @@ -255,10 +254,10 @@ func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) { return &coder.Coder{Kind: coder.WindowedValue, T: t, Components: []*coder.Coder{elm}, Window: w}, nil case streamType: - return nil, fmt.Errorf("stream must be pair value: %+v", c) + return nil, errors.Errorf("stream must be pair value: %+v", c) default: - return nil, fmt.Errorf("custom coders must be length prefixed: %+v", c) + return nil, errors.Errorf("custom coders must be length prefixed: %+v", c) } } @@ -283,7 +282,7 @@ func encodeWindowCoder(w *coder.WindowCoder) (*CoderRef, error) { case coder.IntervalWindow: return &CoderRef{Type: intervalWindowType}, nil default: - return nil, fmt.Errorf("bad window kind: %v", w.Kind) + return nil, errors.Errorf("bad window kind: %v", w.Kind) } } @@ -296,6 +295,6 @@ func decodeWindowCoder(w *CoderRef) (*coder.WindowCoder, error) { case intervalWindowType: return coder.NewIntervalWindow(), nil default: - return nil, fmt.Errorf("bad window: %v", w.Type) + return nil, errors.Errorf("bad window: %v", w.Type) } } diff --git a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go index 5f0e944..61e5174 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go @@ -30,6 +30,7 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1" "github.com/apache/beam/sdks/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) var genFnType = reflect.TypeOf((*func(string, reflect.Type, []byte) reflectx.Func)(nil)).Elem() @@ -43,14 +44,16 @@ func EncodeMultiEdge(edge *graph.MultiEdge) (*v1.MultiEdge, error) { if edge.DoFn != nil { ref, err := encodeFn((*graph.Fn)(edge.DoFn)) if err != nil { - return nil, fmt.Errorf("failed to encode userfn %v, bad userfn: %v", edge, err) + wrapped := errors.Wrap(err, "bad userfn") + return nil, errors.WithContextf(wrapped, "encoding userfn %v", edge) } ret.Fn = ref } if edge.CombineFn != nil { ref, err := encodeFn((*graph.Fn)(edge.CombineFn)) if err != nil { - return nil, fmt.Errorf("failed to encode userfn %v, bad combinefn: %v", edge, err) + wrapped := errors.Wrap(err, "bad combinefn") + return nil, errors.WithContextf(wrapped, "encoding userfn %v", edge) } ret.Fn = ref } @@ -62,14 +65,16 @@ func EncodeMultiEdge(edge *graph.MultiEdge) (*v1.MultiEdge, error) { kind := encodeInputKind(in.Kind) t, err := encodeFullType(in.Type) if err != nil { - return nil, fmt.Errorf("failed to encode userfn %v, bad input type: %v", edge, err) + wrapped := errors.Wrap(err, "bad input type") + return nil, errors.WithContextf(wrapped, "encoding userfn %v", edge) } ret.Inbound = append(ret.Inbound, &v1.MultiEdge_Inbound{Kind: kind, Type: t}) } for _, out := range edge.Output { t, err := encodeFullType(out.Type) if err != nil { - return nil, fmt.Errorf("failed to encode userfn %v, bad output type: %v", edge, err) + wrapped := errors.Wrap(err, "bad output type") + return nil, errors.WithContextf(wrapped, "encoding userfn %v", edge) } ret.Outbound = append(ret.Outbound, &v1.MultiEdge_Outbound{Type: t}) } @@ -91,7 +96,8 @@ func DecodeMultiEdge(edge *v1.MultiEdge) (graph.Opcode, *graph.Fn, *window.Fn, [ var err error u, err = decodeFn(edge.Fn) if err != nil { - return "", nil, nil, nil, nil, fmt.Errorf("failed to decode userfn %v, bad function: %v", edge, err) + wrapped := errors.Wrap(err, "bad function") + return "", nil, nil, nil, nil, errors.WithContextf(wrapped, "decoding userfn %v", edge) } } if edge.WindowFn != nil { @@ -100,18 +106,21 @@ func DecodeMultiEdge(edge *v1.MultiEdge) (graph.Opcode, *graph.Fn, *window.Fn, [ for _, in := range edge.Inbound { kind, err := decodeInputKind(in.Kind) if err != nil { - return "", nil, nil, nil, nil, fmt.Errorf("failed to decode userfn %v, bad input kind: %v", edge, err) + wrapped := errors.Wrap(err, "bad input kind") + return "", nil, nil, nil, nil, errors.WithContextf(wrapped, "decoding userfn %v", edge) } t, err := decodeFullType(in.Type) if err != nil { - return "", nil, nil, nil, nil, fmt.Errorf("failed to decode userfn %v, bad input type: %v", edge, err) + wrapped := errors.Wrap(err, "bad input type") + return "", nil, nil, nil, nil, errors.WithContextf(wrapped, "decoding userfn %v", edge) } inbound = append(inbound, &graph.Inbound{Kind: kind, Type: t}) } for _, out := range edge.Outbound { t, err := decodeFullType(out.Type) if err != nil { - return "", nil, nil, nil, nil, fmt.Errorf("failed to decode userfn %v, bad output type: %v", edge, err) + wrapped := errors.Wrap(err, "bad output type") + return "", nil, nil, nil, nil, errors.WithContextf(wrapped, "decoding userfn %v", edge) } outbound = append(outbound, &graph.Outbound{Type: t}) } @@ -122,15 +131,17 @@ func DecodeMultiEdge(edge *v1.MultiEdge) (graph.Opcode, *graph.Fn, *window.Fn, [ func encodeCustomCoder(c *coder.CustomCoder) (*v1.CustomCoder, error) { t, err := encodeType(c.Type) if err != nil { - return nil, fmt.Errorf("failed to encode custom coder %v for type %v: %v", c, c.Type, err) + return nil, errors.WithContextf(err, "encoding custom coder %v for type %v", c, c.Type) } enc, err := encodeUserFn(c.Enc) if err != nil { - return nil, fmt.Errorf("failed to encode custom coder %v, bad encoding function: %v", c, err) + wrapped := errors.Wrap(err, "bad encoding function") + return nil, errors.WithContextf(wrapped, "encoding custom coder %v", c) } dec, err := encodeUserFn(c.Dec) if err != nil { - return nil, fmt.Errorf("failed to encode custom coder %v, bad decoding function: %v", c, err) + wrapped := errors.Wrap(err, "bad decoding function") + return nil, errors.WithContextf(wrapped, "encoding custom coder %v", c) } ret := &v1.CustomCoder{ @@ -145,20 +156,22 @@ func encodeCustomCoder(c *coder.CustomCoder) (*v1.CustomCoder, error) { func decodeCustomCoder(c *v1.CustomCoder) (*coder.CustomCoder, error) { t, err := decodeType(c.Type) if err != nil { - return nil, fmt.Errorf("failed to decode custom coder %v for type %v: %v", c, c.Type, err) + return nil, errors.WithContextf(err, "decoding custom coder %v for type %v", c, c.Type) } enc, err := decodeUserFn(c.Enc) if err != nil { - return nil, fmt.Errorf("failed to decode custom coder %v, bad encoding function: %v", c, err) + wrapped := errors.Wrap(err, "bad encoding function") + return nil, errors.WithContextf(wrapped, "decoding custom coder %v", c) } dec, err := decodeUserFn(c.Dec) if err != nil { - return nil, fmt.Errorf("failed to decode custom coder %v, bad decoding function: %v", c, err) + wrapped := errors.Wrap(err, "bad decoding function") + return nil, errors.WithContextf(wrapped, "decoding custom coder %v", c) } ret, err := coder.NewCustomCoder(c.Name, t, enc, dec) if err != nil { - return nil, fmt.Errorf("failed to decode custom coder %v: %v", c, err) + return nil, errors.WithContextf(err, "decoding custom coder %v", c) } return ret, nil } @@ -195,7 +208,8 @@ func encodeFn(u *graph.Fn) (*v1.Fn, error) { gen := reflectx.FunctionName(u.DynFn.Gen) t, err := encodeType(u.DynFn.T) if err != nil { - return nil, fmt.Errorf("failed to encode dynamic DoFn %v, bad function type: %v", u, err) + wrapped := errors.Wrap(err, "bad function type") + return nil, errors.WithContextf(wrapped, "encoding dynamic DoFn %v", u) } return &v1.Fn{Dynfn: &v1.DynFn{ Name: u.DynFn.Name, @@ -207,7 +221,8 @@ func encodeFn(u *graph.Fn) (*v1.Fn, error) { case u.Fn != nil: fn, err := encodeUserFn(u.Fn) if err != nil { - return nil, fmt.Errorf("failed to encode DoFn %v, bad userfn: %v", u, err) + wrapped := errors.Wrap(err, "bad userfn") + return nil, errors.WithContextf(wrapped, "encoding DoFn %v", u) } return &v1.Fn{Fn: fn}, nil @@ -215,19 +230,23 @@ func encodeFn(u *graph.Fn) (*v1.Fn, error) { t := reflect.TypeOf(u.Recv) k, ok := runtime.TypeKey(reflectx.SkipPtr(t)) if !ok { - return nil, fmt.Errorf("failed to encode structural DoFn %v, failed to create TypeKey for receiver type %T", u, u.Recv) + err := errors.Errorf("failed to create TypeKey for receiver type %T", u.Recv) + return nil, errors.WithContextf(err, "encoding structural DoFn %v", u) } if _, ok := runtime.LookupType(k); !ok { - return nil, fmt.Errorf("failed to encode structural DoFn %v, receiver type %v must be registered", u, t) + err := errors.Errorf("receiver type %v must be registered", t) + return nil, errors.WithContextf(err, "encoding structural DoFn %v", u) } typ, err := encodeType(t) if err != nil { - panic(fmt.Sprintf("Failed to encode structural DoFn %v, failed to encode receiver type %T: %v", u, u.Recv, err)) + wrapped := errors.Wrapf(err, "failed to encode receiver type %T", u.Recv) + panic(errors.WithContextf(wrapped, "encoding structural DoFn %v", u)) } data, err := json.Marshal(u.Recv) if err != nil { - return nil, fmt.Errorf("failed to encode structural DoFn %v, failed to marshal receiver %v: %v", u, u.Recv, err) + wrapped := errors.Wrapf(err, "failed to marshal receiver %v", u.Recv) + return nil, errors.WithContextf(wrapped, "encoding structural DoFn %v", u) } return &v1.Fn{Type: typ, Opt: string(data)}, nil @@ -240,12 +259,14 @@ func decodeFn(u *v1.Fn) (*graph.Fn, error) { if u.Dynfn != nil { gen, err := runtime.ResolveFunction(u.Dynfn.Gen, genFnType) if err != nil { - return nil, fmt.Errorf("failed to decode dynamic DoFn %v, bad symbol %v: %v", u, u.Dynfn.Gen, err) + wrapped := errors.Wrapf(err, "bad symbol %v", u.Dynfn.Gen) + return nil, errors.WithContextf(wrapped, "decoding dynamic DoFn %v", u) } t, err := decodeType(u.Dynfn.Type) if err != nil { - return nil, fmt.Errorf("failed to decode dynamic DoFn %v, bad type: %v", u, err) + wrapped := errors.Wrap(err, "bad type") + return nil, errors.WithContextf(wrapped, "failed to decode dynamic DoFn %v", u) } return graph.NewFn(&graph.DynFn{ Name: u.Dynfn.Name, @@ -257,22 +278,26 @@ func decodeFn(u *v1.Fn) (*graph.Fn, error) { if u.Fn != nil { fn, err := decodeUserFn(u.Fn) if err != nil { - return nil, fmt.Errorf("failed to decode DoFn %v, failed to decode userfn: %v", u, err) + wrapped := errors.Wrap(err, "failed to decode userfn") + return nil, errors.WithContextf(wrapped, "decoding DoFn %v", u) } fx, err := funcx.New(reflectx.MakeFunc(fn)) if err != nil { - return nil, fmt.Errorf("failed to decode DoFn %v, failed to construct userfn: %v", u, err) + wrapped := errors.Wrap(err, "failed to construct userfn") + return nil, errors.WithContextf(wrapped, "decoding DoFn %v", u) } return &graph.Fn{Fn: fx}, nil } t, err := decodeType(u.Type) if err != nil { - return nil, fmt.Errorf("failed to decode structural DoFn %v, bad type: %v", u, err) + wrapped := errors.Wrap(err, "bad type") + return nil, errors.WithContextf(wrapped, "decoding structural DoFn %v", u) } fn, err := reflectx.UnmarshalJSON(t, u.Opt) if err != nil { - return nil, fmt.Errorf("failed to decode structural DoFn %v, bad struct encoding: %v", u, err) + wrapped := errors.Wrap(err, "bad struct encoding") + return nil, errors.WithContextf(wrapped, "decoding structural DoFn %v", u) } return graph.NewFn(fn) } @@ -286,7 +311,8 @@ func encodeUserFn(u *funcx.Fn) (*v1.UserFn, error) { symbol := u.Fn.Name() t, err := encodeType(u.Fn.Type()) if err != nil { - return nil, fmt.Errorf("failed to encode userfn %v, bad function type: %v", u, err) + wrapped := errors.Wrap(err, "bad function type") + return nil, errors.WithContextf(wrapped, "encoding userfn %v", u) } return &v1.UserFn{Name: symbol, Type: t}, nil } @@ -319,7 +345,8 @@ func encodeFullType(t typex.FullType) (*v1.FullType, error) { prim, err := encodeType(t.Type()) if err != nil { - return nil, fmt.Errorf("failed to encode full type %v, bad type: %v", t, err) + wrapped := errors.Wrap(err, "bad type") + return nil, errors.WithContextf(wrapped, "encoding full type %v", t) } return &v1.FullType{Type: prim, Components: components}, nil } @@ -336,7 +363,8 @@ func decodeFullType(t *v1.FullType) (typex.FullType, error) { prim, err := decodeType(t.Type) if err != nil { - return nil, fmt.Errorf("failed to decode full type %v, bad type: %v", t, err) + wrapped := errors.Wrap(err, "bad type") + return nil, errors.WithContextf(wrapped, "decoding full type %v", t) } return typex.New(prim, components...), nil } @@ -388,7 +416,8 @@ func encodeType(t reflect.Type) (*v1.Type, error) { case reflect.Slice: elm, err := encodeType(t.Elem()) if err != nil { - return nil, fmt.Errorf("failed to encode slice %v, bad element type: %v", t, err) + wrapped := errors.Wrap(err, "bad element type") + return nil, errors.WithContextf(wrapped, "encoding slice %v", t) } return &v1.Type{Kind: v1.Type_SLICE, Element: elm}, nil @@ -399,7 +428,8 @@ func encodeType(t reflect.Type) (*v1.Type, error) { fType, err := encodeType(f.Type) if err != nil { - return nil, fmt.Errorf("failed to encode struct %v, bad field type: %v", t, err) + wrapped := errors.Wrap(err, "bad field type") + return nil, errors.WithContextf(wrapped, "encoding struct %v", t) } field := &v1.Type_StructField{ @@ -420,7 +450,8 @@ func encodeType(t reflect.Type) (*v1.Type, error) { for i := 0; i < t.NumIn(); i++ { param, err := encodeType(t.In(i)) if err != nil { - return nil, fmt.Errorf("failed to encode function %v, bad parameter type: %v", t, err) + wrapped := errors.Wrap(err, "bad parameter type") + return nil, errors.WithContextf(wrapped, "encoding function %v", t) } in = append(in, param) } @@ -428,7 +459,8 @@ func encodeType(t reflect.Type) (*v1.Type, error) { for i := 0; i < t.NumOut(); i++ { ret, err := encodeType(t.Out(i)) if err != nil { - return nil, fmt.Errorf("failed to encode function %v, bad return type: %v", t, err) + wrapped := errors.Wrap(err, "bad return type") + return nil, errors.WithContextf(wrapped, "encoding function %v", t) } out = append(out, ret) } @@ -437,7 +469,8 @@ func encodeType(t reflect.Type) (*v1.Type, error) { case reflect.Chan: elm, err := encodeType(t.Elem()) if err != nil { - return nil, fmt.Errorf("failed to encode channel %v, bad element type: %v", t, err) + wrapped := errors.Wrap(err, "bad element type") + return nil, errors.WithContextf(wrapped, "encoding channel %v", t) } dir := encodeChanDir(t.ChanDir()) return &v1.Type{Kind: v1.Type_CHAN, Element: elm, ChanDir: dir}, nil @@ -445,12 +478,13 @@ func encodeType(t reflect.Type) (*v1.Type, error) { case reflect.Ptr: elm, err := encodeType(t.Elem()) if err != nil { - return nil, fmt.Errorf("failed to encode pointer %v, bad base type: %v", t, err) + wrapped := errors.Wrap(err, "bad base type") + return nil, errors.WithContextf(wrapped, "encoding pointer %v", t) } return &v1.Type{Kind: v1.Type_PTR, Element: elm}, nil default: - return nil, fmt.Errorf("unencodable type %v", t) + return nil, errors.Errorf("unencodable type %v", t) } } @@ -496,7 +530,8 @@ func tryEncodeSpecial(t reflect.Type) (v1.Type_Special, bool) { func decodeType(t *v1.Type) (reflect.Type, error) { if t == nil { - return nil, fmt.Errorf("failed to decode type %v, empty type", t) + err := errors.New("empty type") + return nil, errors.WithContextf(err, "decoding type %v", t) } switch t.Kind { @@ -532,7 +567,8 @@ func decodeType(t *v1.Type) (reflect.Type, error) { case v1.Type_SLICE: elm, err := decodeType(t.GetElement()) if err != nil { - return nil, fmt.Errorf("failed to decode type %v, bad element: %v", t, err) + wrapped := errors.Wrap(err, "bad element") + return nil, errors.WithContextf(wrapped, "failed to decode type %v, bad element: %v", t) } return reflect.SliceOf(elm), nil @@ -541,7 +577,8 @@ func decodeType(t *v1.Type) (reflect.Type, error) { for _, f := range t.Fields { fType, err := decodeType(f.Type) if err != nil { - return nil, fmt.Errorf("failed to decode type %v, bad field type: %v", t, err) + wrapped := errors.Wrap(err, "bad field type") + return nil, errors.WithContextf(wrapped, "failed to decode type %v, bad field type: %v", t) } field := reflect.StructField{ @@ -560,48 +597,56 @@ func decodeType(t *v1.Type) (reflect.Type, error) { case v1.Type_FUNC: in, err := decodeTypes(t.GetParameterTypes()) if err != nil { - return nil, fmt.Errorf("failed to decode type %v, bad parameter type: %v", t, err) + wrapped := errors.Wrap(err, "bad parameter type") + return nil, errors.WithContextf(wrapped, "decoding type %v", t) } out, err := decodeTypes(t.GetReturnTypes()) if err != nil { - return nil, fmt.Errorf("failed to decode type %v, bad return type: %v", t, err) + wrapped := errors.Wrap(err, "bad return type") + return nil, errors.WithContextf(wrapped, "decoding type %v", t) } return reflect.FuncOf(in, out, t.GetIsVariadic()), nil case v1.Type_CHAN: elm, err := decodeType(t.GetElement()) if err != nil { - return nil, fmt.Errorf("failed to decode type %v, bad element: %v", t, err) + wrapped := errors.Wrap(err, "bad element") + return nil, errors.WithContextf(wrapped, "decoding type %v", t) } dir, err := decodeChanDir(t.GetChanDir()) if err != nil { - return nil, fmt.Errorf("failed to decode type %v, bad channel direction: %v", t, err) + wrapped := errors.Wrap(err, "bad channel direction") + return nil, errors.WithContextf(wrapped, "decoding type %v", t) } return reflect.ChanOf(dir, elm), nil case v1.Type_PTR: elm, err := decodeType(t.GetElement()) if err != nil { - return nil, fmt.Errorf("failed to decode type %v, bad element: %v", t, err) + wrapped := errors.Wrap(err, "bad element") + return nil, errors.WithContextf(wrapped, "decoding type %v", t) } return reflect.PtrTo(elm), nil case v1.Type_SPECIAL: ret, err := decodeSpecial(t.Special) if err != nil { - return nil, fmt.Errorf("failed to decode type %v, bad element: %v", t, err) + wrapped := errors.Wrap(err, "bad element") + return nil, errors.WithContextf(wrapped, "decoding type %v", t) } return ret, nil case v1.Type_EXTERNAL: ret, ok := runtime.LookupType(t.ExternalKey) if !ok { - return nil, fmt.Errorf("failed to decode type %v, external key not found %v", t, t.ExternalKey) + err := errors.Errorf("external key not found %v", t.ExternalKey) + return nil, errors.WithContextf(err, "decoding type %v", t) } return ret, nil default: - return nil, fmt.Errorf("failed to decode type %v, unexpected type kind %v", t, t.Kind) + err := errors.Errorf("unexpected type kind %v", t.Kind) + return nil, errors.WithContextf(err, "failed to decode type %v", t) } } @@ -641,7 +686,7 @@ func decodeSpecial(s v1.Type_Special) (reflect.Type, error) { return typex.ZType, nil default: - return nil, fmt.Errorf("failed to decode special type, unknown type %v", s) + return nil, errors.Errorf("failed to decode special type, unknown type %v", s) } } @@ -695,7 +740,8 @@ func decodeChanDir(dir v1.Type_ChanDir) (reflect.ChanDir, error) { case v1.Type_BOTH: return reflect.BothDir, nil default: - return reflect.BothDir, fmt.Errorf("failed to decode channel direction, invalid value: %v", dir) + err := errors.Errorf("invalid value: %v", dir) + return reflect.BothDir, errors.WithContext(err, "decoding channel direction") } } @@ -737,6 +783,7 @@ func decodeInputKind(k v1.MultiEdge_Inbound_InputKind) (graph.InputKind, error) case v1.MultiEdge_Inbound_REITER: return graph.ReIter, nil default: - return graph.Main, fmt.Errorf("failed to decode input kind, invalid value: %v", k) + err := errors.Errorf("invalid value: %v", k) + return graph.Main, errors.WithContext(err, "decoding input kind") } } diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go index 16328a7..3798be4 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go @@ -24,6 +24,7 @@ import ( v1 "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1" "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex" "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1" "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" @@ -567,7 +568,7 @@ func makeWindowCoder(w *window.Fn) *coder.WindowCoder { func mustEncodeMultiEdgeBase64(edge *graph.MultiEdge) string { ref, err := EncodeMultiEdge(edge) if err != nil { - panic(fmt.Sprintf("Failed to serialize %v: %v", edge, err)) + panic(errors.Wrapf(err, "Failed to serialize %v", edge)) } return protox.MustEncodeBase64(&v1.TransformPayload{ Urn: URNDoFn, diff --git a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go index 5c25be0..2d985d4 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go +++ b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go @@ -17,12 +17,12 @@ package harness import ( "context" - "fmt" "io" "sync" "time" "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" "github.com/apache/beam/sdks/go/pkg/beam/log" pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1" ) @@ -69,7 +69,7 @@ func (s *ScopedDataManager) open(ctx context.Context, port exec.Port) (*DataChan s.mu.Lock() if s.closed { s.mu.Unlock() - return nil, fmt.Errorf("instruction %v no longer processing", s.instID) + return nil, errors.Errorf("instruction %v no longer processing", s.instID) } local := s.mgr s.mu.Unlock() @@ -147,12 +147,12 @@ type DataChannel struct { func newDataChannel(ctx context.Context, port exec.Port) (*DataChannel, error) { cc, err := dial(ctx, port.URL, 15*time.Second) if err != nil { - return nil, fmt.Errorf("failed to connect: %v", err) + return nil, errors.Wrap(err, "failed to connect") } client, err := pb.NewBeamFnDataClient(cc).Data(ctx) if err != nil { cc.Close() - return nil, fmt.Errorf("failed to connect to data service: %v", err) + return nil, errors.Wrap(err, "failed to connect to data service") } return makeDataChannel(ctx, port.URL, client), nil } @@ -187,7 +187,7 @@ func (c *DataChannel) read(ctx context.Context) { log.Warnf(ctx, "DataChannel %v closed", c.id) return } - panic(fmt.Errorf("channel %v bad: %v", c.id, err)) + panic(errors.Wrapf(err, "channel %v bad", c.id)) } recordStreamReceive(msg) @@ -374,7 +374,7 @@ func (w *dataWriter) Write(p []byte) (n int, err error) { l := len(w.buf) // We can't fit this message into the buffer. We need to flush the buffer if err := w.Flush(); err != nil { - return 0, fmt.Errorf("datamgr.go: error flushing buffer of length %d: %v", l, err) + return 0, errors.Wrapf(err, "datamgr.go: error flushing buffer of length %d", l) } } diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index 08ee7dc..0c40f27 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -25,6 +25,7 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec" "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" "github.com/apache/beam/sdks/go/pkg/beam/log" fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1" "github.com/apache/beam/sdks/go/pkg/beam/util/grpcx" @@ -49,13 +50,13 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error { conn, err := dial(ctx, controlEndpoint, 60*time.Second) if err != nil { - return fmt.Errorf("Failed to connect: %v", err) + return errors.Wrap(err, "failed to connect") } defer conn.Close() client, err := fnpb.NewBeamFnControlClient(conn).Control(ctx) if err != nil { - return fmt.Errorf("Failed to connect to control service: %v", err) + return errors.Wrapf(err, "failed to connect to control service") } log.Debugf(ctx, "Successfully connected to control @ %v", controlEndpoint) @@ -101,7 +102,7 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error { recordFooter() return nil } - return fmt.Errorf("recv failed: %v", err) + return errors.Wrapf(err, "recv failed") } // Launch a goroutine to handle the control message. diff --git a/sdks/go/pkg/beam/core/runtime/harness/logging.go b/sdks/go/pkg/beam/core/runtime/harness/logging.go index f7608cd..ad24148 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/logging.go +++ b/sdks/go/pkg/beam/core/runtime/harness/logging.go @@ -22,6 +22,7 @@ import ( "runtime" "time" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" "github.com/apache/beam/sdks/go/pkg/beam/log" pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1" "github.com/golang/protobuf/ptypes" @@ -148,5 +149,5 @@ func (w *remoteWriter) connect(ctx context.Context) error { // fmt.Fprintf(os.Stderr, "SENT: %v\n", msg) } - return fmt.Errorf("internal: buffer closed?") + return errors.New("internal: buffer closed?") } diff --git a/sdks/go/pkg/beam/core/runtime/harness/session.go b/sdks/go/pkg/beam/core/runtime/harness/session.go index 7fecf9f..d783533 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/session.go +++ b/sdks/go/pkg/beam/core/runtime/harness/session.go @@ -24,6 +24,7 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/core/runtime" "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/session" "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1" "github.com/golang/protobuf/proto" ) @@ -68,7 +69,7 @@ func recordMessage(opcode session.Kind, pb *session.Entry) error { body := bufPool.Get().(*proto.Buffer) defer bufPool.Put(body) if err := body.Marshal(pb); err != nil { - return fmt.Errorf("Unable to marshal message for session recording: %v", err) + return errors.Wrap(err, "unable to marshal message for session recording") } eh := &session.EntryHeader{ @@ -79,13 +80,13 @@ func recordMessage(opcode session.Kind, pb *session.Entry) error { hdr := bufPool.Get().(*proto.Buffer) defer bufPool.Put(hdr) if err := hdr.Marshal(eh); err != nil { - return fmt.Errorf("Unable to marshal message header for session recording: %v", err) + return errors.Wrap(err, "unable to marshal message header for session recording") } l := bufPool.Get().(*proto.Buffer) defer bufPool.Put(l) if err := l.EncodeVarint(uint64(len(hdr.Bytes()))); err != nil { - return fmt.Errorf("Unable to write entry header length: %v", err) + return errors.Wrap(err, "unable to write entry header length") } // Acquire the lock to write the file. @@ -93,13 +94,13 @@ func recordMessage(opcode session.Kind, pb *session.Entry) error { defer sessionLock.Unlock() if _, err := capture.Write(l.Bytes()); err != nil { - return fmt.Errorf("Unable to write entry header length: %v", err) + return errors.Wrap(err, "unable to write entry header length") } if _, err := capture.Write(hdr.Bytes()); err != nil { - return fmt.Errorf("Unable to write entry header: %v", err) + return errors.Wrap(err, "unable to write entry header") } if _, err := capture.Write(body.Bytes()); err != nil { - return fmt.Errorf("Unable to write entry body: %v", err) + return errors.Wrap(err, "unable to write entry body") } return nil } diff --git a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go index 6445618..f5d3102 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go @@ -24,10 +24,10 @@ import ( "time" "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" "github.com/apache/beam/sdks/go/pkg/beam/log" pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1" "github.com/golang/protobuf/proto" - "github.com/pkg/errors" ) // ScopedSideInputReader scopes the global gRPC state manager to a single instruction @@ -55,7 +55,7 @@ func (s *ScopedSideInputReader) Open(ctx context.Context, id exec.StreamID, key, s.mu.Lock() if s.closed { s.mu.Unlock() - return nil, fmt.Errorf("instruction %v no longer processing", s.instID) + return nil, errors.Errorf("instruction %v no longer processing", s.instID) } ret := newSideInputReader(ch, id.Target, s.instID, key, w) s.opened = append(s.opened, ret) @@ -67,7 +67,7 @@ func (s *ScopedSideInputReader) open(ctx context.Context, port exec.Port) (*Stat s.mu.Lock() if s.closed { s.mu.Unlock() - return nil, fmt.Errorf("instruction %v no longer processing", s.instID) + return nil, errors.Errorf("instruction %v no longer processing", s.instID) } local := s.mgr s.mu.Unlock() @@ -129,7 +129,7 @@ func (r *sideInputReader) Read(buf []byte) (int, error) { r.mu.Lock() if r.closed { r.mu.Unlock() - return 0, fmt.Errorf("side input closed") + return 0, errors.New("side input closed") } local := r.ch r.mu.Unlock() @@ -219,12 +219,12 @@ type StateChannel struct { func newStateChannel(ctx context.Context, port exec.Port) (*StateChannel, error) { cc, err := dial(ctx, port.URL, 15*time.Second) if err != nil { - return nil, fmt.Errorf("failed to connect: %v", err) + return nil, errors.Wrap(err, "failed to connect") } client, err := pb.NewBeamFnStateClient(cc).State(ctx) if err != nil { cc.Close() - return nil, fmt.Errorf("failed to connect to data service: %v", err) + return nil, errors.Wrap(err, "failed to connect to data service") } ret := &StateChannel{ @@ -248,7 +248,7 @@ func (c *StateChannel) read(ctx context.Context) { log.Warnf(ctx, "StateChannel %v closed", c.id) return } - panic(fmt.Errorf("state channel %v bad: %v", c.id, err)) + panic(errors.Wrapf(err, "state channel %v bad", c.id)) } c.mu.Lock() diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go b/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go index 56f14b3..ad1175c 100644 --- a/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go +++ b/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go @@ -22,6 +22,7 @@ import ( "sort" "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1" ) @@ -44,7 +45,7 @@ func Update(p *pb.Pipeline, values *pb.Components) (*pb.Pipeline, error) { // subtransform list. func Normalize(p *pb.Pipeline) (*pb.Pipeline, error) { if len(p.GetComponents().GetTransforms()) == 0 { - return nil, fmt.Errorf("empty pipeline") + return nil, errors.New("empty pipeline") } ret := shallowClonePipeline(p) diff --git a/sdks/go/pkg/beam/core/runtime/symbols.go b/sdks/go/pkg/beam/core/runtime/symbols.go index f5089f1..6dbefad 100644 --- a/sdks/go/pkg/beam/core/runtime/symbols.go +++ b/sdks/go/pkg/beam/core/runtime/symbols.go @@ -16,13 +16,13 @@ package runtime import ( - "fmt" "os" "reflect" "sync" "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx" "github.com/apache/beam/sdks/go/pkg/beam/core/util/symtab" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) var ( @@ -105,5 +105,5 @@ func ResolveFunction(name string, t reflect.Type) (interface{}, error) { type failResolver bool func (p failResolver) Sym2Addr(name string) (uintptr, error) { - return 0, fmt.Errorf("%v not found. Use runtime.RegisterFunction in unit tests", name) + return 0, errors.Errorf("%v not found. Use runtime.RegisterFunction in unit tests", name) } diff --git a/sdks/go/pkg/beam/core/typex/fulltype.go b/sdks/go/pkg/beam/core/typex/fulltype.go index 5066023..a96e6f9 100644 --- a/sdks/go/pkg/beam/core/typex/fulltype.go +++ b/sdks/go/pkg/beam/core/typex/fulltype.go @@ -21,6 +21,8 @@ import ( "fmt" "reflect" "strings" + + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) // FullType represents the tree structure of data types processed by the graph. @@ -298,7 +300,7 @@ func IsBound(t FullType) bool { // produce {"T" -> string}. func Bind(types, models []FullType) (map[string]reflect.Type, error) { if len(types) != len(models) { - return nil, fmt.Errorf("typex.Bind: invalid number of models: %v, want %v", len(models), len(types)) + return nil, errors.Errorf("typex.Bind: invalid number of models: %v, want %v", len(models), len(types)) } m := make(map[string]reflect.Type) @@ -307,7 +309,7 @@ func Bind(types, models []FullType) (map[string]reflect.Type, error) { model := models[i] if !IsStructurallyAssignable(model, t) { - return nil, fmt.Errorf("typex.Bind: %v is not assignable to %v", model, t) + return nil, errors.Errorf("typex.Bind: %v is not assignable to %v", model, t) } if err := walk(t, model, m); err != nil { return nil, err @@ -326,7 +328,7 @@ func walk(t, model FullType, m map[string]reflect.Type) error { name := t.Type().Name() if current, ok := m[name]; ok && current != model.Type() { - return fmt.Errorf("bind conflict for %v: %v != %v", name, current, model.Type()) + return errors.Errorf("bind conflict for %v: %v != %v", name, current, model.Type()) } m[name] = model.Type() return nil @@ -363,7 +365,7 @@ func substitute(t FullType, m map[string]reflect.Type) (FullType, error) { name := t.Type().Name() repl, ok := m[name] if !ok { - return nil, fmt.Errorf("substituting type %v: type not bound", name) + return nil, errors.Errorf("substituting type %v: type not bound", name) } return New(repl), nil case Container: @@ -374,7 +376,7 @@ func substitute(t FullType, m map[string]reflect.Type) (FullType, error) { if IsList(t.Type()) { return New(reflect.SliceOf(comp[0].Type()), comp...), nil } - return nil, fmt.Errorf("unexpected aggregate %v, only slices allowed", t) + return nil, errors.Errorf("unexpected aggregate %v, only slices allowed", t) case Composite: comp, err := substituteList(t.Components(), m) if err != nil { diff --git a/sdks/go/pkg/beam/core/util/dot/dot.go b/sdks/go/pkg/beam/core/util/dot/dot.go index e92e501..d4d09a2 100644 --- a/sdks/go/pkg/beam/core/util/dot/dot.go +++ b/sdks/go/pkg/beam/core/util/dot/dot.go @@ -22,6 +22,7 @@ import ( "text/template" "github.com/apache/beam/sdks/go/pkg/beam/core/graph" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) var ( @@ -114,14 +115,14 @@ func Render(edges []*graph.MultiEdge, nodes []*graph.Node, w io.Writer) error { for _, ib := range edge.Input { err := edgeTmpl.Execute(w, struct{ From, To string }{ib.From.String(), e}) if err != nil { - return fmt.Errorf("render DOT failed: %v", err) + return errors.Wrap(err, "render DOT failed") } } for _, ob := range edge.Output { uniqNodes[ob.To].From = ob err := edgeTmpl.Execute(w, struct{ From, To string }{e, ob.To.String()}) if err != nil { - return fmt.Errorf("render DOT failed: %v", err) + return errors.Wrap(err, "render DOT failed") } } } diff --git a/sdks/go/pkg/beam/core/util/hooks/hooks.go b/sdks/go/pkg/beam/core/util/hooks/hooks.go index db31792..9c873aa 100644 --- a/sdks/go/pkg/beam/core/util/hooks/hooks.go +++ b/sdks/go/pkg/beam/core/util/hooks/hooks.go @@ -31,10 +31,10 @@ import ( "context" "encoding/csv" "encoding/json" - "fmt" "strings" "github.com/apache/beam/sdks/go/pkg/beam/core/runtime" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" "github.com/apache/beam/sdks/go/pkg/beam/log" fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1" ) @@ -126,7 +126,7 @@ func SerializeHooksToOptions() { data, err := json.Marshal(enabledHooks) if err != nil { // Shouldn't happen, since all the data is strings. - panic(fmt.Sprintf("Couldn't serialize hooks: %v", err)) + panic(errors.Wrap(err, "Couldn't serialize hooks")) } runtime.GlobalOptions.Set("hooks", string(data)) } @@ -141,7 +141,7 @@ func DeserializeHooksFromOptions(ctx context.Context) { } if err := json.Unmarshal([]byte(cfg), &enabledHooks); err != nil { // Shouldn't happen, since all the data is strings. - panic(fmt.Sprintf("DeserializeHooks failed on input %q: %v", cfg, err)) + panic(errors.Wrapf(err, "DeserializeHooks failed on input %q", cfg)) } for h, opts := range enabledHooks { @@ -155,7 +155,7 @@ func DeserializeHooksFromOptions(ctx context.Context) { // if a hook wants to compose behavior. func EnableHook(name string, args ...string) error { if _, ok := hookRegistry[name]; !ok { - return fmt.Errorf("EnableHook: hook %s not found", name) + return errors.Errorf("EnableHook: hook %s not found", name) } enabledHooks[name] = args return nil @@ -176,7 +176,7 @@ func Encode(name string, opts []string) string { w := csv.NewWriter(&cfg) // This should never happen since a bytes.Buffer doesn't fail to write. if err := w.Write(append([]string{name}, opts...)); err != nil { - panic(fmt.Sprintf("error encoding arguments: %v", err)) + panic(errors.Wrap(err, "error encoding arguments")) } w.Flush() return cfg.String() @@ -189,7 +189,7 @@ func Decode(in string) (string, []string) { r := csv.NewReader(strings.NewReader(in)) s, err := r.Read() if err != nil { - panic(fmt.Sprintf("malformed input for decoding: %s %v", in, err)) + panic(errors.Wrapf(err, "malformed input for decoding: %s", in)) } return s[0], s[1:] } diff --git a/sdks/go/pkg/beam/core/util/ioutilx/read.go b/sdks/go/pkg/beam/core/util/ioutilx/read.go index 106eff0..cf3568c 100644 --- a/sdks/go/pkg/beam/core/util/ioutilx/read.go +++ b/sdks/go/pkg/beam/core/util/ioutilx/read.go @@ -17,9 +17,10 @@ package ioutilx import ( - "errors" "io" "unsafe" + + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) // ReadN reads exactly N bytes from the reader. Fails otherwise. diff --git a/sdks/go/pkg/beam/core/util/protox/any.go b/sdks/go/pkg/beam/core/util/protox/any.go index a4ed3c1..0568f81 100644 --- a/sdks/go/pkg/beam/core/util/protox/any.go +++ b/sdks/go/pkg/beam/core/util/protox/any.go @@ -16,8 +16,7 @@ package protox import ( - "fmt" - + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" "github.com/golang/protobuf/proto" protobuf "github.com/golang/protobuf/ptypes/any" protobufw "github.com/golang/protobuf/ptypes/wrappers" @@ -30,7 +29,7 @@ const ( // Unpack decodes a proto. func Unpack(data *protobuf.Any, url string, ret proto.Message) error { if data.TypeUrl != url { - return fmt.Errorf("Bad type: %v, want %v", data.TypeUrl, url) + return errors.Errorf("bad type: %v, want %v", data.TypeUrl, url) } return proto.Unmarshal(data.Value, ret) } @@ -75,12 +74,12 @@ func PackBase64Proto(in proto.Message) (*protobuf.Any, error) { // UnpackBytes removes the BytesValue wrapper. func UnpackBytes(data *protobuf.Any) ([]byte, error) { if data.TypeUrl != bytesValueTypeURL { - return nil, fmt.Errorf("Bad type: %v, want %v", data.TypeUrl, bytesValueTypeURL) + return nil, errors.Errorf("bad type: %v, want %v", data.TypeUrl, bytesValueTypeURL) } var buf protobufw.BytesValue if err := proto.Unmarshal(data.Value, &buf); err != nil { - return nil, fmt.Errorf("BytesValue unmarshal failed: %v", err) + return nil, errors.Wrap(err, "BytesValue unmarshal failed") } return buf.Value, nil } diff --git a/sdks/go/pkg/beam/core/util/protox/base64.go b/sdks/go/pkg/beam/core/util/protox/base64.go index 90fb13f..296d397 100644 --- a/sdks/go/pkg/beam/core/util/protox/base64.go +++ b/sdks/go/pkg/beam/core/util/protox/base64.go @@ -17,8 +17,8 @@ package protox import ( "encoding/base64" - "fmt" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" "github.com/golang/protobuf/proto" ) @@ -44,7 +44,7 @@ func EncodeBase64(msg proto.Message) (string, error) { func DecodeBase64(data string, ret proto.Message) error { decoded, err := base64.StdEncoding.DecodeString(data) if err != nil { - return fmt.Errorf("base64 decoding failed: %v", err) + return errors.Wrap(err, "base64 decoding failed") } return proto.Unmarshal(decoded, ret) } diff --git a/sdks/go/pkg/beam/core/util/reflectx/call.go b/sdks/go/pkg/beam/core/util/reflectx/call.go index f69ffda..44f0602 100644 --- a/sdks/go/pkg/beam/core/util/reflectx/call.go +++ b/sdks/go/pkg/beam/core/util/reflectx/call.go @@ -19,7 +19,7 @@ import ( "reflect" "sync" - "fmt" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" "runtime/debug" ) @@ -90,7 +90,7 @@ func (c *reflectFunc) Call(args []interface{}) []interface{} { func CallNoPanic(fn Func, args []interface{}) (ret []interface{}, err error) { defer func() { if r := recover(); r != nil { - err = fmt.Errorf("panic: %v %s", r, debug.Stack()) + err = errors.Errorf("panic: %v %s", r, debug.Stack()) } }() return fn.Call(args), nil diff --git a/sdks/go/pkg/beam/core/util/reflectx/json.go b/sdks/go/pkg/beam/core/util/reflectx/json.go index 530b9f8..2ca6fe7 100644 --- a/sdks/go/pkg/beam/core/util/reflectx/json.go +++ b/sdks/go/pkg/beam/core/util/reflectx/json.go @@ -17,8 +17,9 @@ package reflectx import ( "encoding/json" - "fmt" "reflect" + + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) // UnmarshalJSON decodes a json string as then given type. It is suitable @@ -26,7 +27,7 @@ import ( func UnmarshalJSON(t reflect.Type, str string) (interface{}, error) { data := reflect.New(t).Interface() if err := json.Unmarshal([]byte(str), data); err != nil { - return nil, fmt.Errorf("failed to decode data: %v", err) + return nil, errors.Wrap(err, "failed to decode data") } return reflect.ValueOf(data).Elem().Interface(), nil } diff --git a/sdks/go/pkg/beam/core/util/symtab/symtab.go b/sdks/go/pkg/beam/core/util/symtab/symtab.go index 408af22..ded72a7 100644 --- a/sdks/go/pkg/beam/core/util/symtab/symtab.go +++ b/sdks/go/pkg/beam/core/util/symtab/symtab.go @@ -21,8 +21,9 @@ import ( "debug/elf" "debug/macho" "debug/pe" - "fmt" "os" + + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" ) // SymbolTable allows for mapping between symbols and their addresses. @@ -48,7 +49,7 @@ func New(filename string) (*SymbolTable, error) { d, err := ef.DWARF() if err != nil { f.Close() - return nil, fmt.Errorf("No working DWARF: %v", err) + return nil, errors.Wrap(err, "No working DWARF") } return &SymbolTable{d}, nil } @@ -59,7 +60,7 @@ func New(filename string) (*SymbolTable, error) { d, err := mf.DWARF() if err != nil { f.Close() - return nil, fmt.Errorf("No working DWARF: %v", err) + return nil, errors.Wrap(err, "No working DWARF") } return &SymbolTable{d}, nil } @@ -70,14 +71,14 @@ func New(filename string) (*SymbolTable, error) { d, err := pf.DWARF() if err != nil { f.Close() - return nil, fmt.Errorf("No working DWARF: %v", err) + return nil, errors.Wrap(err, "No working DWARF") } return &SymbolTable{d}, nil } // Give up, we don't recognize it f.Close() - return nil, fmt.Errorf("Unknown file format") + return nil, errors.New("Unknown file format") } // Addr2Sym returns the symbol name for the provided address. @@ -100,7 +101,7 @@ func (s *SymbolTable) Addr2Sym(addr uintptr) (string, error) { } } } - return "", fmt.Errorf("no symbol found at address %x", addr) + return "", errors.Errorf("no symbol found at address %x", addr) } // Sym2Addr returns the address of the provided symbol name. @@ -124,5 +125,5 @@ func (s *SymbolTable) Sym2Addr(symbol string) (uintptr, error) { } } } - return 0, fmt.Errorf("no symbol %q", symbol) + return 0, errors.Errorf("no symbol %q", symbol) }