This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch go-sdk in repository https://gitbox.apache.org/repos/asf/beam.git
commit 0af2503d72c20dd88b9e63a557c63319068d70af Author: Bill Neubauer <w...@google.com> AuthorDate: Sun Jan 14 20:15:13 2018 -0800 BEAM-3299: Add source reporting support. Makes the harness sufficiently multithreaded so we can handle progress reports while we are performing work. The plans maintain their ability to be reused, but still can't be used concurrently. --- sdks/go/pkg/beam/core/runtime/exec/datasource.go | 25 ++++++ sdks/go/pkg/beam/core/runtime/exec/plan.go | 12 +++ sdks/go/pkg/beam/core/runtime/harness/harness.go | 97 ++++++++++++++++++------ 3 files changed, 109 insertions(+), 25 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index e29239b..6a87978 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -19,9 +19,12 @@ import ( "context" "fmt" "io" + "sync/atomic" + "time" "github.com/apache/beam/sdks/go/pkg/beam/core/graph" "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder" + "github.com/apache/beam/sdks/go/pkg/beam/log" ) // DataSource is a Root execution unit. @@ -32,6 +35,8 @@ type DataSource struct { sid StreamID source DataReader + count int64 + start time.Time } func (n *DataSource) ID() UnitID { @@ -45,6 +50,8 @@ func (n *DataSource) Up(ctx context.Context) error { func (n *DataSource) StartBundle(ctx context.Context, id string, data DataManager) error { n.sid = StreamID{Port: *n.Edge.Port, Target: *n.Edge.Target, InstID: id} n.source = data + n.start = time.Now() + atomic.StoreInt64(&n.count, 0) return n.Out.StartBundle(ctx, id, data) } @@ -93,6 +100,7 @@ func (n *DataSource) Process(ctx context.Context) error { // Single chunk stream.
// log.Printf("Fixed size=%v", size) + atomic.AddInt64(&n.count, int64(size)) for i := int32(0); i < size; i++ { value, err := cv.Decode(r) @@ -116,6 +124,7 @@ func (n *DataSource) Process(ctx context.Context) error { break } + atomic.AddInt64(&n.count, int64(chunk)) for i := uint64(0); i < chunk; i++ { value, err := cv.Decode(r) if err != nil { @@ -139,6 +148,7 @@ func (n *DataSource) Process(ctx context.Context) error { ec := MakeElementDecoder(coder.SkipW(c)) for { + atomic.AddInt64(&n.count, 1) t, err := DecodeWindowedValueHeader(c, r) if err != nil { if err == io.EOF { @@ -163,6 +173,7 @@ func (n *DataSource) Process(ctx context.Context) error { } func (n *DataSource) FinishBundle(ctx context.Context) error { + log.Infof(context.Background(), "DataSource: %d elements in %d ns", n.count, time.Now().Sub(n.start)) n.sid = StreamID{} n.source = nil return n.Out.FinishBundle(ctx) @@ -178,3 +189,17 @@ func (n *DataSource) String() string { sid := StreamID{Port: *n.Edge.Port, Target: *n.Edge.Target} return fmt.Sprintf("DataSource[%v] Out:%v", sid, n.Out.ID()) } + +// ProgressReportSnapshot captures the progress reading an input source. +type ProgressReportSnapshot struct { + ID, Name string + Count int64 +} + +// Progress returns a snapshot of the source's progress. +func (n *DataSource) Progress() ProgressReportSnapshot { + if n == nil { + return ProgressReportSnapshot{} + } + return ProgressReportSnapshot{n.sid.Target.ID, n.sid.Target.Name, atomic.LoadInt64(&n.count)} +} diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go index d26fb1f..732f315 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/plan.go +++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go @@ -32,11 +32,14 @@ type Plan struct { units []Unit status Status + source *DataSource } // NewPlan returns a new bundle execution plan from the given units. 
func NewPlan(id string, units []Unit) (*Plan, error) { var roots []Root + var source *DataSource + for _, u := range units { if u == nil { return nil, fmt.Errorf("no <nil> units") @@ -44,6 +47,9 @@ func NewPlan(id string, units []Unit) (*Plan, error) { if r, ok := u.(Root); ok { roots = append(roots, r) } + if s, ok := u.(*DataSource); ok { + source = s + } } if len(roots) == 0 { return nil, fmt.Errorf("no root units") @@ -54,6 +60,7 @@ func NewPlan(id string, units []Unit) (*Plan, error) { status: Initializing, roots: roots, units: units, + source: source, }, nil } @@ -137,3 +144,8 @@ func (p *Plan) String() string { } return fmt.Sprintf("Plan[%v]:\n%v", p.ID(), strings.Join(units, "\n")) } + +// ProgressReport returns a snapshot of input progress of the plan. +func (p *Plan) ProgressReport() ProgressReportSnapshot { + return p.source.Progress() +} diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index 4ba3fb6..72db1f1 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -70,6 +70,9 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error { respc := make(chan *fnpb.InstructionResponse, 100) wg.Add(1) + + // gRPC requires all writers to a stream be the same goroutine, so this is the + // goroutine for managing responses back to the control service. go func() { defer wg.Done() for resp := range respc { @@ -100,34 +103,39 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error { return fmt.Errorf("recv failed: %v", err) } - log.Debugf(ctx, "RECV: %v", proto.MarshalTextString(req)) - recordInstructionRequest(req) + // Launch a goroutine to handle the control message. + // TODO(wcn): maybe only do this for 'heavy' messages? + // TODO(wcn): implement a rate limiter for 'heavy' messages? 
+ go func() { + log.Debugf(ctx, "RECV: %v", proto.MarshalTextString(req)) + recordInstructionRequest(req) - if isEnabled("cpu_profiling") { - cpuProfBuf.Reset() - pprof.StartCPUProfile(&cpuProfBuf) - } - resp := ctrl.handleInstruction(ctx, req) + if isEnabled("cpu_profiling") { + cpuProfBuf.Reset() + pprof.StartCPUProfile(&cpuProfBuf) + } + resp := ctrl.handleInstruction(ctx, req) - if isEnabled("cpu_profiling") { - pprof.StopCPUProfile() - if err := ioutil.WriteFile(fmt.Sprintf("%s/cpu_prof%s", storagePath, req.InstructionId), cpuProfBuf.Bytes(), 0644); err != nil { - log.Warnf(ctx, "Failed to write CPU profile for instruction %s: %v", req.InstructionId, err) + if isEnabled("cpu_profiling") { + pprof.StopCPUProfile() + if err := ioutil.WriteFile(fmt.Sprintf("%s/cpu_prof%s", storagePath, req.InstructionId), cpuProfBuf.Bytes(), 0644); err != nil { + log.Warnf(ctx, "Failed to write CPU profile for instruction %s: %v", req.InstructionId, err) + } } - } - recordInstructionResponse(resp) - if resp != nil { - respc <- resp - } + recordInstructionResponse(resp) + if resp != nil { + respc <- resp + } + }() } } type control struct { - plans map[string]*exec.Plan - data *DataManager - - // TODO: running pipelines + plans map[string]*exec.Plan // protected by mu + active map[string]*exec.Plan // protected by mu + mu sync.Mutex + data *DataManager } func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRequest) *fnpb.InstructionResponse { @@ -140,7 +148,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe for _, desc := range msg.GetProcessBundleDescriptor() { var roots []string - for id, _ := range desc.GetTransforms() { + for id := range desc.GetTransforms() { roots = append(roots, id) } p := &pb.Pipeline{ @@ -170,7 +178,9 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe } log.Debugf(ctx, "Plan %v: %v", desc.GetId(), plan) + c.mu.Lock() c.plans[desc.GetId()] = plan + c.mu.Unlock() } 
return &fnpb.InstructionResponse{ @@ -188,14 +198,27 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe log.Debugf(ctx, "PB: %v", msg) ref := msg.GetProcessBundleDescriptorReference() + c.mu.Lock() plan, ok := c.plans[ref] + // Make the plan active, and remove it from candidates + // since a plan can't be run concurrently. + c.active[id] = plan + delete(c.plans, ref) + c.mu.Unlock() + if !ok { return fail(id, "execution plan for %v not found", ref) } - // TODO: Async execution. The below assumes serial bundle execution. + err := plan.Execute(ctx, id, c.data) - if err := plan.Execute(ctx, id, c.data); err != nil { + // Move the plan back to the candidate state + c.mu.Lock() + c.plans[plan.ID()] = plan + delete(c.active, id) + c.mu.Unlock() + + if err != nil { return fail(id, "execute failed: %v", err) } @@ -209,12 +232,36 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe case req.GetProcessBundleProgress() != nil: msg := req.GetProcessBundleProgress() - log.Debugf(ctx, "PB Progress: %v", msg) + // log.Debugf(ctx, "PB Progress: %v", msg) + + ref := msg.GetInstructionReference() + c.mu.Lock() + plan, ok := c.active[ref] + c.mu.Unlock() + if !ok { + return fail(id, "execution plan for %v not found", ref) + } + + snapshot := plan.ProgressReport() return &fnpb.InstructionResponse{ InstructionId: id, Response: &fnpb.InstructionResponse_ProcessBundleProgress{ - ProcessBundleProgress: &fnpb.ProcessBundleProgressResponse{}, + ProcessBundleProgress: &fnpb.ProcessBundleProgressResponse{ + Metrics: &fnpb.Metrics{ + Ptransforms: map[string]*fnpb.Metrics_PTransform{ + snapshot.ID: &fnpb.Metrics_PTransform{ + ProcessedElements: &fnpb.Metrics_PTransform_ProcessedElements{ + Measured: &fnpb.Metrics_PTransform_Measured{ + OutputElementCounts: map[string]int64{ + snapshot.Name: snapshot.Count, + }, + }, + }, + }, + }, + }, + }, }, } -- To stop receiving notification emails like this one, please contact 
"commits@beam.apache.org" <commits@beam.apache.org>.