This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch go-sdk
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 0af2503d72c20dd88b9e63a557c63319068d70af
Author: Bill Neubauer <w...@google.com>
AuthorDate: Sun Jan 14 20:15:13 2018 -0800

    BEAM-3299: Add source reporting support.
    
    Makes the harness sufficiently multithreaded so we can handle progress
    reports while we are performing work. The plans maintain their ability
    to be reused, but still can't be used concurrently.
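    
    A minimal, self-contained sketch of that invariant, with stand-in types
    (checkout/checkin and the ids below are illustrative, not the harness
    API): the mutex-guarded maps let a plan be reused across bundles while
    guaranteeing it never runs two bundles at once.
    
        package main
        
        import (
                "fmt"
                "sync"
        )
        
        // plan stands in for an exec.Plan: reusable across bundles, but it
        // must never execute two bundles at once.
        type plan struct{ id string }
        
        type control struct {
                mu     sync.Mutex
                plans  map[string]*plan // idle plans, available for reuse
                active map[string]*plan // plans currently executing a bundle
        }
        
        // checkout moves a plan from the idle set to the active set so no
        // other bundle can claim it while it is running.
        func (c *control) checkout(ref, inst string) (*plan, bool) {
                c.mu.Lock()
                defer c.mu.Unlock()
                p, ok := c.plans[ref]
                if !ok {
                        return nil, false
                }
                delete(c.plans, ref)
                c.active[inst] = p
                return p, true
        }
        
        // checkin puts the plan back into the idle set once its bundle is done.
        func (c *control) checkin(ref, inst string, p *plan) {
                c.mu.Lock()
                defer c.mu.Unlock()
                delete(c.active, inst)
                c.plans[ref] = p
        }
        
        func main() {
                c := &control{
                        plans:  map[string]*plan{"desc": {id: "desc"}},
                        active: map[string]*plan{},
                }
                if p, ok := c.checkout("desc", "instr-1"); ok {
                        // Execute the bundle here; progress requests can look
                        // the plan up in c.active from another goroutine.
                        c.checkin("desc", "instr-1", p)
                }
                fmt.Println("idle plans:", len(c.plans))
        }
    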
---
 sdks/go/pkg/beam/core/runtime/exec/datasource.go | 25 ++++++
 sdks/go/pkg/beam/core/runtime/exec/plan.go       | 12 +++
 sdks/go/pkg/beam/core/runtime/harness/harness.go | 97 ++++++++++++++++++------
 3 files changed, 109 insertions(+), 25 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
index e29239b..6a87978 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -19,9 +19,12 @@ import (
        "context"
        "fmt"
        "io"
+       "sync/atomic"
+       "time"
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+       "github.com/apache/beam/sdks/go/pkg/beam/log"
 )
 
 // DataSource is a Root execution unit.
@@ -32,6 +35,8 @@ type DataSource struct {
 
        sid    StreamID
        source DataReader
+       count  int64
+       start  time.Time
 }
 
 func (n *DataSource) ID() UnitID {
@@ -45,6 +50,8 @@ func (n *DataSource) Up(ctx context.Context) error {
 func (n *DataSource) StartBundle(ctx context.Context, id string, data DataManager) error {
        n.sid = StreamID{Port: *n.Edge.Port, Target: *n.Edge.Target, InstID: id}
        n.source = data
+       n.start = time.Now()
+       atomic.StoreInt64(&n.count, 0)
        return n.Out.StartBundle(ctx, id, data)
 }
 
@@ -93,6 +100,7 @@ func (n *DataSource) Process(ctx context.Context) error {
                                // Single chunk stream.
 
                                // log.Printf("Fixed size=%v", size)
+                               atomic.AddInt64(&n.count, int64(size))
 
                                for i := int32(0); i < size; i++ {
                                        value, err := cv.Decode(r)
@@ -116,6 +124,7 @@ func (n *DataSource) Process(ctx context.Context) error {
                                                break
                                        }
 
+                                       atomic.AddInt64(&n.count, int64(chunk))
                                        for i := uint64(0); i < chunk; i++ {
                                                value, err := cv.Decode(r)
                                                if err != nil {
@@ -139,6 +148,7 @@ func (n *DataSource) Process(ctx context.Context) error {
                ec := MakeElementDecoder(coder.SkipW(c))
 
                for {
+                       atomic.AddInt64(&n.count, 1)
                        t, err := DecodeWindowedValueHeader(c, r)
                        if err != nil {
                                if err == io.EOF {
@@ -163,6 +173,7 @@ func (n *DataSource) Process(ctx context.Context) error {
 }
 
 func (n *DataSource) FinishBundle(ctx context.Context) error {
+       log.Infof(context.Background(), "DataSource: %d elements in %d ns", n.count, time.Now().Sub(n.start))
        n.sid = StreamID{}
        n.source = nil
        return n.Out.FinishBundle(ctx)
@@ -178,3 +189,17 @@ func (n *DataSource) String() string {
        sid := StreamID{Port: *n.Edge.Port, Target: *n.Edge.Target}
        return fmt.Sprintf("DataSource[%v] Out:%v", sid, n.Out.ID())
 }
+
+// ProgressReportSnapshot captures the progress reading an input source.
+type ProgressReportSnapshot struct {
+       ID, Name string
+       Count    int64
+}
+
+// Progress returns a snapshot of the source's progress.
+func (n *DataSource) Progress() ProgressReportSnapshot {
+       if n == nil {
+               return ProgressReportSnapshot{}
+       }
+       return ProgressReportSnapshot{n.sid.Target.ID, n.sid.Target.Name, atomic.LoadInt64(&n.count)}
+}
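
datasource.go now tracks elements purely through atomic operations (StoreInt64 in StartBundle, AddInt64 in Process, LoadInt64 in Progress), so a progress request running on another goroutine can take a consistent snapshot mid-bundle. A small, self-contained sketch of that pattern; the source type and names below are stand-ins, not the DataSource API:

    package main
    
    import (
            "fmt"
            "sync/atomic"
            "time"
    )
    
    // source mimics the counting side of a data source: the worker
    // goroutine bumps count once per element it decodes.
    type source struct{ count int64 }
    
    func (s *source) process(elements int) {
            for i := 0; i < elements; i++ {
                    atomic.AddInt64(&s.count, 1) // element handed downstream
                    time.Sleep(time.Millisecond)
            }
    }
    
    // progress takes a snapshot without blocking or racing the worker.
    func (s *source) progress() int64 { return atomic.LoadInt64(&s.count) }
    
    func main() {
            s := &source{}
            go s.process(100)
            for i := 0; i < 3; i++ {
                    time.Sleep(20 * time.Millisecond)
                    fmt.Println("elements so far:", s.progress())
            }
    }
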
diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go
index d26fb1f..732f315 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/plan.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go
@@ -32,11 +32,14 @@ type Plan struct {
        units []Unit
 
        status Status
+       source *DataSource
 }
 
 // NewPlan returns a new bundle execution plan from the given units.
 func NewPlan(id string, units []Unit) (*Plan, error) {
        var roots []Root
+       var source *DataSource
+
        for _, u := range units {
                if u == nil {
                        return nil, fmt.Errorf("no <nil> units")
@@ -44,6 +47,9 @@ func NewPlan(id string, units []Unit) (*Plan, error) {
                if r, ok := u.(Root); ok {
                        roots = append(roots, r)
                }
+               if s, ok := u.(*DataSource); ok {
+                       source = s
+               }
        }
        if len(roots) == 0 {
                return nil, fmt.Errorf("no root units")
@@ -54,6 +60,7 @@ func NewPlan(id string, units []Unit) (*Plan, error) {
                status: Initializing,
                roots:  roots,
                units:  units,
+               source: source,
        }, nil
 }
 
@@ -137,3 +144,8 @@ func (p *Plan) String() string {
        }
        return fmt.Sprintf("Plan[%v]:\n%v", p.ID(), strings.Join(units, "\n"))
 }
+
+// ProgressReport returns a snapshot of input progress of the plan.
+func (p *Plan) ProgressReport() ProgressReportSnapshot {
+       return p.source.Progress()
+}
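
ProgressReport delegates unconditionally to p.source.Progress(), which stays safe even for a plan without a DataSource because Progress begins with a nil-receiver check. A tiny self-contained sketch of that Go idiom; the types below are stand-ins, not the exec package:

    package main
    
    import "fmt"
    
    type snapshot struct{ Count int64 }
    
    type src struct{ count int64 }
    
    // progress may be called through a nil *src: a method on a pointer
    // receiver runs fine with a nil receiver as long as it never
    // dereferences it, hence the guard before touching fields.
    func (s *src) progress() snapshot {
            if s == nil {
                    return snapshot{}
            }
            return snapshot{Count: s.count}
    }
    
    type plan struct{ source *src }
    
    func (p *plan) progressReport() snapshot { return p.source.progress() }
    
    func main() {
            fmt.Println((&plan{}).progressReport()) // zero snapshot, no panic
            fmt.Println((&plan{source: &src{count: 42}}).progressReport())
    }
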
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 4ba3fb6..72db1f1 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -70,6 +70,9 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
        respc := make(chan *fnpb.InstructionResponse, 100)
 
        wg.Add(1)
+
+       // gRPC requires all writers to a stream be the same goroutine, so this is the
+       // goroutine for managing responses back to the control service.
        go func() {
                defer wg.Done()
                for resp := range respc {
@@ -100,34 +103,39 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
                        return fmt.Errorf("recv failed: %v", err)
                }
 
-               log.Debugf(ctx, "RECV: %v", proto.MarshalTextString(req))
-               recordInstructionRequest(req)
+               // Launch a goroutine to handle the control message.
+               // TODO(wcn): maybe only do this for 'heavy' messages?
+               // TODO(wcn): implement a rate limiter for 'heavy' messages?
+               go func() {
+                       log.Debugf(ctx, "RECV: %v", proto.MarshalTextString(req))
+                       recordInstructionRequest(req)
 
-               if isEnabled("cpu_profiling") {
-                       cpuProfBuf.Reset()
-                       pprof.StartCPUProfile(&cpuProfBuf)
-               }
-               resp := ctrl.handleInstruction(ctx, req)
+                       if isEnabled("cpu_profiling") {
+                               cpuProfBuf.Reset()
+                               pprof.StartCPUProfile(&cpuProfBuf)
+                       }
+                       resp := ctrl.handleInstruction(ctx, req)
 
-               if isEnabled("cpu_profiling") {
-                       pprof.StopCPUProfile()
-                       if err := ioutil.WriteFile(fmt.Sprintf("%s/cpu_prof%s", storagePath, req.InstructionId), cpuProfBuf.Bytes(), 0644); err != nil {
-                               log.Warnf(ctx, "Failed to write CPU profile for instruction %s: %v", req.InstructionId, err)
+                       if isEnabled("cpu_profiling") {
+                               pprof.StopCPUProfile()
+                               if err := ioutil.WriteFile(fmt.Sprintf("%s/cpu_prof%s", storagePath, req.InstructionId), cpuProfBuf.Bytes(), 0644); err != nil {
+                                       log.Warnf(ctx, "Failed to write CPU profile for instruction %s: %v", req.InstructionId, err)
+                               }
                        }
-               }
 
-               recordInstructionResponse(resp)
-               if resp != nil {
-                       respc <- resp
-               }
+                       recordInstructionResponse(resp)
+                       if resp != nil {
+                               respc <- resp
+                       }
+               }()
        }
 }
 
 type control struct {
-       plans map[string]*exec.Plan
-       data  *DataManager
-
-       // TODO: running pipelines
+       plans  map[string]*exec.Plan // protected by mu
+       active map[string]*exec.Plan // protected by mu
+       mu     sync.Mutex
+       data   *DataManager
 }
 
 func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRequest) *fnpb.InstructionResponse {
@@ -140,7 +148,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 
                for _, desc := range msg.GetProcessBundleDescriptor() {
                        var roots []string
-                       for id, _ := range desc.GetTransforms() {
+                       for id := range desc.GetTransforms() {
                                roots = append(roots, id)
                        }
                        p := &pb.Pipeline{
@@ -170,7 +178,9 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
                        }
                        log.Debugf(ctx, "Plan %v: %v", desc.GetId(), plan)
 
+                       c.mu.Lock()
                        c.plans[desc.GetId()] = plan
+                       c.mu.Unlock()
                }
 
                return &fnpb.InstructionResponse{
@@ -188,14 +198,27 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
                log.Debugf(ctx, "PB: %v", msg)
 
                ref := msg.GetProcessBundleDescriptorReference()
+               c.mu.Lock()
                plan, ok := c.plans[ref]
+               // Make the plan active, and remove it from candidates
+               // since a plan can't be run concurrently.
+               c.active[id] = plan
+               delete(c.plans, ref)
+               c.mu.Unlock()
+
                if !ok {
                        return fail(id, "execution plan for %v not found", ref)
                }
 
-               // TODO: Async execution. The below assumes serial bundle execution.
+               err := plan.Execute(ctx, id, c.data)
 
-               if err := plan.Execute(ctx, id, c.data); err != nil {
+               // Move the plan back to the candidate state
+               c.mu.Lock()
+               c.plans[plan.ID()] = plan
+               delete(c.active, id)
+               c.mu.Unlock()
+
+               if err != nil {
                        return fail(id, "execute failed: %v", err)
                }
 
@@ -209,12 +232,36 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
        case req.GetProcessBundleProgress() != nil:
                msg := req.GetProcessBundleProgress()
 
-               log.Debugf(ctx, "PB Progress: %v", msg)
+               // log.Debugf(ctx, "PB Progress: %v", msg)
+
+               ref := msg.GetInstructionReference()
+               c.mu.Lock()
+               plan, ok := c.active[ref]
+               c.mu.Unlock()
+               if !ok {
+                       return fail(id, "execution plan for %v not found", ref)
+               }
+
+               snapshot := plan.ProgressReport()
 
                return &fnpb.InstructionResponse{
                        InstructionId: id,
                        Response: &fnpb.InstructionResponse_ProcessBundleProgress{
-                               ProcessBundleProgress: &fnpb.ProcessBundleProgressResponse{},
+                               ProcessBundleProgress: &fnpb.ProcessBundleProgressResponse{
+                                       Metrics: &fnpb.Metrics{
+                                               Ptransforms: map[string]*fnpb.Metrics_PTransform{
+                                                       snapshot.ID: &fnpb.Metrics_PTransform{
+                                                               ProcessedElements: &fnpb.Metrics_PTransform_ProcessedElements{
+                                                                       Measured: &fnpb.Metrics_PTransform_Measured{
+                                                                               OutputElementCounts: map[string]int64{
+                                                                                       snapshot.Name: snapshot.Count,
+                                                                               },
+                                                                       },
+                                                               },
+                                                       },
+                                               },
+                                       },
+                               },
                        },
                }
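
The new comment in Main notes that gRPC requires all writes to a stream to come from the same goroutine, which is why every handler goroutine funnels its response through the respc channel instead of sending directly. A self-contained sketch of that single-writer funnel; the response type and send function below are stand-ins for the Fn API control stream:

    package main
    
    import (
            "fmt"
            "sync"
    )
    
    // response stands in for *fnpb.InstructionResponse.
    type response struct{ id string }
    
    func main() {
            respc := make(chan *response, 100)
    
            // send stands in for the control stream's Send method; it must
            // only ever be called from the single writer goroutine below.
            send := func(r *response) error {
                    fmt.Println("SEND:", r.id)
                    return nil
            }
    
            var writer sync.WaitGroup
            writer.Add(1)
            go func() {
                    defer writer.Done()
                    for r := range respc {
                            if err := send(r); err != nil {
                                    fmt.Println("send failed:", err)
                            }
                    }
            }()
    
            // Handlers run concurrently and only write to the channel.
            var handlers sync.WaitGroup
            for i := 0; i < 3; i++ {
                    handlers.Add(1)
                    go func(n int) {
                            defer handlers.Done()
                            respc <- &response{id: fmt.Sprintf("instr-%d", n)}
                    }(i)
            }
            handlers.Wait()
    
            close(respc)
            writer.Wait()
    }
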
 

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <commits@beam.apache.org>.
