This is an automated email from the ASF dual-hosted git repository.

anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c92fe0  [BEAM-7154] Updating Go SDK errors (Part 3)
     new 4f9361b  Merge pull request #8560 from youngoli/beam7154-2
7c92fe0 is described below

commit 7c92fe0bc3ba73320ee26b6323eb01884381afcc
Author: Daniel Oliveira <daniel.o.program...@gmail.com>
AuthorDate: Sun May 12 20:51:46 2019 -0700

    [BEAM-7154] Updating Go SDK errors (Part 3)
---
 sdks/go/pkg/beam/core/funcx/fn.go                  |  10 +-
 sdks/go/pkg/beam/core/funcx/signature.go           |  15 ++-
 sdks/go/pkg/beam/core/graph/bind.go                |  60 +++++----
 sdks/go/pkg/beam/core/graph/coder/coder.go         |  13 +-
 sdks/go/pkg/beam/core/graph/coder/registry.go      |   7 +-
 sdks/go/pkg/beam/core/graph/coder/varint.go        |   2 +-
 sdks/go/pkg/beam/core/graph/edge.go                |  41 ++++--
 sdks/go/pkg/beam/core/graph/fn.go                  |  16 +--
 sdks/go/pkg/beam/core/graph/graph.go               |   7 +-
 sdks/go/pkg/beam/core/runtime/coderx/float.go      |   5 +-
 sdks/go/pkg/beam/core/runtime/coderx/varint.go     |   9 +-
 sdks/go/pkg/beam/core/runtime/exec/coder.go        |   3 +-
 sdks/go/pkg/beam/core/runtime/exec/cogbk.go        |   4 +-
 sdks/go/pkg/beam/core/runtime/exec/combine.go      |  23 ++--
 sdks/go/pkg/beam/core/runtime/exec/combine_test.go |   3 +-
 sdks/go/pkg/beam/core/runtime/exec/datasink.go     |   3 +-
 sdks/go/pkg/beam/core/runtime/exec/datasource.go   |  17 +--
 sdks/go/pkg/beam/core/runtime/exec/fn.go           |  15 ++-
 sdks/go/pkg/beam/core/runtime/exec/input.go        |   3 +-
 sdks/go/pkg/beam/core/runtime/exec/pardo.go        |   9 +-
 sdks/go/pkg/beam/core/runtime/exec/plan.go         |  11 +-
 sdks/go/pkg/beam/core/runtime/exec/translate.go    |  33 ++---
 sdks/go/pkg/beam/core/runtime/exec/unit_test.go    |  12 +-
 sdks/go/pkg/beam/core/runtime/exec/util.go         |   5 +-
 sdks/go/pkg/beam/core/runtime/graphx/coder.go      |  28 ++--
 sdks/go/pkg/beam/core/runtime/graphx/dataflow.go   |  27 ++--
 sdks/go/pkg/beam/core/runtime/graphx/serialize.go  | 149 ++++++++++++++-------
 sdks/go/pkg/beam/core/runtime/graphx/translate.go  |   3 +-
 sdks/go/pkg/beam/core/runtime/harness/datamgr.go   |  12 +-
 sdks/go/pkg/beam/core/runtime/harness/harness.go   |   7 +-
 sdks/go/pkg/beam/core/runtime/harness/logging.go   |   3 +-
 sdks/go/pkg/beam/core/runtime/harness/session.go   |  13 +-
 sdks/go/pkg/beam/core/runtime/harness/statemgr.go  |  14 +-
 sdks/go/pkg/beam/core/runtime/pipelinex/replace.go |   3 +-
 sdks/go/pkg/beam/core/runtime/symbols.go           |   4 +-
 sdks/go/pkg/beam/core/typex/fulltype.go            |  12 +-
 sdks/go/pkg/beam/core/util/dot/dot.go              |   5 +-
 sdks/go/pkg/beam/core/util/hooks/hooks.go          |  12 +-
 sdks/go/pkg/beam/core/util/ioutilx/read.go         |   3 +-
 sdks/go/pkg/beam/core/util/protox/any.go           |   9 +-
 sdks/go/pkg/beam/core/util/protox/base64.go        |   4 +-
 sdks/go/pkg/beam/core/util/reflectx/call.go        |   4 +-
 sdks/go/pkg/beam/core/util/reflectx/json.go        |   5 +-
 sdks/go/pkg/beam/core/util/symtab/symtab.go        |  15 ++-
 44 files changed, 376 insertions(+), 282 deletions(-)

diff --git a/sdks/go/pkg/beam/core/funcx/fn.go b/sdks/go/pkg/beam/core/funcx/fn.go
index 48129a5..c924782 100644
--- a/sdks/go/pkg/beam/core/funcx/fn.go
+++ b/sdks/go/pkg/beam/core/funcx/fn.go
@@ -16,12 +16,12 @@
 package funcx
 
 import (
-       "errors"
        "fmt"
        "reflect"
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // Note that we can't tell the difference between K, V and V, S before binding.
@@ -250,7 +250,7 @@ func New(fn reflectx.Func) (*Fn, error) {
                case IsReIter(t):
                        kind = FnReIter
                default:
-                       return nil, fmt.Errorf("bad parameter type for %s: %v", 
fn.Name(), t)
+                       return nil, errors.Errorf("bad parameter type for %s: 
%v", fn.Name(), t)
                }
 
                param = append(param, FnParam{Kind: kind, T: t})
@@ -269,7 +269,7 @@ func New(fn reflectx.Func) (*Fn, error) {
                case typex.IsContainer(t), typex.IsConcrete(t), 
typex.IsUniversal(t):
                        kind = RetValue
                default:
-                       return nil, fmt.Errorf("bad return type for %s: %v", 
fn.Name(), t)
+                       return nil, errors.Errorf("bad return type for %s: %v", 
fn.Name(), t)
                }
 
                ret = append(ret, ReturnParam{Kind: kind, T: t})
@@ -314,14 +314,14 @@ func validateOrder(u *Fn) error {
        // Validate the parameter ordering.
        for i, p := range u.Param {
                if paramState, err = nextParamState(paramState, p.Kind); err != 
nil {
-                       return fmt.Errorf("%s at parameter %d for %s", 
err.Error(), i, u.Fn.Name())
+                       return errors.WithContextf(err, "validating parameter 
%d for %s", i, u.Fn.Name())
                }
        }
        // Validate the return value ordering.
        retState := rsStart
        for i, r := range u.Ret {
                if retState, err = nextRetState(retState, r.Kind); err != nil {
-                       return fmt.Errorf("%s for return value %d for %s", 
err.Error(), i, u.Fn.Name())
+                       return errors.WithContextf(err, "validating return 
value %d for %s", i, u.Fn.Name())
                }
        }
        return nil
diff --git a/sdks/go/pkg/beam/core/funcx/signature.go b/sdks/go/pkg/beam/core/funcx/signature.go
index 6caa5cd..8bcd386 100644
--- a/sdks/go/pkg/beam/core/funcx/signature.go
+++ b/sdks/go/pkg/beam/core/funcx/signature.go
@@ -22,6 +22,7 @@ import (
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // Signature is a concise representation of a group of function types. The
@@ -118,7 +119,7 @@ func Satisfy(fn interface{}, sig *Signature) error {
        default:
                value := reflect.ValueOf(fn)
                if value.Kind() != reflect.Func {
-                       return fmt.Errorf("not a function: %v", value)
+                       return errors.Errorf("not a function: %v", value)
                }
                typ = value.Type()
        }
@@ -129,11 +130,11 @@ func Satisfy(fn interface{}, sig *Signature) error {
                out = append(out, typ.Out(i))
        }
        if len(in) < len(sig.Args) || len(out) < len(sig.Return) {
-               return fmt.Errorf("not enough required parameters: %v", typ)
+               return errors.Errorf("not enough required parameters: %v", typ)
        }
 
        if len(in) > len(sig.Args)+len(sig.OptArgs) || len(out) > 
len(sig.Return)+len(sig.OptReturn) {
-               return fmt.Errorf("too many parameters: %v", typ)
+               return errors.Errorf("too many parameters: %v", typ)
        }
 
        // (1) Create generic binding. If inconsistent, reject fn. We do not 
allow
@@ -170,7 +171,7 @@ func bind(list, models []reflect.Type, m 
map[string]reflect.Type) error {
 
                name := list[i].Name()
                if current, ok := m[name]; ok && current != t {
-                       return fmt.Errorf("bind conflict for %v: %v != %v", 
name, current, t)
+                       return errors.Errorf("bind conflict for %v: %v != %v", 
name, current, t)
                }
                m[name] = t
        }
@@ -210,7 +211,7 @@ func matchOpt(list, models []reflect.Type, m 
map[string]reflect.Type) error {
                        // Substitute optional types, if bound.
                        subst, ok := m[t.Name()]
                        if !ok {
-                               return fmt.Errorf("optional generic parameter 
not bound %v", t.Name())
+                               return errors.Errorf("optional generic 
parameter not bound %v", t.Name())
                        }
                        t = subst
                }
@@ -219,7 +220,7 @@ func matchOpt(list, models []reflect.Type, m 
map[string]reflect.Type) error {
                }
 
                if i == len(models) {
-                       return fmt.Errorf("failed to match optional parameter 
%v", t)
+                       return errors.Errorf("failed to match optional 
parameter %v", t)
                }
        }
        return nil
@@ -228,6 +229,6 @@ func matchOpt(list, models []reflect.Type, m 
map[string]reflect.Type) error {
 // MustSatisfy panics if the given fn does not satisfy the signature.
 func MustSatisfy(fn interface{}, sig *Signature) {
        if err := Satisfy(fn, sig); err != nil {
-               panic(fmt.Sprintf("fn does not satisfy signature %v: %v", sig, 
err))
+               panic(errors.Wrapf(err, "fn does not satisfy signature %v", 
sig))
        }
 }
diff --git a/sdks/go/pkg/beam/core/graph/bind.go b/sdks/go/pkg/beam/core/graph/bind.go
index 3aeb3d2..77a6d69 100644
--- a/sdks/go/pkg/beam/core/graph/bind.go
+++ b/sdks/go/pkg/beam/core/graph/bind.go
@@ -16,11 +16,11 @@
 package graph
 
 import (
-       "fmt"
        "reflect"
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // TODO(herohde) 4/21/2017: Bind is where most user mistakes will likely show
@@ -62,27 +62,28 @@ import (
 func Bind(fn *funcx.Fn, typedefs map[string]reflect.Type, in 
...typex.FullType) ([]typex.FullType, []InputKind, []typex.FullType, 
[]typex.FullType, error) {
        inbound, kinds, err := findInbound(fn, in...)
        if err != nil {
-               return nil, nil, nil, nil, fmt.Errorf("binding fn %v: %v", 
fn.Fn.Name(), err)
+               return nil, nil, nil, nil, errors.WithContextf(err, "binding fn 
%v", fn.Fn.Name())
        }
        outbound, err := findOutbound(fn)
        if err != nil {
-               return nil, nil, nil, nil, fmt.Errorf("binding fn %v: %v", 
fn.Fn.Name(), err)
+               return nil, nil, nil, nil, errors.WithContextf(err, "binding fn 
%v", fn.Fn.Name())
        }
 
        subst, err := typex.Bind(inbound, in)
        if err != nil {
-               return nil, nil, nil, nil, fmt.Errorf("binding fn %v: %v", 
fn.Fn.Name(), err)
+               return nil, nil, nil, nil, errors.WithContextf(err, "binding fn 
%v", fn.Fn.Name())
        }
        for k, v := range typedefs {
                if substK, exists := subst[k]; exists {
-                       return nil, nil, nil, nil, fmt.Errorf("binding fn %v: 
cannot substitute type %v with %v, already defined as %v", fn.Fn.Name(), k, v, 
substK)
+                       err := errors.Errorf("cannot substitute type %v with 
%v, already defined as %v", k, v, substK)
+                       return nil, nil, nil, nil, errors.WithContextf(err, 
"binding fn %v", fn.Fn.Name())
                }
                subst[k] = v
        }
 
        out, err := typex.Substitute(outbound, subst)
        if err != nil {
-               return nil, nil, nil, nil, fmt.Errorf("binding fn %v: %v", 
fn.Fn.Name(), err)
+               return nil, nil, nil, nil, errors.WithContextf(err, "binding fn 
%v", fn.Fn.Name())
        }
        return inbound, kinds, outbound, out, nil
 }
@@ -102,7 +103,7 @@ func findOutbound(fn *funcx.Fn) ([]typex.FullType, error) {
        case 2:
                outbound = append(outbound, typex.NewKV(typex.New(ret[0]), 
typex.New(ret[1])))
        default:
-               return nil, fmt.Errorf("too many return values: %v", ret)
+               return nil, errors.Errorf("too many return values: %v", ret)
        }
 
        for _, param := range params {
@@ -135,26 +136,29 @@ func findInbound(fn *funcx.Fn, in ...typex.FullType) 
([]typex.FullType, []InputK
        for _, input := range in {
                arity, err := inboundArity(input, index == 0)
                if err != nil {
-                       return nil, nil, fmt.Errorf("binding params %v to input 
%v: %v", params, input, err)
+                       return nil, nil, errors.WithContextf(err, "binding 
params %v to input %v", params, input)
                }
                if len(params)-index < arity {
-                       return nil, nil, fmt.Errorf("binding params %v to input 
%v: too few params", params[index:], input)
+                       err := errors.New("too few params")
+                       return nil, nil, errors.WithContextf(err, "binding 
params %v to input %v", params[index:], input)
                }
 
                paramsToBind := params[index : index+arity]
                elm, kind, err := tryBindInbound(input, paramsToBind, index == 
0)
                if err != nil {
-                       return nil, nil, fmt.Errorf("binding params %v to input 
%v: %v", paramsToBind, input, err)
+                       return nil, nil, errors.WithContextf(err, "binding 
params %v to input %v", paramsToBind, input)
                }
                inbound = append(inbound, elm)
                kinds = append(kinds, kind)
                index += arity
        }
        if index < len(params) {
-               return nil, nil, fmt.Errorf("binding params %v to inputs %v: 
too few inputs. Forgot an input or to annotate options?", params, in)
+               err := errors.New("too few inputs: forgot an input or to 
annotate options?")
+               return nil, nil, errors.WithContextf(err, "binding params %v to 
inputs %v:", params, in)
        }
        if index > len(params) {
-               return nil, nil, fmt.Errorf("binding params %v to inputs %v: 
too many inputs", params, in)
+               err := errors.New("too many inputs")
+               return nil, nil, errors.WithContextf(err, "binding params %v to 
inputs %v:", params, in)
        }
        return inbound, kinds, nil
 }
@@ -188,7 +192,7 @@ func tryBindInbound(t typex.FullType, args []funcx.FnParam, 
isMain bool) (typex.
                                values, _ := funcx.UnfoldIter(args[0].T)
                                trimmed := trimIllegal(values)
                                if len(trimmed) != 1 {
-                                       return nil, kind, fmt.Errorf("%v cannot 
bind to %v", t, args[0])
+                                       return nil, kind, errors.Errorf("%v 
cannot bind to %v", t, args[0])
                                }
 
                                kind = Iter
@@ -198,14 +202,14 @@ func tryBindInbound(t typex.FullType, args 
[]funcx.FnParam, isMain bool) (typex.
                                values, _ := funcx.UnfoldReIter(args[0].T)
                                trimmed := trimIllegal(values)
                                if len(trimmed) != 1 {
-                                       return nil, kind, fmt.Errorf("%v cannot 
bind to %v", t, args[0])
+                                       return nil, kind, errors.Errorf("%v 
cannot bind to %v", t, args[0])
                                }
 
                                kind = ReIter
                                other = typex.New(trimmed[0])
 
                        default:
-                               return nil, kind, fmt.Errorf("unexpected param 
kind: %v", arg)
+                               return nil, kind, errors.Errorf("unexpected 
param kind: %v", arg)
                        }
                }
        case typex.Composite:
@@ -213,10 +217,10 @@ func tryBindInbound(t typex.FullType, args 
[]funcx.FnParam, isMain bool) (typex.
                case typex.KVType:
                        if isMain {
                                if args[0].Kind != funcx.FnValue {
-                                       return nil, kind, fmt.Errorf("key of %v 
cannot bind to %v", t, args[0])
+                                       return nil, kind, errors.Errorf("key of 
%v cannot bind to %v", t, args[0])
                                }
                                if args[1].Kind != funcx.FnValue {
-                                       return nil, kind, fmt.Errorf("value of 
%v cannot bind to %v", t, args[1])
+                                       return nil, kind, errors.Errorf("value 
of %v cannot bind to %v", t, args[1])
                                }
                                other = typex.NewKV(typex.New(args[0].T), 
typex.New(args[1].T))
                        } else {
@@ -227,7 +231,7 @@ func tryBindInbound(t typex.FullType, args []funcx.FnParam, 
isMain bool) (typex.
                                        values, _ := funcx.UnfoldIter(args[0].T)
                                        trimmed := trimIllegal(values)
                                        if len(trimmed) != 2 {
-                                               return nil, kind, 
fmt.Errorf("%v cannot bind to %v", t, args[0])
+                                               return nil, kind, 
errors.Errorf("%v cannot bind to %v", t, args[0])
                                        }
 
                                        kind = Iter
@@ -237,20 +241,20 @@ func tryBindInbound(t typex.FullType, args 
[]funcx.FnParam, isMain bool) (typex.
                                        values, _ := 
funcx.UnfoldReIter(args[0].T)
                                        trimmed := trimIllegal(values)
                                        if len(trimmed) != 2 {
-                                               return nil, kind, 
fmt.Errorf("%v cannot bind to %v", t, args[0])
+                                               return nil, kind, 
errors.Errorf("%v cannot bind to %v", t, args[0])
                                        }
 
                                        kind = ReIter
                                        other = 
typex.NewKV(typex.New(trimmed[0]), typex.New(trimmed[1]))
 
                                default:
-                                       return nil, kind, fmt.Errorf("%v cannot 
bind to %v", t, args[0])
+                                       return nil, kind, errors.Errorf("%v 
cannot bind to %v", t, args[0])
                                }
                        }
 
                case typex.CoGBKType:
                        if args[0].Kind != funcx.FnValue {
-                               return nil, kind, fmt.Errorf("key of %v cannot 
bind to %v", t, args[0])
+                               return nil, kind, errors.Errorf("key of %v 
cannot bind to %v", t, args[0])
                        }
 
                        components := []typex.FullType{typex.New(args[0].T)}
@@ -261,7 +265,7 @@ func tryBindInbound(t typex.FullType, args []funcx.FnParam, 
isMain bool) (typex.
                                        values, _ := funcx.UnfoldIter(args[i].T)
                                        trimmed := trimIllegal(values)
                                        if len(trimmed) != 1 {
-                                               return nil, kind, 
fmt.Errorf("values of %v cannot bind to %v", t, args[i])
+                                               return nil, kind, 
errors.Errorf("values of %v cannot bind to %v", t, args[i])
                                        }
                                        components = append(components, 
typex.New(trimmed[0]))
 
@@ -269,25 +273,25 @@ func tryBindInbound(t typex.FullType, args 
[]funcx.FnParam, isMain bool) (typex.
                                        values, _ := 
funcx.UnfoldReIter(args[i].T)
                                        trimmed := trimIllegal(values)
                                        if len(trimmed) != 1 {
-                                               return nil, kind, 
fmt.Errorf("values of %v cannot bind to %v", t, args[i])
+                                               return nil, kind, 
errors.Errorf("values of %v cannot bind to %v", t, args[i])
                                        }
                                        components = append(components, 
typex.New(trimmed[0]))
                                default:
-                                       return nil, kind, fmt.Errorf("values of 
%v cannot bind to %v", t, args[i])
+                                       return nil, kind, errors.Errorf("values 
of %v cannot bind to %v", t, args[i])
                                }
                        }
                        other = typex.NewCoGBK(components...)
 
                default:
-                       return nil, kind, fmt.Errorf("unexpected inbound type: 
%v", t.Type())
+                       return nil, kind, errors.Errorf("unexpected inbound 
type: %v", t.Type())
                }
 
        default:
-               return nil, kind, fmt.Errorf("unexpected inbound class: %v", 
t.Class())
+               return nil, kind, errors.Errorf("unexpected inbound class: %v", 
t.Class())
        }
 
        if !typex.IsStructurallyAssignable(t, other) {
-               return nil, kind, fmt.Errorf("%v is not assignable to %v", t, 
other)
+               return nil, kind, errors.Errorf("%v is not assignable to %v", 
t, other)
        }
        return other, kind, nil
 }
@@ -304,7 +308,7 @@ func inboundArity(t typex.FullType, isMain bool) (int, 
error) {
                case typex.CoGBKType:
                        return len(t.Components()), nil
                default:
-                       return 0, fmt.Errorf("unexpected composite inbound 
type: %v", t.Type())
+                       return 0, errors.Errorf("unexpected composite inbound 
type: %v", t.Type())
                }
        }
        return 1, nil
diff --git a/sdks/go/pkg/beam/core/graph/coder/coder.go b/sdks/go/pkg/beam/core/graph/coder/coder.go
index cfcca35..4de630a 100644
--- a/sdks/go/pkg/beam/core/graph/coder/coder.go
+++ b/sdks/go/pkg/beam/core/graph/coder/coder.go
@@ -26,6 +26,7 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // CustomCoder contains possibly untyped encode/decode user functions that are
@@ -106,7 +107,7 @@ type ElementDecoder interface {
 func validateEncoder(t reflect.Type, encode interface{}) error {
        // Check if it uses the real type in question.
        if err := funcx.Satisfy(encode, funcx.Replace(encodeSig, typex.TType, 
t)); err != nil {
-               return fmt.Errorf("validateEncoder: incorrect signature: %v", 
err)
+               return errors.WithContext(err, "validateEncoder: validating 
signature")
        }
        // TODO(lostluck): 2019.02.03 - Determine if there are encode 
allocation bottlenecks.
        return nil
@@ -115,7 +116,7 @@ func validateEncoder(t reflect.Type, encode interface{}) 
error {
 func validateDecoder(t reflect.Type, decode interface{}) error {
        // Check if it uses the real type in question.
        if err := funcx.Satisfy(decode, funcx.Replace(decodeSig, typex.TType, 
t)); err != nil {
-               return fmt.Errorf("validateDecoder: incorrect signature: %v", 
err)
+               return errors.WithContext(err, "validateDecoder: validating 
signature")
        }
        // TODO(lostluck): 2019.02.03 - Expand cases to avoid []byte -> 
interface{} conversion
        // in exec, & a beam Decoder interface.
@@ -126,19 +127,19 @@ func validateDecoder(t reflect.Type, decode interface{}) 
error {
 // particular encoding strategy.
 func NewCustomCoder(id string, t reflect.Type, encode, decode interface{}) 
(*CustomCoder, error) {
        if err := validateEncoder(t, encode); err != nil {
-               return nil, fmt.Errorf("NewCustomCoder: %v", err)
+               return nil, errors.WithContext(err, "NewCustomCoder")
        }
        enc, err := funcx.New(reflectx.MakeFunc(encode))
        if err != nil {
-               return nil, fmt.Errorf("bad encode: %v", err)
+               return nil, errors.Wrap(err, "bad encode")
        }
        if err := validateDecoder(t, decode); err != nil {
-               return nil, fmt.Errorf("NewCustomCoder: %v", err)
+               return nil, errors.WithContext(err, "NewCustomCoder")
        }
 
        dec, err := funcx.New(reflectx.MakeFunc(decode))
        if err != nil {
-               return nil, fmt.Errorf("bad decode: %v", err)
+               return nil, errors.Wrap(err, "bad decode")
        }
 
        c := &CustomCoder{
diff --git a/sdks/go/pkg/beam/core/graph/coder/registry.go b/sdks/go/pkg/beam/core/graph/coder/registry.go
index d95258d..5be9556 100644
--- a/sdks/go/pkg/beam/core/graph/coder/registry.go
+++ b/sdks/go/pkg/beam/core/graph/coder/registry.go
@@ -16,8 +16,9 @@
 package coder
 
 import (
-       "fmt"
        "reflect"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 var (
@@ -45,7 +46,7 @@ func RegisterCoder(t reflect.Type, enc, dec interface{}) {
        key := tkey(t)
 
        if _, err := NewCustomCoder(t.String(), t, enc, dec); err != nil {
-               panic(fmt.Sprintf("RegisterCoder failed for type %v: %v", t, 
err))
+               panic(errors.Wrapf(err, "RegisterCoder failed for type %v", t))
        }
 
        if t.Kind() == reflect.Interface {
@@ -72,7 +73,7 @@ func RegisterCoder(t reflect.Type, enc, dec interface{}) {
                cc, err := NewCustomCoder(name, rt, enc, dec)
                if err != nil {
                        // An error on look up shouldn't happen after the 
validation.
-                       panic(fmt.Sprintf("Creating %v CustomCoder for type %v 
failed: %v", name, rt, err))
+                       panic(errors.Wrapf(err, "Creating %v CustomCoder for 
type %v failed", name, rt))
                }
                return cc
        }
diff --git a/sdks/go/pkg/beam/core/graph/coder/varint.go b/sdks/go/pkg/beam/core/graph/coder/varint.go
index 1c93d1d..4f311a0 100644
--- a/sdks/go/pkg/beam/core/graph/coder/varint.go
+++ b/sdks/go/pkg/beam/core/graph/coder/varint.go
@@ -16,10 +16,10 @@
 package coder
 
 import (
-       "errors"
        "io"
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // ErrVarIntTooLong indicates a data corruption issue that needs special
diff --git a/sdks/go/pkg/beam/core/graph/edge.go b/sdks/go/pkg/beam/core/graph/edge.go
index 24b57fe..846dceb 100644
--- a/sdks/go/pkg/beam/core/graph/edge.go
+++ b/sdks/go/pkg/beam/core/graph/edge.go
@@ -24,6 +24,7 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // Opcode represents a primitive Beam instruction kind.
@@ -187,10 +188,12 @@ func (e *MultiEdge) String() string {
 func NewCoGBK(g *Graph, s *Scope, ns []*Node) (*MultiEdge, error) {
        if len(ns) == 0 {
                // TODO(BEAM-7086) Reduce the repetition in the context of all 
the errors in this file.
-               return nil, fmt.Errorf("creating new CoGBK in scope %v: needs 
at least 1 input", s)
+               err := errors.New("needs at least 1 input")
+               return nil, errors.WithContextf(err, "creating new CoGBK in 
scope %v", s)
        }
        if !typex.IsKV(ns[0].Type()) {
-               return nil, fmt.Errorf("creating new CoGBK in scope %v: input 
type must be KV: %v", s, ns[0])
+               err := errors.Errorf("input type must be KV: %v", ns[0])
+               return nil, errors.WithContextf(err, "creating new CoGBK in 
scope %v", s)
        }
 
        // (1) Create CoGBK result type: KV<T,U>, .., KV<T,Z> -> 
CoGBK<T,U,..,Z>.
@@ -203,16 +206,20 @@ func NewCoGBK(g *Graph, s *Scope, ns []*Node) 
(*MultiEdge, error) {
        for i := 1; i < len(ns); i++ {
                n := ns[i]
                if !typex.IsKV(n.Type()) {
-                       return nil, fmt.Errorf("creating new CoGBK in scope %v: 
input type must be KV: %v", s, n)
+                       err := errors.Errorf("input type must be KV: %v", n)
+                       return nil, errors.WithContextf(err, "creating new 
CoGBK in scope %v", s)
                }
                if !n.Coder.Components[0].Equals(c) {
-                       return nil, fmt.Errorf("creating new CoGBK in scope %v: 
key coder for %v is %v, want %v", s, n, n.Coder.Components[0], c)
+                       err := errors.Errorf("key coder for %v is %v, want %v", 
n, n.Coder.Components[0], c)
+                       return nil, errors.WithContextf(err, "creating new 
CoGBK in scope %v", s)
                }
                if !w.Equals(n.WindowingStrategy()) {
-                       return nil, fmt.Errorf("creating new CoGBK in scope %v: 
mismatched CoGBK windowing strategies: %v, want %v", s, n.WindowingStrategy(), 
w)
+                       err := errors.Errorf("mismatched CoGBK windowing 
strategies: %v, want %v", n.WindowingStrategy(), w)
+                       return nil, errors.WithContextf(err, "creating new 
CoGBK in scope %v", s)
                }
                if bounded != n.Bounded() {
-                       return nil, fmt.Errorf("creating new CoGBK in scope %v: 
unmatched CoGBK boundedness: %v, want %v", s, n.Bounded(), bounded)
+                       err := errors.Errorf("unmatched CoGBK boundedness: %v, 
want %v", n.Bounded(), bounded)
+                       return nil, errors.WithContextf(err, "creating new 
CoGBK in scope %v", s)
                }
 
                comp = append(comp, n.Type().Components()[1])
@@ -236,7 +243,8 @@ func NewCoGBK(g *Graph, s *Scope, ns []*Node) (*MultiEdge, 
error) {
 // the shared input type.
 func NewFlatten(g *Graph, s *Scope, in []*Node) (*MultiEdge, error) {
        if len(in) < 2 {
-               return nil, fmt.Errorf("creating new Flatten in scope %v: 
Flatten needs at least 2 input, got %v", s, len(in))
+               err := errors.Errorf("Flatten needs at least 2 input, got %v", 
len(in))
+               return nil, errors.WithContextf(err, "creating new Flatten in 
scope %v", s)
        }
        t := in[0].Type()
        w := inputWindow(in)
@@ -252,14 +260,17 @@ func NewFlatten(g *Graph, s *Scope, in []*Node) 
(*MultiEdge, error) {
        }
        for _, n := range in {
                if !typex.IsEqual(t, n.Type()) {
-                       return nil, fmt.Errorf("creating new Flatten in scope 
%v: mismatched Flatten input types: %v, want %v", s, n.Type(), t)
+                       err := errors.Errorf("mismatched Flatten input types: 
%v, want %v", n.Type(), t)
+                       return nil, errors.WithContextf(err, "creating new 
Flatten in scope %v", s)
                }
                if !w.Equals(n.WindowingStrategy()) {
-                       return nil, fmt.Errorf("creating new Flatten in scope 
%v: mismatched Flatten window types: %v, want %v", s, n.WindowingStrategy(), w)
+                       err := errors.Errorf("mismatched Flatten window types: 
%v, want %v", n.WindowingStrategy(), w)
+                       return nil, errors.WithContextf(err, "creating new 
Flatten in scope %v", s)
                }
        }
        if typex.IsCoGBK(t) {
-               return nil, fmt.Errorf("creating new Flatten in scope %v: 
Flatten input type cannot be CoGBK: %v", s, t)
+               err := errors.Errorf("Flatten input type cannot be CoGBK: %v", 
t)
+               return nil, errors.WithContextf(err, "creating new Flatten in 
scope %v", s)
        }
 
        edge := g.NewEdge(s)
@@ -300,7 +311,7 @@ func newDoFnNode(op Opcode, g *Graph, s *Scope, u *DoFn, in 
[]*Node, typedefs ma
 
        inbound, kinds, outbound, out, err := Bind(u.ProcessElementFn(), 
typedefs, NodeTypes(in)...)
        if err != nil {
-               return nil, fmt.Errorf("creating new DoFn in scope %v: %v", s, 
err)
+               return nil, errors.WithContextf(err, "creating new DoFn in 
scope %v", s)
        }
 
        edge := g.NewEdge(s)
@@ -329,10 +340,12 @@ const CombinePerKeyScope = "CombinePerKey"
 func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node, ac *coder.Coder) 
(*MultiEdge, error) {
        inT := in.Type()
        if !typex.IsCoGBK(inT) {
-               return nil, fmt.Errorf("creating new Combine in scope %v: 
Combine requires CoGBK type: %v", s, inT)
+               err := errors.Errorf("Combine requires CoGBK type: %v", inT)
+               return nil, errors.WithContextf(err, "creating new Combine in 
scope %v", s)
        }
        if len(inT.Components()) > 2 {
-               return nil, fmt.Errorf("creating new Combine in scope %v: 
Combine cannot follow multi-input CoGBK: %v", s, inT)
+               err := errors.Errorf("Combine cannot follow multi-input CoGBK: 
%v", inT)
+               return nil, errors.WithContextf(err, "creating new Combine in 
scope %v", s)
        }
 
        // Create a synthetic function for binding purposes. It takes main input
@@ -381,7 +394,7 @@ func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node, 
ac *coder.Coder) (*M
 
        inbound, kinds, outbound, out, err := Bind(synth, nil, inT)
        if err != nil {
-               return nil, fmt.Errorf("creating new Combine in scope %v: %v", 
s, err)
+               return nil, errors.WithContextf(err, "creating new Combine in 
scope %v", s)
        }
 
        edge := g.NewEdge(s)
diff --git a/sdks/go/pkg/beam/core/graph/fn.go 
b/sdks/go/pkg/beam/core/graph/fn.go
index 7418c8c..d174c4f 100644
--- a/sdks/go/pkg/beam/core/graph/fn.go
+++ b/sdks/go/pkg/beam/core/graph/fn.go
@@ -94,7 +94,7 @@ func NewFn(fn interface{}) (*Fn, error) {
 
        case reflect.Ptr:
                if val.Elem().Kind() != reflect.Struct {
-                       return nil, fmt.Errorf("value %v must be ptr to 
struct", fn)
+                       return nil, errors.Errorf("value %v must be ptr to 
struct", fn)
                }
 
                // Note that a ptr receiver is necessary if struct fields are 
updated in the
@@ -107,7 +107,7 @@ func NewFn(fn interface{}) (*Fn, error) {
                        for name, mfn := range methodsFuncs {
                                f, err := funcx.New(mfn)
                                if err != nil {
-                                       return nil, fmt.Errorf("method %v 
invalid: %v", name, err)
+                                       return nil, errors.Wrapf(err, "method 
%v invalid", name)
                                }
                                methods[name] = f
                        }
@@ -133,14 +133,14 @@ func NewFn(fn interface{}) (*Fn, error) {
 
                        f, err := 
funcx.New(reflectx.MakeFunc(val.Method(i).Interface()))
                        if err != nil {
-                               return nil, fmt.Errorf("method %v invalid: %v", 
m.Name, err)
+                               return nil, errors.Wrapf(err, "method %v 
invalid", m.Name)
                        }
                        methods[m.Name] = f
                }
                return &Fn{Recv: fn, methods: methods}, nil
 
        default:
-               return nil, fmt.Errorf("value %v must be function or (ptr to) 
struct", fn)
+               return nil, errors.Errorf("value %v must be function or (ptr 
to) struct", fn)
        }
 }
 
@@ -220,7 +220,7 @@ func AsDoFn(fn *Fn) (*DoFn, error) {
        }
 
        if _, ok := fn.methods[processElementName]; !ok {
-               return nil, fmt.Errorf("graph.AsDoFn: failed to find %v method: 
%v", processElementName, fn)
+               return nil, errors.Errorf("graph.AsDoFn: failed to find %v 
method: %v", processElementName, fn)
        }
 
        // TODO(herohde) 5/18/2017: validate the signatures, incl. consistency.
@@ -296,7 +296,7 @@ func AsCombineFn(fn *Fn) (*CombineFn, error) {
 
        mergeFn, ok := fn.methods[mergeAccumulatorsName]
        if !ok {
-               return nil, fmt.Errorf("%v: failed to find required %v method 
on type: %v", fnKind, mergeAccumulatorsName, fn.Name())
+               return nil, errors.Errorf("%v: failed to find required %v 
method on type: %v", fnKind, mergeAccumulatorsName, fn.Name())
        }
 
        // CombineFn methods must satisfy the following:
@@ -306,7 +306,7 @@ func AsCombineFn(fn *Fn) (*CombineFn, error) {
        // ExtractOutput func(A) (O, error?)
        // This means that the other signatures *must* match the type used in 
MergeAccumulators.
        if len(mergeFn.Ret) <= 0 {
-               return nil, fmt.Errorf("%v: %v requires at least 1 return 
value. : %v", fnKind, mergeAccumulatorsName, mergeFn)
+               return nil, errors.Errorf("%v: %v requires at least 1 return 
value. : %v", fnKind, mergeAccumulatorsName, mergeFn)
        }
        accumType := mergeFn.Ret[0].T
 
@@ -359,7 +359,7 @@ func verifyValidNames(fnKind string, fn *Fn, names 
...string) error {
 
        for key := range fn.methods {
                if !m[key] {
-                       return fmt.Errorf("%s: unexpected exported method %v 
present. Valid methods are: %v", fnKind, key, names)
+                       return errors.Errorf("%s: unexpected exported method %v 
present. Valid methods are: %v", fnKind, key, names)
                }
        }
        return nil
diff --git a/sdks/go/pkg/beam/core/graph/graph.go 
b/sdks/go/pkg/beam/core/graph/graph.go
index dd68885..fb80cba 100644
--- a/sdks/go/pkg/beam/core/graph/graph.go
+++ b/sdks/go/pkg/beam/core/graph/graph.go
@@ -21,6 +21,7 @@ import (
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // Graph represents an in-progress deferred execution graph and is easily
@@ -87,7 +88,7 @@ func (g *Graph) Build() ([]*MultiEdge, []*Node, error) {
        for _, n := range g.nodes {
                nodes[n] = true
                if n.Coder == nil {
-                       return nil, nil, fmt.Errorf("node %v in graph has 
undefined coder", n.id)
+                       return nil, nil, errors.Errorf("node %v in graph has 
undefined coder", n.id)
                }
        }
        // Build a map of all nodes that are reachable by g.edges.
@@ -102,12 +103,12 @@ func (g *Graph) Build() ([]*MultiEdge, []*Node, error) {
        }
        for n := range nodes {
                if _, ok := reachable[n]; !ok {
-                       return nil, nil, fmt.Errorf("node %v in graph is 
unconnected", n.id)
+                       return nil, nil, errors.Errorf("node %v in graph is 
unconnected", n.id)
                }
        }
        for n, e := range reachable {
                if _, ok := nodes[n]; !ok {
-                       return nil, nil, fmt.Errorf("node %v is reachable by 
edge %v, but it's not in same graph", n.id, e.id)
+                       return nil, nil, errors.Errorf("node %v is reachable by 
edge %v, but it's not in same graph", n.id, e.id)
                }
        }
        return g.edges, g.nodes, nil
diff --git a/sdks/go/pkg/beam/core/runtime/coderx/float.go 
b/sdks/go/pkg/beam/core/runtime/coderx/float.go
index 3179f73..b502d05 100644
--- a/sdks/go/pkg/beam/core/runtime/coderx/float.go
+++ b/sdks/go/pkg/beam/core/runtime/coderx/float.go
@@ -24,6 +24,7 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 func encFloat(v typex.T) []byte {
@@ -43,7 +44,7 @@ func encFloat(v typex.T) []byte {
 func decFloat(t reflect.Type, data []byte) (typex.T, error) {
        uval, err := decVarUintZ(reflectx.Uint64, data)
        if err != nil {
-               return nil, fmt.Errorf("invalid float encoding for: %v", data)
+               return nil, errors.Errorf("invalid float encoding for: %v", 
data)
        }
 
        n := math.Float64frombits(bits.ReverseBytes64(uval.(uint64)))
@@ -64,6 +65,6 @@ func NewFloat(t reflect.Type) (*coder.CustomCoder, error) {
        case reflect.Float32, reflect.Float64:
                return coder.NewCustomCoder("float", t, encFloat, decFloat)
        default:
-               return nil, fmt.Errorf("not a float type: %v", t)
+               return nil, errors.Errorf("not a float type: %v", t)
        }
 }
diff --git a/sdks/go/pkg/beam/core/runtime/coderx/varint.go 
b/sdks/go/pkg/beam/core/runtime/coderx/varint.go
index c0b2b79..07e4ce1 100644
--- a/sdks/go/pkg/beam/core/runtime/coderx/varint.go
+++ b/sdks/go/pkg/beam/core/runtime/coderx/varint.go
@@ -22,6 +22,7 @@ import (
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // NewVarIntZ returns a varint coder for the given integer type. It uses a 
zig-zag scheme,
@@ -31,7 +32,7 @@ func NewVarIntZ(t reflect.Type) (*coder.CustomCoder, error) {
        case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, 
reflect.Int64:
                return coder.NewCustomCoder("varintz", t, encVarIntZ, 
decVarIntZ)
        default:
-               return nil, fmt.Errorf("not a signed integer type: %v", t)
+               return nil, errors.Errorf("not a signed integer type: %v", t)
        }
 }
 
@@ -42,7 +43,7 @@ func NewVarUintZ(t reflect.Type) (*coder.CustomCoder, error) {
        case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, 
reflect.Uint64:
                return coder.NewCustomCoder("varuintz", t, encVarUintZ, 
decVarUintZ)
        default:
-               return nil, fmt.Errorf("not a unsigned integer type: %v", t)
+               return nil, errors.Errorf("not a unsigned integer type: %v", t)
        }
 }
 
@@ -70,7 +71,7 @@ func encVarIntZ(v typex.T) []byte {
 func decVarIntZ(t reflect.Type, data []byte) (typex.T, error) {
        n, size := binary.Varint(data)
        if size <= 0 {
-               return nil, fmt.Errorf("invalid varintz encoding for: %v", data)
+               return nil, errors.Errorf("invalid varintz encoding for: %v", 
data)
        }
        switch t.Kind() {
        case reflect.Int:
@@ -112,7 +113,7 @@ func encVarUintZ(v typex.T) []byte {
 func decVarUintZ(t reflect.Type, data []byte) (typex.T, error) {
        n, size := binary.Uvarint(data)
        if size <= 0 {
-               return nil, fmt.Errorf("invalid varuintz encoding for: %v", 
data)
+               return nil, errors.Errorf("invalid varuintz encoding for: %v", 
data)
        }
        switch t.Kind() {
        case reflect.Uint:
diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go 
b/sdks/go/pkg/beam/core/runtime/exec/coder.go
index dfe2ac1..a55d7f7 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go
@@ -27,6 +27,7 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // NOTE(herohde) 4/30/2017: The main complication is CoGBK results, which have
@@ -119,7 +120,7 @@ func (*bytesEncoder) Encode(val *FullValue, w io.Writer) 
error {
        var data []byte
        data, ok := val.Elm.([]byte)
        if !ok {
-               return fmt.Errorf("received unknown value type: want []byte, 
got %T", val.Elm)
+               return errors.Errorf("received unknown value type: want []byte, 
got %T", val.Elm)
        }
        size := len(data)
 
diff --git a/sdks/go/pkg/beam/core/runtime/exec/cogbk.go 
b/sdks/go/pkg/beam/core/runtime/exec/cogbk.go
index 40f5636..5f207a2 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/cogbk.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/cogbk.go
@@ -19,6 +19,8 @@ import (
        "bytes"
        "context"
        "fmt"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // TODO(BEAM-490): This file contains support for the handling of CoGBK
@@ -167,7 +169,7 @@ func (f *filterStream) Read() (*FullValue, error) {
 
                v, err := f.dec.Decode(bytes.NewReader(value))
                if err != nil {
-                       return nil, fmt.Errorf("failed to decode union value 
'%v' for key %v: %v", value, key, err)
+                       return nil, errors.Wrapf(err, "failed to decode union 
value '%v' for key %v", value, key)
                }
                v.Timestamp = elm.Timestamp
                v.Windows = elm.Windows
diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine.go 
b/sdks/go/pkg/beam/core/runtime/exec/combine.go
index ef8c3cd..7815ac1 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/combine.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/combine.go
@@ -27,6 +27,7 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        "github.com/apache/beam/sdks/go/pkg/beam/util/errorx"
 )
 
@@ -64,7 +65,7 @@ func (n *Combine) ID() UnitID {
 // Up initializes this CombineFn and runs its SetupFn() method.
 func (n *Combine) Up(ctx context.Context) error {
        if n.status != Initializing {
-               return fmt.Errorf("invalid status for combine %v: %v", n.UID, 
n.status)
+               return errors.Errorf("invalid status for combine %v: %v", 
n.UID, n.status)
        }
        n.status = Up
 
@@ -103,7 +104,7 @@ func (n *Combine) mergeAccumulators(ctx context.Context, a, 
b interface{}) (inte
        in := &MainInput{Key: FullValue{Elm: a}}
        val, err := n.mergeInv.InvokeWithoutEventTime(ctx, in, b)
        if err != nil {
-               return nil, n.fail(fmt.Errorf("MergeAccumulators failed: %v", 
err))
+               return nil, n.fail(errors.WithContext(err, "invoking 
MergeAccumulators"))
        }
        return val.Elm, nil
 }
@@ -111,7 +112,7 @@ func (n *Combine) mergeAccumulators(ctx context.Context, a, 
b interface{}) (inte
 // StartBundle initializes processing this bundle for combines.
 func (n *Combine) StartBundle(ctx context.Context, id string, data 
DataContext) error {
        if n.status != Up {
-               return fmt.Errorf("invalid status for combine %v: %v", n.UID, 
n.status)
+               return errors.Errorf("invalid status for combine %v: %v", 
n.UID, n.status)
        }
        n.status = Active
 
@@ -130,7 +131,7 @@ func (n *Combine) StartBundle(ctx context.Context, id 
string, data DataContext)
 // AddInput, MergeAccumulators, and ExtractOutput functions.
 func (n *Combine) ProcessElement(ctx context.Context, value *FullValue, values 
...ReStream) error {
        if n.status != Active {
-               return fmt.Errorf("invalid status for combine %v: %v", n.UID, 
n.status)
+               return errors.Errorf("invalid status for combine %v: %v", 
n.UID, n.status)
        }
 
        // Note that we do not explicitly call merge, although it may
@@ -173,7 +174,7 @@ func (n *Combine) ProcessElement(ctx context.Context, value 
*FullValue, values .
 // FinishBundle completes this node's processing of a bundle.
 func (n *Combine) FinishBundle(ctx context.Context) error {
        if n.status != Active {
-               return fmt.Errorf("invalid status for combine %v: %v", n.UID, 
n.status)
+               return errors.Errorf("invalid status for combine %v: %v", 
n.UID, n.status)
        }
        n.status = Up
        if n.createAccumInv != nil {
@@ -221,7 +222,7 @@ func (n *Combine) newAccum(ctx context.Context, key 
interface{}) (interface{}, e
 
        val, err := n.createAccumInv.InvokeWithoutEventTime(ctx, opt)
        if err != nil {
-               return nil, fmt.Errorf("CreateAccumulator failed: %v", err)
+               return nil, n.fail(errors.WithContext(err, "invoking 
CreateAccumulator"))
        }
        return val.Elm, nil
 }
@@ -264,7 +265,7 @@ func (n *Combine) addInput(ctx context.Context, accum, key, 
value interface{}, t
 
        val, err := n.addInputInv.InvokeWithoutEventTime(ctx, opt, v)
        if err != nil {
-               return nil, n.fail(fmt.Errorf("AddInput failed: %v", err))
+               return nil, n.fail(errors.WithContext(err, "invoking AddInput"))
        }
        return val.Elm, err
 }
@@ -278,7 +279,7 @@ func (n *Combine) extract(ctx context.Context, accum 
interface{}) (interface{},
 
        val, err := n.extractOutputInv.InvokeWithoutEventTime(ctx, nil, accum)
        if err != nil {
-               return nil, n.fail(fmt.Errorf("ExtractOutput failed: %v", err))
+               return nil, n.fail(errors.WithContext(err, "invoking 
ExtractOutput"))
        }
        return val.Elm, err
 }
@@ -335,7 +336,7 @@ func (n *LiftedCombine) StartBundle(ctx context.Context, id 
string, data DataCon
 // policy is used.
 func (n *LiftedCombine) ProcessElement(ctx context.Context, value *FullValue, 
values ...ReStream) error {
        if n.status != Active {
-               return fmt.Errorf("invalid status for precombine %v: %v", 
n.UID, n.status)
+               return errors.Errorf("invalid status for precombine %v: %v", 
n.UID, n.status)
        }
 
        key, err := n.keyHash.Hash(value.Elm)
@@ -435,7 +436,7 @@ func (n *MergeAccumulators) String() string {
 // runs the MergeAccumulatorsFn over them repeatedly.
 func (n *MergeAccumulators) ProcessElement(ctx context.Context, value 
*FullValue, values ...ReStream) error {
        if n.status != Active {
-               return fmt.Errorf("invalid status for combine merge %v: %v", 
n.UID, n.status)
+               return errors.Errorf("invalid status for combine merge %v: %v", 
n.UID, n.status)
        }
        a, err := n.newAccum(n.Combine.ctx, value.Elm)
        if err != nil {
@@ -490,7 +491,7 @@ func (n *ExtractOutput) String() string {
 // ProcessElement accepts an accumulator value, and extracts the final return 
type from it.
 func (n *ExtractOutput) ProcessElement(ctx context.Context, value *FullValue, 
values ...ReStream) error {
        if n.status != Active {
-               return fmt.Errorf("invalid status for combine extract %v: %v", 
n.UID, n.status)
+               return errors.Errorf("invalid status for combine extract %v: 
%v", n.UID, n.status)
        }
        out, err := n.extract(n.Combine.ctx, value.Elm2)
        if err != nil {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine_test.go 
b/sdks/go/pkg/beam/core/runtime/exec/combine_test.go
index c206fc0..c60ab52 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/combine_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/combine_test.go
@@ -30,6 +30,7 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/coderx"
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 var intInput = []interface{}{int(1), int(2), int(3), int(4), int(5), int(6)}
@@ -315,7 +316,7 @@ func (*MyErrorCombine) MergeAccumulators(a, b int64) 
(int64, error) {
 func intCoder(t reflect.Type) *coder.Coder {
        c, err := coderx.NewVarIntZ(t)
        if err != nil {
-               panic(fmt.Sprintf("Couldn't get VarInt coder for %v: %v", t, 
err))
+               panic(errors.Wrapf(err, "Couldn't get VarInt coder for %v", t))
        }
        return &coder.Coder{Kind: coder.Custom, T: typex.New(t), Custom: c}
 }
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasink.go 
b/sdks/go/pkg/beam/core/runtime/exec/datasink.go
index 66eb380..e11c47a 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasink.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasink.go
@@ -24,6 +24,7 @@ import (
        "time"
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        "github.com/apache/beam/sdks/go/pkg/beam/log"
 )
 
@@ -71,7 +72,7 @@ func (n *DataSink) ProcessElement(ctx context.Context, value 
*FullValue, values
                return err
        }
        if err := n.enc.Encode(value, &b); err != nil {
-               return fmt.Errorf("failed to encode element %v with coder %v: 
%v", value, n.enc, err)
+               return errors.WithContextf(err, "encoding element %v with coder 
%v", value, n.enc)
        }
        if _, err := n.w.Write(b.Bytes()); err != nil {
                return err
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go 
b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
index a6e141c..5d40b4a 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -23,6 +23,7 @@ import (
        "time"
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        "github.com/apache/beam/sdks/go/pkg/beam/log"
 )
 
@@ -74,14 +75,14 @@ func (n *DataSource) Process(ctx context.Context) error {
                                if err == io.EOF {
                                        return nil
                                }
-                               return fmt.Errorf("source failed: %v", err)
+                               return errors.Wrap(err, "source failed")
                        }
 
                        // Decode key
 
                        key, err := ck.Decode(r)
                        if err != nil {
-                               return fmt.Errorf("source decode failed: %v", 
err)
+                               return errors.Wrap(err, "source decode failed")
                        }
                        key.Timestamp = t
                        key.Windows = ws
@@ -94,7 +95,7 @@ func (n *DataSource) Process(ctx context.Context) error {
 
                        size, err := coder.DecodeInt32(r)
                        if err != nil {
-                               return fmt.Errorf("stream size decoding failed: 
%v", err)
+                               return errors.Wrap(err, "stream size decoding 
failed")
                        }
 
                        if size > -1 {
@@ -106,7 +107,7 @@ func (n *DataSource) Process(ctx context.Context) error {
                                for i := int32(0); i < size; i++ {
                                        value, err := cv.Decode(r)
                                        if err != nil {
-                                               return fmt.Errorf("stream value 
decode failed: %v", err)
+                                               return errors.Wrap(err, "stream 
value decode failed")
                                        }
                                        buf = append(buf, *value)
                                }
@@ -116,7 +117,7 @@ func (n *DataSource) Process(ctx context.Context) error {
                                for {
                                        chunk, err := coder.DecodeVarUint64(r)
                                        if err != nil {
-                                               return fmt.Errorf("stream chunk 
size decoding failed: %v", err)
+                                               return errors.Wrap(err, "stream 
chunk size decoding failed")
                                        }
 
                                        // log.Printf("Chunk size=%v", chunk)
@@ -129,7 +130,7 @@ func (n *DataSource) Process(ctx context.Context) error {
                                        for i := uint64(0); i < chunk; i++ {
                                                value, err := cv.Decode(r)
                                                if err != nil {
-                                                       return 
fmt.Errorf("stream value decode failed: %v", err)
+                                                       return errors.Wrap(err, 
"stream value decode failed")
                                                }
                                                buf = append(buf, *value)
                                        }
@@ -152,12 +153,12 @@ func (n *DataSource) Process(ctx context.Context) error {
                                if err == io.EOF {
                                        return nil
                                }
-                               return fmt.Errorf("source failed: %v", err)
+                               return errors.Wrap(err, "source failed")
                        }
 
                        elm, err := ec.Decode(r)
                        if err != nil {
-                               return fmt.Errorf("source decode failed: %v", 
err)
+                               return errors.Wrap(err, "source decode failed")
                        }
                        elm.Timestamp = t
                        elm.Windows = ws
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go 
b/sdks/go/pkg/beam/core/runtime/exec/fn.go
index 0b13744..b6c479c 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go
@@ -25,6 +25,7 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 //go:generate specialize --input=fn_arity.tmpl
@@ -127,7 +128,7 @@ func (n *invoker) Invoke(ctx context.Context, ws 
[]typex.Window, ts typex.EventT
        }
        if n.wndIdx >= 0 {
                if len(ws) != 1 {
-                       return nil, fmt.Errorf("DoFns that observe windows must 
be invoked with single window: %v", opt.Key.Windows)
+                       return nil, errors.Errorf("DoFns that observe windows 
must be invoked with single window: %v", opt.Key.Windows)
                }
                args[n.wndIdx] = ws[0]
        }
@@ -157,7 +158,7 @@ func (n *invoker) Invoke(ctx context.Context, ws 
[]typex.Window, ts typex.EventT
                        param := fn.Param[in[i]]
 
                        if param.Kind != funcx.FnIter {
-                               return nil, fmt.Errorf("GBK/CoGBK result values 
must be iterable: %v", param)
+                               return nil, errors.Errorf("GBK/CoGBK result 
values must be iterable: %v", param)
                        }
 
                        // TODO(herohde) 12/12/2017: allow form conversion on 
GBK results?
@@ -250,11 +251,11 @@ func makeSideInputs(fn *funcx.Fn, in []*graph.Inbound, 
side []ReStream) ([]Reusa
        }
 
        if len(in) != len(side)+1 {
-               return nil, fmt.Errorf("found %v inbound, want %v", len(in), 
len(side)+1)
+               return nil, errors.Errorf("found %v inbound, want %v", len(in), 
len(side)+1)
        }
        param := fn.Params(funcx.FnValue | funcx.FnIter | funcx.FnReIter)
        if len(param) <= len(side) {
-               return nil, fmt.Errorf("found %v params, want >%v", len(param), 
len(side))
+               return nil, errors.Errorf("found %v params, want >%v", 
len(param), len(side))
        }
 
        // The side input are last of the above params, so we can compute the 
offset easily.
@@ -264,7 +265,7 @@ func makeSideInputs(fn *funcx.Fn, in []*graph.Inbound, side 
[]ReStream) ([]Reusa
        for i := 0; i < len(side); i++ {
                s, err := makeSideInput(in[i+1].Kind, 
fn.Param[param[i+offset]].T, side[i])
                if err != nil {
-                       return nil, fmt.Errorf("failed to make side input %v: 
%v", i, err)
+                       return nil, errors.WithContextf(err, "making side input 
%v", i)
                }
                ret = append(ret, s)
        }
@@ -282,7 +283,7 @@ func makeEmitters(fn *funcx.Fn, nodes []Node) 
([]ReusableEmitter, error) {
        }
        out := fn.Params(funcx.FnEmit)
        if len(out) != len(nodes)-offset {
-               return nil, fmt.Errorf("found %v emitters, want %v", len(out), 
len(nodes)-offset)
+               return nil, errors.Errorf("found %v emitters, want %v", 
len(out), len(nodes)-offset)
        }
 
        var ret []ReusableEmitter
@@ -302,7 +303,7 @@ func makeSideInput(kind graph.InputKind, t reflect.Type, 
values ReStream) (Reusa
                        return nil, err
                }
                if len(elms) != 1 {
-                       return nil, fmt.Errorf("singleton side input %v for %v 
ill-defined", kind, t)
+                       return nil, errors.Errorf("singleton side input %v for 
%v ill-defined", kind, t)
                }
                return &fixedValue{val: Convert(elms[0].Elm, t)}, nil
 
diff --git a/sdks/go/pkg/beam/core/runtime/exec/input.go 
b/sdks/go/pkg/beam/core/runtime/exec/input.go
index c6ddb25..74851c7 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/input.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/input.go
@@ -23,6 +23,7 @@ import (
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // TODO(herohde) 4/26/2017: SideInput representation? We want it to be amenable
@@ -158,7 +159,7 @@ func (v *iterValue) invoke(args []reflect.Value) 
[]reflect.Value {
                if err == io.EOF {
                        return []reflect.Value{reflect.ValueOf(false)}
                }
-               panic(fmt.Sprintf("broken stream: %v", err))
+               panic(errors.Wrap(err, "broken stream"))
        }
 
        // We expect 1-3 out parameters: func (*int, *string) bool.
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go 
b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
index 49f74c5..612e7ff 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
@@ -26,6 +26,7 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
        "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        "github.com/apache/beam/sdks/go/pkg/beam/util/errorx"
 )
 
@@ -69,7 +70,7 @@ func (n *ParDo) ID() UnitID {
 // Up initializes this ParDo and does one-time DoFn setup.
 func (n *ParDo) Up(ctx context.Context) error {
        if n.status != Initializing {
-               return fmt.Errorf("invalid status for pardo %v: %v, want 
Initializing", n.UID, n.status)
+               return errors.Errorf("invalid status for pardo %v: %v, want 
Initializing", n.UID, n.status)
        }
        n.status = Up
        n.inv = newInvoker(n.Fn.ProcessElementFn())
@@ -89,7 +90,7 @@ func (n *ParDo) Up(ctx context.Context) error {
 // StartBundle does pre-bundle processing operation for the DoFn.
 func (n *ParDo) StartBundle(ctx context.Context, id string, data DataContext) 
error {
        if n.status != Up {
-               return fmt.Errorf("invalid status for pardo %v: %v, want Up", 
n.UID, n.status)
+               return errors.Errorf("invalid status for pardo %v: %v, want 
Up", n.UID, n.status)
        }
        n.status = Active
        n.side = data.SideInput
@@ -113,7 +114,7 @@ func (n *ParDo) StartBundle(ctx context.Context, id string, 
data DataContext) er
 // ProcessElement processes each parallel element with the DoFn.
 func (n *ParDo) ProcessElement(ctx context.Context, elm *FullValue, values 
...ReStream) error {
        if n.status != Active {
-               return fmt.Errorf("invalid status for pardo %v: %v, want 
Active", n.UID, n.status)
+               return errors.Errorf("invalid status for pardo %v: %v, want 
Active", n.UID, n.status)
        }
        // If the function observes windows, we must invoke it for each window. 
The expected fast path
        // is that either there is a single window or the function doesn't 
observes windows.
@@ -162,7 +163,7 @@ func mustExplodeWindows(fn *funcx.Fn, elm *FullValue, 
usesSideInput bool) bool {
 // persisted at this point.
 func (n *ParDo) FinishBundle(ctx context.Context) error {
        if n.status != Active {
-               return fmt.Errorf("invalid status for pardo %v: %v, want 
Active", n.UID, n.status)
+               return errors.Errorf("invalid status for pardo %v: %v, want 
Active", n.UID, n.status)
        }
        n.status = Up
        n.inv.Reset()
diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go
index 2436f08..5031c3e 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/plan.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go
@@ -23,6 +23,7 @@ import (
        "strings"
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
 )
 
@@ -55,7 +56,7 @@ func NewPlan(id string, units []Unit) (*Plan, error) {
 
        for _, u := range units {
                if u == nil {
-                       return nil, fmt.Errorf("no <nil> units")
+                       return nil, errors.Errorf("no <nil> units")
                }
                if r, ok := u.(Root); ok {
                        roots = append(roots, r)
@@ -68,7 +69,7 @@ func NewPlan(id string, units []Unit) (*Plan, error) {
                }
        }
        if len(roots) == 0 {
-               return nil, fmt.Errorf("no root units")
+               return nil, errors.Errorf("no root units")
        }
 
        return &Plan{
@@ -102,7 +103,7 @@ func (p *Plan) Execute(ctx context.Context, id string, 
manager DataContext) erro
        }
 
        if p.status != Up {
-               return fmt.Errorf("invalid status for plan %v: %v", p.id, 
p.status)
+               return errors.Errorf("invalid status for plan %v: %v", p.id, 
p.status)
        }
 
        // Process bundle. If there are any kinds of failures, we bail and mark 
the plan broken.
@@ -149,9 +150,9 @@ func (p *Plan) Down(ctx context.Context) error {
        case 0:
                return nil
        case 1:
-               return fmt.Errorf("plan %v failed: %v", p.id, errs[0])
+               return errors.Wrapf(errs[0], "plan %v failed", p.id)
        default:
-               return fmt.Errorf("plan %v failed with multiple errors: %v", 
p.id, errs)
+               return errors.Errorf("plan %v failed with multiple errors: %v", 
p.id, errs)
        }
 }
 
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index df69a9a..e3981d3 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -29,6 +29,7 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/stringx"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
        pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
        "github.com/golang/protobuf/proto"
@@ -55,7 +56,7 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) 
(*Plan, error) {
                        continue
                }
                if len(transform.GetOutputs()) != 1 {
-                       return nil, fmt.Errorf("expected one output from 
DataSource, got %v", transform.GetOutputs())
+                       return nil, errors.Errorf("expected one output from 
DataSource, got %v", transform.GetOutputs())
                }
 
                port, cid, err := 
unmarshalPort(transform.GetSpec().GetPayload())
@@ -85,7 +86,7 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) 
(*Plan, error) {
                                        return nil, err
                                }
                                if !coder.IsW(u.Coder) {
-                                       return nil, fmt.Errorf("unwindowed 
coder %v on DataSource %v: %v", cid, id, u.Coder)
+                                       return nil, errors.Errorf("unwindowed 
coder %v on DataSource %v: %v", cid, id, u.Coder)
                                }
                        }
                }
@@ -165,7 +166,7 @@ func (b *builder) makeWindowingStrategy(id string) 
(*window.WindowingStrategy, e
 
        ws, ok := b.desc.GetWindowingStrategies()[id]
        if !ok {
-               return nil, fmt.Errorf("windowing strategy %v not found", id)
+               return nil, errors.Errorf("windowing strategy %v not found", id)
        }
        wfn, err := unmarshalWindowFn(ws.GetWindowFn().GetSpec())
        if err != nil {
@@ -219,7 +220,7 @@ func unmarshalWindowFn(wfn *pb.FunctionSpec) (*window.Fn, 
error) {
                return window.NewSessions(gap), nil
 
        default:
-               return nil, fmt.Errorf("unsupported window type: %v", urn)
+               return nil, errors.Errorf("unsupported window type: %v", urn)
        }
 }
 
@@ -238,7 +239,7 @@ func (b *builder) makePCollections(out []string) ([]Node, 
error) {
 func (b *builder) makeCoderForPCollection(id string) (*coder.Coder, 
*coder.WindowCoder, error) {
        col, ok := b.desc.GetPcollections()[id]
        if !ok {
-               return nil, nil, fmt.Errorf("pcollection %v not found", id)
+               return nil, nil, errors.Errorf("pcollection %v not found", id)
        }
        c, err := b.coders.Coder(col.CoderId)
        if err != nil {
@@ -254,7 +255,7 @@ func (b *builder) makeCoderForPCollection(id string) 
(*coder.Coder, *coder.Windo
 
        ws, ok := b.desc.GetWindowingStrategies()[col.GetWindowingStrategyId()]
        if !ok {
-               return nil, nil, fmt.Errorf("windowing strategy %v not found", 
id)
+               return nil, nil, errors.Errorf("windowing strategy %v not 
found", id)
        }
        wc, err := b.coders.WindowCoder(ws.GetWindowCoderId())
        if err != nil {
@@ -342,13 +343,13 @@ func (b *builder) makeLink(from string, id linkID) (Node, 
error) {
                case graphx.URNParDo:
                        var pardo pb.ParDoPayload
                        if err := proto.Unmarshal(payload, &pardo); err != nil {
-                               return nil, fmt.Errorf("invalid ParDo payload 
for %v: %v", transform, err)
+                               return nil, errors.Wrapf(err, "invalid ParDo 
payload for %v", transform)
                        }
                        data = string(pardo.GetDoFn().GetSpec().GetPayload())
                case urnPerKeyCombinePre, urnPerKeyCombineMerge, 
urnPerKeyCombineExtract:
                        var cmb pb.CombinePayload
                        if err := proto.Unmarshal(payload, &cmb); err != nil {
-                               return nil, fmt.Errorf("invalid CombinePayload 
payload for %v: %v", transform, err)
+                               return nil, errors.Wrapf(err, "invalid 
CombinePayload payload for %v", transform)
                        }
                        data = string(cmb.GetCombineFn().GetSpec().GetPayload())
                default:
@@ -363,7 +364,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, 
error) {
 
                var tp v1.TransformPayload
                if err := protox.DecodeBase64(data, &tp); err != nil {
-                       return nil, fmt.Errorf("invalid transform payload for 
%v: %v", transform, err)
+                       return nil, errors.Wrapf(err, "invalid transform 
payload for %v", transform)
                }
 
                switch tpUrn := tp.GetUrn(); tpUrn {
@@ -422,14 +423,14 @@ func (b *builder) makeLink(from string, id linkID) (Node, 
error) {
                                case urnPerKeyCombinePre:
                                        inputs := 
unmarshalKeyedValues(transform.GetInputs())
                                        if len(inputs) != 1 {
-                                               return nil, 
fmt.Errorf("unexpected sideinput to combine: got %d, want 1", len(inputs))
+                                               return nil, 
errors.Errorf("unexpected sideinput to combine: got %d, want 1", len(inputs))
                                        }
                                        ec, _, err := 
b.makeCoderForPCollection(inputs[0])
                                        if err != nil {
                                                return nil, err
                                        }
                                        if !coder.IsKV(ec) {
-                                               return nil, 
fmt.Errorf("unexpected non-KV coder PCollection input to combine: %v", ec)
+                                               return nil, 
errors.Errorf("unexpected non-KV coder PCollection input to combine: %v", ec)
                                        }
                                        u = &LiftedCombine{Combine: cn, 
KeyCoder: ec.Components[0]}
                                case urnPerKeyCombineMerge:
@@ -452,7 +453,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, 
error) {
                                return nil, err
                        }
                        if !coder.IsKV(c) {
-                               return nil, fmt.Errorf("unexpected inject 
coder: %v", c)
+                               return nil, errors.Errorf("unexpected inject 
coder: %v", c)
                        }
                        u = &Inject{UID: b.idgen.New(), N: (int)(tp.Inject.N), 
ValueEncoder: MakeElementEncoder(c.Components[1]), Out: out[0]}
 
@@ -466,7 +467,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, 
error) {
                                return nil, err
                        }
                        if !coder.IsCoGBK(c) {
-                               return nil, fmt.Errorf("unexpected expand 
coder: %v", c)
+                               return nil, errors.Errorf("unexpected expand 
coder: %v", c)
                        }
 
                        var decoders []ElementDecoder
@@ -476,13 +477,13 @@ func (b *builder) makeLink(from string, id linkID) (Node, 
error) {
                        u = &Expand{UID: b.idgen.New(), ValueDecoders: 
decoders, Out: out[0]}
 
                default:
-                       return nil, fmt.Errorf("unexpected payload: %v", tp)
+                       return nil, errors.Errorf("unexpected payload: %v", tp)
                }
 
        case graphx.URNWindow:
                var wp pb.WindowIntoPayload
                if err := proto.Unmarshal(payload, &wp); err != nil {
-                       return nil, fmt.Errorf("invalid WindowInto payload for 
%v: %v", transform, err)
+                       return nil, errors.Wrapf(err, "invalid WindowInto 
payload for %v", transform)
                }
                wfn, err := unmarshalWindowFn(wp.GetWindowFn().GetSpec())
                if err != nil {
@@ -516,7 +517,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, 
error) {
                                        return nil, err
                                }
                                if !coder.IsW(sink.Coder) {
-                                       return nil, fmt.Errorf("unwindowed 
coder %v on DataSink %v: %v", cid, id, sink.Coder)
+                                       return nil, errors.Errorf("unwindowed 
coder %v on DataSink %v: %v", cid, id, sink.Coder)
                                }
                        }
                }
diff --git a/sdks/go/pkg/beam/core/runtime/exec/unit_test.go b/sdks/go/pkg/beam/core/runtime/exec/unit_test.go
index aa0934a..0d17746 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/unit_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/unit_test.go
@@ -17,9 +17,9 @@ package exec
 
 import (
        "context"
-       "fmt"
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // CaptureNode is a test Node that captures all elements for verification. It 
also
@@ -37,7 +37,7 @@ func (n *CaptureNode) ID() UnitID {
 
 func (n *CaptureNode) Up(ctx context.Context) error {
        if n.status != Initializing {
-               return fmt.Errorf("invalid status for %v: %v, want 
Initializing", n.UID, n.status)
+               return errors.Errorf("invalid status for %v: %v, want 
Initializing", n.UID, n.status)
        }
        n.status = Up
        return nil
@@ -45,7 +45,7 @@ func (n *CaptureNode) Up(ctx context.Context) error {
 
 func (n *CaptureNode) StartBundle(ctx context.Context, id string, data 
DataContext) error {
        if n.status != Up {
-               return fmt.Errorf("invalid status for %v: %v, want Up", n.UID, 
n.status)
+               return errors.Errorf("invalid status for %v: %v, want Up", 
n.UID, n.status)
        }
        n.status = Active
        return nil
@@ -53,7 +53,7 @@ func (n *CaptureNode) StartBundle(ctx context.Context, id 
string, data DataConte
 
 func (n *CaptureNode) ProcessElement(ctx context.Context, elm *FullValue, 
values ...ReStream) error {
        if n.status != Active {
-               return fmt.Errorf("invalid status for pardo %v: %v, want 
Active", n.UID, n.status)
+               return errors.Errorf("invalid status for pardo %v: %v, want 
Active", n.UID, n.status)
        }
 
        n.Elements = append(n.Elements, *elm)
@@ -62,7 +62,7 @@ func (n *CaptureNode) ProcessElement(ctx context.Context, elm 
*FullValue, values
 
 func (n *CaptureNode) FinishBundle(ctx context.Context) error {
        if n.status != Active {
-               return fmt.Errorf("invalid status for %v: %v, want Active", 
n.UID, n.status)
+               return errors.Errorf("invalid status for %v: %v, want Active", 
n.UID, n.status)
        }
        n.status = Up
        return nil
@@ -70,7 +70,7 @@ func (n *CaptureNode) FinishBundle(ctx context.Context) error 
{
 
 func (n *CaptureNode) Down(ctx context.Context) error {
        if n.status != Up {
-               return fmt.Errorf("invalid status for %v: %v, want Up", n.UID, 
n.status)
+               return errors.Errorf("invalid status for %v: %v, want Up", 
n.UID, n.status)
        }
        n.status = Down
        return nil
diff --git a/sdks/go/pkg/beam/core/runtime/exec/util.go b/sdks/go/pkg/beam/core/runtime/exec/util.go
index 1fdfd76..16682b9 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/util.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/util.go
@@ -17,8 +17,9 @@ package exec
 
 import (
        "context"
-       "fmt"
        "runtime/debug"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // GenID is a simple UnitID generator.
@@ -36,7 +37,7 @@ func (g *GenID) New() UnitID {
 func callNoPanic(ctx context.Context, fn func(context.Context) error) (err 
error) {
        defer func() {
                if r := recover(); r != nil {
-                       err = fmt.Errorf("panic: %v %s", r, debug.Stack())
+                       err = errors.Errorf("panic: %v %s", r, debug.Stack())
                }
        }()
        return fn(ctx)
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder.go b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
index 43458f9..50e4591 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
@@ -23,6 +23,7 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
        "github.com/golang/protobuf/proto"
 )
@@ -61,7 +62,7 @@ func UnmarshalCoders(ids []string, m map[string]*pb.Coder) 
([]*coder.Coder, erro
        for _, id := range ids {
                c, err := b.Coder(id)
                if err != nil {
-                       return nil, fmt.Errorf("failed to unmarshal coder %v: 
%v", id, err)
+                       return nil, err
                }
                coders = append(coders, c)
        }
@@ -105,12 +106,13 @@ func (b *CoderUnmarshaller) Coder(id string) 
(*coder.Coder, error) {
        }
        c, ok := b.models[id]
        if !ok {
-               return nil, fmt.Errorf("coder with id %v not found", id)
+               err := errors.Errorf("coder with id %v not found", id)
+               return nil, errors.WithContextf(err, "unmarshalling coder %v", 
id)
        }
 
        ret, err := b.makeCoder(c)
        if err != nil {
-               return nil, fmt.Errorf("failed to unmarshal coder %v: %v", id, 
err)
+               return nil, errors.WithContextf(err, "unmarshalling coder %v", 
id)
        }
 
        b.coders[id] = ret
@@ -157,7 +159,7 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) 
(*coder.Coder, error) {
 
        case urnKVCoder:
                if len(components) != 2 {
-                       return nil, fmt.Errorf("could not unmarshal KV coder 
from %v, want exactly 2 components but have %d", c, len(components))
+                       return nil, errors.Errorf("could not unmarshal KV coder 
from %v, want exactly 2 components but have %d", c, len(components))
                }
 
                key, err := b.Coder(components[0])
@@ -206,7 +208,7 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) 
(*coder.Coder, error) {
 
        case urnLengthPrefixCoder:
                if len(components) != 1 {
-                       return nil, fmt.Errorf("could not unmarshal length 
prefix coder from %v, want a single sub component but have %d", c, 
len(components))
+                       return nil, errors.Errorf("could not unmarshal length 
prefix coder from %v, want a single sub component but have %d", c, 
len(components))
                }
 
                elm, err := b.peek(components[0])
@@ -217,7 +219,7 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) 
(*coder.Coder, error) {
                // the portable pipeline model directly (BEAM-2885)
                if elm.GetSpec().GetSpec().GetUrn() != "" && 
elm.GetSpec().GetSpec().GetUrn() != urnCustomCoder {
                        // TODO(herohde) 11/17/2017: revisit this restriction
-                       return nil, fmt.Errorf("could not unmarshal length 
prefix coder from %v, want a custom coder as a sub component but got %v", c, 
elm)
+                       return nil, errors.Errorf("could not unmarshal length 
prefix coder from %v, want a custom coder as a sub component but got %v", c, 
elm)
                }
 
                var ref v1.CustomCoder
@@ -233,7 +235,7 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) 
(*coder.Coder, error) {
 
        case urnWindowedValueCoder:
                if len(components) != 2 {
-                       return nil, fmt.Errorf("could not unmarshal windowed 
value coder from %v, expected two components but got %d", c, len(components))
+                       return nil, errors.Errorf("could not unmarshal windowed 
value coder from %v, expected two components but got %d", c, len(components))
                }
 
                elm, err := b.Coder(components[0])
@@ -248,7 +250,7 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) 
(*coder.Coder, error) {
                return &coder.Coder{Kind: coder.WindowedValue, T: t, 
Components: []*coder.Coder{elm}, Window: w}, nil
 
        case streamType:
-               return nil, fmt.Errorf("could not unmarshal stream type coder 
from %v, stream must be pair value", c)
+               return nil, errors.Errorf("could not unmarshal stream type 
coder from %v, stream must be pair value", c)
 
        case "":
                // TODO(herohde) 11/27/2017: we still see CoderRefs from 
Dataflow. Handle that
@@ -258,23 +260,23 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) 
(*coder.Coder, error) {
 
                var ref CoderRef
                if err := json.Unmarshal(payload, &ref); err != nil {
-                       return nil, fmt.Errorf("could not unmarshal CoderRef 
from %v, failed to decode urn-less coder's payload \"%v\": %v", c, 
string(payload), err)
+                       return nil, errors.Wrapf(err, "could not unmarshal 
CoderRef from %v, failed to decode urn-less coder's payload \"%v\"", c, 
string(payload))
                }
                c, err := DecodeCoderRef(&ref)
                if err != nil {
-                       return nil, fmt.Errorf("could not unmarshal CoderRef 
from %v, failed to decode CoderRef \"%v\": %v", c, string(payload), err)
+                       return nil, errors.Wrapf(err, "could not unmarshal 
CoderRef from %v, failed to decode CoderRef \"%v\"", c, string(payload))
                }
                return c, nil
 
        default:
-               return nil, fmt.Errorf("could not unmarshal coder from %v, 
unknown URN %v", c, urn)
+               return nil, errors.Errorf("could not unmarshal coder from %v, 
unknown URN %v", c, urn)
        }
 }
 
 func (b *CoderUnmarshaller) peek(id string) (*pb.Coder, error) {
        c, ok := b.models[id]
        if !ok {
-               return nil, fmt.Errorf("coder with id %v not found", id)
+               return nil, errors.Errorf("coder with id %v not found", id)
        }
        return c, nil
 }
@@ -326,7 +328,7 @@ func (b *CoderMarshaller) Add(c *coder.Coder) string {
                }
                data, err := protox.EncodeBase64(ref)
                if err != nil {
-                       panic(fmt.Sprintf("Failed to marshal custom coder %v: 
%v", c, err))
+                       panic(errors.Wrapf(err, "Failed to marshal custom coder 
%v", c))
                }
                inner := b.internCoder(&pb.Coder{
                        Spec: &pb.SdkFunctionSpec{
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go b/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go
index 52d5f6a..fd29f40 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go
@@ -16,12 +16,11 @@
 package graphx
 
 import (
-       "fmt"
-
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // TODO(herohde) 7/17/2018: move CoderRef to dataflowlib once Dataflow
@@ -95,7 +94,7 @@ func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) {
 
        case coder.KV:
                if len(c.Components) != 2 {
-                       return nil, fmt.Errorf("bad KV: %v", c)
+                       return nil, errors.Errorf("bad KV: %v", c)
                }
 
                key, err := EncodeCoderRef(c.Components[0])
@@ -110,7 +109,7 @@ func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) {
 
        case coder.CoGBK:
                if len(c.Components) < 2 {
-                       return nil, fmt.Errorf("bad CoGBK: %v", c)
+                       return nil, errors.Errorf("bad CoGBK: %v", c)
                }
 
                refs, err := EncodeCoderRefs(c.Components)
@@ -131,7 +130,7 @@ func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) {
 
        case coder.WindowedValue:
                if len(c.Components) != 1 || c.Window == nil {
-                       return nil, fmt.Errorf("bad windowed value: %v", c)
+                       return nil, errors.Errorf("bad windowed value: %v", c)
                }
 
                elm, err := EncodeCoderRef(c.Components[0])
@@ -152,7 +151,7 @@ func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) {
                return &CoderRef{Type: varIntType}, nil
 
        default:
-               return nil, fmt.Errorf("bad coder kind: %v", c.Kind)
+               return nil, errors.Errorf("bad coder kind: %v", c.Kind)
        }
 }
 
@@ -180,7 +179,7 @@ func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) {
 
        case pairType:
                if len(c.Components) != 2 {
-                       return nil, fmt.Errorf("bad pair: %+v", c)
+                       return nil, errors.Errorf("bad pair: %+v", c)
                }
 
                key, err := DecodeCoderRef(c.Components[0])
@@ -223,12 +222,12 @@ func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) {
 
        case lengthPrefixType:
                if len(c.Components) != 1 {
-                       return nil, fmt.Errorf("bad length prefix: %+v", c)
+                       return nil, errors.Errorf("bad length prefix: %+v", c)
                }
 
                var ref v1.CustomCoder
                if err := protox.DecodeBase64(c.Components[0].Type, &ref); err 
!= nil {
-                       return nil, fmt.Errorf("base64 decode for %v failed: 
%v", c.Components[0].Type, err)
+                       return nil, errors.Wrapf(err, "base64 decode for %v 
failed", c.Components[0].Type)
                }
                custom, err := decodeCustomCoder(&ref)
                if err != nil {
@@ -239,7 +238,7 @@ func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) {
 
        case windowedValueType:
                if len(c.Components) != 2 {
-                       return nil, fmt.Errorf("bad windowed value: %+v", c)
+                       return nil, errors.Errorf("bad windowed value: %+v", c)
                }
 
                elm, err := DecodeCoderRef(c.Components[0])
@@ -255,10 +254,10 @@ func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) {
                return &coder.Coder{Kind: coder.WindowedValue, T: t, 
Components: []*coder.Coder{elm}, Window: w}, nil
 
        case streamType:
-               return nil, fmt.Errorf("stream must be pair value: %+v", c)
+               return nil, errors.Errorf("stream must be pair value: %+v", c)
 
        default:
-               return nil, fmt.Errorf("custom coders must be length prefixed: 
%+v", c)
+               return nil, errors.Errorf("custom coders must be length 
prefixed: %+v", c)
        }
 }
 
@@ -283,7 +282,7 @@ func encodeWindowCoder(w *coder.WindowCoder) (*CoderRef, 
error) {
        case coder.IntervalWindow:
                return &CoderRef{Type: intervalWindowType}, nil
        default:
-               return nil, fmt.Errorf("bad window kind: %v", w.Kind)
+               return nil, errors.Errorf("bad window kind: %v", w.Kind)
        }
 }
 
@@ -296,6 +295,6 @@ func decodeWindowCoder(w *CoderRef) (*coder.WindowCoder, 
error) {
        case intervalWindowType:
                return coder.NewIntervalWindow(), nil
        default:
-               return nil, fmt.Errorf("bad window: %v", w.Type)
+               return nil, errors.Errorf("bad window: %v", w.Type)
        }
 }
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
index 5f0e944..61e5174 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
@@ -30,6 +30,7 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 var genFnType = reflect.TypeOf((*func(string, reflect.Type, []byte) 
reflectx.Func)(nil)).Elem()
@@ -43,14 +44,16 @@ func EncodeMultiEdge(edge *graph.MultiEdge) (*v1.MultiEdge, 
error) {
        if edge.DoFn != nil {
                ref, err := encodeFn((*graph.Fn)(edge.DoFn))
                if err != nil {
-                       return nil, fmt.Errorf("failed to encode userfn %v, bad 
userfn: %v", edge, err)
+                       wrapped := errors.Wrap(err, "bad userfn")
+                       return nil, errors.WithContextf(wrapped, "encoding 
userfn %v", edge)
                }
                ret.Fn = ref
        }
        if edge.CombineFn != nil {
                ref, err := encodeFn((*graph.Fn)(edge.CombineFn))
                if err != nil {
-                       return nil, fmt.Errorf("failed to encode userfn %v, bad 
combinefn: %v", edge, err)
+                       wrapped := errors.Wrap(err, "bad combinefn")
+                       return nil, errors.WithContextf(wrapped, "encoding 
userfn %v", edge)
                }
                ret.Fn = ref
        }
@@ -62,14 +65,16 @@ func EncodeMultiEdge(edge *graph.MultiEdge) (*v1.MultiEdge, 
error) {
                kind := encodeInputKind(in.Kind)
                t, err := encodeFullType(in.Type)
                if err != nil {
-                       return nil, fmt.Errorf("failed to encode userfn %v, bad 
input type: %v", edge, err)
+                       wrapped := errors.Wrap(err, "bad input type")
+                       return nil, errors.WithContextf(wrapped, "encoding 
userfn %v", edge)
                }
                ret.Inbound = append(ret.Inbound, &v1.MultiEdge_Inbound{Kind: 
kind, Type: t})
        }
        for _, out := range edge.Output {
                t, err := encodeFullType(out.Type)
                if err != nil {
-                       return nil, fmt.Errorf("failed to encode userfn %v, bad 
output type: %v", edge, err)
+                       wrapped := errors.Wrap(err, "bad output type")
+                       return nil, errors.WithContextf(wrapped, "encoding 
userfn %v", edge)
                }
                ret.Outbound = append(ret.Outbound, 
&v1.MultiEdge_Outbound{Type: t})
        }
@@ -91,7 +96,8 @@ func DecodeMultiEdge(edge *v1.MultiEdge) (graph.Opcode, 
*graph.Fn, *window.Fn, [
                var err error
                u, err = decodeFn(edge.Fn)
                if err != nil {
-                       return "", nil, nil, nil, nil, fmt.Errorf("failed to 
decode userfn %v, bad function: %v", edge, err)
+                       wrapped := errors.Wrap(err, "bad function")
+                       return "", nil, nil, nil, nil, 
errors.WithContextf(wrapped, "decoding userfn %v", edge)
                }
        }
        if edge.WindowFn != nil {
@@ -100,18 +106,21 @@ func DecodeMultiEdge(edge *v1.MultiEdge) (graph.Opcode, 
*graph.Fn, *window.Fn, [
        for _, in := range edge.Inbound {
                kind, err := decodeInputKind(in.Kind)
                if err != nil {
-                       return "", nil, nil, nil, nil, fmt.Errorf("failed to 
decode userfn %v, bad input kind: %v", edge, err)
+                       wrapped := errors.Wrap(err, "bad input kind")
+                       return "", nil, nil, nil, nil, 
errors.WithContextf(wrapped, "decoding userfn %v", edge)
                }
                t, err := decodeFullType(in.Type)
                if err != nil {
-                       return "", nil, nil, nil, nil, fmt.Errorf("failed to 
decode userfn %v, bad input type: %v", edge, err)
+                       wrapped := errors.Wrap(err, "bad input type")
+                       return "", nil, nil, nil, nil, 
errors.WithContextf(wrapped, "decoding userfn %v", edge)
                }
                inbound = append(inbound, &graph.Inbound{Kind: kind, Type: t})
        }
        for _, out := range edge.Outbound {
                t, err := decodeFullType(out.Type)
                if err != nil {
-                       return "", nil, nil, nil, nil, fmt.Errorf("failed to 
decode userfn %v, bad output type: %v", edge, err)
+                       wrapped := errors.Wrap(err, "bad output type")
+                       return "", nil, nil, nil, nil, 
errors.WithContextf(wrapped, "decoding userfn %v", edge)
                }
                outbound = append(outbound, &graph.Outbound{Type: t})
        }
@@ -122,15 +131,17 @@ func DecodeMultiEdge(edge *v1.MultiEdge) (graph.Opcode, 
*graph.Fn, *window.Fn, [
 func encodeCustomCoder(c *coder.CustomCoder) (*v1.CustomCoder, error) {
        t, err := encodeType(c.Type)
        if err != nil {
-               return nil, fmt.Errorf("failed to encode custom coder %v for 
type %v: %v", c, c.Type, err)
+               return nil, errors.WithContextf(err, "encoding custom coder %v 
for type %v", c, c.Type)
        }
        enc, err := encodeUserFn(c.Enc)
        if err != nil {
-               return nil, fmt.Errorf("failed to encode custom coder %v, bad 
encoding function: %v", c, err)
+               wrapped := errors.Wrap(err, "bad encoding function")
+               return nil, errors.WithContextf(wrapped, "encoding custom coder 
%v", c)
        }
        dec, err := encodeUserFn(c.Dec)
        if err != nil {
-               return nil, fmt.Errorf("failed to encode custom coder %v, bad 
decoding function: %v", c, err)
+               wrapped := errors.Wrap(err, "bad decoding function")
+               return nil, errors.WithContextf(wrapped, "encoding custom coder 
%v", c)
        }
 
        ret := &v1.CustomCoder{
@@ -145,20 +156,22 @@ func encodeCustomCoder(c *coder.CustomCoder) 
(*v1.CustomCoder, error) {
 func decodeCustomCoder(c *v1.CustomCoder) (*coder.CustomCoder, error) {
        t, err := decodeType(c.Type)
        if err != nil {
-               return nil, fmt.Errorf("failed to decode custom coder %v for 
type %v: %v", c, c.Type, err)
+               return nil, errors.WithContextf(err, "decoding custom coder %v 
for type %v", c, c.Type)
        }
        enc, err := decodeUserFn(c.Enc)
        if err != nil {
-               return nil, fmt.Errorf("failed to decode custom coder %v, bad 
encoding function: %v", c, err)
+               wrapped := errors.Wrap(err, "bad encoding function")
+               return nil, errors.WithContextf(wrapped, "decoding custom coder 
%v", c)
        }
        dec, err := decodeUserFn(c.Dec)
        if err != nil {
-               return nil, fmt.Errorf("failed to decode custom coder %v, bad 
decoding function: %v", c, err)
+               wrapped := errors.Wrap(err, "bad decoding function")
+               return nil, errors.WithContextf(wrapped, "decoding custom coder 
%v", c)
        }
 
        ret, err := coder.NewCustomCoder(c.Name, t, enc, dec)
        if err != nil {
-               return nil, fmt.Errorf("failed to decode custom coder %v: %v", 
c, err)
+               return nil, errors.WithContextf(err, "decoding custom coder 
%v", c)
        }
        return ret, nil
 }
@@ -195,7 +208,8 @@ func encodeFn(u *graph.Fn) (*v1.Fn, error) {
                gen := reflectx.FunctionName(u.DynFn.Gen)
                t, err := encodeType(u.DynFn.T)
                if err != nil {
-                       return nil, fmt.Errorf("failed to encode dynamic DoFn 
%v, bad function type: %v", u, err)
+                       wrapped := errors.Wrap(err, "bad function type")
+                       return nil, errors.WithContextf(wrapped, "encoding 
dynamic DoFn %v", u)
                }
                return &v1.Fn{Dynfn: &v1.DynFn{
                        Name: u.DynFn.Name,
@@ -207,7 +221,8 @@ func encodeFn(u *graph.Fn) (*v1.Fn, error) {
        case u.Fn != nil:
                fn, err := encodeUserFn(u.Fn)
                if err != nil {
-                       return nil, fmt.Errorf("failed to encode DoFn %v, bad 
userfn: %v", u, err)
+                       wrapped := errors.Wrap(err, "bad userfn")
+                       return nil, errors.WithContextf(wrapped, "encoding DoFn 
%v", u)
                }
                return &v1.Fn{Fn: fn}, nil
 
@@ -215,19 +230,23 @@ func encodeFn(u *graph.Fn) (*v1.Fn, error) {
                t := reflect.TypeOf(u.Recv)
                k, ok := runtime.TypeKey(reflectx.SkipPtr(t))
                if !ok {
-                       return nil, fmt.Errorf("failed to encode structural 
DoFn %v, failed to create TypeKey for receiver type %T", u, u.Recv)
+                       err := errors.Errorf("failed to create TypeKey for 
receiver type %T", u.Recv)
+                       return nil, errors.WithContextf(err, "encoding 
structural DoFn %v", u)
                }
                if _, ok := runtime.LookupType(k); !ok {
-                       return nil, fmt.Errorf("failed to encode structural 
DoFn %v, receiver type %v must be registered", u, t)
+                       err := errors.Errorf("receiver type %v must be 
registered", t)
+                       return nil, errors.WithContextf(err, "encoding 
structural DoFn %v", u)
                }
                typ, err := encodeType(t)
                if err != nil {
-                       panic(fmt.Sprintf("Failed to encode structural DoFn %v, 
failed to encode receiver type %T: %v", u, u.Recv, err))
+                       wrapped := errors.Wrapf(err, "failed to encode receiver 
type %T", u.Recv)
+                       panic(errors.WithContextf(wrapped, "encoding structural 
DoFn %v", u))
                }
 
                data, err := json.Marshal(u.Recv)
                if err != nil {
-                       return nil, fmt.Errorf("failed to encode structural 
DoFn %v, failed to marshal receiver %v: %v", u, u.Recv, err)
+                       wrapped := errors.Wrapf(err, "failed to marshal 
receiver %v", u.Recv)
+                       return nil, errors.WithContextf(wrapped, "encoding 
structural DoFn %v", u)
                }
                return &v1.Fn{Type: typ, Opt: string(data)}, nil
 
@@ -240,12 +259,14 @@ func decodeFn(u *v1.Fn) (*graph.Fn, error) {
        if u.Dynfn != nil {
                gen, err := runtime.ResolveFunction(u.Dynfn.Gen, genFnType)
                if err != nil {
-                       return nil, fmt.Errorf("failed to decode dynamic DoFn 
%v, bad symbol %v: %v", u, u.Dynfn.Gen, err)
+                       wrapped := errors.Wrapf(err, "bad symbol %v", 
u.Dynfn.Gen)
+                       return nil, errors.WithContextf(wrapped, "decoding 
dynamic DoFn %v", u)
                }
 
                t, err := decodeType(u.Dynfn.Type)
                if err != nil {
-                       return nil, fmt.Errorf("failed to decode dynamic DoFn 
%v, bad type: %v", u, err)
+                       wrapped := errors.Wrap(err, "bad type")
+                       return nil, errors.WithContextf(wrapped, "decoding 
dynamic DoFn %v", u)
                }
                return graph.NewFn(&graph.DynFn{
                        Name: u.Dynfn.Name,
@@ -257,22 +278,26 @@ func decodeFn(u *v1.Fn) (*graph.Fn, error) {
        if u.Fn != nil {
                fn, err := decodeUserFn(u.Fn)
                if err != nil {
-                       return nil, fmt.Errorf("failed to decode DoFn %v, 
failed to decode userfn: %v", u, err)
+                       wrapped := errors.Wrap(err, "failed to decode userfn")
+                       return nil, errors.WithContextf(wrapped, "decoding DoFn 
%v", u)
                }
                fx, err := funcx.New(reflectx.MakeFunc(fn))
                if err != nil {
-                       return nil, fmt.Errorf("failed to decode DoFn %v, 
failed to construct userfn: %v", u, err)
+                       wrapped := errors.Wrap(err, "failed to construct 
userfn")
+                       return nil, errors.WithContextf(wrapped, "decoding DoFn 
%v", u)
                }
                return &graph.Fn{Fn: fx}, nil
        }
 
        t, err := decodeType(u.Type)
        if err != nil {
-               return nil, fmt.Errorf("failed to decode structural DoFn %v, 
bad type: %v", u, err)
+               wrapped := errors.Wrap(err, "bad type")
+               return nil, errors.WithContextf(wrapped, "decoding structural 
DoFn %v", u)
        }
        fn, err := reflectx.UnmarshalJSON(t, u.Opt)
        if err != nil {
-               return nil, fmt.Errorf("failed to decode structural DoFn %v, 
bad struct encoding: %v", u, err)
+               wrapped := errors.Wrap(err, "bad struct encoding")
+               return nil, errors.WithContextf(wrapped, "decoding structural 
DoFn %v", u)
        }
        return graph.NewFn(fn)
 }
@@ -286,7 +311,8 @@ func encodeUserFn(u *funcx.Fn) (*v1.UserFn, error) {
        symbol := u.Fn.Name()
        t, err := encodeType(u.Fn.Type())
        if err != nil {
-               return nil, fmt.Errorf("failed to encode userfn %v, bad 
function type: %v", u, err)
+               wrapped := errors.Wrap(err, "bad function type")
+               return nil, errors.WithContextf(wrapped, "encoding userfn %v", 
u)
        }
        return &v1.UserFn{Name: symbol, Type: t}, nil
 }
@@ -319,7 +345,8 @@ func encodeFullType(t typex.FullType) (*v1.FullType, error) 
{
 
        prim, err := encodeType(t.Type())
        if err != nil {
-               return nil, fmt.Errorf("failed to encode full type %v, bad 
type: %v", t, err)
+               wrapped := errors.Wrap(err, "bad type")
+               return nil, errors.WithContextf(wrapped, "encoding full type 
%v", t)
        }
        return &v1.FullType{Type: prim, Components: components}, nil
 }
@@ -336,7 +363,8 @@ func decodeFullType(t *v1.FullType) (typex.FullType, error) 
{
 
        prim, err := decodeType(t.Type)
        if err != nil {
-               return nil, fmt.Errorf("failed to decode full type %v, bad 
type: %v", t, err)
+               wrapped := errors.Wrap(err, "bad type")
+               return nil, errors.WithContextf(wrapped, "decoding full type 
%v", t)
        }
        return typex.New(prim, components...), nil
 }
@@ -388,7 +416,8 @@ func encodeType(t reflect.Type) (*v1.Type, error) {
        case reflect.Slice:
                elm, err := encodeType(t.Elem())
                if err != nil {
-                       return nil, fmt.Errorf("failed to encode slice %v, bad 
element type: %v", t, err)
+                       wrapped := errors.Wrap(err, "bad element type")
+                       return nil, errors.WithContextf(wrapped, "encoding 
slice %v", t)
                }
                return &v1.Type{Kind: v1.Type_SLICE, Element: elm}, nil
 
@@ -399,7 +428,8 @@ func encodeType(t reflect.Type) (*v1.Type, error) {
 
                        fType, err := encodeType(f.Type)
                        if err != nil {
-                               return nil, fmt.Errorf("failed to encode struct 
%v, bad field type: %v", t, err)
+                               wrapped := errors.Wrap(err, "bad field type")
+                               return nil, errors.WithContextf(wrapped, 
"encoding struct %v", t)
                        }
 
                        field := &v1.Type_StructField{
@@ -420,7 +450,8 @@ func encodeType(t reflect.Type) (*v1.Type, error) {
                for i := 0; i < t.NumIn(); i++ {
                        param, err := encodeType(t.In(i))
                        if err != nil {
-                               return nil, fmt.Errorf("failed to encode 
function %v, bad parameter type: %v", t, err)
+                               wrapped := errors.Wrap(err, "bad parameter 
type")
+                               return nil, errors.WithContextf(wrapped, 
"encoding function %v", t)
                        }
                        in = append(in, param)
                }
@@ -428,7 +459,8 @@ func encodeType(t reflect.Type) (*v1.Type, error) {
                for i := 0; i < t.NumOut(); i++ {
                        ret, err := encodeType(t.Out(i))
                        if err != nil {
-                               return nil, fmt.Errorf("failed to encode 
function %v, bad return type: %v", t, err)
+                               wrapped := errors.Wrap(err, "bad return type")
+                               return nil, errors.WithContextf(wrapped, 
"encoding function %v", t)
                        }
                        out = append(out, ret)
                }
@@ -437,7 +469,8 @@ func encodeType(t reflect.Type) (*v1.Type, error) {
        case reflect.Chan:
                elm, err := encodeType(t.Elem())
                if err != nil {
-                       return nil, fmt.Errorf("failed to encode channel %v, 
bad element type: %v", t, err)
+                       wrapped := errors.Wrap(err, "bad element type")
+                       return nil, errors.WithContextf(wrapped, "encoding 
channel %v", t)
                }
                dir := encodeChanDir(t.ChanDir())
                return &v1.Type{Kind: v1.Type_CHAN, Element: elm, ChanDir: 
dir}, nil
@@ -445,12 +478,13 @@ func encodeType(t reflect.Type) (*v1.Type, error) {
        case reflect.Ptr:
                elm, err := encodeType(t.Elem())
                if err != nil {
-                       return nil, fmt.Errorf("failed to encode pointer %v, 
bad base type: %v", t, err)
+                       wrapped := errors.Wrap(err, "bad base type")
+                       return nil, errors.WithContextf(wrapped, "encoding 
pointer %v", t)
                }
                return &v1.Type{Kind: v1.Type_PTR, Element: elm}, nil
 
        default:
-               return nil, fmt.Errorf("unencodable type %v", t)
+               return nil, errors.Errorf("unencodable type %v", t)
        }
 }
 
@@ -496,7 +530,8 @@ func tryEncodeSpecial(t reflect.Type) (v1.Type_Special, 
bool) {
 
 func decodeType(t *v1.Type) (reflect.Type, error) {
        if t == nil {
-               return nil, fmt.Errorf("failed to decode type %v, empty type", 
t)
+               err := errors.New("empty type")
+               return nil, errors.WithContextf(err, "decoding type %v", t)
        }
 
        switch t.Kind {
@@ -532,7 +567,8 @@ func decodeType(t *v1.Type) (reflect.Type, error) {
        case v1.Type_SLICE:
                elm, err := decodeType(t.GetElement())
                if err != nil {
-                       return nil, fmt.Errorf("failed to decode type %v, bad 
element: %v", t, err)
+                       wrapped := errors.Wrap(err, "bad element")
+                       return nil, errors.WithContextf(wrapped, "decoding type 
%v", t)
                }
                return reflect.SliceOf(elm), nil
 
@@ -541,7 +577,8 @@ func decodeType(t *v1.Type) (reflect.Type, error) {
                for _, f := range t.Fields {
                        fType, err := decodeType(f.Type)
                        if err != nil {
-                               return nil, fmt.Errorf("failed to decode type 
%v, bad field type: %v", t, err)
+                               wrapped := errors.Wrap(err, "bad field type")
+                               return nil, errors.WithContextf(wrapped, 
"decoding type %v", t)
                        }
 
                        field := reflect.StructField{
@@ -560,48 +597,56 @@ func decodeType(t *v1.Type) (reflect.Type, error) {
        case v1.Type_FUNC:
                in, err := decodeTypes(t.GetParameterTypes())
                if err != nil {
-                       return nil, fmt.Errorf("failed to decode type %v, bad 
parameter type: %v", t, err)
+                       wrapped := errors.Wrap(err, "bad parameter type")
+                       return nil, errors.WithContextf(wrapped, "decoding type 
%v", t)
                }
                out, err := decodeTypes(t.GetReturnTypes())
                if err != nil {
-                       return nil, fmt.Errorf("failed to decode type %v, bad 
return type: %v", t, err)
+                       wrapped := errors.Wrap(err, "bad return type")
+                       return nil, errors.WithContextf(wrapped, "decoding type 
%v", t)
                }
                return reflect.FuncOf(in, out, t.GetIsVariadic()), nil
 
        case v1.Type_CHAN:
                elm, err := decodeType(t.GetElement())
                if err != nil {
-                       return nil, fmt.Errorf("failed to decode type %v, bad 
element: %v", t, err)
+                       wrapped := errors.Wrap(err, "bad element")
+                       return nil, errors.WithContextf(wrapped, "decoding type 
%v", t)
                }
                dir, err := decodeChanDir(t.GetChanDir())
                if err != nil {
-                       return nil, fmt.Errorf("failed to decode type %v, bad 
channel direction: %v", t, err)
+                       wrapped := errors.Wrap(err, "bad channel direction")
+                       return nil, errors.WithContextf(wrapped, "decoding type 
%v", t)
                }
                return reflect.ChanOf(dir, elm), nil
 
        case v1.Type_PTR:
                elm, err := decodeType(t.GetElement())
                if err != nil {
-                       return nil, fmt.Errorf("failed to decode type %v, bad 
element: %v", t, err)
+                       wrapped := errors.Wrap(err, "bad element")
+                       return nil, errors.WithContextf(wrapped, "decoding type 
%v", t)
                }
                return reflect.PtrTo(elm), nil
 
        case v1.Type_SPECIAL:
                ret, err := decodeSpecial(t.Special)
                if err != nil {
-                       return nil, fmt.Errorf("failed to decode type %v, bad 
element: %v", t, err)
+                       wrapped := errors.Wrap(err, "bad element")
+                       return nil, errors.WithContextf(wrapped, "decoding type 
%v", t)
                }
                return ret, nil
 
        case v1.Type_EXTERNAL:
                ret, ok := runtime.LookupType(t.ExternalKey)
                if !ok {
-                       return nil, fmt.Errorf("failed to decode type %v, 
external key not found %v", t, t.ExternalKey)
+                       err := errors.Errorf("external key not found %v", 
t.ExternalKey)
+                       return nil, errors.WithContextf(err, "decoding type 
%v", t)
                }
                return ret, nil
 
        default:
-               return nil, fmt.Errorf("failed to decode type %v, unexpected 
type kind %v", t, t.Kind)
+               err := errors.Errorf("unexpected type kind %v", t.Kind)
+               return nil, errors.WithContextf(err, "decoding type %v", t)
        }
 }
 
@@ -641,7 +686,7 @@ func decodeSpecial(s v1.Type_Special) (reflect.Type, error) 
{
                return typex.ZType, nil
 
        default:
-               return nil, fmt.Errorf("failed to decode special type, unknown 
type %v", s)
+               return nil, errors.Errorf("failed to decode special type, 
unknown type %v", s)
        }
 }
 
@@ -695,7 +740,8 @@ func decodeChanDir(dir v1.Type_ChanDir) (reflect.ChanDir, 
error) {
        case v1.Type_BOTH:
                return reflect.BothDir, nil
        default:
-               return reflect.BothDir, fmt.Errorf("failed to decode channel 
direction, invalid value: %v", dir)
+               err := errors.Errorf("invalid value: %v", dir)
+               return reflect.BothDir, errors.WithContext(err, "decoding 
channel direction")
        }
 }
 
@@ -737,6 +783,7 @@ func decodeInputKind(k v1.MultiEdge_Inbound_InputKind) 
(graph.InputKind, error)
        case v1.MultiEdge_Inbound_REITER:
                return graph.ReIter, nil
        default:
-               return graph.Main, fmt.Errorf("failed to decode input kind, 
invalid value: %v", k)
+               err := errors.Errorf("invalid value: %v", k)
+               return graph.Main, errors.WithContext(err, "decoding input 
kind")
        }
 }
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go 
b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 16328a7..3798be4 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -24,6 +24,7 @@ import (
        v1 "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
        "github.com/golang/protobuf/proto"
        "github.com/golang/protobuf/ptypes"
@@ -567,7 +568,7 @@ func makeWindowCoder(w *window.Fn) *coder.WindowCoder {
 func mustEncodeMultiEdgeBase64(edge *graph.MultiEdge) string {
        ref, err := EncodeMultiEdge(edge)
        if err != nil {
-               panic(fmt.Sprintf("Failed to serialize %v: %v", edge, err))
+               panic(errors.Wrapf(err, "Failed to serialize %v", edge))
        }
        return protox.MustEncodeBase64(&v1.TransformPayload{
                Urn:  URNDoFn,
diff --git a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go 
b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
index 5c25be0..2d985d4 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
@@ -17,12 +17,12 @@ package harness
 
 import (
        "context"
-       "fmt"
        "io"
        "sync"
        "time"
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        "github.com/apache/beam/sdks/go/pkg/beam/log"
        pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
 )
@@ -69,7 +69,7 @@ func (s *ScopedDataManager) open(ctx context.Context, port 
exec.Port) (*DataChan
        s.mu.Lock()
        if s.closed {
                s.mu.Unlock()
-               return nil, fmt.Errorf("instruction %v no longer processing", 
s.instID)
+               return nil, errors.Errorf("instruction %v no longer 
processing", s.instID)
        }
        local := s.mgr
        s.mu.Unlock()
@@ -147,12 +147,12 @@ type DataChannel struct {
 func newDataChannel(ctx context.Context, port exec.Port) (*DataChannel, error) 
{
        cc, err := dial(ctx, port.URL, 15*time.Second)
        if err != nil {
-               return nil, fmt.Errorf("failed to connect: %v", err)
+               return nil, errors.Wrap(err, "failed to connect")
        }
        client, err := pb.NewBeamFnDataClient(cc).Data(ctx)
        if err != nil {
                cc.Close()
-               return nil, fmt.Errorf("failed to connect to data service: %v", 
err)
+               return nil, errors.Wrap(err, "failed to connect to data 
service")
        }
        return makeDataChannel(ctx, port.URL, client), nil
 }
@@ -187,7 +187,7 @@ func (c *DataChannel) read(ctx context.Context) {
                                log.Warnf(ctx, "DataChannel %v closed", c.id)
                                return
                        }
-                       panic(fmt.Errorf("channel %v bad: %v", c.id, err))
+                       panic(errors.Wrapf(err, "channel %v bad", c.id))
                }
 
                recordStreamReceive(msg)
@@ -374,7 +374,7 @@ func (w *dataWriter) Write(p []byte) (n int, err error) {
                l := len(w.buf)
                // We can't fit this message into the buffer. We need to flush 
the buffer
                if err := w.Flush(); err != nil {
-                       return 0, fmt.Errorf("datamgr.go: error flushing buffer 
of length %d: %v", l, err)
+                       return 0, errors.Wrapf(err, "datamgr.go: error flushing 
buffer of length %d", l)
                }
        }
 
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go 
b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 08ee7dc..0c40f27 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -25,6 +25,7 @@ import (
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        "github.com/apache/beam/sdks/go/pkg/beam/log"
        fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
        "github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
@@ -49,13 +50,13 @@ func Main(ctx context.Context, loggingEndpoint, 
controlEndpoint string) error {
 
        conn, err := dial(ctx, controlEndpoint, 60*time.Second)
        if err != nil {
-               return fmt.Errorf("Failed to connect: %v", err)
+               return errors.Wrap(err, "failed to connect")
        }
        defer conn.Close()
 
        client, err := fnpb.NewBeamFnControlClient(conn).Control(ctx)
        if err != nil {
-               return fmt.Errorf("Failed to connect to control service: %v", 
err)
+               return errors.Wrap(err, "failed to connect to control service")
        }
 
        log.Debugf(ctx, "Successfully connected to control @ %v", 
controlEndpoint)
@@ -101,7 +102,7 @@ func Main(ctx context.Context, loggingEndpoint, 
controlEndpoint string) error {
                                recordFooter()
                                return nil
                        }
-                       return fmt.Errorf("recv failed: %v", err)
+                       return errors.Wrap(err, "recv failed")
                }
 
                // Launch a goroutine to handle the control message.
diff --git a/sdks/go/pkg/beam/core/runtime/harness/logging.go 
b/sdks/go/pkg/beam/core/runtime/harness/logging.go
index f7608cd..ad24148 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/logging.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/logging.go
@@ -22,6 +22,7 @@ import (
        "runtime"
        "time"
 
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        "github.com/apache/beam/sdks/go/pkg/beam/log"
        pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
        "github.com/golang/protobuf/ptypes"
@@ -148,5 +149,5 @@ func (w *remoteWriter) connect(ctx context.Context) error {
 
                // fmt.Fprintf(os.Stderr, "SENT: %v\n", msg)
        }
-       return fmt.Errorf("internal: buffer closed?")
+       return errors.New("internal: buffer closed?")
 }
diff --git a/sdks/go/pkg/beam/core/runtime/harness/session.go 
b/sdks/go/pkg/beam/core/runtime/harness/session.go
index 7fecf9f..d783533 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/session.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/session.go
@@ -24,6 +24,7 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/session"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
        "github.com/golang/protobuf/proto"
 )
@@ -68,7 +69,7 @@ func recordMessage(opcode session.Kind, pb *session.Entry) 
error {
        body := bufPool.Get().(*proto.Buffer)
        defer bufPool.Put(body)
        if err := body.Marshal(pb); err != nil {
-               return fmt.Errorf("Unable to marshal message for session 
recording: %v", err)
+               return errors.Wrap(err, "unable to marshal message for session 
recording")
        }
 
        eh := &session.EntryHeader{
@@ -79,13 +80,13 @@ func recordMessage(opcode session.Kind, pb *session.Entry) 
error {
        hdr := bufPool.Get().(*proto.Buffer)
        defer bufPool.Put(hdr)
        if err := hdr.Marshal(eh); err != nil {
-               return fmt.Errorf("Unable to marshal message header for session 
recording: %v", err)
+               return errors.Wrap(err, "unable to marshal message header for 
session recording")
        }
 
        l := bufPool.Get().(*proto.Buffer)
        defer bufPool.Put(l)
        if err := l.EncodeVarint(uint64(len(hdr.Bytes()))); err != nil {
-               return fmt.Errorf("Unable to write entry header length: %v", 
err)
+               return errors.Wrap(err, "unable to write entry header length")
        }
 
        // Acquire the lock to write the file.
@@ -93,13 +94,13 @@ func recordMessage(opcode session.Kind, pb *session.Entry) 
error {
        defer sessionLock.Unlock()
 
        if _, err := capture.Write(l.Bytes()); err != nil {
-               return fmt.Errorf("Unable to write entry header length: %v", 
err)
+               return errors.Wrap(err, "unable to write entry header length")
        }
        if _, err := capture.Write(hdr.Bytes()); err != nil {
-               return fmt.Errorf("Unable to write entry header: %v", err)
+               return errors.Wrap(err, "unable to write entry header")
        }
        if _, err := capture.Write(body.Bytes()); err != nil {
-               return fmt.Errorf("Unable to write entry body: %v", err)
+               return errors.Wrap(err, "unable to write entry body")
        }
        return nil
 }
diff --git a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go 
b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
index 6445618..f5d3102 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
@@ -24,10 +24,10 @@ import (
        "time"
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        "github.com/apache/beam/sdks/go/pkg/beam/log"
        pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
        "github.com/golang/protobuf/proto"
-       "github.com/pkg/errors"
 )
 
 // ScopedSideInputReader scopes the global gRPC state manager to a single 
instruction
@@ -55,7 +55,7 @@ func (s *ScopedSideInputReader) Open(ctx context.Context, id 
exec.StreamID, key,
        s.mu.Lock()
        if s.closed {
                s.mu.Unlock()
-               return nil, fmt.Errorf("instruction %v no longer processing", 
s.instID)
+               return nil, errors.Errorf("instruction %v no longer 
processing", s.instID)
        }
        ret := newSideInputReader(ch, id.Target, s.instID, key, w)
        s.opened = append(s.opened, ret)
@@ -67,7 +67,7 @@ func (s *ScopedSideInputReader) open(ctx context.Context, 
port exec.Port) (*Stat
        s.mu.Lock()
        if s.closed {
                s.mu.Unlock()
-               return nil, fmt.Errorf("instruction %v no longer processing", 
s.instID)
+               return nil, errors.Errorf("instruction %v no longer 
processing", s.instID)
        }
        local := s.mgr
        s.mu.Unlock()
@@ -129,7 +129,7 @@ func (r *sideInputReader) Read(buf []byte) (int, error) {
                r.mu.Lock()
                if r.closed {
                        r.mu.Unlock()
-                       return 0, fmt.Errorf("side input closed")
+                       return 0, errors.New("side input closed")
                }
                local := r.ch
                r.mu.Unlock()
@@ -219,12 +219,12 @@ type StateChannel struct {
 func newStateChannel(ctx context.Context, port exec.Port) (*StateChannel, 
error) {
        cc, err := dial(ctx, port.URL, 15*time.Second)
        if err != nil {
-               return nil, fmt.Errorf("failed to connect: %v", err)
+               return nil, errors.Wrap(err, "failed to connect")
        }
        client, err := pb.NewBeamFnStateClient(cc).State(ctx)
        if err != nil {
                cc.Close()
-               return nil, fmt.Errorf("failed to connect to data service: %v", 
err)
+               return nil, errors.Wrap(err, "failed to connect to state 
service")
        }
 
        ret := &StateChannel{
@@ -248,7 +248,7 @@ func (c *StateChannel) read(ctx context.Context) {
                                log.Warnf(ctx, "StateChannel %v closed", c.id)
                                return
                        }
-                       panic(fmt.Errorf("state channel %v bad: %v", c.id, err))
+                       panic(errors.Wrapf(err, "state channel %v bad", c.id))
                }
 
                c.mu.Lock()
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go 
b/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
index 56f14b3..ad1175c 100644
--- a/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
@@ -22,6 +22,7 @@ import (
        "sort"
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 )
 
@@ -44,7 +45,7 @@ func Update(p *pb.Pipeline, values *pb.Components) (*pb.Pipeline, error) {
 // subtransform list.
 func Normalize(p *pb.Pipeline) (*pb.Pipeline, error) {
        if len(p.GetComponents().GetTransforms()) == 0 {
-               return nil, fmt.Errorf("empty pipeline")
+               return nil, errors.New("empty pipeline")
        }
 
        ret := shallowClonePipeline(p)
diff --git a/sdks/go/pkg/beam/core/runtime/symbols.go b/sdks/go/pkg/beam/core/runtime/symbols.go
index f5089f1..6dbefad 100644
--- a/sdks/go/pkg/beam/core/runtime/symbols.go
+++ b/sdks/go/pkg/beam/core/runtime/symbols.go
@@ -16,13 +16,13 @@
 package runtime
 
 import (
-       "fmt"
        "os"
        "reflect"
        "sync"
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/symtab"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 var (
@@ -105,5 +105,5 @@ func ResolveFunction(name string, t reflect.Type) (interface{}, error) {
 type failResolver bool
 
 func (p failResolver) Sym2Addr(name string) (uintptr, error) {
-       return 0, fmt.Errorf("%v not found. Use runtime.RegisterFunction in unit tests", name)
+       return 0, errors.Errorf("%v not found. Use runtime.RegisterFunction in unit tests", name)
 }
diff --git a/sdks/go/pkg/beam/core/typex/fulltype.go b/sdks/go/pkg/beam/core/typex/fulltype.go
index 5066023..a96e6f9 100644
--- a/sdks/go/pkg/beam/core/typex/fulltype.go
+++ b/sdks/go/pkg/beam/core/typex/fulltype.go
@@ -21,6 +21,8 @@ import (
        "fmt"
        "reflect"
        "strings"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // FullType represents the tree structure of data types processed by the graph.
@@ -298,7 +300,7 @@ func IsBound(t FullType) bool {
 // produce {"T" -> string}.
 func Bind(types, models []FullType) (map[string]reflect.Type, error) {
        if len(types) != len(models) {
-               return nil, fmt.Errorf("typex.Bind: invalid number of models: %v, want %v", len(models), len(types))
+               return nil, errors.Errorf("typex.Bind: invalid number of models: %v, want %v", len(models), len(types))
        }
 
        m := make(map[string]reflect.Type)
@@ -307,7 +309,7 @@ func Bind(types, models []FullType) (map[string]reflect.Type, error) {
                model := models[i]
 
                if !IsStructurallyAssignable(model, t) {
-                       return nil, fmt.Errorf("typex.Bind: %v is not assignable to %v", model, t)
+                       return nil, errors.Errorf("typex.Bind: %v is not assignable to %v", model, t)
                }
                if err := walk(t, model, m); err != nil {
                        return nil, err
@@ -326,7 +328,7 @@ func walk(t, model FullType, m map[string]reflect.Type) error {
 
                name := t.Type().Name()
                if current, ok := m[name]; ok && current != model.Type() {
-                       return fmt.Errorf("bind conflict for %v: %v != %v", name, current, model.Type())
+                       return errors.Errorf("bind conflict for %v: %v != %v", name, current, model.Type())
                }
                m[name] = model.Type()
                return nil
@@ -363,7 +365,7 @@ func substitute(t FullType, m map[string]reflect.Type) (FullType, error) {
                name := t.Type().Name()
                repl, ok := m[name]
                if !ok {
-                       return nil, fmt.Errorf("substituting type %v: type not bound", name)
+                       return nil, errors.Errorf("substituting type %v: type not bound", name)
                }
                return New(repl), nil
        case Container:
@@ -374,7 +376,7 @@ func substitute(t FullType, m map[string]reflect.Type) (FullType, error) {
                if IsList(t.Type()) {
                        return New(reflect.SliceOf(comp[0].Type()), comp...), nil
                }
-               return nil, fmt.Errorf("unexpected aggregate %v, only slices allowed", t)
+               return nil, errors.Errorf("unexpected aggregate %v, only slices allowed", t)
        case Composite:
                comp, err := substituteList(t.Components(), m)
                if err != nil {
diff --git a/sdks/go/pkg/beam/core/util/dot/dot.go b/sdks/go/pkg/beam/core/util/dot/dot.go
index e92e501..d4d09a2 100644
--- a/sdks/go/pkg/beam/core/util/dot/dot.go
+++ b/sdks/go/pkg/beam/core/util/dot/dot.go
@@ -22,6 +22,7 @@ import (
        "text/template"
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 var (
@@ -114,14 +115,14 @@ func Render(edges []*graph.MultiEdge, nodes []*graph.Node, w io.Writer) error {
                for _, ib := range edge.Input {
                        err := edgeTmpl.Execute(w, struct{ From, To string }{ib.From.String(), e})
                        if err != nil {
-                               return fmt.Errorf("render DOT failed: %v", err)
+                               return errors.Wrap(err, "render DOT failed")
                        }
                }
                for _, ob := range edge.Output {
                        uniqNodes[ob.To].From = ob
                        err := edgeTmpl.Execute(w, struct{ From, To string }{e, ob.To.String()})
                        if err != nil {
-                               return fmt.Errorf("render DOT failed: %v", err)
+                               return errors.Wrap(err, "render DOT failed")
                        }
                }
        }
diff --git a/sdks/go/pkg/beam/core/util/hooks/hooks.go b/sdks/go/pkg/beam/core/util/hooks/hooks.go
index db31792..9c873aa 100644
--- a/sdks/go/pkg/beam/core/util/hooks/hooks.go
+++ b/sdks/go/pkg/beam/core/util/hooks/hooks.go
@@ -31,10 +31,10 @@ import (
        "context"
        "encoding/csv"
        "encoding/json"
-       "fmt"
        "strings"
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        "github.com/apache/beam/sdks/go/pkg/beam/log"
        fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
 )
@@ -126,7 +126,7 @@ func SerializeHooksToOptions() {
        data, err := json.Marshal(enabledHooks)
        if err != nil {
                // Shouldn't happen, since all the data is strings.
-               panic(fmt.Sprintf("Couldn't serialize hooks: %v", err))
+               panic(errors.Wrap(err, "Couldn't serialize hooks"))
        }
        runtime.GlobalOptions.Set("hooks", string(data))
 }
@@ -141,7 +141,7 @@ func DeserializeHooksFromOptions(ctx context.Context) {
        }
        if err := json.Unmarshal([]byte(cfg), &enabledHooks); err != nil {
                // Shouldn't happen, since all the data is strings.
-               panic(fmt.Sprintf("DeserializeHooks failed on input %q: %v", cfg, err))
+               panic(errors.Wrapf(err, "DeserializeHooks failed on input %q", cfg))
        }
 
        for h, opts := range enabledHooks {
@@ -155,7 +155,7 @@ func DeserializeHooksFromOptions(ctx context.Context) {
 // if a hook wants to compose behavior.
 func EnableHook(name string, args ...string) error {
        if _, ok := hookRegistry[name]; !ok {
-               return fmt.Errorf("EnableHook: hook %s not found", name)
+               return errors.Errorf("EnableHook: hook %s not found", name)
        }
        enabledHooks[name] = args
        return nil
@@ -176,7 +176,7 @@ func Encode(name string, opts []string) string {
        w := csv.NewWriter(&cfg)
        // This should never happen since a bytes.Buffer doesn't fail to write.
        if err := w.Write(append([]string{name}, opts...)); err != nil {
-               panic(fmt.Sprintf("error encoding arguments: %v", err))
+               panic(errors.Wrap(err, "error encoding arguments"))
        }
        w.Flush()
        return cfg.String()
@@ -189,7 +189,7 @@ func Decode(in string) (string, []string) {
        r := csv.NewReader(strings.NewReader(in))
        s, err := r.Read()
        if err != nil {
-               panic(fmt.Sprintf("malformed input for decoding: %s %v", in, err))
+               panic(errors.Wrapf(err, "malformed input for decoding: %s", in))
        }
        return s[0], s[1:]
 }
diff --git a/sdks/go/pkg/beam/core/util/ioutilx/read.go b/sdks/go/pkg/beam/core/util/ioutilx/read.go
index 106eff0..cf3568c 100644
--- a/sdks/go/pkg/beam/core/util/ioutilx/read.go
+++ b/sdks/go/pkg/beam/core/util/ioutilx/read.go
@@ -17,9 +17,10 @@
 package ioutilx
 
 import (
-       "errors"
        "io"
        "unsafe"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // ReadN reads exactly N bytes from the reader. Fails otherwise.
diff --git a/sdks/go/pkg/beam/core/util/protox/any.go b/sdks/go/pkg/beam/core/util/protox/any.go
index a4ed3c1..0568f81 100644
--- a/sdks/go/pkg/beam/core/util/protox/any.go
+++ b/sdks/go/pkg/beam/core/util/protox/any.go
@@ -16,8 +16,7 @@
 package protox
 
 import (
-       "fmt"
-
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        "github.com/golang/protobuf/proto"
        protobuf "github.com/golang/protobuf/ptypes/any"
        protobufw "github.com/golang/protobuf/ptypes/wrappers"
@@ -30,7 +29,7 @@ const (
 // Unpack decodes a proto.
 func Unpack(data *protobuf.Any, url string, ret proto.Message) error {
        if data.TypeUrl != url {
-               return fmt.Errorf("Bad type: %v, want %v", data.TypeUrl, url)
+               return errors.Errorf("bad type: %v, want %v", data.TypeUrl, url)
        }
        return proto.Unmarshal(data.Value, ret)
 }
@@ -75,12 +74,12 @@ func PackBase64Proto(in proto.Message) (*protobuf.Any, error) {
 // UnpackBytes removes the BytesValue wrapper.
 func UnpackBytes(data *protobuf.Any) ([]byte, error) {
        if data.TypeUrl != bytesValueTypeURL {
-               return nil, fmt.Errorf("Bad type: %v, want %v", data.TypeUrl, bytesValueTypeURL)
+               return nil, errors.Errorf("bad type: %v, want %v", data.TypeUrl, bytesValueTypeURL)
        }
 
        var buf protobufw.BytesValue
        if err := proto.Unmarshal(data.Value, &buf); err != nil {
-               return nil, fmt.Errorf("BytesValue unmarshal failed: %v", err)
+               return nil, errors.Wrap(err, "BytesValue unmarshal failed")
        }
        return buf.Value, nil
 }
diff --git a/sdks/go/pkg/beam/core/util/protox/base64.go b/sdks/go/pkg/beam/core/util/protox/base64.go
index 90fb13f..296d397 100644
--- a/sdks/go/pkg/beam/core/util/protox/base64.go
+++ b/sdks/go/pkg/beam/core/util/protox/base64.go
@@ -17,8 +17,8 @@ package protox
 
 import (
        "encoding/base64"
-       "fmt"
 
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        "github.com/golang/protobuf/proto"
 )
 
@@ -44,7 +44,7 @@ func EncodeBase64(msg proto.Message) (string, error) {
 func DecodeBase64(data string, ret proto.Message) error {
        decoded, err := base64.StdEncoding.DecodeString(data)
        if err != nil {
-               return fmt.Errorf("base64 decoding failed: %v", err)
+               return errors.Wrap(err, "base64 decoding failed")
        }
        return proto.Unmarshal(decoded, ret)
 }
diff --git a/sdks/go/pkg/beam/core/util/reflectx/call.go b/sdks/go/pkg/beam/core/util/reflectx/call.go
index f69ffda..44f0602 100644
--- a/sdks/go/pkg/beam/core/util/reflectx/call.go
+++ b/sdks/go/pkg/beam/core/util/reflectx/call.go
@@ -19,7 +19,7 @@ import (
        "reflect"
        "sync"
 
-       "fmt"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        "runtime/debug"
 )
 
@@ -90,7 +90,7 @@ func (c *reflectFunc) Call(args []interface{}) []interface{} {
 func CallNoPanic(fn Func, args []interface{}) (ret []interface{}, err error) {
        defer func() {
                if r := recover(); r != nil {
-                       err = fmt.Errorf("panic: %v %s", r, debug.Stack())
+                       err = errors.Errorf("panic: %v %s", r, debug.Stack())
                }
        }()
        return fn.Call(args), nil
diff --git a/sdks/go/pkg/beam/core/util/reflectx/json.go b/sdks/go/pkg/beam/core/util/reflectx/json.go
index 530b9f8..2ca6fe7 100644
--- a/sdks/go/pkg/beam/core/util/reflectx/json.go
+++ b/sdks/go/pkg/beam/core/util/reflectx/json.go
@@ -17,8 +17,9 @@ package reflectx
 
 import (
        "encoding/json"
-       "fmt"
        "reflect"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // UnmarshalJSON decodes a json string as then given type. It is suitable
@@ -26,7 +27,7 @@ import (
 func UnmarshalJSON(t reflect.Type, str string) (interface{}, error) {
        data := reflect.New(t).Interface()
        if err := json.Unmarshal([]byte(str), data); err != nil {
-               return nil, fmt.Errorf("failed to decode data: %v", err)
+               return nil, errors.Wrap(err, "failed to decode data")
        }
        return reflect.ValueOf(data).Elem().Interface(), nil
 }
diff --git a/sdks/go/pkg/beam/core/util/symtab/symtab.go b/sdks/go/pkg/beam/core/util/symtab/symtab.go
index 408af22..ded72a7 100644
--- a/sdks/go/pkg/beam/core/util/symtab/symtab.go
+++ b/sdks/go/pkg/beam/core/util/symtab/symtab.go
@@ -21,8 +21,9 @@ import (
        "debug/elf"
        "debug/macho"
        "debug/pe"
-       "fmt"
        "os"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // SymbolTable allows for mapping between symbols and their addresses.
@@ -48,7 +49,7 @@ func New(filename string) (*SymbolTable, error) {
                d, err := ef.DWARF()
                if err != nil {
                        f.Close()
-                       return nil, fmt.Errorf("No working DWARF: %v", err)
+                       return nil, errors.Wrap(err, "No working DWARF")
                }
                return &SymbolTable{d}, nil
        }
@@ -59,7 +60,7 @@ func New(filename string) (*SymbolTable, error) {
                d, err := mf.DWARF()
                if err != nil {
                        f.Close()
-                       return nil, fmt.Errorf("No working DWARF: %v", err)
+                       return nil, errors.Wrap(err, "No working DWARF")
                }
                return &SymbolTable{d}, nil
        }
@@ -70,14 +71,14 @@ func New(filename string) (*SymbolTable, error) {
                d, err := pf.DWARF()
                if err != nil {
                        f.Close()
-                       return nil, fmt.Errorf("No working DWARF: %v", err)
+                       return nil, errors.Wrap(err, "No working DWARF")
                }
                return &SymbolTable{d}, nil
        }
 
        // Give up, we don't recognize it
        f.Close()
-       return nil, fmt.Errorf("Unknown file format")
+       return nil, errors.New("Unknown file format")
 }
 
 // Addr2Sym returns the symbol name for the provided address.
@@ -100,7 +101,7 @@ func (s *SymbolTable) Addr2Sym(addr uintptr) (string, error) {
                        }
                }
        }
-       return "", fmt.Errorf("no symbol found at address %x", addr)
+       return "", errors.Errorf("no symbol found at address %x", addr)
 }
 
 // Sym2Addr returns the address of the provided symbol name.
@@ -124,5 +125,5 @@ func (s *SymbolTable) Sym2Addr(symbol string) (uintptr, error) {
                        }
                }
        }
-       return 0, fmt.Errorf("no symbol %q", symbol)
+       return 0, errors.Errorf("no symbol %q", symbol)
 }

Reply via email to